实例教程:主题交换机

实例教程:主题交换机,第1张

RabbitMQ实例教程:主题交换机

在前面的例子中,虽然我们使用了直接路由而不是扇出路由来解决盲播问题,但是直接路由也有它的缺陷,它不能转发基于多种标准的路由。


在上面的日志系统中,如果不仅要基于日志级别订阅,还要基于日志的来源订阅,该怎么办?这时,你可能会想到unix系统工具中的syslog服务。它不仅基于日志级别(info/warn/crit)进行路由和转发...),而且还根据 *** 作(auth/cron/kern)路由和转发...).


在这种情况下,日志系统更加灵活。它不仅可以监视来自“cron”的关键错误,还可以监视来自“kern”的所有日志。其实话题交换可以解决这个问题。


话题交流


交换机的路由代码不能任意写,必须是用小树隔开的单词列表。这些词可以随便写,但通常是与连接消息的特征相关的词。有效的路由代码应该是“stock.usd.nyse”、“nyse.vmw”和“quick.orange.rabbit”。路由码可以随意写,但长度限制在255字节。


请注意,绑定代码也必须采用相同的形式。主题交换机类似于直接交换机,具有特定路由代码的消息被发送到所有匹配绑定代码的队列,但是有两个特殊的绑定代码:


*:可以代替一个单词。


#:可以替换0个或多个单词


在这个例子中,我们向所有的动物发送一个信息,这个信息与三个词一致(第一个词描述速度;第二个词描述颜色;第三个词描述物种)会发送一条消息:“

我们创建了三个绑定:Q1使用“*.orange.*”绑定,Q2使用“*”。*.兔子”和“懒。#"绑定。这些绑定的含义如下:

Q1描述了所有颜色是橙色的动物。

Q2描述了兔子和懒惰的动物。

这样,“quick.orange.rabbit”消息通过该路由被转发到Q1和Q2队列。“lazy.orange.elephant”消息也将被转发到Q1和Q2队列。“quick.orange.fox”消息只会转发到Q1队列,“lazy.brown.fox”只会转发到Q2队列。“lazy.pink.rabbit”将被转发到Q2队列一次,即使它匹配两个绑定。如果“quick.brown.fox”不匹配任何队列,它将被丢弃。

如果我们违反规则,一次只发送一个或四个单词,如“orange”或“quick.orange.male.rabbit”,这些消息如果不匹配任何绑定,就会被丢弃。但是如果你发送一条类似“lazy.orange.male.rabbit”的消息,它仍然会被转发到Q2队列,因为它匹配最后一个绑定。

主题开关是一个非常强大的开关。当它只绑定“#”时,它将接收所有消息,类似于扇出开关。当不使用“*”和“#”符号时,主题切换相当于直接切换。


源代码


EmitLogTopic.java

package com.favccxx.favrabbit; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String[] routingKeys = { "fast.orange.duck", "slow.orange.fish", "grey.rabbit", "fast.black.rabbit", "quick.white.rabbit", "lazy.dog", "lazy.black.pig" }; String[] messages = { "Hello", "Guys", "Girls", "Babies" }; for (int i = 0; i < routingKeys.length; i++) { for (int j = 0; j < messages.length; j++) { channel.basicPublish(EXCHANGE_NAME, routingKeys[i], null, messages[j].getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKeys[i] + "':'" + messages[j] + "'"); } } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } }


receivelogstopic.Java

package com.favccxx.favrabbit; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); String[] bindingKeys = { "*.orange.*", "*.*.rabbit", "lazy.#" }; for (final String bindingKey : bindingKeys) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("[" + bindingKey + "] Received message :'" + message + "' from routingKey : " + envelope.getRoutingKey()); } }; channel.basicConsume(queueName, true, consumer); } } }


运行消息发送方,消息接收平台上的输出内容如下

[*.orange.*] Received message :'Hello' from routingKey : fast.orange.duck [*.*.rabbit] Received message :'Guys' from routingKey : fast.orange.duck [lazy.#] Received message :'Girls' from routingKey : fast.orange.duck [*.orange.*] Received message :'Babies' from routingKey : fast.orange.duck [*.*.rabbit] Received message :'Hello' from routingKey : slow.orange.fish [lazy.#] Received message :'Guys' from routingKey : slow.orange.fish [*.orange.*] Received message :'Girls' from routingKey : slow.orange.fish [*.*.rabbit] Received message :'Babies' from routingKey : slow.orange.fish [lazy.#] Received message :'Hello' from routingKey : fast.black.rabbit [*.orange.*] Received message :'Guys' from routingKey : fast.black.rabbit [*.*.rabbit] Received message :'Girls' from routingKey : fast.black.rabbit [lazy.#] Received message :'Babies' from routingKey : fast.black.rabbit [*.orange.*] Received message :'Hello' from routingKey : quick.white.rabbit [*.*.rabbit] Received message :'Guys' from routingKey : quick.white.rabbit [lazy.#] Received message :'Girls' from routingKey : quick.white.rabbit [*.orange.*] Received message :'Babies' from routingKey : quick.white.rabbit [*.*.rabbit] Received message :'Hello' from routingKey : lazy.dog [lazy.#] Received message :'Guys' from routingKey : lazy.dog [*.orange.*] Received message :'Girls' from routingKey : lazy.dog [*.*.rabbit] Received message :'Babies' from routingKey : lazy.dog [lazy.#] Received message :'Hello' from routingKey : lazy.black.pig [*.orange.*] Received message :'Guys' from routingKey : lazy.black.pig [*.*.rabbit] Received message :'Girls' from routingKey : lazy.black.pig [lazy.#] Received message :'Babies' from routingKey : lazy.black.pig


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zz/779856.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-04
下一篇 2022-05-04

发表评论

登录后才能评论

评论列表(0条)

保存