WebFlux响应式编程好记性不如烂笔头
简书地址
第五节 webflux服务端开发讲解
- 5.1 异步servlet
1、为什么要使用 异步servlet?
回答:同步servlet一直占用服务器tomcat的线程数,是tomcat无暇接受更多的请求。因此有必要使用异步servlet。
2、同步servlet阻塞了什么?
也就是说同步servlet其实是阻塞了tomcat的servlet线程。
3、异步servlet是怎么工作的?
先看一个同步servlet的例子:
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Servlet implementation class SyncServlet
*/
@WebServlet("/SyncServlet")
public class SyncServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* @see HttpServlet#HttpServlet()
*/
public SyncServlet() {
super();
// TODO Auto-generated constructor stub
}
/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
long t1 = System.currentTimeMillis();
// 执行业务代码
doSomeThing(request, response);
System.out.println("sync use:" + (System.currentTimeMillis() - t1));
}
private void doSomeThing(HttpServletRequest request,
HttpServletResponse response) throws IOException {
// 模拟耗时操作
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
//
response.getWriter().append("done");
}
/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
// TODO Auto-generated method stub
doGet(request, response);
}
}
再模拟一个异步servlet的例子:
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Servlet implementation class AsyncServlet
*/
@WebServlet(asyncSupported = true, urlPatterns = { "/AsyncServlet" })
public class AsyncServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
/**
* @see HttpServlet#HttpServlet()
*/
public AsyncServlet() {
super();
// TODO Auto-generated constructor stub
}
/**
* @see HttpServlet#doGet(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doGet(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
long t1 = System.currentTimeMillis();
// 开启异步
AsyncContext asyncContext = request.startAsync();
// 执行业务代码
CompletableFuture.runAsync(() -> doSomeThing(asyncContext,
asyncContext.getRequest(), asyncContext.getResponse()));
System.out.println("async use:" + (System.currentTimeMillis() - t1));
}
private void doSomeThing(AsyncContext asyncContext,
ServletRequest servletRequest, ServletResponse servletResponse) {
// 模拟耗时操作
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
//
try {
servletResponse.getWriter().append("done");
} catch (IOException e) {
e.printStackTrace();
}
// 业务代码处理完毕, 通知结束
asyncContext.complete();
}
/**
* @see HttpServlet#doPost(HttpServletRequest request, HttpServletResponse
* response)
*/
protected void doPost(HttpServletRequest request,
HttpServletResponse response) throws ServletException, IOException {
// TODO Auto-generated method stub
doGet(request, response);
}
}
前后两次在浏览器上访问http://localhost:8080/SyncServlet和http://localhost:8080/AsyncServlet,在前台看都是大约5s返回。
但是查看服务端控制台:
sync use:5002
async use:16
可以看到,同步是5s,异步的是16ms返回了,这样tomcat就又有时间处理别的业务了,也就是提高了服务器的并发。
- 5.2 Mono和Flux返回流接口
reactor = java8的流 + java9的响应式流
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
public class ReactorDemo {
public static void main(String[] args) {
// reactor = jdk8 stream + jdk9 reactive stream
// Mono 0-1个元素
// Flux 0-N个元素
String[] strs = { "1", "2", "3" };
// 2. 定义订阅者
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
// 保存订阅关系, 需要用它来给发布者响应
this.subscription = subscription;
// 请求一个数据
this.subscription.request(1);
}
@Override
public void onNext(Integer item) {
// 接受到一个数据, 处理
System.out.println("接受到数据: " + item);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 处理完调用request再请求一个数据
this.subscription.request(1);
// 或者 已经达到了目标, 调用cancel告诉发布者不再接受数据了
// this.subscription.cancel();
}
@Override
public void onError(Throwable throwable) {
// 出现了异常(例如处理数据的时候产生了异常)
throwable.printStackTrace();
// 我们可以告诉发布者, 后面不接受数据了
this.subscription.cancel();
}
@Override
public void onComplete() {
// 全部数据处理完了(发布者关闭了)
System.out.println("处理完了!");
}
};
// 这里就是jdk8的stream
Flux.fromArray(strs).map(s -> Integer.parseInt(s))
// 最终操作
// 这里就是jdk9的reactive stream
.subscribe(subscriber);
}
}
- 5.2 完整的fluxweb
步骤:
1、添加依赖
在https://start.spring.io/上选择Reactive MongoDB这个dependencies,按下【Genarate Project】按钮,下载demo.zip,查看mongodb的依赖,加入到pom文件中。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
2、启动类application上添加mongodb支持注解@EnableReactiveMongoRepositories
3、定义对象
@Document(collection = "user")
@Data
public class User {
@Id
private String id;
private String name;
private int age;
}
4、定义仓库repository
创建UserRepository接口,继承ReactiveMongoRepository
,并使用@Repository
注解。
5、定义controller
@RestController
@RequestMapping("/user")
public class UserController {
private final UserRepository userRepository;
//官方推荐使用构造函数的方式注入依赖的bean
public UserController(UserRepository userRepository) {
this.userRepository = userRepository;
}
@GetMapping("/")
public Flux<User> findAll() {
return userRepository.findAll();
}
@GetMapping(value = "/stream/all", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<User> findAllByStream() {
return userRepository.findAll();
}
}
6、本地安装配置mongodb
先下载mongodb,安装配置,根据平台自行查询,我的windows平台,参考的https://www.cnblogs.com/weschen/p/8213746.html
执行mongod --dbpath D:\MongoDB\data\db --logpath=D:\MongoDB\log\mongodb.log --logappend
启动mongodb
7、mongodb数据库配置mongodb,在application.properties里添加如下配置
spring.data.mongodb.uri=mongodb://localhost:27017/webflux
8、启动项目并用客户端访问验证
后续加入CURD的普通接口和stream接口。
添加validation
参数check,统一exception抛出及切面拦截。
具体代码参考
全局异常:
import lombok.Data;
/**
* 精诚所至,金石为开。
* 石の上にも三年;陽気の発する所金石亦透る。
* Faith moves mountains.
*
* @author 马海强
* @create 2018-07-10 22:54
* @desc ${DESCRIPTION}
**/
@Data
public class CheckException extends RuntimeException {
private static final long serialVersionUID = 1L;
/**
* 出错字段的名字
*/
private String fieldName;
/**
* 出错字段的值
*/
private String fieldValue;
public CheckException() {
super();
}
public CheckException(String message, Throwable cause,
boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public CheckException(String message, Throwable cause) {
super(message, cause);
}
public CheckException(String message) {
super(message);
}
public CheckException(Throwable cause) {
super(cause);
}
public CheckException(String fieldName, String fieldValue) {
super();
this.fieldName = fieldName;
this.fieldValue = fieldValue;
}
}
参数检查公用方法:
import com.marvin.springboot.webflux.demo.exception.CheckException;
import java.util.stream.Stream;
/**
* 精诚所至,金石为开。
* 石の上にも三年;陽気の発する所金石亦透る。
* Faith moves mountains.
*
* @author 马海强
* @create 2018-07-10 22:53
* @desc tool
**/
public class CheckUtil {
private static final String[] INVALID_NAMES = {"admin", "guanliyuan"};
/**
* 用户名校验,不成功时抛出异常
* @param value
*/
public static void checkUserName(String value) {
Stream.of(INVALID_NAMES).filter(name -> name.equalsIgnoreCase(value))
.findAny().ifPresent(name -> {
throw new CheckException(name, value);
});
}
}
异常拦截切面:
import com.marvin.springboot.webflux.demo.exception.CheckException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.support.WebExchangeBindException;
/**
* 精诚所至,金石为开。
* 石の上にも三年;陽気の発する所金石亦透る。
* Faith moves mountains.
*
* @author 马海强
* @create 2018-07-10 22:43
* @desc 异常处理切面
**/
@ControllerAdvice
public class CheckAdvice {
@ExceptionHandler(WebExchangeBindException.class)
public ResponseEntity handleBindException(WebExchangeBindException e) {
return new ResponseEntity<String>(toStr(e), HttpStatus.BAD_REQUEST);
}
@ExceptionHandler(CheckException.class)
public ResponseEntity handleBindException(CheckException e) {
return new ResponseEntity<String>(toStr(e), HttpStatus.BAD_REQUEST);
}
/**
* 把校验异常转换成字符串
* @param ex
* @return
*/
private String toStr(WebExchangeBindException ex) {
return ex.getFieldErrors().stream().map(e -> e.getField() + ":" + e.getDefaultMessage()).reduce("", (s1, s2) -> s1 + "\n" + s2);
}
/**
* 把校验异常转换成字符串
* @param ex
* @return
*/
private String toStr(CheckException ex) {
return ex.getFieldName() + ":错误的值 " + ex.getFieldValue();
}
}
controller层加校验:
/**
* 修改数据
* 存在的时候返回200,数据不存在的话返回404
*/
@PutMapping("/{id}")
public Mono<ResponseEntity<User>> updateUser(@PathVariable("id") String id, @Valid @RequestBody User user) {
CheckUtil.checkUserName(user.getName());
return this.userRepository.findById(id)
//flatMat: 操作数据
.flatMap(u -> {
u.setAge(user.getAge());
u.setName(user.getName());
return this.userRepository.save(u);
})
//map:转换数据
.map(u -> new ResponseEntity<>(u, HttpStatus.OK)).defaultIfEmpty(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
测试:
- 5.3 RouterFunction模式
实现基本的增删改查
除了上一节的User实体类,和UserRepository接口类两个,新建一个UserHandler类,内容如下:
import com.marvin.springboot.webfluxrouter.router.domain.User;
import com.marvin.springboot.webfluxrouter.router.repository.UserRepository;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
/**
* 精诚所至,金石为开。
* 石の上にも三年;陽気の発する所金石亦透る。
* Faith moves mountains.
*
* @author 马海强
* @create 2018-07-26 22:32
* @desc UserHandler
**/
@Component
public class UserHandler {
@Resource
private UserRepository userRepository;
/**
* 得到所有用户
* @param request
* @return
*/
public Mono<ServerResponse> getAllUser(ServerRequest request) {
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8).body(userRepository.findAll(), User.class);
}
/**
* 创建用户
* @param request
* @return
*/
public Mono<ServerResponse> createUser(ServerRequest request) {
Mono<User> user = request.bodyToMono(User.class);
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON_UTF8).body(userRepository.saveAll(user), User.class);
}
/**
* 根据id删除用户
* @param request
* @return
*/
public Mono<ServerResponse> deleteUserById(ServerRequest request) {
String id = request.pathVariable("id");
return this.userRepository.findById(id).flatMap(user ->
this.userRepository.delete(user).then(ServerResponse.ok().build())).switchIfEmpty(ServerResponse.notFound().build());
}
}
在创建一个AllRouters类,并且类似通常写的RequestMapping,处理URL路由,内容如下:
import com.marvin.springboot.webfluxrouter.router.handler.UserHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
/**
* 精诚所至,金石为开。
* 石の上にも三年;陽気の発する所金石亦透る。
* Faith moves mountains.
*
* @author 马海强
* @create 2018-07-26 22:45
* @desc 路由
**/
@Configuration
public class AllRouters {
@Bean
RouterFunction<ServerResponse> userRouter(UserHandler handler) {
return RouterFunctions.nest(
//相当于类上面的@RequestMapping("/user")
RequestPredicates.path("/user"),
//相当于方法上面的@RequestMapping,获取所有用户
RouterFunctions.route(RequestPredicates.GET(""), handler::getAllUser)
//添加用户
.andRoute(RequestPredicates.POST("").and(RequestPredicates.accept(MediaType.APPLICATION_JSON_UTF8)), handler::createUser)
//删除用户
.andRoute(RequestPredicates.DELETE("/{id}"), handler::deleteUserById)
);
}
}
运行程序,并且添加,列表,删除接口,完美~~~
实现数据校验功能
仍然使用上一节中的CheckException
和CheckUtil
两个类。
修改UserHander: