rabbitmq 配置 SimpleRabbitListenerContainerFactory 如何 新增消费者

rabbitmq 配置 SimpleRabbitListenerContainerFactory 如何 新增消费者,第1张

RabbitMQ 配置:在 @RabbitListener 上通过 containerFactory 属性指定要使用的 SimpleRabbitListenerContainerFactory
@RabbitListener(queues = "activity_queue" ,containerFactory = "simpleRabbitListenerContainerFactory")


rabbitmq 执行流程

RabbitListenerEndpointRegistrar 实现了 InitializingBean 接口,因此在该 Bean 初始化完成后,Spring 会自动调用它的 afterPropertiesSet 方法

org.springframework.beans.factory.InitializingBean#afterPropertiesSet方法
	/**
	 * Initialization callback of the registrar; it simply delegates to
	 * {@code registerAllEndpoints()}.
	 */
	@Override
	public void afterPropertiesSet() {
		registerAllEndpoints();
	}

注册所有的端点

/**
 * Registers every collected endpoint descriptor with the endpoint registry.
 *
 * <p>Multi-method endpoints receive the configured validator (when one is set)
 * before registration. Once all descriptors have been handed over,
 * {@code startImmediately} is switched on.
 *
 * @throws IllegalStateException if no endpoint registry has been set
 */
protected void registerAllEndpoints() {
		Assert.state(this.endpointRegistry != null, "No registry available");
		synchronized (this.endpointDescriptors) {
			for (AmqpListenerEndpointDescriptor entry : this.endpointDescriptors) {
				if (this.validator != null && entry.endpoint instanceof MultiMethodRabbitListenerEndpoint) {
					MultiMethodRabbitListenerEndpoint multiMethodEndpoint =
							(MultiMethodRabbitListenerEndpoint) entry.endpoint;
					multiMethodEndpoint.setValidator(this.validator);
				}
				// The container factory is resolved per descriptor (explicit factory first, then defaults).
				this.endpointRegistry.registerListenerContainer( // NOSONAR never null
						entry.endpoint, resolveContainerFactory(entry));
			}
			// trigger immediate startup
			this.startImmediately = true;
		}
	}
org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar#resolveContainerFactory
/**
 * Picks the container factory to use for the given endpoint descriptor.
 *
 * <p>Resolution order: the factory attached to the descriptor, then the factory
 * already held by this registrar, then a bean lookup by
 * {@code containerFactoryBeanName} (the result is cached in
 * {@code this.containerFactory}).
 *
 * @param descriptor the endpoint descriptor being registered
 * @return the resolved container factory
 * @throws IllegalStateException if no factory can be resolved, or if a bean name
 * is configured but no bean factory is available
 */
private RabbitListenerContainerFactory resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
		// 1. A factory set on the descriptor itself takes precedence.
		if (descriptor.containerFactory != null) {
			return descriptor.containerFactory;
		}
		// 2. Otherwise fall back to the registrar-level factory instance.
		if (this.containerFactory != null) {
			return this.containerFactory;
		}
		// 3. Nothing to fall back to: neither an instance nor a bean name is configured.
		if (this.containerFactoryBeanName == null) {
			throw new IllegalStateException("Could not resolve the " +
					RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
					descriptor.endpoint + "] no factory was given and no default is set.");
		}
		// 4. Fetch the factory bean registered in the container and cache it.
		Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
		this.containerFactory = this.beanFactory.getBean(
				this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
		return this.containerFactory;  // Consider changing this if live change of the factory is required
	}
spring执行到 org.springframework.context.support.AbstractApplicationContext#finishRefresh 方法
	/**
	 * Final step of the context refresh: clears resource caches, initializes the
	 * lifecycle processor, notifies it of the refresh, and publishes the
	 * {@code ContextRefreshedEvent}.
	 */
	protected void finishRefresh() {
		// Clear context-level resource caches (such as ASM metadata from scanning).
		clearResourceCaches();

		// Initialize lifecycle processor for this context.
		initLifecycleProcessor();
		// This is the step of interest for the listener start-up walk-through.
		// Propagate refresh to lifecycle processor first.
		getLifecycleProcessor().onRefresh();

		// Publish the final event.
		publishEvent(new ContextRefreshedEvent(this));

		// Participate in LiveBeansView MBean, if active.
		if (!NativeDetector.inNativeImage()) {
			LiveBeansView.registerApplicationContext(this);
		}
	}
