WebFlux运行时动态新增路由

发布于 13 小时前  0 次阅读


因为项目中涉及到类似于网关的动态增加路由的操作,所以需要在WebFlux运行时动态地添加接口(路由)。
本文使用的Spring Boot(spring-boot-starter-webflux)版本为2.7.0,Maven依赖如下:

        <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux -->
        <!-- Spring Boot WebFlux starter, pinned to 2.7.0: the version the source analysis in this article is based on -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
            <version>2.7.0</version>
        </dependency>

WebFlux路由源码分析

WebFlux中由WebHandler处理Web请求

/**
 * Contract for handling a web request: a single method that receives the
 * current exchange and signals completion through the returned {@code Mono}.
 */
public interface WebHandler {

    /**
     * Handle the web server exchange.
     * @param exchange the current server exchange
     * @return {@code Mono<Void>} to indicate when request handling is complete
     */
    Mono<Void> handle(ServerWebExchange exchange);

}

DispatcherHandler实现了WebHandler,是WebFlux请求处理的中央分发器:它依次向各个HandlerMapping查找与请求匹配的处理程序,调用该处理程序,再对返回结果进行处理。(异常处理器WebExceptionHandler、过滤器WebFilter与目标WebHandler的链式组合则由WebHttpHandlerBuilder完成。)

/**
 * Excerpt of {@code DispatcherHandler}; elided parts are marked with "......".
 * It implements {@code WebHandler} and dispatches each exchange to the first
 * handler supplied by its list of {@code HandlerMapping}s.
 */
public class DispatcherHandler implements WebHandler, PreFlightRequestHandler, ApplicationContextAware {

    @Nullable
    private List<HandlerMapping> handlerMappings; // mappings between requests and handler objects
    ......
    /**
     * Dispatches the exchange: resolves a handler from the handler mappings,
     * invokes it, and then processes the result it produced.
     *
     * @param exchange the current server exchange
     * @return a {@code Mono<Void>} produced by the result handling step, or the
     *         not-found error when no mapping yields a handler
     */
    @Override
    public Mono<Void> handle(ServerWebExchange exchange) {
        if (this.handlerMappings == null) { // no handler mappings available -> not-found error
            return createNotFoundError();
        }
        if (CorsUtils.isPreFlightRequest(exchange.getRequest())) { // CORS pre-flight requests take a dedicated path
            return handlePreFlight(exchange);
        }
        return Flux.fromIterable(this.handlerMappings)
                .concatMap(mapping -> mapping.getHandler(exchange)) // ask each mapping, in order, for a handler
                .next() // keep only the first handler emitted
                .switchIfEmpty(createNotFoundError())
                .flatMap(handler -> invokeHandler(exchange, handler))
                .flatMap(result -> handleResult(exchange, result));
    }
    ......
}

getHandler是接口HandlerMapping中用于返回当前请求对应处理程序的方法,我们需要关注其实现类AbstractHandlerMapping,如下

/**
 * Excerpt of {@code AbstractHandlerMapping}; elided parts are marked with "......".
 * Shows how {@code getHandler} wraps the subclass lookup with CORS handling.
 */
public abstract class AbstractHandlerMapping extends ApplicationObjectSupport implements HandlerMapping, Ordered, BeanNameAware {
    ......
    /**
     * Resolves the handler through {@code getHandlerInternal} and then applies
     * CORS processing to it.
     *
     * <p>When the handler has a CORS configuration source or the request is a
     * pre-flight request, the mapping-level configuration is combined with the
     * handler-level one and validated. {@code NO_OP_HANDLER} is returned if the
     * CORS processor rejects the exchange or the request is a pre-flight
     * request; otherwise the resolved handler is returned unchanged.
     *
     * @param exchange the current server exchange
     * @return a {@code Mono} emitting the handler, or empty if the lookup found none
     */
    @Override
    public Mono<Object> getHandler(ServerWebExchange exchange) {
        return getHandlerInternal(exchange).map(handler -> { // getHandlerInternal is the abstract lookup hook to focus on: it finds the handler for the given request
            if (logger.isDebugEnabled()) { // debug logging only
                logger.debug(exchange.getLogPrefix() + "Mapped to " + handler);
            }
            ServerHttpRequest request = exchange.getRequest();
            // CORS / pre-flight validation
            if (hasCorsConfigurationSource(handler) || CorsUtils.isPreFlightRequest(request)) {
                CorsConfiguration config = (this.corsConfigurationSource != null ?
                        this.corsConfigurationSource.getCorsConfiguration(exchange) : null);
                CorsConfiguration handlerConfig = getCorsConfiguration(handler, exchange);
                config = (config != null ? config.combine(handlerConfig) : handlerConfig);
                if (config != null) {
                    config.validateAllowCredentials();
                }
                if (!this.corsProcessor.process(config, exchange) || CorsUtils.isPreFlightRequest(request)) {
                    return NO_OP_HANDLER;
                }
            }
            return handler;
        });
    }
    ......
}

最后到最重要的RouterFunctionMapping类,其继承了AbstractHandlerMapping,getHandlerInternal实现如下

    /**
     * Looks up the handler for the exchange by delegating to the combined
     * {@code routerFunction}.
     *
     * @param exchange the current server exchange
     * @return the handler function matched by {@code routerFunction}, or an
     *         empty {@code Mono} when no router function is set
     */
    @Override
    protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
        if (this.routerFunction != null) {
            ServerRequest request = ServerRequest.create(exchange, this.messageReaders);
            // find the handler function in routerFunction that matches the request
            return this.routerFunction.route(request)
                    .doOnNext(handler -> setAttributes(exchange.getAttributes(), request, handler));
        }
        else {
            return Mono.empty();
        }
    }

