下载maven :wget http://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz
编译: mvn clean package -Dmaven.test.skip=true
启动NameServer
nohup sh bin/mqnamesrv -n "公网IP:9876" & #后台运行 nameserver
tail -f ~/logs/rocketmqlogs/namesrv.log #监听nameserver日志文件
启动broker
nohup sh bin/mqbroker -n 公网IP:9876 -c conf/broker.conf autoCreateTopicEnable=true &
tail -f ~/logs/rocketmqlogs/broker.log
发送消息
export NAMESRV_ADDR=localhost:9876 #设置环境变量
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭Server
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
在阿里云服务器上部署时遇到的坑,修改conf/broker.conf文件
2、Rocket角色namesrvAddr = 自己云服务器的公网IP:9876
brokerIP1 = 自己云服务器的公网IP
broker
- Brocker面向producer和consumer接收和发送消息
- 向nameserver提交自己的信息
- 是消息中间件的消息存储、收发服务器
- 每个Broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,之后定时上报
broker集群
- Broker高可用,可以配置Master / Slave结构,Master可写可读,Slave只可以读,Master将写入的数据同步给Slave
- 一个Master可以对应多个Slave,但是一个Slave只能对应一个Master
- Master与Slave的对应关系通过制定相同的BrokerName,不同的BrokerId来定义为0表示Master,非0表示Slave
- Master多机负载,可以部署多个broker
- 每个Broker与nameserver集群中的所有节点建立长链接,定时注册Topic信息到nameserver
producer
- 消息的生产者
- 通过nameserver集群中的其中一个节点(随机选择)建立长连接,获取Topic的路由信息,包括Topic下面有哪些Queue,这些Queue分布在哪些Broker上等
- 接下来向提供Topic服务的Master建立长连接,且定时向Master发送心跳
consumer
- 消息的消费者,通过NameServer集群获得Topic的路由信息,连接到对应的Broker上消费消息。注意,由于Master和Slave都可以读取消息,因此Consumer会与Master和Slave都建立连接
nameserver
- 底层有netty实现,提供了路由管理、服务注册、服务发现,是一个无状态节点
- nameserver是服务发现者,集群中各个角色(producer、broker、consumer等)都需要定时向nameserver上报自己的状态,以便互相发现彼此,超时不上报的话,nameserver会把他从列表中剔除
- nameserver可以部署多个,当多个nameserver存在的时候,其他角色同时向他们上报信息,以保证高可用
- nameserver集群键互不通信,没有主备的概念
- nameserver内存式存储,nameserver中的broker、topic等信息默认不会持久化
- 为什么不用zookeeper? rockermq希望为了提高性能,CAP定理,客户端负载均衡
对比JSM中的Topic和Queue
Topic是一个逻辑上的概念,实际上Message是在每个Broker上以Queue的形式记录。
3、发送方式 同步发送
消息发送中进入同步等待状态,可以保证消息投递一定到达
异步消息想要快速发送消息,又不想丢失的时候可以使用异步消息
producer.send(message, new SendCallback() { public void onSuccess(SendResult sendResult) { System.out.println("消息发送成功"); System.out.println("sendResult" + sendResult); } public void onException(Throwable throwable) { //如果发生异常 case 异常,尝试重投 或者调整业务逻辑 throwable.printStackTrace(); System.out.println("发送异常"); } });单向消息
只发送消息,不等待服务器响应,只发送请求不等待应答,此方式发送消息的过程耗时非常短,一般在微秒级别 peoducer.sendoneway(message);
延迟消息 4、消息 消息消费模式消息消费模式由消费者决定,可以由消费者设置MessageModel来决定消息模式
消息模式默认为集群消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setMessageModel(MessageModel.CLUSTERING);
集群消息
集群消息是指集群化部署消费者
当使用集群消费模式时,MQ认为任意一条消息只需要被集群内的任意一个消费者处理即可
特点
- 每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
- 在消息重投时不能保证路由到同一台机器上
- 消费状态由broker维护
当使用广播消费模式时,MQ会将每条消息推送给集群内所有注册过的客户端,保证至少被每台机器消费一次
特点
- 消费进度由consumer维护
- 保证每个消费者消费一次消息
- 消费失败的消息不会重投
一个group中消费者的tag selector 都不能随便变,要保持统一
TAG在Producer中使用Tag: Message msg = new Message("Topic001","TAG-A","KEY-A","tag".getBytes());
在Consumer中订阅Tag: consumer.subscribe("Topic001", "TAG-A");
SQL表达式过滤实例:MessageSelector selector = MessageSelector.bySql("order > 5");
consumer.subscribe("xxxxxx",selector);
语法:
RocketMQ只定义了一些基本的语法来支持这个功能,也可以扩展
1. 数字比较 > , >= , < , <= , between , =
2.字符比较 = ,<> , IN
3. is null 或者 is not null
4.逻辑运算 and , or , not
常量类型是:数字,字符串('abc',必须使用单引号), NULL , TRUE , FALSE
5、消息事务
事务执行流程
RocketMQ实现方式Half Message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
检查事务状态:Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向发送者确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调
超时:如果超过回查次数,默认回滚消息
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)