消息中间件(异步消息传递)——RabbitMQ

消息中间件(异步消息传递)——RabbitMQ,第1张

消息中间件(异步消息传递)——RabbitMQ RabbitMQ

主要内容

  1. AMQP 简介
  2. RabbitMQ 简介
  3. RabbitMQ 原理
  4. Erlang 安装
  5. 安装 RabbitMQ
  6. RabbitMQ 账户管理
  7. 交换器
一、AMQP简介 1 AMQP是什么?

AMQP(Advanced Message Queuing Protocol),高级消息队列协议)是进程之间传递 异步消息的网络协议。

2 AMQP 工作过程

发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后 AMQP 代理会将消息投递给订阅了此 队列的消费者,或者消费者按照需求自行获取。

3 队列

队列是数据结构中概念。数据存储在一个队列中,数据是有顺序的,先进的先出,后进 后出。其中一侧负责进数据,另一次负责出数据。 MQ(消息队列)很多功能都是基于此队列结构实现的

二、 RabbitMQ 简介 1 RabbitMQ

介绍 RabbitMQ 是由 Erlang 语言编写的基于 AMQP 的消息中间件。而消息中间件作为分 布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。

1.1 解决应用耦合 1.1.1 不使用 MQ 时

1.1.2 使用 MQ 解决耦合

2 RabbitMQ 适用场景

排队算法、秒杀活动、消息分发、异步处理、数据同步、处理耗时任务、流量销峰等。

三、 RabbitMQ 原理


1.Message 消息。
消息是不具名的,它由消息头消息体组成。消息体是不透明的,而消息头则由 一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的 优先权)、delivery-mode(指出消息可能持久性存储)等。

2.Publisher

消息的生产者。也是一个向交换器发布消息的客户端应用程序。
3.Consumer

消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。

4.Exchange
交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 三种常用的交换器类型

  1. direct(发布与订阅 完全匹配)
  2. fanout(广播)
  3. topic(主题,规则匹配)

5.Binding
绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息 队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

6.Queue
消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一 个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取 走。

7.Routing-key
路由键。RabbitMQ 决定消息该投递到哪个队列的规则。(也可以理解为队列的名称, 路由键是 key,队列是 value)
队列通过路由键绑定到交换器。
消息发送到 MQ 服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ 也会将 其和绑定使用的路由键进行匹配。

如果相匹配,消息将会投递到该队列。

如果不匹配,消息将会进入黑洞。

8.Connection
链接。指 rabbit 服务器和服务建立的 TCP 链接。

9.Channel
信道。
1,Channel 中文叫做信道,是 TCP 里面的虚拟链接。例如:电缆相当于 TCP,信道是一个独立光纤束,一条 TCP 连接上创建多条信道是没有问题的。
2,TCP 一旦打开,就会创建 AMQP 信道。
3,无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。
10.Virtual Host
虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证 和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器, 拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时 指定,RabbitMQ 默认的 vhost 是/

11.Borker
表示消息队列服务器实体。
交换器和队列的关系
交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器的 路由键匹配,那么消息就会被路由到该绑定的队列中。
也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由 键匹配分发消息到具体的队列中。
路由键可以理解为匹配的规则。

RabbitMQ 为什么需要信道?为什么不是 TCP 直接通信?

  1. TCP 的创建和销毁开销特别大。创建需要 3 次握手,销毁需要 4 次分手。
  2. 如果不用信道,那应用程序就会以 TCP 链接 Rabbit,高峰时每秒成千上万条链接 会造成资源巨大的浪费,而且 *** 作系统每秒处理 TCP 链接数也是有限制的,必定造成性能 瓶颈。
  3. 信道的原理是一条线程一条通道,多条线程多条通道同用一条 TCP 链接。一条 TCP 链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。
四、Erlang安装

RabbitMQ 是使用 Erlang 语言编写的,所以需要先配置 Erlang

1 修改主机名

RabbitMQ 是通过主机名进行访问的,必须指定能访问的主机名。
# vim /etc/sysconfig/network


# vim /etc/hosts
新添加了一行,前面为服务器 ip,空格后面添加计算机主机名

2 安装依赖

# yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel unixODBC unixODBC-devel

3 上传并解压