org.springframework.context.support.DefaultLifecycleProcessor#startBeans
/**
 * Groups the lifecycle beans by phase and starts each group.
 *
 * <p>The groups are kept in a {@link TreeMap}, so they are started in ascending
 * phase order.
 *
 * @param autoStartupOnly when {@code true}, only {@code SmartLifecycle} beans whose
 * {@code isAutoStartup()} returns {@code true} are started; when {@code false},
 * every lifecycle bean is started
 */
private void startBeans(boolean autoStartupOnly) {
		// Generic parameters restored: with raw Maps the lambda below cannot be type-checked.
		Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
		Map<Integer, LifecycleGroup> phases = new TreeMap<>();

		lifecycleBeans.forEach((beanName, bean) -> {
			if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
				int phase = getPhase(bean);
				phases.computeIfAbsent(
						phase,
						p -> new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly)
				).add(beanName, bean);
			}
		});
		if (!phases.isEmpty()) {
			// Start every phase group.
			phases.values().forEach(LifecycleGroup::start);
		}
	}
org.springframework.context.support.DefaultLifecycleProcessor.LifecycleGroup#start
/**
 * Starts all members of this phase group.
 *
 * <p>Does nothing for an empty group. Otherwise the members are sorted by their
 * natural ordering and each one is started through {@code doStart}.
 */
public void start() {
			if (!this.members.isEmpty()) {
				if (logger.isDebugEnabled()) {
					logger.debug("Starting beans in phase " + this.phase);
				}
				Collections.sort(this.members);
				for (LifecycleGroupMember groupMember : this.members) {
					doStart(this.lifecycleBeans, groupMember.name, this.autoStartupOnly);
				}
			}
		}
org.springframework.context.support.DefaultLifecycleProcessor#doStart
/**
 * Starts the named bean after first starting, recursively, every bean it depends on.
 *
 * <p>The bean is removed from {@code lifecycleBeans} before it is processed, so a
 * bean reached again through another dependency path is not started twice.
 *
 * @param lifecycleBeans the beans still waiting to be started, keyed by bean name
 * @param beanName the name of the bean to start
 * @param autoStartupOnly when {@code true}, a {@code SmartLifecycle} bean is only
 * started if its {@code isAutoStartup()} returns {@code true}
 * @throws ApplicationContextException if the bean's {@code start()} fails
 */
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
		Lifecycle bean = lifecycleBeans.remove(beanName);
		if (bean != null && bean != this) {
			// Dependencies first, so that they are running before this bean starts.
			String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
			for (String dependency : dependenciesForBean) {
				doStart(lifecycleBeans, dependency, autoStartupOnly);
			}
			if (!bean.isRunning() &&
					(!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
				if (logger.isTraceEnabled()) {
					logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
				}
				try {
					// Invoke the bean's own start().
					bean.start();
				}
				catch (Throwable ex) {
					throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
				}
				if (logger.isDebugEnabled()) {
					logger.debug("Successfully started bean '" + beanName + "'");
				}
			}
		}
	}

执行 RabbitListenerEndpointRegistry 的start方法

/**
 * Starts the registered listener containers; each one is passed to
 * {@code startIfNecessary}, which decides whether it is actually started.
 */
public void start() {
		getListenerContainers().forEach(this::startIfNecessary);
	}
org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry#startIfNecessary
/**
 * Starts the container when the context has already been refreshed or the
 * container is flagged for auto-startup; otherwise leaves it untouched.
 *
 * @param listenerContainer the container to start if required
 */
private void startIfNecessary(MessageListenerContainer listenerContainer) {
		boolean shouldStart = this.contextRefreshed || listenerContainer.isAutoStartup();
		if (!shouldStart) {
			return;
		}
		// In this walk-through (simpleRabbitListenerContainerFactory) the container
		// is a SimpleMessageListenerContainer.
		listenerContainer.start();
	}
