Error[8]: Undefined offset: 7, File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 121
File: /www/wwwroot/outofmemory.cn/tmp/plugin_ss_superseo_model_superseo.php, Line: 473, decode(

记一次RocketMQ服务启动时 NullPointerException问题 记一次RocketMQ服务启动时 NullPointerException问题 背景

利用 RcoketMQ @ExtRocketMQTemplateConfiguration 注解配置拓展 RocketMQTemplate 作为发消息的temple,RocketMQConsumer 和 ExtRokcetMQTemplate 在同一个项目里。相关RocketMQ版本信息如下:


    org.apache.rocketmq
    rocketmq-client
    4.9.2



    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.2.1

问题

服务启动时, 报错

java.lang.NullPointerException
	at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:475)
	at org.apache.rocketmq.spring.core.RocketMQTemplate.doSend(RocketMQTemplate.java:977)
	at org.apache.rocketmq.spring.core.RocketMQTemplate.doSend(RocketMQTemplate.java:60)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at com.lijun.samples.rocketmq.test2.Producer.sendMessage(Producer.java:22)
	at com.lijun.samples.rocketmq.test2.Task.init(Task.java:30)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389)
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$Lifecyclemetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333)
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:602)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:524)
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean 在springboot启动类上添加 @import({ExtProducerResetConfiguration.class, ListenerContainerConfiguration.class}) 来解决(AbstractBeanFactory.java:335)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:944)
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:434)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:338)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1332)
	at com.lijun.samples.rocketmq.Test2Application.main(Test2Application.java:13)
问题分析排查

相关代码:

@Component
@ExtRocketMQTemplateConfiguration(nameServer = "${rocketmqServer}", group = "testExtProducer")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
@Service
@RocketMQMessageListener(nameServer = "${rocketmqServer}",
        topic = "test_send_topic", consumerGroup = "test_group_c")
public class Consumer  implements RocketMQListener {
    @Resource
    private ExtRocketMQTemplate rocketMQTemplate;
    @Override
    public void onMessage(String massage) {
        System.out.println("consumer msg=" + message);
        // 这里只是测试复现代码 真实实现不是这样  这样会导致一直发送和消费消息 自己实现时可以调整下
        rocketMQTemplate.send("test_send_topic", new GenericMessage<>(message));
    }
}

    为什么出现: 服务启动时 RocketMQ Consumer 消费到消息后 处理完后会发送一条MQ消息出去, 我们使用的是 ExtRocketMQTemplateConfiguration 注册的拓展RocketMQTemplate 来发送消息

    源码上来看 ListenerContainerConfiguration 负责创建RocketMQ Listener 也就是注册消费者. ExtProducerResetConfiguration 负责将我们配置的拓展nameServer、group实例化成一个producer ,设置进我们的拓展 ExtRocketMQTemplate

    SmartInitializingSingleton.afterSingletonsInstantiated 调用时机为 所有单例bean 实例化完成后调用

    @Configuration
    public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
        @Override
        public void afterSingletonsInstantiated() {
            Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
                .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    
            beans.forEach(this::registerContainer);
        }
        private void registerContainer(String beanName, Object bean) {
            Class clazz = AopProxyUtils.ultimateTargetClass(bean);
    
            if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
            }
    
            if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
            }
    
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
    
            String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
            String topic = this.environment.resolvePlaceholders(annotation.topic());
    
            boolean listenerEnabled =
                (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                    .getOrDefault(topic, true);
    
            if (!listenerEnabled) {
                log.debug(
                    "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                    consumerGroup, topic);
                return;
            }
            validate(annotation);
    
            String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
                counter.incrementAndGet());
            GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
    
            genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
                () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
            DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
                DefaultRocketMQListenerContainer.class);
            if (!container.isRunning()) {
                try {
                    // 注意这 直接启动了consumer
                    container.start();
                } catch (Exception e) {
                    log.error("Started container failed. {}", container, e);
                    throw new RuntimeException(e);
                }
            }
    
            log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
        }
    }
    
    @Configuration
    public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton { 
        @Override
        public void afterSingletonsInstantiated() {
            Map beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class)
                .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    
            beans.forEach(this::registerTemplate);
        }
        private void registerTemplate(String beanName, Object bean) {
            Class clazz = AopProxyUtils.ultimateTargetClass(bean);
    
            if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
            }
    
            ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
            GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
            validate(annotation, genericApplicationContext);
    
            DefaultMQProducer mqProducer = createProducer(annotation);
            // Set instanceName same as the beanName
            mqProducer.setInstanceName(beanName);
            try {
                mqProducer.start();
            } catch (MQClientException e) {
                throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
                    beanName), e);
            }
            RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
            // 注意这 这里才设置 producer 
            rocketMQTemplate.setProducer(mqProducer);
            rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
            log.info("Set real producer to :{} {}", beanName, annotation.value());
        }
    
        private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {
            DefaultMQProducer producer ;
            // 省略过程
            return producer;
        }
    }
    

    从启动日志上来看 ListenerContainerConfiguration 先于 ExtProducerResetConfiguration 执行完成,也就是消费者先注册成功再设置producer 在这期间 如果消费者消费到消息 马上发消息 则producer 为空