上传 otp_src_22.0.tar.gz 到/usr/local/tmp 目录中,进入目录并解压。
解压时注意,此压缩包不具有 gzip 属性,解压参数没有 z,只有 xf
# cd /usr/local/tmp
# tar xf otp_src_22.0.tar.gz


4 配置参数

先新建/usr/local/erlang 文件夹,作为安装文件夹
#mkdir -p /usr/local/erlang
进入文件夹
# cd otp_src_22.0
配置参数
# ./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac

5 编译并安装

编译
# make
安装
# make install

6 修改环境变量

希望能够在任意位置执行erl命令

修改/etc/profile 文件
#vim /etc/profile
在文件中添加下面代码
export PATH=$PATH:/usr/local/erlang/bin

运行文件,让修改内容生效
# source /etc/profile

7 查看配置是否成功

# erl -version

五、 安装 RabbitMQ 1 上传并解压

上传 rabbitmq-server-generic-unix-3.7.17.tar.xz 到/usr/loca/tmp 中
# cd /usr/local/tmp
# tar xf rabbitmq-server-generic-unix-3.7.17.tar.xz

2 复制到 local 下

复制解压文件到/usr/local 下,命名为 rabbitmq
# cp -r rabbitmq_server-3.7.17 /usr/local/rabbitmq

3 配置环境变量

# vim /etc/profile
在文件中添加
export PATH=$PATH:/usr/local/rabbitmq/sbin
解析文件
# source /etc/profile

4 开启 web 管理插件

进入 rabbitmq/sbin 目录
# cd /usr/local/rabbitmq/sbin
查看插件列表
# ./rabbitmq-plugins list
生效管理插件
# ./rabbitmq-plugins enable rabbitmq_management

5 后台运行

启动 rabbitmq。
#./rabbitmq-server -detached
停止命令,如果无法停止,使用 kill -9 进程号进行关闭
./rabbitmqctl stop_app
启动成功信息如下:

5.1 启动错误解决

如果启动RabbitMQ 发生下述错误,可以提供环境配置文件,解决。环境配置文件命 名为: rabbitmq-env.conf。所在位置是: $rabbitmq_home/etc/rabbitmq/目录。内如 是: HOSTNAME=主机名称

解决:

6 查看 web 管理界面

默认可以在安装 rabbitmq 的电脑上通过用户名:guest 密码 guest 进行访问 web 管 理界面
端口号:15672(放行端口,或关闭防火墙)
在虚拟机浏览器中输入:http://localhost:15672

六、 RabbitMq 账户管理 1 创建账户

语法:./rabbitmqctl add_user username password
# cd /usr/local/rabbitmq/sbin
# ./rabbitmqctl add_user admin admin123

2 给用户授予管理员角色

其中 smallming 为新建用户的用户名 ,多个角色可以用空格分隔
# ./rabbitmqctl set_user_tags admin administrator

3 给用户授权

“/”表示 RabbitMQ 根虚拟主机
admin表示用户名
".*" ".*" ".*"表示完整权限
# ./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

4 登录

使用新建账户和密码在 windows 中访问 rabbitmq 并登录
在浏览器地址栏输入:http://ip:15672/
用户名:admin
密码:admin123

七、 Exchange 交换器(交换机)

交换器负责接收客户端传递过来的消息,并转发到对应的队列中。在 RabbitMQ 中支 持四种交换器

  1. Direct Exchange:直连交换器(默认)
  2. Fanout Exchange:扇形交换器
  3. Topic Exchange:主题交换器
  4. Header Exchange:首部交换器。

在 RabbitMq 的 Web 管理界面中 Exchanges 选项卡就可以看见这四个交换器。

1 direct 交换器


direct 交换器是RabbitMQ默认交换器。默认会进行公平调度。所有接受者依次从消 息队列中获取值。Publisher 给哪个队列发消息,就一定是给哪个队列发送消息。对交换器 绑定的其他队列没有任何影响。
(代码演示)一个队列需要绑定多个消费者
需要使用注解/API:
org.springframework.amqp.core.Queue:队列

AmqpTemplate: *** 作 RabbitMQ 的接口。负责发送或接收消息

@RabbitListener(queues = "") 注解某个方法为接收消息方法

