RabbitMQ学习笔记

RabbitMQ学习笔记,第1张

RabbitMQ学习笔记

RabbitMQ学习笔记
  • RabbitMQ
    • 听课笔记
      • 一、优点:
      • 二、市面上常用的MQ:
      • 三、访问web管理工具(本地与远程):
      • 四、消息应答(确认收到消息或消费完消息的回复)
        • 1.自动应答
        • 2.手动应答(最佳选择)
        • 3.消息重新入列
      • 五、消息的分发机制:
        • 1.轮训分发(公平分发):
        • 2.不公平分发(建议使用):
        • 3.预取值分发(建议使用):
      • 六、生产者的发布确认:
        • 1.单次发布确认:
        • 2.批量发布确认:
        • 3.异步发布确认:
        • 4.高并发集合工具类**ConcurrentSkipListMap**和**ConcurrentHashMap**
      • 七、交换机:
        • 1.概念与作用:
        • 2.fanout(扇形广播发布)
        • 3.direct exchange(直接交换)
        • 4.Topics exchange(主题交换机,功能最强大)
          • (1)Topic 的要求(routingKey命名规则)

RabbitMQ 听课笔记 一、优点:

请求不直接传到后台,而是先经过rabbitMq队列,通过队列传输再传到服务器,因此给了rabbitMQ *** 作的空间

(1)可以有削峰的作用:当某段时间请求突然变多,超出了服务器的承载范围,但因为有rabbitmq限制传给服务器请求的数量,多余的存在消息队列中排队,可以达到削峰的目的。

(2)应用解耦:就如同下单系统,里面会调用多个其它系统的接口,如其中一个系统报错,则整个环节就出问题,当如果有rabbitmq,订单系统直接把订单信息放在队列中,队列返回一个成功信息,这时订单系统的任务已经完成了,后面有没有报错都和他没关系。至于后面的执行,就是队列不断监督后续环节的进行。

(3)异步处理:当一个接口方法执行会很久,但主程序想准确知道他的执行时间,拿到执行结果,这可以通过消息队列实现:

二、市面上常用的MQ:

(1)activeMQ:现在他们的团队不多维护。

(2)kafka:吞吐量大,但有缺点,就是不对消息消费错误处理(消息处理报错就消失)。

(3)RocketMQ:阿里巴巴出品,用Java语言实现,只支持Java和C++使用。

(4)RabbitMQ:公司常用,最主流的消息中间件之一。

三、访问web管理工具(本地与远程):

按照老师的步骤,启动web管理工具后,网页可以用localhost打开,但不能远程打开。

(tips:15672端口是访问web管理工具的,5672是用来给rabbitmq队列传消息的端口)

远程访问不了的解决方法:

1.服务器命令行上关闭防火墙或开放15672端口;

2.如果用的是阿里云服务器,要去实例服务器上配置一下安全组,开放一下端口(这是阿里云的保护机制),顺便开放一下5672的端口,下面连接rabbitMQ会用到:

四、消息应答(确认收到消息或消费完消息的回复)

保证传到消费机上的消息不会被丢失

1.自动应答

实际的意义是指:消费服务器接受到消息后返回消息消费成功的信息给rabbitMQ,让rabbitMQ对确认消费的消息进行删除,注意这里是接受到数据后直接返回消费成功的信息,如果此时这一个消息消费较长时间中服务器宕机了,rabbitMQ早早此信息删除了(因消费机返回消费成功信息),这就发生了信息丢失问题,而且还会发生消息堆积。


所以,这里的自动应答应配合良好的服务器环境,但事实上你也不能确保你的服务器不会发生问题,所以最好配置手动应答。

2.手动应答(最佳选择)

在每次消费完信息后调用消息应答,即在deliverCallback方法执行最后调用应答方法。multiple参数是指是否批量应答,同在一个channel的消息共用一次应答(可能会发生消息丢失,不建议使用,可适用于高并发但不重要的场合)


其它的应答方法:

3.消息重新入列

在消费机获取消息后因发生故障(如宕机等),未将ack消息(消费成功消息)发送到rabbitMQ中(其中检测消费机发生故障可能采用了心跳机制,检测故障与接受到成功消息的时间没有关系)。rabbitMq就会将此消息重新入列,交给其它消费机处理。

