RabbitMQ五种消息模式

RabbitMQ五种消息模式,第1张

RabbitMQ五种消息模式 1,简单模式

工作的流程:

  1. 当生产者生产消息后,将消息发往队列.
  2. 当队列中有消息时,消费者会实时的监听队列中的消息.如果有消息则会执行消息

2,工作模式

        默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。

采用工作队列

在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送。channel.basicQos(1);

public class Consumer1 {
    private static final String QUEUE_NAME = "yao-queue";

    public static void main(String[] args) throws IOException, TimeoutException, IOException, TimeoutException {
        // 1.创建连接
        Connection connection = RabbitMQConnection.getConnection();
        // 2.设置通道
        Channel channel = connection.createChannel();
        //指定我们消费者每次批量获取消息
        channel.basicQos(2);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                String msg = new String(body, "UTF-8");
                System.out.println("消费者获取消息:" + msg);
                try {
                    // 消费者完成 删除该消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }catch (Exception e){
                }
            }
        };
        // 3.监听队列
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    }
}
3,Fanout exchange(扇型交换机)发布订阅模式

                生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者。

 

原理:

  1. 需要创建两个队列 ,每个队列对应一个消费者;
  2. 队列需要绑定我们交换机
  3. 生产者投递消息到交换机中,交换机在将消息分配给两个队列中都存放起来;
  4. 消费者从队列中获取这个消息。
 

    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //  创建Connection
        Connection connection = RabbitMQConnection.getConnection();
        // 创建Channel
        Channel channel = connection.createChannel();
        // 通道关联交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);
        String msg = "6666";
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        channel.close();
        connection.close();
    }

    private static final String QUEUE_NAME = "heng-queue";
    
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("邮件消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("邮件消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}

 
    private static final String QUEUE_NAME = "yao-queue";
    
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("短信消费者...");
        // 创建我们的连接
        Connection connection = RabbitMQConnection.getConnection();
        // 创建我们通道
        final Channel channel = connection.createChannel();
        // 关联队列消费者关联队列
        channel.queueBind   (QUEUE_NAME, EXCHANGE_NAME, "");
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("短信消费者获取消息:" + msg);
            }
        };
        // 开始监听消息 自动签收
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

    }
}
4,Direct exchange(直连交换机)路由模式

当交换机类型为direct类型时,根据队列绑定的路由建转发到具体的队列中存放消息

 // 生产者   通道关联交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
        String msg = "6666";
        channel.basicPublish(EXCHANGE_NAME, "email", null, msg.getBytes());
//消费者1
  // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");
//消费者2
 // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");
5,Topic exchange(主题交换机)主题模式

当交换机类型为topic类型时,根据队列绑定的路由建模糊转发到具体的队列中存放。

#号表示支持匹配多个词;

*号表示只能匹配一个词

// 生产者 通道关联交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
        String msg = "6666";
        channel.basicPublish(EXCHANGE_NAME, "yao.sms", null, msg.getBytes());
//消费者1
 // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "yao.*");
//消费者2
 // 关联队列消费者关联队列
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "hang.*");

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5679146.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存