一 . 前言

文档目的

  • 梳理 Gateway 生产中转发请求的细节
  • 梳理 转发的定制点

知识补充

请求转发是 Gateway 最核心的功能之一 , 他涉及到三个主要的概念 :

Route(路由): 路由是网关的基本单元,由 ID、URI、一组 Predicate、一组 Filter 组成,如果 Predicate 匹配 True , 则进行转发
Predicate(谓语、断言): 路由转发的判断条件,这是一个 Java 8 函数断言, 输入类型是 Spring Framework ServerWebExchange , 目前 SpringCloud Gateway 支持多种方式,常见如:Path、Query、Method、Header 等,写法必须遵循 key=vlue 的形式
Filter(过滤器): 过滤器是路由转发请求时所经过的过滤逻辑,使用特定工厂构建的 GatewayFilter 实例 , 可用于修改请求、响应内容

二 . 简单使用

2.1 predicates 汇总

// 
- After=2017-01-20T17:42:47.789-07:00[America/Denver]

// 
- Before=2017-01-20T17:42:47.789-07:00[America/Denver]

//
- Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-07:00[America/Denver]

// 
- Cookie=chocolate, ch.p



2.2 Mono 和 Flux

Mono 和 Flux 是贯穿了整个流程的核心对象 , 根据 reactive-streams 规范,发布服务器提供了数量可能无限的有序元素,并根据从其订阅服务器接收到的需求发布这些元素。Reactor-core 有一组此 Publisher 接口的实现。我们将要创建序列的两个重要实现是 Mono 和 Flux。

  • Flux 表示的是包含 0 到 N 个元素的异步序列
  • Mono 表示的是包含 0 或 1 个元素的异步序列

> SpringGateway 是使用 webflux 作为底层调用框架的 , 其中涉及到 mono 和 Flux 对象

> 该序列中可以包含 3 种通知 :

  • 正常的包含元素的消息
  • 序列结束的消息
  • 序列出错的消息

Flux

  • Flux 是一个标准 Publisher,表示 0 到 N 个发射项的异步序列,选择性的以完成或错误信号终止。与 Reactive Streams 规范中一样,这三种类型的信号转换为对下游订阅者的 onNext、onComplete 或 onError 方法的调用。

Mono

  • Mono 是 Publisher 的另一个实现。它最多发出一个条目,然后 (可选) 以 onComplete 信号或 onError 信号终止 , Mono 在本质上也是异步的
  • 它只提供了可用于 Flux 的操作符的子集,并且一些操作符(特别是那些将 Mono 与另一个发布者组合的操作符)切换到 Flux。
    • 例如,Mono#concatWith(Publisher) 返回一个 Flux ,而 Mono#then(Mono) 则返回另一个 Mono。

常见的方法如下 :

  • create : 以编程方式创建具有多次发射能力的 Flux,
  • empty : 发出 0 元素或返回空 Flux
  • just : 创建一个基础
  • error : 创建一个 Flux,它在订阅之后立即以指定的错误终止

PS : 这一块就不深入看了 , 先看完 Gateway 的主流程

三 . 拦截深入

3.1 原理图

首先来看一下 SpringGateway 的原理图

四 . 调用的入口

4.1 调用流程

  • Step 1 : HttpWebHandlerAdapter # handle : 构建 ServerWebExchange , 发起 Handler 处理
  • Step 2 : DispatcherHandler # handle : 发起请求处理
  • Step 3 : RoutePredicateHandlerMapping # getHandlerInternal : route 判断处理

4.2. getHandlerInternal 逻辑

protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
   // don't handle requests on management port if set and different than server port
   if (this.managementPortType == DIFFERENT && this.managementPort != null
         && exchange.getRequest().getURI().getPort() == this.managementPort) {
      return Mono.empty();
   }
   exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());

   return lookupRoute(exchange)
         // .log("route-predicate-handler-mapping", Level.FINER) //name this
         .flatMap((Function<Route, Mono<?>>) r -> {
            exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
            if (logger.isDebugEnabled()) {
               logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
            }

            exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
            return Mono.just(webHandler);
         }).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
            exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
            if (logger.isTraceEnabled()) {
               logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
            }
         })));
}


3.2. lookupRoute

protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
   return this.routeLocator.getRoutes()
         // individually filter routes so that filterWhen error delaying is not a
         // problem
         .concatMap(route -> Mono.just(route).filterWhen(r -> {
            // add the current route we are testing
            exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
            return r.getPredicate().apply(exchange);
         })
               // instead of immediately stopping main flux due to error, log and
               // swallow it
               .doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
               .onErrorResume(e -> Mono.empty()))
         // .defaultIfEmpty() put a static Route not found
         // or .switchIfEmpty()
         // .switchIfEmpty(Mono.<Route>empty().log("noroute"))
         .next()
         // TODO: error handling
         .map(route -> {
            validateRoute(route, exchange);
            return route;
         });


}


会遍历所有的 route

五. 发送的流程

5.1 FilteringWebHandler 体系

此处的 webHandler 为 FilteringWebHandler 对象 , 来看一下这个对象的作用

这里涉及到以下的 Filter :

  • C- ForwardPathFilter :
  • C- ForwardRoutingFilter : 用来做本地 forward 的
  • C- GatewayMetricsFilter : 与 Prometheus 整合,从而创建一个 Grafana dashboard
  • C- LoadBalancerClientFilter : 用来整合 Ribbon 的 , 先获取微服务的名称,然后再通过 Ribbon 获取实际的调用地址
  • C- NettyRoutingFilter : http 或 https , 使用 Netty 的 HttpClient 向下游的服务发送代理请求
  • C- NettyWriteResponseFilter : 用于将代理响应写回网关的客户端侧,所以该过滤器会在所有其他过滤器执行完成后才执行
  • C- OrderedGatewayFilter :
  • C- RouteToRequestUrlFilter : 将从 request 里获取的 原始 url 转换成 Gateway 进行请求转发时所使用的 url
  • C- WebClientHttpRoutingFilter :
  • C- WebClientWriteResponseFilter :
  • C- WebsocketRoutingFilter : ws 或者 wss,那么该 Filter 会使用 Spring Web Socket 将 Websocket 请求转发到下游
  • C- WeightCalculatorWebFilter :

可以参考 Spring Cloud Gateway 内置的全局过滤器

调用逻辑 1 : FilteringWebHandler 管理

该对象中存在一个内部类 DefaultGatewayFilterChain , 该类为 Filter 过滤链

private static class DefaultGatewayFilterChain implements GatewayFilterChain {
    
   // 当前 Filter 链索引 
   private final int index;
   // Filter 集合 
   private final List<GatewayFilter> filters;

   DefaultGatewayFilterChain(List<GatewayFilter> filters) {
      this.filters = filters;
      this.index = 0;
   }

   private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
      this.filters = parent.getFilters();
      this.index = index;
   }

   public List<GatewayFilter> getFilters() {
      return filters;
   }

   @Override
   public Mono<Void> filter(ServerWebExchange exchange) {
      return Mono.defer(() -> {
         if (this.index < filters.size()) {
            // 逐个 Filter 过滤调用 
            GatewayFilter filter = filters.get(this.index);
            DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
                  this.index + 1);
            return filter.filter(exchange, chain);
         }
         else {
            return Mono.empty(); // complete
         }
      });
   }

}