SimpleMessageListenerContainer(由 SimpleRabbitListenerContainerFactory 创建的监听容器)
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer { 

// The start() shown here is the one inherited from the parent class
// AbstractMessageListenerContainer.
/**
 * Starts the container unless it is already running: performs one-time
 * initialization if needed, configures the admin, checks the queues, and then
 * delegates to {@code doStart()}.
 */
public void start() {
		if (isRunning()) {
			return;
		}
		if (!this.initialized) {
			synchronized (this.lifecycleMonitor) {
				// Re-check under the lock so initialization runs only once.
				if (!this.initialized) {
					afterPropertiesSet();
				}
			}
		}
		try {
			logger.debug("Starting Rabbit listener container.");
			configureAdminIfNeeded();
            
			checkMismatchedQueues();
			doStart();
		}
		catch (Exception ex) {
			throw convertRabbitAccessException(ex);
		}
		finally {
			// Reset regardless of whether start-up succeeded.
			this.lazyLoad = false;
		}
	}
}

org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer#checkMismatchedQueues
/**
 * Checks the queue declarations while the container is starting.
 *
 * <p>When {@code mismatchedQueuesFatal} is set and an admin is available, the
 * admin is initialized and a mismatched-arguments failure is escalated to a
 * {@code FatalListenerStartupException}. Otherwise a connection is opened and
 * closed again; failures on that path are only logged.
 */
protected void checkMismatchedQueues() {
		if (this.mismatchedQueuesFatal && this.amqpAdmin != null) {
			try {
                // Initialize the admin (see RabbitAdmin#initialize).
				this.amqpAdmin.initialize();
			}
			catch (AmqpConnectException e) {
				logger.info("Broker not available; cannot check queue declarations");
			}
			catch (AmqpIOException e) {
				if (RabbitUtils.isMismatchedQueueArgs(e)) {
					throw new FatalListenerStartupException("Mismatched queues", e);
				}
				else {
					logger.info("Failed to get connection during start(): " + e);
				}
			}
		}
		else {
			try {
				// Open and immediately close a connection; any failure is logged below, not rethrown.
				Connection connection = getConnectionFactory().createConnection(); // NOSONAR
				if (connection != null) {
					connection.close();
				}
			}
			catch (Exception e) {
				logger.info("Broker not available; cannot force queue declarations during start: " + e.getMessage());
			}
		}
	}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doStart
/**
 * Starts the container: creates the initial consumers and submits one
 * {@code AsyncMessageProcessingConsumer} task per consumer to the task executor,
 * then waits for those tasks to report start-up.
 *
 * @throws IllegalStateException if batching is enabled with a listener that does
 * not support it, or if consumers are already present before start-up
 */
protected void doStart() {
		Assert.state(!this.consumerBatchEnabled || getMessageListener() instanceof BatchMessageListener
				|| getMessageListener() instanceof ChannelAwareBatchMessageListener,
				"When setting 'consumerBatchEnabled' to true, the listener must support batching");
		checkListenerContainerAware();
		super.doStart();
		synchronized (this.consumersMonitor) {
			if (this.consumers != null) {
				throw new IllegalStateException("A stopped container should not have consumers");
			}
			int newConsumers = initializeConsumers();
			if (this.consumers == null) {
				logger.info("Consumers were initialized and then cleared " +
						"(presumably the container was stopped concurrently)");
				return;
			}
			if (newConsumers <= 0) {
				if (logger.isInfoEnabled()) {
					logger.info("Consumers are already running");
				}
				return;
			}
			// Typed set instead of the raw Set/HashSet.
			Set<AsyncMessageProcessingConsumer> processors = new HashSet<>();
			// Key part: one asynchronous processing task per consumer.
			for (BlockingQueueConsumer consumer : this.consumers) {
				AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
				processors.add(processor);
				// Submit the task for execution.
				getTaskExecutor().execute(processor);
				if (getApplicationEventPublisher() != null) {
					getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
				}
			}
			waitForConsumersToStart(processors);
		}
	}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run
/**
 * Body of the consumer task: initializes the consumer, then repeatedly calls
 * {@code mainLoop()} until the consumer is no longer active, has no pending
 * delivery and has been cancelled. The catch blocks decide, per failure type,
 * whether the consumer is marked as aborted before {@code killOrRestart} is
 * invoked.
 */
