服务之间通信的中间件。 可以让应用之间解耦,相互之间依赖减小,形成异步调用。还可以用来流量削峰。数据分发。
但是会有消息一致性问题,系统复杂性增加,如果Mq宕机,系统可用性会降低。
灵活可扩展,支持海量消息单机10万级别,使用文件做持久化, 并支持分布式事务(虽然可能造成较多的写脏), 异步刷盘,内存预分配, 高可用采用了同步双写及异步复制的方式
三.介绍
RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。
其主要功能有1.灵活可扩展性、2.海量消息堆积能力、3.支持顺序消息、4.多种消息过滤方式、5.支持事务消息、6.回溯消费等常用功能。
RocketMQ 核心的四大组件:
Name Server(消息的总控制)、是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。 在消息队列 RocketMQ 中提供命名服务,更新和发现 Broker 服务。
NameServer即名称服务,两个功能:
接收broker的请求,注册broker的路由信息
接收client(producer/consumer)的请求,根据某个topic获取其到broker的路由信息
NameServer没有状态,可以横向扩展。每个broker在启动的时候会到NameServer注册; Producer在发送消息前会根据topic到NameServer获取路由(到broker)信息;Consumer也会定 时获取topic路由信息。
Broker(分发消息),消息中转角色,负责存储消息,转发消息,可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。broker是RocketMQ的核心,它不不能挂的,所以需要保证broker的高可用。
Producer(生产者)、
Consumer(消费者) ,
每个组件都可以部署成集群模式进行水平扩展。
消息由topic区分消息类型(一级分类):如订单消息,物流消息等
tag为二级分类
message queue为消息类型下的消息队列。
用于并行发送和接受消息。
模式:broker分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker
Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。
每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。Broker 启动后需要完成一次将自己注册至 Name Server 的 *** 作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。Master Broker 和其对应的 Slave Broker会进行数据同步
单master模式 可靠性低风险大,宕机后服务将不可用,线上线上慎用
多master模式 配置简单,单master宕机或重启,其他master还可以继续提供服务
多master多slaver(异步复制)(主从模式)即使磁盘损坏,因为还有从服务,只会丢失异步复制瞬间差的非常少量数据。性能和多master差不多
多master多slaver(同步复制)(主从模式) 服务可用性和数据的可靠性都非常高,消息无延迟,丢失概率低。但是性能相对咯低。
四.集群工作流程
1.启动NameServer, NameServer起来后监听端口,等待Broker. Producer. Consumer连 上来,相当于-个路由控制中心。
2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
3.收发消息前,先创建Topic, 创建Topic时需要指定该Topic要存储在哪些Broker上, 也可以在发送消息时自动创建Topic.
4. Producer发送消息,启动时先跟NameServer集群中的其中- -台建立长连接, 并从NameServer中获取当前发送的Topic存 在哪些Broker上,轮询从队列列表中选择-一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
5. Consumer跟Producer类似,跟其中-台NameServer建立长连接, 获取当前订阅Topic存在哪些Broker上, 然后直接跟Broker建立连接通道,开始消费消息。
6.集群搭建
多主多从 (主节点接受消息,从节点消费消息)
一台机器可以有多个broker但只能配置一个nameserver
2台机器配置分别为主1从2 ,主2从1
7.消息发送分类:
基本消息: 同步消息(性能要求相对低,可靠性要求高),异步消息(性能要求高,可靠性要求低),单向发送消息
顺序消息 通过同一类型消息选择同一个队列保证消息的局部顺序性(先进先出)
延时消息 设置延迟时间发送消息 消费者会等待生产者设置延迟时间后才能消费
批量消息 把message放在list里面一起发送
事务消息 三种状态 提交状态 回滚状态 中间状态 未知状态回查 创建事务生产者对象 创建事务监听器 执行事务状态, 向mqserver发送消息后不会立马执行,而是生成一个消费者不可见的半消息,然后回调发送者的事务监听器执行事务状态处理
过滤消息 tag过滤 还有sql过滤
8.消费者广播模式和负载均衡模式 (默认为负载均衡模式)
广播模式:消息都能够被几个消费者消费
负载均衡模式:消息被几个消费者平均分配消费(不会重复消费)
9.先进先出 多队列模式
保证局部数据的有序性,需要把同样数据放在同一个队列,比如根据订单号
导包
org.apache.rocketmq rocketmq-common4.3.0 org.apache.rocketmq rocketmq-client4.4.0
生产者
package org.userMgs5002; import java.util.ArrayList; import java.util.List; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.MessageQueueSelector; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; public class RocketMqTest { public static DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer; producer = new DefaultMQProducer("test-demo"); producer.setNamesrvAddr("192.168.10.50:9876;192.168.10.22:9876"); producer.setRetryTimesWhenSendFailed(10000); try { producer.start(); } catch (MQClientException e) { System.out.println(e); } return producer; } public static TransactionMQProducer getTransactionMQProducer(){ TransactionMQProducer producer = new TransactionMQProducer("test-demo"); producer.setNamesrvAddr("192.168.10.50:9876;192.168.10.22:9876"); producer.setRetryTimesWhenSendFailed(10000); //事务监听器 producer.setTransactionListener(new TransactionListener(){ //执行本地事务 //msg :消息对象 //arg :人工参数 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { if("test6".equals(msg.getTags())){ return LocalTransactionState.COMMIT_MESSAGE; }else if("test1".equals(msg.getTags())||"test2".equals(msg.getTags())||"test3".equals(msg.getTags())) { return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.UNKNOW; //未知状态调回查方法 } //消息事务状态回查方法 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { return LocalTransactionState.ROLLBACK_MESSAGE; } }); return producer; } public static void sendSysMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{ DefaultMQProducer producer = getRocketMQProducer(); Message msg = new Message(); msg.setTopic("test-demo"); msg.setTags("test1"); //msg.putUserProperty("i", String.valueOf(5)); 设置人工属性 消费者可以用sql筛选 msg.setBody("ok".getBytes()); for (int i = 0; i < 10; i++) { SendResult s = producer.send(msg ,100000); System.out.println(s.getMsgId()); System.out.println(s.getMessageQueue().getQueueId()); System.out.println(s.getSendStatus()); } producer.shutdown(); } public static void sendAysMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{ DefaultMQProducer producer = getRocketMQProducer(); Message msg = new Message(); msg.setTopic("test-demo"); msg.setTags("test2"); msg.setBody("ok".getBytes()); for (int i = 0; i < 10; i++) { Thread.sleep(10000); producer.send(msg ,new SendCallback() { @Override public void onSuccess(SendResult arg0) { System.out.println("发送结果"+arg0); } @Override public void onException(Throwable arg0) { System.out.println("发送异常"+arg0); } }); } } public static void senddMsg() throws MQClientException, RemotingException, InterruptedException{ DefaultMQProducer producer = getRocketMQProducer(); Message msg = new Message(); msg.setTopic("test-demo"); msg.setTags("test3"); msg.setBody("ok".getBytes()); for (int i = 0; i < 10; i++) { producer.sendoneway(msg);; } producer.shutdown(); } public static void sendSxMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{ DefaultMQProducer producer = getRocketMQProducer(); Message msg = new Message(); msg.setTopic("test-demo"); msg.setTags("test4"); for (int i = 10; i < 100; i++) { msg.setBody(String.valueOf(i).getBytes()); Thread.sleep(2000); //模式i取模相等为同一种消息放在同一个队列,形成局部消息有序性 SendResult s =producer.send(msg ,new MessageQueueSelector() { @Override public MessageQueue select(Listmqs, Message msg, Object arg) { int i =(int)arg; int index = i%3; //根据消息的id取模相等的设为同一种消息然后放在同一个队列 return mqs.get(index); } } ,i); System.out.println(s.getMsgId()); System.out.println(s.getMessageQueue().getQueueId()); System.out.println(s.getSendStatus()); } producer.shutdown(); } public static void sendYcMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{ DefaultMQProducer producer = getRocketMQProducer(); Message msg = new Message(); msg.setTopic("test-demo"); msg.setTags("test5"); msg.setBody("ok".getBytes()); msg.setDelayTimeLevel(5); for (int i = 0; i < 10; i++) { SendResult s = producer.send(msg ,100000); System.out.println(s.getMsgId()); System.out.println(s.getMessageQueue().getQueueId()); System.out.println(s.getSendStatus()); } producer.shutdown(); } public static void sendPlMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{ DefaultMQProducer producer = getRocketMQProducer(); List list = new ArrayList (); Message msg0 = new Message(); msg0.setTopic("test-demo"); msg0.setTags("test5"); msg0.setBody("ok".getBytes()); Message msg1 = new Message(); msg1.setTopic("test-demo"); msg1.setTags("test5"); msg1.setBody("ok".getBytes()); Message msg2 = new Message(); msg2.setTopic("test-demo"); msg2.setTags("test5"); msg2.setBody("ok".getBytes()); list.add(msg0); list.add(msg1); list.add(msg2); SendResult s = producer.send(list ,100000); System.out.println(s.getMsgId()); System.out.println(s.getMessageQueue().getQueueId()); System.out.println(s.getSendStatus()); producer.shutdown(); } public static void sendSwMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{ TransactionMQProducer producer = getTransactionMQProducer(); Message msg = new Message(); msg.setTopic("test-demo"); msg.setTags("test6"); //msg.putUserProperty("i", String.valueOf(5)); 设置人工属性 消费者可以用sql筛选 msg.setBody("ok".getBytes()); for (int i = 0; i < 3; i++) { SendResult s = producer.sendMessageInTransaction(msg ,null); System.out.println(s.getMsgId()); System.out.println(s.getMessageQueue().getQueueId()); System.out.println(s.getSendStatus()); } producer.shutdown(); } public static void main(String[] args) { try { //sendSysMes(); //sendAysMes(); //senddMsg(); sendSxMes(); //sendPlMes(); } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { e.printStackTrace(); } } }
package org.userMgs5002; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; public class RocketMqConumer { public static void consumeMessage() throws MQClientException{ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-demo"); consumer.setNamesrvAddr("192.168.10.50:9876;192.168.10.22:9876"); //consumer.subscribe("test-demo", "test1 || test2"); consumer.subscribe("test-demo", "*"); //订阅主题topic以及tag tag为*代表全部 //consumer.setMessageModel(MessageModel.BROADCASTING); //设置消费者模式 广播或者负载均衡模式 默认为负载均衡模式 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(Listmsgs, ConsumeConcurrentlyContext context) { try { for (MessageExt message : msgs) { String msgbody = new String(message.getBody(), "utf-8"); System.out.println("消息体内容为 "+msgbody+" "+"详细信息: " + msgs); if (msgbody.equals("HelloWorld - RocketMQ")) { System.out.println("======错误======="); } } } catch (Exception e) { e.printStackTrace(); if (msgs.get(0).getReconsumeTimes() == 3) { // 该条消息可以存储到DB或者LOG日志中,或其他处理方式 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功 } else { return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试 } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } public static void consumeMessagesxx() throws MQClientException{ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-demo"); consumer.setNamesrvAddr("192.168.10.50:9876;192.168.10.22:9876"); consumer.subscribe("test-demo", "test4"); //订阅主题topic以及tag tag为*代表全部 //consumer.setMessageModel(MessageModel.BROADCASTING); //设置消费者模式 广播或者负载均衡模式 默认为负载均衡模式 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { try{ for (MessageExt message : msgs) { String msgbody = new String(message.getBody(), "utf-8"); System.out.println("消息体内容为 "+msgbody+" "+"详细信息: " + msgs); if (msgbody.equals("HelloWorld - RocketMQ")) { System.out.println("======错误======="); } } }catch(Exception e){ e.printStackTrace(); if (msgs.get(0).getReconsumeTimes() == 3) { // 该条消息可以存储到DB或者LOG日志中,或其他处理方式 return ConsumeOrderlyStatus.SUCCESS;// 成功 } else { return ConsumeOrderlyStatus.ROLLBACK;// 重试 } } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started."); } public static void main(String[] args) throws MQClientException { //consumeMessage(); consumeMessagesxx(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)