一、问题简单描述,消息中间件使用的是kafka,消费者接收消息,发起restmplate同步请求,接收返回的数据入库。
优化方案思考:
kafka配置:
spring: kafka: bootstrap-servers: localhost template: default-topic: update-test consumer: # 指定默认消费者group id group-id: default-group # 一次最多拉取消息的数量 max-poll-records: 1 # 最早未被消费的offset, 若设置为earliest,那么会从头开始读partition auto-offset-reset: earliest listener: # 拉取消息类型:SINGLE、BATCH type: SINGLE # 指定listener 容器中的线程数,用于提高并发量 concurrency: 10 properties: max: poll: interval: ms: 600000
配置中观察到是单次拉取的,后面改成批量拉取,发现速度没什么变化,于是观察消费者处理的代码逻辑,发现会执行restmplate同步请求,而每次请求都有几秒的耗时,于是想加快消费者处理速度,于是实现了AsyncConfigurer
@Slf4j @Configuration public class ProcessorAsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(50000); executor.setKeepAliveSeconds(300); executor.setWaitForTasksToCompleteonShutdown(true); executor.setThreadNamePrefix("ProcessorAsync-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); executor.initialize(); log.info("TrackingProcessorAsyncConfig Load - {}", System.currentTimeMillis()); return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new ProcessorAsyncExceptionHandler(); } }
在消费者处理方法上面加了@Async注解。
单元测试:模拟了100次的请求,发现加了异步注解后消费速率明显提升,之后上测试环境,暂时没发现问题,但是因此埋下了生产bug,读者可以先自行思考这其中可能引起的问题。
生产问题描述:日志中出现大量的read time out 请求,前端发起查询请求,一直在等待,没有返回查询结果。
问题分析与思考:可能引起了阻塞,观察Kibana日志,发现消费者的入口打印了日志,但是在restmplate发起请求后,后续日志没有打印,最终报错都是超时请求,被调用端的服务观察也是正常的,于是定位是否是restmplate连接池不够,导致的线程阻塞。
观察restmplate配置类,果然没有配置连接池等信息,于是修改代码:
import org.apache.http.conn.HttpClientConnectionManager; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.springframework.boot.web.client.RestTemplateBuilder; import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.client.ClientHttpRequestFactory; import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.springframework.web.client.RestTemplate; @Configuration public class RestTemplateConfig { @Bean("loadBalancedRestTemplate") @LoadBalanced public RestTemplate loadBalancedRestTemplate(RestTemplateBuilder builder) { RestTemplate restTemplate = builder.build(); restTemplate.setRequestFactory(clientHttpRequestFactory()); return restTemplate; } @Bean public RestTemplate restTemplate(RestTemplateBuilder builder) { RestTemplate restTemplate = builder.build(); restTemplate.setRequestFactory(clientHttpRequestFactory()); return restTemplate; } @Bean public HttpClientBuilder httpClientBuilder() { HttpClientBuilder httpClientBuilder = HttpClientBuilder.create(); // 设置HTTP连接管理器 httpClientBuilder.setConnectionManager(poolingConnectionManager()); return httpClientBuilder; } @Bean public ClientHttpRequestFactory clientHttpRequestFactory() { HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory(); clientHttpRequestFactory.setHttpClient(httpClientBuilder().build()); clientHttpRequestFactory.setConnectTimeout(30000); // 连接超时,毫秒 clientHttpRequestFactory.setReadTimeout(30000); // 读写超时,毫秒 return clientHttpRequestFactory; } @Bean public HttpClientConnectionManager poolingConnectionManager() { PoolingHttpClientConnectionManager poolingConnectionManager = new PoolingHttpClientConnectionManager(); poolingConnectionManager.setMaxTotal(300); // 连接池最大连接数 poolingConnectionManager.setDefaultMaxPerRoute(100); // 每个主机的并发 return poolingConnectionManager; } }
结果发现并没有效果。
观察Kafka消费情况,发现在服务发布生产环境的时候消息堆积数变成0,之前都是有几千条消息的堆积,这次加了异步注解后,全部消息被消费,于是分析是否是因为加了@Async引起的,观察线程的队列也是配置了50000,在加入异步注解后,消费者启动了自定义线程,把kafka的消息全部拉取下来消费完了,之后要发起restmplate请求,而每个请求都比较耗时,从而引起了线程阻塞。
总结:@Async确实可以加快程序的处理速度,充分利用CPU多核的优势,但不是加了@Async就一定快,本例子中,也破坏了队列削峰限流的初衷,将所有压力都放到了消费者这边。从原本的查询慢反向优化成不仅查询慢还查询不出。
其他优化方案:此例子中就算修改线程池配置,也不能解决问题。可以从其他思路去改进,比如把restmplate同步请求的方式解耦开来,我们只需把消息丢到被调用方,被调用方处理完,在丢到kafka,我们只需做对应的监听。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)