RocketMQ漫谈

RocketMQ漫谈,第1张

RocketMQ漫谈

RocketMQ初步了解,介绍安装部署实现短信服务请求的简版全流程。这里以Windows系统环境为例,主要做开发本地环境用,服务器(Linux)环境安装另述。

文章目录
  • 一、为什么需要消息队列
  • 二、什么是RocketMQ
  • 三、安装启动RocketMQ
  • 四、结合SpringBoot

一、为什么需要消息队列

消息队列实际是一种单向队列的数据结构,特点是先进先出。同样需要考虑高并发流量、单点故障等应用场景。

  • 应用解耦

    举例:订单系统用户下单,同时还需要去库存系统中做库存变更、去积分系统做积分变更等,若同步执行的话很容易因为下游系统崩溃导致整个下单流程崩溃。

    采用消息队列则可以在用户下单的时候直接返回一个下单成功的消息,让下游系统去订阅该消息即可。

  • 流量削峰

    将瞬时流量暂存起来,通过消息队列稳定进行消费。

  • 消息分发

    结合应用解耦的举例,引入消息队列之后可以让后续其他系统订阅下单发出的消息,例如目前需要发送下单成功的短信提醒,则将短信业务去订阅下单消息,后续若增加了邮箱通知,则可以让邮箱业务去订阅下单消息。

二、什么是RocketMQ

​ 是java语言开发的一款分布式消息中间件。

​ 由以下四个部分组成:

  • nameServer
  • brokerServer
  • producer
  • consumer

其中nameServer、brokerServer是RocketMQ的服务端,两者一起独立的对外提供服务。producer、consumer则是RocketMQ的客户端。

三、安装启动RocketMQ
  1. 下载所需版本的bin-release包(选择Binary)

    http://rocketmq.apache.org/dowloading/releases/

  1. 解压配置环境变量

    • ROCKETMQ_HOME(配置解压后的Rocketmq根目录)
    • JAVA_HOME(需要依赖jdk环境)
  2. 启动nameServer

    进入到bin目录下打开cmd(最好保证是管理员权限),执行以下命令

    mqnamesrv.cmd
    

  1. 启动broker

开启一个新的cmd,执行以下命令,表示在9876端口下启动broker

mqbroker.cmd -n localhost:9876

当nameServer和broker都启动完成后,rocketmq的服务端就已经可以对外提供服务了。

RocketMQ的开发人员在下载的包里已经内置了一套简单的demo消息收发测试程序,以下展示用例,如果不需要用例可以跳过:

  • 启动consumer

    同样在RocketMQ的bin目录下开启一个cmd命令窗口,首先在该窗口下设定命令行窗口级别的环境变量:

    set NAMESRV_ADDR=localhost:9876
    

    然后执行以下命令:

    tools.cmd  org.apache.rocketmq.example.quickstart.Consumer
    

  • 启动producer

    同样在RocketMQ的bin目录下开启一个cmd命令窗口,首先在该窗口下设定命令行窗口级别的环境变量:

    set NAMESRV_ADDR=localhost:9876
    

    然后执行以下命令:

    tools.cmd  org.apache.rocketmq.example.quickstart.Producer
    

    如果成功的话,就能看到一口气发送了1000条普通消息,命令窗口中将会被日志信息刷屏,表示我们启动成功了。

接下来介绍如何在SpringBoot中集成RocketMQ。


四、结合SpringBoot

首先保证本地RocketMQ的nameServer、broker启动,在RocketMQ的bin目录下执行:

mqnamesrv.cmd
mqbroker.cmd -n localhost:9876

新建一个SpringBoot项目,采用最小依赖原则,我们主要用到RocketMq和lombok的依赖。


    org.projectlombok
    lombok
    true




    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.1.1

配置文件application.yaml:

rocketmq:
  nameServer: 127.0.0.1:9876
  producer:
    group: sms-group
  consumer:
    group: sms-group

新建配置类,主要配置Name Server 地址和主题名称:

public class JmsConfig {

    
    public static final String NAME_SERVER = "127.0.0.1:9876";

    
    public static final String TOPIC_SMS_SEND = "sms-send";
}

新建短信发送实体类:

@Data
public class SmsInfoDO {

    private String phone;

    private String name;
}

新建短信消费类:

@Slf4j
@Component
@RocketMQMessageListener(topic = TOPIC_SMS_SEND, consumerGroup = "sms-group")
public class SmsSendConsumer implements RocketMQListener {

    @Override
    public void onMessage(SmsInfoDO smsInfoDO) {
        log.info("[短信发送消费者]received message:{}", smsInfoDO);
        log.info("[短信发送消费者]模拟发送,发送中......");
        log.info("[短信发送消费者]发送成功");
    }
}

新建Controller进行测试,主要有三种发送方式,如下:

@Slf4j
@RestController
@RequestMapping("/mq")
public class MqMessageController {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    
    @GetMapping("/syncGo")
    public boolean syncGo(SmsInfoDO req) {
        log.info("[短信发送生产者-同步]开始推送给mq......");
        rocketMQTemplate.syncSend(TOPIC_SMS_SEND, req);
        log.info("[短信发送生产者-同步]已推送给mq");
        return true;
    }

    
    @GetMapping("/asyncGo")
    public boolean asyncGo(SmsInfoDO req){
        rocketMQTemplate.asyncSend(TOPIC_SMS_SEND, req, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("[短信发送生产者-异步]已被mq消费!{}", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                log.info("[短信发送生产者-异步]发送失败!{}", throwable.getMessage());
            }
        });
        return true;
    }

    
    @GetMapping("/oneWayGo")
    public boolean oneWayGo(SmsInfoDO req){
        log.info("[短信发送生产者-one-way]开始推送给mq......");
        rocketMQTemplate.sendOneWay(TOPIC_SMS_SEND, req);
        log.info("[短信发送生产者-one-way]已推送给mq");
        return true;
    }

}

总结:

  • 当发送的消息不重要时,采用one-way方式,以提高吞吐量;
  • 当发送的消息很重要是,且对响应时间不敏感的时候采用sync方式;
  • 当发送的消息很重要,且对响应时间非常敏感的时候采用async方式;

Talk is cheap,show me the code.

https://github.com/Vainycos/my_rocketmq

Tips:

​ 需要注意以上实现只能作为示例,均未做消息幂等处理,不能排除消息被重复消费的情况。后续抽空将会在项目中实现相关幂等处理。

参考资料:

  • rocketmq学习(一) rocketmq介绍与安装
  • SpringBoot整合RocketMQ
  • SpringBoot(17)—SpringBoot整合RocketMQ
  • RocketMQ简单模拟发送短信
  • rocketmq发送消息的三种方式

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4668400.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存