RocketMQ学习笔记

RocketMQ学习笔记,第1张

RocketMQ学习笔记

优点:异步、解耦、削峰

缺点:系统可用性降低、复杂性提高、系统的一致性(若A系统成功,B、C系统失败)

适用于上下游关系,上游作为生产者,下游作为消费者

Producer 生产者

Producer Group 生产者组,发送同一类消息的生产者

Consumer 消息消费者

Consumer Group 消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组

Name Server 为producer和cunsumer提供路由信息

Broker Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备

Windows安装

1、下载安装包,解压,并配置环境变量(ROCKETMQ_HOME 为解压路径)

2、在解压bin目录下启动cmd

3、start mqnamesrv.cmd

4、start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

测试

配置系统环境变量:NAMESRV_ADDR localhost:9876

发送消息:

tools.cmd org.apache.rocketmq.example.quickstart.Producer

接收消息:

tools.cmd org.apache.rocketmq.example.quickstart.Consumer

注意

GroupName代表具有相同角色的生产者组合或消费者组合,称为生产者组或消费者组

作用是在集群HA的情况下,一个生产者down之后,本地事务回滚后,可以继续联系该组下的另外一个生产者实例,不至于导致业务走不下去。在消费者组中,可以实现消息消费的负载均衡和消息容错目标

另外,有了GroupName,在集群下,动态扩展容量很方便。只需要在新加的机器中,配置相同的GroupName。启动后,就立即能加入到所在的群组中,参与消息生产或消费

消费者的组名配置:rocketmq.consumer.group=consumerGroup1

生产者的组名配置:rocketmq.producer.group=producerGroup1

添加依赖
        
            org.apache.rocketmq
            rocketmq-client
            4.9.2
        
        
        
            org.apache.rocketmq
            rocketmq-spring-boot-starter
            2.2.1
        
生产者
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");//定义生产者组
producer.setNamesrvAddr("127.0.0.1:9876");//绑定生产者和 nameserver ,就是建立 和broker程序的关系
producer.start();
Message message = new Message("orderTopic","MESSAGEINFO".getBytes());
//2.通过注入的 消息提供者对象发送消息
SendResult send = producer.send(message);
​
​
//SpringBoot整合,在接口中调用生产者
@Controller
@Slf4j
@RequestMapping("/RocketMQ")
public class ProducerService {
    
    @Autowired
    RocketMQTemplate template;
    
    @PostMapping("/rocketMQ")
    public String RocketMQTEST(@RequestBody JSonObject params) throws Exception {
        template.convertAndSend("topic1:tag1",params.get("msg").toString());
        return "SUCCESS";
    }
}
消费者
//springboot整合
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "topic0", selectorexpression="tag0")
public class ConsumerService implements RocketMQListener{
​
    @Override
    public void onMessage(String message) {
        // TODO Auto-generated method stub
        System.out.println("message: " + message);//输出消息内容
        
    }
}
消费模式

RocketMQ有两种消费模式(MessageModel):BROADCASTING广播模式,CLUSTERING集群模式,默认的是集群消费模式。

广播模式是指,一个消息被多个consumer消费

在消费者监控中配置:@RocketMQMessageListener(messageModel=MessageModel.BROADCASTING)

集群模式是指:一个consumerGroup中的consumer平均分配消费信息,消费完的信息不能被其它消费者消费

在消费者监控中配置:@RocketMQMessageListener(messageModel=MessageModel.CLUSTERING)

消息类型

同步消息

即时性比较强,重要的消息,且必须有回执的消息。(回执是指rocketMQ的回执,代表发送成功)

@PostMapping("/rocketMQ")
    public String RocketMQTEST(@RequestBody JSonObject params) throws Exception {
        SendResult result = template.syncSend("topic1:tag1",params.get("msg").toString());
        return "SUCCESS";
    }

异步消息

即时性较弱,但需要有回执的消息

@PostMapping("/rocketMQ")
    public String RocketMQTEST(@RequestBody JSonObject params) throws Exception {
                        template.asyncSend("topic1:tag1",params.get("msg").toString(), new SendCallback() {
​
            @Override
            public void onSuccess(SendResult sendResult) {
                // TODO Auto-generated method stub
                System.out.println("发送成功:" + sendResult);
            }
​
            @Override
            public void onException(Throwable e) {
                // TODO Auto-generated method stub
                System.out.println("消息发送异常");
                e.printStackTrace();
            }
            
        });
    }

单向消息

不需要有回执的信息,例如日志信息

template.sendoneWay("topic1:tag1",params.get("msg").toString());

延迟消息

RocketMQ 不支持任意时间自定义的延迟消息,仅支持内置预设值的延迟时间间隔的延迟消息。

默认延迟级别:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

//同步延时发送
        SendResult result = template.syncSend("topic1:tag1",MessageBuilder.withPayload(params.get("msg").toString()).build(),5000,5);//5000超时时间,5延时发送
        System.out.println(result);
​
//异步延时发送
template.asyncSend("topic1:tag1",MessageBuilder.withPayload(params.get("msg").toString()).build(), new SendCallback() {
​
            @Override
            public void onSuccess(SendResult sendResult) {
                // TODO Auto-generated method stub
                System.out.println("发送成功:" + sendResult);
            }
​
            @Override
            public void onException(Throwable e) {
                // TODO Auto-generated method stub
                System.out.println("消息发送异常");
                e.printStackTrace();
            }
            
        },5000,5);

批量消息

这些批量消息应该有相同的topic

不能是延时消息

消息内容总长度不超过4M

//导入org.springframework.messaging.Message
List messages = new ArrayList();
        messages.add(MessageBuilder.withPayload(params.get("msg").toString().concat(">>>>1")).build());
        messages.add(MessageBuilder.withPayload(params.get("msg").toString().concat(">>>>2")).build());
        messages.add(MessageBuilder.withPayload(params.get("msg").toString().concat(">>>>3")).build());
        
        SendResult result = template.syncSend("topic1:tag1",messages);
        System.out.println(result);

tag过滤
//说明:如果要消费所有 tag,用通配符*代替所有tag,否则设置需要的tag
consumer.subscribe("orderTopic","*");
​
consumer.subscribe("orderTopic","tag1 || tag2");//消费tag1或tag2
​
consumer.subscribe("orderTopic","tag1");//只消费tag1
语法过滤

在broker.conf中添加配置 enablePropertyFilter=true,重启broker

//生产者设置message属性
message.putUserProperty("key", "value");
//消费者
consumer.subscribe("topic", MessageSelector.bySql("key = 'key'"));
消息丢失问题

1、生产者多次重试,默认重试发送两次,可调整

2、RocketMQ搭建多主多从结构

双主双从架构

主从节点间同步复制

异步刷盘(持久化,从内存到磁盘)

3、消费者丢失消息

重试获取15次后仍未成功,则放入死信队列,手工处理死信队列

消息重复问题

在生产者认为网络波动时,会重复发送第二次消息,可能会造成消息重复

消费者使用去重表确保幂等性。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5683644.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存