Why the trace chain breaks with the official RabbitMQ agent plugin:
1. Analysis of the native Consumer
Below is the interception-point definition from the native apm-rabbitmq-5.x-plugin. It shows that the plugin enhances the handleDelivery method of com.rabbitmq.client.Consumer.
public class RabbitMQConsumerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQConsumerInterceptor";
    public static final String ENHANCE_CLASS_PRODUCER = "com.rabbitmq.client.Consumer";
    public static final String ENHANCE_METHOD_DISPATCH = "handleDelivery";
    public static final String INTERCEPTOR_CONSTRUCTOR = "org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQProducerAndConsumerConstructorInterceptor";

    // Intercept constructors whose first argument is an AMQConnection.
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[] {
            new ConstructorInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getConstructorMatcher() {
                    return takesArgumentWithType(0, "com.rabbitmq.client.impl.AMQConnection");
                }

                @Override
                public String getConstructorInterceptor() {
                    return INTERCEPTOR_CONSTRUCTOR;
                }
            }
        };
    }

    // Intercept handleDelivery methods whose third argument is AMQP.BasicProperties.
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {
            new DeclaredInstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named(ENHANCE_METHOD_DISPATCH)
                        .and(takesArgumentWithType(2, "com.rabbitmq.client.AMQP$BasicProperties"));
                }

                @Override
                public String getMethodsInterceptor() {
                    return INTERCEPTOR_CLASS;
                }

                @Override
                public boolean isOverrideArgs() {
                    return false;
                }
            }
        };
    }

    // Enhance every class in the hierarchy of com.rabbitmq.client.Consumer.
    @Override
    protected ClassMatch enhanceClass() {
        return HierarchyMatch.byHierarchyMatch(new String[] {ENHANCE_CLASS_PRODUCER});
    }
}
The method is enhanced as follows. The main step is reading the trace information from BasicProperties and putting it into the CarrierItem entries.
public class RabbitMQConsumerInterceptor implements InstanceMethodsAroundInterceptor {
    public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
    public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer";

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        ContextCarrier contextCarrier = new ContextCarrier();
        String url = (String) objInst.getSkyWalkingDynamicField();
        Envelope envelope = (Envelope) allArguments[1];
        AMQP.BasicProperties properties = (AMQP.BasicProperties) allArguments[2];

        // Create the entry span for this delivery.
        AbstractSpan activeSpan = ContextManager.createEntrySpan(
            OPERATE_NAME_PREFIX + "Topic/" + envelope.getExchange() + "Queue/" + envelope.getRoutingKey()
                + CONSUMER_OPERATE_NAME_SUFFIX, null).start(System.currentTimeMillis());
        Tags.MQ_BROKER.set(activeSpan, url);
        Tags.MQ_TOPIC.set(activeSpan, envelope.getExchange());
        Tags.MQ_QUEUE.set(activeSpan, envelope.getRoutingKey());
        activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
        SpanLayer.asMQ(activeSpan);

        // Copy the trace headers of the message into the carrier.
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            if (properties.getHeaders() != null && properties.getHeaders().get(next.getHeadKey()) != null) {
                next.setHeadValue(properties.getHeaders().get(next.getHeadKey()).toString());
            }
        }
        // Link the current thread's context to the upstream trace.
        ContextManager.extract(contextCarrier);
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Object ret) throws Throwable {
        ContextManager.stopSpan();
        return ret;
    }

    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, Throwable t) {
        ContextManager.activeSpan().log(t);
    }
}
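The carrier loop is the least obvious part of the interceptor, so here is a minimal sketch of the same pattern without any SkyWalking dependency. Item is a hypothetical stand-in for a carrier item, modeled as a linked list that starts with an empty head node, and the header keys "sw8" and "sw8-correlation" are illustrative assumptions only.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

class CarrierFillDemo {
    // Hypothetical stand-in for a carrier item: one node of a singly linked list.
    static final class Item {
        final String headKey;
        String headValue;
        final Item next;

        Item(String headKey, Item next) {
            this.headKey = headKey;
            this.next = next;
        }

        boolean hasNext() {
            return next != null;
        }

        Item next() {
            return next;
        }
    }

    // Builds: head (no key) -> "sw8" -> "sw8-correlation". Key names are assumptions.
    static Item newCarrier() {
        return new Item("", new Item("sw8", new Item("sw8-correlation", null)));
    }

    // Same loop shape as the interceptor: step past the head, then copy each
    // header that is present into the matching item. Returns what was copied.
    static Map<String, String> fill(Item head, Map<String, Object> headers) {
        Map<String, String> filled = new LinkedHashMap<>();
        Item next = head;
        while (next.hasNext()) {
            next = next.next();
            Object value = headers == null ? null : headers.get(next.headKey);
            if (value != null) {
                next.headValue = value.toString();
                filled.put(next.headKey, next.headValue);
            }
        }
        return filled;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("sw8", "trace-123");
        headers.put("content-type", "text/plain"); // not a carrier key, so it is ignored
        System.out.println(fill(newCarrier(), headers));
    }
}
```

Because the loop advances before it reads, the empty head node is never looked up in the headers, and a message without headers simply leaves the carrier empty.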
Next, looking up handleDelivery on com.rabbitmq.client.Consumer shows that Consumer is an interface:
package com.rabbitmq.client;

import com.rabbitmq.client.AMQP.BasicProperties;
import java.io.IOException;

public interface Consumer {
    void handleConsumeOk(String var1);

    void handleCancelOk(String var1);

    void handleCancel(String var1) throws IOException;

    void handleShutdownSignal(String var1, ShutdownSignalException var2);

    void handleRecoverOk(String var1);

    void handleDelivery(String var1, Envelope var2, BasicProperties var3, byte[] var4) throws IOException;
}
Tracing the call chains around this method:
ChannelN.processAsync -> ChannelN.processDelivery -> ConsumerDispatcher.handleDelivery
In AbstractMessageListenerContainer: executeListener -> doExecuteListener -> invokeListener -> ContainerDelegate.invokeListener (a functional interface)
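// In AMQConnection: channel 0 handles control commands.
this._channel0 = new AMQChannel(this, 0) {
    @Override
    public boolean processAsync(Command c) throws IOException {
        return getConnection().processControlCommand(c);
    }
};

// In AMQChannel: the abstract declaration that the override above implements.
public abstract boolean processAsync(Command command) throws IOException;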
The handleDelivery method is called from ConsumerDispatcher:
package com.rabbitmq.client.impl;

final class ConsumerDispatcher {
    public void handleDelivery(final Consumer delegate, final String consumerTag, final Envelope envelope,
        final AMQP.BasicProperties properties, final byte[] body) throws IOException {
        // The delivery is handed to a worker thread; the Consumer runs inside run().
        executeUnlessShuttingDown(
            new Runnable() {
                @Override
                public void run() {
                    try {
                        delegate.handleDelivery(consumerTag, envelope, properties, body);
                    } catch (Throwable ex) {
                        connection.getExceptionHandler().handleConsumerException(
                            channel, ex, delegate, consumerTag, "handleDelivery");
                    }
                }
            });
    }
}

2. Basic logic analysis
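The native RabbitMQ plugin takes effect where ConsumerDispatcher handleDelivery calls delegate.handleDelivery: it only carries the trace information from the message into the thread that handleDelivery starts for that call. When messages are consumed through the Spring integration, the concrete handleDelivery implementation puts the message into a BlockingQueue, and a different thread consumes it from there. The trace context stays on the first thread, and that is why the chain breaks.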
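To make the break concrete, here is a minimal sketch in plain Java, with no SkyWalking or spring-amqp on the classpath. TRACE is a hypothetical stand-in for the agent's per-thread context, Delivery for the queued message, and the header key "sw8" is only illustrative. It shows that a value bound to the dispatching thread is not visible on the thread that takes the message off the queue, while the headers travel with the message.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TraceHandoffDemo {
    // Hypothetical stand-in for the agent's per-thread tracing context.
    static final ThreadLocal<String> TRACE = new ThreadLocal<>();

    // Hypothetical stand-in for a queued delivery: headers plus body.
    static final class Delivery {
        final Map<String, Object> headers;
        final String body;

        Delivery(Map<String, Object> headers, String body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Returns {trace seen through the thread-local, trace read from the headers},
    // both observed on the listener thread.
    static String[] run() {
        BlockingQueue<Delivery> queue = new LinkedBlockingQueue<>();
        String[] seen = new String[2];

        // Plays the dispatcher worker thread that runs handleDelivery.
        Thread dispatcher = new Thread(() -> {
            Map<String, Object> headers = new HashMap<>();
            headers.put("sw8", "trace-123");
            TRACE.set("trace-123"); // the context is bound to THIS thread only
            try {
                queue.put(new Delivery(headers, "hello"));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                TRACE.remove();
            }
        });

        // Plays the container thread that later invokes the listener.
        Thread listener = new Thread(() -> {
            try {
                Delivery delivery = queue.take();
                seen[0] = TRACE.get(); // nothing was ever set on this thread
                seen[1] = String.valueOf(delivery.headers.get("sw8"));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        listener.start();
        dispatcher.start();
        try {
            dispatcher.join();
            listener.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("thread-local on listener thread: " + seen[0]);
        System.out.println("header on listener thread: " + seen[1]);
    }
}
```

The listener thread reads null from the thread-local but still finds "trace-123" in the headers, which suggests where a fix has to hook in: at the point where the second thread handles the message.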
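Spring's implementation works in two steps.
(Notation used below: calls between methods of the same class are separated by spaces; a call into a different class is written with the class name and an arrow ->.)
The source code analyzed is spring-amqp.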
1. Putting the message into the BlockingQueue
In the run() of ConsumerDispatcher handleDelivery, delegate.handleDelivery puts the message into a BlockingQueue.
Detailed call chain: ConsumerDecorator handleDelivery -> InternalConsumer handleDelivery (the line BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body)); is what puts it into the BlockingQueue)
2. Consuming the message (applies to methods annotated with @RabbitListener)
In SimpleMessageListenerContainer, the nested class AsyncMessageProcessingConsumer (a Runnable) runs on an asynchronous thread. Its run() loops over receiveAndExecute doReceiveAndExecute, and consumer.nextMessage takes the message from the queue.
doReceiveAndExecute then calls into the abstract parent class AbstractMessageListenerContainer: executeListener invokeListener actualInvokeListener doInvokeListener -> MessagingMessageListenerAdapter onMessage invokeHandler, which finally calls your own method annotated with @RabbitListener.
Thread pools that drive these two steps
1. ConsumerWorkService: its inner class WorkPoolRunnable runs the ConsumerDispatcher run method, and it holds a thread pool internally.
2. AbstractMessageListenerContainer: the field "private Executor taskExecutor = new SimpleAsyncTaskExecutor();" defines its own Executor, which roughly implements a concurrency limit.
3. RabbitListenerEndpointRegistry start() -> SimpleMessageListenerContainer doStart, which uses taskExecutor to start AsyncMessageProcessingConsumer (a Runnable).
With the cause of the break known, the plugin can be rewritten to reconnect the chain
Enhancing the invokeListener method
public class RabbitMQConsumerInvokeInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
    public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQConsumerInvokeInterceptor";
    public static final String ENHANCE_CLASS_PRODUCER = "org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer";
    public static final String ENHANCE_METHOD_DISPATCH = "invokeListener";
    public static final String INTERCEPTOR_CONSTRUCTOR = "org.apache.skywalking.apm.plugin.rabbitmq.RabbitMQProducerAndConsumerConstructorInterceptor";

    // Kept from the native plugin definition; the interceptor for invokeListener
    // reads the broker address from the Channel argument instead.
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[] {
            new ConstructorInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getConstructorMatcher() {
                    return takesArgumentWithType(0, "com.rabbitmq.client.impl.AMQConnection");
                }

                @Override
                public String getConstructorInterceptor() {
                    return INTERCEPTOR_CONSTRUCTOR;
                }
            }
        };
    }

    // Intercept invokeListener methods whose second argument is a Spring AMQP Message.
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[] {
            new InstanceMethodsInterceptPoint() {
                @Override
                public ElementMatcher<MethodDescription> getMethodsMatcher() {
                    return named(ENHANCE_METHOD_DISPATCH)
                        .and(takesArgumentWithType(1, "org.springframework.amqp.core.Message"));
                }

                @Override
                public String getMethodsInterceptor() {
                    return INTERCEPTOR_CLASS;
                }

                @Override
                public boolean isOverrideArgs() {
                    return true;
                }
            }
        };
    }

    // Enhance AbstractMessageListenerContainer itself, matched by class name.
    @Override
    protected ClassMatch enhanceClass() {
        return MultiClassNameMatch.byMultiClassMatch(ENHANCE_CLASS_PRODUCER);
    }
}
Interceptor that implements the enhancement
public class RabbitMQConsumerInvokeInterceptor implements InstanceMethodsAroundInterceptor {
    public static final String OPERATE_NAME_PREFIX = "RabbitMQ/";
    public static final String CONSUMER_OPERATE_NAME_SUFFIX = "/Consumer/invoke/";

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
        Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
        ContextCarrier contextCarrier = new ContextCarrier();

        // invokeListener(Channel, Message): take the broker address from the channel's connection.
        Connection connection = ((Channel) allArguments[0]).getConnection();
        String url = connection.getAddress().toString().replace("/", "") + ":" + connection.getPort();
        Message message = (Message) allArguments[1];
        MessageProperties msgProperties = message.getMessageProperties();
        String msgTopic = msgProperties.getConsumerQueue();

        // Create the entry span on the thread that actually runs the listener.
        AbstractSpan activeSpan = ContextManager.createEntrySpan(
            OPERATE_NAME_PREFIX + "Queue/" + msgTopic + CONSUMER_OPERATE_NAME_SUFFIX, null)
            .start(System.currentTimeMillis());
        Tags.MQ_BROKER.set(activeSpan, url);
        Tags.MQ_TOPIC.set(activeSpan, msgProperties.getReceivedExchange());
        Tags.MQ_QUEUE.set(activeSpan, msgProperties.getConsumerQueue());
        activeSpan.setComponent(ComponentsDefine.RABBITMQ_CONSUMER);
        SpanLayer.asMQ(activeSpan);

        // Copy the trace headers of the Spring message into the carrier.
        CarrierItem next = contextCarrier.items();
        while (next.hasNext()) {
            next = next.next();
            if (msgProperties.getHeaders() != null && msgProperties.getHeaders().get(next.getHeadKey()) != null) {
                next.setHeadValue(msgProperties.getHeaders().get(next.getHeadKey()).toString());
            }
        }
        ContextManager.extract(contextCarrier);
    }

    @Override
    public Object afterMethod(EnhancedInstance enhancedInstance, Method method, Object[] objects,
        Class<?>[] classes, Object o) throws Throwable {
        ContextManager.stopSpan();
        return o;
    }

    @Override
    public void handleMethodException(EnhancedInstance enhancedInstance, Method method, Object[] objects,
        Class<?>[] classes, Throwable throwable) {
        ContextManager.activeSpan().log(throwable);
    }
}
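The idea behind this interceptor can be sketched in plain Java: restore the context from the message headers on the thread that invokes the listener, and clear it afterwards. This is only an illustration under assumptions. TRACE, Listener and this invokeListener are hypothetical stand-ins (not the SkyWalking or spring-amqp types), and "sw8" is an illustrative header key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

class ListenerThreadExtractDemo {
    // Hypothetical stand-in for the agent's per-thread tracing context.
    static final ThreadLocal<String> TRACE = new ThreadLocal<>();

    // Hypothetical stand-in for the user's listener method.
    interface Listener {
        void onMessage(String body);
    }

    // Models invokeListener wrapped by the interceptor's before/after hooks.
    static void invokeListener(Map<String, Object> headers, String body, Listener listener) {
        Object header = headers == null ? null : headers.get("sw8");
        if (header != null) {
            TRACE.set(header.toString()); // "beforeMethod": extract on the listener thread
        }
        try {
            listener.onMessage(body);
        } finally {
            TRACE.remove(); // "afterMethod": close the context again
        }
    }

    // Returns the trace id that the listener observed on its own thread.
    static String run() {
        BlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>();
        AtomicReference<String> seen = new AtomicReference<>();

        Thread listenerThread = new Thread(() -> {
            try {
                Map<String, Object> headers = queue.take();
                invokeListener(headers, "hello", body -> seen.set(TRACE.get()));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listenerThread.start();

        // The producing side only enqueues the headers; it sets no thread-local.
        Map<String, Object> headers = new HashMap<>();
        headers.put("sw8", "trace-123");
        queue.add(headers);

        try {
            listenerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("trace seen inside listener: " + run());
    }
}
```

Because extraction happens inside the call that runs on the listener thread, the queue handoff in between no longer matters.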
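The full implementation is at https://github.com/summer-slow-grace/rabbitmq-5.x-plugin
Different Spring versions differ in small details. If your application uses another Spring version, change the enhanced method so that it points at wherever invokeListener lives in that version.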
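When adapting to another version, it helps to check which method a "name plus argument type at index" rule would actually hit. The following is a rough reflection-based analogy of that rule, not the agent's matcher API. ContainerV1 and ContainerV2 are made-up classes standing in for two library versions with different signatures.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

class MethodMatchDemo {
    // Made-up stand-ins for the library types.
    static class Channel { }

    static class Message { }

    // Hypothetical "version 1": invokeListener takes the Message directly.
    static class ContainerV1 {
        protected void invokeListener(Channel channel, Message message) { }
    }

    // Hypothetical "version 2": the Message only appears one call deeper.
    static class ContainerV2 {
        protected void invokeListener(Channel channel, Object data) { }

        protected void doInvokeListener(Channel channel, Message message) { }
    }

    // Names of the declared methods called methodName whose parameter at
    // position index has exactly the type argTypeName.
    static List<String> match(Class<?> type, String methodName, int index, String argTypeName) {
        List<String> hits = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            Class<?>[] params = m.getParameterTypes();
            if (m.getName().equals(methodName)
                && params.length > index
                && params[index].getName().equals(argTypeName)) {
                hits.add(m.getName());
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        String message = Message.class.getName();
        System.out.println("V1 invokeListener:   " + match(ContainerV1.class, "invokeListener", 1, message));
        System.out.println("V2 invokeListener:   " + match(ContainerV2.class, "invokeListener", 1, message));
        System.out.println("V2 doInvokeListener: " + match(ContainerV2.class, "doInvokeListener", 1, message));
    }
}
```

In this made-up second version the original rule matches nothing, so the interception point would have to move to the method that still receives the Message.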
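Regarding what the reference article "Skywalking中RabbitMQ消费链路被隔断" (Antony868's blog, CSDN) says:
In my view, the official version I looked at has no problem on this point: it already carries the context from handleDelivery into the run method. What really breaks the chain is asynchronous consumption in Spring, not the handleDelivery method that the article blames.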
References
Skywalking中RabbitMQ消费链路被隔断 (RabbitMQ consumer trace chain broken in Skywalking), Antony868's blog, CSDN
@RabbitListener源码解析 (@RabbitListener source code analysis), u013905744's column, CSDN