解决问题

从上面分析来看,其实也就是consumer 先于producer 启动造成的,所以得想办法调整 两者的顺序

解决方案1:

分析 SmartInitializingSingleton.afterSingletonsInstantiated 调用地方源码可以发现 是根据beanDefinition 顺序来执行的, 所以我们可以尝试修改两者顺序, 第一种是改源码(我们暂时不考虑), 第二种可以尝试

public interface Lifecycle {
    void start();   //容器启动后调用
    void stop();    //容器关闭前调用
    boolean isRunning();  //当前应用是否正在运行
}
public interface SmartLifecycle extends Lifecycle, Phased {
    int DEFAULT_PHASE = 2147483647;
    default boolean isAutoStartup() {  //自动调用start()、stop()方法,默认自动调用
        return true;
    }
    default void stop(Runnable callback) {
        this.stop();    //调用stop()方法
        callback.run(); //如果不调用该方法,等待30s关闭容器;如果调用了该方法,不需要等待就可关闭容器
    }
    default int getPhase() {  //如果有多个继承了SmartLifeCycle接口的类,返回值小的start()方法先调用,stop()方法相反
        return 2147483647;
    }
}

public class DefaultListableBeanFactory extends AbstractAutowireCapableBeanFactory
		implements ConfigurableListableBeanFactory, BeanDefinitionRegistry, Serializable {
    List beanNames = new ArrayList<>(this.beanDefinitionNames);
    // 此处省略代码
	for (String beanName : beanNames) {
		Object singletonInstance = getSingleton(beanName);
		if (singletonInstance instanceof SmartInitializingSingleton) {
			StartupStep smartInitialize = this.getApplicationStartup().start("spring.beans.smart-initialize")
					.tag("beanName", beanName);
			SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;
			if (System.getSecurityManager() != null) {
				AccessController.doPrivileged((PrivilegedAction) () -> {
					smartSingleton.afterSingletonsInstantiated();
					return null;
				}, getAccessControlContext());
			}
			else {
				smartSingleton.afterSingletonsInstantiated();
			}
			smartInitialize.end();
		}
	}
}
 
解决方案2: 

我们可以延迟 consumer 的启动来解决, 先介绍两个类 Lifecycle SmartLifecycle; 我们可以利用AOP将DefaultRocketMQListenerContainer的start方法延迟至容器启动时才调用

[+++]

具体实现方式:

 * Class Name is RocketMqListenerAspect
 */
@Component
@Aspect
public class RocketMqListenerAspect implements SmartLifecycle {

    private boolean running = false;

    @Pointcut("execution(* org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.start())")
    private void pointcut() {
    }

    private final List joinPoints = new ArrayList<>(); // 方案1 持有joinPoint对象
    private final List containers = new ArrayList<>(); // 方案2 持有container对象

