5-服务网关Gateway

5-服务网关Gateway,第1张

5-服务网关Gateway 5-服务网关Gateway_读取与修改请求

Gateway_版本 3.0.4

1.0 修改请求体

参考 ModifyRequestBodyGatewayFilterFactory实现

if (HttpMethod.POST.equals(method)) {
            // mediaType
            MediaType mediaType = request.getHeaders().getContentType();
            // read & modify body
            ServerRequest serverRequest = ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders());
            //重点
            Mono modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> {
                //因为约定了终端传参的格式,所以只考虑json的情况,如果是表单传参,请自行发挥
                if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
                    requestParams.set(body);
                    // 这里修改然后将修改后的字符串设置返回
                    return Mono.just(body);
                }
                return Mono.empty();
            }).switchIfEmpty(Mono.defer(() -> Mono.empty()));

            BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
            HttpHeaders headers = new HttpHeaders();
            headers.putAll(exchange.getRequest().getHeaders());
            headers.remove("Content-Length");
            CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
            return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
                ServerHttpRequest decorator = this.decorate(exchange, headers, outputMessage);
                return chain.filter(exchange.mutate().request(decorator).build());
            })).onErrorResume((throwable) -> this.release(exchange, outputMessage, (Throwable) throwable));
        }


/**
 * Wraps the current request so that downstream filters see the supplied headers and the
 * body held by {@code outputMessage} instead of the original ones.
 *
 * @param exchange      the current exchange; its request is the decorated delegate
 * @param headers       headers to expose on the decorated request
 * @param outputMessage message whose body is returned by the decorator
 * @return a request decorator exposing the given headers and body
 */
public ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
                                               CachedBodyOutputMessage outputMessage) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {
            public HttpHeaders getHeaders() {
                long declaredLength = headers.getContentLength();
                HttpHeaders exposed = new HttpHeaders();
                exposed.putAll(headers);
                if (declaredLength <= 0L) {
                    // No usable Content-Length: announce a chunked body instead.
                    exposed.set("Transfer-Encoding", "chunked");
                    return exposed;
                }
                exposed.setContentLength(declaredLength);
                return exposed;
            }

            public Flux getBody() {
                // Serve the body stored in the output message rather than the original request body.
                return outputMessage.getBody();
            }
        };
    }

    /**
     * Error handler for the body-rewrite pipeline: releases the buffers held by
     * {@code outputMessage} when its {@code cached} flag is set, then re-emits the original error.
     *
     * <p>The flag is read reflectively from the field named {@code cached}. If the field cannot
     * be found or read, the original error is still propagated (the reflection failure is
     * attached as a suppressed exception) instead of returning {@code null}, which would break
     * the reactive chain.
     *
     * @param exchange      the current exchange (unused, kept for signature compatibility)
     * @param outputMessage the message whose cached body may need releasing
     * @param throwable     the error that triggered the cleanup
     * @return a Mono that always terminates with {@code throwable}
     */
    private Mono<Void> release(ServerWebExchange exchange, CachedBodyOutputMessage outputMessage, Throwable throwable) {
        Mono<Void> failure = Mono.error(throwable);
        Field cached = ReflectionUtils.findField(outputMessage.getClass(), "cached");
        if (cached == null) {
            // Field not present on this class: nothing we can inspect.
            return failure;
        }
        try {
            cached.setAccessible(true);
            if (Boolean.TRUE.equals(cached.get(outputMessage))) {
                return outputMessage.getBody().map(DataBufferUtils::release).then(failure);
            }
        } catch (IllegalAccessException e) {
            // Keep the original error as the primary failure; record why cleanup was skipped.
            throwable.addSuppressed(e);
        }
        return failure;
    }
2.0 获取请求体

2.1 自定义ReadBodyRoutePredicateFactory
@Component
public class CustomReadBodyRoutePredicateFactory extends AbstractRoutePredicateFactory {

    protected static final Log log = LogFactory.getLog(CustomReadBodyRoutePredicateFactory.class);
    private List> messageReaders;

    public CustomReadBodyRoutePredicateFactory() {
        super(Config.class);
        this.messageReaders = HandlerStrategies.withDefaults().messageReaders();
    }