五、消息的分发机制:

1.轮训分发(公平分发):

没设置之前,默认为轮训分发,即prefetchCount默认为0

此现象针对与同一队列中有多个消费者,轮训分发指每个消费者顺序的获取消息,且只有上一个消费者获取消息后消费完返回ack消费信息(消费成功信息)后才会给下一个消费者传递下一个消息。

2.不公平分发(建议使用):

在多个效率不同的消费机的情况下,上面的公平分发不能充分利用服务器的资源,所以有不公平分发:谁做的快谁发的多;意思是队列会找空闲的消费机派发消息,那做的快点的消费机自然做的多。

3.预取值分发(建议使用):

这是不公平分发的升级版,因为消息的发送本身需要时间(rabbitmq通常装在其它服务器,消息发送通过网络),所以会有延迟时间,所以这里设置了预取值,发送多条消息缓存在消费机上,当消费机有一条消息被消费,队列接收到消费成功的消息后会向消费机发送一条消息,但消费机缓存的消息总数始终不会高于预取值。

注意:因为自己用的服务器自然知道它的性能,程序员应根据现实情况设置预取值,如果预取值设置过大,服务器效率低,就会造成预取值后面的消息长时间没有消费,会造成部分消息消费时间长,所以现实中程序员应根据服务器性能设置预取值。

六、生产者的发布确认:

为确认生产者发送的消息是否被RabbitMQ接收到,好让生产者及时做出信息重新发送的准备,因此有发布确认。

1.单次发布确认:

即每次发送一条数据,就确认一下RabbitMQ是否接收到,发送时间较长。

 
    public static void publicMessageIndividually(int num) throws Exception {
        Channel channel = RabbitMQConnect.getChannel();
        String queueName= UUID.randomUUID().toString();
        channel.queueDeclare(queueName,true,false,false,null);
        channel./confirm/iSelect();
        long begin = System.currentTimeMillis();
        for (int i=0;i 
2.批量发布确认: 

这里步骤和单次确认发布是差不多的,不同的点是我们要靠代码控制确认发布的频率。但不能保证监控所有信息是否被接收

    public static void publicMessageBench(int num,int target) throws Exception {
        Channel channel = RabbitMQConnect.getChannel();
        //开启发布确认模式
        channel./confirm/iSelect();
        String queueName=UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,true,null);
        long begin = System.currentTimeMillis();
        for (int i=0;i 
3.异步发布确认: 

其实这里实质就是一个线程只管发消息,开另一个线程监听消息成功(失败)发送的消息从而做出相应的行为,这样发信息就快很多,因为它不需要等待结果,另一个线程只管慢慢监测消息发送的结果,失败则做出相应补救。

    public static void publicMessageAsync(int num) throws Exception {
        Channel channel = RabbitMQConnect.getChannel();
        String queueName=UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        channel./confirm/iSelect();
        
        ConcurrentSkipListMap map=new ConcurrentSkipListMap();

        /confirm/iListener callBack = new /confirm/iListener() {

            
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple){
                    //将序号小于或等于当前消息的序号的消息都清空
                    ConcurrentNavigableMap confirm = map.headMap(deliveryTag,true);
                    /confirm/i.clear();
                }else {
                    //将当前的消息删除掉
                    map.remove(deliveryTag);
                    System.out.println("确认收到信息"+deliveryTag);

                }
            }
            //没有接收到确认消息的回调
            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("信息+"+deliveryTag+"未被确认");
            }
        };

        
        channel.add/confirm/iListener(callBack);
        long begin = System.currentTimeMillis();
        for (int i=0;i 
4.高并发集合工具类ConcurrentSkipListMap和ConcurrentHashMap 

上面用了高并发工具集合类ConcurrentSkipListMap,线程安全有序的一个哈希表,适用于高并发的情况,内部数据结构用了跳表。其实ConcurrentHashMap这里也适用,同样是线程安全的,同样数据结构适合高并发的场合(高数据吞吐量),但ConCurrentSkipListMap在更多线程同时访问时更有优势,线程同时访问的数量几乎ConCurrentSkipListMap没有影响。

七、交换机: 1.概念与作用:

交换机起到选择对某队列添加消息的作用:

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产

者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机**(exchange)**,交换机工作的内容非常简单,一方面它接收来

自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消

息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

注意:一个队列中的一条消息仅能被消费一次,这是永恒不变的原则

2.fanout(扇形广播发布)

此类交换机收到消息后会将此消息添加到它绑定的所有队列。它所有队列的routeKey为空。

下面是代码例子:

1.生产者:

public class Product {
    private static final String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQConnect.getChannel();
        //创建了fanout类型exchange
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //开启发布确认
        channel./confirm/iSelect();
        
        //异步发布确认监听器
        /confirm/iListener confirmListener =new /confirm/iListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息"+deliveryTag+"已成功接收");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息"+deliveryTag+"未被成功接收");
            }
        };
        
        channel.add/confirm/iListener(/confirm/iListener);
        Scanner scanner =new Scanner(System.in);
        while (scanner.hasNextLine()){
            String s = scanner.nextLine();
            channel.basicPublish(EXCHANGE_NAME,"",null,s.getBytes());
        }
    }
}