调用流程 3 : Filter 过滤

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {

    // 通常判断部分条件  , 如果该 Filter 不符合 , 则跳过该 Filter
   if (isAlreadyRouted(exchange)
         || (!"http".equals(scheme) && !"https".equals(scheme))) {
      return chain.filter(exchange);
   }


5.2 发送的主体

核心的发送 Filter 是 NettyRoutingFilter, 下面只关注这个 Filter 的相关逻辑 :

C- NettyRoutingFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
   // 请求 URL : http://httpbin.org:80/get
   URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
    // 协议类型 : http
   String scheme = requestUrl.getScheme();
   
   // Step 1 : filter 链处理 ,如果不符合 http 协议 , 就通过下一个 Filter 处理
   if (isAlreadyRouted(exchange)
         || (!"http".equals(scheme) && !"https".equals(scheme))) {
      return chain.filter(exchange);
   }
   // Step 2 : 标识 Routed 已处理
   setAlreadyRouted(exchange);

   // Step 3 : 获取 Request 请求对象 , 这个是外部请求的对象
   ServerHttpRequest request = exchange.getRequest();
    
   // Step 4 : 获取 Method 类型 (get/post...)
   final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
   final String url = requestUrl.toString();
   
   // Step 5 : 对 Header 进行处理 , 需要转发过去
   HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
   final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
   filtered.forEach(httpHeaders::set);
    
   // -> Transfer-Encoding
   String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
   boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);
   // -> preserveHostHeader
   boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);

   // 通过 netty httpClient 发起转发请求 , PS !!! 这里是异步的
   Flux<HttpClientResponse> responseFlux = this.httpClient
         .chunkedTransfer(chunkedTransfer).request(method).uri(url)
         .send((req, nettyOutbound) -> {
            // Step 6 : 转发 Header 
            req.headers(httpHeaders);
            
            // => 是否需要记录之前的 host
            if (preserveHost) {
               String host = request.getHeaders().getFirst(HttpHeaders.HOST);
               req.header(HttpHeaders.HOST, host);
            }
            
            // Step 7 : 真正发起请求
            return nettyOutbound.options(NettyPipeline.SendOptions::flushOnEach)
                  .send(request.getBody()
                        .map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
                              .getNativeBuffer()));
         }).responseConnection((res, connection) -> {
            // Step 8 : 请求完成 , 获取 response
            ServerHttpResponse response = exchange.getResponse();
            
            // Step 9 : 转发headers 和 status 等属性
            HttpHeaders headers = new HttpHeaders();
            res.responseHeaders().forEach(
                  entry -> headers.add(entry.getKey(), entry.getValue()));
            
            // => String CONTENT_TYPE = "Content-Type" 
            // => String ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR = "original_response_content_type";
            String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
            if (StringUtils.hasLength(contentTypeValue)) {
               exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
                     contentTypeValue);
            }
            
            // 转发状态 , 存在往 GatewayResponse 设置状态
            HttpStatus status = HttpStatus.resolve(res.status().code());
            if (status != null) {
               response.setStatusCode(status);
            }
            else if (response instanceof AbstractServerHttpResponse) {
               ((AbstractServerHttpResponse) response)
                     .setStatusCodeValue(res.status().code());
            }
            else {
               throw new IllegalStateException(
                     "Unable to set status code on response: "
                           + res.status().code() + ", "
                           + response.getClass());
            }

            // 确保 Header filter 在设置状态后运行, 校验 header 中 filter 正常
            HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
                  getHeadersFilters(), headers, exchange, Type.RESPONSE);
            
            //  String TRANSFER_ENCODING = "Transfer-Encoding"
            //  String CONTENT_LENGTH = "Content-Length"
            if (!filteredResponseHeaders
                  .containsKey(HttpHeaders.TRANSFER_ENCODING)
                  && filteredResponseHeaders
                        .containsKey(HttpHeaders.CONTENT_LENGTH)) {
               // content-length 存在需要去掉 Transfer-Encoding
               response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
            }

            exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
                  filteredResponseHeaders.keySet());

            response.getHeaders().putAll(filteredResponseHeaders);

            // 延迟提交响应,直到所有路由过滤器都运行
            // 将客户端响应作为ServerWebExchange属性,稍后写入响应NettyWriteResponseFilter
            exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
            exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

            return Mono.just(res);
         });

   if (properties.getResponseTimeout() != null) {
      // 超时异常处理
      responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
            Mono.error(new TimeoutException("Response took longer than timeout: "
                  + properties.getResponseTimeout())))
            .onErrorMap(TimeoutException.class,
                  // GATEWAY_TIMEOUT(504, "Gateway Timeout")
                  th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
                        th.getMessage(), th));
   }

   return responseFlux.then(chain.filter(exchange));
}


5.3 返回 Response

C- NettyWriteResponseFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
   return chain.filter(exchange).then(Mono.defer(() -> {
      // Step 1 : 获取 GatewayRequest
      Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
      // 连接不存在直接返回空 
      if (connection == null) {
         return Mono.empty();
      }
      
      // Step 2 : 获取 GatewayResponse
      ServerHttpResponse response = exchange.getResponse();
      NettyDataBufferFactory factory = (NettyDataBufferFactory) response
            .bufferFactory();

      // 此处主要包含一个 byteBufflux
      final Flux<NettyDataBuffer> body = connection.inbound().receive().retain()
            .map(factory::wrap);

      // 媒体类型  
      MediaType contentType = null;
      try {
         contentType = response.getHeaders().getContentType();
      }
      catch (Exception e) {
         log.trace("invalid media type", e);
      }
      return (isStreamingMediaType(contentType)
            ? response.writeAndFlushWith(body.map(Flux::just))
            : response.writeWith(body));
   }));
}


总结

由于 netty 的底层了解的还不是很清楚 , 对于一些调用过程没办法输出数据看 , 这篇文章心里也不是很有底 , 后续深入后再来补充细节