@RabbitListener(queues = "activity_queue" ,containerFactory = "simpleRabbitListenerContainerFactory")
rabbitmq 执行流程
RabbitListenerEndpointRegistrar 实现 InitializingBean 接口,启动会自动被调用
org.springframework.beans.factory.InitializingBean#afterPropertiesSet方法@Override public void afterPropertiesSet() { registerAllEndpoints(); }
注册所有的端点
protected void registerAllEndpoints() { Assert.state(this.endpointRegistry != null, "No registry available"); synchronized (this.endpointDescriptors) { for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) { if (descriptor.endpoint instanceof MultiMethodRabbitListenerEndpoint && this.validator != null) { ((MultiMethodRabbitListenerEndpoint) descriptor.endpoint).setValidator(this.validator); } this.endpointRegistry.registerListenerContainer(// NOSonAR never null descriptor.endpoint, resolveContainerFactory(descriptor)); } this.startImmediately = true; // trigger immediate startup } }org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar#resolveContainerFactory
private RabbitListenerContainerFactory> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) { if (descriptor.containerFactory != null) { return descriptor.containerFactory; } else if (this.containerFactory != null) { return this.containerFactory; } else if (this.containerFactoryBeanName != null) { Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name"); // 得到注入容器中的 containerFactory this.containerFactory = this.beanFactory.getBean( this.containerFactoryBeanName, RabbitListenerContainerFactory.class); return this.containerFactory; // Consider changing this if live change of the factory is required } else { throw new IllegalStateException("Could not resolve the " + RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" + descriptor.endpoint + "] no factory was given and no default is set."); } }spring执行到 org.springframework.context.support.AbstractApplicationContext#finishRefresh 方法
protected void finishRefresh() { // Clear context-level resource caches (such as ASM metadata from scanning). clearResourceCaches(); // Initialize lifecycle processor for this context. initLifecycleProcessor(); // 这一步 // Propagate refresh to lifecycle processor first. getLifecycleProcessor().onRefresh(); // Publish the final event. publishEvent(new ContextRefreshedEvent(this)); // Participate in LiveBeansView MBean, if active. if (!NativeDetector.inNativeImage()) { LiveBeansView.registerApplicationContext(this); } }org.springframework.context.support.DefaultLifecycleProcessor#startBeans
private void startBeans(boolean autoStartupOnly) { Maporg.springframework.context.support.DefaultLifecycleProcessor.LifecycleGroup#startlifecycleBeans = getLifecycleBeans(); Map phases = new TreeMap<>(); lifecycleBeans.forEach((beanName, bean) -> { if (!autoStartuponly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) { int phase = getPhase(bean); phases.computeIfAbsent( phase, p -> new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly) ).add(beanName, bean); } }); if (!phases.isEmpty()) { //执行start phases.values().forEach(LifecycleGroup::start); } }
public void start() { if (this.members.isEmpty()) { return; } if (logger.isDebugEnabled()) { logger.debug("Starting beans in phase " + this.phase); } Collections.sort(this.members); for (LifecycleGroupMember member : this.members) { doStart(this.lifecycleBeans, member.name, this.autoStartupOnly); } }org.springframework.context.support.DefaultLifecycleProcessor#doStart
private void doStart(MaplifecycleBeans, String beanName, boolean autoStartupOnly) { Lifecycle bean = lifecycleBeans.remove(beanName); if (bean != null && bean != this) { String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName); for (String dependency : dependenciesForBean) { doStart(lifecycleBeans, dependency, autoStartupOnly); } if (!bean.isRunning() && (!autoStartuponly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) { if (logger.isTraceEnabled()) { logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]"); } try { // 调用start bean.start(); } catch (Throwable ex) { throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex); } if (logger.isDebugEnabled()) { logger.debug("Successfully started bean '" + beanName + "'"); } } } }
执行 RabbitListenerEndpointRegistry 的start方法
public void start() { for (MessageListenerContainer listenerContainer : getListenerContainers()) { startIfNecessary(listenerContainer); } }org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry#startIfNecessary
private void startIfNecessary(MessageListenerContainer listenerContainer) { if (this.contextRefreshed || listenerContainer.isAutoStartup()) { // simpleRabbitListenerContainerFactory SimpleMessageListenerContainer listenerContainer.start(); } }SimpleRabbitListenerContainer
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer { // 执行父类 AbstractMessageListenerContainer 的start方法 public void start() { if (isRunning()) { return; } if (!this.initialized) { synchronized (this.lifecycleMonitor) { if (!this.initialized) { afterPropertiesSet(); } } } try { logger.debug("Starting Rabbit listener container."); configureAdminIfNeeded(); checkMismatchedQueues(); doStart(); } catch (Exception ex) { throw convertRabbitAccessException(ex); } finally { this.lazyLoad = false; } } }org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#checkMismatchedQueues
protected void checkMismatchedQueues() { if (this.mismatchedQueuesFatal && this.amqpAdmin != null) { try { // 初始化 this.amqpAdmin.initialize(); } catch (AmqpConnectException e) { logger.info("Broker not available; cannot check queue declarations"); } catch (AmqpIOException e) { if (RabbitUtils.isMismatchedQueueArgs(e)) { throw new FatalListenerStartupException("Mismatched queues", e); } else { logger.info("Failed to get connection during start(): " + e); } } } else { try { Connection connection = getConnectionFactory().createConnection(); // NOSONAR if (connection != null) { connection.close(); } } catch (Exception e) { logger.info("Broker not available; cannot force queue declarations during start: " + e.getMessage()); } } }org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doStart
protected void doStart() { Assert.state(!this.consumerBatchEnabled || getMessageListener() instanceof BatchMessageListener || getMessageListener() instanceof ChannelAwareBatchMessageListener, "When setting 'consumerBatchEnabled' to true, the listener must support batching"); checkListenerContainerAware(); super.doStart(); synchronized (this.consumersMonitor) { if (this.consumers != null) { throw new IllegalStateException("A stopped container should not have consumers"); } int newConsumers = initializeConsumers(); if (this.consumers == null) { logger.info("Consumers were initialized and then cleared " + "(presumably the container was stopped concurrently)"); return; } if (newConsumers <= 0) { if (logger.isInfoEnabled()) { logger.info("Consumers are already running"); } return; } Set processors = new HashSet(); // 关键代码 for (BlockingQueueConsumer consumer : this.consumers) { // AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer); processors.add(processor); // 提交任务执行 getTaskExecutor().execute(processor); if (getApplicationEventPublisher() != null) { getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer)); } } waitForConsumersToStart(processors); } }org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run
public void run() { // NOSonAR - line count if (!isActive()) { return; } boolean aborted = false; this.consumer.setLocallyTransacted(isChannelLocallyTransacted()); String routingLookupKey = getRoutingLookupKey(); if (routingLookupKey != null) { SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSonAR both never null } if (this.consumer.getQueueCount() < 1) { if (logger.isDebugEnabled()) { logger.debug("Consumer stopping; no queues for " + this.consumer); } SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer); if (getApplicationEventPublisher() != null) { getApplicationEventPublisher().publishEvent( new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer)); } this.start.countDown(); return; } try { initialize(); // 循环等待 while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) { mainLoop(); } } catch (InterruptedException e) { logger.debug("Consumer thread interrupted, processing stopped."); Thread.currentThread().interrupt(); aborted = true; publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e); } catch (QueuesNotAvailableException ex) { logger.error("Consumer threw missing queues exception, fatal=" + isMissingQueuesFatal(), ex); if (isMissingQueuesFatal()) { this.startupException = ex; // Fatal, but no point re-throwing, so just abort. aborted = true; } publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex); } catch (FatalListenerStartupException ex) { logger.error("Consumer received fatal exception on startup", ex); this.startupException = ex; // Fatal, but no point re-throwing, so just abort. aborted = true; publishConsumerFailedEvent("Consumer received fatal exception on startup", true, ex); } catch (FatalListenerExecutionException ex) { // NOSonAR exception as flow control logger.error("Consumer received fatal exception during processing", ex); // Fatal, but no point re-throwing, so just abort. aborted = true; publishConsumerFailedEvent("Consumer received fatal exception during processing", true, ex); } catch (PossibleAuthenticationFailureException ex) { logger.error("Consumer received fatal=" + isPossibleAuthenticationFailureFatal() + " exception during processing", ex); if (isPossibleAuthenticationFailureFatal()) { this.startupException = new FatalListenerStartupException("Authentication failure", new AmqpAuthenticationException(ex)); // Fatal, but no point re-throwing, so just abort. aborted = true; } publishConsumerFailedEvent("Consumer received PossibleAuthenticationFailure during startup", aborted, ex); } catch (ShutdownSignalException e) { if (RabbitUtils.isNormalShutdown(e)) { if (logger.isDebugEnabled()) { logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage()); } } else { logConsumerException(e); } } catch (AmqpIOException e) { if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException && e.getCause().getCause().getMessage().contains("in exclusive use")) { getExclusiveConsumerExceptionLogger().log(logger, "Exclusive consumer failure", e.getCause().getCause()); publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e); } else { logConsumerException(e); } } catch (Error e) { //NOSONAR logger.error("Consumer thread error, thread abort.", e); publishConsumerFailedEvent("Consumer threw an Error", true, e); getJavaLangErrorHandler().handle(e); aborted = true; } catch (Throwable t) { //NOSONAR // by now, it must be an exception if (isActive()) { logConsumerException(t); } } finally { if (getTransactionManager() != null) { ConsumerChannelRegistry.unRegisterConsumerChannel(); } } // In all cases count down to allow container to progress beyond startup this.start.countDown(); killOrRestart(aborted); if (routingLookupKey != null) { SimpleResourceHolder.unbind(getRoutingConnectionFactory()); // NOSonAR never null here } }org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop
private void mainLoop() throws Exception { // NOSonAR Exception try { boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) { // 调整消费者数 checkAdjust(receivedOk); } long idleEventInterval = getIdleEventInterval(); if (idleEventInterval > 0) { if (receivedOk) { updateLastReceive(); } else { long now = System.currentTimeMillis(); long lastalertAt = SimpleMessageListenerContainer.this.lastNoMessagealert.get(); long lastReceive = getLastReceive(); if (now > lastReceive + idleEventInterval && now > lastalertAt + idleEventInterval && SimpleMessageListenerContainer.this.lastNoMessagealert .compareAndSet(lastalertAt, now)) { publishIdleContainerEvent(now - lastReceive); } } } } catch (ListenerExecutionFailedException ex) { // Continue to process, otherwise re-throw if (ex.getCause() instanceof NoSuchMethodException) { throw new FatalListenerExecutionException("Invalid listener", ex); } } catch (AmqpRejectAndDontRequeueException rejectEx) { } }org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#checkAdjust 调整消费者
private void checkAdjust(boolean receivedOk) { if (receivedOk) { if (isActive(this.consumer)) { this.consecutiveIdles = 0; if (this.consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) { considerAddingAConsumer(); this.consecutiveMessages = 0; } } } else { this.consecutiveMessages = 0; if (this.consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) { considerStoppingAConsumer(this.consumer); this.consecutiveIdles = 0; } } }org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#considerAddingAConsumer 新增消费者
private void considerAddingAConsumer() { synchronized (this.consumersMonitor) { if (this.consumers != null && this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) { long now = System.currentTimeMillis(); if (this.lastConsumerStarted + this.startConsumerMinInterval < now) { this.addAndStartConsumers(1); this.lastConsumerStarted = now; } } } }org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#considerStoppingAConsumer 减少消费者
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) { synchronized (this.consumersMonitor) { if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) { long now = System.currentTimeMillis(); if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) { consumer.basicCancel(true); this.consumers.remove(consumer); if (logger.isDebugEnabled()) { logger.debug("Idle consumer terminating: " + consumer); } this.lastConsumerStopped = now; } } } }
自记录日志信息
try { initialize(); while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) { mainLoop(); } }org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doStart
// 关键代码 for (BlockingQueueConsumer consumer : this.consumers) { AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer); processors.add(processor); // 提交任务 getTaskExecutor().execute(processor); if (getApplicationEventPublisher() != null) { getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer)); }
执行任务代码逻辑
private final class AsyncMessageProcessingConsumer implements Runnable { public void run(){ //主要代码逻辑 try { initialize(); while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) { mainLoop(); } } } }
private void mainLoop() throws Exception { // NOSonAR Exception try { boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) { checkAdjust(receivedOk); } long idleEventInterval = getIdleEventInterval(); if (idleEventInterval > 0) { if (receivedOk) { updateLastReceive(); } else { long now = System.currentTimeMillis(); long lastalertAt = SimpleMessageListenerContainer.this.lastNoMessagealert.get(); long lastReceive = getLastReceive(); if (now > lastReceive + idleEventInterval && now > lastalertAt + idleEventInterval && SimpleMessageListenerContainer.this.lastNoMessagealert .compareAndSet(lastalertAt, now)) { publishIdleContainerEvent(now - lastReceive); } } } } catch (ListenerExecutionFailedException ex) { // Continue to process, otherwise re-throw if (ex.getCause() instanceof NoSuchMethodException) { throw new FatalListenerExecutionException("Invalid listener", ex); } } catch (AmqpRejectAndDontRequeueException rejectEx) { } }
设置了 最大消费个数的时候,发现队列数据很多的时候,并不是直接新增到设置的最大数
private void checkAdjust(boolean receivedOk) { // 判断是否有推送到消息 if (receivedOk) { //当前消费者是否存活 if (isActive(this.consumer)) { this.consecutiveIdles = 0; // 当前持续的消息数 是否大于默认的配置数 if (this.consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) { // 此时执行 新增队列逻辑 *** 作 considerAddingAConsumer(); this.consecutiveMessages = 0; } } } else { // 当队列没有收到消息的执行代码逻辑 this.consecutiveMessages = 0; if (this.consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) { // 移除多余的channel considerStoppingAConsumer(this.consumer); this.consecutiveIdles = 0; } } }新增代码逻辑
private void considerAddingAConsumer() { synchronized (this.consumersMonitor) { if (this.consumers != null && this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) { long now = System.currentTimeMillis(); if (this.lastConsumerStarted + this.startConsumerMinInterval < now) { // 新增并开启消费者 this.addAndStartConsumers(1); this.lastConsumerStarted = now; } } } }移除代码逻辑
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) { synchronized (this.consumersMonitor) { if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) { long now = System.currentTimeMillis(); if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) { consumer.basicCancel(true); this.consumers.remove(consumer); if (logger.isDebugEnabled()) { logger.debug("Idle consumer terminating: " + consumer); } this.lastConsumerStopped = now; } } } }
org.springframework.amqp.rabbit.core.RabbitAdmin#initialize 注册
public void initialize() { if (this.applicationContext == null) { this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings"); return; } this.logger.debug("Initializing declarations"); CollectioncontextExchanges = new linkedList ( this.applicationContext.getBeansOfType(Exchange.class).values()); Collection contextQueues = new linkedList ( this.applicationContext.getBeansOfType(Queue.class).values()); Collection contextBindings = new linkedList ( this.applicationContext.getBeansOfType(Binding.class).values()); Collection customizers = this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values(); processDeclarables(contextExchanges, contextQueues, contextBindings); final Collection exchanges = filterDeclarables(contextExchanges, customizers); final Collection queues = filterDeclarables(contextQueues, customizers); final Collection bindings = filterDeclarables(contextBindings, customizers); for (Exchange exchange : exchanges) { if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) { this.logger.info("Auto-declaring a non-durable or auto-delete Exchange (" + exchange.getName() + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". " + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and " + "reopening the connection."); } } for (Queue queue : queues) { if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) { this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue (" + queue.getName() + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:" + queue.isExclusive() + ". " + "It will be redeclared if the broker stops and is restarted while the connection factory is " + "alive, but all messages will be lost."); } } if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) { this.logger.debug("Nothing to declare"); return; } this.rabbitTemplate.execute(channel -> { declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()])); declareQueues(channel, queues.toArray(new Queue[queues.size()])); declareBindings(channel, bindings.toArray(new Binding[bindings.size()])); return null; }); this.logger.debug("Declarations finished"); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)