“SpringBoot2.0不容错过的新特性 WebFlux响应式编程”学习笔记(一)


WebFlux响应式编程好记性不如烂笔头
简书地址

第五节 webflux服务端开发讲解

  • 5.1 异步servlet
    1、为什么要使用 异步servlet?
    回答:同步servlet一直占用服务器tomcat的线程数,是tomcat无暇接受更多的请求。因此有必要使用异步servlet。
    2、同步servlet阻塞了什么?
    也就是说同步servlet其实是阻塞了tomcat的servlet线程。
    3、异步servlet是怎么工作的?
    先看一个同步servlet的例子:
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Servlet implementation class SyncServlet
 */
@WebServlet("/SyncServlet")
public class SyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public SyncServlet() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();

        // 执行业务代码
        doSomeThing(request, response);

        System.out.println("sync use:" + (System.currentTimeMillis() - t1));
    }

    private void doSomeThing(HttpServletRequest request,
            HttpServletResponse response) throws IOException {

        // 模拟耗时操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
        }

        //
        response.getWriter().append("done");
    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
        doGet(request, response);
    }
}

再模拟一个异步servlet的例子:

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

/**
 * Servlet implementation class AsyncServlet
 */
@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
    private static final long serialVersionUID = 1L;

    /**
     * @see HttpServlet#HttpServlet()
     */
    public AsyncServlet() {
        super();
        // TODO Auto-generated constructor stub
    }

    /**
     * @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doGet(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        long t1 = System.currentTimeMillis();

        // 开启异步
        AsyncContext asyncContext = request.startAsync();

        // 执行业务代码
        CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
                asyncContext.getRequest(), asyncContext.getResponse()));

        System.out.println("async use:" + (System.currentTimeMillis() - t1));
    }

    private void doSomeThing(AsyncContext asyncContext,
            ServletRequest servletRequest, ServletResponse servletResponse) {

        // 模拟耗时操作
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
        }

        //
        try {
            servletResponse.getWriter().append("done");
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 业务代码处理完毕, 通知结束
        asyncContext.complete();
    }

    /**
     * @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
     *      response)
     */
    protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
        // TODO Auto-generated method stub
        doGet(request, response);
    }
}

前后两次在浏览器上访问http://localhost:8080/SyncServlet和http://localhost:8080/AsyncServlet,在前台看都是大约5s返回。
但是查看服务端控制台:

sync use:5002
async use:16

可以看到,同步是5s,异步的是16ms返回了,这样tomcat就又有时间处理别的业务了,也就是提高了服务器的并发。

  • 5.2 Mono和Flux返回流接口
    reactor = java8的流 + java9的响应式流
import java.util.concurrent.TimeUnit;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import reactor.core.publisher.Flux;

public class ReactorDemo {

    public static void main(String[] args) {
        // reactor = jdk8 stream + jdk9 reactive stream
        // Mono 0-1个元素
        // Flux 0-N个元素
        String[] strs = { "1", "2", "3" };

        // 2. 定义订阅者
        Subscriber<Integer> subscriber = new Subscriber<Integer>() &#123;

            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription subscription) &#123;
                // 保存订阅关系, 需要用它来给发布者响应
                this.subscription = subscription;

                // 请求一个数据
                this.subscription.request(1);
            &#125;

            @Override
            public void onNext(Integer item) &#123;
                // 接受到一个数据, 处理
                System.out.println("接受到数据: " + item);

                try &#123;
                    TimeUnit.SECONDS.sleep(3);
                &#125; catch (InterruptedException e) &#123;
                    e.printStackTrace();
                &#125;
                
                // 处理完调用request再请求一个数据
                this.subscription.request(1);

                // 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
                // this.subscription.cancel();
            &#125;

            @Override
            public void onError(Throwable throwable) &#123;
                // 出现了异常(例如处理数据的时候产生了异常)
                throwable.printStackTrace();

                // 我们可以告诉发布者, 后面不接受数据了
                this.subscription.cancel();
            &#125;

            @Override
            public void onComplete() &#123;
                // 全部数据处理完了(发布者关闭了)
                System.out.println("处理完了!");
            &#125;

        &#125;;
        
        // 这里就是jdk8的stream
        Flux.fromArray(strs).map(s -> Integer.parseInt(s))
        // 最终操作
        // 这里就是jdk9的reactive stream
        .subscribe(subscriber);