    public CustomReadBodyRoutePredicateFactory(List> messageReaders) {
        super(Config.class);
        this.messageReaders = messageReaders;
    }

    @Override
    public AsyncPredicate applyAsync(Config config) {
        return new AsyncPredicate() {
            @Override
            public Publisher apply(ServerWebExchange exchange) {
                Class inClass = config.getInClass();
                Object cachedBody = exchange.getAttribute("cachedRequestBodyObject");
                if (cachedBody != null) {
                    try {
                        boolean test = config.predicate.test(cachedBody);
                        exchange.getAttributes().put("read_body_predicate_test_attribute", test);
                        return Mono.just(test);
                    } catch (ClassCastException var6) {
                        if (CustomReadBodyRoutePredicateFactory.log.isDebugEnabled()) {
                            CustomReadBodyRoutePredicateFactory.log.debug("Predicate test failed because class in predicate does not match the cached body object", var6);
                        }
                        return Mono.just(false);
                    }
                } else {
                    return ServerWebExchangeUtils.cacheRequestBodyAndRequest(exchange, (serverHttpRequest) -> {
                        return ServerRequest.create(exchange.mutate().request(serverHttpRequest).build(), CustomReadBodyRoutePredicateFactory.this.messageReaders).bodyToMono(inClass).doOnNext((objectValue) -> {
                            exchange.getAttributes().put("cachedRequestBodyObject", objectValue);
                        }).map((objectValue) -> {
                            return config.getPredicate().test(objectValue);
                        }).thenReturn(true);
                    });
                }
            }

            @Override
            public String toString() {
                return String.format("ReadBody: %s", config.getInClass());
            }
        };
    }

    @Override
    public Predicate apply(Config config) {
        throw new UnsupportedOperationException("ReadBodyPredicateFactory is only async.");
    }

    public static class Config {
        private Class inClass;
        private Predicate predicate;
        private Map hints;

        public Config() {
        }

        public Class getInClass() {
            return this.inClass;
        }

        public Config setInClass(Class inClass) {
            this.inClass = inClass;
            return this;
        }

        public Predicate getPredicate() {
            return this.predicate;
        }

        public Config setPredicate(Predicate predicate) {
            this.predicate = predicate;
            return this;
        }

        public  Config setPredicate(Class inClass, Predicate predicate) {
            this.setInClass(inClass);
            this.predicate = predicate;
            return this;
        }

        public Map getHints() {
            return this.hints;
        }

        public Config setHints(Map hints) {
            this.hints = hints;
            return this;
        }
    }
    
    
    @Bean
    public Predicate bodyPredicate(){
        return o -> true;
    }
}
2.2 配置文件配置
# Two routes, each combining a Path predicate with the custom body-reading predicate.
spring:
  cloud:
    gateway:
      routes:
        - id: master-app-api
          uri: https://test-master-app-api.wanshifu.com
          predicates:
            - Path=/app/**
            # "CustomReadBody" selects CustomReadBodyRoutePredicateFactory; args populate its Config.
            - name: CustomReadBody
              args:
                # SpEL type reference: the body is decoded as a String.
                inClass: '#{T(String)}'
                predicate: '#{@bodyPredicate}' # inject the bean that implements the Predicate interface

        - id: master-order-api
          uri: https://test-order-app-api.wanshifu.com
          predicates:
            - Path=/order/**
            - name: CustomReadBody
              args:
                inClass: '#{T(String)}'
                predicate: '#{@bodyPredicate}' # inject the bean that implements the Predicate interface
2.3 过滤器内使用
if (HttpMethod.POST.equals(method)) {
            // The body was stored on the exchange under this attribute; log it as a String.
            String cachedBody = exchange.getAttribute("cachedRequestBodyObject");
            log.info("Body: [{}]", cachedBody);
        }
2.4 另一种方式获取请求体

参考ReadBodyRoutePredicateFactory实现.

这种方式不会带来 request body 读取不完整的问题,但缺点是请求体只能读取一次:如果其他过滤器再次读取请求体,会抛出 java.lang.IllegalStateException: only one connection receive subscriber allowed.

/**
 * Global filter that reads the JSON body of POST requests and stores it in the exchange
 * attributes under {@code cachedRequestBodyObject}.
 *
 * <p>The body is consumed by a direct {@code subscribe()} call and the request is not
 * re-wrapped, so the chain continues with the original exchange.
 */
