rocketmq的安装及使用

rocketmq的安装及使用,第1张

rocketmq的安装及使用 概念模型:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:
    • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。
    • 提供心跳检测机制,检查Broker是否还存活;
    • 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
安装:

1.下载: Apache Downloadshttps://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip

 2.解压

注意:解压文件不要放在中文目录下

3配置环境变量

ROCKETMQ_HOME="rocketmq本地路径"
NAMESRV_ADDR="localhost:9876"

4.启动

4.1 启动nameserver

        Cmd命令框执行进入至‘MQ文件夹bin’下,然后执行‘mqnamesrv.cmd’,启动nameserver。成功后如下,此框勿关闭。

 

4.2

启动broker

        Cmd命令框执行进入至‘MQ文件夹bin’下,然后执行‘mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true’,启动broker。成功后如下,此框勿关闭。

 

 使用:

1.创建maven quickstart项目

2.添加依赖


    org.apache.rocketmq
    rocketmq-client
    4.9.1

3.简单发送

package com.woniuxy.cloud63.simple;


import com.woniuxy.cloud63.AppConstans;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.Scanner;

public class Sender {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("TestSender");
       producer.setNamesrvAddr(AppConstans.ROCKETMQ_NAMESERVER_ADDR);
       producer.start();
        Scanner scanner = new Scanner(System.in);
            System.out.println("请输入要发送的消息");
            String smsContext = scanner.next();
            if(smsContext.equals("exit")){
                producer.shutdown();
            }
            for(int i =0;i<10;i++){
                Message msg = new Message(AppConstans.SMS_TOPIC, "user_register", (smsContext+i).getBytes("UTF-8"));
                SendResult sendResult = producer.send(msg);
                System.out.println("sendResult:"+sendResult);
            }
            
    }

}

4.简单接收

package com.woniuxy.cloud63.simple;


import com.woniuxy.cloud63.AppConstans;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;

public class Receiver {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TestReceive");

        consumer.setNamesrvAddr(AppConstans.ROCKETMQ_NAMESERVER_ADDR);
        consumer.subscribe(AppConstans.SMS_TOPIC,"*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)->{
            for (MessageExt msgExt:msgs){
                try {
                    System.out.println("消息内容:"+new String(msgExt.getBody(),"utf-8"));

                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

备注:

package com.woniuxy.cloud63;

public interface AppConstans {
    String ROCKETMQ_NAMESERVER_ADDR="localhost:9876";
    String SMS_TOPIC="SMS";
}
发送的三种模式: 发送-同步确认发送结果

同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。

  • 应用场景:此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

发送-异步确认发送结果

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ版的异步发送,需要您实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。

  • 应用场景:异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

发送-结束 oneway

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

  • 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

发送方式

发送TPS

发送结果反馈

可靠性

同步发送

不丢失

异步发送

更快

不丢失

单向发送

最快

可能丢失

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5653802.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存