rabbitmq异步任务未消费或者消费阻塞排查思路

rabbitmq异步任务未消费或者消费阻塞排查思路,第1张

rabbitmq异步任务未消费或者消费阻塞排查思路

1、问题现象

2021-11-30 12:00:00左右 4个消息任务一直卡在创建中

2、问题分析 2.1、日志分析

1个任务已消费,rpc 远程调用第三方服务上传8张图片素材后就holding住了;

3个任务无消费日志;

2.2、mq控制台消费分析

4个消息任务都处于unack状态,说明4个任务都已经路由到消费端,根据此可以排除mq服务端阻塞的原因。初步猜测是由于客户端消费阻塞问题,下面方向将是着重去分析应用服务消费端。

3.3、应用消费端消费分析
  1. 应用服务是双节点,根据日志分析出现问题的都是x.x.x.170节点,下面将着重分析170节点;
  2. 怀疑170节点服务会不会假死状态,让运维看了下服务的健康状态,并没有问题。
  3. 怀疑服务cpu负载和线程使用会不会存在满负荷,想通过普罗米修斯看下监控大盘,发现服务并未接入;(后续要计划接入)
  4. 由于看不了监控大盘,那就转而直接看jstack结果, 然后让运维jstack了170节点并导出到文件,来分析服务整个线程使用状态。从【图一】可以看出从2021-11-30 12:00:00左右执行上传图片素材的请求到2021-11-30 20:00:00 任然在运行中,从【图二】可以看出mq消费的线程一直处于waiting状态。到此处问题已经相对比较明了了,大概原因可以定位为由于上传图片素材到媒体渠道,在等待返回结果时由于和gateway的socket异常断连(并没有走到正常超时逻辑),而线程一直处于holding状态。

图一(请求facebook上传图片素材)

"thread-pool-2" #9901 prio=5 os_prio=0 cpu=516.49ms elapsed=434525.53s allocated=480M defined_classes=8 tid=0x00007fd4d416a000 nid=0x26bb runnable [0x00007fd466ef3000]

java.lang.Thread.State: RUNNABLE

at java.net.SocketInputStream.socketRead0(java.base@11.0.11/Native Method)

at java.net.SocketInputStream.socketRead(java.base@11.0.11/SocketInputStream.java:115)

at java.net.SocketInputStream.read(java.base@11.0.11/SocketInputStream.java:168)

at java.net.SocketInputStream.read(java.base@11.0.11/SocketInputStream.java:140)

at sun.security.ssl.SSLSocketInputRecord.read(java.base@11.0.11/SSLSocketInputRecord.java:478)

at sun.security.ssl.SSLSocketInputRecord.readHeader(java.base@11.0.11/SSLSocketInputRecord.java:472)

at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(java.base@11.0.11/SSLSocketInputRecord.java:70)

at sun.security.ssl.SSLSocketImpl.readApplicationRecord(java.base@11.0.11/SSLSocketImpl.java:1364)

at sun.security.ssl.SSLSocketImpl$AppInputStream.read(java.base@11.0.11/SSLSocketImpl.java:973)

at java.io.BufferedInputStream.fill(java.base@11.0.11/BufferedInputStream.java:252)

at java.io.BufferedInputStream.read1(java.base@11.0.11/BufferedInputStream.java:292)

at java.io.BufferedInputStream.read(java.base@11.0.11/BufferedInputStream.java:351)

- locked <0x000000009b249738> (a java.io.BufferedInputStream)

at sun.net.www.http.HttpClient.parseHTTPHeader(java.base@11.0.11/HttpClient.java:754)

at sun.net.www.http.HttpClient.parseHTTP(java.base@11.0.11/HttpClient.java:689)

at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(java.base@11.0.11/HttpURLConnection.java:1615)

- locked <0x000000009b283d00> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)

at sun.net.www.protocol.http.HttpURLConnection.getInputStream(java.base@11.0.11/HttpURLConnection.java:1520)

