因为在项目中涉及到类似于网关的增加路由的操作,所以需要在WebFlux中动态的添加接口。
本文WebFlux版本为2.7.0,导包:
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.7.0</version>
</dependency>
WebFlux路由源码分析
WebFlux中由WebHandler处理Web请求
public interface WebHandler {
/**
* Handle the web server exchange.
* @param exchange the current server exchange
* @return {@code Mono<Void>} to indicate when request handling is complete
*/
Mono<Void> handle(ServerWebExchange exchange);
}
DispatcherHandler实现WebHandler,其提供了链式处理能力,可以组合异常处理器(WebExceptionHandler)、过滤器(WebFilter)以及目标。
public class DispatcherHandler implements WebHandler, PreFlightRequestHandler, ApplicationContextAware {
@Nullable
private List<HandlerMapping> handlerMappings; // 请求和处理程序对象之间的映射
......
@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) { // 是否有请求映射存在
return createNotFoundError();
}
if (CorsUtils.isPreFlightRequest(exchange.getRequest())) { // 判断请求是否为CORS pre-flight
return handlePreFlight(exchange);
}
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange)) // 取单个mapping执行getHandler
.next()
.switchIfEmpty(createNotFoundError())
.flatMap(handler -> invokeHandler(exchange, handler))
.flatMap(result -> handleResult(exchange, result));
}
......
}
getHandler是接口HandlerMapping中用于返回此请求的处理程序,我们需要关注其实现类AbstractHandlerMapping,如下
public abstract class AbstractHandlerMapping extends ApplicationObjectSupport implements HandlerMapping, Ordered, BeanNameAware {
......
@Override
public Mono<Object> getHandler(ServerWebExchange exchange) {
return getHandlerInternal(exchange).map(handler -> { // 主要关注getHandlerInternal,为一个抽象方法,用于查找给定请求的处理程序
if (logger.isDebugEnabled()) { // 日志
logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
}
ServerHttpRequest request = exchange.getRequest();
// CORS pre-flight验证
if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
CorsConfiguration config = (this.corsConfigurationSource != null ?
this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
config = (config != null ? config.combine(handlerConfig) : handlerConfig);
if (config != null) {
config.validateAllowCredentials();
}
if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
return NO_OP_HANDLER;
}
}
return handler;
});
}
......
}
最后到最重要的RouterFunctionMapping类,其继承了AbstractHandlerMapping,getHandlerInternal实现如下
@Override
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
if (this.routerFunction != null) {
ServerRequest request = ServerRequest.create(exchange, this.messageReaders);
// 在routerFunction中查找跟request匹配的处理函数
return this.routerFunction.route(request)
.doOnNext(handler -> setAttributes(exchange.getAttributes(), request, handler));
}
else {
return Mono.empty();
}
}
所以可以发现其实我们想动态增加路由,只需要修改将新的路由添加到routerFunction中即可,主要关注两个方法,如下
public class RouterFunctionMapping extends AbstractHandlerMapping implements InitializingBean {
@Nullable
private RouterFunction<?> routerFunction;
......
// 该方法用于初始化routerFuncitons,调用了routerFunctions方法
protected void initRouterFunctions() {
List<RouterFunction<?>> routerFunctions = routerFunctions();
this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null);
logRouterFunctions(routerFunctions);
}
private List<RouterFunction<?>> routerFunctions() {
List<RouterFunction<?>> functions = obtainApplicationContext() // 获取上下文
.getBeanProvider(RouterFunction.class) // 获取所有RouterFunciton的bean
.orderedStream()
.map(router -> (RouterFunction<?>)router)
.collect(Collectors.toList());
return (!CollectionUtils.isEmpty(functions) ? functions : Collections.emptyList());
}
......
]
所以我们只需要将新的路由构建成RouterFunction然后注册一个bean,最后调用initRouterFunctions重新初始化routerFunction即可。
实现
注册路由
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.server.support.RouterFunctionMapping;
import javax.annotation.Nonnull;
import javax.annotation.Resource;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Component
public class RegisterRouter implements ApplicationContextAware {
@Resource
private RouterBeanFactory routerBeanFactory;
private ApplicationContext applicationContext;
/**
* 注册新路由
*
* @param url 路由地址
* @throws NoSuchMethodException getDeclaredMethod时 如果找不到匹配的方法
* @throws InvocationTargetException invoke时 如果底层方法抛出异常
* @throws IllegalAccessException invoke时 如果此Method对象正在执行 Java 语言访问控制并且底层方法不可访问。
*/
public void registerRouter(String url) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// 访问新路由时的调用
RouterFunction<ServerResponse> appRouter = route(POST(url), request -> {
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body("webflux", String.class);
});
// 路由的Bean注册
routerBeanFactory.registerBean("bean" + url, appRouter);
// 重新初始化路由
RouterFunctionMapping routerFunctionMapping = (RouterFunctionMapping) applicationContext.getBean("routerFunctionMapping");
Class<? extends RouterFunctionMapping> aClass = routerFunctionMapping.getClass();
Method initRouterFunctions = aClass.getDeclaredMethod("initRouterFunctions");
initRouterFunctions.setAccessible(true);
initRouterFunctions.invoke(routerFunctionMapping);
}
@Override
public void setApplicationContext(@Nonnull ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
路由注册bean
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.stereotype.Component;
import javax.annotation.Nonnull;
@Component
public class RouterBeanFactory implements ApplicationContextAware {
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(@Nonnull ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
/**
* 获取Bean注册器
*
* @return Bean注册器
*/
public DefaultSingletonBeanRegistry getBeanRegistry() {
return (DefaultSingletonBeanRegistry) ((AbstractApplicationContext) applicationContext).getBeanFactory();
}
/**
* 注册Bean
*
* @param name Bean名称
* @param o 对象
*/
public void registerBean(String name, Object o) {
DefaultSingletonBeanRegistry beanRegistry = getBeanRegistry();
if (beanRegistry.containsSingleton(name)) {
beanRegistry.destroySingleton(name);
}
beanRegistry.registerSingleton(name, o);
}
}
Comments NOTHING