RabbitMQ实例教程:发布订阅者消息队列

RabbitMQ实例教程:发布订阅者消息队列,第1张

RabbitMQ实例教程:发布/订阅者消息队列

消息交换


RabbitMQ消息实体模型的核心价值在于,生产者总是很难立即将所有消息推送到队列中。一般来说,生产者甚至不知道消息应该发送到什么队列。


相反,生产者只能将消息推送到交易所。开关的功能相对简单。一方面,它接受生产者发送的消息,另一方面,它将消息推送到队列。交换机必须清楚地了解消息如何处理它收到的每条消息。是否应该将其添加到特定的队列中?是不是应该增加到几个队列?还是应该扔掉?本标准是根据开关的类型定义的。


有三种类型的开关:直接、主题、标题和扇出。以fanout为例构建一个“logs”交换机。


channel.exchangeDeclare("logs", "fanout");


扇出交换机很简单,它将广播它接收到的所有队列的所有消息。


开关的名称


在前一种情况下,您可以在不知道所有开关定义的情况下推送消息。这是因为您应用了默认开关(""),但是您可以应用自己的开关。


channel.basicPublish("", "hello", null, message.getBytes()); //空字符串交换机 channel.basicPublish( "logs", "", null, message.getBytes()); //logs交换机


临时队列


在前面的例子中,每个人都指定了队列的实际名称(比如hello和task_queue)。命名队列非常重要,因为生产者和消费者是发送消息的队列名称。


但是对于日志的消息队列,大家会监控所有的日志消息,而不是一些非空子集。而且大家只关心今天生成的新闻,不关心历史时间新闻。要解决这个问题,我们必须做到以下几点:


首先,在我们连接Rabbit远程服务器之前,我们必须有一个新的空队列。我们可以自己随机生成一个队列名,也可以让网络服务器随机生成一个队列名。


其次,当消息客户失去连接时,应该自动删除队列。


在Java中,我们使用不带主参数的queueDeclare()方法来构建一个非持久的、惟一的队列,该队列可以在使用后自动删除。


String queueName = channel.queueDeclare().getQueue();


queueName可以是任何队列名,如amq.gen-jzty20brgko-hjmuj0wlg。


消息绑定


在前面,我们设置了一个扇出型交换机和队列。现在必须告诉交换机将消息推送到队列中。交换机队列和交换机队列之间的关联是消息绑定。


应用下面的编码日志开关将消息传递到队列。


channel.queueBind(queueName, "logs", "");


将开关和消息关联在一起



现在我们有一个提交日志的消息生产者,和以前的消息发布者没有太大区别。唯一的区别是,我们将消息发送到日志交换机,而不是未命名的交换机。在推送消息时,每个人都必须展示一个路由器,虽然它在fanoutswitch中没有任何作用。下面是提交日志的Java代码。


EmitLog.java

package com.favccxx.favrabbit; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLog {  private static final String EXCHANGE_NAME = "logs";  public static void main(String[] argv) throws Exception {   ConnectionFactory factory = new ConnectionFactory();   factory.setHost("localhost");   Connection connection = factory.newConnection();   Channel channel = connection.createChannel();   channel.exchangeDeclare(EXCHANGE_NAME, "fanout");   String[] sendMsgs = {"I", "saw", "a", "dog"};   String message = getMessage(sendMsgs);   channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));   System.out.println(" [x] Sent '"  message  "'");   channel.close();   connection.close();  }  private static String getMessage(String[] strings) {   if (strings.length < 1)    return "info: Hello World!";   return joinStrings(strings, " ");  }  private static String joinStrings(String[] strings, String delimiter) {   int length = strings.length;   if (length == 0)    return "";   StringBuilder words = new StringBuilder(strings[0]);   for (int i = 1; i < length; i) {    words.append(delimiter).append(strings[i]);   }   return words.toString();  } }



如上图,创建了与消息网络服务器的连接后,声明了一个交换机,因为不允许系统软件发布到空交换机。如果没有与交换机相关联的队列,消息将会丢失,但是每个人都很担心。如果没有客户监控新闻,大家都会失去。


接受消息代码receivlogs.java


package com.favccxx.favrabbit; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class ReceiveLogs {  private static final String EXCHANGE_NAME = "logs";  public static void main(String[] argv) throws Exception {   ConnectionFactory factory = new ConnectionFactory();   factory.setHost("localhost");   Connection connection = factory.newConnection();   Channel channel = connection.createChannel();   channel.exchangeDeclare(EXCHANGE_NAME, "fanout");   String queueName = channel.queueDeclare().getQueue();   channel.queueBind(queueName, EXCHANGE_NAME, "");   System.out.println(" [*] Waiting for messages. To exit press CTRLC");   Consumer consumer = new DefaultConsumer(channel) {    @Override    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,      byte[] body) throws IOException {     String message = new String(body, "UTF-8");     System.out.println(" [x] Received '"  message  "'");    }   };   channel.basicConsume(queueName, true, consumer);  } }



数据测试


已经 *** 作了许多日志消息接收器的情况,并且已经使用日志消息发布器来推送消息。发现每个日志消息接收者都收到了相同的数据信息,这表明发布和订阅已经成功。


 [x] Received 'I saw a dog'




欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zz/779855.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-04
下一篇 2022-05-04

发表评论

登录后才能评论

评论列表(0条)

保存