RocketMQ

RocketMQ,第1张

RocketMQ

RocketMQTemplate的使用
      • 概念介绍
      • 生产者YML配置
            • 生产者发送消息
      • 消费者监听信息(Push方式的信息)
            • 使用@RocketMQMessageListener
      • 消费者监听信息(Pull方式的信息)
      • 死信队列

概念介绍

示意图:

https://www.cnblogs.com/weifeng1463/p/12889300.html

生产者YML配置
# 名称服务器和生产者组必须配置
rocketmq:
  producer:
    group: my-producer
  name-server: 127.0.0.1:9876

RocketMQAutoConfiguration.java

@Bean(PRODUCER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultMQProducer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "producer.group"})
public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties) {
    RocketMQProperties.Producer producerConfig = rocketMQProperties.getProducer();
    String nameServer = rocketMQProperties.getNameServer();
    String groupName = producerConfig.getGroup();

    // 名称服务器和生产者组必须配置
    Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
    Assert.hasText(groupName, "[rocketmq.producer.group] must not be null");

    String accessChannel = rocketMQProperties.getAccessChannel();

    String ak = rocketMQProperties.getProducer().getAccessKey();
    String sk = rocketMQProperties.getProducer().getSecretKey();
    boolean isEnableMsgTrace = rocketMQProperties.getProducer().isEnableMsgTrace();
    String customizedTraceTopic = rocketMQProperties.getProducer().getCustomizedTraceTopic();

    DefaultMQProducer producer = RocketMQUtil.createDefaultMQProducer(groupName, ak, sk, isEnableMsgTrace, customizedTraceTopic);

    producer.setNamesrvAddr(nameServer);
    if (!StringUtils.isEmpty(accessChannel)) {
        producer.setAccessChannel(AccessChannel.valueOf(accessChannel));
    }
    producer.setSendMsgTimeout(producerConfig.getSendMessageTimeout());
    producer.setRetryTimesWhenSendFailed(producerConfig.getRetryTimesWhenSendFailed());
    producer.setRetryTimesWhenSendAsyncFailed(producerConfig.getRetryTimesWhenSendAsyncFailed());
    producer.setMaxMessageSize(producerConfig.getMaxMessageSize());
    producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold());
    producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());

    return producer;
}
生产者发送消息
@RestController
public class ProducerController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    
    @GetMapping("/send1")
    public void send1() {
        // 直接发送消息
        rocketMQTemplate.sendOneWay("my-topic", "直接发送信息");
        // 发送有序消息,第三个参数hashKey必保持一致,才能分配到一个队列中 消费者@RocketMQMessageListener(consumeMode = ConsumeMode.ORDERLY)才能有序消费
        rocketMQTemplate.sendOneWayOrderly("my-topic", "有序信息", "1");
        System.out.println("直接发送信息");
    }

    
    @GetMapping("/send2")
    public void send2() {
        for (int i = 0; i < 5; i++) {
            rocketMQTemplate.syncSend("my-topic", "同步发送信息");
            // 发送延迟消息
            rocketMQTemplate.syncSend("my-topic", MessageBuilder.withPayload("同步发送信息" + i).build(), 1000, 3);
        }
    }

    
    @GetMapping("/send3")
    public void send3() {
        rocketMQTemplate.asyncSend("my-topic", "异步消息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult.getSendStatus());
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });
    }
}
消费者监听信息(Push方式的信息)

– 被动的收取发送过来的信息

使用@RocketMQMessageListener
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@documented
public @interface RocketMQMessageListener {
    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLAqCEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
    
    String consumerGroup();
    
    String topic();

    
    SelectorType selectorType() default SelectorType.TAG;

    
    String selectorexpression() default "*";
    
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
    
    MessageModel messageModel() default MessageModel.CLUSTERING;
    
    int consumeThreadMax() default 64;
    
    long consumeTimeout() default 15L;
    
    String accessKey() default ACCESS_KEY_PLACEHOLDER;
    
    String secretKey() default SECRET_KEY_PLACEHOLDER;
    
    boolean enableMsgTrace() default true;
    
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
    
    String nameServer() default NAME_SERVER_PLACEHOLDER;
    
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}

注解类通过实现RocketMQListener接口重写onMessage方法收取消息(详情看下方例子)

