RabbitMQ

RabbitMQ,第1张

RabbitMQ RabbitMQ消息中间件 RabbitMQ介绍与安装

1.RabbitMQ的介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(也称为消息的中间件)。AMQP(Advanced Message Queuing Protocol)高级消息队列协议:客户端向MQ发送的消息协议是AMQP协议。JMS(Java Message Server)一样,都是一种消息规范,相比而言可定是AMQP高级一些。
2.MQ的功能
1)异步处理:把用户的请求发送给消息中间件后,消息中间件会先进行局部响应,不需要用户等待。
2)应用解耦:模块和模块之间没有强耦合,都是通过某个中间件进行连接的
3)流量消峰:在某一段时间内,突然发生了高并发,如果某个服务器承受不住,那么添加中间件就会将当前的请求存储到队列中,一个一个处理,不会全部直接请求服务器保证了服务器的安全性。
没有添加消息中间件:

添加消息中间件

AMQP和JMS区别:
JMS是定义一套java接口,对消息处理进行了统一 *** 作,但是AMQP是通过规定协议的方式统一数据交互的。JMS必须使用java语言 *** 作,AMQP只是协议,不是实现代码,跨语言的。
JMS中只定义了两种消息模式,但是AMPQ消息模式很多,很丰富。
使用较多的消息队列有ActiveMQ、RabbitMQ、ZeroMQ、ZeroMQ、kafka、metaMQ、RocketMQ
ActiveMQ:apache公司的
Kafka:一般大数据领域应用多,吞吐量达,不用考虑数据安全性
RecketMQ:是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。支持多种客户端,安全性能好。(推荐使用RabbitMQ,RabbitMQ用的是AMQP协议)。

创建虚拟节点

RabbitMQ管理平台默认地址:http://localhost:15672
默认用户名和密码是:guest和guest,最好不要用整个用户 *** 作。
1、使用guest登陆后,选择admin,创建一个新用户xxx。
2、创建虚拟节点/hello
3、添加成功

4、设置虚拟节点的权限(给哪个用户用)

RabbitMQ五种消息模式

RabbitMQ提供了五种消息模型(其实一共7种,第六种RPC远程服务调用,第七种发布确认)以下只解释了五种,基本消息模式,工作队列模式,Fanout订阅模式,Direct订阅模式,Topic订阅模式。

编写客户端连接RabbitMQ服务器 生产者与消费者模式(一对一)

1.创建工程
2.导入RabbitMQ依赖


  com.rabbitmq
  amqp-client
  5.9.0

3.编写连接工具类

public class RabbitMQUtil {
    
    public static Connection getConnection() throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //2.设置连接地址  我使用的windows 所以是localhost
        connectionFactory.setHost("localhost");
        //3.端口号 ,因为是java客户端,所以用amqp协议
        connectionFactory.setPort(5672);
        //4.设置用户名,密码 , 虚拟节点名
        connectionFactory.setVirtualHost("/hello");
        connectionFactory.setUsername("Gyz");
        connectionFactory.setPassword("123123");

        Connection connection = connectionFactory.newConnection();

        return connection;
    }
}

4.端口号解释

ProtocolBoundtoPortampq(高级消息队列协议)::5672(java客户端连接)clustering::25672(集群)http::15672(浏览器访问)

5.编写案例:Hello World

工作队列模式

1.如果先运行消息的提供方,在运行消息的消费方,谁优先启动,谁就直接获取了所有的消息
2.如果我们先把两个消费者方启动后,监听消息的提供方,提供数据的时候,那么我们两个消费方就会均匀的拿取数据,因为RabbitMQ属于轮询方式发送消息,所以RabbitMQ会自动把数据分配好
3.如果我们先把两个消费者方启动后,在提供数据,有一个消费者出现了阻塞状态,可以设置成能者多劳的模式,要出现这个模式的前提要关闭ack自动确认机制,在使用channel.basicQos(1),设置每次拿一个消息,消费一个后再去拿第二个,当出现阻塞的时候就不会出现第二种情况

能者多劳模式的运行结果