所以可以发现,想要动态增加路由,只需要将新的路由添加到routerFunction中即可,主要关注两个方法,如下

/**
 * Excerpt of {@code RouterFunctionMapping}; elided parts are marked with "......".
 * Shows how the single {@code routerFunction} used for request matching is
 * built from the {@code RouterFunction} beans in the application context.
 */
public class RouterFunctionMapping extends AbstractHandlerMapping implements InitializingBean {

    @Nullable
    private RouterFunction<?> routerFunction;
    ......
    /**
     * Initializes the router functions: collects them through
     * {@link #routerFunctions()} and reduces them with
     * {@code RouterFunction::andOther} into the {@code routerFunction} field
     * ({@code null} when there are none).
     */
    protected void initRouterFunctions() {
        List<RouterFunction<?>> routerFunctions = routerFunctions();
        this.routerFunction = routerFunctions.stream().reduce(RouterFunction::andOther).orElse(null);
        logRouterFunctions(routerFunctions);
    }

    /**
     * Collects every {@code RouterFunction} bean from the application context
     * as an ordered list.
     *
     * @return the router function beans, or an empty list when none exist
     */
    private List<RouterFunction<?>> routerFunctions() {
        List<RouterFunction<?>> functions = obtainApplicationContext() // obtain the application context
                .getBeanProvider(RouterFunction.class) // provider for all RouterFunction beans
                .orderedStream()
                .map(router -> (RouterFunction<?>)router)
                .collect(Collectors.toList());
        return (!CollectionUtils.isEmpty(functions) ? functions : Collections.emptyList());
    }
    ......
}

所以我们只需要将新的路由构建成RouterFunction并注册为一个bean,最后调用initRouterFunctions重新初始化routerFunction即可。由于initRouterFunctions是protected方法,下文通过反射来调用它。

实现

注册路由

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;
import org.springframework.web.reactive.function.server.support.RouterFunctionMapping;

import javax.annotation.Nonnull;
import javax.annotation.Resource;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

/**
 * Registers new routes at runtime by publishing a {@link RouterFunction} as a
 * singleton bean and then asking the {@link RouterFunctionMapping} to rebuild
 * its combined router function.
 */
@Component
public class RegisterRouter implements ApplicationContextAware {

    @Resource
    private RouterBeanFactory routerBeanFactory;

    private ApplicationContext applicationContext;

    /**
     * Registers a new POST route under the given path.
     *
     * <p>The route answers with status 200, content type
     * {@code application/json} and the body {@code webflux}. The router function
     * is registered as a singleton bean named {@code "bean" + url}, after which
     * {@code RouterFunctionMapping#initRouterFunctions} is invoked reflectively
     * (it is a protected method) so the mapping picks the new bean up.
     *
     * @param url path of the route to add
     * @throws NoSuchMethodException     if {@code initRouterFunctions} cannot be found on
     *                                   {@link RouterFunctionMapping}
     * @throws InvocationTargetException if {@code initRouterFunctions} itself throws
     * @throws IllegalAccessException    if Java access control prevents invoking the method
     */
    public void registerRouter(String url) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        // Handler invoked when the new route is requested. The payload is a plain value, so bodyValue(...)
        // is used; body(Object, Class) expects a Publisher (or a type adaptable to one) and rejects a String.
        RouterFunction<ServerResponse> appRouter = route(POST(url),
                request -> ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyValue("webflux"));

        // Publish the route as a bean so the mapping can discover it.
        routerBeanFactory.registerBean("bean" + url, appRouter);

        // Rebuild the combined router function held by the mapping.
        RouterFunctionMapping routerFunctionMapping =
                applicationContext.getBean("routerFunctionMapping", RouterFunctionMapping.class);
        // Resolve the method on RouterFunctionMapping itself: getDeclaredMethod does not search
        // superclasses, so going through getClass() would fail if the bean were a subclass or proxy.
        Method initRouterFunctions = RouterFunctionMapping.class.getDeclaredMethod("initRouterFunctions");
        initRouterFunctions.setAccessible(true);
        initRouterFunctions.invoke(routerFunctionMapping);
    }

    /**
     * Stores the application context used to look up the {@code routerFunctionMapping} bean.
     *
     * @param applicationContext the context this component runs in
     */
    @Override
    public void setApplicationContext(@Nonnull ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

路由注册bean

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.stereotype.Component;

import javax.annotation.Nonnull;

/**
 * Small helper that registers arbitrary objects as singleton beans in the
 * running application context, replacing any singleton already stored under
 * the same name.
 */
@Component
public class RouterBeanFactory implements ApplicationContextAware {
    private ApplicationContext applicationContext;

    /**
     * Keeps a reference to the application context whose bean factory is used
     * for registration.
     *
     * @param applicationContext the context this component runs in
     */
    @Override
    public void setApplicationContext(@Nonnull ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Exposes the context's bean factory as a singleton registry.
     *
     * @return the bean factory of the application context, cast to
     *         {@link DefaultSingletonBeanRegistry}
     */
    public DefaultSingletonBeanRegistry getBeanRegistry() {
        AbstractApplicationContext context = (AbstractApplicationContext) applicationContext;
        return (DefaultSingletonBeanRegistry) context.getBeanFactory();
    }

    /**
     * Registers an object as a singleton bean, first destroying any singleton
     * that is already registered under the same name.
     *
     * @param name bean name
     * @param o    object to register
     */
    public void registerBean(String name, Object o) {
        DefaultSingletonBeanRegistry registry = getBeanRegistry();
        boolean alreadyRegistered = registry.containsSingleton(name);
        if (alreadyRegistered) {
            registry.destroySingleton(name);
        }
        registry.registerSingleton(name, o);
    }
}

平平无奇的在校大学生