ListenerContainerConfiguration.java

广播模式下消息不能顺序消费

private void validate(RocketMQMessageListener annotation) {
        if (annotation.consumeMode() == ConsumeMode.ORDERLY &&
            annotation.messageModel() == MessageModel.BROADCASTING) {
            throw new BeanDefinitionValidationException(
                "Bad annotation definition in @RocketMQMessageListener, messageModel BROADCASTING does not support ORDERLY message!");
        }
    }

消费者组和主题必须要配置,否则将会初始化失败

boolean listenerEnabled =
            (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                .getOrDefault(topic, true);

if (!listenerEnabled) {
    log.debug(
        "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
        consumerGroup, topic);
    return;
}
validate(annotation);

DefaultRocketMQListenerContainer.java

可通过实现RocketMQPushConsumerLifecycleListener或RocketMQPushConsumerLifecycleListener接口重写prepareStart方法配置消费者属性

if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
    ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
} else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
    ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
}

例子:(请不要在onMessage方法中捕捉任何异常,因为没有异常代表消费成功)

@Service
@RocketMQMessageListener(consumerGroup = "my-consumer", topic = "my-topic")
public class Producer1Service implements RocketMQListener, RocketMQPushConsumerLifecycleListener {
    private int a = 1;

    @Override
    public void onMessage(String message) {
        System.out.println("======================================");
        System.out.println("第" + a + "次接受");
        a++;
        System.out.println("======================================");
        System.out.println("consumer1-1:" + message);
    }

    @Override
    public void prepareStart(DefaultMQPushConsumer consumer) {
        // 设置消费者消息重试次数
        consumer.setMaxReconsumeTimes(1);
        // 设置客户端实例名称
        consumer.setInstanceName("Producer1");
    }
}
消费者监听信息(Pull方式的信息)

– 主动拉取信息

YML配置

# 必须配置名称服务器,消费者组和主题
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: my-consumer
    topic: my-topic

RocketMQAutoConfiguration.java

@Bean(CONSUMER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultLitePullConsumer.class)
@ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
    throws MQClientException {
    RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
    String nameServer = rocketMQProperties.getNameServer();
    String groupName = consumerConfig.getGroup();
    String topicName = consumerConfig.getTopic();

    // 必须配置名称服务器,消费者组和主题
    Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
    Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
    Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");

    String accessChannel = rocketMQProperties.getAccessChannel();
    MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());
    SelectorType selectorType = SelectorType.valueOf(consumerConfig.getSelectorType());
    String selectorexpression = consumerConfig.getSelectorexpression();
    String ak = consumerConfig.getAccessKey();
    String sk = consumerConfig.getSecretKey();
    int pullBatchSize = consumerConfig.getPullBatchSize();

    DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,groupName, topicName, messageModel, selectorType, selectorexpression, ak, sk, pullBatchSize);
    return litePullConsumer;
}

例子:

@Service
public class Test {

    private DefaultLitePullConsumer litePullConsumer;

    @Autowired
    public Test(DefaultLitePullConsumer litePullConsumer) throws MQClientException {
        this.litePullConsumer = litePullConsumer;
        System.out.println(litePullConsumer.getPullBatchSize());
        // 开启
        litePullConsumer.start();
        this.test();
    }

    public void test() {
        try {
            while (true) {
                // 拉取信息
                List messageExts = litePullConsumer.poll();
                messageExts.forEach(e -> {
                    try {
                        System.out.println(new String(e.getBody(), "utf-8"));
                    } catch (UnsupportedEncodingException unsupportedEncodingException) {
                        unsupportedEncodingException.printStackTrace();
                    }
                });
            }
        } finally {
            // 关闭
            litePullConsumer.shutdown();
        }
    }
}
死信队列

超过最大重试次数,信息依然没有被消费,该信息会进入一个特殊的队列,即死信队列。

例子:

@Service
@RocketMQMessageListener(consumerGroup = "my-consumer", topic = "%DLQ%my-consumer")
public class Producer3Service implements RocketMQListener {
    @Override
    public void onMessage(String message) {
        System.out.println("==================死信队列消费信息:" + message);
    }
}

死信队列默认权限是2(只写),需要改成6(可读可写)。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5665215.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存