    &#125;
&#125;
  • 5.2 完整的fluxweb
    步骤:
    1、添加依赖
    https://start.spring.io/上选择Reactive MongoDB这个dependencies,按下【Genarate Project】按钮,下载demo.zip,查看mongodb的依赖,加入到pom文件中。
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

<!-- lombok -->
<dependency>
     <groupId>org.projectlombok</groupId>
     <artifactId>lombok</artifactId>
     <scope>provided</scope>
</dependency>

2、启动类application上添加mongodb支持注解@EnableReactiveMongoRepositories
3、定义对象

@Document(collection = "user")
@Data
public class User &#123;
    @Id
    private String id;
    private String name;
    private int age;
&#125;

4、定义仓库repository
创建UserRepository接口,继承ReactiveMongoRepository,并使用@Repository注解。
5、定义controller

@RestController
@RequestMapping("/user")
public class UserController &#123;

    private final UserRepository userRepository;

    //官方推荐使用构造函数的方式注入依赖的bean
    public UserController(UserRepository userRepository) &#123;
        this.userRepository = userRepository;
    &#125;

    @GetMapping("/")
    public Flux<User> findAll() &#123;
        return userRepository.findAll();
    &#125;

    @GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<User> findAllByStream() &#123;
        return userRepository.findAll();
    &#125;
&#125;

6、本地安装配置mongodb
下载mongodb,安装配置,根据平台自行查询,我的windows平台,参考的https://www.cnblogs.com/weschen/p/8213746.html

选择custom修改安装位置

选择数据库和log位置

next

next

执行mongod --dbpath D:\MongoDB\data\db --logpath=D:\MongoDB\log\mongodb.log --logappend启动mongodb

启动mongodb

命令行链接

测试启动

7、mongodb数据库配置mongodb,在application.properties里添加如下配置

spring.data.mongodb.uri=mongodb://localhost:27017/webflux

8、启动项目并用客户端访问验证
http://127.0.0.1:8080/user

http://127.0.0.1:8080/user/stream/all

后续加入CURD的普通接口和stream接口。
添加validation
参数check,统一exception抛出及切面拦截。
具体代码参考
全局异常:

import lombok.Data;

/**
 * 精诚所至,金石为开。
 * 石の上にも三年;陽気の発する所金石亦透る。
 * Faith moves mountains.
 *
 * @author 马海强
 * @create 2018-07-10 22:54
 * @desc $&#123;DESCRIPTION&#125;
 **/
@Data
public class CheckException extends RuntimeException &#123;
    private static final long serialVersionUID = 1L;

    /**
     * 出错字段的名字
     */
    private String fieldName;

    /**
     * 出错字段的值
     */
    private String fieldValue;

    public CheckException() &#123;
        super();
    &#125;

    public CheckException(String message, Throwable cause,
            boolean enableSuppression, boolean writableStackTrace) &#123;
        super(message, cause, enableSuppression, writableStackTrace);
    &#125;

    public CheckException(String message, Throwable cause) &#123;
        super(message, cause);
    &#125;

    public CheckException(String message) &#123;
        super(message);
    &#125;

    public CheckException(Throwable cause) &#123;
        super(cause);
    &#125;

    public CheckException(String fieldName, String fieldValue) &#123;
        super();
        this.fieldName = fieldName;
        this.fieldValue = fieldValue;
    &#125;
&#125;

参数检查公用方法:

import com.marvin.springboot.webflux.demo.exception.CheckException;

import java.util.stream.Stream;

/**
 * 精诚所至,金石为开。
 * 石の上にも三年;陽気の発する所金石亦透る。
 * Faith moves mountains.
 *
 * @author 马海强
 * @create 2018-07-10 22:53
 * @desc tool
 **/
public class CheckUtil &#123;

    private static final String[] INVALID_NAMES = &#123;"admin", "guanliyuan"&#125;;

