2021-11-30 12:00:00左右 4个消息任务一直卡在创建中
2、问题分析 2.1、日志分析1个任务已消费,rpc 远程调用第三方服务上传8张图片素材后就holding住了;
3个任务无消费日志;
2.2、mq控制台消费分析4个消息任务都处于unack状态,说明4个任务都已经路由到消费端,根据此可以排除mq服务端阻塞的原因。初步猜测是由于客户端消费阻塞问题,下面方向将是着重去分析应用服务消费端。
3.3、应用消费端消费分析- 应用服务是双节点,根据日志分析出现问题的都是x.x.x.170节点,下面将着重分析170节点;
- 怀疑170节点服务会不会假死状态,让运维看了下服务的健康状态,并没有问题。
- 怀疑服务cpu负载和线程使用会不会存在满负荷,想通过普罗米修斯看下监控大盘,发现服务并未接入;(后续要计划接入)
- 由于看不了监控大盘,那就转而直接看jstack结果, 然后让运维jstack了170节点并导出到文件,来分析服务整个线程使用状态。从【图一】可以看出从2021-11-30 12:00:00左右执行上传图片素材的请求到2021-11-30 20:00:00 任然在运行中,从【图二】可以看出mq消费的线程一直处于waiting状态。到此处问题已经相对比较明了了,大概原因可以定位为由于上传图片素材到媒体渠道,在等待返回结果时由于和gateway的socket异常断连(并没有走到正常超时逻辑),而线程一直处于holding状态。
图一(请求facebook上传图片素材)
"thread-pool-2" #9901 prio=5 os_prio=0 cpu=516.49ms elapsed=434525.53s allocated=480M defined_classes=8 tid=0x00007fd4d416a000 nid=0x26bb runnable [0x00007fd466ef3000] java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(java.base@11.0.11/Native Method) at java.net.SocketInputStream.socketRead(java.base@11.0.11/SocketInputStream.java:115) at java.net.SocketInputStream.read(java.base@11.0.11/SocketInputStream.java:168) at java.net.SocketInputStream.read(java.base@11.0.11/SocketInputStream.java:140) at sun.security.ssl.SSLSocketInputRecord.read(java.base@11.0.11/SSLSocketInputRecord.java:478) at sun.security.ssl.SSLSocketInputRecord.readHeader(java.base@11.0.11/SSLSocketInputRecord.java:472) at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(java.base@11.0.11/SSLSocketInputRecord.java:70) at sun.security.ssl.SSLSocketImpl.readApplicationRecord(java.base@11.0.11/SSLSocketImpl.java:1364) at sun.security.ssl.SSLSocketImpl$AppInputStream.read(java.base@11.0.11/SSLSocketImpl.java:973) at java.io.BufferedInputStream.fill(java.base@11.0.11/BufferedInputStream.java:252) at java.io.BufferedInputStream.read1(java.base@11.0.11/BufferedInputStream.java:292) at java.io.BufferedInputStream.read(java.base@11.0.11/BufferedInputStream.java:351) - locked <0x000000009b249738> (a java.io.BufferedInputStream) at sun.net.www.http.HttpClient.parseHTTPHeader(java.base@11.0.11/HttpClient.java:754) at sun.net.www.http.HttpClient.parseHTTP(java.base@11.0.11/HttpClient.java:689) at sun.net.www.protocol.http.HttpURLConnection.getInputStream0(java.base@11.0.11/HttpURLConnection.java:1615) - locked <0x000000009b283d00> (a sun.net.www.protocol.https.DelegateHttpsURLConnection) at sun.net.www.protocol.http.HttpURLConnection.getInputStream(java.base@11.0.11/HttpURLConnection.java:1520) - locked <0x000000009b283d00> (a sun.net.www.protocol.https.DelegateHttpsURLConnection) at java.net.HttpURLConnection.getResponseCode(java.base@11.0.11/HttpURLConnection.java:527) at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(java.base@11.0.11/HttpsURLConnectionImpl.java:334) at com.facebook.ads.sdk.APIRequest.readResponse(APIRequest.java:287) at com.facebook.ads.sdk.APIRequest.access$100(APIRequest.java:52) at com.facebook.ads.sdk.APIRequest$DefaultRequestExecutor.sendPost(APIRequest.java:588) at com.facebook.ads.sdk.APIRequest$DefaultRequestExecutor.execute(APIRequest.java:520) at com.facebook.ads.sdk.APIRequest.executeInternal(APIRequest.java:197) at com.facebook.ads.sdk.AdAccount$APIRequestCreateAdImage.execute(AdAccount.java:4225) at com.facebook.ads.sdk.AdAccount$APIRequestCreateAdImage.execute(AdAccount.java:4220) at cn.sino.ams.integration.facebook.MaterialApi.uploadImageToFaceBook(MaterialApi.java:120) at cn.sino.ams.integration.facebook.MaterialApi.uploadImage(MaterialApi.java:86) at cn.sino.ams.service.handler.CascadeCreateFacebookHandler.pushSingleMaterialToChannel(CascadeCreateFacebookHandler.java:723) at cn.sino.ams.service.handler.CascadeCreateFacebookHandler.lambda$pushMaterialToChannelByMultiThread$30(CascadeCreateFacebookHandler.java:690) at cn.sino.ams.service.handler.CascadeCreateFacebookHandler$$Lambda$1456/0x0000000100f6d840.get(Unknown Source) at java.util.concurrent.CompletableFuture$AsyncSupply.run(java.base@11.0.11/CompletableFuture.java:1700) at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.11/ThreadPoolExecutor.java:1128) at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.11/ThreadPoolExecutor.java:628) at java.lang.Thread.run(java.base@11.0.11/Thread.java:829)
图二(mq消费线程一直处于waiting状态)
"org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1" #55 prio=5 os_prio=0 cpu=23645.70ms elapsed=588114.33s allocated=111M defined_classes=409 tid=0x00007fd5d56e2800 nid=0x45 waiting on condition [0x00007fd512490000] java.lang.Thread.State: WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method) - parking to wait for <0x000000009b235408> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194) at java.util.concurrent.CompletableFuture$Signaller.block(java.base@11.0.11/CompletableFuture.java:1796) at java.util.concurrent.ForkJoinPool.managedBlock(java.base@11.0.11/ForkJoinPool.java:3128) at java.util.concurrent.CompletableFuture.waitingGet(java.base@11.0.11/CompletableFuture.java:1823) at java.util.concurrent.CompletableFuture.join(java.base@11.0.11/CompletableFuture.java:2043) at cn.sino.ams.service.handler.CascadeCreateFacebookHandler.pushMaterialToChannelByMultiThread(CascadeCreateFacebookHandler.java:709) at cn.sino.ams.service.handler.CascadeCreateFacebookHandler.createChannelMaterial(CascadeCreateFacebookHandler.java:604) at cn.sino.ams.service.handler.CascadeCreateFacebookHandler.handle(CascadeCreateFacebookHandler.java:134) at cn.sino.ams.service.impl.CampaignServiceImpl.handleSingleSubmitTask(CampaignServiceImpl.java:313) at cn.sino.ams.mq.consumer.CampaignSubmitConsumer.handleCampaignSubmit(CampaignSubmitConsumer.java:66) at cn.sino.ams.mq.consumer.CampaignSubmitConsumer$$FastClassBySpringCGLIB$$df332de7.invoke() at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:88) at cn.sino.ams.common.aop.LogMdcAspect.around(LogMdcAspect.java:32) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@11.0.11/Native Method) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@11.0.11/NativeMethodAccessorImpl.java:62) at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.11/DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(java.base@11.0.11/Method.java:566) at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:644) at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:633) at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:175) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:95) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691) at cn.sino.ams.mq.consumer.CampaignSubmitConsumer$$EnhancerBySpringCGLIB$$ef6e6065.handleCampaignSubmit( ) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@11.0.11/Native Method) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@11.0.11/NativeMethodAccessorImpl.java:62) at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.11/DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(java.base@11.0.11/Method.java:566) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:53) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:220) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148) at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1579) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1498) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer$$Lambda$1187/0x0000000100c9f440.invokeListener(Unknown Source) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@11.0.11/Native Method) at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@11.0.11/NativeMethodAccessorImpl.java:62) at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.11/DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(java.base@11.0.11/Method.java:566) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) at brave.spring.rabbit.TracingRabbitListenerAdvice.invoke(TracingRabbitListenerAdvice.java:101) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) at org.springframework.amqp.rabbit.listener.$Proxy239.invokeListener(Unknown Source) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1486) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1477) at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1421) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:963) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1284) at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1190) at java.lang.Thread.run(java.base@11.0.11/Thread.java:829)
- 基于【3.3-4】的分析就产生了几个疑问:
疑问一:应用消费端侧网络调用客户端自身为啥没触发网络超时而结束线程?
答:分析facebook-java-business-sdk源码,从【图三】可以看出fb的sdk并没有设置请求连接超时和读超时的参数,正好验证了jstack里上传素材到渠道的线程一直处于holding的现象。
图三(facebook-java-business-sdk 上图片素材执行请求的源码)
疑问二:为啥一个任务消费阻塞会导致其他的mq消费任务停留在客户端的队列里?
答:分析rabbitmq源码,发现rabbitmq默认消息监听容器使用的是SimpleMessageListenerContainer,消费并发数concurrentConsumers默认是1,源码见【图四】。正好验证解释了为啥一个消费任务阻塞,其他三个任务处于客户端队列(ps: 消费客户端默认是可以从服务抓取多个消息,不会等待一个消息完全消费完成后再到mq服务端抓取消息)
图4(rabbitmq默认消息监听容器SimpleMessageListenerContainer)
疑问三:上传素材图片这里是多线程,并设置了超时时间,为啥没有触发超时?
答:经过本地调试验证,当使用whenComplete时,异步线程会一直等待处理结束,orTimeout只会在处理结束后调用future.get()或者join()才会触发超时异常。
图5(上传素材到媒体渠道的多线程代码)
3、问题根本原因上传素材到媒体渠道,和gateway的读取返回的socket异常断连, facebook sdk没有设置请求读超时时间,导致线程一直holding。
4、解决方案4.1、HttpsURLConnection 设置连接超时和读超时的时间,由于依赖的第三方sdk, 不建议直接修改第三方源码,避免后续升级问题,所以此方案被放弃。
4.2、应用消费端并发数调整成3,解决由于一个任务消费慢,导致其他任务阻塞等待。
(ps: 如果后续用户量上来,出现消费慢的现象,可以继续调整消费并发数以及服务器的cpu核数内存;或者增加服务器集群节点)
4.3、调整CompletableFuture多线程代码,去除whenComplete的使用,解决无法通过超时释放mq消费线程。
(ps: 经本地调试已确认,由于CompletableFuture本身设计原因,即使触发超时了,能释放消费线程,但异步处理任务的线程并不会释放,任然存在时间长了,发生次数多了会把CompletableFuture使用的线程池耗尽)
4.4、长期根本解决方案:媒体渠道的api都加上hystrix超时熔断机制。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)