Topic类型的Exchange与Direct相比,都是可以根据Routingkey把消息路由分到不同的队列,只不过Topic类型Exchange可以让队列绑定Routingkey的时候使用通配符。
这种模型的RoutingKey一般由一个或多个单词组成,多个单词之间以"."分隔,例如 user.save
* 恰好匹配一个单词(例如 user.* 可以匹配 user.save,但不能匹配 user.save.all)
# 匹配零个或多个单词(例如 user.# 可以匹配 user、user.save、user.save.all)
Provicerpublic class Provider { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("topics", "topic"); String routeKey = "user.save"; channel.basicPublish("topics",routeKey,null,"topic消息".getBytes(StandardCharsets.UTF_8)); RabbitMQUtil.closeAll(channel,connection); } }Consumer1
public class Consumer1 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机和交换机类型 channel.exchangeDeclare("topics","topic"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"topics","user.*"); channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1:"+new String(body)); } }); } }Consumer2
public class Consumer2 { public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机和交换机类型 channel.exchangeDeclare("topics","topic"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"topics","#.user.#"); channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2:"+new String(body)); } }); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)