曹耘豪的博客

Spring Flux入门

  1. 依赖引入
  2. Controller实战
    1. 返回body增强
    2. 全局异常处理
  3. 使用Mybatis等不支持非阻塞编程的组件

依赖引入

一个简答的flux应用,只需引入一个依赖即可

1
implementation "org.springframework.boot:spring-boot-starter-webflux"

Controller实战

和之前的写法一样,唯一的区别是Controller的返回值为Mono

1
2
3
4
@GetMapping
public Mono<String> get() {
return Mono.just("123");
}

返回body增强

之前使用ResponseBodyAdvice来统一响应格式,在flux里使用ResponseBodyResultHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
@Slf4j
@Component
public class RpcResponseBodyResultHandler extends ResponseBodyResultHandler {

public RpcResponseBodyResultHandler(ServerCodecConfigurer serverCodecConfigurer, RequestedContentTypeResolver resolver) {
// 这里注意传serverCodecConfigurer
super(serverCodecConfigurer.getWriters(), resolver);
}

@SneakyThrows
@Override
public Mono<Void> handleResult(ServerWebExchange exchange, HandlerResult result) {
Object returnValue = result.getReturnValue();
Object body;
if (returnValue != null && Mono.class.isAssignableFrom(returnValue.getClass())) {
log.info("返回值类型是Mono");
body = ((Mono<?>) returnValue)
.map(RpcResponseBodyResultHandler::ok) // 注意,如果Mono内的值是null,则不会执行
.onErrorResume((e) -> {
// 在Mono内抛出的异常不会被全局异常捕获,所以在这里处理
Object b = RpcGlobalErrorHandler.buildBody(e, exchange.getRequest());
return Mono.just(b);
});
} else {
// 针对Controller内方法返回值不为Mono类型的
log.info("返回值类型是原始类型");
body = Mono.just(ok(returnValue));
}

return writeBody(body, METHOD_PARAMETER_MONO_COMMON_RESULT, exchange);
}

private static final MethodParameter METHOD_PARAMETER_MONO_COMMON_RESULT;

static {
try {
METHOD_PARAMETER_MONO_COMMON_RESULT = new MethodParameter(RpcResponseBodyResultHandler.class.getDeclaredMethod("methodForParams"), -1);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private static Mono<ResponseEntity<?>> methodForParams() {
return null;
}

public static Object ok(Object o) {
if (o instanceof RpcResponse) {
return o;
}
if (o == NULL) { // 特殊处理null
return RpcResponse.success();
}
return RpcResponse.success(o);
}

public static final Object NULL = new Object[0];

}

如果接口需要返回null,则需这么写

1
2
3
4
@GetMapping
public Mono<Object> get() {
return Mono.just(RpcResponseBodyResultHandler.NULL);
}

全局异常处理

Mono内抛出的异常会包含在其内部,类似CompletableFuture,使用onErrorResume等方法处理,在其他地方抛出的依然由ExceptionHandler处理

传统IO的处理代码如下:

1
2
3
4
5
@ExceptionHandler(Throwable.class)
@ResponseStatus(code = HttpStatus.INTERNAL_SERVER_ERROR)
public Object handleError(Throwable e, HttpServletRequest httpServletRequest) {
// ...
}

HttpServletRequest是阻塞IO模型,其在reactive的概念是ServerHttpRequest,所以写法如下:

1
2
3
4
5
@ExceptionHandler(Throwable.class)
@ResponseStatus(code = HttpStatus.INTERNAL_SERVER_ERROR)
public Object handleError(Throwable e, ServerHttpRequest request) {
// ...
}

使用Mybatis等不支持非阻塞编程的组件

1
2
3
4
5
6
7
Mono.fromCallable(() -> {
mybatisMapper.insert(entity);
return (Object) status;
}).subscribeOn(Schedulers.boundedElastic());

// 或者指定线程池运行
.subscribeOn(Schedulers.fromExecutor(...));
   / 
  ,