@Service
public class GlobalRequestLogCollectFilter implements GlobalFilter, Ordered {

    private static final Logger log = LogManager.getLogger(GlobalRequestLogCollectFilter.class);

    // Not referenced anywhere in this snippet.
    private static final ObjectMapper objectMapper = new ObjectMapper();

    // Exchange attribute key under which the request body is stored.
    private static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject";

    /**
     * Collects request information into {@code requestParams} and, for JSON POST requests,
     * reads the body and caches it on the exchange.
     *
     * @param exchange the current exchange
     * @param chain    the remaining filter chain
     * @return the result of continuing the chain with the unmodified exchange
     */
    @Override
    public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String urlPath = request.getPath().pathWithinApplication().value();
        // Accumulates the log text; this snippet builds it but never writes it to the log.
        StringBuilder requestParams = new StringBuilder();
        HttpMethod method = request.getMethod();
        String contentType = request.getHeaders().getFirst(HttpHeaders.CONTENT_TYPE);
        // Look up which route the request matched.
        // NOTE(review): getId() is called on an untyped getAttribute(...) result; the generic
        // type argument appears to have been lost from this snippet - confirm against the original.
        String routeId = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR).getId();
        requestParams.append("GlobalRequestLogCollectFilter")
                .append("通过网关的请求, 路由ID:").append(routeId).append("请求路径为:").append(urlPath);
        if (HttpMethod.GET.equals(method)) {
           //....
        }
        // Exact string comparison: a Content-Type carrying parameters (e.g. a charset) does not match.
        if (HttpMethod.POST.equals(method) && MediaType.APPLICATION_JSON_VALUE.equals(contentType)) {
            ServerRequest serverRequest = ServerRequest.create(exchange.mutate().request(request).build(),
                    HandlerStrategies.withDefaults().messageReaders());
            Mono requestBody = serverRequest.bodyToMono(String.class)
                    .doOnNext(objectValue -> exchange.getAttributes() // store it in the exchange attributes
                            .put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue));
            // Subscribes here, independently of the Mono returned below.
            requestBody.subscribe((e) -> requestParams.append("请求参数: ").append(e).append(" "));
        }
        return chain.filter(exchange);
    }

    /** Returns {@code Integer.MIN_VALUE} as this filter's order value. */
    @Override
    public int getOrder() {
        return Integer.MIN_VALUE;
    }
}

建议使用修改请求体的方式(参考扩展), 获取请求体, 读取完重新封装一个新的请求, 其他过滤器也能继续读取

3.0 修改响应

参考ModifyResponseBodyGatewayFilterFactory 实现

public class MyModifiedServerHttpResponse extends ServerHttpResponseDecorator {

    private final Map messageBodyDecoders;
    private final Map messageBodyEncoders;
    private final List> messageReaders;
    private final ServerWebExchange exchange;

    public MyModifiedServerHttpResponse(ServerWebExchange exchange) {
        super(exchange.getResponse());
        this.exchange = exchange;
        this.messageReaders = exchange.getApplicationContext().getBean(ServerCodecConfigurer.class).getReaders();
        this.messageBodyDecoders = exchange.getApplicationContext().getBeansOfType(MessageBodyDecoder.class);
        this.messageBodyEncoders = exchange.getApplicationContext().getBeansOfType(MessageBodyEncoder.class);
    }
    
    @SuppressWarnings("unchecked")
    @Override
    public Mono writeWith(Publisher body) {
        if (body instanceof Flux) {
            Flux fluxBody = (Flux) body;
            String originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
            HttpHeaders httpHeaders = new HttpHeaders();
            httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType);
            ClientResponse clientResponse = this.prepareClientResponse(fluxBody, httpHeaders);
            Mono responseMono = extractBody(exchange, clientResponse, String.class);
            // 这里修改响应体
            responseMono = responseMono.map((e) -> {
                return "修改响应体";
            });
            BodyInserter bodyInserter = BodyInserters.fromPublisher(responseMono, String.class);
            CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange,
                    exchange.getResponse().getHeaders());

