动态订阅模型,可以使用通配符进行绑定 这种模型的RoutingKey一般是由一个或多个单词组成,多个单词之间用"."分割,例如 item.insert 通配符: - * 匹配不多不少恰好一个词 - # 匹配一个或多个词(采用hash策略)生产者
package com.huixiang.rabbitmq.topics; import com.huixiang.utils.RabbitmqUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; // 主题模型(topic quene) 生产者 动态路由 public class Productor { public static void main(String[] args) { Connection connection = null; Channel channel = null; try { // 获取连接 connection = RabbitmqUtils.setConnection(); // 获取通道 channel = connection.createChannel(); // 指定key名 String keyName = "logs.topic.save"; channel.exchangeDeclare("logs_topic","topic"); //要多条。所以用循环 for (int i =0;i<10;i++){ //生产消息 channel.basicPublish("logs_topic",keyName,null,"nihao".getBytes()); } }catch (Exception e){ e.printStackTrace(); }finally { RabbitmqUtils.closeConnectionChanel(channel,connection); } } }消费者1
package com.huixiang.rabbitmq.topics; import com.huixiang.utils.RabbitmqUtils; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer1 { public static void main(String[] args) { Connection connection = null; Channel channel = null; try { connection = RabbitmqUtils.setConnection(); channel = connection.createChannel(); // 创建临时对列 String queue = channel.queueDeclare().getQueue(); // 指定key名 String keyName1 = "logs.#"; channel.exchangeDeclare("logs_topic","topic"); // 交换机绑定对列(临时对列) channel.queueBind(queue,"logs_topic",keyName1); channel.basicConsume(queue, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("消费者1:收到消息" + new String(message.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("失败"); } }); }catch (Exception e){ e.printStackTrace(); } } }消费者2
package com.huixiang.rabbitmq.topics; import com.huixiang.utils.RabbitmqUtils; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer2 { public static void main(String[] args) { Connection connection = null; Channel channel = null; try { connection = RabbitmqUtils.setConnection(); channel = connection.createChannel(); // 创建临时对列 String queue = channel.queueDeclare().getQueue(); // 指定key名 String keyName1 = "logs.topic.*"; channel.exchangeDeclare("logs_topic","topic"); // 交换机绑定对列(临时对列) channel.queueBind(queue,"logs_topic",keyName1); channel.basicConsume(queue, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("消费者1:收到消息" + new String(message.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("失败"); } }); }catch (Exception e){ e.printStackTrace(); } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)