RabbitMq------路由模式

RabbitMq------路由模式,第1张

RabbitMq------路由模式

         官方文档:文档:目录 — RabbitMQ

      生产者按routing key发送消息,不同的消费者端按不同的routing key接收消息。

       路由模式消费者端和发布订阅模式消费者端的区别:

1、exchange的type为direct

2、发送消息的时候加入了routing key

配置类

package com.xmx;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
    public static Connection getConnection() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setVirtualHost("/j98");//虚拟主机名
        factory.setUsername("jianzi");//账号
        factory.setPassword("jianzi");//密码
        //创建连接
        Connection newConnection = factory.newConnection();
        return newConnection;
    } 
}

 Producer:消息发送者

package com.xmx;



import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.util.Random;


public class Producter4 {
    private final static String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        //声明交换机Exchange类型为direct
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        for (int i = 1; i <= 10; i++) {
            int random = (new Random()).nextInt(3) + 1;//1-3
            if (random == 1) {
                //发布消息3种routingKey的消息
                String message = "hello info";
                channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
                System.out.println("路由模式发布info消息:" + message);
            } else if (random == 2) {
                String message = "hello warning";
                channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes());
                System.out.println("路由模式发布warning消息:" + message);
            } else {
                String message = "hello error";
                channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes());
                System.out.println("路由模式发布error消息:" + message);
            }
        }
        channel.close();
        connection.close();
    }
}



Consumer:消费者1

package com.xmx;

import com.rabbitmq.client.*;

import java.io.IOException;


public class Customer4 {
    private final static String QUEUE_NAME = "publishSubscrible_queue1";
    private final static String EXCHANGE_NAME = "topic_exchange";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //申明队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //队列绑定交换机,指定路由routingKey
        //结束路由routingKey为info和warning的消息
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
//        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

        //同一时刻服务器只发送1条消息给消费者(能者多劳,消费消息快的,会消费更多的消息)
        //保证在接收端一个消息没有处理完时不会接收另一个消息,即消费者端发送了ack后才会接收下一个消息。
        //在这种情况下生产者端会尝试把消息发送给下一个空闲的消费者。
        channel.basicQos(1);
        //声明消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("路由模式 消费者1 消费消息:"+message);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}


Consumer:消费者2

消费者2与消费者1不同的地方是绑定的队列不同

package com.xmx;

import com.rabbitmq.client.*;

import java.io.IOException;

//路由模式
public class Customer4 {
    private final static String QUEUE_NAME2 = "publishSubscrible_queue2";
    private final static String EXCHANGE_NAME = "topic_exchange";
    public static void main(String[] args) throws Exception{
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //申明队列
        channel.queueDeclare(QUEUE_NAME2, true, false, false, null);

        //队列绑定交换机,指定路由routingKey
        channel.queueBind(QUEUE_NAME2, EXCHANGE_NAME, "warning");


        //同一时刻服务器只发送1条消息给消费者(能者多劳,消费消息快的,会消费更多的消息)
        //保证在接收端一个消息没有处理完时不会接收另一个消息,即消费者端发送了ack后才会接收下一个消息。
        //在这种情况下生产者端会尝试把消息发送给下一个空闲的消费者。
        channel.basicQos(1);
        //声明消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("路由模式 消费者2 消费消息:"+message);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME2, false, consumer);
    }
}


欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5707654.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存