            return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
                Mono messageBody = this.writeBody(getDelegate(), outputMessage, String.class);
                HttpHeaders headers = getDelegate().getHeaders();
                if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
                        || headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                    messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
                }
                // TODO: fail if isStreamingMediaType?
                return getDelegate().writeWith(messageBody);
            }));
        }
        // if body is not a flux. never got there.
        return super.writeWith(body);
    }

    private  Mono extractBody(ServerWebExchange exchange, ClientResponse clientResponse, Class inClass) {
        // if inClass is byte[] then just return body, otherwise check if
        // decoding required
        if (byte[].class.isAssignableFrom(inClass)) {
            return clientResponse.bodyToMono(inClass);
        }
        List encodingHeaders = exchange.getResponse().getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING);
        for (String encoding : encodingHeaders) {
            MessageBodyDecoder decoder = messageBodyDecoders.get(encoding);
            if (decoder != null) {
                return clientResponse.bodyToMono(byte[].class).publishOn(Schedulers.parallel()).map(decoder::decode)
                        .map(bytes -> exchange.getResponse().bufferFactory().wrap(bytes))
                        .map(buffer -> prepareClientResponse(Mono.just(buffer),
                                exchange.getResponse().getHeaders()))
                        .flatMap(response -> response.bodyToMono(inClass));
            }
        }
        return clientResponse.bodyToMono(inClass);
    }

    private ClientResponse prepareClientResponse(Publisher body, HttpHeaders httpHeaders) {
        ClientResponse.Builder builder;
        builder = ClientResponse.create(exchange.getResponse().getStatusCode(),this.messageReaders);
        return builder.headers(headers -> headers.putAll(httpHeaders)).body(Flux.from(body)).build();
    }

    private Mono writeBody(ServerHttpResponse httpResponse, CachedBodyOutputMessage message,
                                       Class outClass) {
        Flux body = message.getBody();
        Mono response = DataBufferUtils.join(body);
        if (byte[].class.isAssignableFrom(outClass)) {
            return response;
        }
        List encodingHeaders = httpResponse.getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING);
        for (String encoding : encodingHeaders) {
            MessageBodyEncoder encoder = messageBodyEncoders.get(encoding);
            if (encoder != null) {
                DataBufferFactory dataBufferFactory = httpResponse.bufferFactory();
                response = response.publishOn(Schedulers.parallel()).map(buffer -> {
                    byte[] encodedResponse = encoder.encode(buffer);
                    DataBufferUtils.release(buffer);
                    return encodedResponse;
                }).map(dataBufferFactory::wrap);
            }
        }
        return response;
    }
}
3.1 过滤器中使用
/**
 * Global filter that installs {@code MyModifiedServerHttpResponse} as the exchange's response
 * before the rest of the chain runs.
 */
@Service
public class GlobalRequestLogCollectFilter implements GlobalFilter, Ordered {

    private static final Logger log = LogManager.getLogger(GlobalRequestLogCollectFilter.class);

    /**
     * Continues the chain with an exchange whose response is the body-rewriting decorator.
     *
     * @param exchange the current exchange
     * @param chain    the remaining filter chain
     * @return the result of the rest of the chain
     */
    @Override
    public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        MyModifiedServerHttpResponse rewritingResponse = new MyModifiedServerHttpResponse(exchange);
        ServerWebExchange decoratedExchange = exchange.mutate().response(rewritingResponse).build();
        return chain.filter(decoratedExchange);
    }

    /**
     * The original author's note says the order must be below 0 - presumably so the decorator
     * is in place before the gateway's own response-writing filter runs; verify against the
     * gateway's filter ordering.
     */
    @Override
    public int getOrder() {
        return Integer.MIN_VALUE;
    }
}
4.0 读取响应体

参考修改响应体实现

/**
 * Factory for response decorators that capture the response body as text (for logging)
 * while still writing the original bytes to the client.
 */
@Component
public class ReadServerHttpResponse {