消费者1 接收的信息为: 第1条:  地瓜地瓜 我是土豆
消费者1 接收的信息为: 第3条:  地瓜地瓜 我是土豆
消费者1 接收的信息为: 第5条:  地瓜地瓜 我是土豆
消费者1 接收的信息为: 第6条:  地瓜地瓜 我是土豆
消费者1 接收的信息为: 第8条:  地瓜地瓜 我是土豆
消费者1 接收的信息为: 第9条:  地瓜地瓜 我是土豆
消费者1 接收的信息为: 第11条:  地瓜地瓜 我是土豆
消费者1 接收的信息为: 第12条:  地瓜地瓜 我是土豆
消费者1 接收的信息为: 第14条:  地瓜地瓜 我是土豆
消费者1 接收的信息为: 第15条:  地瓜地瓜 我是土豆
消费者1 接收的信息为: 第17条:  地瓜地瓜 我是土豆
消费者1 接收的信息为: 第18条:  地瓜地瓜 我是土豆
消费者1 接收的信息为: 第20条:  地瓜地瓜 我是土豆

消费者2 接收的信息为: 第2条:  地瓜地瓜 我是土豆
消费者2 接收的信息为: 第4条:  地瓜地瓜 我是土豆
消费者2 接收的信息为: 第7条:  地瓜地瓜 我是土豆
消费者2 接收的信息为: 第10条:  地瓜地瓜 我是土豆
消费者2 接收的信息为: 第13条:  地瓜地瓜 我是土豆
消费者2 接收的信息为: 第16条:  地瓜地瓜 我是土豆
消费者2 接收的信息为: 第19条:  地瓜地瓜 我是土豆

消费者2消费的少,这样资源就合理利用,不浪费, 用到极致
发布订阅(Publish/Subscribe)
1、1个生产者,多个消费者
2、每个消费者都有自己的队列
3、生产者没有将消息直接发送到队列中,是直接发送到转换机exchange中
4、每个队列都需要绑定转换机
5、生产者发送的消息,经过交换机到达队列,实现一个消息被国歌消费者消费(Fanout分发)

exchange:交换机一方面是接收生产这发送的消息,另一方面他自己知道如何让处理消息,例如将消息交给队列,或者将某些消息丢弃。

Fanout(分发广播):将消息交给所有绑定到交换机的队列
   		//1.获取连接
        Connection conn = RabbitMQUtil.getConnection();

        //2.创建一个通道
        Channel channel = conn.createChannel();

        //3.声明一个交换机  交换机处理消息的类型是什么: fanout , Direct , Topic
        //参数一: 交换机的名字, 参数二: 处理消息的类型
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        String message = "地瓜地瓜 我是土豆";
        //参数一: 交换机的名字 ,参数二:路由标识
        channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes());
        //发布数据

        System.out.println("消息已发送: "+ message);
		//====================
		 //1.获取连接
        Connection conn = RabbitMQUtil.getConnection();

        //2.创建通道
        Channel channel = conn.createChannel();

        //3.创建一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

        //4.从通道中拿去数据
        channel.queueDeclare(QUEUE_NAME,false,false ,false,null);

        //5.将队列 绑定到 交换机上
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

        //匿名类: 从通道中 获取数据
        //4. 监听队列中是否有数据,参数1: 是队列名字 , 参数二: 是否自动进行消息的确认
        channel.basicConsume(QUEUE_NAME,true, new DefaultConsumer(channel){
            //重写方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str  =new String(body);
                System.out.println("接收的信息为1 : "+str);
            }
        });

总结:分发是先声明交换机,再将指定的队列绑定到交换机上

Direct(选择定向):将消息发送给指定的Routing Key的队列

功能:各取所需,按需分配,选择性的接收消息。
在订阅模式中,发布者发布消息后,订阅者可以使用选择性的获取所需要的消息,这里面需要添加一个标识,就是路由Routing Key这样就可以分开了。

p:生产者,向exchange交换机中发送消息,发送消息时带有一个Routing Key 路由
X:交换机,接受生产者发布的消息,然后把消息传递给符合Routing Key路由匹配的队列
C1:消费者1,所绑定的队列按照路由标识去消费,按照图片解释,消费orange
C2:消费者2  所绑定的队列按照路由标识去消费,按照图片解释,消费black,green

部分代码

=====provider
		//1.获取连接
        Connection conn = RabbitMQUtil.getConnection();

        //2.创建一个通道
        Channel channel = conn.createChannel();

        //3.声明一个交换机  交换机处理消息的类型是什么: fanout , Direct , Topic
        //参数一: 交换机的名字, 参数二: 处理消息的类型
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");

        String message = "橘子橘子  我是苹果";
        //参数一: 交换机的名字 ,参数二:路由标识orange
        channel.basicPublish(EXCHANGE_NAME,"orange",null,message.getBytes());
        //发布数据

        System.out.println("消息已发送: "+ message);
        ============================consumer
         //1.获取连接
        Connection conn = RabbitMQUtil.getConnection();

        //2.创建通道
        Channel channel = conn.createChannel();

        //3.创建一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");

        //4.从通道中拿去数据
        channel.queueDeclare(QUEUE_NAME,false,false ,false,null);

        //5.将队列 绑定到 交换机上
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"orange");

        //匿名类: 从通道中 获取数据
        //4. 监听队列中是否有数据,参数1: 是队列名字 , 参数二: 是否自动进行消息的确认
        channel.basicConsume(QUEUE_NAME,true, new DefaultConsumer(channel){
            //重写方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str  =new String(body);
                System.out.println("接收的信息为1 : "+str);
            }
        });
    }