- locked <0x000000009b283d00> (a sun.net.www.protocol.https.DelegateHttpsURLConnection)

at java.net.HttpURLConnection.getResponseCode(java.base@11.0.11/HttpURLConnection.java:527)

at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(java.base@11.0.11/HttpsURLConnectionImpl.java:334)

at com.facebook.ads.sdk.APIRequest.readResponse(APIRequest.java:287)

at com.facebook.ads.sdk.APIRequest.access$100(APIRequest.java:52)

at com.facebook.ads.sdk.APIRequest$DefaultRequestExecutor.sendPost(APIRequest.java:588)

at com.facebook.ads.sdk.APIRequest$DefaultRequestExecutor.execute(APIRequest.java:520)

at com.facebook.ads.sdk.APIRequest.executeInternal(APIRequest.java:197)

at com.facebook.ads.sdk.AdAccount$APIRequestCreateAdImage.execute(AdAccount.java:4225)

at com.facebook.ads.sdk.AdAccount$APIRequestCreateAdImage.execute(AdAccount.java:4220)

at cn.sino.ams.integration.facebook.MaterialApi.uploadImageToFaceBook(MaterialApi.java:120)

at cn.sino.ams.integration.facebook.MaterialApi.uploadImage(MaterialApi.java:86)

at cn.sino.ams.service.handler.CascadeCreateFacebookHandler.pushSingleMaterialToChannel(CascadeCreateFacebookHandler.java:723)

at cn.sino.ams.service.handler.CascadeCreateFacebookHandler.lambda$pushMaterialToChannelByMultiThread$30(CascadeCreateFacebookHandler.java:690)

at cn.sino.ams.service.handler.CascadeCreateFacebookHandler$$Lambda$1456/0x0000000100f6d840.get(Unknown Source)

at java.util.concurrent.CompletableFuture$AsyncSupply.run(java.base@11.0.11/CompletableFuture.java:1700)

at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.11/ThreadPoolExecutor.java:1128)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.11/ThreadPoolExecutor.java:628)

at java.lang.Thread.run(java.base@11.0.11/Thread.java:829)

图二(mq消费线程一直处于waiting状态)

"org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1" #55 prio=5 os_prio=0 cpu=23645.70ms elapsed=588114.33s allocated=111M defined_classes=409 tid=0x00007fd5d56e2800 nid=0x45 waiting on condition [0x00007fd512490000]

java.lang.Thread.State: WAITING (parking)

at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)

- parking to wait for <0x000000009b235408> (a java.util.concurrent.CompletableFuture$Signaller)

at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194)

at java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.11/CompletableFuture.java:1796)

at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.11/ForkJoinPool.java:3128)

at java.util.concurrent.CompletableFuture.waitingGet(java.base@11.0.11/CompletableFuture.java:1823)

at java.util.concurrent.CompletableFuture.join(java.base@11.0.11/CompletableFuture.java:2043)

at cn.sino.ams.service.handler.CascadeCreateFacebookHandler.pushMaterialToChannelByMultiThread(CascadeCreateFacebookHandler.java:709)

at cn.sino.ams.service.handler.CascadeCreateFacebookHandler.createChannelMaterial(CascadeCreateFacebookHandler.java:604)

at cn.sino.ams.service.handler.CascadeCreateFacebookHandler.handle(CascadeCreateFacebookHandler.java:134)

at cn.sino.ams.service.impl.CampaignServiceImpl.handleSingleSubmitTask(CampaignServiceImpl.java:313)

at cn.sino.ams.mq.consumer.CampaignSubmitConsumer.handleCampaignSubmit(CampaignSubmitConsumer.java:66)

at cn.sino.ams.mq.consumer.CampaignSubmitConsumer$$FastClassBySpringCGLIB$$df332de7.invoke()

at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)

at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)

at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)

at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)

at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:88)

at cn.sino.ams.common.aop.LogMdcAspect.around(LogMdcAspect.java:32)

