黑马程序员技术交流社区

标题: 【上海校区】Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.7) ... [打印本页]

作者: 不二晨    时间: 2018-7-19 09:23
标题: 【上海校区】Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.7) ...
1. 概述
本文主要分享 NettyRoutingFilter 的代码实现。
NettyRoutingFilter ,Netty 路由网关过滤器。其根据 http:// 或 https:// 前缀( Scheme )过滤处理,使用基于 Netty 实现的 HttpClient 请求后端 Http 服务。
NettyWriteResponseFilter ,与 NettyRoutingFilter 成对使用的网关过滤器。其将 NettyRoutingFilter 请求后端 Http 服务的响应写回客户端。
大体流程如下 :
另外,Spring Cloud Gateway 实现了 WebClientHttpRoutingFilter / WebClientWriteResponseFilter ,功能上和 NettyRoutingFilter / NettyWriteResponseFilter 相同,差别在于前者基于 org.springframework.web.reactive.function.client.WebClient (而不是基于 Netty 实现的 HttpClient )请求后端 Http 服务。在 《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.8) 之 WebClientHttpRoutingFilter》 ,我们会详细解析。

推荐 Spring Cloud 书籍:
推荐 Spring Cloud 视频:
2. NettyRoutingFilter
org.springframework.cloud.gateway.filter.NettyRoutingFilter ,Netty 路由网关过滤器。
构造方法,代码如下 :
/**
 * Netty routing gateway filter (excerpt: only the field and constructor are shown here).
 */
public class NettyRoutingFilter implements GlobalFilter, Ordered {

        /** Client held by this filter; assigned once in the constructor. */
        private final HttpClient httpClient;

        /**
         * Creates the filter around the given client.
         *
         * @param httpClient the client stored in {@code this.httpClient}
         */
        public NettyRoutingFilter(HttpClient httpClient) {
                this.httpClient = httpClient;
        }

}

#getOrder() 方法,代码如下 :
/**
 * Returns the order value of this filter.
 *
 * @return {@link Ordered#LOWEST_PRECEDENCE}, the largest order value, which per the
 *         {@code Ordered} contract sorts this filter after filters with smaller values
 */
@Override
public int getOrder() {
    return Ordered.LOWEST_PRECEDENCE;
}

#filter(ServerWebExchange, GatewayFilterChain) 方法,代码如下 :
1: @Override
2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
3:         // Get the requestUrl
4:         URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
5:
6:         // Decide whether this filter handles the request: pass through if already routed or the scheme is neither http nor https
7:         String scheme = requestUrl.getScheme();
8:         if (isAlreadyRouted(exchange) || (!scheme.equals("http") && !scheme.equals("https"))) {
9:                 return chain.filter(exchange);
10:         }
11:
12:         // Mark the exchange as already routed
13:         setAlreadyRouted(exchange);
14:
15:         ServerHttpRequest request = exchange.getRequest();
16:
17:         // Request Method
18:         final HttpMethod method = HttpMethod.valueOf(request.getMethod().toString());
19:
20:         // Get the url
21:         final String url = requestUrl.toString();
22:
23:         // Request Header
24:         final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
25:         request.getHeaders().forEach(httpHeaders::set);
26:
27:         // Send the request
28:         return this.httpClient.request(method, url, req -> {
29:                 final HttpClientRequest proxyRequest = req.options(NettyPipeline.SendOptions::flushOnEach)
30:                                 .failOnClientError(false) // false: do not fail (throw an exception) on a client-error response
31:                                 .headers(httpHeaders);
32:
33:                 // Request Form
34:                 if (MediaType.APPLICATION_FORM_URLENCODED.includes(request.getHeaders().getContentType())) {
35:                         return exchange.getFormData()
36:                                         .flatMap(map -> proxyRequest.sendForm(form -> {
37:                                                 for (Map.Entry<String, List<String>> entry: map.entrySet()) {
38:                                                         for (String value : entry.getValue()) {
39:                                                                 form.attr(entry.getKey(), value);
40:                                                         }
41:                                                 }
42:                                         }).then())
43:                                         .then(chain.filter(exchange));
44:                 }
45:
46:                 // Request Body
47:                 return proxyRequest.sendHeaders() //I shouldn't need this
48:                                 .send(request.getBody()
49:                                                 .map(DataBuffer::asByteBuffer) // Flux<DataBuffer> => Flux<ByteBuffer>
50:                                                 .map(Unpooled::wrappedBuffer)); // Flux<ByteBuffer> => Flux<ByteBuf>
51:         }).doOnNext(res -> {
52:                 ServerHttpResponse response = exchange.getResponse();
53:                 // Response Header
54:                 // put headers and status so filters can modify the response
55:                 HttpHeaders headers = new HttpHeaders();
56:                 res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));
57:                 response.getHeaders().putAll(headers);
58:
59:                 // Response Status
60:                 response.setStatusCode(HttpStatus.valueOf(res.status().code()));
61:
62:                 // Store the client Response in the exchange attributes under CLIENT_RESPONSE_ATTR
63:                 // Defer committing the response until all route filters have run
64:                 // Put client response as ServerWebExchange attribute and write response later NettyWriteResponseFilter
65:                 exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
66:         }).then(chain.filter(exchange));
67: }
3. NettyWriteResponseFilter
org.springframework.cloud.gateway.filter.NettyWriteResponseFilter ,Netty 回写响应网关过滤器。
#getOrder() 方法,代码如下 :
/** Order value reported by {@link #getOrder()}. */
public static final int WRITE_RESPONSE_FILTER_ORDER = -1;
   
/**
 * Returns the order value of this filter.
 *
 * @return {@link #WRITE_RESPONSE_FILTER_ORDER} (-1)
 */
@Override
public int getOrder() {
    return WRITE_RESPONSE_FILTER_ORDER;
}

#filter(ServerWebExchange, GatewayFilterChain) 方法,代码如下 :
1: @Override
2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
3:         // NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_ATTR is not added
4:         // until the WebHandler is run
5:         return chain.filter(exchange).then(Mono.defer(() -> {
6:             // Get the client Response stored under CLIENT_RESPONSE_ATTR; nothing to write if it is absent
7:                 HttpClientResponse clientResponse = exchange.getAttribute(CLIENT_RESPONSE_ATTR);
8:                 // HttpClientResponse clientResponse = getAttribute(exchange, CLIENT_RESPONSE_ATTR, HttpClientResponse.class);
9:                 if (clientResponse == null) {
10:                         return Mono.empty();
11:                 }
12:                 log.trace("NettyWriteResponseFilter start");
13:                 ServerHttpResponse response = exchange.getResponse();
14:
15:                 // Write the Netty Response body back to the client.
16:                 NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();
17:                 //TODO: what if it's not netty
18:                 final Flux<NettyDataBuffer> body = clientResponse.receive()
19:                                 .retain() // ByteBufFlux => ByteBufFlux
20:                                 .map(factory::wrap); // ByteBufFlux  => Flux<NettyDataBuffer>
21:                 return response.writeWith(body);
22:         }));
23: }


【转载】http://www.iocoder.cn/Spring-Clo ... netty-routing/?self


作者: 不二晨    时间: 2018-7-19 10:58
优秀,奈斯
作者: 摩西摩西OvO    时间: 2018-7-19 14:25

作者: 吴琼老师    时间: 2018-7-19 16:40

作者: 摩西摩西OvO    时间: 2018-7-23 13:35

作者: 摩西摩西OvO    时间: 2018-7-26 09:19





欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2