-
Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
-
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
-
通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
-
红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
-
黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer_Topics { public static void main(String[] args) throws IOException, TimeoutException { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置参数 factory.setHost("192.168.16.62"); // ip 默认值 localhost factory.setPort(5672); // 端口 默认值 5672 factory.setVirtualHost("/"); //虚拟机 默认值 / factory.setUsername("guest"); // 用户名 默认值 guest factory.setPassword("guest"); // 密码 默认值 guest // 3、创建连接 Connection Connection connection = factory.newConnection(); // 4、创建Channel Channel channel = connection.createChannel(); // 5、创建交换机 String exchangeName = "test_topic"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null); // 6、创建队列 String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null); // 7、绑定队列和交换机 // routing key 系统的名称.日志的级别 // 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库 channel.queueBind(queue1Name, exchangeName, "#.error"); channel.queueBind(queue1Name, exchangeName, "order.*"); channel.queueBind(queue2Name, exchangeName, "*.*"); // 8、发送消息 String body1 = "日志信息:order:findALl 日志级别:info"; String body2 = "日志信息:user:findALl 日志级别:info"; channel.basicPublish(exchangeName,"order.info",null,body1.getBytes()); channel.basicPublish(exchangeName,"user.info",null,body2.getBytes()); // 9、释放资源 channel.close(); connection.close(); } }消费者 consumer:
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_Topic1 { public static void main(String[] args) throws IOException, TimeoutException { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置参数 // ip 默认值 localhost, 192.168.16.62 :启动MQ服务器的ip factory.setHost("192.168.16.62"); factory.setPort(5672); // 端口 默认值 5672 factory.setVirtualHost("/"); //虚拟机 默认值 / factory.setUsername("guest"); // 用户名 默认值 guest factory.setPassword("guest"); // 密码 默认值 guest // 3、创建连接 Connection Connection connection = factory.newConnection(); // 4、创建Channel Channel channel = connection.createChannel(); // 5、创建队列, 生产者已经创建,无需再声明 String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; //6、接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // System.out.println("consumerTag : "+consumerTag); // System.out.println("envelope : "+envelope); // System.out.println("properties : "+properties); System.out.println("保存数据库:" + new String(body)); } }; channel.basicConsume(queue1Name,true,consumer); } }
import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer_Topic2 { public static void main(String[] args) throws IOException, TimeoutException { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2、设置参数 // ip 默认值 localhost, 192.168.16.62 :启动MQ服务器的ip factory.setHost("192.168.16.62"); factory.setPort(5672); // 端口 默认值 5672 factory.setVirtualHost("/"); //虚拟机 默认值 / factory.setUsername("guest"); // 用户名 默认值 guest factory.setPassword("guest"); // 密码 默认值 guest // 3、创建连接 Connection Connection connection = factory.newConnection(); // 4、创建Channel Channel channel = connection.createChannel(); // 5、创建队列, 生产者已经创建,无需再声明 String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; //6、接收消息 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // System.out.println("consumerTag : "+consumerTag); // System.out.println("envelope : "+envelope); // System.out.println("properties : "+properties); System.out.println("控制台:" + new String(body)); } }; channel.basicConsume(queue2Name,true,consumer); } }结果:
小结:
Topic 主题模式可以实现 Pub/Sub 发布与订阅模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的时候可以使用通配符,显得更加灵活。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)