    @Around(value = "pointcut()")
    public Object aroundStart(ProceedingJoinPoint pjp) throws Throwable {
        if (!isRunning()) {
            // 如果已经执行过了 就不需要再执行了 
            joinPoints.add(pjp); // 1
            containers.add((DefaultRocketMQListenerContainer) pjp.getThis());//2
            return null;
        }
        return pjp.proceed();
    }

    @Override
    public void stop() {
        // 2 stop 可以不用实现 服务关闭时 会自动 调用 DefaultRocketMQListenerContainer.destroy 方案关闭
        // if (isRunning()) {
        //     System.err.println("stop rocketMq listener!");
        //     this.running = false;
        //     containers.forEach(DefaultRocketMQListenerContainer::stop);
        // }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public void start() {
        if (!isRunning()) {
            System.err.println("start");
            this.running = true;
            // 1
            for (DefaultRocketMQListenerContainer listenerContainer : containers) {
                System.err.println("real start " + listenerContainer);
                listenerContainer.start();
            }
            // 2
            for (ProceedingJoinPoint proceedingJoinPoint : joinPoints) {
                try {
                    System.err.println("real start " + proceedingJoinPoint.getThis());
                    proceedingJoinPoint.proceed();
                } catch (Throwable e) {
                    e.printStackTrace();
                }
            }

        }
    }
    @Override
    public int getPhase() {
        return Integer.MIN_VALUE;
    }
}

这里可以持有 DefaultRocketMQListenerContainer对象 也可以持有 ProceedingJoinPoint 对象 2选一

)
File: /www/wwwroot/outofmemory.cn/tmp/route_read.php, Line: 126, InsideLink()
File: /www/wwwroot/outofmemory.cn/tmp/index.inc.php, Line: 166, include(/www/wwwroot/outofmemory.cn/tmp/route_read.php)
File: /www/wwwroot/outofmemory.cn/index.php, Line: 30, include(/www/wwwroot/outofmemory.cn/tmp/index.inc.php) 记一次RocketMQ服务启动时 NullPointerException问题_随笔_内存溢出

记一次RocketMQ服务启动时 NullPointerException问题

记一次RocketMQ服务启动时 NullPointerException问题,第1张

记一次RocketMQ服务启动时 NullPointerException问题 记一次RocketMQ服务启动时 NullPointerException问题 背景

利用 RcoketMQ @ExtRocketMQTemplateConfiguration 注解配置拓展 RocketMQTemplate 作为发消息的temple,RocketMQConsumer 和 ExtRokcetMQTemplate 在同一个项目里。相关RocketMQ版本信息如下:


    org.apache.rocketmq
    rocketmq-client
    4.9.2



    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.2.1

问题

服务启动时, 报错

java.lang.NullPointerException
	at org.apache.rocketmq.spring.core.RocketMQTemplate.syncSend(RocketMQTemplate.java:475)
	at org.apache.rocketmq.spring.core.RocketMQTemplate.doSend(RocketMQTemplate.java:977)
	at org.apache.rocketmq.spring.core.RocketMQTemplate.doSend(RocketMQTemplate.java:60)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at com.lijun.samples.rocketmq.test2.Producer.sendMessage(Producer.java:22)
	at com.lijun.samples.rocketmq.test2.Task.init(Task.java:30)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:389)
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$Lifecyclemetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:333)
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:157)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1778)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:602)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:524)
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean 在springboot启动类上添加 @import({ExtProducerResetConfiguration.class, ListenerContainerConfiguration.class}) 来解决(AbstractBeanFactory.java:335)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208)
	at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:944)
	at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:918)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:583)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:145)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:754)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:434)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:338)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1332)
	at com.lijun.samples.rocketmq.Test2Application.main(Test2Application.java:13)
问题分析排查

相关代码:

@Component
@ExtRocketMQTemplateConfiguration(nameServer = "${rocketmqServer}", group = "testExtProducer")
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
@Service
@RocketMQMessageListener(nameServer = "${rocketmqServer}",
        topic = "test_send_topic", consumerGroup = "test_group_c")
