消息队列之 RabbitMQ(二)

消息队列之 RabbitMQ(二),第1张

消息队列之 RabbitMQ(二)

文章目录
    • RabbitMQ 的常见模式
      • 简单 (simple) 模式
      • 工作 (work) 模式
      • 发布 / 订阅 (pub/sub) 模式
      • 路由 (routing) 模式
      • 主题 (Topic) 模式

RabbitMQ 的常见模式
  • 在下图中,“P”是生产者,“C”是消费者。中间的框是一个队列(保存消息的地方)。
  • 公用代码:连接到RabbitMQ服务器
public class RabbitConnection {
    public static Connection getConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // address
        factory.setHost("127.0.0.1");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");   // Virtual Host默认节点

        return factory.newConnection();
    }
}
简单 (simple) 模式

simple 模式,是 RabbitMQ 几种模式中最简单的一种模式,其结构如下图所示:

  • simple模式有以下几个特征:
    • 只有一个生产者、一个消费者和一个队列。
    • 生产者和消费者在发送和接收消息时,只需要指定队列名,而不需要指定发送到哪个 Exchange,RabbitMQ 服务器会自动使用 Virtual host 的默认的 Exchange,默认 Exchange 的 type 为 direct。

生产者

public class Send {
    private static final String QUEUE_NAME = "simple_mq";  //消息队列名称

    public static void main(String[] args) throws Exception {
        //获取连接
        Connection connection = RabbitConnection.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 发送的消息
        String message = "hello,I'm is simple_mq.Who are you?";
        // 信道发送消息
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent.......... '" + message + "'");
    }
}

消费者

public class ReceiV {
    private static final String QUEUE_NAME = "simple_mq";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitConnection.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }
}
工作 (work) 模式

在 simple 模式下只有一个生产者和消费者,当生产者生产消息的速度大于消费者的消费速度时,我们可以添加一个或多个消费者来加快消费速度,这种在 simple 模式下增加消费者的模式,称为 work 模式,如下图所示:

  • work 模式有以下两个特征:
    • 可以有多个消费者,但一条消息只能被一个消费者获取。
    • 发送到队列中的消息,由服务器平均分配给不同消费者进行消费。

生产者

public class Send {
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消费者发送确认消息(ACK)之前,只发送一个消息给你
        channel.basicQos(1);
        
        for (int i = 0; i < 50; i++) {
            String message = "work+" + i;
            System.out.println(message);
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            Thread.sleep(i * 20);
        }
    }
}
channel.basicQos()

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
  参数:
	prefetchSize:消息的大小
	prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
	global:是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别

消费者

public class ReceiveOne {
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);  //需要修改

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [ConsumerOne is] Received '" + message + "'");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  //这里是手动应答
            }
        };
        boolean autoAck = false;  //这里需要修改自动应答为false
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
    }
}
public class ReceiveTwo {
    private static final String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws Exception {
        //建立连接
        Connection connection = RabbitConnection.getConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 不公平分发
        channel.basicQos(1);  //需要修改

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [ConsumerTwo is] Received '" + message + "'");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  //这里是手动应答
            }
        };
        boolean autoAck = false;  //这里需要修改自动应答为false
        //监听队列
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
    }
}
channel.basicAck()

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
  参数:
	deliveryTag:该消息的index
	multiple:是否批量处理.true:将一次性ack所有小于deliveryTag的消息
	requeue:被拒绝的是否重新入队列
发布 / 订阅 (pub/sub) 模式

work 模式可以将消息转到多个消费者,但每条消息只能由一个消费者获取,如果我们想一条消息可以同时给多个消费者消费呢?这时候就需要发布 / 订阅模式,其示意图如下所示:

  • 从上面的示意图我们可以看出来,在发布 / 订阅模式下,需要指定发送到哪个 Exchange 中。
    • 发布 / 订阅模式中,Exchange 的 type 为 fanout。
    • 生产者发送消息时,不需要指定具体的队列名,Exchange 会将收到的消息转发到所绑定的队列。
    • 消息被 Exchange 转到多个队列,一条消息可以被多个消费者获取。
  • 在上图中,oneQueue 中的消息要么被 CustomerA 获取,要么被 CustomerB 获取。也就是同一条消息,要么是 CustomerA + CustomerC 消费、要么是 CustomerB + CustomerC 消费。

生产者

