一、交换机 Exchanges
1.1 交换机Exchanges概念
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们放到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
1.2 交换机Exchanges的类型
直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)
1.3 无名exchange
前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实 是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话
二. 临时队列之前的章节我们使用的是具有特定名称的队列。队列的名称对我们来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。
每当我们连接到Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称 的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。
创建临时队列的方式如下: String queueName = channel.queueDeclare().getQueue();
三、绑定(bindings)什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队 列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定
四、发布订阅模式(Fanout) 4.1 Fanout介绍Fanout这种类型非常简单。它是将接收到的所有消息广播到它所知道的所有队列中。
4.2 代码实现为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消息,第二个程序是消费者。其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘, 另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费者。
生产者代码
public class EmitLog { private static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection=RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message=scanner.next(); channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println("生产者发出的消息:"+message); } RabbitMqUtils.closeConnectionAndChanel(connection,channel); } }
消费者:
public class ReceiveLogs01 { private static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); //获取队列的名字,随机生成的 String queueName=channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println("等待接收消息,把消息打印在屏幕上。。。"); DeliverCallback deliverCallback=(consumerTag,delivery)->{ String message=new String(delivery.getBody(),"UTF-8"); System.out.println("控制台打印收到的消息:"+message); }; CancelCallback cancelCallback = consumerTag->{ System.out.println("消费者取消消息"); }; channel.basicConsume(queueName,true,deliverCallback,cancelCallback); } }五、路由模式Direct exchange
上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希 望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。
生产者代码
public class EmitLog { private static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection=RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message=scanner.next(); channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes()); System.out.println("生产者发出的消息:"+message); } RabbitMqUtils.closeConnectionAndChanel(connection,channel); } }
消费者代码
public class ReceiveLogs02 { private static final String EXCHANGE_NAME="logs"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName=channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"info"); channel.queueBind(queueName,EXCHANGE_NAME,"warning"); System.out.println("等待接收消息,把消息写入日志文件。。。"); DeliverCallback deliverCallback=(consumerTag,delivery)->{ String message=new String(delivery.getBody(),"UTF-8"); System.out.println("日志文件收到的消息:"+message); }; CancelCallback cancelCallback = consumerTag->{ System.out.println("消费者取消消息"); }; channel.basicConsume(queueName,true,deliverCallback,cancelCallback); } }六、Topics 6.1 存在的问题
尽管使用direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候 就只能使用 topic 类型。
6.2 Topic 的要求发送类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".这种类型的。当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的
*(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
6.3 生产者代码
public class EmitLog { private static final String EXCHANGE_NAME="topicLogs"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection=RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Map6.4 消费者代码bindingMap=new HashMap<>(); bindingMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到"); bindingMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到"); bindingMap.put("quick.orange.fox","被队列 Q1 接收到"); bindingMap.put("lazy.brown.fox","被队列 Q2 接收到"); bindingMap.put("lazy.pink.rabbit","被队列 Q2 接收到"); bindingMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingMap.put("qquick.orange.male.rabbit","不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingMap.put("lazy.orange.male.rabbit","被队列 Q2 接收到"); for (Map.Entry bindingEntry: bindingMap.entrySet()) { String bindingKey = bindingEntry.getKey(); String message = bindingEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8")); System.out.println("生产者发出消息" + message); } //RabbitMqUtils.closeConnectionAndChanel(connection,channel); } }
public class ReceiveLogs01 { private static final String EXCHANGE_NAME="topicLogs"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); //获取队列的名字,随机生成的 String queueName=channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*"); System.out.println("等待接收消息,把消息打印在屏幕上。。。"); DeliverCallback deliverCallback=(consumerTag,delivery)->{ String message=new String(delivery.getBody(),"UTF-8"); System.out.println("控制台打印收到的消息:"+message+"t绑定关系:"+delivery.getEnvelope().getRoutingKey()); }; CancelCallback cancelCallback = consumerTag->{ System.out.println("消费者取消消息"); }; channel.basicConsume(queueName,true,deliverCallback,cancelCallback); } }
public class ReceiveLogs02 { private static final String EXCHANGE_NAME="topicLogs"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitMqUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName=channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit"); channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#"); System.out.println("等待接收消息,把消息写入日志文件。。。"); DeliverCallback deliverCallback=(consumerTag,delivery)->{ String message=new String(delivery.getBody(),"UTF-8"); System.out.println("日志文件收到的消息:"+message+"t绑定关系:"+delivery.getEnvelope().getRoutingKey()); }; CancelCallback cancelCallback = consumerTag->{ System.out.println("消费者取消消息"); }; channel.basicConsume(queueName,true,deliverCallback,cancelCallback); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)