public void run() { // NOSONAR - line count
			if (!isActive()) {
				return;
			}

			boolean aborted = false;

			this.consumer.setLocallyTransacted(isChannelLocallyTransacted());

			String routingLookupKey = getRoutingLookupKey();
			if (routingLookupKey != null) {
				SimpleResourceHolder.bind(getRoutingConnectionFactory(), routingLookupKey); // NOSONAR both never null
			}

			// A consumer without queues has nothing to do: release it, publish the stop event and return.
			if (this.consumer.getQueueCount() < 1) {
				if (logger.isDebugEnabled()) {
					logger.debug("Consumer stopping; no queues for " + this.consumer);
				}
				SimpleMessageListenerContainer.this.cancellationLock.release(this.consumer);
				if (getApplicationEventPublisher() != null) {
					getApplicationEventPublisher().publishEvent(
							new AsyncConsumerStoppedEvent(SimpleMessageListenerContainer.this, this.consumer));
				}
				this.start.countDown();
				return;
			}

			try {
				initialize();
                // Keep looping while the consumer is active, still has a delivery, or is not yet cancelled.
				while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
					mainLoop();
				}
			}
			catch (InterruptedException e) {
				logger.debug("Consumer thread interrupted, processing stopped.");
				Thread.currentThread().interrupt();
				aborted = true;
				publishConsumerFailedEvent("Consumer thread interrupted, processing stopped", true, e);
			}
			catch (QueuesNotAvailableException ex) {
				logger.error("Consumer threw missing queues exception, fatal=" + isMissingQueuesFatal(), ex);
				if (isMissingQueuesFatal()) {
					this.startupException = ex;
					// Fatal, but no point re-throwing, so just abort.
					aborted = true;
				}
				publishConsumerFailedEvent("Consumer queue(s) not available", aborted, ex);
			}
			catch (FatalListenerStartupException ex) {
				logger.error("Consumer received fatal exception on startup", ex);
				this.startupException = ex;
				// Fatal, but no point re-throwing, so just abort.
				aborted = true;
				publishConsumerFailedEvent("Consumer received fatal exception on startup", true, ex);
			}
			catch (FatalListenerExecutionException ex) { // NOSONAR exception as flow control
				logger.error("Consumer received fatal exception during processing", ex);
				// Fatal, but no point re-throwing, so just abort.
				aborted = true;
				publishConsumerFailedEvent("Consumer received fatal exception during processing", true, ex);
			}
			catch (PossibleAuthenticationFailureException ex) {
				logger.error("Consumer received fatal=" + isPossibleAuthenticationFailureFatal() +
						" exception during processing", ex);
				if (isPossibleAuthenticationFailureFatal()) {
					this.startupException =
							new FatalListenerStartupException("Authentication failure",
									new AmqpAuthenticationException(ex));
					// Fatal, but no point re-throwing, so just abort.
					aborted = true;
				}
				publishConsumerFailedEvent("Consumer received PossibleAuthenticationFailure during startup", aborted, ex);
			}
			catch (ShutdownSignalException e) {
				if (RabbitUtils.isNormalShutdown(e)) {
					if (logger.isDebugEnabled()) {
						logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage());
					}
				}
				else {
					logConsumerException(e);
				}
			}
			catch (AmqpIOException e) {
				// An "in exclusive use" shutdown nested two causes deep gets its own logger and a non-fatal event.
				if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
						&& e.getCause().getCause().getMessage().contains("in exclusive use")) {
					getExclusiveConsumerExceptionLogger().log(logger,
							"Exclusive consumer failure", e.getCause().getCause());
					publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
				}
				else {
					logConsumerException(e);
				}
			}
			catch (Error e) { //NOSONAR
				logger.error("Consumer thread error, thread abort.", e);
				publishConsumerFailedEvent("Consumer threw an Error", true, e);
				getJavaLangErrorHandler().handle(e);
				aborted = true;
			}
			catch (Throwable t) { //NOSONAR
				// by now, it must be an exception
				if (isActive()) {
					logConsumerException(t);
				}
			}
			finally {
				if (getTransactionManager() != null) {
					ConsumerChannelRegistry.unRegisterConsumerChannel();
				}
			}

			// In all cases count down to allow container to progress beyond startup
			this.start.countDown();

			killOrRestart(aborted);

			if (routingLookupKey != null) {
				SimpleResourceHolder.unbind(getRoutingConnectionFactory()); // NOSONAR never null here
			}
		}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop
	/**
	 * Runs one iteration of the consumer loop: receives and processes messages,
	 * lets {@code checkAdjust} grow or shrink the consumer count when a maximum is
	 * configured, and publishes an idle-container event when nothing has been
	 * received for longer than the idle event interval.
	 *
	 * @throws FatalListenerExecutionException if listener execution failed because
	 * of a {@code NoSuchMethodException}
	 * @throws Exception anything else propagated from receiving or processing
	 */
	private void mainLoop() throws Exception { // NOSONAR Exception
			try {
				boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
				if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
					// Adjust the number of consumers (only when a maximum is configured).
					checkAdjust(receivedOk);
				}
				long idleEventInterval = getIdleEventInterval();
				if (idleEventInterval > 0) {
					if (receivedOk) {
						updateLastReceive();
					}
					else {
						long now = System.currentTimeMillis();
						long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
						long lastReceive = getLastReceive();
						// The compareAndSet ensures only one consumer thread publishes the event per interval.
						if (now > lastReceive + idleEventInterval
								&& now > lastAlertAt + idleEventInterval
								&& SimpleMessageListenerContainer.this.lastNoMessageAlert
								.compareAndSet(lastAlertAt, now)) {
							publishIdleContainerEvent(now - lastReceive);
						}
					}
				}
			}
			catch (ListenerExecutionFailedException ex) {
				// Continue to process, otherwise re-throw
				if (ex.getCause() instanceof NoSuchMethodException) {
					throw new FatalListenerExecutionException("Invalid listener", ex);
				}
			}
			catch (AmqpRejectAndDontRequeueException ignored) {
				// Deliberately swallowed so that the surrounding consumer loop keeps running.
			}
		}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#checkAdjust 调整消费者
