依赖引入
一个简答的flux应用,只需引入一个依赖即可
1
| implementation "org.springframework.boot:spring-boot-starter-webflux"
|
Controller实战
和之前的写法一样,唯一的区别是Controller的返回值为Mono
1 2 3 4
| @GetMapping public Mono<String> get() { return Mono.just("123"); }
|
返回body增强
之前使用ResponseBodyAdvice来统一响应格式,在flux里使用ResponseBodyResultHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| @Slf4j @Component public class RpcResponseBodyResultHandler extends ResponseBodyResultHandler {
public RpcResponseBodyResultHandler(ServerCodecConfigurer serverCodecConfigurer, RequestedContentTypeResolver resolver) { super(serverCodecConfigurer.getWriters(), resolver); }
@SneakyThrows @Override public Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) { Object returnValue = result.getReturnValue(); Object body; if (returnValue != null && Mono.class.isAssignableFrom(returnValue.getClass())) { log.info("返回值类型是Mono"); body = ((Mono<?>) returnValue) .map(RpcResponseBodyResultHandler::ok) .onErrorResume((e) -> { Object b = RpcGlobalErrorHandler.buildBody(e, exchange.getRequest()); return Mono.just(b); }); } else { log.info("返回值类型是原始类型"); body = Mono.just(ok(returnValue)); }
return writeBody(body, METHOD_PARAMETER_MONO_COMMON_RESULT, exchange); }
private static final MethodParameter METHOD_PARAMETER_MONO_COMMON_RESULT;
static { try { METHOD_PARAMETER_MONO_COMMON_RESULT = new MethodParameter(RpcResponseBodyResultHandler.class.getDeclaredMethod("methodForParams"), -1); } catch (Exception e) { throw new RuntimeException(e); } }
private static Mono<ResponseEntity<?>> methodForParams() { return null; }
public static Object ok(Object o) { if (o instanceof RpcResponse) { return o; } if (o == NULL) { return RpcResponse.success(); } return RpcResponse.success(o); }
public static final Object NULL = new Object[0];
}
|
如果接口需要返回null,则需这么写
1 2 3 4
| @GetMapping public Mono<Object> get() { return Mono.just(RpcResponseBodyResultHandler.NULL); }
|
全局异常处理
在Mono内抛出的异常会包含在其内部,类似CompletableFuture,使用onErrorResume等方法处理,在其他地方抛出的依然由ExceptionHandler处理
传统IO的处理代码如下:
1 2 3 4 5
| @ExceptionHandler(Throwable.class) @ResponseStatus(code = HttpStatus.INTERNAL_SERVER_ERROR) public Object handleError(Throwable e, HttpServletRequest httpServletRequest) { }
|
但HttpServletRequest是阻塞IO模型,其在reactive的概念是ServerHttpRequest,所以写法如下:
1 2 3 4 5
| @ExceptionHandler(Throwable.class) @ResponseStatus(code = HttpStatus.INTERNAL_SERVER_ERROR) public Object handleError(Throwable e, ServerHttpRequest request) { }
|
使用Mybatis等不支持非阻塞编程的组件
1 2 3 4 5 6 7
| Mono.fromCallable(() -> { mybatisMapper.insert(entity); return (Object) status; }).subscribeOn(Schedulers.boundedElastic());
.subscribeOn(Schedulers.fromExecutor(...));
|