rocketMQ

rocketMQ,第1张

rocketMQ RocketMQ 1.MQ简介

RocketMQ

MQ(Message Queue)消息队列,是一种用来保存消息数据的队列,是在消息传输传输过程中保存消息的容器,多用于分布式系统之间通信;

队列:数据结构的一种,特征为 “先进先出”

2.MQ作用

优势:

  • 应用解耦 :提供系统的容错性和可维护性
  • 异步提速 :提升用户体验和系统吞吐量
  • 削峰填谷 :提高系统稳定性

劣势:

  • 系统可用性降低 :系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
  • 系统复杂度提高 :MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
  • 一致性问题
3.MQ产品介绍
  1. ActiveMQ

    java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高

  2. RabbitMQ

    erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,

  3. RocketMQ

    java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强

  4. kafka

    scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多

4.RocketMQ介绍
  1. RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术metaQ,后捐赠 给Apache基金会 作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项 目。并且它现在已经在阿里内部被广泛 的应用,并且经受住了多次双十一的这种极致场景的压力 (2017年的双十一,RocketMQ流转的消息量达到了万亿 级,峰值TPS达到5600万)
  2. 解决所有缺点
5.安装

windows版本的安装;

1.下载地址:https://rocketmq.apache.org/release_notes/

2.系统环境变量配置:

变量名:ROCKETMQ_HOME
变量值:D:IDErocketmq-all-4.8.0-bin-release
//变量值MQ解压路径MQ文件夹名

6.启动

1.启动NAMESERVER

cmd命令框执行进入至‘MQ文件夹bin’下

start mqnamesrv.cmd

2.启动BROKER

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

注意:闪退回命令行 删除C:Users”当前系统用户名”store下的所有文件。

3.测试:

新建环境变量

变量名:NAMESRV_ADDR 变量值:localhost:9876

测试生产者发送消息 bin目录下

tools.cmd org.apache.rocketmq.example.quickstart.Producer

测试消费者接收消息 bin目录下

tools.cmd org.apache.rocketmq.example.quickstart.Consumer

单生产者多消费者

负载均衡模式和广播模式

producer.java

package com.edu.onetomany;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class Producer {
    public static void main(String[] args) throws Exception {
        
        //1.创建一个发送消息的对象Producer
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.设定发送的命名服务器地址
        producer.setNamesrvAddr("localhost:9876");
        //3.1启动发送的服务
        producer.start();
        for (int i = 0; i < 10; i++) {
            //4.创建要发送的消息对象,指定topic,指定内容body
            Message msg = new Message("topic2", ("hello MQ"+i).getBytes("UTF-8"));
            //3.2发送消息
            SendResult result = producer.send(msg);
            System.out.println("返回结果:" + result);
            //5.关闭连接
        }

        producer.shutdown();
    }
}

consumer.java

package com.edu.onetomany;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;


public class Consumer {
    public static void main(String[] args)  throws Exception{
        //1.谁来收?  创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group3");
        //2.从哪里收消息?设定接收的命名服务器地址
        consumer.setNamesrvAddr("localhost:9876");
        //开启广播模式,此时同一个group也会收到全部消息(默认是负载均衡模式)
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //3.监听哪个消息队列?设置接收消息对应的topic,对应的sub标签为任意
        consumer.subscribe("topic2","*");
        //4.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //遍历消息
                for (MessageExt msg : list) {

                    System.out.println("收到消息:"+msg);
                    byte[] body = msg.getBody();

                    System.out.println(new String(body));
                }
                //枚举类的返回方式(注意)
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动接收消息的服务
        consumer.start();

        System.out.println("接受消息服务已经开启!");
        //6 不要关闭消费者(要监听)!

    }

}


注:消费者的group能影响接收的消息是怎样的方式以及多少条消息,存在默认配置,我们也可以重新设置配置

消息类别 1.同步消息

特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

代码实现:

SendResult result = producer.send(msg);
2.异步消息

特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息

//异步消息
producer.send(msg, new SendCallback() {
    //表示成功返回结果
    @Override
    public void onSuccess(SendResult sendResult) {
    	System.out.println(sendResult);
    }
    //表示发送消息失败
    @Override
    public void onException(Throwable throwable) {
    	System.out.println(throwable);
    }
});
3.单向消息