2.两个消费者代码一样:

public class Consumer1 {
    private static final String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQConnect.getChannel();
        //建立fanout exchange
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        String queue = channel.queueDeclare().getQueue();
        //fanout中的routingKey都为"",所以都可以收到
        channel.queueBind(queue,EXCHANGE_NAME,"");
        System.out.println("Consumer1开始接收消息_____________");
        DeliverCallback deliverCallback=(consumerTag,message) ->{
            String string=new String(message.getBody());
            System.out.println("成功接收到消息"+string);
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };

        CancelCallback cancelCallback =(consumerTag)->{
            System.out.println("消息("+consumerTag+")取消了");
        };
        channel.basicConsume(queue,false,deliverCallback,cancelCallback);

    }
}
3.direct exchange(直接交换)

这里是交换机根据routingKey寻找队列添加消息,与fanout的广播式不同,它是有目的根据routingkey去找的。

注意:

1.一个队列可以有多个routingKey,即可以被多个交换机绑定

2.一个交换机可以用一个routingkey绑定多个队列,这里有点像fanout。

代码例子:

1.生产者:

public class Product {
    private static final String EXCHANGE_NAME="direct";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQConnect.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //开启确认发布
        channel./confirm/iSelect();
        //定义监听器
        /confirm/iListener confirmListener =new /confirm/iListener() {
            @Override
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息"+deliveryTag+"已成功接收");
            }

            @Override
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("消息"+deliveryTag+"未被成功接收");
            }
        };
        //添加异步发布确认监听器
        channel.add/confirm/iListener(/confirm/iListener);
        Scanner scanner =new Scanner(System.in);
        while (scanner.hasNextLine()){
            String s = scanner.nextLine();
            //给routingKey为”queue1“的队列添加信息
            channel.basicPublish(EXCHANGE_NAME,"queue1",null,s.getBytes());
        }
    }
}

2.消费者代码和上一节的消费者大致相同,除了:

//指定使用临时队列
        String queue = channel.queueDeclare().getQueue();
        //创建direct类型的exchange
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //将queue指定routingKey绑定到direct交换机上
        channel.queueBind(queue,EXCHANGE_NAME,"queue2");
4.Topics exchange(主题交换机,功能最强大)

它集合了上面两个交换机的功能,给指定routingKey寻找赋予了类似字符串的正则表达式寻找。

(1)Topic 的要求(routingKey命名规则)

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。 在这个规则列表中,其中有两个替换符是大家需要注意的

*(星号)可以代替一个单词

#(井号)可以替代零个或多个单词

上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的

quick.orange.rabbit 被队列 Q1Q2 接收到

lazy.orange.elephant 被队列 Q1Q2 接收到

quick.orange.fox 被队列 Q1 接收到

lazy.brown.fox 被队列 Q2 接收到

lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次

quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃

quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃

lazy.orange.male.rabbit 是四个单词但匹配 Q2

2.代码例子:

生产者:

channel.basicPublish(EXCHANGE_NAME,"quick.orange.rabbit ", null,message.getBytes("UTF-8"));

消费者

//声明 Q1 队列与绑定关系
 String queueName="Q1";
 channel.queueDeclare(queueName, false, false, false, null);
 channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5695972.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存