springboot中rabbitmq的queue,exchange,bingding声明原理

springboot中rabbitmq的queue,exchange,bingding声明原理,第1张

springboot中rabbitmq的queue,exchange,bingding声明原理

为什么new一个queue,exchange和binding就会自动declare?

首先exchange,queue和binding都实现了Declarable

在RabbitAutoConfiguration中注册了一个bean--RabbitAdmin

里面有个方法

	@Override
	public void afterPropertiesSet() {

翻译大概如下:

如果autoStartup为true,会在ConnectionFactory上注册回调,回调用来declare所有的queue,exchange,binding, 如果回调失败,那么它可能会导致连接工厂的其他客户端失败,但如果只是交换、队列和绑定declare失败,这种情况就不会发生

声明的主要实现代码在:

@Override // NOSonAR complexity
public void initialize() {

//从上下文获取bean 并分装成链表
   Collection contextExchanges = new linkedList(
         this.applicationContext.getBeansOfType(Exchange.class).values());
   Collection contextQueues = new linkedList(
         this.applicationContext.getBeansOfType(Queue.class).values());
   Collection contextBindings = new linkedList(
         this.applicationContext.getBeansOfType(Binding.class).values());
   Collection customizers =
         this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();

   processDeclarables(contextExchanges, contextQueues, contextBindings);

    //判断是否由admin初始化
    //通过setShouldDeclare(Declarable接口定义)可以设置是否初始化
   final Collection exchanges = filterDeclarables(contextExchanges, customizers);
   final Collection queues = filterDeclarables(contextQueues, customizers);
   final Collection bindings = filterDeclarables(contextBindings, customizers);

    //设置declare的回调在rabbittemplate上
   this.rabbitTemplate.execute(channel -> {
      declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
      declareQueues(channel, queues.toArray(new Queue[queues.size()]));
      declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
      return null;
   });
   
   //如果redeclareManualDeclarations这个参数为true,所有declarables在declare后会重新declare,如下代码
   if (this.manualDeclarables.size() > 0) {
      synchronized (this.manualDeclarables) { 
         for (Declarable dec : this.manualDeclarables.values()) {
            if (dec instanceof Queue) {
               declareQueue((Queue) dec);
            }
            else if (dec instanceof Exchange) {
               declareExchange((Exchange) dec);
            }
            else {
               declareBinding((Binding) dec);
            }
         }
      }
   } 
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5705438.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存