消息队列-RabbitMQ

消息队列-RabbitMQ,第1张

消息队列-RabbitMQ 一、Docker安装Rabbitmq 1.1Docker环境搭建

1、下载docker离线安装包
https://download.docker.com/linux/static/stable/x86_64/docker-20.10.6.tgz
选择版本
https://download.docker.com/linux/static/stable/
2、离线安装工具
https://github.com/Jrohy/docker-install/
3、将下载的安装文件放在一个目录中
4、把目录上传虚拟机上
5、修改docker-install执行权限
6、执行安装

# 进入 docker-install 文件夹
cd docker-install
# 为 docker-install 添加执行权限
chmod +x install.sh
# 安装
./install.sh -f docker-20.10.6.tgz

7、设置镜像加速
修改配置文件 /etc/docker/daemon.json

cat < /etc/docker/daemon.json
{
  "registry-mirrors": [
    "https://docker.mirrors.ustc.edu.cn",
    "http://hub-mirror.c.163.com"
  ],
  "max-concurrent-downloads": 10,
  "log-driver": "json-file",
  "log-level": "warn",
  "log-opts": {
    "max-size": "10m",
    "max-file": "3"
    },
  "data-root": "/var/lib/docker"
}
EOF

然后dockers就安装好了,可以愉快的使用docker搞事情了

1.2 Rabbitmq 搭建

1、配置管理员用户名密码

mkdir /etc/rabbitmq
vim /etc/rabbitmq/rabbitmq.conf
# 添加两行配置:
default_user = admin
default_pass = admin

2、启动Rabbitmq

docker run -d --name rabbit 
-p 5672:5672 
-p 15672:15672 
-v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf 
-e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf 
--restart=always 
rabbitmq:management

访问管理控制台 http://192.168.64.140:15672

主要端口介绍
4369 – erlang发现口
5672 – client端通信口
15672 – 管理界面ui端口
25672 – server间内部通信口

二、RabbitMq使用场景 2.1 服务解耦

背景:传统的架构,上游的A服务调用下游的服务,业务之间的耦合性过于紧密。代码维护困难。
解决方法:A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可

2.2 流量削峰

背景:当特殊时间突发的访问压力,后台服务器可能顶不住访问压力而崩溃。
解决方案:使用RabbitMQ来进行流量削峰,高峰情况下,瞬间出现的大量请求数据,先发送到消息队列服务器,排队等待被处理,而我们的应用,可以慢慢的从消息队列接收请求数据进行处理,这样把数据处理时间拉长,以减轻瞬时压力

2.3 异步调用

同步调用存在的问题:系统响应时间长,每个服务执行完成,调用链才算完成。但是由于网络问题,整个执行链都无法完成。
异步调用优势:消息生产者把生产的消息放入消息队列,消费者订阅消息队列,消费消息。系统响应时间短,

三、RabbitMq六种工资模式 3.1 简单模式 3.1.1 消息生产者

1、创建连接工程
2、配置连接参数
3、创建队列
4、创建通讯通道
5、向服务器发送消息

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、连接
        ConnectionFactory f = new ConnectionFactory();
        //5672
        f.setHost("192.168.64.128");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        //创建连接
        //2、在服务器上创建队列
        Connection connection = f.newConnection();
        //创建通讯通道
        Channel c = connection.createChannel();
        
        c.queueDeclare("hello-rabbitmq",false,false,false,null);
        //3、向队列发送消息
         
        c.basicPublish("","hello-rabbitmq",null,"helloworld".getBytes());
    }
}
3.2.1 消息消费者
public class ConSumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、连接
        ConnectionFactory f = new ConnectionFactory();
        //5672
        f.setHost("192.168.64.128");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        //创建连接
        //2、在服务器上创建队列
        Connection connection = f.newConnection();
        //创建通讯通道
        //2、创建队列(谁先启动谁创建队列)
        Channel c = connection.createChannel();
        c.queueDeclare("hello-rabbitmq",false,false,false,null);
        //3、从队列 接收消息,把消息传递给回调对象处理
        //创建回调对象
        DeliverCallback deliverCallback = (consumerTag,message) -> {
            byte[] body = message.getBody();
            System.out.println(new String(body));
        };
        CancelCallback cancelCallback = consumerTag -> {};
        
        c.basicConsume("hello-rabbitmq",true,deliverCallback,cancelCallback);
3.2 工作模式

工作模式:
一个生产者,多个消费者,每个消费者获取到的消息唯一。
1、 自动模式
消费者从消息队列获取消息后,服务端就认为该消息已经成功消费。
2、 手动模式
消费者从消息队列获取消息后,服务端并没有标记为成功消费
消费者成功消费后需要将状态返回到服务端

3.2.1 消息生产者
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、连接
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.128");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        //创建连接
        //2、在服务器上创建队列
        Connection connection = f.newConnection();
        //创建通讯通道
        Channel c = connection.createChannel();
        c.queueDeclare("hello-rabbitmq",false,false,false,null);
        //3、循环输入消息
        while (true){
            System.out.print("请输入消息");
            String msg = new Scanner(System.in).nextLine();
            c.basicPublish("","hello-rabbitmq", MessageProperties.PERSISTENT_BASIC,msg.getBytes());
        }
    }
}

3.2.2 消息消费者
public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory f = new ConnectionFactory();
        f.setHost("192.168.64.128");
        f.setPort(5672);
        f.setUsername("admin");
        f.setPassword("admin");
        //创建连接
        //2、在服务器上创建队列
        Connection connection = f.newConnection();
        //创建通讯通道
        //2、创建队列(谁先启动谁创建队列)
        Channel c = connection.createChannel();
        //1、回调对象
        //2、接收消息
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            System.out.println("收到消息:" + msg);
            for (int i = 0; i < msg.length(); i++) {
                if (msg.charAt(i) == '.'){
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            
            c.basicAck(message.getEnvelope().getDeliveryTag(),false);
            System.out.println("处理完成");
        };
        CancelCallback cancelCallback = consumerTag -> {
        };
        
        c.basicQos(1);
        
        c.basicConsume("hello-rabbitmq", false, deliverCallback, cancelCallback);
    }
}
3.3 发布订阅模式

3.4 路由模式


再消息上携带一个关键词,再

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5574222.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存