官方文档:文档:目录 — RabbitMQ
生产者按routing key发送消息,不同的消费者端按不同的routing key接收消息。
路由模式消费者端和发布订阅模式消费者端的区别:
1、exchange的type为direct
2、发送消息的时候加入了routing key
配置类
package com.xmx; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class ConnectionUtil { public static Connection getConnection() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setVirtualHost("/j98");//虚拟主机名 factory.setUsername("jianzi");//账号 factory.setPassword("jianzi");//密码 //创建连接 Connection newConnection = factory.newConnection(); return newConnection; } }
Producer:消息发送者
package com.xmx; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.util.Random; public class Producter4 { private final static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); //声明交换机Exchange类型为direct channel.exchangeDeclare(EXCHANGE_NAME, "direct"); for (int i = 1; i <= 10; i++) { int random = (new Random()).nextInt(3) + 1;//1-3 if (random == 1) { //发布消息3种routingKey的消息 String message = "hello info"; channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes()); System.out.println("路由模式发布info消息:" + message); } else if (random == 2) { String message = "hello warning"; channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes()); System.out.println("路由模式发布warning消息:" + message); } else { String message = "hello error"; channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes()); System.out.println("路由模式发布error消息:" + message); } } channel.close(); connection.close(); } }
Consumer:消费者1
package com.xmx; import com.rabbitmq.client.*; import java.io.IOException; public class Customer4 { private final static String QUEUE_NAME = "publishSubscrible_queue1"; private final static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //申明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); //队列绑定交换机,指定路由routingKey //结束路由routingKey为info和warning的消息 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); // channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning"); //同一时刻服务器只发送1条消息给消费者(能者多劳,消费消息快的,会消费更多的消息) //保证在接收端一个消息没有处理完时不会接收另一个消息,即消费者端发送了ack后才会接收下一个消息。 //在这种情况下生产者端会尝试把消息发送给下一个空闲的消费者。 channel.basicQos(1); //声明消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("路由模式 消费者1 消费消息:"+message); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, false, consumer); } }
Consumer:消费者2
消费者2与消费者1不同的地方是绑定的队列不同
package com.xmx; import com.rabbitmq.client.*; import java.io.IOException; //路由模式 public class Customer4 { private final static String QUEUE_NAME2 = "publishSubscrible_queue2"; private final static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws Exception{ Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); //声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //申明队列 channel.queueDeclare(QUEUE_NAME2, true, false, false, null); //队列绑定交换机,指定路由routingKey channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "warning"); //同一时刻服务器只发送1条消息给消费者(能者多劳,消费消息快的,会消费更多的消息) //保证在接收端一个消息没有处理完时不会接收另一个消息,即消费者端发送了ack后才会接收下一个消息。 //在这种情况下生产者端会尝试把消息发送给下一个空闲的消费者。 channel.basicQos(1); //声明消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("路由模式 消费者2 消费消息:"+message); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME2, false, consumer); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)