RabbitMQ消息中间件学习笔记

RabbitMQ消息中间件学习笔记,第1张

RabbitMQ消息中间件学习笔记
1.引言 线程池的设置线程最好根据cpu核心数,最大就cpu核心数的两倍 什么是RabbitMQ?

RabbitMQ是基于amqp协议,实现的一种MQ理念的服务。类似的服务 RocketMQ、ActiveMQ、Kafka等

为什么在分布式项目中需要一款消息中间件?

消息中间件能够实现一些Feign(同步调用)无法实现的效果:

1、服务的异步调用

2、消息的广播(事件总线)

3、消息的延迟处理

4、分布式事务

5、请求削峰(处理高并发)

二、RabbitMQ的Docker安装

1)拉取镜像

docker pull rabbitmq:3.8.5-management

2)准备docker-compose模板

rabbitmq:
    image: rabbitmq:3.8.5-management
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    restart: always

3)启动rabbitmq容器

docker-compose up -d rabbitmq

 4)访问rabbitmq的管理页面(账号:guest 密码:guest)

三、RabbitMQ常用模型

1)模型一

P -> Provider(提供者)

红色方块 -> 队列(存储消息)

C -> Consumer(消费者)

2)模型二

一个提供者对应多个消费者,消息会轮训发送给两个消费者

起到一个消费端负载均衡的目的,减轻消费端的消费压力

3)模型三

发布/订阅模式 - 消息广播

多个消费者会同时收到提供者发布的消息

X -> Exchange(交换机,消息的复制转发,不能存储消息)

4)模型四

路由键 -> 交换机和队列绑定若干路由键,发布的消息可以指定路由键发送

 5)模型五

通配符的路由键

 6)模型六

Rabbitmq的同步调用模型

四、JavaAPI调用RabbitMQ

添加相关依赖


    com.rabbitmq
    amqp-client
    5.9.0

封装到工具类中

public class ConnectionUtils {

    //创建连接
    public static ConnectionFactory connectionFactory = new ConnectionFactory();

    static {

        connectionFactory.setHost("IP地址");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");        
        //默认的虚拟主机
        connectionFactory.setVirtualHost("/");
    }

    public static Connection getConnection(){
        
        //获取连接
        try {
            Connection connection = connectionFactory.newConnection();
            return connection;
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }
}

 服务的提供者

public class Provider {

    public static void main(String[] args) throws IOException {

        //1.获取连接
        Connection connection = ConnectionUtils.getConnection();

        //2、通过连接获取通道 (后续所有的 *** 作都是基于Channel)
        Channel channel = connection.createChannel();

        //3.创建一个队列
        channel.queueDeclare("myqueue",false,false,false,null);

        //4.给队列发送消息
        for (int i = 0;i<10;i++){
            String msg = "hello RabbitMQ";
            channel.basicPublish(" ","myqueue",null,msg.getBytes("utf-8"));
        }

        //5.关闭连接
        connection.close();
    }
}

 服务的消费者

public class Consumer {

    public static void main(String[] args) throws IOException {
        //1.获取连接
        Connection connection = ConnectionUtils.getConnection();
        //2、通过连接获取通道
        Channel channel = connection.createChannel();
        //3.监听队列
        channel.basicConsume("myqueue",true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者接收到的消息:"+new String(body,"utf-8"));

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

模型二

一个提供者对应多个消费者,消息会轮训发送给两个消费者

起到一个消费端负载均衡的目的,减轻消费端的消费压力

模拟多个消费者

 

 模型三

发布/订阅模式 - 消息广播

多个消费者会同时收到提供者发布的消息

X -> Exchange(交换机,消息的复制转发,不能存储消息)

public class Provider {

    public static void main(String[] args) throws IOException {

        //1.获取连接
        Connection connection = ConnectionUtils.getConnection();

        //2、通过连接获取通道 (后续所有的 *** 作都是基于Channel)
        Channel channel = connection.createChannel();

        //3.创建交换机
        channel.exchangeDeclare("myexchange","fanout");


        //4.给队列发送消息
        for (int i = 0;i<10;i++){
            String msg = "hello RabbitMQ---"+i;
            channel.basicPublish("myexchange","",null,msg.getBytes("utf-8"));
        }

        //5.关闭连接
        connection.close();
    }
}
public class Consumer {

    public static void main(String[] args) throws IOException {

        //1.获取连接
        Connection connection = ConnectionUtils.getConnection();

        //2、通过连接获取通道
        Channel channel = connection.createChannel();

        //3.创建队列
        channel.queueDeclare("myqueue",false,false,false,null);

        //不限制启动顺序
        channel.exchangeDeclare("myexchange","fanout");

        //4.将交换机与队列进行绑定,参数1是队列,参数2是交换机,参数3是表示绑定的路由键(fanout类型的交换机无效)
        channel.queueBind("myqueue","myexchange","");

        //5.监听队列
        channel.basicConsume("myqueue",true,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2接收到的消息:"+new String(body,"utf-8"));

                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

 模型四

路由键 -> 交换机和队列绑定若干路由键,发布的消息可以指定路由键发送

五、常用方法的参数
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, 				boolean autoDelete,Map arguments) throws IOException;

参数一:queue 队列的名称

参数二:durable 是否持久化,默认为false,非持久化

参数三:exclusive 是否为排他队列,排他队列只有创建这个队列的连接可以 *** 作,其他连接不能 *** 作该队列

参数四:autoDelete 是否自动删除,默认为false,如果为true,那么当所有监听这个队列的消费端断开监听后,队列会自动删除

参数五:arguments 用来设置一个额外的参数

Exchange.DeclareOk exchangeDeclare(String exchange,
                                          String type,
                                          boolean durable,
                                          boolean autoDelete,
                                          boolean internal,
                                          Map arguments) throws 																		IOException;

参数一:exchange 交换机的名称

参数二:type 交换机的类型(fanout|direct|topic|header)

参数三:durable 持久化,默认false

参数四:autoDelete 是否自动删除,默认为false,如果未true,所以绑定到交换机的队列,解绑后,交换机就会自动删除

参数五:internal 是否为内置交换机,默认为false,如果为true表示,当前交换机只能绑定其他交换机,提供者不能直接发消息给该交换机

参数六:arguments 用来设置一个额外的参数

六、TTL - 过期时间

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5575232.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存