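A while ago, one queue on our RabbitMQ broker kept accumulating a backlog of messages, so the ops team scaled out the pods hosting the MQ server in the early morning and restarted RabbitMQ. Later that morning we found that our service had been throwing exceptions ever since the restart and had stopped consuming, which affected the business. The broker had restarted successfully, but the consumers never reconnected. This article walks through the spring-rabbit source code to explain why this happened and how to fix it.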
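Contents
1. The problem
2. Source analysis of spring-rabbit consumption
3. Fixing the stopped consumer
1. The problem
First, the exceptions that were reported. The main stack traces are shown here: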
o.s.a.r.l.BlockingQueueConsumer - Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException
Failed to declare queue(s):[work_queue]...............................................
Consumer received fatal=false exception on startup
org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.handleDeclarationException(BlockingQueueConsumer.java:661)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:601)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:581)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1196)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1041)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[work_queue]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:710)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.passiveDeclarations(BlockingQueueConsumer.java:594)
... 4 common frames omitted
Caused by: java.io.IOException: null
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:126)
at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:122)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:1006)
at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:52)
at sun.reflect.GeneratedMethodAccessor175.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1110)
at com.sun.proxy.$Proxy285.queueDeclarePassive(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:689)
... 5 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost 'work_platform', class-id=50, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
... 13 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no queue 'work_queue' in vhost 'work_platform', class-id=50, method-id=10)
at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:516)
at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:346)
at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:178)
at com.rabbitmq.client.impl.AMQChannel.handleframe(AMQChannel.java:111)
at com.rabbitmq.client.impl.AMQConnection.readframe(AMQConnection.java:670)
at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:48)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:597)
... 1 common frames omitted...........................
o.s.a.r.l.SimpleMessageListenerContainer message: Stopping container from aborted consumer
The key messages in this log are:
- Failed to declare queue(s):[work_queue]
- Consumer received fatal=false exception on startup org.springframework.amqp.rabbit.listener.QueuesNotAvailableException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
- Stopping container from aborted consumer
Searching the spring-rabbit source shows that the message "Consumer received fatal=false exception on startup" is logged in the run() method of AsyncMessageProcessingConsumer, an inner class of SimpleMessageListenerContainer.
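2. Source analysis of spring-rabbit consumption
Below is a brief walk-through of the consumer flow in spring-rabbit.
spring-rabbit creates one SimpleMessageListenerContainer for each of our listeners (each of which may consume from one or more queues). SimpleMessageListenerContainer extends AbstractMessageListenerContainer, which implements the start() method of the Lifecycle interface; start() is called at startup. AsyncMessageProcessingConsumer is not a parent class: it is an inner class of SimpleMessageListenerContainer that implements Runnable.
The start() method in AbstractMessageListenerContainer looks like this: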
@Override
public void start() {
    if (isRunning()) {
        return;
    }
    if (!this.initialized) {
        synchronized (this.lifecycleMonitor) {
            if (!this.initialized) {
                afterPropertiesSet();
            }
        }
    }
    try {
        if (logger.isDebugEnabled()) {
            logger.debug("Starting Rabbit listener container.");
        }
        configureAdminIfNeeded();
        checkMismatchedQueues();
        doStart();
    }
    catch (Exception ex) {
        throw convertRabbitAccessException(ex);
    }
}
start() in turn calls doStart():
protected void doStart() {
    // Reschedule paused tasks, if any.
    synchronized (this.lifecycleMonitor) {
        this.active = true;
        this.running = true;
        this.lifecycleMonitor.notifyAll();
    }
}
What actually runs is the override in the subclass SimpleMessageListenerContainer, which calls super.doStart() first and then starts the consumers:
@Override
protected void doStart() {
    checkListenerContainerAware();
    super.doStart();
    synchronized (this.consumersMonitor) {
        if (this.consumers != null) {
            throw new IllegalStateException("A stopped container should not have consumers");
        }
        int newConsumers = initializeConsumers();
        if (this.consumers == null) {
            logger.info("Consumers were initialized and then cleared "
                    + "(presumably the container was stopped concurrently)");
            return;
        }
        if (newConsumers <= 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Consumers are already running");
            }
            return;
        }
        Set<AsyncMessageProcessingConsumer> processors = new HashSet<>();
        for (BlockingQueueConsumer consumer : this.consumers) {
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            getTaskExecutor().execute(processor);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        waitForConsumersToStart(processors);
    }
}
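The initializeConsumers method called here creates as many consumers as the configured number of consumer threads, concurrentConsumers. The actual consumption logic lives in BlockingQueueConsumer.
doStart() then iterates over the set of BlockingQueueConsumer instances and wraps each one in an AsyncMessageProcessingConsumer, which implements Runnable.
getTaskExecutor().execute(processor) hands each of these tasks to the thread pool, after which an AsyncConsumerStartedEvent is published.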
protected int initializeConsumers() {
    int count = 0;
    synchronized (this.consumersMonitor) {
        if (this.consumers == null) {
            this.cancellationLock.reset();
            this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
            for (int i = 0; i < this.concurrentConsumers; i++) {
                BlockingQueueConsumer consumer = createBlockingQueueConsumer();
                this.consumers.add(consumer);
                count++;
            }
        }
    }
    return count;
}
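The startup pattern described above (create N consumers, wrap each one in a Runnable, hand it to an executor, then wait until all have started) can be sketched with plain JDK classes. This is only a simplified model and not spring-rabbit code: the class ConsumerStartupSketch, the method startConsumers, and the five-second timeout are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model of the startup pattern; every name here is hypothetical.
class ConsumerStartupSketch {

    // Submits one task per configured consumer and waits until each has started.
    // Returns the names of the tasks that reported a start.
    static List<String> startConsumers(int concurrentConsumers, ExecutorService executor) {
        List<String> started = new ArrayList<>();
        CountDownLatch startLatch = new CountDownLatch(concurrentConsumers);
        for (int i = 0; i < concurrentConsumers; i++) {
            String name = "consumer-" + i;
            executor.execute(() -> {
                synchronized (started) {
                    started.add(name);
                }
                startLatch.countDown();
                // A real consumer would now loop and receive messages.
            });
        }
        try {
            // Comparable in spirit to waitForConsumersToStart(processors).
            if (!startLatch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Consumers did not start in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for consumers", e);
        }
        synchronized (started) {
            return new ArrayList<>(started);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            System.out.println(startConsumers(3, executor).size() + " consumers started");
        } finally {
            executor.shutdown();
        }
    }
}
```

The point of the latch is that the caller of start() learns about startup failures synchronously, even though each consumer runs on its own thread.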
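Next, look at the core of the consumption logic, the run() method implemented by AsyncMessageProcessingConsumer.
Inside its try/catch block, a while loop repeatedly calls mainLoop(), which contains the logic that pulls messages, and many different exceptions are caught. A boolean variable aborted is declared above the loop and defaults to false; some of the catch clauses set it to true. Its value directly determines what killOrRestart() does afterwards.
Consider the catch clause for the QueuesNotAvailableException seen earlier: when isMissingQueuesFatal() returns true it sets aborted to true, and it then calls publishConsumerFailedEvent. Note that the log message prints isMismatchedQueuesFatal() whereas the abort decision checks isMissingQueuesFatal(), which is why the log above says fatal=false even though the consumer was aborted.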
catch (QueuesNotAvailableException ex) {
    logger.error("Consumer received fatal=" + isMismatchedQueuesFatal() + " exception on startup", ex);
    if (isMissingQueuesFatal()) {
        this.startupException = ex;
        // Fatal, but no point re-throwing, so just abort.
        aborted = true;
    }
    publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
}
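To make the role of the aborted flag concrete, here is a minimal, self-contained sketch of the decision. It is a simplified model under stated assumptions, not the library's run() method: AbortDecisionSketch, QueuesNotAvailable, Outcome, and runOnce are hypothetical names, and the model ignores the separate "consumer is no longer active" check.

```java
// Simplified model of the abort decision; class and method names are hypothetical.
class AbortDecisionSketch {

    // Stand-in for spring-rabbit's QueuesNotAvailableException.
    static class QueuesNotAvailable extends RuntimeException {
        QueuesNotAvailable(String message) {
            super(message);
        }
    }

    enum Outcome { STOP_CONTAINER, RESTART_CONSUMER }

    // Runs the consumer body once and reports what would happen next.
    static Outcome runOnce(Runnable consumerBody, boolean missingQueuesFatal) {
        boolean aborted = false;
        try {
            consumerBody.run();
        } catch (QueuesNotAvailable ex) {
            // Only a "fatal" missing queue flips the flag, as in the catch clause above.
            if (missingQueuesFatal) {
                aborted = true;
            }
        } catch (RuntimeException ex) {
            // In this model, every other failure leaves aborted == false.
        }
        return aborted ? Outcome.STOP_CONTAINER : Outcome.RESTART_CONSUMER;
    }

    public static void main(String[] args) {
        Runnable missingQueue = () -> {
            throw new QueuesNotAvailable("no queue 'work_queue'");
        };
        System.out.println(runOnce(missingQueue, true));
        System.out.println(runOnce(missingQueue, false));
    }
}
```

In this model the same missing-queue failure leads to opposite outcomes depending only on the missingQueuesFatal flag, which matches the branch in the catch clause quoted above.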
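publishConsumerFailedEvent receives aborted (true at this point) as its fatal argument. Because fatal is true and the container is still running, the method does not publish the event immediately. Instead it puts a ListenerContainerConsumerFailedEvent (with fatal set to true) on abortEvents, which is a BlockingQueue.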
private final BlockingQueue<ListenerContainerConsumerFailedEvent> abortEvents = new LinkedBlockingQueue<>();
......
protected void publishConsumerFailedEvent(String reason, boolean fatal, Throwable t) {
    if (!fatal || !isRunning()) {
        super.publishConsumerFailedEvent(reason, fatal, t);
    }
    else {
        try {
            this.abortEvents.put(new ListenerContainerConsumerFailedEvent(this, reason, t, fatal));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
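killOrRestart() is shown below, with comments on the key steps. When the consumer is no longer active or aborted is true, the consumer is stopped. If aborted is true, the container itself is also stopped, and the ListenerContainerConsumerFailedEvent instances are then taken from the blocking queue abortEvents and published. Otherwise restart() is called to restart the consumer.
Note: if one of the catch clauses above set aborted to true, or the consumer is already closed, the consumer is not restarted automatically. In the aborted case the container only publishes a ListenerContainerConsumerFailedEvent. In all other cases the consumer is restarted automatically.
This makes the ListenerContainerConsumerFailedEvent the natural hook for handling a consumer that has stopped consuming.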
private void killOrRestart(boolean aborted) {
    // The consumer is no longer active || aborted == true
    if (!isActive(this.consumer) || aborted) {
        logger.debug("Cancelling " + this.consumer);
        try {
            this.consumer.stop();
            SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(
                        new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
            }
        }
        catch (AmqpException e) {
            logger.info("Could not cancel message consumer", e);
        }
        if (aborted && SimpleMessageListenerContainer.this.containerStoppingForAbort
                .compareAndSet(null, Thread.currentThread())) {
            logger.error("Stopping container from aborted consumer");
            stop();
            SimpleMessageListenerContainer.this.containerStoppingForAbort.set(null);
            ListenerContainerConsumerFailedEvent event = null;
            do {
                try {
                    // Take a ListenerContainerConsumerFailedEvent from the blocking queue abortEvents
                    event = SimpleMessageListenerContainer.this.abortEvents.poll(ABORT_EVENT_WAIT_SECONDS,
                            TimeUnit.SECONDS);
                    if (event != null) {
                        // If an event was queued, publish it now
                        SimpleMessageListenerContainer.this.publishConsumerFailedEvent(
                                event.getReason(), event.isFatal(), event.getThrowable());
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            while (event != null);
        }
    }
    else {
        logger.info("Restarting " + this.consumer);
        // Call restart() to replace this consumer with a new one
        restart(this.consumer);
    }
}
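The interplay between publishConsumerFailedEvent and the drain loop in killOrRestart() can be modelled with a plain BlockingQueue. The sketch below is a simplified model with hypothetical names (AbortEventQueueSketch, stopAndDrain) and plain strings in place of event objects. It shows that a fatal event raised while the container is running is parked and only published after the container has stopped, which means a listener reacting to it sees a container that is no longer running.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified model of deferring fatal events; every name here is hypothetical.
class AbortEventQueueSketch {

    private final BlockingQueue<String> abortEvents = new LinkedBlockingQueue<>();
    private final List<String> published = new ArrayList<>();
    private boolean running = true;

    // Non-fatal events, or events raised after the stop, go out at once;
    // fatal events raised while running are parked on the queue.
    void publishConsumerFailedEvent(String reason, boolean fatal) {
        if (!fatal || !running) {
            published.add(reason);
        } else {
            abortEvents.add(reason);
        }
    }

    // Models the tail of killOrRestart(): stop first, then drain the parked events.
    void stopAndDrain(long waitMillis) {
        running = false;
        String event;
        do {
            try {
                event = abortEvents.poll(waitMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (event != null) {
                publishConsumerFailedEvent(event, true);
            }
        } while (event != null);
    }

    List<String> published() {
        return published;
    }

    public static void main(String[] args) {
        AbortEventQueueSketch container = new AbortEventQueueSketch();
        container.publishConsumerFailedEvent("Consumer queue(s) not available", true);
        System.out.println("before stop: " + container.published());
        container.stopAndDrain(100);
        System.out.println("after stop: " + container.published());
    }
}
```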
restart() creates a new consumer, publishes an AsyncConsumerRestartedEvent, and submits a new AsyncMessageProcessingConsumer task to the thread pool.
private void restart(BlockingQueueConsumer oldConsumer) {
    BlockingQueueConsumer consumer = oldConsumer;
    synchronized (this.consumersMonitor) {
        if (this.consumers != null) {
            try {
                // Need to recycle the channel in this consumer
                consumer.stop();
                // Ensure consumer counts are correct (another is going
                // to start because of the exception, but
                // we haven't counted down yet)
                this.cancellationLock.release(consumer);
                this.consumers.remove(consumer);
                if (!isActive()) {
                    // Do not restart - container is stopping
                    return;
                }
                // Create a new BlockingQueueConsumer
                BlockingQueueConsumer newConsumer = createBlockingQueueConsumer();
                newConsumer.setBackOffExecution(consumer.getBackOffExecution());
                consumer = newConsumer;
                this.consumers.add(consumer);
                if (getApplicationEventPublisher() != null) {
                    // Publish the consumer restart event AsyncConsumerRestartedEvent
                    getApplicationEventPublisher()
                            .publishEvent(new AsyncConsumerRestartedEvent(this, oldConsumer, newConsumer));
                }
            }
            catch (RuntimeException e) {
                logger.warn("Consumer failed irretrievably on restart. " + e.getClass() + ": " + e.getMessage());
                // Re-throw and have it logged properly by the caller.
                throw e;
            }
            // Run the new AsyncMessageProcessingConsumer task asynchronously on the thread pool
            getTaskExecutor()
                    .execute(new AsyncMessageProcessingConsumer(consumer));
        }
    }
}
3. Fixing the stopped consumer
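From the source analysis above we know that certain consumer failures (for example QueuesNotAvailableException) cause a ListenerContainerConsumerFailedEvent to be published. We can listen for this event and restart the consumer container.
In Spring, the RabbitMQ-related events are subclasses of AmqpEvent.
By publishing events, Spring notifies observers (event listeners) about consumer behavior. The consumer-related events are: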
- AsyncConsumerStartedEvent: a new consumer was started
- AsyncConsumerStoppedEvent: a consumer was stopped
- AsyncConsumerRestartedEvent: a consumer was restarted
- ListenerContainerConsumerFailedEvent: a consumer of a message listener container failed
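We can listen for ListenerContainerConsumerFailedEvent, which is defined as shown below. As mentioned above, it has a fatal property. When fatal is true, the consumer has hit a fatal error and will not be restarted automatically, so the event handler has to restart it. When fatal is false, the event can be ignored, because the container retries on its own.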
public class ListenerContainerConsumerFailedEvent extends AmqpEvent {
    private final String reason;
    private final boolean fatal;
    private final Throwable throwable;
    // constructor and getters (getReason(), isFatal(), getThrowable()) omitted
}
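The handler works as follows: when the event's fatal flag is true, it waits 30 seconds, checks that the container is not running, and calls start() to bring it back up. Sending an alert is left as a TODO.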
import java.util.Arrays;

import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class ListenerContainerConsumerFailedEventListener
        implements ApplicationListener<ListenerContainerConsumerFailedEvent> {

    @Override
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        log.error("Consumer failed event received: {}", event);
        if (event.isFatal()) {
            log.error("Stopping container from aborted consumer. Reason: {}", event.getReason(),
                    event.getThrowable());
            SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
            String queueNames = Arrays.toString(container.getQueueNames());
            try {
                try {
                    // Wait 30 seconds before attempting the restart
                    Thread.sleep(30000);
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag instead of swallowing it
                    Thread.currentThread().interrupt();
                    log.error(e.getMessage());
                }
                // Check that the consumer container is not currently running
                Assert.state(!container.isRunning(), String.format("Listener container %s is running!", container));
                // The container is not running, so start it
                container.start();
                log.info("Restarted listening on queues {}", queueNames);
            }
            catch (Exception e) {
                log.error("Failed to restart listening on queues {}", queueNames, e);
            }
            // TODO send an SMS / e-mail / DingTalk alert that includes the queue names, the reason the
            // listener disconnected, the exception, whether the restart succeeded, and so on
        }
    }
}
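The listener above sleeps for 30 seconds on the thread that delivers the event (Spring delivers application events synchronously by default) and attempts the restart only once. One possible refinement is to move the restart onto a scheduler and retry a few times. The following is a minimal sketch under stated assumptions, not part of the original solution: RestartRetrySketch and its methods are hypothetical, and in the listener the restart action would be something like container::start.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: retries a restart action on a scheduler thread.
class RestartRetrySketch {

    private final ScheduledExecutorService scheduler;
    private final int maxAttempts;
    private final long delayMillis;
    private final AtomicInteger attempts = new AtomicInteger();

    RestartRetrySketch(ScheduledExecutorService scheduler, int maxAttempts, long delayMillis) {
        this.scheduler = scheduler;
        this.maxAttempts = maxAttempts;
        this.delayMillis = delayMillis;
    }

    // Schedules an attempt after the delay; each failure schedules the next one.
    void scheduleRestart(Runnable restartAction, Runnable onGiveUp) {
        scheduler.schedule(() -> attempt(restartAction, onGiveUp), delayMillis, TimeUnit.MILLISECONDS);
    }

    private void attempt(Runnable restartAction, Runnable onGiveUp) {
        int attemptNumber = attempts.incrementAndGet();
        try {
            restartAction.run(); // e.g. container::start in the listener
        } catch (RuntimeException ex) {
            if (attemptNumber < maxAttempts) {
                scheduleRestart(restartAction, onGiveUp);
            } else {
                onGiveUp.run(); // e.g. send an alert
            }
        }
    }

    int attempts() {
        return attempts.get();
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        RestartRetrySketch retry = new RestartRetrySketch(scheduler, 5, 100);
        CountDownLatch done = new CountDownLatch(1);
        AtomicInteger calls = new AtomicInteger();
        // Simulated restart that fails twice before succeeding.
        retry.scheduleRestart(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("queue still missing");
            }
            done.countDown();
        }, done::countDown);
        done.await(5, TimeUnit.SECONDS);
        scheduler.shutdown();
        System.out.println("attempts: " + retry.attempts());
    }
}
```

Whether retrying is appropriate depends on why the queue went missing; if it was deleted for good, repeated restarts will keep failing and the give-up callback is where the alert belongs.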
Reference: https://blog.csdn.net/u011424653/article/details/79824538