Windows安装优点:异步、解耦、削峰
缺点:系统可用性降低、复杂性提高、系统的一致性(若A系统成功,B、C系统失败)
Producer 生产者
Producer Group 生产者组,发送同一类消息的生产者
Consumer 消息消费者
Consumer Group 消费者组,消费同一类消息的多个 consumer 实例组成一个消费者组
Name Server 为producer和cunsumer提供路由信息
Broker Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备
测试1、下载安装包,解压,并配置环境变量(ROCKETMQ_HOME 为解压路径)
2、在解压bin目录下启动cmd
3、start mqnamesrv.cmd
4、start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
注意配置系统环境变量:NAMESRV_ADDR localhost:9876
发送消息:
tools.cmd org.apache.rocketmq.example.quickstart.Producer
接收消息:
tools.cmd org.apache.rocketmq.example.quickstart.Consumer
添加依赖GroupName代表具有相同角色的生产者组合或消费者组合,称为生产者组或消费者组
作用是在集群HA的情况下,一个生产者down之后,本地事务回滚后,可以继续联系该组下的另外一个生产者实例,不至于导致业务走不下去。在消费者组中,可以实现消息消费的负载均衡和消息容错目标
另外,有了GroupName,在集群下,动态扩展容量很方便。只需要在新加的机器中,配置相同的GroupName。启动后,就立即能加入到所在的群组中,参与消息生产或消费
消费者的组名配置:rocketmq.consumer.group=consumerGroup1
生产者的组名配置:rocketmq.producer.group=producerGroup1
生产者org.apache.rocketmq rocketmq-client4.9.2 org.apache.rocketmq rocketmq-spring-boot-starter2.2.1
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");//定义生产者组 producer.setNamesrvAddr("127.0.0.1:9876");//绑定生产者和 nameserver ,就是建立 和broker程序的关系 producer.start(); Message message = new Message("orderTopic","MESSAGEINFO".getBytes()); //2.通过注入的 消息提供者对象发送消息 SendResult send = producer.send(message); //SpringBoot整合,在接口中调用生产者 @Controller @Slf4j @RequestMapping("/RocketMQ") public class ProducerService { @Autowired RocketMQTemplate template; @PostMapping("/rocketMQ") public String RocketMQTEST(@RequestBody JSonObject params) throws Exception { template.convertAndSend("topic1:tag1",params.get("msg").toString()); return "SUCCESS"; } }消费者
//springboot整合 @Component @RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}", topic = "topic0", selectorexpression="tag0") public class ConsumerService implements RocketMQListener消费模式{ @Override public void onMessage(String message) { // TODO Auto-generated method stub System.out.println("message: " + message);//输出消息内容 } }
消息类型RocketMQ有两种消费模式(MessageModel):BROADCASTING广播模式,CLUSTERING集群模式,默认的是集群消费模式。
广播模式是指,一个消息被多个consumer消费
在消费者监控中配置:@RocketMQMessageListener(messageModel=MessageModel.BROADCASTING)
集群模式是指:一个consumerGroup中的consumer平均分配消费信息,消费完的信息不能被其它消费者消费
在消费者监控中配置:@RocketMQMessageListener(messageModel=MessageModel.CLUSTERING)
同步消息
即时性比较强,重要的消息,且必须有回执的消息。(回执是指rocketMQ的回执,代表发送成功)
@PostMapping("/rocketMQ") public String RocketMQTEST(@RequestBody JSonObject params) throws Exception { SendResult result = template.syncSend("topic1:tag1",params.get("msg").toString()); return "SUCCESS"; }
异步消息
即时性较弱,但需要有回执的消息
@PostMapping("/rocketMQ") public String RocketMQTEST(@RequestBody JSonObject params) throws Exception { template.asyncSend("topic1:tag1",params.get("msg").toString(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // TODO Auto-generated method stub System.out.println("发送成功:" + sendResult); } @Override public void onException(Throwable e) { // TODO Auto-generated method stub System.out.println("消息发送异常"); e.printStackTrace(); } }); }
单向消息
不需要有回执的信息,例如日志信息
template.sendoneWay("topic1:tag1",params.get("msg").toString());
延迟消息
RocketMQ 不支持任意时间自定义的延迟消息,仅支持内置预设值的延迟时间间隔的延迟消息。
默认延迟级别:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
//同步延时发送 SendResult result = template.syncSend("topic1:tag1",MessageBuilder.withPayload(params.get("msg").toString()).build(),5000,5);//5000超时时间,5延时发送 System.out.println(result); //异步延时发送 template.asyncSend("topic1:tag1",MessageBuilder.withPayload(params.get("msg").toString()).build(), new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // TODO Auto-generated method stub System.out.println("发送成功:" + sendResult); } @Override public void onException(Throwable e) { // TODO Auto-generated method stub System.out.println("消息发送异常"); e.printStackTrace(); } },5000,5);
tag过滤批量消息
这些批量消息应该有相同的topic
不能是延时消息
消息内容总长度不超过4M
//导入org.springframework.messaging.Message Listmessages = new ArrayList(); messages.add(MessageBuilder.withPayload(params.get("msg").toString().concat(">>>>1")).build()); messages.add(MessageBuilder.withPayload(params.get("msg").toString().concat(">>>>2")).build()); messages.add(MessageBuilder.withPayload(params.get("msg").toString().concat(">>>>3")).build()); SendResult result = template.syncSend("topic1:tag1",messages); System.out.println(result);
//说明:如果要消费所有 tag,用通配符*代替所有tag,否则设置需要的tag consumer.subscribe("orderTopic","*"); consumer.subscribe("orderTopic","tag1 || tag2");//消费tag1或tag2 consumer.subscribe("orderTopic","tag1");//只消费tag1语法过滤
在broker.conf中添加配置 enablePropertyFilter=true,重启broker
//生产者设置message属性 message.putUserProperty("key", "value"); //消费者 consumer.subscribe("topic", MessageSelector.bySql("key = 'key'"));消息丢失问题
1、生产者多次重试,默认重试发送两次,可调整
2、RocketMQ搭建多主多从结构
双主双从架构
主从节点间同步复制
异步刷盘(持久化,从内存到磁盘)
消息重复问题3、消费者丢失消息
重试获取15次后仍未成功,则放入死信队列,手工处理死信队列
在生产者认为网络波动时,会重复发送第二次消息,可能会造成消息重复
消费者使用去重表确保幂等性。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)