/**
 * Tracks consecutive busy and idle receive cycles and asks the container to add
 * or stop a consumer once the corresponding trigger is exceeded.
 *
 * @param receivedOk whether the last receive cycle delivered a message
 */
private void checkAdjust(boolean receivedOk) {
			if (!receivedOk) {
				// Idle cycle: the busy streak is broken, the idle streak grows.
				this.consecutiveMessages = 0;
				boolean idleTriggerExceeded =
						this.consecutiveIdles > SimpleMessageListenerContainer.this.consecutiveIdleTrigger;
				this.consecutiveIdles++;
				if (idleTriggerExceeded) {
					considerStoppingAConsumer(this.consumer);
					this.consecutiveIdles = 0;
				}
				return;
			}
			if (!isActive(this.consumer)) {
				return;
			}
			// Busy cycle on an active consumer: the idle streak is broken, the busy streak grows.
			this.consecutiveIdles = 0;
			boolean activeTriggerExceeded =
					this.consecutiveMessages > SimpleMessageListenerContainer.this.consecutiveActiveTrigger;
			this.consecutiveMessages++;
			if (activeTriggerExceeded) {
				considerAddingAConsumer();
				this.consecutiveMessages = 0;
			}
		}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#considerAddingAConsumer 新增消费者
	/**
	 * Adds a single consumer when the container is below its configured maximum
	 * and more than {@code startConsumerMinInterval} has passed since the last
	 * consumer was started. At most one consumer is added per call.
	 */
	private void considerAddingAConsumer() {
		synchronized (this.consumersMonitor) {
			if (this.consumers == null || this.maxConcurrentConsumers == null) {
				return;
			}
			if (this.consumers.size() >= this.maxConcurrentConsumers) {
				return;
			}
			long now = System.currentTimeMillis();
			boolean intervalElapsed = now > this.lastConsumerStarted + this.startConsumerMinInterval;
			if (intervalElapsed) {
				this.addAndStartConsumers(1);
				this.lastConsumerStarted = now;
			}
		}
	}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#considerStoppingAConsumer 减少消费者
	/**
	 * Cancels and removes the given consumer when the container holds more
	 * consumers than {@code concurrentConsumers} and more than
	 * {@code stopConsumerMinInterval} has passed since the last consumer was stopped.
	 *
	 * @param consumer the idle consumer that is a candidate for removal
	 */
	private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
		synchronized (this.consumersMonitor) {
			if (this.consumers == null || this.consumers.size() <= this.concurrentConsumers) {
				return;
			}
			long now = System.currentTimeMillis();
			boolean intervalElapsed = now > this.lastConsumerStopped + this.stopConsumerMinInterval;
			if (!intervalElapsed) {
				return;
			}
			consumer.basicCancel(true);
			this.consumers.remove(consumer);
			if (logger.isDebugEnabled()) {
				logger.debug("Idle consumer terminating: " + consumer);
			}
			this.lastConsumerStopped = now;
		}
	}



