- 1、发布与订阅模式
- 1.1、消费者
- 1.2、生产者
- 1.3、运行测试
- 2、Routing 路由模式
- 2.1、消费者
- 2.2、生产者
- 3.3、运行测试
- 3、Topics 主题模式
- 3.1、消费者
- 3.2、生产者
- 3.3、运行测试
基于上篇博客的基础
1、发布与订阅模式1.1、消费者发布与订阅模式中多一个交换机的角色,也就是生产者生产出消息之后不会直接放进队列,而是会交给交换机,由交换机取交给自己绑定的队列,然后消费者再去队列里面拿,如下图所示:
消费者1:
public class BiaDu { public static void main(String[] args) throws IOException, TimeoutException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_BAI_DU, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到) channel.queueBind(RabbitConstant.QUEUE_BAI_DU, RabbitConstant.QUEUE_EXCHANGE_WEATHER, ""); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAI_DU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
消费者2:
public class Sina { public static void main(String[] args) throws IOException, TimeoutException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_SIN_LANG, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到) channel.queueBind(RabbitConstant.QUEUE_SIN_LANG, RabbitConstant.QUEUE_EXCHANGE_WEATHER, ""); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SIN_LANG , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }1.2、生产者
public class WeatherBureau { public static void main(String[] args) throws Exception { Connection connection = RabbitUtils.getConnection(); String input = new Scanner(System.in).next(); Channel channel = connection.createChannel(); //第一个参数交换机名字 其他参数和之前的一样 channel.basicPublish(RabbitConstant.QUEUE_EXCHANGE_WEATHER,"" , null , input.getBytes()); channel.close(); connection.close(); } }1.3、运行测试
先运行消费者,再运行生产者:
生产者:
消费者:
2.1、消费者路由模式相当于在发布与订阅模式上面增加了路由条件,发布与订阅模式是交换机把数据都交给绑定自己的队列,而路由模式则给这些队列加了条件,相当于一个过滤的功能,如下图所示:
消费者1:
public class BiaDu { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_BAI_DU, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key channel.queueBind(RabbitConstant.QUEUE_BAI_DU, RabbitConstant.QUEUE_EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20201127"); channel.queueBind(RabbitConstant.QUEUE_BAI_DU, RabbitConstant.QUEUE_EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAI_DU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
消费者2:
public class Sina { public static void main(String[] args) throws IOException, TimeoutException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_SIN_LANG, false, false, false, null); //指定队列与交换机以及routing key之间的关系 channel.queueBind(RabbitConstant.QUEUE_SIN_LANG, RabbitConstant.QUEUE_EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201127"); channel.queueBind(RabbitConstant.QUEUE_SIN_LANG, RabbitConstant.QUEUE_EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20201127"); channel.queueBind(RabbitConstant.QUEUE_SIN_LANG, RabbitConstant.QUEUE_EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20201128"); channel.queueBind(RabbitConstant.QUEUE_SIN_LANG, RabbitConstant.QUEUE_EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20201012"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SIN_LANG , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }2.2、生产者
public class WeatherBureau { public static void main(String[] args) throws Exception { Map area = new linkedHashMap3.3、运行测试(); area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据"); area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据"); area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据"); area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据"); area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据"); area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据"); area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据"); area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据"); Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); Iterator > itr = area.entrySet().iterator(); while (itr.hasNext()) { Map.Entry me = itr.next(); //第一个参数交换机名字 第二个参数作为 消息的routing key channel.basicPublish(RabbitConstant.QUEUE_EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes()); } channel.close(); connection.close(); } }
先运行消费者,再运行生产者
生产者截图:
3、Topics 主题模式消费者截图:
所谓Topics 主题模式也可以叫通配符模式,在路由模式中我们写了很多需要接收信息的条件,在Topics 主题模式中可以用通配符代替它。
如下图所示:
消费者1:
public class BiaDu { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_BAI_DU, false, false, false, null); //queueBind用于将队列与交换机绑定 //参数1:队列名 参数2:交互机名 参数三:路由key channel.queueBind(RabbitConstant.QUEUE_BAI_DU, RabbitConstant.QUEUE_EXCHANGE_WEATHER_TOPIC, "*.*.*.20201127"); // channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hebei.shijiazhuang.20201128"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_BAI_DU , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("百度天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
消费者2:
public class Sina { public static void main(String[] args) throws IOException, TimeoutException { //获取TCP长连接 Connection connection = RabbitUtils.getConnection(); //获取虚拟连接 final Channel channel = connection.createChannel(); //声明队列信息 channel.queueDeclare(RabbitConstant.QUEUE_SIN_LANG, false, false, false, null); //指定队列与交换机以及routing key之间的关系 channel.queueBind(RabbitConstant.QUEUE_SIN_LANG, RabbitConstant.QUEUE_EXCHANGE_WEATHER_TOPIC, "us.#"); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SIN_LANG , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("新浪天气收到气象信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag() , false); } }); } }3.2、生产者
public class WeatherBureau { public static void main(String[] args) throws Exception { Map area = new linkedHashMap3.3、运行测试(); area.put("china.hunan.changsha.20201127", "中国湖南长沙20201127天气数据"); area.put("china.hubei.wuhan.20201127", "中国湖北武汉20201127天气数据"); area.put("china.hunan.zhuzhou.20201127", "中国湖南株洲20201127天气数据"); area.put("us.cal.lsj.20201127", "美国加州洛杉矶20201127天气数据"); area.put("china.hebei.shijiazhuang.20201128", "中国河北石家庄20201128天气数据"); area.put("china.hubei.wuhan.20201128", "中国湖北武汉20201128天气数据"); area.put("china.henan.zhengzhou.20201128", "中国河南郑州20201128天气数据"); area.put("us.cal.lsj.20201128", "美国加州洛杉矶20201128天气数据"); Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); Iterator > itr = area.entrySet().iterator(); while (itr.hasNext()) { Map.Entry me = itr.next(); //第一个参数交换机名字 第二个参数作为 消息的routing key channel.basicPublish(RabbitConstant.QUEUE_EXCHANGE_WEATHER_TOPIC,me.getKey() , null , me.getValue().getBytes()); } channel.close(); connection.close(); } }
先运行消费者,再运行生产者:
生产者运行截图如下:
消费者运行截图如下:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)