一 . 前言
文档目的
- 梳理 Gateway 生产中转发请求的细节
- 梳理 转发的定制点
知识补充
请求转发是 Gateway 最核心的功能之一 , 他涉及到三个主要的概念 :
Route(路由): 路由是网关的基本单元,由 ID、URI、一组 Predicate、一组 Filter 组成,如果 Predicate 匹配 True , 则进行转发
Predicate(谓语、断言): 路由转发的判断条件,这是一个 Java 8 函数断言, 输入类型是 Spring Framework ServerWebExchange , 目前 SpringCloud Gateway 支持多种方式,常见如:Path、Query、Method、Header 等,写法必须遵循 key=vlue 的形式
Filter(过滤器): 过滤器是路由转发请求时所经过的过滤逻辑,使用特定工厂构建的 GatewayFilter 实例 , 可用于修改请求、响应内容
二 . 简单使用
2.1 predicates 汇总
//
- After=2017-01-20T17:42:47.789-07:00[America/Denver]
//
- Before=2017-01-20T17:42:47.789-07:00[America/Denver]
//
- Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-07:00[America/Denver]
//
- Cookie=chocolate, ch.p
2.2 Mono 和 Flux
Mono 和 Flux 是贯穿了整个流程的核心对象 , 根据 reactive-streams 规范,发布服务器提供了数量可能无限的有序元素,并根据从其订阅服务器接收到的需求发布这些元素。Reactor-core 有一组此 Publisher 接口的实现。我们将要创建序列的两个重要实现是 Mono 和 Flux。
- Flux 表示的是包含 0 到 N 个元素的异步序列
- Mono 表示的是包含 0 或 1 个元素的异步序列
> SpringGateway 是使用 webflux 作为底层调用框架的 , 其中涉及到 mono 和 Flux 对象
> 该序列中可以包含 3 种通知 :
- 正常的包含元素的消息
- 序列结束的消息
- 序列出错的消息
Flux
- Flux 是一个标准 Publisher,表示 0 到 N 个发射项的异步序列,选择性的以完成或错误信号终止。与 Reactive Streams 规范中一样,这三种类型的信号转换为对下游订阅者的 onNext、onComplete 或 onError 方法的调用。
Mono
- Mono 是 Publisher 的另一个实现。它最多发出一个条目,然后 (可选) 以 onComplete 信号或 onError 信号终止 , Mono 在本质上也是异步的
- 它只提供了可用于 Flux 的操作符的子集,并且一些操作符(特别是那些将 Mono 与另一个发布者组合的操作符)切换到 Flux。
- 例如,Mono#concatWith(Publisher) 返回一个 Flux ,而 Mono#then(Mono) 则返回另一个 Mono。
常见的方法如下 :
- create : 以编程方式创建具有多次发射能力的 Flux,
- empty : 发出 0 元素或返回空 Flux
- just : 创建一个基础
- error : 创建一个 Flux,它在订阅之后立即以指定的错误终止
PS : 这一块就不深入看了 , 先看完 Gateway 的主流程
三 . 拦截深入
3.1 原理图
首先来看一下 SpringGateway 的原理图
四 . 调用的入口
4.1 调用流程
- Step 1 : HttpWebHandlerAdapter # handle : 构建 ServerWebExchange , 发起 Handler 处理
- Step 2 : DispatcherHandler # handle : 发起请求处理
- Step 3 : RoutePredicateHandlerMapping # getHandlerInternal : route 判断处理
4.2. getHandlerInternal 逻辑
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
}
3.2. lookupRoute
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes()
// individually filter routes so that filterWhen error delaying is not a
// problem
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
.onErrorResume(e -> Mono.empty()))
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
.next()
// TODO: error handling
.map(route -> {
validateRoute(route, exchange);
return route;
});
}
会遍历所有的 route
五. 发送的流程
5.1 FilteringWebHandler 体系
此处的 webHandler 为 FilteringWebHandler 对象 , 来看一下这个对象的作用
这里涉及到以下的 Filter :
- C- ForwardPathFilter :
- C- ForwardRoutingFilter : 用来做本地 forward 的
- C- GatewayMetricsFilter : 与 Prometheus 整合,从而创建一个 Grafana dashboard
- C- LoadBalancerClientFilter : 用来整合 Ribbon 的 , 先获取微服务的名称,然后再通过 Ribbon 获取实际的调用地址
- C- NettyRoutingFilter : http 或 https , 使用 Netty 的
HttpClient
向下游的服务发送代理请求 - C- NettyWriteResponseFilter : 用于将代理响应写回网关的客户端侧,所以该过滤器会在所有其他过滤器执行完成后才执行
- C- OrderedGatewayFilter :
- C- RouteToRequestUrlFilter : 将从 request 里获取的 原始 url 转换成 Gateway 进行请求转发时所使用的 url
- C- WebClientHttpRoutingFilter :
- C- WebClientWriteResponseFilter :
- C- WebsocketRoutingFilter : ws 或者 wss,那么该 Filter 会使用 Spring Web Socket 将 Websocket 请求转发到下游
- C- WeightCalculatorWebFilter :
可以参考 → Spring Cloud Gateway 内置的全局过滤器
调用逻辑 1 : FilteringWebHandler 管理
该对象中存在一个内部类 DefaultGatewayFilterChain , 该类为 Filter 过滤链
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
// 当前 Filter 链索引
private final int index;
// Filter 集合
private final List<GatewayFilter> filters;
DefaultGatewayFilterChain(List<GatewayFilter> filters) {
this.filters = filters;
this.index = 0;
}
private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
this.filters = parent.getFilters();
this.index = index;
}
public List<GatewayFilter> getFilters() {
return filters;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
// 逐个 Filter 过滤调用
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
}
调用流程 3 : Filter 过滤
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 通常判断部分条件 , 如果该 Filter 不符合 , 则跳过该 Filter
if (isAlreadyRouted(exchange)
|| (!"http".equals(scheme) && !"https".equals(scheme))) {
return chain.filter(exchange);
}
5.2 发送的主体
核心的发送 Filter 是 NettyRoutingFilter, 下面只关注这个 Filter 的相关逻辑 :
C- NettyRoutingFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 请求 URL : http://httpbin.org:80/get
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
// 协议类型 : http
String scheme = requestUrl.getScheme();
// Step 1 : filter 链处理 ,如果不符合 http 协议 , 就通过下一个 Filter 处理
if (isAlreadyRouted(exchange)
|| (!"http".equals(scheme) && !"https".equals(scheme))) {
return chain.filter(exchange);
}
// Step 2 : 标识 Routed 已处理
setAlreadyRouted(exchange);
// Step 3 : 获取 Request 请求对象 , 这个是外部请求的对象
ServerHttpRequest request = exchange.getRequest();
// Step 4 : 获取 Method 类型 (get/post...)
final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
final String url = requestUrl.toString();
// Step 5 : 对 Header 进行处理 , 需要转发过去
HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
// -> Transfer-Encoding
String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);
// -> preserveHostHeader
boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
// 通过 netty httpClient 发起转发请求 , PS !!! 这里是异步的
Flux<HttpClientResponse> responseFlux = this.httpClient
.chunkedTransfer(chunkedTransfer).request(method).uri(url)
.send((req, nettyOutbound) -> {
// Step 6 : 转发 Header
req.headers(httpHeaders);
// => 是否需要记录之前的 host
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
req.header(HttpHeaders.HOST, host);
}
// Step 7 : 真正发起请求
return nettyOutbound.options(NettyPipeline.SendOptions::flushOnEach)
.send(request.getBody()
.map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
.getNativeBuffer()));
}).responseConnection((res, connection) -> {
// Step 8 : 请求完成 , 获取 response
ServerHttpResponse response = exchange.getResponse();
// Step 9 : 转发headers 和 status 等属性
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(
entry -> headers.add(entry.getKey(), entry.getValue()));
// => String CONTENT_TYPE = "Content-Type"
// => String ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR = "original_response_content_type";
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
contentTypeValue);
}
// 转发状态 , 存在往 GatewayResponse 设置状态
HttpStatus status = HttpStatus.resolve(res.status().code());
if (status != null) {
response.setStatusCode(status);
}
else if (response instanceof AbstractServerHttpResponse) {
((AbstractServerHttpResponse) response)
.setStatusCodeValue(res.status().code());
}
else {
throw new IllegalStateException(
"Unable to set status code on response: "
+ res.status().code() + ", "
+ response.getClass());
}
// 确保 Header filter 在设置状态后运行, 校验 header 中 filter 正常
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
getHeadersFilters(), headers, exchange, Type.RESPONSE);
// String TRANSFER_ENCODING = "Transfer-Encoding"
// String CONTENT_LENGTH = "Content-Length"
if (!filteredResponseHeaders
.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders
.containsKey(HttpHeaders.CONTENT_LENGTH)) {
// content-length 存在需要去掉 Transfer-Encoding
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
filteredResponseHeaders.keySet());
response.getHeaders().putAll(filteredResponseHeaders);
// 延迟提交响应,直到所有路由过滤器都运行
// 将客户端响应作为ServerWebExchange属性,稍后写入响应NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
return Mono.just(res);
});
if (properties.getResponseTimeout() != null) {
// 超时异常处理
responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
Mono.error(new TimeoutException("Response took longer than timeout: "
+ properties.getResponseTimeout())))
.onErrorMap(TimeoutException.class,
// GATEWAY_TIMEOUT(504, "Gateway Timeout")
th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
th.getMessage(), th));
}
return responseFlux.then(chain.filter(exchange));
}
5.3 返回 Response
C- NettyWriteResponseFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).then(Mono.defer(() -> {
// Step 1 : 获取 GatewayRequest
Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
// 连接不存在直接返回空
if (connection == null) {
return Mono.empty();
}
// Step 2 : 获取 GatewayResponse
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response
.bufferFactory();
// 此处主要包含一个 byteBufflux
final Flux<NettyDataBuffer> body = connection.inbound().receive().retain()
.map(factory::wrap);
// 媒体类型
MediaType contentType = null;
try {
contentType = response.getHeaders().getContentType();
}
catch (Exception e) {
log.trace("invalid media type", e);
}
return (isStreamingMediaType(contentType)
? response.writeAndFlushWith(body.map(Flux::just))
: response.writeWith(body));
}));
}
总结
由于 netty 的底层了解的还不是很清楚 , 对于一些调用过程没办法输出数据看 , 这篇文章心里也不是很有底 , 后续深入后再来补充细节