rocketmq基础:发送消息、延时消费、消费重试

rocketmq基础:发送消息、延时消费、消费重试,第1张

rocketmq支持3种消息发送方式
//rocketmq-spring-boot-starter提供的 *** 作rocketmq的类
@Autowired
private RocketMQTemplaterocketMQTemplate;
同步消息(sync message)

producer向broker发送消息,broker服务器返回发送结果后再继续往下执行

public void sendSyncMsg(Stringtopic,String msg){
    rocketMQTemplate.syncSend(tpic,msg);
}
异步消息(sync message)

producer向broker发送消息时指定消息发送成功以及发送异常的回调方法,producer发送消息后线程不阻塞继续往下执行,消息发送成功或失败的回调任务在新的线程中执行

public void sendAsyncMsg(String topic,String msg){
    rocketMQTemplate.asyncSend(topic, msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println(sendResult);
            
        @Override
        public void onException(Throwable throwable) {
            System.out.println(throwable.getMessage());
        }
    });
}
单向消息(sync message)

producer向broker发送消息不等待broker服务器的结果

public void sendOneWayMsg(String topic,String msg){
    rocketMQTemplate.sendOneWay(topic,msg);
}
rocketmq发送自定义消息类型 生产者
public void sendMsgByJson(String topic, OrderExt orderExt){
    //同步发送
    rocketMQTemplate.convertndSend(topic,orderExt);
}
消费者
@Component
@RocketMQMessageListener(topic = "rocketmq-demo-obj",consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener {
    @Override
    public void onMessage(OrderExt orderExt) {
        System.out.println(orderExt);
    }
}
rocketmq发送延迟消息

发送延迟消息只需要设置延迟时间即可,延迟时间存在18个等级(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h),调用setDelayTimeLevel(level)设置与延迟时间相对于的延迟级别即可。

发送同步消息

public void sendMsgByJsonDelay(String topic, OrderExt orderExt){
    Message message =MessageBuilder.withPayload(orderExt).build();
    //第四个参数设置延迟级别
    rocketMQTemplate.syncSend(topic,message,3000,3);
    System.out.printf("send msg:%s",orderExt);
}

发送异步消息

public void sendAsyncMsgByJsonDelay(String topic, OrderExt orderExt) throws RemotingException, MQClientException, InterruptedException {
    String s = JSON.toJSONString(orderExt);
    org.apache.rocketmq.common.message.Message message = neworg.apache.rocketmq.common.message.Message(topic,s.getBytes(Charset.orName("utf-8")));
    message.setDelayTimeLevel(3);
    //异步消息使用原生方法发送
    rocketMQTemplate.getProducer().send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println(sendResult);
        }
        @Override
        public void onException(Throwable throwable) {
            System.out.println(throwable.getMessage());
        }
    });
    System.out.printf("sent msg:%s",s);
}
rocketmq消费重试
@Component
@RocketMQMessageListener(topic = "rocketmq-demo-obj",consumerGroup = "demo-consumer-group-obj")
public class ConsumerSimpleObj implements RocketMQListener {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println(new Date());
        System.out.println(JSON.parseObject(new String(messageExt.getBody()),OrderExt.class));
        //消费发生异常后会重复消费同一数据,默认会按第3级及之后的延时时间间隔重复消费(即第一次重复消费在首次消费10s后,第二次重复消费在第一次重复消费30s后,以此类推),可以根据消费次数终止重复消费
        if(messageExt.getReconsumeTimes()>2){
            System.out.println("停止重试,写入数据库...");
            return;
        }
        
        throw new RuntimeException("处理消息失败");
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/919960.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存