1.1 代码实现 1.1.1 新建项目 Publisher 1.1.1.1添加依赖


    4.0.0

    com.bjsxt
    amqp_rabbit
    pom
    1.0-SNAPSHOT
    
    
        
            
                org.springframework.boot
                spring-boot-dependencies
                2.2.5.RELEASE
                import
                pom
            
        
    


    
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
    


创建Consumer

RabbitMQ_direct交换器_消息消费者处理逻辑代码开发_RabbitListener注解描述类型

package com.bjsxt.rabbit.consumer;

import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;


@RabbitListener(bindings = {
    @QueueBinding(
            value = @Queue(value = "log-info-queue", autoDelete = "false"),
            exchange = @Exchange(value = "log-ex-direct", type = "direct", autoDelete = "false"),
            key = "direct-rk-info"
    )
})
@Component
public class InfoLogConsumer {
    
    @RabbitHandler
    public void onMessage(String msg){
        System.out.println("InfoLogConsumer 消费消息:" + msg);
    }
}

RabbitMQ_direct交换器_消息消费者配置及启动

创建启动类

package com.bjsxt;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitConsumerApp {
    public static void main(String[] args) {
        SpringApplication.run(RabbitConsumerApp.class, args);
    }
}
1.1.1.2编写配置文件

新建 application.yml.
host:默认值 localhost
username 默认值:guest
password 默认值:guest

spring:
  rabbitmq:
    host: 192.168.89.141  # RabbitMQ服务的地址,默认localhost
    port: 5672  # RabbitMQ的端口,默认5672。
    username: bjsxt # 访问RabbitMQ的用户名,默认guest
    password: bjsxt # 访问RabbitMQ的密码,默认guest
    virtual-host: /  # 访问RabbitMQ的哪一个虚拟主机,默认为 /

启动测试






RabbitMQ_direct交换器_消息消费者处理逻辑代码开发_RabbitListener注解描述方法

创建LogConsumers

package com.bjsxt.rabbit.consumer;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class LogConsumers {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "log-error-queue"),
                    exchange = @Exchange(value = "log-ex-direct"),
                    key = "direct-rk-error"
            )
    })
    public void onLogErrorMessage(String msg){
        System.out.println("错误日志信息:" + msg);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "log-warn-queue", autoDelete = "false"),
                    exchange = @Exchange(value = "log-ex-direct"),
                    key = "direct-rk-warn"
            )
    })
    public void onLogWarnMessage(String msg){
        System.out.println("警告日志信息:" + msg);
    }
}

启动测试





RabbitMQ_direct交换器_消息发送者代码开发

创建publisher

package com.bjsxt.rabbit.sender;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class LogMessageSender {
    @Autowired
    private AmqpTemplate template;

    
    public void sendMessage(String exchange, String routingKey, String message){
        this.template.convertAndSend(exchange, routingKey, message);
    }
}

创建启动类

package com.bjsxt;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitPublisherApp {
    public static void main(String[] args) {
        SpringApplication.run(RabbitPublisherApp.class, args);
    }
}

创建测试代码

package com.bjsxt.test;

import com.bjsxt.RabbitPublisherApp;
import com.bjsxt.entity.User;
import com.bjsxt.rabbit.fanoutsender.UserMessageSender;
import com.bjsxt.rabbit.sender.LogMessageSender;
import com.bjsxt.rabbit.topicsender.TopicMessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;


@SpringBootTest(classes = {RabbitPublisherApp.class})
@RunWith(SpringRunner.class)
public class TestPublisher {
    @Autowired
    private LogMessageSender sender;
    @Autowired
    private UserMessageSender userMessageSender;
    @Autowired
    private TopicMessageSender topicMessageSender;

    private String exchange = "log-ex-direct";
    private String rkInfo = "direct-rk-info";
    private String rkError = "direct-rk-error";
    private String rkWarn = "direct-rk-warn";


    @Test
    public void testSend(){
        Random r = new Random();
        // 发送10条消息。
        for(int i = 0 ; i < 10; i++){
            // rInt%3 - 0:投递消息到info;1:投递消息到error;2:投递消息到warn
            int rInt = r.nextInt(100);
            if(rInt%3 == 0){
                this.sender.sendMessage(exchange, rkInfo, "发送Info日志消息 - index="+i+";rInt="+rInt);
            }else if(rInt%3 == 1){
                this.sender.sendMessage(exchange, rkError, "发送error日志消息 - index="+i+";rInt="+rInt);
            }else{
                this.sender.sendMessage(exchange, rkWarn, "发送warn日志消息 - index="+i+";rInt="+rInt);
            }
        }
    }
}