public class Consumer  implements RocketMQListener {
    @Resource
    private ExtRocketMQTemplate rocketMQTemplate;
    @Override
    public void onMessage(String massage) {
        System.out.println("consumer msg=" + message);
        // 这里只是测试复现代码 真实实现不是这样  这样会导致一直发送和消费消息 自己实现时可以调整下
        rocketMQTemplate.send("test_send_topic", new GenericMessage<>(message));
    }
}

    为什么出现: 服务启动时 RocketMQ Consumer 消费到消息后 处理完后会发送一条MQ消息出去, 我们使用的是 ExtRocketMQTemplateConfiguration 注册的拓展RocketMQTemplate 来发送消息

    源码上来看 ListenerContainerConfiguration 负责创建RocketMQ Listener 也就是注册消费者. ExtProducerResetConfiguration 负责将我们配置的拓展nameServer、group实例化成一个producer ,设置进我们的拓展 ExtRocketMQTemplate

    SmartInitializingSingleton.afterSingletonsInstantiated 调用时机为 所有单例bean 实例化完成后调用

    @Configuration
    public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
        @Override
        public void afterSingletonsInstantiated() {
            Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
                .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    
            beans.forEach(this::registerContainer);
        }
        private void registerContainer(String beanName, Object bean) {
            Class clazz = AopProxyUtils.ultimateTargetClass(bean);
    
            if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
            }
    
            if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
            }
    
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
    
            String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
            String topic = this.environment.resolvePlaceholders(annotation.topic());
    
            boolean listenerEnabled =
                (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                    .getOrDefault(topic, true);
    
            if (!listenerEnabled) {
                log.debug(
                    "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                    consumerGroup, topic);
                return;
            }
            validate(annotation);
    
            String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
                counter.incrementAndGet());
            GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
    
            genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
                () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
            DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
                DefaultRocketMQListenerContainer.class);
            if (!container.isRunning()) {
                try {
                    // 注意这 直接启动了consumer
                    container.start();
                } catch (Exception e) {
                    log.error("Started container failed. {}", container, e);
                    throw new RuntimeException(e);
                }
            }
    
            log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
        }
    }
    
    @Configuration
    public class ExtProducerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton { 
        @Override
        public void afterSingletonsInstantiated() {
            Map beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class)
                .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    
            beans.forEach(this::registerTemplate);
        }
        private void registerTemplate(String beanName, Object bean) {
            Class clazz = AopProxyUtils.ultimateTargetClass(bean);
    
            if (!RocketMQTemplate.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " is not instance of " + RocketMQTemplate.class.getName());
            }
    
            ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
            GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
            validate(annotation, genericApplicationContext);
    
            DefaultMQProducer mqProducer = createProducer(annotation);
            // Set instanceName same as the beanName
            mqProducer.setInstanceName(beanName);
            try {
                mqProducer.start();
            } catch (MQClientException e) {
                throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
                    beanName), e);
            }
            RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
            // 注意这 这里才设置 producer 
            rocketMQTemplate.setProducer(mqProducer);
            rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
            log.info("Set real producer to :{} {}", beanName, annotation.value());
        }
    
        private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annotation) {
            DefaultMQProducer producer ;
            // 省略过程
            return producer;
        }
    }
    

    从启动日志上来看 ListenerContainerConfiguration 先于 ExtProducerResetConfiguration 执行完成,也就是消费者先注册成功再设置producer 在这期间 如果消费者消费到消息 马上发消息 则producer 为空

解决问题

从上面分析来看,其实也就是consumer 先于producer 启动造成的,所以得想办法调整 两者的顺序

解决方案1:

分析 SmartInitializingSingleton.afterSingletonsInstantiated 调用地方源码可以发现 是根据beanDefinition 顺序来执行的, 所以我们可以尝试修改两者顺序, 第一种是改源码(我们暂时不考虑), 第二种可以尝试