个人笔记:关键代码摘录

	// Excerpt (catch/finally blocks omitted): initialize the consumer, then call
	// mainLoop() while it is active, still has a delivery, or is not yet cancelled.
	try {
				initialize();
				while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
					mainLoop();
				}
			}
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#doStart
	// Key part (excerpt; the closing brace of the loop is omitted):
	// one asynchronous processing task is created per consumer.
	for (BlockingQueueConsumer consumer : this.consumers) {
           
				AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
				processors.add(processor);
				// Submit the task to the executor.
				getTaskExecutor().execute(processor);
				if (getApplicationEventPublisher() != null) {
					getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
				}

执行任务代码逻辑

	// Abbreviated sketch of the consumer task; not compilable as shown because the
	// catch/finally blocks of run() are left out.
	private final class AsyncMessageProcessingConsumer implements Runnable {
        public void run(){
            // Main logic: initialize, then loop over mainLoop() while the consumer
            // is active, still has a delivery, or is not yet cancelled.
            try {
                initialize();
                while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
                                mainLoop();
                }
             }
        }
        
    }

/**
 * Runs one iteration of the consumer loop: receives and processes messages,
 * lets {@code checkAdjust} grow or shrink the consumer count when a maximum is
 * configured, and publishes an idle-container event when nothing has been
 * received for longer than the idle event interval.
 *
 * @throws FatalListenerExecutionException if listener execution failed because
 * of a {@code NoSuchMethodException}
 * @throws Exception anything else propagated from receiving or processing
 */
private void mainLoop() throws Exception { // NOSONAR Exception
			try {
				boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
				if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
					// Consumer scaling only runs when a maximum is configured.
					checkAdjust(receivedOk);
				}
				long idleEventInterval = getIdleEventInterval();
				if (idleEventInterval > 0) {
					if (receivedOk) {
						updateLastReceive();
					}
					else {
						long now = System.currentTimeMillis();
						long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
						long lastReceive = getLastReceive();
						// The compareAndSet ensures only one consumer thread publishes the event per interval.
						if (now > lastReceive + idleEventInterval
								&& now > lastAlertAt + idleEventInterval
								&& SimpleMessageListenerContainer.this.lastNoMessageAlert
								.compareAndSet(lastAlertAt, now)) {
							publishIdleContainerEvent(now - lastReceive);
						}
					}
				}
			}
			catch (ListenerExecutionFailedException ex) {
				// Continue to process, otherwise re-throw
				if (ex.getCause() instanceof NoSuchMethodException) {
					throw new FatalListenerExecutionException("Invalid listener", ex);
				}
			}
			catch (AmqpRejectAndDontRequeueException ignored) {
				// Deliberately swallowed so that the surrounding consumer loop keeps running.
			}
		}

设置了最大消费者数(maxConcurrentConsumers)之后,即使队列中积压了大量消息,消费者也不会一次性增加到设置的最大值,而是由 checkAdjust 按条件每次只新增一个:

/**
 * Tracks consecutive busy and idle receive cycles and asks the container to add
 * or stop a consumer once the corresponding trigger is exceeded.
 *
 * @param receivedOk whether the last receive cycle delivered a message
 */
private void checkAdjust(boolean receivedOk) {
			if (!receivedOk) {
				// No message was received in this cycle.
				this.consecutiveMessages = 0;
				boolean idleTriggerExceeded =
						this.consecutiveIdles > SimpleMessageListenerContainer.this.consecutiveIdleTrigger;
				this.consecutiveIdles++;
				if (idleTriggerExceeded) {
					// Consider removing this surplus consumer.
					considerStoppingAConsumer(this.consumer);
					this.consecutiveIdles = 0;
				}
				return;
			}
			// A message was received; only an active consumer takes part in scaling up.
			if (!isActive(this.consumer)) {
				return;
			}
			this.consecutiveIdles = 0;
			// Has the streak of consecutive messages exceeded the configured trigger?
			boolean activeTriggerExceeded =
					this.consecutiveMessages > SimpleMessageListenerContainer.this.consecutiveActiveTrigger;
			this.consecutiveMessages++;
			if (activeTriggerExceeded) {
				// Consider adding one more consumer.
				considerAddingAConsumer();
				this.consecutiveMessages = 0;
			}
		}