public class Send {
    private static final String EXCHANGE_NAME = "test_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitConnection.getConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //send a msg
        String msg = "hello exchange";

        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());

        System.out.println("send:" + msg);
        channel.close();
        connection.close();
    }
}

消费者

public class ReceiveOne {
    private static final String QUEUE_NAME = "oneQueue";
    private static final String EXCHANGE_NAME = "test_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        //队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);
        //绑定队列到交换机转发器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //这里需要绑定exchange,其他的和前面的work_queue是一样的

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [ConsumerOne is] Received '" + message + "'");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  //这里是手动应答
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });
    }
}
public class ReceiveTwo {
    private static final String QUEUE_NAME = "twoQueue";
    private static final String EXCHANGE_NAME = "test_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [ConsumerTwo is] Received '" + message + "'");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  //这里是手动应答
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });
    }
}
路由 (routing) 模式

前面几种模式,消息的目标队列无法由生产者指定,而在路由模式下,消息的目标队列,可以由生产者指定,其示意图如下所示:

  • 从上面的示意图我们可以看出来:
    • 路由模式下Exchange的 type 为direct。
    • 消息的目标队列可以由生产者按照routingKey规则指定。
    • 消费者通过BindingKey绑定自己所关心的队列。
    • 一条消息队可以被多个消息者获取。
    • 只有RoutingKey与BidingKey相匹配的队列才会收到消息。

RoutingKey用于生产者指定Exchange最终将消息路由到哪个队列,BindingKey用于消费者绑定到某个队列。

生产者

public class Send {
    private static final String EXCHANGE_NAME = "test_exchange_direct_routing";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        //send a msg
        String msg = "hello direct";
        String routingKey = "info";  //这里是定义的routingKey
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
        System.out.println("send:" + msg);
        channel.close();
        connection.close();
    }
}

消费者

// 只能接收error
public class ReceiveOne {
    private static final String QUEUE_NAME = "receive1_queue_routing";
    private static final String EXCHANGE_NAME = "test_exchange_direct_routing";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);
        String routingKey = "error";  //表示只能接收error
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [ConsumerOne is] Received '" + message + "'");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  //这里是手动应答
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });
    }
}
// 能接收error,info 和 warning
public class ReceiveTwo {

    private static final String QUEUE_NAME = "receive2_queue_routing";
    private static final String EXCHANGE_NAME = "test_exchange_direct_routing";


    public static void main(String[] args) throws Exception {
        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicQos(1);
        // 设置了三种key
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); //注意这里,我们设置了三个路由key
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [ConsumerTwo is] Received '" + message + "'");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  //这里是手动应答
            }
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
        });
    }
}
主题 (Topic) 模式

主题模式是在路由模式的基础上,将路由键和某模式进行匹配。其中#表示匹配多个词,*表示匹配一个词,消费者可以通过某种模式的 BindKey 来达到订阅某个主题消息的目的,如示意图如下所示:

  • 从上面的示意图我们可以看出来:
    • 主题模式 Exchange 的 type 取值为 topic。
    • 一条消息可以被多个消费者获取。

生产者

public class Send {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        //获取连接
        Connection connection = RabbitConnection.getConnection();
        //创建频道
        try (Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            
            Map bindingKeyMap = new HashMap<>();
            bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
            bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
            bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
            bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
            bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
            bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
            bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
            bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");

           for (Map.Entry bindingKeyEntry: bindingKeyMap.entrySet()){
               String bindingKey = bindingKeyEntry.getKey();
               String message = bindingKeyEntry.getValue();
               channel.basicPublish(EXCHANGE_NAME,bindingKey, null, message.getBytes("UTF-8"));
               System.out.println("生产者发出消息" + message);
           }
        }
    }
}

消费者

public class ReceiveOne {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //声明 Q1 队列与绑定关系
        String queueName="Q1";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
           System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+
                    delivery.getEnvelope().getRoutingKey()+",消息:"+message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    }
}
public class ReceiveTwo {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        Connection connection = RabbitConnection.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //声明 Q2 队列与绑定关系
        String queueName="Q2";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
        System.out.println("等待接收消息........... ");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+
            		delivery.getEnvelope().getRoutingKey()+",消息:"+message);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
    }
}

你知道的越多,你不知道的越多。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5618720.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-15
下一篇 2022-12-15

发表评论

登录后才能评论

评论列表(0条)

保存