public interface Lifecycle {
    void start();   //容器启动后调用
    void stop();    //容器关闭前调用
    boolean isRunning();  //当前应用是否正在运行
}
public interface SmartLifecycle extends Lifecycle, Phased {
    int DEFAULT_PHASE = 2147483647;
    default boolean isAutoStartup() {  //自动调用start()、stop()方法,默认自动调用
        return true;
    }
    default void stop(Runnable callback) {
        this.stop();    //调用stop()方法
        callback.run(); //如果不调用该方法,等待30s关闭容器;如果调用了该方法,不需要等待就可关闭容器
    }
    default int getPhase() {  //如果有多个继承了SmartLifeCycle接口的类,返回值小的start()方法先调用,stop()方法相反
        return 2147483647;
    }
}

public class DefaultListableBeanFactory extends AbstractAutowireCapableBeanFactory
		implements ConfigurableListableBeanFactory, BeanDefinitionRegistry, Serializable {
    List beanNames = new ArrayList<>(this.beanDefinitionNames);
    // 此处省略代码
	for (String beanName : beanNames) {
		Object singletonInstance = getSingleton(beanName);
		if (singletonInstance instanceof SmartInitializingSingleton) {
			StartupStep smartInitialize = this.getApplicationStartup().start("spring.beans.smart-initialize")
					.tag("beanName", beanName);
			SmartInitializingSingleton smartSingleton = (SmartInitializingSingleton) singletonInstance;
			if (System.getSecurityManager() != null) {
				AccessController.doPrivileged((PrivilegedAction) () -> {
					smartSingleton.afterSingletonsInstantiated();
					return null;
				}, getAccessControlContext());
			}
			else {
				smartSingleton.afterSingletonsInstantiated();
			}
			smartInitialize.end();
		}
	}
}
 
解决方案2: 

我们可以延迟 consumer 的启动来解决, 先介绍两个类 Lifecycle SmartLifecycle; 我们可以利用AOP将DefaultRocketMQListenerContainer的start方法延迟至容器启动时才调用

具体实现方式:

 * Class Name is RocketMqListenerAspect
 */
@Component
@Aspect
public class RocketMqListenerAspect implements SmartLifecycle {

    private boolean running = false;

    @Pointcut("execution(* org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer.start())")
    private void pointcut() {
    }

    private final List joinPoints = new ArrayList<>(); // 方案1 持有joinPoint对象
    private final List containers = new ArrayList<>(); // 方案2 持有container对象

    @Around(value = "pointcut()")
    public Object aroundStart(ProceedingJoinPoint pjp) throws Throwable {
        if (!isRunning()) {
            // 如果已经执行过了 就不需要再执行了 
            joinPoints.add(pjp); // 1
            containers.add((DefaultRocketMQListenerContainer) pjp.getThis());//2
            return null;
        }
        return pjp.proceed();
    }

    @Override
    public void stop() {
        // 2 stop 可以不用实现 服务关闭时 会自动 调用 DefaultRocketMQListenerContainer.destroy 方案关闭
        // if (isRunning()) {
        //     System.err.println("stop rocketMq listener!");
        //     this.running = false;
        //     containers.forEach(DefaultRocketMQListenerContainer::stop);
        // }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    @Override
    public void start() {
        if (!isRunning()) {
            System.err.println("start");
            this.running = true;
            // 1
            for (DefaultRocketMQListenerContainer listenerContainer : containers) {
                System.err.println("real start " + listenerContainer);
                listenerContainer.start();
            }
            // 2
            for (ProceedingJoinPoint proceedingJoinPoint : joinPoints) {
                try {
                    System.err.println("real start " + proceedingJoinPoint.getThis());
                    proceedingJoinPoint.proceed();
                } catch (Throwable e) {
                    e.printStackTrace();
                }
            }

        }
    }
    @Override
    public int getPhase() {
        return Integer.MIN_VALUE;
    }
}

这里可以持有 DefaultRocketMQListenerContainer对象 也可以持有 ProceedingJoinPoint 对象 2选一

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5704927.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-18
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)