mq学习笔记

mq学习笔记,第1张

mq学习笔记 背景

在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

常见MQ的类型 rabbitmq对java支持良好,对其他语言也支持良好,跨平台,语言无关activeMQ对java支持良好,缺点是对其他语言支持不够友好,适合中小企业系统kaffka日志消息中间件 支持大数据场景

4、不应使用MQ的场景:调用方实时依赖执行结果(上游需要关注执行结果)的业务场景,也就是上游实时关注执行结果

功能特性

异步、解耦、流量削峰

  1. 是一个独立运行的服务。生产者发送消息,消费者接受消息,需要先跟服务器建立连接;
  2. 采用消息队列作为数据结构,有先进先出的特点;
  3. 具有发布订阅(publish/subscribe)的模型,消费者可以获取自己需要的消息。
带来的问题
  1. 运维成本高
  2. 系统可用性降低
  3. 系统复杂性提高
MQ使用场景
  1. 数据驱动的任务依赖
  2. 上游不关心多下游执行结果
  3. 异步返回执行时间长
AMQP核心概念

AMQP(Advanced Message Queuing Protocol)

Server

又称Broker,接收客户端连接,实现AMQP实体服务

Connection:

连接,应用程序与Broker的网络连接

Channel

网络信道,几乎所有的 *** 作都是在这上面进行,是消息读写的通道,客户端可以建立多个,每个代表一个会话。

Message

消息,服务器与应用程序之间传送数据的载体,由Properties和Body组成。Properties可以对消息进行修饰,比如优先级、延迟等高级特性,Body是消息体内容。

Vhost

虚拟主机,用于逻辑隔离,最上层的消息路由。可以有多个,一般由项目模块划分

Exchange

交换机,接收消息,根据路由键转发消息到绑定的队列

Binding

Exchange与Queue之间的虚拟连接,binding中可以包含routing key

RoutingKey

一个路由规则,虚拟机可以用它来确定如何路由一个特定的消息

Queue

Message Queue,消息队列,保存消息并将它们转发给消费者

JMS认识
  1. MQ实现参照了jms规范,(规范就是一种约定)该规范中包括

  2. 提供者:实现jms规范的中间件服务器

  3. 客户端:发送或者接受消息的应用程序

  4. 生产者/发布者:创建并发送消息的客户端

  5. 消费者/订阅者:接受并处理消息的客户端

  6. 消息:应用程序之间传递的内容

  7. 消息模式:在客户端之间传递消息的方式,jms中定义了主题和队列两种模式

    主题模式:假如发布者发布了100条消息,那么如果有n个订阅者,每个订阅者都可以获取到100条消息。

    队列模式:假如生产者发送了100条消息,如果有n个消费者,那么每个消费者加起来获取到的消息总数是100,一个消息只能被一个消费者消费。

  8. 优点

    解耦:上下游逻辑+物理解耦,除了与MQ有物理连接,模块之间都不相互依赖;将消息写入消息队列,新增一个下游消息关注方,上游不需要修改任何代码

    异步:上游执行时间短,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

    削峰:系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。

  9. 缺点

    系统可用性降低:系统更复杂,多了一个MQ组件

    消息传递路径更长,延时会增加

    消息可靠性和重复性互为矛盾,消息不丢不重难以同时保证

    上游无法知道下游的执行结果,这一点是很致命的

  10. 什么叫消息队列
    消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。 消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

RabbitMQ
  1. Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑
  2. ConnectionFactory为Connection的制造工厂
  3. Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务 *** 作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
  4. Message acknowledgment:消息确认,消费者再消费完一条消息后会发送一个回执确认已经消费的信息到MQ,MQ则将该消息从队列中移除
  5. Message durability:消息持久化,到MQ服务宕机时保证未被消费的消息不丢失。
  6. Prefetch count:预取计数,消息一条一条推送,消费完一条再推送一条。
  7. Exchange:交换器,生产者将消息发送到Exchange(交换器,下图中的X),由Exchange将消息路由到一个或多个Queue中(或者丢弃)
  8. routing key:路由选择键,在发送消息给Exchange时,通过指定routing key来决定消息流向哪里,与Exchange Type及binding key联合使用才能最终生效
  9. Binding:RabbitMQ中通过Binding将Exchange与Queue关联起来,这样RabbitMQ就知道如何正确地将消息路由到指定的Queue了
交换机类型

http://tryrabbitmq.com/ 模拟画图工具

Direct

直连交换机,使用明确的绑定键,适用于业务明确的场景。一个队列与直连类型交换机绑定,需要指定一个明确的绑定键(binding key),生产者发送消息时会携带一个路由键(routing key)。当消息的路由键与某个队列的绑定键完全匹配时,消息才会从交换机路由到这个队列上,多个队列也可以使用相同的绑定键。消费者可以从队列中获取消息进行消费。

示例代码(原生) pom

引入依赖

        
            com.rabbitmq
            amqp-client
            5.6.0
        
消费者
import com.rabbitmq.client.*;
import java.io.IOException;


