topic 主题
package topics;

import com.jia.utils.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.junit.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Producer for the topic-exchange example: publishes a single message to the
 * "topics" exchange with routing key "user.save".
 */
public class Provider {

    /** Name of the topic exchange the message is published to. */
    private static final String EXCHANGE_NAME = "topics";

    /**
     * Declares the topic exchange, publishes one message and releases the connection.
     *
     * @throws IOException if declaring the exchange, publishing, or closing the connection fails
     */
    @Test
    public void sendMsg() throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        try {
            Channel channel = connection.createChannel();

            // Declare an exchange of type "topic".
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            // Routing key the broker matches against each queue's binding pattern.
            String routingKey = "user.save";

            // Encode explicitly as UTF-8: the payload contains non-ASCII text, and the
            // no-arg getBytes() would depend on the platform default charset.
            byte[] payload = ("路由订阅" + routingKey).getBytes(StandardCharsets.UTF_8);
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, payload);
        } finally {
            // Closing the connection also closes the channels opened on it
            // (see the RabbitMQ Java client guide), so nothing is left open.
            connection.close();
        }
    }
}
#消费者
package com.jia.topics;

import com.jia.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Consumer for the topic-exchange example: binds a temporary queue to the
 * "topics" exchange with the pattern "user.*" and prints every message received.
 */
public class Consumer1 {

    /**
     * Sets up the exchange, queue and binding, then registers the consumer callback.
     * The connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if any broker operation fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // Declare the topic exchange (same name and type as the producer).
        channel.exchangeDeclare("topics", "topic");

        // Create a temporary queue with a broker-generated name.
        String queueName = channel.queueDeclare().getQueue();

        // Binding pattern. "*" stands for exactly one word, so "user.*" matches
        // "user.save" but not "user.stu.save". The alternative "user.#" would match
        // several words as well, e.g. user.save, user.stu.save, user.peo.save.
        String bindingKey = "user.*";

        // Bind the temporary queue to the exchange.
        channel.queueBind(queueName, "topics", bindingKey);

        // Second argument "true" is passed as in the original example (auto-acknowledge).
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 rather than with the platform default charset,
                // so non-ASCII payloads are not garbled on hosts with a different default.
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println("Consumer1消费:" + message);
            }
        });
    }
}
package com.jia.topics;

import com.jia.utils.RabbitMQUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Second consumer for the topic-exchange example: binds a temporary queue to the
 * "topics" exchange with the pattern "user2.*" and prints every message received.
 */
public class Consumer2 {

    /**
     * Sets up the exchange, queue and binding, then registers the consumer callback.
     * The connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if any broker operation fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();

        // Declare the topic exchange (same name and type as the producer).
        channel.exchangeDeclare("topics", "topic");

        // Create a temporary queue with a broker-generated name.
        String queueName = channel.queueDeclare().getQueue();

        // Binding pattern. The first word must be exactly "user2", so a message
        // published with routing key "user.save" is not routed to this queue.
        String bindingKey = "user2.*";

        // Bind the temporary queue to the exchange.
        channel.queueBind(queueName, "topics", bindingKey);

        // Second argument "true" is passed as in the original example (auto-acknowledge).
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 rather than with the platform default charset,
                // so non-ASCII payloads are not garbled on hosts with a different default.
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println("Consumer2消费:" + message);
            }
        });
    }
}
结果:Consumer2 不会收到消息,因为它的绑定键 user2.* 无法匹配生产者使用的路由键 user.save;Consumer1 的绑定键 user.* 可以匹配,因此只有 Consumer1 会消费该消息。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)