    /** Decoders keyed by the Content-Encoding value they handle. */
    private final Map<String, MessageBodyDecoder> messageBodyDecoders;
    private final static Logger logger = LogManager.getLogger(ReadServerHttpResponseDecorator.class);
    /** Media types whose bodies are not captured. */
    private final static Set<String> notReadMediaTypes = new HashSet<>(Arrays.asList(
            MediaType.APPLICATION_OCTET_STREAM_VALUE,
            MediaType.APPLICATION_PDF_VALUE,
            MediaType.IMAGE_GIF_VALUE,
            MediaType.IMAGE_JPEG_VALUE,
            MediaType.IMAGE_PNG_VALUE
    ));

    /**
     * @param bodyDecoders all registered body decoders; indexed by their encoding type
     */
    public ReadServerHttpResponse(Set<MessageBodyDecoder> bodyDecoders) {
        this.messageBodyDecoders = bodyDecoders.stream()
                .collect(Collectors.toMap(MessageBodyDecoder::encodingType, identity()));
    }

    /**
     * Creates a decorator bound to the given exchange.
     *
     * @param exchange the current exchange
     * @return a new decorator wrapping the exchange's response
     */
    public ReadServerHttpResponseDecorator getReadServerHttpResponseDecorator(ServerWebExchange exchange) {
        return new ReadServerHttpResponseDecorator(exchange);
    }

    /** Decorator that copies the response body and exposes it as text via {@link #getResult()}. */
    protected class ReadServerHttpResponseDecorator extends ServerHttpResponseDecorator {

        private final ServerWebExchange exchange;
        // Assigned while the body is being written; remains null until then.
        private Mono<String> result;

        public ReadServerHttpResponseDecorator(ServerWebExchange exchange) {
            super(exchange.getResponse());
            this.exchange = exchange;
        }

        /**
         * @return the captured body text, or {@code null} if no body has been written yet
         */
        public Mono<String> getResult() {
            return this.result;
        }

        /**
         * Writes the body to the delegate and records a textual copy of it.
         * Binary media types listed in {@code notReadMediaTypes} are passed through without capture.
         */
        @Override
        public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
            // Skip stream-like (binary) responses: their content is not logged.
            ServerHttpResponse response = exchange.getResponse();
            MediaType contentType = response.getHeaders().getContentType();
            // contentType is null when the response carries no Content-Type header.
            if (contentType != null && notReadMediaTypes.contains(contentType.toString())) {
                this.result = Mono.just(contentType.toString() + "类型, 不读取响应日志!");
                return getDelegate().writeWith(body);
            }
            // Capture the response body for logging.
            Flux<DataBuffer> bodyFlux = Flux.from(body);
            Mono<DataBuffer> bufferMono = bodyFlux.collectList().map((dataBuffers) -> {
                if (dataBuffers.isEmpty()) {
                    // Empty body: joining an empty list is rejected by the buffer factory.
                    this.result = Mono.just("");
                    return response.bufferFactory().wrap(new byte[0]);
                }
                DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
                DefaultDataBuffer join = factory.join(dataBuffers);
                int readableByteCount = join.readableByteCount();
                byte[] content = new byte[readableByteCount];
                join.read(content);
                // Release the joined buffer now that its bytes have been copied out.
                DataBufferUtils.release(join);
                this.result = this.getRspString(content);
                return response.bufferFactory().wrap(content);
            });
            return getDelegate().writeWith(bufferMono);
        }

        /**
         * Converts the raw body bytes to text, decoding them first when a decoder is registered
         * for one of the response's Content-Encoding values.
         *
         * <p>NOTE(review): text is built with the platform default charset; confirm whether the
         * response charset (or UTF-8) should be used instead.
         */
        private Mono<String> getRspString(byte[] contentArray) {
            return Mono.just(contentArray)
                    .filter(Objects::nonNull)
                    .map((content) -> {
                        String str = null;
                        List<String> encodingHeaders = exchange.getResponse().getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING);
                        for (String encoding : encodingHeaders) {
                            MessageBodyDecoder decoder = ReadServerHttpResponse.this.messageBodyDecoders.get(encoding);
                            if (decoder != null) {
                                byte[] decode = decoder.decode(content);
                                str = new String(decode, Charset.defaultCharset());
                            }
                        }
                        // No decoder applied (or it produced nothing): use the bytes as they are.
                        if (StringUtil.isNullOrEmpty(str)) {
                            str = new String(content, Charset.defaultCharset());
                        }
                        return str;
                    })
                    .timeout(Duration.ofSeconds(5));
        }
    }
}
扩展 使用的全局过滤器打印请求与响应日志