新增消费者的代码逻辑
/**
 * Adds a single consumer when the container is below its configured maximum
 * and more than {@code startConsumerMinInterval} has passed since the last
 * consumer was started. At most one consumer is added per call.
 */
private void considerAddingAConsumer() {
		synchronized (this.consumersMonitor) {
			if (this.consumers == null || this.maxConcurrentConsumers == null) {
				return;
			}
			if (this.consumers.size() >= this.maxConcurrentConsumers) {
				return;
			}
			long now = System.currentTimeMillis();
			boolean intervalElapsed = now > this.lastConsumerStarted + this.startConsumerMinInterval;
			if (intervalElapsed) {
				// Create and start exactly one additional consumer.
				this.addAndStartConsumers(1);
				this.lastConsumerStarted = now;
			}
		}
	}
移除消费者的代码逻辑
/**
 * Cancels and removes the given consumer when the container holds more
 * consumers than {@code concurrentConsumers} and more than
 * {@code stopConsumerMinInterval} has passed since the last consumer was stopped.
 *
 * @param consumer the idle consumer that is a candidate for removal
 */
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
		synchronized (this.consumersMonitor) {
			if (this.consumers == null || this.consumers.size() <= this.concurrentConsumers) {
				return;
			}
			long now = System.currentTimeMillis();
			boolean intervalElapsed = now > this.lastConsumerStopped + this.stopConsumerMinInterval;
			if (!intervalElapsed) {
				return;
			}
			consumer.basicCancel(true);
			this.consumers.remove(consumer);
			if (logger.isDebugEnabled()) {
				logger.debug("Idle consumer terminating: " + consumer);
			}
			this.lastConsumerStopped = now;
		}
	}

org.springframework.amqp.rabbit.core.RabbitAdmin#initialize:声明(注册)容器中的交换机、队列和绑定

/**
 * Declares every {@code Exchange}, {@code Queue} and {@code Binding} bean found in
 * the application context on the broker.
 *
 * <p>Returns early when no application context has been set or when nothing
 * remains to declare after filtering. Non-durable, auto-delete or exclusive
 * declarations are logged at info level before the declarations are executed on
 * a channel obtained through the rabbit template.
 */
public void initialize() {

		if (this.applicationContext == null) {
			this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
			return;
		}

		this.logger.debug("Initializing declarations");
		// Mutable copies of the context beans; typed so that the loops below are type-safe.
		Collection<Exchange> contextExchanges = new LinkedList<>(
				this.applicationContext.getBeansOfType(Exchange.class).values());
		Collection<Queue> contextQueues = new LinkedList<>(
				this.applicationContext.getBeansOfType(Queue.class).values());
		Collection<Binding> contextBindings = new LinkedList<>(
				this.applicationContext.getBeansOfType(Binding.class).values());
		Collection<DeclarableCustomizer> customizers =
				this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();

		processDeclarables(contextExchanges, contextQueues, contextBindings);

		final Collection<Exchange> exchanges = filterDeclarables(contextExchanges, customizers);
		final Collection<Queue> queues = filterDeclarables(contextQueues, customizers);
		final Collection<Binding> bindings = filterDeclarables(contextBindings, customizers);

		for (Exchange exchange : exchanges) {
			if ((!exchange.isDurable() || exchange.isAutoDelete())  && this.logger.isInfoEnabled()) {
				this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
						+ exchange.getName()
						+ ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
						+ "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
						+ "reopening the connection.");
			}
		}

		for (Queue queue : queues) {
			if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
				this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
						+ queue.getName()
						+ ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
						+ queue.isExclusive() + ". "
						+ "It will be redeclared if the broker stops and is restarted while the connection factory is "
						+ "alive, but all messages will be lost.");
			}
		}

		if (exchanges.isEmpty() && queues.isEmpty() && bindings.isEmpty()) {
			this.logger.debug("Nothing to declare");
			return;
		}
		// Exchanges first, then queues, then the bindings that connect them.
		this.rabbitTemplate.execute(channel -> {
			declareExchanges(channel, exchanges.toArray(new Exchange[0]));
			declareQueues(channel, queues.toArray(new Queue[0]));
			declareBindings(channel, bindings.toArray(new Binding[0]));
			return null;
		});
		this.logger.debug("Declarations finished");

	}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5069198.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-16
下一篇 2022-11-16

发表评论

登录后才能评论

评论列表(0条)

保存