at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@11.0.11/Native Method)

at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@11.0.11/NativeMethodAccessorImpl.java:62)

at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.11/DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(java.base@11.0.11/Method.java:566)

at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644)

at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633)

at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)

at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175)

at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)

at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95)

at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)

at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)

at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)

at cn.sino.ams.mq.consumer.CampaignSubmitConsumer$$EnhancerBySpringCGLIB$$ef6e6065.handleCampaignSubmit()

at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@11.0.11/Native Method)

at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@11.0.11/NativeMethodAccessorImpl.java:62)

at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.11/DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(java.base@11.0.11/Method.java:566)

at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)

at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)

at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:53)

at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:220)

at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148)

at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133)

at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1579)

at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1498)

at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer$$Lambda$1187/0x0000000100c9f440.invokeListener(Unknown Source)

at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@11.0.11/Native Method)

at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@11.0.11/NativeMethodAccessorImpl.java:62)

at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.11/DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(java.base@11.0.11/Method.java:566)

at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)

at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)

at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)

at brave.spring.rabbit.TracingRabbitListenerAdvice.invoke(TracingRabbitListenerAdvice.java:101)

at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)

at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)

at org.springframework.amqp.rabbit.listener.$Proxy239.invokeListener(Unknown Source)

at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1486)

at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1477)

at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1421)

at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:963)

at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)

at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81)

at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1284)

at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1190)

at java.lang.Thread.run(java.base@11.0.11/Thread.java:829)
  1. 基于【3.3-4】的分析就产生了几个疑问:

疑问一:应用消费端侧网络调用客户端自身为啥没触发网络超时而结束线程?

答:分析facebook-java-business-sdk源码,从【图三】可以看出fb的sdk并没有设置请求连接超时和读超时的参数,正好验证了jstack里上传素材到渠道的线程一直处于holding的现象。

图三(facebook-java-business-sdk 上图片素材执行请求的源码) 

疑问二:为啥一个任务消费阻塞会导致其他的mq消费任务停留在客户端的队列里?

答:分析rabbitmq源码,发现rabbitmq默认消息监听容器使用的是SimpleMessageListenerContainer,消费并发数concurrentConsumers默认是1,源码见【图四】。正好验证解释了为啥一个消费任务阻塞,其他三个任务处于客户端队列(ps: 消费客户端默认是可以从服务抓取多个消息,不会等待一个消息完全消费完成后再到mq服务端抓取消息)

图4(rabbitmq默认消息监听容器SimpleMessageListenerContainer)

  

疑问三:上传素材图片这里是多线程,并设置了超时时间,为啥没有触发超时?

答:经过本地调试验证,当使用whenComplete时,异步线程会一直等待处理结束,orTimeout只会在处理结束后调用future.get()或者join()才会触发超时异常。

图5(上传素材到媒体渠道的多线程代码)

3、问题根本原因

上传素材到媒体渠道,和gateway的读取返回的socket异常断连, facebook sdk没有设置请求读超时时间,导致线程一直holding。

4、解决方案

4.1、HttpsURLConnection 设置连接超时和读超时的时间,由于依赖的第三方sdk, 不建议直接修改第三方源码,避免后续升级问题,所以此方案被放弃。

4.2、应用消费端并发数调整成3,解决由于一个任务消费慢,导致其他任务阻塞等待。

(ps: 如果后续用户量上来,出现消费慢的现象,可以继续调整消费并发数以及服务器的cpu核数内存;或者增加服务器集群节点)

4.3、调整CompletableFuture多线程代码,去除whenComplete的使用,解决无法通过超时释放mq消费线程。

(ps: 经本地调试已确认,由于CompletableFuture本身设计原因,即使触发超时了,能释放消费线程,但异步处理任务的线程并不会释放,任然存在时间长了,发生次数多了会把CompletableFuture使用的线程池耗尽)

4.4、长期根本解决方案:媒体渠道的api都加上hystrix超时熔断机制。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5677571.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存