总结:消费者只能消费自己绑定的路由键的数据,获取不到没绑定的

Topic(通配符):根据模糊查询确定发送到哪个队列


Topic类型的Exchange与Direct相比,都是根据RoutiingKey把消息路由到不同的队列中的,只不过是Topic类型Exchange交换机的路由RoutingKey可以使用通配符

#:匹配一个或多个词
*:匹配一个词

部分代码

		======provider
 		 //1.获取连接
        Connection conn = RabbitMQUtil.getConnection();

        //2.创建一个通道
        Channel channel = conn.createChannel();

        //3.声明一个交换机  交换机处理消息的类型是什么: fanout , Direct , Topic
        //参数一: 交换机的名字, 参数二: 处理消息的类型
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        String message = "绿色绿色";

        //参数一: 交换机的名字 ,参数二:路由标识orange
        channel.basicPublish(EXCHANGE_NAME,"com.oracle.pojo.haha",null,message.getBytes());
        //发布数据

        System.out.println("消息已发送: "+ message);

		===========consumer
		   //1.获取连接
        Connection conn = RabbitMQUtil.getConnection();

        //2.创建通道
        Channel channel = conn.createChannel();

        //3.创建一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        //4.从通道中拿去数据
        channel.queueDeclare(QUEUE_NAME,false,false ,false,null);

        //5.将队列 绑定到 交换机上
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"com.oracle.*");

        //匿名类: 从通道中 获取数据
        //4. 监听队列中是否有数据,参数1: 是队列名字 , 参数二: 是否自动进行消息的确认
        channel.basicConsume(QUEUE_NAME,true, new DefaultConsumer(channel){
            //重写方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String str  =new String(body);
                System.out.println("接收的信息为1 : "+str);
            }
        });
死信队列

简称:DLX(dead-letter-exchange)就是单独的创建一个队列,这个队列中存储的就是过期的消息和被拒绝的消息。
消息变成死信的情况:

  • 消息被拒绝时,或者requeue=false,重新入队失败
  • 消息的TTL(Time-To-Live)过期
  • 队列达到最大长度时,信息存储不进去
    死信消息的处理过程:
  • DLX也是一个正常的Exchange交换机,和一般的交换机没有区别,它能在任何队列上绑定,实际上就是设置某个队列的属性;
  • 当前这个队列中有死信时,RabbitMQ会自动将这个消息发布到设置好的Exchange上去,就是被路由到另一个交换机上
  • 可以监听这个队列中的信息
RabbitMQ的Ack机制

消息自动确认,如果时work Queue,形式要取消自动确认,变成手动

SpringBoot整合RabbitTemplate

配置yml文件:

spring:
  application:
    name: springboot_rabbitmq  #当前项目名
  rabbitmq:
    virtual-host: /hello #虚拟节点
    host: localhost
    port: 5672
    username: xxx
    password: 123123
    listener:
      direct:
        acknowledge-mode: manual  #设置消息开启手动确认

RabbitConfig类

package com.oracle.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitmqConfig {

    workQueue模式 , 创建一个队列
    @Bean
    public Queue queueWork1(){
        return new Queue("workQueueBoot");
    }
}

发送类

@Component
public class RabbitUtil {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(){
        for(int i = 1; i <= 20 ;i++){
            rabbitTemplate.convertAndSend("workQueueBoot","SpringBoot测试RabbitMQ: "+ i);
        }
    }
}

监听类

package com.oracle.listener;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;



@Component
public class MessageReceiveListener {

    //消费者1
    @RabbitListener(queues = "workQueueBoot")
    public void receiveMessage1(String msg, Channel channel, Message message) throws IOException {
        //只包含发送的消息
        System.out.println("消费者1收到消息: " + msg);
        //channel 代表通道信息
        //message  附加的参数信息
    }

    //消费者2
    @RabbitListener(queues = "workQueueBoot")
    public void receiveMessage2(Object obj,Channel channel,Message message){
        //obj 是所有消息
        System.out.println("消费者2收到消息: "+ obj);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5665602.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存