编写配置文件

spring:
  rabbitmq:
    host: 192.168.89.141
    username: bjsxt
    password: bjsxt

启动测试
先启动Consumer
运行testSend


RabbitMQ_direct交换器_消息消费者集群搭建方式

相同代码启动多次,自动搭建Consumer集群

package com.bjsxt;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitConsumerApp1 {
    public static void main(String[] args) {
        SpringApplication.run(RabbitConsumerApp1.class, args);
    }
}



创建测试代码

package com.bjsxt.test;

import com.bjsxt.RabbitPublisherApp;
import com.bjsxt.entity.User;
import com.bjsxt.rabbit.fanoutsender.UserMessageSender;
import com.bjsxt.rabbit.sender.LogMessageSender;
import com.bjsxt.rabbit.topicsender.TopicMessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;


@SpringBootTest(classes = {RabbitPublisherApp.class})
@RunWith(SpringRunner.class)
public class TestPublisher {
    @Autowired
    private LogMessageSender sender;
    @Autowired
    private UserMessageSender userMessageSender;
    @Autowired
    private TopicMessageSender topicMessageSender;

    private String exchange = "log-ex-direct";
    private String rkInfo = "direct-rk-info";
    private String rkError = "direct-rk-error";
    private String rkWarn = "direct-rk-warn";

    @Test
    public void testSend2Consumers(){
        for(int i = 0; i < 10; i++){
            this.sender.sendMessage(exchange, rkInfo, "info消息"+i);
            this.sender.sendMessage(exchange, rkError, "error消息"+i);
            this.sender.sendMessage(exchange, rkWarn, "warn消息"+i);
        }
    }

    @Test
    public void testSend(){
        Random r = new Random();
        // 发送10条消息。
        for(int i = 0 ; i < 10; i++){
            // rInt%3 - 0:投递消息到info;1:投递消息到error;2:投递消息到warn
            int rInt = r.nextInt(100);
            if(rInt%3 == 0){
                this.sender.sendMessage(exchange, rkInfo, "发送Info日志消息 - index="+i+";rInt="+rInt);
            }else if(rInt%3 == 1){
                this.sender.sendMessage(exchange, rkError, "发送error日志消息 - index="+i+";rInt="+rInt);
            }else{
                this.sender.sendMessage(exchange, rkWarn, "发送warn日志消息 - index="+i+";rInt="+rInt);
            }
        }
    }
}

2 fanout 交换器


扇形交换器,实际上做的事情就是广播,fanout 会把消息发送给所有的绑定在当前交 换器上的队列。对应 Consumer 依然采用公平调度方式。
(代码演示)一个交换器需要绑定多个队列
需要使用注解/API:

  • FanoutExchange:fanout 交换器
  • Binding:绑定交换器和队列
  • BindingBuilder:Binding 的构建器
  • amq.fanout:内置 fanout 交换器名称
2.1 RabbitMQ_fanout交换器_消息消费者代码开发 2.1.1 代码


创建pojo

package com.bjsxt.entity;

import java.io.Serializable;
import java.util.Objects;

// 实体类型
public class User implements Serializable {
    // 定义一个序列化唯一ID。
    public static final long serialVersionUID = 1L;
    private Long id;
    private String name;
    private int age;

    public User(){}

