1.0 修改请求体Gateway_版本 3.0.4
参考 ModifyRequestBodyGatewayFilterFactory实现
if (HttpMethod.POST.equals(method)) { // mediaType MediaType mediaType = request.getHeaders().getContentType(); // read & modify body ServerRequest serverRequest = ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders()); //重点 Mono2.0 获取请求体 2.1 自定义ReadBodyRoutePredicateFactorymodifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> { //因为约定了终端传参的格式,所以只考虑json的情况,如果是表单传参,请自行发挥 if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { requestParams.set(body); // 这里修改然后将修改后的字符串设置返回 return Mono.just(body); } return Mono.empty(); }).switchIfEmpty(Mono.defer(() -> Mono.empty())); BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class); HttpHeaders headers = new HttpHeaders(); headers.putAll(exchange.getRequest().getHeaders()); headers.remove("Content-Length"); CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers); return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> { ServerHttpRequest decorator = this.decorate(exchange, headers, outputMessage); return chain.filter(exchange.mutate().request(decorator).build()); })).onErrorResume((throwable) -> this.release(exchange, outputMessage, (Throwable) throwable)); } public ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers, CachedBodyOutputMessage outputMessage) { return new ServerHttpRequestDecorator(exchange.getRequest()) { public HttpHeaders getHeaders() { long contentLength = headers.getContentLength(); HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.putAll(headers); if (contentLength > 0L) { httpHeaders.setContentLength(contentLength); } else { httpHeaders.set("Transfer-Encoding", "chunked"); } return httpHeaders; } public Flux getBody() { return outputMessage.getBody(); } }; } private Mono release(ServerWebExchange exchange, CachedBodyOutputMessage outputMessage, Throwable throwable) { Field cached = ReflectionUtils.findField(outputMessage.getClass(), "cached"); cached.setAccessible(true); try { return (boolean)cached.get(outputMessage) ? outputMessage.getBody().map(DataBufferUtils::release).then(Mono.error(throwable)) : Mono.error(throwable); } catch (IllegalAccessException e) { e.printStackTrace(); } return null; }
@Component public class CustomReadBodyRoutePredicateFactory extends AbstractRoutePredicateFactory2.2 配置文件配置{ protected static final Log log = LogFactory.getLog(CustomReadBodyRoutePredicateFactory.class); private List > messageReaders; public CustomReadBodyRoutePredicateFactory() { super(Config.class); this.messageReaders = HandlerStrategies.withDefaults().messageReaders(); } public CustomReadBodyRoutePredicateFactory(List > messageReaders) { super(Config.class); this.messageReaders = messageReaders; } @Override public AsyncPredicate applyAsync(Config config) { return new AsyncPredicate () { @Override public Publisher apply(ServerWebExchange exchange) { Class inClass = config.getInClass(); Object cachedBody = exchange.getAttribute("cachedRequestBodyObject"); if (cachedBody != null) { try { boolean test = config.predicate.test(cachedBody); exchange.getAttributes().put("read_body_predicate_test_attribute", test); return Mono.just(test); } catch (ClassCastException var6) { if (CustomReadBodyRoutePredicateFactory.log.isDebugEnabled()) { CustomReadBodyRoutePredicateFactory.log.debug("Predicate test failed because class in predicate does not match the cached body object", var6); } return Mono.just(false); } } else { return ServerWebExchangeUtils.cacheRequestBodyAndRequest(exchange, (serverHttpRequest) -> { return ServerRequest.create(exchange.mutate().request(serverHttpRequest).build(), CustomReadBodyRoutePredicateFactory.this.messageReaders).bodyToMono(inClass).doOnNext((objectValue) -> { exchange.getAttributes().put("cachedRequestBodyObject", objectValue); }).map((objectValue) -> { return config.getPredicate().test(objectValue); }).thenReturn(true); }); } } @Override public String toString() { return String.format("ReadBody: %s", config.getInClass()); } }; } @Override public Predicate apply(Config config) { throw new UnsupportedOperationException("ReadBodyPredicateFactory is only async."); } public static class Config { private Class inClass; private Predicate predicate; private Map hints; public Config() { } public Class getInClass() { return this.inClass; } public Config setInClass(Class inClass) { this.inClass = inClass; return this; } public Predicate getPredicate() { return this.predicate; } public Config setPredicate(Predicate predicate) { this.predicate = predicate; return this; } public Config setPredicate(Class inClass, Predicate predicate) { this.setInClass(inClass); this.predicate = predicate; return this; } public Map getHints() { return this.hints; } public Config setHints(Map hints) { this.hints = hints; return this; } } @Bean public Predicate bodyPredicate(){ return o -> true; } }
spring: cloud: gateway: routes: - id: master-app-api uri: https://test-master-app-api.wanshifu.com predicates: - Path=/app/** - name: CustomReadBody args: inClass: '#{T(String)}' predicate: '#{@bodyPredicate}' #注入实现predicate接口类 - id: master-order-api uri: https://test-order-app-api.wanshifu.com predicates: - Path=/order/** - name: CustomReadBody args: inClass: '#{T(String)}' predicate: '#{@bodyPredicate}' #注入实现predicate接口类2.3 过滤器内使用
if (HttpMethod.POST.equals(method)) { log.info("Body: [{}]", (String) exchange.getAttribute("cachedRequestBodyObject")); }2.4 另一种方式获取请求体
参考ReadBodyRoutePredicateFactory实现.
不会带来requestbody取不全问题, 但是缺点在于只能读一次, 如果在其他过滤器再次获取请求体会报java.lang.IllegalStateException: only one connection receive subscriber allowed.
@Service public class GlobalRequestLogCollectFilter implements GlobalFilter, Ordered { private static final Logger log = LogManager.getLogger(GlobalRequestLogCollectFilter.class); private static final ObjectMapper objectMapper = new ObjectMapper(); private static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject"; @Override public Monofilter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); String urlPath = request.getPath().pathWithinApplication().value(); StringBuilder requestParams = new StringBuilder(); HttpMethod method = request.getMethod(); String contentType = request.getHeaders().getFirst(HttpHeaders.CONTENT_TYPE); // 获取请求那个路由 String routeId = exchange. getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR).getId(); requestParams.append("GlobalRequestLogCollectFilter") .append("通过网关的请求, 路由ID:").append(routeId).append("请求路径为:").append(urlPath); if (HttpMethod.GET.equals(method)) { //.... } if (HttpMethod.POST.equals(method) && MediaType.APPLICATION_JSON_VALUE.equals(contentType)) { ServerRequest serverRequest = ServerRequest.create(exchange.mutate().request(request).build(), HandlerStrategies.withDefaults().messageReaders()); Mono requestBody = serverRequest.bodyToMono(String.class) .doOnNext(objectValue -> exchange.getAttributes() // 放进请求域属性中 .put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue)); requestBody.subscribe((e) -> requestParams.append("请求参数: ").append(e).append(" ")); } return chain.filter(exchange); } @Override public int getOrder() { return Integer.MIN_VALUE; } }
3.0 修改响应体建议使用修改请求体的方式(参考扩展), 获取请求体, 读取完重新封装一个新的请求, 其他过滤器也能继续读取
参考ModifyResponseBodyGatewayFilterFactory 实现
public class MyModifiedServerHttpResponse extends ServerHttpResponseDecorator { private final Map3.1 过滤器中使用messageBodyDecoders; private final Map messageBodyEncoders; private final List > messageReaders; private final ServerWebExchange exchange; public MyModifiedServerHttpResponse(ServerWebExchange exchange) { super(exchange.getResponse()); this.exchange = exchange; this.messageReaders = exchange.getApplicationContext().getBean(ServerCodecConfigurer.class).getReaders(); this.messageBodyDecoders = exchange.getApplicationContext().getBeansOfType(MessageBodyDecoder.class); this.messageBodyEncoders = exchange.getApplicationContext().getBeansOfType(MessageBodyEncoder.class); } @SuppressWarnings("unchecked") @Override public Mono writeWith(Publisher extends DataBuffer> body) { if (body instanceof Flux) { Flux extends DataBuffer> fluxBody = (Flux extends DataBuffer>) body; String originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR); HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType); ClientResponse clientResponse = this.prepareClientResponse(fluxBody, httpHeaders); Mono responseMono = extractBody(exchange, clientResponse, String.class); // 这里修改响应体 responseMono = responseMono.map((e) -> { return "修改响应体"; }); BodyInserter bodyInserter = BodyInserters.fromPublisher(responseMono, String.class); CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, exchange.getResponse().getHeaders()); return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> { Mono messageBody = this.writeBody(getDelegate(), outputMessage, String.class); HttpHeaders headers = getDelegate().getHeaders(); if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING) || headers.containsKey(HttpHeaders.CONTENT_LENGTH)) { messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount())); } // TODO: fail if isStreamingMediaType? return getDelegate().writeWith(messageBody); })); } // if body is not a flux. never got there. return super.writeWith(body); } private Mono extractBody(ServerWebExchange exchange, ClientResponse clientResponse, Class inClass) { // if inClass is byte[] then just return body, otherwise check if // decoding required if (byte[].class.isAssignableFrom(inClass)) { return clientResponse.bodyToMono(inClass); } List encodingHeaders = exchange.getResponse().getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING); for (String encoding : encodingHeaders) { MessageBodyDecoder decoder = messageBodyDecoders.get(encoding); if (decoder != null) { return clientResponse.bodyToMono(byte[].class).publishOn(Schedulers.parallel()).map(decoder::decode) .map(bytes -> exchange.getResponse().bufferFactory().wrap(bytes)) .map(buffer -> prepareClientResponse(Mono.just(buffer), exchange.getResponse().getHeaders())) .flatMap(response -> response.bodyToMono(inClass)); } } return clientResponse.bodyToMono(inClass); } private ClientResponse prepareClientResponse(Publisher extends DataBuffer> body, HttpHeaders httpHeaders) { ClientResponse.Builder builder; builder = ClientResponse.create(exchange.getResponse().getStatusCode(),this.messageReaders); return builder.headers(headers -> headers.putAll(httpHeaders)).body(Flux.from(body)).build(); } private Mono writeBody(ServerHttpResponse httpResponse, CachedBodyOutputMessage message, Class> outClass) { Flux body = message.getBody(); Mono response = DataBufferUtils.join(body); if (byte[].class.isAssignableFrom(outClass)) { return response; } List encodingHeaders = httpResponse.getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING); for (String encoding : encodingHeaders) { MessageBodyEncoder encoder = messageBodyEncoders.get(encoding); if (encoder != null) { DataBufferFactory dataBufferFactory = httpResponse.bufferFactory(); response = response.publishOn(Schedulers.parallel()).map(buffer -> { byte[] encodedResponse = encoder.encode(buffer); DataBufferUtils.release(buffer); return encodedResponse; }).map(dataBufferFactory::wrap); } } return response; } }
@Service public class GlobalRequestLogCollectFilter implements GlobalFilter, Ordered { private static final Logger log = LogManager.getLogger(GlobalRequestLogCollectFilter.class); @Override public Mono4.0 读取响应体filter(ServerWebExchange exchange, GatewayFilterChain chain) { return chain.filter(exchange.mutate().response(new MyModifiedServerHttpResponse(exchange)).build()); } @Override public int getOrder() { return Integer.MIN_VALUE; // 这里必须是优先级小于0 } }
参考修改响应体实现
@Component public class ReadServerHttpResponse { private final Map扩展 使用的全局过滤器打印请求与响应日志messageBodyDecoders; private final static Logger logger = LogManager.getLogger(ReadServerHttpResponseDecorator.class); private final static Set notReadMediaTypes = new HashSet<>(Arrays.asList( MediaType.APPLICATION_OCTET_STREAM_VALUE, MediaType.APPLICATION_PDF_VALUE, MediaType.IMAGE_GIF_VALUE, MediaType.IMAGE_JPEG_VALUE, MediaType.IMAGE_PNG_VALUE )); public ReadServerHttpResponse(Set bodyDecoders) { this.messageBodyDecoders = bodyDecoders.stream() .collect(Collectors.toMap(MessageBodyDecoder::encodingType, identity())); } public ReadServerHttpResponseDecorator getReadServerHttpResponseDecorator(ServerWebExchange exchange) { return new ReadServerHttpResponseDecorator(exchange); } protected class ReadServerHttpResponseDecorator extends ServerHttpResponseDecorator { private final ServerWebExchange exchange; private Mono result; public ReadServerHttpResponseDecorator(ServerWebExchange exchange) { super(exchange.getResponse()); this.exchange = exchange; } public Mono getResult() { return this.result; } @Override public Mono writeWith(Publisher extends DataBuffer> body) { // 排除流响应, 不读取日志 ServerHttpResponse response = exchange.getResponse(); MediaType contentType = response.getHeaders().getContentType(); if (notReadMediaTypes.contains(contentType.toString())) { this.result = Mono.just(contentType.toString() + "类型, 不读取响应日志!"); return getDelegate().writeWith(body); } // 读取响应正文日志 Flux extends DataBuffer> bodyMono = Flux.from(body); Mono bufferMono = bodyMono.collectList().map((dataBuffer) -> { DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); DefaultDataBuffer join = factory.join(dataBuffer); int readableByteCount = join.readableByteCount(); byte[] content = new byte[readableByteCount]; join.read(content); // 释放之前的数据缓冲区 DataBufferUtils.release(join); this.result = this.getRspString(content); return response.bufferFactory().wrap(content); }); return getDelegate().writeWith(bufferMono); } private Mono getRspString(byte[] contentArray) { return Mono.just(contentArray) .filter(Objects::nonNull) .map((content) -> { String str = null; List encodingHeaders = exchange.getResponse().getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING); for (String encoding : encodingHeaders) { MessageBodyDecoder decoder = ReadServerHttpResponse.this.messageBodyDecoders.get(encoding); if (decoder != null) { byte[] decode = decoder.decode(content); str = new String(decode, Charset.defaultCharset()); } } if (StringUtil.isNullOrEmpty(str)) { str = new String(content, Charset.defaultCharset()); } return str; }) .timeout(Duration.ofSeconds(5)); } } }
MyModifiedServerHttpResponse
模拟ModifyResponseBodyGatewayFilterFactory 实现的
import org.reactivestreams.Publisher; import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage; import org.springframework.cloud.gateway.filter.factory.rewrite.MessageBodyDecoder; import org.springframework.cloud.gateway.filter.factory.rewrite.MessageBodyEncoder; import org.springframework.cloud.gateway.support.BodyInserterContext; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.codec.ServerCodecConfigurer; import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.ServerHttpResponseDecorator; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.BodyInserter; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import static java.util.function.Function.identity; import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR; @Component public class ReadServerHttpResponse { private final MapmessageBodyDecoders; private final Map messageBodyEncoders; private final List > messageReaders; public ReadServerHttpResponse(ServerCodecConfigurer codecConfigurer, Set bodyDecoders, Set bodyEncoders) { this.messageReaders = codecConfigurer.getReaders(); this.messageBodyDecoders = bodyDecoders.stream() .collect(Collectors.toMap(MessageBodyDecoder::encodingType, identity())); this.messageBodyEncoders = bodyEncoders.stream() .collect(Collectors.toMap(MessageBodyEncoder::encodingType, identity())); } public ReadServerHttpResponseDecorator getReadServerHttpResponseDecorator(ServerWebExchange exchange) { return new ReadServerHttpResponseDecorator(exchange); } public class ReadServerHttpResponseDecorator extends ServerHttpResponseDecorator { private final ServerWebExchange exchange; private String result; public ReadServerHttpResponseDecorator(ServerWebExchange exchange) { super(exchange.getResponse()); this.exchange = exchange; } public String getResult() { return result; } @Override public Mono writeWith(Publisher extends DataBuffer> body) { String originalResponseContentType = this.exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR); HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType); ClientResponse clientResponse = this.prepareClientResponse(body, httpHeaders); Mono responseMono = this.extractBody(this.exchange, clientResponse, String.class); responseMono = responseMono.doOnSuccess((e) -> this.result = e); BodyInserter bodyInserter = BodyInserters.fromPublisher(responseMono, String.class); CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, exchange.getResponse().getHeaders()); return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> { Mono messageBody = this.writeBody(getDelegate(), outputMessage, String.class); HttpHeaders headers = getDelegate().getHeaders(); if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING) || headers.containsKey(HttpHeaders.CONTENT_LENGTH)) { messageBody = messageBody.doOnNext(data -> headers.setContentLength(data.readableByteCount())); } return getDelegate().writeWith(messageBody); })); } private Mono extractBody(ServerWebExchange exchange, ClientResponse clientResponse, Class inClass) { if (byte[].class.isAssignableFrom(inClass)) { return clientResponse.bodyToMono(inClass); } List encodingHeaders = exchange.getResponse().getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING); for (String encoding : encodingHeaders) { MessageBodyDecoder decoder = ReadServerHttpResponse.this.messageBodyDecoders.get(encoding); if (decoder != null) { return clientResponse.bodyToMono(byte[].class).publishOn(Schedulers.parallel()).map(decoder::decode) .map(bytes -> exchange.getResponse().bufferFactory().wrap(bytes)) .map(buffer -> prepareClientResponse(Mono.just(buffer), exchange.getResponse().getHeaders())) .flatMap(response -> response.bodyToMono(inClass)); } } return clientResponse.bodyToMono(inClass); } private ClientResponse prepareClientResponse(Publisher extends DataBuffer> body, HttpHeaders httpHeaders) { ClientResponse.Builder builder; builder = ClientResponse.create(exchange.getResponse().getStatusCode(), ReadServerHttpResponse.this.messageReaders); return builder.headers(headers -> headers.putAll(httpHeaders)).body(Flux.from(body)).build(); } private Mono writeBody(ServerHttpResponse httpResponse, CachedBodyOutputMessage message, Class> outClass) { Flux body = message.getBody(); Mono response = DataBufferUtils.join(body); if (byte[].class.isAssignableFrom(outClass)) { return response; } List encodingHeaders = httpResponse.getHeaders().getOrEmpty(HttpHeaders.CONTENT_ENCODING); for (String encoding : encodingHeaders) { MessageBodyEncoder encoder = ReadServerHttpResponse.this.messageBodyEncoders.get(encoding); if (encoder != null) { DataBufferFactory dataBufferFactory = httpResponse.bufferFactory(); response = response.publishOn(Schedulers.parallel()).map(buffer -> { byte[] encodedResponse = encoder.encode(buffer); DataBufferUtils.release(buffer); return encodedResponse; }).map(dataBufferFactory::wrap); } } return response; } } }
GlobalRequestLogCollectFilter
读取请求体参数, 参考ModifyRequestBodyGatewayFilterFactory实现
import com.wshifu.filter.config.ReadServerHttpResponse; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage; import org.springframework.cloud.gateway.route.Route; import org.springframework.cloud.gateway.support.BodyInserterContext; import org.springframework.cloud.gateway.support.ServerWebExchangeUtils; import org.springframework.core.Ordered; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import org.springframework.stereotype.Service; import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.BodyInserter; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.server.HandlerStrategies; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @Service public class GlobalRequestLogCollectFilter implements GlobalFilter, Ordered { @Autowired private ReadServerHttpResponse readServerHttpResponse; private static final Logger log = LogManager.getLogger(GlobalRequestLogCollectFilter.class); private static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject"; private static final String REQUEST_TIME_MILLIS_KEY = "requestTimeMillisKey"; @Override public Monofilter(ServerWebExchange exchange, GatewayFilterChain chain) { exchange.getAttributes().put(REQUEST_TIME_MILLIS_KEY, System.currentTimeMillis()); ServerHttpRequest request = exchange.getRequest(); String urlPath = request.getPath().pathWithinApplication().value(); StringBuilder requestParams = new StringBuilder(); HttpMethod method = request.getMethod(); MediaType contentType = request.getHeaders().getContentType(); // 获取请求那个路由 String routeId = exchange. getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR).getId(); requestParams.append("GatewayService") .append(", 通过网关的请求, 路由ID: ").append(routeId) .append(", 请求路径为:").append(urlPath) .append(", 请求方式: ").append(method.name()) .append(", 请求参数: "); if (HttpMethod.GET.equals(method)) { Map valueMap = Optional.ofNullable(request.getQueryParams()) .map(MultiValueMap::toSinglevalueMap).orElseGet(HashMap::new); exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, valueMap); requestParams.append(valueMap).append(" "); } if (HttpMethod.POST.equals(method) && !MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) { try { ServerRequest serverRequest = ServerRequest.create(exchange, HandlerStrategies.withDefaults().messageReaders()); // 读取表单日志 if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) { return this.readFormData(exchange, chain, requestParams, serverRequest); } // 读取json日志 if (MediaType.APPLICATION_JSON.isCompatibleWith(contentType)) { return this.readJsonData(exchange, chain, requestParams, serverRequest); } } catch (Exception e) { log.error("GlobalRequestLogCollectFilter, 读取请求日志异常: {}", e); } } // 普通请求日志 ReadServerHttpResponse.ReadServerHttpResponseDecorator responseDecorator = readServerHttpResponse.getReadServerHttpResponseDecorator(exchange); return chain.filter(exchange.mutate().response(responseDecorator).build()) .then(Mono.fromRunnable(() -> this.outResponseLog(exchange, requestParams, responseDecorator))); } private Mono readJsonData(ServerWebExchange exchange, GatewayFilterChain chain, StringBuilder requestParams, ServerRequest serverRequest) { Mono modifiedBody = serverRequest.bodyToMono(String.class) .doOnNext(objectValue -> { exchange.getAttributes() // 请求参数, 放进请求域属性中 .put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue); requestParams.append(", 请求参数: ").append(objectValue); }) .switchIfEmpty(Mono.defer(() -> Mono.empty())); BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class); return this.decoratorRequestAndResponse(exchange, chain, requestParams, bodyInserter); } private Mono readFormData(ServerWebExchange exchange, GatewayFilterChain chain, StringBuilder requestParams, ServerRequest serverRequest) { AtomicReference multiValueMapAtomicReference = new AtomicReference<>(); return serverRequest.formData() .doOnNext(objectValue -> { exchange.getAttributes() // 请求参数, 放进请求域属性中 .put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue.toSinglevalueMap().toString()); multiValueMapAtomicReference.set(objectValue); requestParams.append(objectValue.toSinglevalueMap()); }).then(Mono.defer(() -> { BodyInserter bodyInserter = BodyInserters .fromValue(multiValueMapAtomicReference.get()); return this.decoratorRequestAndResponse(exchange, chain, requestParams, bodyInserter); } )); } private Mono decoratorRequestAndResponse(ServerWebExchange exchange, GatewayFilterChain chain, StringBuilder requestParams, BodyInserter bodyInserter) { ReadServerHttpResponse.ReadServerHttpResponseDecorator responseDecorator = readServerHttpResponse.getReadServerHttpResponseDecorator(exchange); HttpHeaders headers = new HttpHeaders(); headers.putAll(exchange.getRequest().getHeaders()); headers.remove(HttpHeaders.CONTENT_LENGTH); CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers); return bodyInserter.insert(outputMessage, new BodyInserterContext()) .then(Mono.defer(() -> { ServerHttpRequest decorator = this.decorate(exchange, headers, outputMessage); return chain.filter(exchange.mutate().request(decorator).response(responseDecorator).build()).then( Mono.fromRunnable(() -> { this.outResponseLog(exchange, requestParams, responseDecorator); }) ); })); } private void outResponseLog(ServerWebExchange exchange, StringBuilder requestParams, ReadServerHttpResponse.ReadServerHttpResponseDecorator responseDecorator) { String result = responseDecorator.getResult(); Long oldTime = exchange. getAttribute(REQUEST_TIME_MILLIS_KEY); Long timeConsuming = System.currentTimeMillis() - oldTime; requestParams.append(", 响应参数: ").append(result) .append(", 请求耗时,单位毫秒: ").append(timeConsuming); log.info(requestParams.toString()); } private ServerHttpRequestDecorator decorate(ServerWebExchange exchange, HttpHeaders headers, CachedBodyOutputMessage outputMessage) { return new ServerHttpRequestDecorator(exchange.getRequest()) { public HttpHeaders getHeaders() { long contentLength = headers.getContentLength(); HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.putAll(headers); if (contentLength > 0L) { httpHeaders.setContentLength(contentLength); } else { httpHeaders.set("Transfer-Encoding", "chunked"); } return httpHeaders; } public Flux getBody() { return outputMessage.getBody(); } }; } @Override public int getOrder() { return Integer.MIN_VALUE; } }
1
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)