官方文档:文档:目录 — RabbitMQ
在Publish/Subscribe工作模式中,必须先配置一个fanout类型的交换器,不需要指定对应的路由键(Routing key),同时会将消息路由到每一个消息队列上,然后每个消息队列都可以对相同的消息进行接收存储,进而由各自消息队列关联的消费者进行消费。
适用于进行相同业务功能处理的场合
配置类:
package com.xmx; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static Connection getConnection() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setVirtualHost("/j98");//虚拟主机名 factory.setUsername("jianzi");//账号 factory.setPassword("jianzi");//密码 //创建连接 Connection newConnection = factory.newConnection(); return newConnection; } }
Producer:消息发布者
package com.xmx; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Exchanner { private final static String EXCHANGE_NAME = "publishSubscrible_exchange"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机Exchange类型为fanout channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); for (int i = 1; i <= 20; i++) { String message = "publish/subscrible hello world--" + i; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println("发布订阅 生产者 发布消息:" + message); } channel.close(); connection.close(); } }
生产者端发布消息到交换机,使用“fanout”方式发送,即广播消息,不需要使用queue,发送端不需要关心谁接收。
消费者1:
package com.xmx; import com.rabbitmq.client.*; import java.io.IOException; //发布订阅模式 public class Customer1 { private final static String TestQueue1 = "publishSubscrible_queue1"; private final static String EXCHANGE_NAME = "publishSubscrible_exchange"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //声明队列 channel.queueDeclare(TestQueue1, true, false, false, null); //绑定队列到交换机 channel.queueBind(TestQueue1, EXCHANGE_NAME, ""); //同一时刻服务器只发送1条消息给消费者(能者多劳,消费消息快的,会消费更多的消息) //保证在接收端一个消息没有处理完时不会接收另一个消息,即消费者端发送了ack后才会接收下一个消息。 //在这种情况下生产者端会尝试把消息发送给下一个空闲的消费者。 //channel.basicQos(1); //消费者 DefaultConsumer consumer1 = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("发布订阅 消费者1 消费消息:"+message); //手动返回结果 channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(TestQueue1, false, consumer1); } }
消费者2:与消费者1不同的是队列名不同
package com.xmx; import com.rabbitmq.client.*; import java.io.IOException; //发布订阅模式 public class Customer2{ private final static String TestQueue2 = "publishSubscrible_queue2"; private final static String EXCHANGE_NAME = "publishSubscrible_exchange"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //声明队列 channel.queueDeclare(TestQueue2, true, false, false, null); //绑定队列到交换机 channel.queueBind(TestQueue2, EXCHANGE_NAME, ""); //同一时刻服务器只发送1条消息给消费者(能者多劳,消费消息快的,会消费更多的消息) //保证在接收端一个消息没有处理完时不会接收另一个消息,即消费者端发送了ack后才会接收下一个消息。 //在这种情况下生产者端会尝试把消息发送给下一个空闲的消费者。 //channel.basicQos(1); //消费者 DefaultConsumer consumer1 = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("发布订阅 消费者1 消费消息:"+message); //手动返回结果 channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(TestQueue2, false, consumer1); } }
消费者端:
1、声明和生产者端一样的交换机。
2、注意binding queue的时候,channel.queueBind()的第三个参数Routing key为空,即所有的消息都接收。如果这个值不为空,在exchange type为“fanout”方式下该值被忽略!
3、消息队列和交换机绑定
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)