ReadServerHttpResponse

模拟ModifyResponseBodyGatewayFilterFactory 实现的

import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.filter.factory.rewrite.MessageBodyDecoder;
import org.springframework.cloud.gateway.filter.factory.rewrite.MessageBodyEncoder;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.function.Function.identity;
import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR;

@Component
public class ReadServerHttpResponse {

    private final Map messageBodyDecoders;
    private final Map messageBodyEncoders;
    private final List> messageReaders;

    public ReadServerHttpResponse(ServerCodecConfigurer codecConfigurer,
                                  Set bodyDecoders,
                                  Set bodyEncoders) {
        this.messageReaders = codecConfigurer.getReaders();
        this.messageBodyDecoders = bodyDecoders.stream()
                .collect(Collectors.toMap(MessageBodyDecoder::encodingType, identity()));
        this.messageBodyEncoders = bodyEncoders.stream()
                .collect(Collectors.toMap(MessageBodyEncoder::encodingType, identity()));
    }

    public ReadServerHttpResponseDecorator getReadServerHttpResponseDecorator(ServerWebExchange exchange) {
        return new ReadServerHttpResponseDecorator(exchange);
    }

    public class ReadServerHttpResponseDecorator extends ServerHttpResponseDecorator {

        private final ServerWebExchange exchange;
        private String result;

        public ReadServerHttpResponseDecorator(ServerWebExchange exchange) {
            super(exchange.getResponse());
            this.exchange = exchange;
        }

        public String getResult() {
            return result;
        }

        @Override
        public Mono writeWith(Publisher body) {
            String originalResponseContentType = this.exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
            HttpHeaders httpHeaders = new HttpHeaders();
            httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType);
            ClientResponse clientResponse = this.prepareClientResponse(body, httpHeaders);
            Mono responseMono = this.extractBody(this.exchange, clientResponse, String.class);
            responseMono = responseMono.doOnSuccess((e) -> this.result = e);
            BodyInserter bodyInserter = BodyInserters.fromPublisher(responseMono, String.class);
            CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange,
                    exchange.getResponse().getHeaders());
            return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
                Mono messageBody = this.writeBody(getDelegate(), outputMessage, String.class);
                HttpHeaders headers = getDelegate().getHeaders();
                if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
                        || headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
                    messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount()));
                }
                return getDelegate().writeWith(messageBody);
            }));
        }

        private  Mono extractBody(ServerWebExchange exchange, ClientResponse clientResponse, Class inClass) {
            if (byte[].class.isAssignableFrom(inClass)) {
                return clientResponse.bodyToMono(inClass);
            }
            List encodingHeaders = exchange.getResponse().getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING);
            for (String encoding : encodingHeaders) {
                MessageBodyDecoder decoder = ReadServerHttpResponse.this.messageBodyDecoders.get(encoding);
                if (decoder != null) {
                    return clientResponse.bodyToMono(byte[].class).publishOn(Schedulers.parallel()).map(decoder::decode)
                            .map(bytes -> exchange.getResponse().bufferFactory().wrap(bytes))
                            .map(buffer -> prepareClientResponse(Mono.just(buffer),
                                    exchange.getResponse().getHeaders()))
                            .flatMap(response -> response.bodyToMono(inClass));
                }
            }
            return clientResponse.bodyToMono(inClass);
        }

        private ClientResponse prepareClientResponse(Publisher body, HttpHeaders httpHeaders) {
            ClientResponse.Builder builder;
            builder = ClientResponse.create(exchange.getResponse().getStatusCode(), ReadServerHttpResponse.this.messageReaders);
            return builder.headers(headers -> headers.putAll(httpHeaders)).body(Flux.from(body)).build();
        }

        private Mono writeBody(ServerHttpResponse httpResponse, CachedBodyOutputMessage message,
                                           Class outClass) {
            Flux body = message.getBody();
            Mono response = DataBufferUtils.join(body);
            if (byte[].class.isAssignableFrom(outClass)) {
                return response;
            }
            List encodingHeaders = httpResponse.getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING);
            for (String encoding : encodingHeaders) {
                MessageBodyEncoder encoder = ReadServerHttpResponse.this.messageBodyEncoders.get(encoding);
                if (encoder != null) {
                    DataBufferFactory dataBufferFactory = httpResponse.bufferFactory();
                    response = response.publishOn(Schedulers.parallel()).map(buffer -> {
                        byte[] encodedResponse = encoder.encode(buffer);
                        DataBufferUtils.release(buffer);
                        return encodedResponse;
                    }).map(dataBufferFactory::wrap);
                }
            }
            return response;
        }
    }
}