    @Override
    public String toString() {
        return "User{" +
                "id=" + id +
                ", name='" + name + ''' +
                ", age=" + age +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        User user = (User) o;
        return age == user.age &&
                Objects.equals(id, user.id) &&
                Objects.equals(name, user.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, name, age);
    }

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

创建 FanoutConsumers

package com.bjsxt.rabbit.fanout;

import com.bjsxt.entity.User;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class FanoutConsumers {
    
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-user-1", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-fanout", type = "fanout", autoDelete = "false")
            )
    })
    public void onMessage1(User user){
        System.out.println("onMessage1 run : " + user);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-user-2", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-fanout", type = "fanout")
            )
    })
    public void onMessage2(User user){
        System.out.println("onMessage2 run : " + user);
    }
}

启动查看



2.2RabbitMQ_fanout交换器_消息发送者代码开发 2.2.1 创建发送消息者

package com.bjsxt.rabbit.fanoutsender;

import com.bjsxt.entity.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class UserMessageSender {
    @Autowired
    private AmqpTemplate template;

    
    public void send(User user){
        this.template.convertAndSend("ex-fanout", "", user);
    }
}

创建测试代码

package com.bjsxt.test;

import com.bjsxt.RabbitPublisherApp;
import com.bjsxt.entity.User;
import com.bjsxt.rabbit.fanoutsender.UserMessageSender;
import com.bjsxt.rabbit.sender.LogMessageSender;
import com.bjsxt.rabbit.topicsender.TopicMessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;


@SpringBootTest(classes = {RabbitPublisherApp.class})
@RunWith(SpringRunner.class)
public class TestPublisher {
    @Autowired
    private LogMessageSender sender;
    @Autowired
    private UserMessageSender userMessageSender;
    @Autowired
    private TopicMessageSender topicMessageSender;

    private String exchange = "log-ex-direct";
    private String rkInfo = "direct-rk-info";
    private String rkError = "direct-rk-error";
    private String rkWarn = "direct-rk-warn";


    @Test
    public void testSendUserMessage2Fanout(){
        for(int i = 0; i < 3; i++){
            User user = new User();
            user.setId((long) i);
            user.setName("姓名 - " + i);
            user.setAge(20+i);

            this.userMessageSender.send(user);
        }
    }

    @Test
    public void testSend2Consumers(){
        for(int i = 0; i < 10; i++){
            this.sender.sendMessage(exchange, rkInfo, "info消息"+i);
            this.sender.sendMessage(exchange, rkError, "error消息"+i);
            this.sender.sendMessage(exchange, rkWarn, "warn消息"+i);
        }
    }

    @Test
    public void testSend(){
        Random r = new Random();
        // 发送10条消息。
        for(int i = 0 ; i < 10; i++){
            // rInt%3 - 0:投递消息到info;1:投递消息到error;2:投递消息到warn
            int rInt = r.nextInt(100);
            if(rInt%3 == 0){
                this.sender.sendMessage(exchange, rkInfo, "发送Info日志消息 - index="+i+";rInt="+rInt);
            }else if(rInt%3 == 1){
                this.sender.sendMessage(exchange, rkError, "发送error日志消息 - index="+i+";rInt="+rInt);
            }else{
                this.sender.sendMessage(exchange, rkWarn, "发送warn日志消息 - index="+i+";rInt="+rInt);
            }
        }
    }
}

先启动Consumer
启动 testSendUserMessage2Fanout

3 topic 交换器 3.1 RabbitMQ_topic交换器_执行流程介绍


允许在路由键(RoutingKey)中出现匹配规则。

路由键的写法和包写法相同。com.bjsxt.xxxx.xxx 格式。

在绑定时可以带有下面特殊符号,中间可以出现:

  • * : 代表一个单词(两个.之间内容)
  • # : 0 个或多个字符

接收方依然是公平调度,同一个队列中内容轮换获取值。

需要使用注解/API:

  • TopicExchange:Topic 交换器
  • amq.topic:内置 topic 交换器名称
RabbitMQ_topic交换器_消息消费者代码开发 代码

创建Consumer

package com.bjsxt.rabbit.topic;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Component
public class TopicConsumers {
    
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-sms-topic", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-topic", type = "topic"),
                    key = "*.rk.sms"
            )
    })
    public void onUserSMSMessage(String message){
        System.out.println("用户短信消息内容是:" + message);
    }

    
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-email-topic", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-topic", type = "topic"),
                    key = "*.rk.email"
            )
    })
    public void onUserEmailMessage(String message){
        System.out.println("用户邮件消息内容是:" + message);
    }

    
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "queue-all-topic", autoDelete = "false"),
                    exchange = @Exchange(value = "ex-topic", type = "topic"),
                    key = "*.rk.*"
            )
    })
    public void onUserServiceMessage(String message){
        System.out.println("执行的消息处理逻辑是:" + message);
    }
}

启动RabbitConsumerAPP



RabbitMQ_topic交换器_消息发送者代码开发 代码

package com.bjsxt.rabbit.topicsender;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class TopicMessageSender {
    @Autowired
    private AmqpTemplate template;

    
    public void send(String exchange, String routingKey, String message){
        template.convertAndSend(exchange, routingKey, message);
    }
}

创建测试类

package com.bjsxt.test;

import com.bjsxt.RabbitPublisherApp;
import com.bjsxt.entity.User;
import com.bjsxt.rabbit.fanoutsender.UserMessageSender;
import com.bjsxt.rabbit.sender.LogMessageSender;
import com.bjsxt.rabbit.topicsender.TopicMessageSender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Random;


@SpringBootTest(classes = {RabbitPublisherApp.class})
@RunWith(SpringRunner.class)
public class TestPublisher {
    @Autowired
    private LogMessageSender sender;
    @Autowired
    private UserMessageSender userMessageSender;
    @Autowired
    private TopicMessageSender topicMessageSender;

    private String exchange = "log-ex-direct";
    private String rkInfo = "direct-rk-info";
    private String rkError = "direct-rk-error";
    private String rkWarn = "direct-rk-warn";

    @Test
    public void testSendMessage2Topic(){
        // 随机数%6
        // 0 rk - user.rk.sms *.rk.*  *.rk.sms
        // 1 rk - user.rk.email   *.rk.* *.rk.email
        // 2 rk - order.rk.sms *.rk.*  *.rk.sms
        // 3 rk - order.rk.email  *.rk.* *.rk.email
        // 4 rk - reg.rk.sms *.rk.*  *.rk.sms
        // 5 rk - reg.rk.qq  *.rk.*
        Random r = new Random();
        for(int i = 0; i < 10; i++){
            int rInt = r.nextInt(100);
            if(rInt%6 == 0){
                this.topicMessageSender.send("ex-topic",
                        "user.rk.sms",
                        "用户登录验证码是123456 - 发送短信");
            }else if(rInt%6 == 1){
                this.topicMessageSender.send("ex-topic",
                        "user.rk.email",
                        "用户登录验证码是123456 - 发送到邮箱");
            }else if(rInt%6 == 2){
                this.topicMessageSender.send("ex-topic",
                        "order.rk.sms",
                        "订单下订成功 - 发送短信");
            }else if(rInt%6 == 3){
                this.topicMessageSender.send("ex-topic",
                        "order.rk.email",
                        "订单下订成功 - 发送到邮箱");
            }else if(rInt%6 == 4){
                this.topicMessageSender.send("ex-topic",
                        "reg.rk.sms",
                        "注册验证码是654321 - 发送短信");
            }else if(rInt%6 == 5){
                this.topicMessageSender.send("ex-topic",
                        "reg.rk.qq",
                        "注册验证码是654321 - 发送QQ信息");
            }
        }
    }

    @Test
    public void testSendUserMessage2Fanout(){
        for(int i = 0; i < 3; i++){
            User user = new User();
            user.setId((long) i);
            user.setName("姓名 - " + i);
            user.setAge(20+i);

            this.userMessageSender.send(user);
        }
    }

    @Test
    public void testSend2Consumers(){
        for(int i = 0; i < 10; i++){
            this.sender.sendMessage(exchange, rkInfo, "info消息"+i);
            this.sender.sendMessage(exchange, rkError, "error消息"+i);
            this.sender.sendMessage(exchange, rkWarn, "warn消息"+i);
        }
    }

    @Test
    public void testSend(){
        Random r = new Random();
        // 发送10条消息。
        for(int i = 0 ; i < 10; i++){
            // rInt%3 - 0:投递消息到info;1:投递消息到error;2:投递消息到warn
            int rInt = r.nextInt(100);
            if(rInt%3 == 0){
                this.sender.sendMessage(exchange, rkInfo, "发送Info日志消息 - index="+i+";rInt="+rInt);
            }else if(rInt%3 == 1){
                this.sender.sendMessage(exchange, rkError, "发送error日志消息 - index="+i+";rInt="+rInt);
            }else{
                this.sender.sendMessage(exchange, rkWarn, "发送warn日志消息 - index="+i+";rInt="+rInt);
            }
        }
    }
}

先运行Consumer
运行testSendMessage2Topic

项目练习源码:https://gitee.com/cutelili/rabbit-mq/tree/master/amqp_rabbit

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4657368.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存