- Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
- Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
- NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:
-
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。
- 提供心跳检测机制,检查Broker是否还存活;
-
- 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
1.下载: Apache Downloadshttps://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip
2.解压
注意:解压文件不要放在中文目录下
3配置环境变量
ROCKETMQ_HOME="rocketmq本地路径" NAMESRV_ADDR="localhost:9876"
4.启动
4.1 启动nameserver
Cmd命令框执行进入至‘MQ文件夹bin’下,然后执行‘mqnamesrv.cmd’,启动nameserver。成功后如下,此框勿关闭。
4.2
启动broker
Cmd命令框执行进入至‘MQ文件夹bin’下,然后执行‘mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true’,启动broker。成功后如下,此框勿关闭。
使用:
1.创建maven quickstart项目
2.添加依赖
org.apache.rocketmq rocketmq-client4.9.1
3.简单发送
package com.woniuxy.cloud63.simple; import com.woniuxy.cloud63.AppConstans; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import java.io.UnsupportedEncodingException; import java.util.Scanner; public class Sender { public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException { DefaultMQProducer producer = new DefaultMQProducer("TestSender"); producer.setNamesrvAddr(AppConstans.ROCKETMQ_NAMESERVER_ADDR); producer.start(); Scanner scanner = new Scanner(System.in); System.out.println("请输入要发送的消息"); String smsContext = scanner.next(); if(smsContext.equals("exit")){ producer.shutdown(); } for(int i =0;i<10;i++){ Message msg = new Message(AppConstans.SMS_TOPIC, "user_register", (smsContext+i).getBytes("UTF-8")); SendResult sendResult = producer.send(msg); System.out.println("sendResult:"+sendResult); } } }
4.简单接收
package com.woniuxy.cloud63.simple; import com.woniuxy.cloud63.AppConstans; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.io.UnsupportedEncodingException; public class Receiver { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestReceive"); consumer.setNamesrvAddr(AppConstans.ROCKETMQ_NAMESERVER_ADDR); consumer.subscribe(AppConstans.SMS_TOPIC,"*"); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)->{ for (MessageExt msgExt:msgs){ try { System.out.println("消息内容:"+new String(msgExt.getBody(),"utf-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
备注:
package com.woniuxy.cloud63; public interface AppConstans { String ROCKETMQ_NAMESERVER_ADDR="localhost:9876"; String SMS_TOPIC="SMS"; }发送的三种模式: 发送-同步确认发送结果
同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。
- 应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ版的异步发送,需要您实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。
- 应用场景:异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
- 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
发送方式
发送TPS
发送结果反馈
可靠性
同步发送
快
有
不丢失
异步发送
更快
有
不丢失
单向发送
最快
无
可能丢失
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)