GlobalRequestLogCollectFilter

读取请求体参数, 参考ModifyRequestBodyGatewayFilterFactory实现

import com.wshifu.filter.config.ReadServerHttpResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.stereotype.Service;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;


/**
 * Global filter that logs, per request: route id, path, method, request parameters/body,
 * response body and elapsed time.
 *
 * <p>For POST form and JSON requests the body is read, cached in the exchange attributes and
 * re-wrapped into a new request so later filters can still read it. The response is captured
 * through {@link ReadServerHttpResponse}.
 */
@Service
public class GlobalRequestLogCollectFilter implements GlobalFilter, Ordered {

    @Autowired
    private ReadServerHttpResponse readServerHttpResponse;
    private static final Logger log = LogManager.getLogger(GlobalRequestLogCollectFilter.class);
    /** Exchange attribute holding the request parameters / body. */
    private static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject";
    /** Exchange attribute holding the request start time in epoch milliseconds. */
    private static final String REQUEST_TIME_MILLIS_KEY = "requestTimeMillisKey";

    /**
     * Collects request details, decorates request/response as needed and logs once the chain
     * has completed.
     *
     * @param exchange the current exchange
     * @param chain    the remaining filter chain
     * @return completion signal of the chain followed by the log output
     */
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        exchange.getAttributes().put(REQUEST_TIME_MILLIS_KEY, System.currentTimeMillis());
        ServerHttpRequest request = exchange.getRequest();
        String urlPath = request.getPath().pathWithinApplication().value();
        StringBuilder requestParams = new StringBuilder();
        HttpMethod method = request.getMethod();
        MediaType contentType = request.getHeaders().getContentType();
        // Look up which route the request matched.
        Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
        String routeId = route == null ? null : route.getId();
        requestParams.append("GatewayService")
                .append(", 通过网关的请求, 路由ID: ").append(routeId)
                .append(", 请求路径为:").append(urlPath)
                .append(", 请求方式: ").append(method)
                .append(", 请求参数: ");
        if (HttpMethod.GET.equals(method)) {
            Map<String, String> valueMap = request.getQueryParams().toSingleValueMap();
            exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, valueMap);
            requestParams.append(valueMap).append(" ");
        }
        if (HttpMethod.POST.equals(method) && !MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {
            try {
                ServerRequest serverRequest = ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders());
                // Form submission: read and log the form fields.
                if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) {
                    return this.readFormData(exchange, chain, requestParams, serverRequest);
                }
                // JSON payload: read and log the raw body.
                if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) {
                    return this.readJsonData(exchange, chain, requestParams, serverRequest);
                }
            } catch (Exception e) {
                // Only failures while assembling the pipeline land here; pass the throwable as
                // the last argument (no placeholder) so the stack trace is logged.
                log.error("GlobalRequestLogCollectFilter, 读取请求日志异常", e);
            }
        }
        // Any other request: only the response is decorated.
        ReadServerHttpResponse.ReadServerHttpResponseDecorator responseDecorator =
                readServerHttpResponse.getReadServerHttpResponseDecorator(exchange);
        return chain.filter(exchange.mutate().response(responseDecorator).build())
                .then(Mono.<Void>fromRunnable(() -> this.outResponseLog(exchange, requestParams, responseDecorator)));

    }

    /** Reads a JSON body as a String, caches it on the exchange and continues with a re-wrapped request. */
    private Mono<Void> readJsonData(ServerWebExchange exchange, GatewayFilterChain chain,
                                    StringBuilder requestParams, ServerRequest serverRequest) {
        Mono<String> modifiedBody = serverRequest.bodyToMono(String.class)
                .doOnNext(objectValue -> {
                    exchange.getAttributes() // store the request parameters in the exchange attributes
                            .put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                    // The ", 请求参数: " label was already appended by filter().
                    requestParams.append(objectValue);
                });

        return this.decoratorRequestAndResponse(exchange, chain, requestParams,
                BodyInserters.fromPublisher(modifiedBody, String.class));
    }

    /** Reads URL-encoded form data, caches it on the exchange and continues with a re-wrapped request. */
    private Mono<Void> readFormData(ServerWebExchange exchange, GatewayFilterChain chain,
                                    StringBuilder requestParams, ServerRequest serverRequest) {
        AtomicReference<MultiValueMap<String, String>> formDataRef = new AtomicReference<>();
        return serverRequest.formData()
                .doOnNext(objectValue -> {
                    exchange.getAttributes() // store the request parameters in the exchange attributes
                            .put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue.toSingleValueMap().toString());
                    formDataRef.set(objectValue);
                    requestParams.append(objectValue.toSingleValueMap());
                }).then(Mono.defer(() ->
                        this.decoratorRequestAndResponse(exchange, chain, requestParams,
                                BodyInserters.fromValue(formDataRef.get()))
                ));
    }

    /**
     * Writes the body produced by {@code bodyInserter} into a cached message, then continues the
     * chain with a request replaying that body and a response decorator capturing the result.
     */
    private Mono<Void> decoratorRequestAndResponse(ServerWebExchange exchange, GatewayFilterChain chain,
                                                   StringBuilder requestParams,
                                                   BodyInserter<?, ? super CachedBodyOutputMessage> bodyInserter) {
        ReadServerHttpResponse.ReadServerHttpResponseDecorator responseDecorator =
                readServerHttpResponse.getReadServerHttpResponseDecorator(exchange);
        HttpHeaders headers = new HttpHeaders();
        headers.putAll(exchange.getRequest().getHeaders());
        // The re-written body may differ in size, so drop the original Content-Length.
        headers.remove(HttpHeaders.CONTENT_LENGTH);
        CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
        return bodyInserter.insert(outputMessage, new BodyInserterContext())
                .then(Mono.defer(() -> {
                    ServerHttpRequest decorator = this.decorate(exchange, headers, outputMessage);
                    return chain.filter(exchange.mutate().request(decorator).response(responseDecorator).build())
                            .then(Mono.<Void>fromRunnable(() ->
                                    this.outResponseLog(exchange, requestParams, responseDecorator)));
                }));
    }

    /** Appends the captured response and elapsed time to the request log line and writes it. */
    private void outResponseLog(ServerWebExchange exchange, StringBuilder requestParams,
                                ReadServerHttpResponse.ReadServerHttpResponseDecorator responseDecorator) {
        String result = responseDecorator.getResult();
        Long oldTime = exchange.getAttribute(REQUEST_TIME_MILLIS_KEY);
        long timeConsuming = System.currentTimeMillis() - oldTime;
        requestParams.append(", 响应参数: ").append(result)
                .append(", 请求耗时,单位毫秒: ").append(timeConsuming);
        log.info(requestParams.toString());
    }

    /**
     * Wraps the current request so that downstream filters see the supplied headers and the
     * body held by {@code outputMessage}.
     */
    private ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers,
                                                CachedBodyOutputMessage outputMessage) {
        return new ServerHttpRequestDecorator(exchange.getRequest()) {
            @Override
            public HttpHeaders getHeaders() {
                long contentLength = headers.getContentLength();
                HttpHeaders httpHeaders = new HttpHeaders();
                httpHeaders.putAll(headers);
                if (contentLength > 0L) {
                    httpHeaders.setContentLength(contentLength);
                } else {
                    // No usable Content-Length: announce a chunked body instead.
                    httpHeaders.set("Transfer-Encoding", "chunked");
                }
                return httpHeaders;
            }

            @Override
            public Flux<DataBuffer> getBody() {
                return outputMessage.getBody();
            }
        };
    }

    /** Returns {@code Integer.MIN_VALUE} as this filter's order value. */
    @Override
    public int getOrder() {
        return Integer.MIN_VALUE;
    }
}

1

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5582574.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存