    /**
     * 用户名校验,不成功时抛出异常
     * @param value
     */
    public static void checkUserName(String value) &#123;
        Stream.of(INVALID_NAMES).filter(name -> name.equalsIgnoreCase(value))
                .findAny().ifPresent(name -> &#123;
                    throw new CheckException(name, value);
        &#125;);
    &#125;
&#125;

异常拦截切面:

import com.marvin.springboot.webflux.demo.exception.CheckException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.support.WebExchangeBindException;

/**
 * 精诚所至,金石为开。
 * 石の上にも三年;陽気の発する所金石亦透る。
 * Faith moves mountains.
 *
 * @author 马海强
 * @create 2018-07-10 22:43
 * @desc 异常处理切面
 **/
@ControllerAdvice
public class CheckAdvice &#123;

    @ExceptionHandler(WebExchangeBindException.class)
    public ResponseEntity handleBindException(WebExchangeBindException e) &#123;
        return new ResponseEntity<String>(toStr(e), HttpStatus.BAD_REQUEST);

    &#125;

    @ExceptionHandler(CheckException.class)
    public ResponseEntity handleBindException(CheckException e) &#123;
        return new ResponseEntity<String>(toStr(e), HttpStatus.BAD_REQUEST);

    &#125;

    /**
     * 把校验异常转换成字符串
     * @param ex
     * @return
     */
    private String toStr(WebExchangeBindException ex) &#123;
        return ex.getFieldErrors().stream().map(e -> e.getField() + ":" + e.getDefaultMessage()).reduce("", (s1, s2) -> s1 + "\n" + s2);
    &#125;

    /**
     * 把校验异常转换成字符串
     * @param ex
     * @return
     */
    private String toStr(CheckException ex) &#123;
        return ex.getFieldName() + ":错误的值 " + ex.getFieldValue();
    &#125;
&#125;

controller层加校验:

 /**
     * 修改数据
     * 存在的时候返回200,数据不存在的话返回404
     */
    @PutMapping("/&#123;id&#125;")
    public Mono<ResponseEntity<User>> updateUser(@PathVariable("id") String id, @Valid @RequestBody User user) &#123;
        CheckUtil.checkUserName(user.getName());
        return this.userRepository.findById(id)
                //flatMat: 操作数据
            .flatMap(u -> &#123;
                u.setAge(user.getAge());
                u.setName(user.getName());
                return this.userRepository.save(u);
            &#125;)
                //map:转换数据
            .map(u -> new ResponseEntity<>(u, HttpStatus.OK)).defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
    &#125;

测试:
参数校验

  • 5.3 RouterFunction模式
    实现基本的增删改查
    除了上一节的User实体类,和UserRepository接口类两个,新建一个UserHandler类,内容如下:
import com.marvin.springboot.webfluxrouter.router.domain.User;
import com.marvin.springboot.webfluxrouter.router.repository.UserRepository;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;

/**
 * 精诚所至,金石为开。
 * 石の上にも三年;陽気の発する所金石亦透る。
 * Faith moves mountains.
 *
 * @author 马海强
 * @create 2018-07-26 22:32
 * @desc UserHandler 
 **/

@Component
public class UserHandler &#123;

    @Resource
    private UserRepository userRepository;

    /**
     * 得到所有用户
     * @param request
     * @return
     */
    public Mono<ServerResponse> getAllUser(ServerRequest request) &#123;
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8).body(userRepository.findAll(), User.class);
    &#125;

    /**
     * 创建用户
     * @param request
     * @return
     */
    public Mono<ServerResponse> createUser(ServerRequest request) &#123;
        Mono<User> user = request.bodyToMono(User.class);
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8).body(userRepository.saveAll(user), User.class);
    &#125;

    /**
     * 根据id删除用户
     * @param request
     * @return
     */
    public Mono<ServerResponse> deleteUserById(ServerRequest request) &#123;
        String id = request.pathVariable("id");
        return this.userRepository.findById(id).flatMap(user ->
            this.userRepository.delete(user).then(ServerResponse.ok().build())).switchIfEmpty(ServerResponse.notFound().build());

    &#125;
&#125;

在创建一个AllRouters类,并且类似通常写的RequestMapping,处理URL路由,内容如下:

import com.marvin.springboot.webfluxrouter.router.handler.UserHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;

/**
 * 精诚所至,金石为开。
 * 石の上にも三年;陽気の発する所金石亦透る。
 * Faith moves mountains.
 *
 * @author 马海强
 * @create 2018-07-26 22:45
 * @desc 路由
 **/
@Configuration
public class AllRouters &#123;

    @Bean
    RouterFunction<ServerResponse> userRouter(UserHandler handler) &#123;
        return RouterFunctions.nest(
          //相当于类上面的@RequestMapping("/user")
          RequestPredicates.path("/user"),
                //相当于方法上面的@RequestMapping,获取所有用户
                RouterFunctions.route(RequestPredicates.GET(""), handler::getAllUser)
                //添加用户
                .andRoute(RequestPredicates.POST("").and(RequestPredicates.accept(MediaType.APPLICATION_JSON_UTF8)), handler::createUser)
                //删除用户
                .andRoute(RequestPredicates.DELETE("/&#123;id&#125;"), handler::deleteUserById)

        );
    &#125;
&#125;

运行程序,并且添加,列表,删除接口,完美~~~

实现数据校验功能
仍然使用上一节中的CheckExceptionCheckUtil两个类。
修改UserHander:



评论
  目录