特征:不需要有回执的消息,例如日志类消息(没有返回值)

producer.sendOneway(msg);
4.延时消息

特征:消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用

Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-8"));
//设置延时等级3(并不能设置具体的秒,定义多个等级,选择等级就好)
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);

目前支持的消息时间:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
5.批量消息

特征:批量发送消息能显著提高传递消息的性能,(每次发送一条关闭会话会很占用资源以及性能)

List msgList = new ArrayList();
Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-8"));
Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-8"));
Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-8"));
msgList.add(msg1);
msgList.add(msg2);
msgList.add(msg3);
SendResult result = producer.send(msgList);

注意限制:这些批量消息应该有相同的topic;相同的waitStoreMsgOK(消息类型一样);不能是延时消息;消息内容总长度不超过4M

消息过滤 1.分类过滤

按照tag过滤信息(默认的方式)

生产者:

Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq2").getBytes("UTF-8"));

消费者:

//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");

2.语法过滤(SQL过滤)

生产者:

//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");

消费者:

//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
consumer.subscribe("topic6", MessageSelector.bySql("name = 'lisi'"));

注意:SQL过滤需要依赖服务器的功能支持,在broker.conf配置文件中添加对应的功能项,并开启对应功能

enablePropertyFilter=true

重启broker:

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
//或者手动更新参数(多数情况下存在缓存,要手动更新)
mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue
消息的特殊处理 1.错乱的消息顺序

说明:让消息实现“队列内有序,队列外无序”,(同一个queue中是有序的)

核心代码:

发送消息

//设置消息进入到指定的消息队列中
for (final OrderStep order : orderList) {
Message msg = new Message("topic1", order.toString().getBytes());
//发送时要指定对应的消息队列选择器
SendResult result = producer.send(msg, new MessageQueueSelector() {
//设置当前消息发送时使用哪一个消息队列
public MessageQueue select(List list, Message
message, Object o) {
//根据发送的信息不同,选择不同的消息队列
//根据id来选择一个消息队列的对象,并返回->id得到int值
long orderId = order.getOrderId();
long mqIndex = orderId % list.size();
return list.get((int) mqIndex);
}
}, null);
System.out.println(result);
}

接收消息

//使用单线程的模式从消息队列中取数据,一个线程绑定一个消息队列
consumer.registerMessageListener(new MessageListenerOrderly() {
//使用MessageListenerOrderly接口后,对消息队列的处理由一个消息队列多个线程服
务,转化为一个消息队列一个线程服务
public ConsumeOrderlyStatus consumeMessage(List list,
ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt msg : list) {
System.out.println(Thread.currentThread().getName()+"。消息:"
+ new String(msg.getBody())+"。queueId:"+msg.getQueueId());
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
2.事务消息

点击查看详细

3.集群搭建

点击详情

高级特性(重点) 1.消息的存储
  1. 消息生成者发送消息到MQ
  2. MQ返回ACK给生产者
  3. MQ push 消息给对应的消费者
  4. 消息消费者返回ACK给MQ

说明:ACK(Acknowledge character)

2.消息的存储
  1. 消息生成者发送消息到MQ
  2. MQ收到消息,将消息进行持久化,存储该消息
  3. MQ返回ACK给生产者
  4. MQ push 消息给对应的消费者
  5. 消息消费者返回ACK给MQ
  6. MQ删除消息

注意:1.第⑤步MQ在指定时间内接到消息消费者返回ACK,MQ认定消息消费成功,执行⑥ 2. 第⑤步MQ在指定时间内未接到消息消费者返回ACK,MQ认定消息消费失败,重新执行④⑤⑥

3.消息的存储介质

4.高效的消息存储与读写方式



5.消息存储结构


6.刷盘机制


7.高可用性

8.主从数据复制

9.负载均衡



10.消息重试



11.死信队列

12.死信处理

在监控平台中,通过查找死信,获取死信的messageId,然后通过id对死信进行精准消费

13.消息重复消费

14.消息幂等

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5695436.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存