public class MyConsumer {
    // 交换机名称
    public final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";
    // 队列名称
    public final static String QUEUE_NAME    = "SIMPLE_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        // 连接IP
        factory.setHost("8.134.132.250");
        // 默认监听端口
        factory.setPort(5672);
        // 虚拟机
        factory.setVirtualHost("/");
        // 设置访问的用户
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        // 声明交换机
        // String exchange, String type, boolean durable, boolean autoDelete, Map arguments
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, false, null);

        // 声明队列
        // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" Waiting for message....");

        // 绑定队列和交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "tiger.best");

        // 创建消费者(依据信道channel来创建 )
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Received message : '" + msg + "'");
                System.out.println("consumerTag : " + consumerTag);
                System.out.println("deliveryTag : " + envelope.getDeliveryTag());
            }
        };

        // 开始从队列中获取消息
        // String queue, boolean autoAck, Consumer callback
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;


public class MyProducer {
    // 交换机名称
    private final static String EXCHANGE_NAME = "SIMPLE_EXCHANGE";

    public static void main(String[] args) throws Exception {
        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 连接IP
        factory.setHost("8.134.132.250");
        // 连接端口
        factory.setPort(5672);
        // 虚拟机
        factory.setVirtualHost("/");
        // 用户
        factory.setUsername("guest");
        factory.setPassword("guest");

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        // 发送消息
        String msg = "Hello world, Rabbit MQ!!!";
        // String exchange, String routingKey, BasicProperties props, byte[] body
        channel.basicPublish(EXCHANGE_NAME, "tiger.best", null, msg.getBytes());

        // 关闭,释放资源
        channel.close();
        conn.close();
    }
}
代码示例(死信队列)

实订单延时关闭,消息过期属性(x-message-ttl),通过设置这个属性,超过指定时间后,消息会被丢弃;通过

消费者
package com.tiger.dlx;

import com.rabbitmq.client.*;
import com.tiger.util.ResourceUtil;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;


public class DlxConsumer {

    // 死信交换机
    public final static String DEAD_LETTER_EXCHANGE = "DEAD_LETTER_EXCHANGE";
    // 死信队列
    public final static String DEAD_LETTER_QUEUE    = "DEAD_LETTER_QUEUE";
    // 正常队列
    public final static String ORI_USE_QUEUE        = "ORI_USE_QUEUE";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        // 指定队列的死信交换机
        Map arguments = new HashMap();
        arguments.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 声明队列(默认交换机AMQP default,Direct),并指定死信交换机
        channel.queueDeclare(ORI_USE_QUEUE, false, false, false, arguments);

        // 声明死信交换机
        channel.exchangeDeclare(DEAD_LETTER_EXCHANGE, "topic", false, false, false, null);
        // 声明死信队列
        channel.queueDeclare(DEAD_LETTER_QUEUE, false, false, false, null);
        // 绑定死信交换机和死信队列,设置为 #,无条件接收
        channel.queueBind(DEAD_LETTER_QUEUE, DEAD_LETTER_EXCHANGE, "#");

        System.out.println(" Waiting for message....");

        // 创建消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("Received message : '" + msg + "'");
                System.out.println("consumerTag : " + consumerTag);
                System.out.println("deliveryTag : " + envelope.getDeliveryTag());
            }
        };

        // 监听正常队列
//        channel.basicConsume(ORI_USE_QUEUE, true, consumer);
        // 开始获取消息,监听死信队列
        channel.basicConsume(DEAD_LETTER_QUEUE, true, consumer);
    }
}
生产者
package com.tiger.dlx;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.tiger.util.ResourceUtil;


public class DlxProducer {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();

        String msg = "Hello world, Rabbit MQ, DLX MSG";

        // 设置属性,消息10秒钟过期
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                // 持久化消息
                .deliveryMode(2)
                .contentEncoding("UTF-8")
                // TTL 10秒过期
                .expiration("10000")
                .build();

        // 发送消息,默认交换机
        channel.basicPublish("", DlxConsumer.ORI_USE_QUEUE, properties, msg.getBytes());

        channel.close();
        conn.close();
    }
}
消费端限流

channel.basicQos(prefetchCount);

import com.rabbitmq.client.*;
import com.tiger.util.ResourceUtil;
import java.io.IOException;
import java.util.concurrent.TimeUnit;


public class LimitConsumerSlow {
    private final static String QUEUE_NAME = "TEST_LIMIT_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));

        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        final Channel channel = conn.createChannel();

        // 声明队列(默认交换机AMQP default,Direct)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("LimitConsumerSlow Waiting for message....");

        // 创建消费者,并接收消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                try {
                    // 睡眠5秒钟
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("LimitConsumerSlow Received message : '" + msg + "'");
                channel.basicAck(envelope.getDeliveryTag(), true);
            }
        };

        // 非自动确认消息的前提下,
        // 如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,
        // 不进行消费新的消息。因为LimitConsumerSlow的处理速率很慢,
        // 收到两条消息后都没有发送ACK,队列不会再发送消息给Consumer2
        channel.basicQos(2);
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
Topic

主题交换机,一个队列与主题类型交换机绑定时,可以在绑定键中使用通配符。支持两个通配符:

#代表0个或多个单词

