RabbitMQ 6 种工作模式 - (五)Topics 通配符模式

RabbitMQ 6 种工作模式 - (五)Topics 通配符模式,第1张

RabbitMQ 6 种工作模式 - (五)Topics 通配符模式

概念:
  1. Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!

  2. Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

  3. 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert

  1. 红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到

  2. 黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配

生产者 producer:

 

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer_Topics {
    public static void main(String[] args) throws IOException, TimeoutException {
        
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置参数
        factory.setHost("192.168.16.62"); // ip 默认值 localhost
        factory.setPort(5672); // 端口 默认值 5672
        factory.setVirtualHost("/"); //虚拟机 默认值 /
        factory.setUsername("guest"); // 用户名 默认值 guest
        factory.setPassword("guest"); // 密码 默认值 guest
        // 3、创建连接 Connection
        Connection connection = factory.newConnection();
        // 4、创建Channel
        Channel channel = connection.createChannel();
        // 5、创建交换机
        
        String exchangeName = "test_topic";
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
        // 6、创建队列
        
        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";
        channel.queueDeclare(queue1Name, true, false, false, null);
        channel.queueDeclare(queue2Name, true, false, false, null);
        // 7、绑定队列和交换机
        
        // routing key 系统的名称.日志的级别
        // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
        channel.queueBind(queue1Name, exchangeName, "#.error");
        channel.queueBind(queue1Name, exchangeName, "order.*");

        channel.queueBind(queue2Name, exchangeName, "*.*");
        // 8、发送消息
        
        String body1 = "日志信息:order:findALl  日志级别:info";
        String body2 = "日志信息:user:findALl  日志级别:info";
        channel.basicPublish(exchangeName,"order.info",null,body1.getBytes());
        channel.basicPublish(exchangeName,"user.info",null,body2.getBytes());
        // 9、释放资源
        channel.close();
        connection.close();
    }
}
 消费者 consumer:
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Topic1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置参数
        // ip 默认值 localhost, 192.168.16.62 :启动MQ服务器的ip
        factory.setHost("192.168.16.62");
        factory.setPort(5672); // 端口 默认值 5672
        factory.setVirtualHost("/"); //虚拟机 默认值 /
        factory.setUsername("guest"); // 用户名 默认值 guest
        factory.setPassword("guest"); // 密码 默认值 guest
        // 3、创建连接 Connection
        Connection connection = factory.newConnection();
        // 4、创建Channel
        Channel channel = connection.createChannel();
        // 5、创建队列, 生产者已经创建,无需再声明

        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";

        //6、接收消息
        
        Consumer consumer = new DefaultConsumer(channel){
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag : "+consumerTag);
//                System.out.println("envelope : "+envelope);
//                System.out.println("properties : "+properties);
                System.out.println("保存数据库:" + new String(body));
            }
        };
        channel.basicConsume(queue1Name,true,consumer);
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_Topic2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        
        // 1、创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 2、设置参数
        // ip 默认值 localhost, 192.168.16.62 :启动MQ服务器的ip
        factory.setHost("192.168.16.62");
        factory.setPort(5672); // 端口 默认值 5672
        factory.setVirtualHost("/"); //虚拟机 默认值 /
        factory.setUsername("guest"); // 用户名 默认值 guest
        factory.setPassword("guest"); // 密码 默认值 guest
        // 3、创建连接 Connection
        Connection connection = factory.newConnection();
        // 4、创建Channel
        Channel channel = connection.createChannel();
        // 5、创建队列, 生产者已经创建,无需再声明

        String queue1Name = "test_topic_queue1";
        String queue2Name = "test_topic_queue2";

        //6、接收消息
        
        Consumer consumer = new DefaultConsumer(channel){
            
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag : "+consumerTag);
//                System.out.println("envelope : "+envelope);
//                System.out.println("properties : "+properties);
                System.out.println("控制台:" + new String(body));
            }
        };
        channel.basicConsume(queue2Name,true,consumer);
    }
}
结果:

 

 

小结:

Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4666912.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-06
下一篇 2022-11-06

发表评论

登录后才能评论

评论列表(0条)

保存