- 概念介绍
- 生产者YML配置
- 生产者发送消息
- 消费者监听信息(Push方式的信息)
- 使用@RocketMQMessageListener
- 消费者监听信息(Pull方式的信息)
- 死信队列
示意图:
https://www.cnblogs.com/weifeng1463/p/12889300.html
生产者YML配置# 名称服务器和生产者组必须配置 rocketmq: producer: group: my-producer name-server: 127.0.0.1:9876
RocketMQAutoConfiguration.java
@Bean(PRODUCER_BEAN_NAME) @ConditionalOnMissingBean(DefaultMQProducer.class) @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"}) public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) { RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer(); String nameServer = rocketMQProperties.getNameServer(); String groupName = producerConfig.getGroup(); // 名称服务器和生产者组必须配置 Assert.hasText(nameServer, "[rocketmq.name-server] must not be null"); Assert.hasText(groupName, "[rocketmq.producer.group] must not be null"); String accessChannel = rocketMQProperties.getAccessChannel(); String ak = rocketMQProperties.getProducer().getAccessKey(); String sk = rocketMQProperties.getProducer().getSecretKey(); boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace(); String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic(); DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic); producer.setNamesrvAddr(nameServer); if (!StringUtils.isEmpty(accessChannel)) { producer.setAccessChannel(AccessChannel.valueOf(accessChannel)); } producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout()); producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed()); producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed()); producer.setMaxMessageSize(producerConfig.getMaxMessageSize()); producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold()); producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer()); return producer; }生产者发送消息
@RestController public class ProducerController { @Autowired private RocketMQTemplate rocketMQTemplate; @GetMapping("/send1") public void send1() { // 直接发送消息 rocketMQTemplate.sendOneWay("my-topic", "直接发送信息"); // 发送有序消息,第三个参数hashKey必保持一致,才能分配到一个队列中 消费者@RocketMQMessageListener(consumeMode = ConsumeMode.ORDERLY)才能有序消费 rocketMQTemplate.sendOneWayOrderly("my-topic", "有序信息", "1"); System.out.println("直接发送信息"); } @GetMapping("/send2") public void send2() { for (int i = 0; i < 5; i++) { rocketMQTemplate.syncSend("my-topic", "同步发送信息"); // 发送延迟消息 rocketMQTemplate.syncSend("my-topic", MessageBuilder.withPayload("同步发送信息" + i).build(), 1000, 3); } } @GetMapping("/send3") public void send3() { rocketMQTemplate.asyncSend("my-topic", "异步消息", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult.getSendStatus()); } @Override public void onException(Throwable e) { e.printStackTrace(); } }); } }消费者监听信息(Push方式的信息)
– 被动的收取发送过来的信息
使用@RocketMQMessageListener@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @documented public @interface RocketMQMessageListener { String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}"; String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}"; String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}"; String TRACE_TOPIC_PLAqCEHOLDER = "${rocketmq.consumer.customized-trace-topic:}"; String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}"; String consumerGroup(); String topic(); SelectorType selectorType() default SelectorType.TAG; String selectorexpression() default "*"; ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY; MessageModel messageModel() default MessageModel.CLUSTERING; int consumeThreadMax() default 64; long consumeTimeout() default 15L; String accessKey() default ACCESS_KEY_PLACEHOLDER; String secretKey() default SECRET_KEY_PLACEHOLDER; boolean enableMsgTrace() default true; String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER; String nameServer() default NAME_SERVER_PLACEHOLDER; String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER; }
注解类通过实现RocketMQListener接口重写onMessage方法收取消息(详情看下方例子)
ListenerContainerConfiguration.java
广播模式下消息不能顺序消费
private void validate(RocketMQMessageListener annotation) { if (annotation.consumeMode() == ConsumeMode.ORDERLY && annotation.messageModel() == MessageModel.BROADCASTING) { throw new BeanDefinitionValidationException( "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!"); } }
消费者组和主题必须要配置,否则将会初始化失败
boolean listenerEnabled = (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP) .getOrDefault(topic, true); if (!listenerEnabled) { log.debug( "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic); return; } validate(annotation);
DefaultRocketMQListenerContainer.java
可通过实现RocketMQPushConsumerLifecycleListener或RocketMQPushConsumerLifecycleListener接口重写prepareStart方法配置消费者属性
if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer); }
例子:(请不要在onMessage方法中捕捉任何异常,因为没有异常代表消费成功)
@Service @RocketMQMessageListener(consumerGroup = "my-consumer", topic = "my-topic") public class Producer1Service implements RocketMQListener消费者监听信息(Pull方式的信息), RocketMQPushConsumerLifecycleListener { private int a = 1; @Override public void onMessage(String message) { System.out.println("======================================"); System.out.println("第" + a + "次接受"); a++; System.out.println("======================================"); System.out.println("consumer1-1:" + message); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // 设置消费者消息重试次数 consumer.setMaxReconsumeTimes(1); // 设置客户端实例名称 consumer.setInstanceName("Producer1"); } }
– 主动拉取信息
YML配置
# 必须配置名称服务器,消费者组和主题 rocketmq: name-server: 127.0.0.1:9876 consumer: group: my-consumer topic: my-topic
RocketMQAutoConfiguration.java
@Bean(CONSUMER_BEAN_NAME) @ConditionalOnMissingBean(DefaultLitePullConsumer.class) @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"}) public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties) throws MQClientException { RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer(); String nameServer = rocketMQProperties.getNameServer(); String groupName = consumerConfig.getGroup(); String topicName = consumerConfig.getTopic(); // 必须配置名称服务器,消费者组和主题 Assert.hasText(nameServer, "[rocketmq.name-server] must not be null"); Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null"); Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null"); String accessChannel = rocketMQProperties.getAccessChannel(); MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel()); SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType()); String selectorexpression = consumerConfig.getSelectorexpression(); String ak = consumerConfig.getAccessKey(); String sk = consumerConfig.getSecretKey(); int pullBatchSize = consumerConfig.getPullBatchSize(); DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,groupName, topicName, messageModel, selectorType, selectorexpression, ak, sk, pullBatchSize); return litePullConsumer; }
例子:
@Service public class Test { private DefaultLitePullConsumer litePullConsumer; @Autowired public Test(DefaultLitePullConsumer litePullConsumer) throws MQClientException { this.litePullConsumer = litePullConsumer; System.out.println(litePullConsumer.getPullBatchSize()); // 开启 litePullConsumer.start(); this.test(); } public void test() { try { while (true) { // 拉取信息 List死信队列messageExts = litePullConsumer.poll(); messageExts.forEach(e -> { try { System.out.println(new String(e.getBody(), "utf-8")); } catch (UnsupportedEncodingException unsupportedEncodingException) { unsupportedEncodingException.printStackTrace(); } }); } } finally { // 关闭 litePullConsumer.shutdown(); } } }
超过最大重试次数,信息依然没有被消费,该信息会进入一个特殊的队列,即死信队列。
例子:
@Service @RocketMQMessageListener(consumerGroup = "my-consumer", topic = "%DLQ%my-consumer") public class Producer3Service implements RocketMQListener{ @Override public void onMessage(String message) { System.out.println("==================死信队列消费信息:" + message); } }
死信队列默认权限是2(只写),需要改成6(可读可写)。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)