*代表不多不少一个单词

单词指的是用英文的点“.”隔开的字符。例如 hello.tiger.yes 是3个单词

适用于一些根据业务主题或消息登记过滤的场景,比如或一条消息可能和资金有关,又和风控有关,那么就可以设置一个多级路由键。第一个单词和资金有关,第二个单词和风控有关。

Fanout

广播交换机,队列与交换机不需要绑定键,因此生产者发送消息到交换机也不用携带路由键,当消息到达交换机时,所有与之绑定的队列都会收到相同的消息副本。

消息可靠投递 事务模式
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.tiger.util.ResourceUtil;


public class TransactionProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();
        String msg = "Hello world, Rabbit MQ";
        // 声明队列(默认交换机AMQP default,Direct)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        try {
            // 开启事务模式
            channel.txSelect();
            // 发送消息,发布了4条,但只确认了3条
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            // 提交事务
            channel.txCommit();

            channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());
            // 模拟异常,消息投递不成功
            int i = 1 / 0;
            channel.txCommit();
            System.out.println("消息发送成功");
        } catch (Exception e) {
            // 消息回滚
            channel.txRollback();
            System.out.println("消息已经回滚");
        }
        channel.close();
        conn.close();
    }
}
确认模式 Confirm 普通确认
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.tiger.util.ResourceUtil;


public class Normal/confirm/iProducer {

    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();
        String msg = "Hello world, Rabbit MQ ,Normal /confirm/i";
        // 声明队列(默认交换机AMQP default,Direct)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 开启发送方确认模式
        channel./confirm/iSelect();
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
        // 普通/confirm/i,发送一条,确认一条
        if (channel.waitFor/confirm/is()) {
            System.out.println("消息发送成功");
        } else {
            System.out.println("消息发送失败");
        }
        channel.close();
        conn.close();
    }
}
批量确认
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.tiger.util.ResourceUtil;


public class Batch/confirm/iProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();
        String msg = "Hello world, Rabbit MQ ,Batch /confirm/i";
        // 声明队列(默认交换机AMQP default,Direct)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        try {
            channel./confirm/iSelect();
            for (int i = 0; i < 5; i++) {
                // 发送消息
                channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes());
            }
            // 批量确认结果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被确认了
            // 比如5条消息可能只收到1个ACK,也可能收到2个(抓包才看得到)
            // 直到所有信息都发布,只要有一个未被Broker确认就会IOException
            channel.waitFor/confirm/isOrDie();
            System.out.println("消息发送完毕,批量确认成功");
        } catch (Exception e) {
            // 发生异常,可能需要对所有消息进行重发
            // TODO
            e.printStackTrace();
        }
        channel.close();
        conn.close();
    }
}
异步确认
import com.rabbitmq.client.Channel;
import com.rabbitmq.client./confirm/iListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.tiger.util.ResourceUtil;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;


public class Async/confirm/iProducer {
    private final static String QUEUE_NAME = "ORIGIN_QUEUE";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
        // 建立连接
        Connection conn = factory.newConnection();
        // 创建消息通道
        Channel channel = conn.createChannel();
        String msg = "Hello world, Rabbit MQ, Async /confirm/i";
        // 声明队列(默认交换机AMQP default,Direct)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 用来维护未确认消息的 deliveryTag
        final SortedSet confirmSet = Collections.synchronizedSortedSet(new TreeSet());

        // 异步监听确认和未确认的消息
        // 这里不会打印所有响应的ACK:ACK可能有多个,有可能一次确认多条,也有可能一次确认一条
        // 如果要重复运行,先停掉之前的生产者,清空队列
        channel.add/confirm/iListener(new /confirm/iListener() {
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                System.out.println("Broker未确认消息,标识:" + deliveryTag);
                if (multiple) {
                    // headSet表示后面参数之前的所有元素,全部删除
                    /confirm/iSet.headSet(deliveryTag + 1L).clear();
                } else {
                    /confirm/iSet.remove(deliveryTag);
                }
                // 这里添加重发的方法
            }

            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                // 如果true表示批量执行了deliveryTag这个值以前(小于deliveryTag的)的所有消息,如果为false的话表示单条确认
                System.out.println(String.format("Broker已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
                if (multiple) {
                    // headSet表示后面参数之前的所有元素,全部删除
                    /confirm/iSet.headSet(deliveryTag + 1L).clear();
                } else {
                    // 只移除一个元素
                    /confirm/iSet.remove(deliveryTag);
                }
                System.out.println("未确认的消息:" + /confirm/iSet);
            }
        });

        // 开启发送方确认模式
        channel./confirm/iSelect();
        for (int i = 0; i < 10; i++) {
            long nextSeqNo = channel.getNextPublishSeqNo();
            // 发送消息
            channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes());
            /confirm/iSet.add(nextSeqNo);
        }
        System.out.println("所有消息:" + /confirm/iSet);

        // 这里注释掉的原因是如果先关闭了,可能收不到后面的ACK
        //channel.close();
        //conn.close();
    }
}
代码实战案例
https://gitee.com/tiger2050/springboot-mq-demo

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5697068.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存