RabbitMQ初步学习(Mac)

RabbitMQ初步学习(Mac),第1张

RabbitMQ初步学习(Mac) 1.RabbitMQ学习:

1.简介
2.安装
3.使用

  • 3.1.创建简单列队
  • 3.2.创建工作列队
  • 3.3.创建订阅列队
  • 3.4.创建路由列队
  • 3.5.创建主题列队
  • 3.6.事务
  • 3.7.确认模式
    • 3.7.1.同步确认
    • 3.7.2.异步确认
  • 使用springBoot 简单的实现AMQP



2.MQ简介:

​ 在计算机科学中,消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的序列用来处理一系列的输入,通常是来自用户的。消息队列提供了异步的通信协议,每一个序列中的记录包含了详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息保存在队列中,直到接收者取回它。

2.1、实现:

消息队列常常保存在链表结构中。拥有权限的进程才可以向消息队列中写入或读取消息

目前,有很多消息队列有很多的实现,包括 JBoss Messing、JORAM、Apache、ActiveMQ、SunPoen Message Queue、IBM MQ、Apache Qpid和HTTPSQS

当前使用较多的消息队列有:RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、metaMq等而部分数据库如:Redis,Mysql,以及phxsql也可以实现消息队列的功能。

2.2、特点:

MQ是消费者-生产者模型中的一个典型代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。

注意:

1.AMDP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与中间件可传递消息,并不受客户端/中间件影响,不同的开发语言等条件的限制。

2.AMS,即java消息服务(java Message Service)应用程序接口,是一个java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供服务。常见的消息列队,大部分都实现了JMI API 如:ActiveMQ,Redis以及RabbitMQ 等

2.3、优缺点: 优点:

应用耦合、异步处理、流量削峰

  • 解耦:

    传统模式

传统模式缺点:

系统间耦合性太强,如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将D系统接入,系统A还需要修改代码,太麻烦!

中间件模式:

中间件模式优点:

将消息写入列队,需要消息的系统自己从消息列队中订阅,从而系统A不需要做如何修改。

  • 异步

    传统模式:

    传统模式缺点:

    ​ 一些非必要的业务逻辑以同步的方式运行,太耗费时间。

    中间件模式:

中间件模式优点:

​ 使用消息队列发送消息,减少耗时。

  • 削峰

    传统模式:

传统模式缺点:

并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常。

中间件模式:

中间件模式的优点:

系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的最高峰期积压是允许的。

缺点:

系统可用性低、系统复杂性增加

2.4、使用场景:

​ 消息列队,是分布式系统中重要的组件,其通用的使用场景可以简单的描述为:当不需要立即获取结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息列队的时候。

​ 在项目中,将一些无需及时返回且耗时的 *** 作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器请求的响应时间,从而提高了系统的吞吐量。

2.5、为什么使用RabbitMQ:

​ AMQP,即Advanced Meassage Queueing Protocol,高级消息列队协议,是应用层的一个开发标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息的使用者的存在,反之亦然。

​ AMQP的主要特征是面向消息、列队、路由(包括点对点和发布/订阅)、可靠性、安全。

​ RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、Actionscript、XMPP、STOMP等,支持AJAX。用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

​ 总结如下

  1. 基于AMQP协议
  2. 高并发(服务器可以接受最大任务数量)
  3. 高性能(单位时间内服务器可以处理的任务数)
  4. 高可用(单位时间内的服务器可以正常工作的时间比例)
  5. 强大的社区支持,以及很多公司都在使用
  6. 支持插件
  7. 支持多语言
3.安装: 3.1、安装erlang:(Mac系统)

brew install erlang

erlang对应Rabbit版本:


测试erlang是否安装成功:

3.2、安装rabbitMQ:(Mac系统)

官网下载地址:https://www.rabbitmq.com/download.html

开启RabbitMQ图形化管理界面插件:rabbitmq-plugins enable rabbitmq_management、关闭RabbitMQ图形化管理界面插件:rabbitmq-plugins disable rabbitmq_management

  • 使用rabbitmq-plugins list指令查看 rabbitmq 的插件启动情况:

开启RabbitMQ服务rabbitmq-service、关闭RabbitMq服务rabbitmqctl stop

在浏览器访问localhost:15672进入rabbitmq图形界面管理登陆系统:

默认用户名:guest ,默认密码: guest

登陆之后进入rabbitmq图形界面管理系统:


4.使用RabbitMQ: 4.1、添加一个名称为/web的虚拟主机:


创建成功:

每次创建虚拟主机guest用户会默认加入虚拟主机

4.2、添加一个名称为web的用户:

添加成功:

4.3、将web用户添加到虚拟主机:

添加成功:

4.4、使用java代码实现AMQP:

导入依赖

		
            com.rabbitmq
            amqp-client
            5.4.3
        
4.4.1、创建简单列队:

简单列队:生产者将消息发送到“hello”队列。消费者从该队列接收消息。

4.4.1.1:创建简单列队生产者:
public class Send {
    //定义队列名称
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
      	//端口号
        connectionFactory.setPort(5672);
      	//用户名
        connectionFactory.setUsername("web");
      	//用户密码
        connectionFactory.setPassword("web");
      	//虚拟主机名
        connectionFactory.setVirtualHost("/web");
        //创建连接
        try (Connection connection = connectionFactory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()){
            
             channel.queueDeclare(QUEUE_NAME,false,false,false,null);
             //准备消息
             String message = "Hello world";
             //发送消息
             channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
             System.out.println(message);
        }
    }
}

启动生产者服务:

消息堵塞:

4.4.1.2:创建简单列队消费者:
public class Recv {
    //定义队列名称
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
      	//端口号
        connectionFactory.setPort(5672);
      	//用户名
        connectionFactory.setUsername("web");
      	//用户密码
        connectionFactory.setPassword("web");
      	//虚拟主机名
        connectionFactory.setVirtualHost("/web");
        //创建信道
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //绑定队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //打印消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
           String message = new String(delivery.getBody(),"UTF-8");
            System.out.println(message);
        };
        
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag->{});
    }
}

启动消费者:

消费者消费消息:

4.4.2:创建工作列队

工作列队:(一个生产者对应多个消费者,但是只能有一个消费者获得消息!!!)

4.4.2.1:创建工作列队-生产者:
public class Send {
    //定义队列名称
    private static final String QUEUE_NAME = "work_fair";
    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("web");
        connectionFactory.setPassword("web");
        connectionFactory.setVirtualHost("/web");
        //创建连接
        try (Connection connection = connectionFactory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()){
         
             channel.queueDeclare(QUEUE_NAME,false,false,false,null);
             //准备消息
             String message = "Hello world";
             //发送消息
            for (int i = 0; i < 20; i++) {
                channel.basicPublish("",QUEUE_NAME,null,(message+i).getBytes(StandardCharsets.UTF_8));
                System.out.println(message+i);
            }
        }
    }
}

启动生产者:

消息堵塞:

4.4.2.2:创建工作列队-消费者01
public class Recv01 {
    //定义队列名称
    private static final String QUEUE_NAME = "work_fair";
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("web");
        connectionFactory.setPassword("web");
        connectionFactory.setVirtualHost("/web");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //绑定队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //限制消费1条消息,消费完在继续消费下一天消息(限流)
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);
        //打印消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
           String message = new String(delivery.getBody(),"UTF-8");
            System.out.println(message);
            //消费者01接收一条消息后休眠10毫秒
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        
        channel.basicConsume(QUEUE_NAME,false,deliverCallback,consumerTag->{});
    }
}

启动消费者:每个消费者每隔10 毫秒取一条消息

消费者01:成功取得列队消息->

消费者02:成功取得列队消息->

消费者02代码和消费者01一样

4.4.3:发布订阅列队

一个生产者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。

4.4.3.1:创建发布列队-生产者:
public class Send {
    //定义发布队列名称
    private static final String EXCHANGE_NAME = "exchange_fanout";

    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("web");
        connectionFactory.setPassword("web");
        connectionFactory.setVirtualHost("/web");
        //创建连接
        try (Connection connection = connectionFactory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()){
            
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
             //准备消息
             String message = "Hello world";
             //发送消息
             channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
             System.out.println(message);
        }
    }
}

启动成功:成功将消息绑定到交换机上

4.4.3.2:创建发布列队-消费者:
public class Recv01 {
    //定义订阅队列名称
    private static final String EXCHANGE_NAME = "exchange_fanout";
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("web");
        connectionFactory.setPassword("web");
        connectionFactory.setVirtualHost("/web");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        //声明队列,排他队列
        String queue = channel.queueDeclare().getQueue();
        //队列和交换机绑定
        channel.queueBind(queue,EXCHANGE_NAME,"");
        //打印消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
           String message = new String(delivery.getBody(),"UTF-8");
            System.out.println(message);
        };
        
        channel.basicConsume(queue,true,deliverCallback,consumerTag->{});
    }
}

启动订阅消费者:所有订阅消费者都可以获得消息

4.4.4:路由列队:

生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。

也就是让消费者有选择性的接收消息。

4.4.4.1:创建路由列队-生产者:
public class Send {
    //定义队列名称
    private static final String EXCHANGE_NAME = "exchange_direct";
    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("web");
        connectionFactory.setPassword("web");
        connectionFactory.setVirtualHost("/web");
        //创建连接
        try (Connection connection = connectionFactory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()){
            
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
             //准备消息
             String message = "Hello world";
             
             channel.basicPublish(EXCHANGE_NAME,"orange",null,message.getBytes(StandardCharsets.UTF_8));
          	 channel.basicPublish(EXCHANGE_NAME,"green",null,message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

启动路由生产者:

会将消息发送到名为EXCHANGE_NAME的交换机中,分别将消息key设置为orange和green

4.4.4.2:创建路由列队-消费者:
public class Secv01 {
    //定义队列名称
    private static final String EXCHANGE_NAME = "exchange_direct";
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("web");
        connectionFactory.setPassword("web");
        connectionFactory.setVirtualHost("/web");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //声明队列,排他队列
        String queue = channel.queueDeclare().getQueue();
        //队列和交换机绑定
        channel.queueBind(queue,EXCHANGE_NAME,"black");
        channel.queueBind(queue,EXCHANGE_NAME,"green");
        //打印消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
           String message = new String(delivery.getBody(),"UTF-8");
            System.out.println(message);
        };
        
        channel.basicConsume(queue,true,deliverCallback,consumerTag->{});
    }
}

启动消费者:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HXBlxAVB-1634903973672)(/Users/haoruijie/Library/Application Support/typora-user-images/image-20211022135436669.png)]

消费者01会取到交换机名为EXCHANGE_NAME,key值为green和orange 而消费者02只会收到key值为orange的消息


4.4.5:主题路由列队:(使用最多的模式,通过模糊匹配,使得 *** 作更加自如)

通过通配符模式来判断路由key通俗的来讲就是模糊匹配。

*.匹配一个字符 #.匹配所有字符

4.4.5.1:创建主题路由列队-生产者:
public class Send {
    //定义队列名称
    private static final String EXCHANGE_NAME = "exchange_topic";

    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("web");
        connectionFactory.setPassword("web");
        connectionFactory.setVirtualHost("/web");
        //创建连接
        try (Connection connection = connectionFactory.newConnection();
             //创建信道
             Channel channel = connection.createChannel()){
            
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
             //准备消息
            String message1 = "Hello world1";
            String message2 = "Hello world2";
            String message3 = "Hello world3";
          	//设置交换机key值
            String routingKey1 = "quick.orange.rabbit";
            String routingKey2 = "lazy.pink.rabbit";
            String routingKey3 = "quick.hello.male";
             //发送消息
            channel.basicPublish(EXCHANGE_NAME,routingKey1,null,message1.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME,routingKey2,null,message2.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME,routingKey3,null,message3.getBytes(StandardCharsets.UTF_8));
        }
    }
}

启动路由生产者:

会将消息发送到名为EXCHANGE_NAME的交换机中,分别将消息key设置为routingKey1、routingKey2和routingKey3

4.4.5.2:创建主题路由列队-消费者:
public class Recv01 {
    //定义队列名称
    private static final String EXCHANGE_NAME = "exchange_topic";
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("web");
        connectionFactory.setPassword("web");
        connectionFactory.setVirtualHost("/web");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //绑定交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //声明队列,排他队列
        String queue = channel.queueDeclare().getQueue();
        //队列和交换机绑定绑定key
        channel.queueBind(queue,EXCHANGE_NAME,"*.orange.*");
      	channel.queueBind(queue,EXCHANGE_NAME,"lazy.#");
        //打印消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
           String message = new String(delivery.getBody(),"UTF-8");
            System.out.println(message);
        };
        
        channel.basicConsume(queue,true,deliverCallback,consumerTag->{});
    }
}

启动消费者

运行结果:

消费者01只会匹配routingKey1和routingKey2的消息

4.5、事务:

使用事务会大幅度降低性能 (一般不会使用) 开启事务会知道生产者是否将消息成功提交到列队里

4.5.1创建事务列队: 4.5.1.1:创建事务列队-生产者:
public class Send {
    //定义队列名称
    private static final String QUEUE_NAME = "tx";
    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("web");
        connectionFactory.setPassword("web");
        connectionFactory.setVirtualHost("/web");
        //创建连接
        Connection connection = null;
        //创建信道
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
             //开启事务
             channel.txSelect();
             channel.queueDeclare(QUEUE_NAME,false,false,false,null);
             //准备消息
             String message = "Hello world";
             //发送消息
             channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
          	 //制造异常(遇到异常事务回滚)
          	 int a = 1/0;
             //提交事务
             channel.txCommit();
        }catch (Exception e){
            //事务回滚
            channel.txRollback();
            e.printStackTrace();
        }finally {
          	//关闭连接
            if (channel!=null){
                channel.close();
            }
            if (connection!=null){
                connection.close();
            }
        }
    }
}
4.5.1.2:创建事务列队-消费者:
public class Recv {
    //定义队列名称
    private static final String QUEUE_NAME = "tx";
    public static void main(String[] args) throws Exception{
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("yeb");
        connectionFactory.setPassword("yeb");
        connectionFactory.setVirtualHost("/yeb");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //绑定队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //打印消息
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
           String message = new String(delivery.getBody(),"UTF-8");
            System.out.println(message);
        };
        
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag->{});
    }
}

启动消费者:


启动生产者: 发现异常事务回调 消费者没有消息消费

4.6、确认模式:

(确认生产者是否把消息发送到了服务器)

4.6.1:同步确认:

(同步确认会影响性能一般不会使用)

4.6.1.1:创建同步-确认-生产者:
public class Send {
    //定义队列名称
    private static final String QUEUE_NAME = "sync";
    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("web");
        connectionFactory.setPassword("web");
        connectionFactory.setVirtualHost("/web");
        //创建连接
        Connection connection = null;
        //创建信道
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //启动确认模式
             channel./confirm/iSelect();
             channel.queueDeclare(QUEUE_NAME,false,false,false,null);
             //准备消息
             String message = "Hello world";
             //发送消息
             channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
             //普通确认,只能单条确认
             if(channel.waitFor/confirm/is()){
                 System.out.println("确认成功!");
             }
            //普通批量确认 ,如果有一条不成功就会抛异常,全部成功不会抛异常
						//channel.waitForConfirmsOrDie();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if (channel!=null){
                channel.close();
            }
            if (connection!=null){
                connection.close();
            }
        }
    }
}

启动生产者:

发送消息成功:

4.6.2:异步确认:

(异步确认效率是最高的)

4.6.2.1:创建异步-确认-生产者:
public class Send {
    //定义队列名称
    private static final String QUEUE_NAME = "async";
    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //连接工厂配置
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("yeb");
        connectionFactory.setPassword("yeb");
        connectionFactory.setVirtualHost("/yeb");
        //创建连接
        Connection connection = null;
        //创建信道
        Channel channel = null;
        try {
            final SortedSet set = Collections.synchronizedSortedSet(new TreeSet());
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            //启动确认模式
             channel./confirm/iSelect();
             channel.queueDeclare(QUEUE_NAME,false,false,false,null);
             //天际channel监听
             channel.add/confirm/iListener(new /confirm/iListener() {
                 //已确认
                 @Override
                 public void handleAck(long l, boolean b) throws IOException {
                     //b为true确认多条成功 为false确认单条成功
                     if (b){
                         System.out.println("确认多条成功");
                         set.headSet(l+1L).clear();
                     }else {
                         System.out.println("确认单条成功"+l);
                         set.remove(l);
                     }
                 }
                //未确认
                 @Override
                 public void handleNack(long l, boolean b) throws IOException {
                     //b为true多条未确认 为false单条未确认
                     if (b){
                         System.out.println("多条未确认");
                         set.headSet(l+1L).clear();
                     }else {
                         System.out.println("单体未确认"+l);
                         set.remove(l);
                     }
                 }
             });
             int i =0;
             while (i<20){
                 i++;
                 //准备消息
                 String message = "Hello world"+i;
                 Long seqNo = channel.getNextPublishSeqNo();
                 //发送消息
                 channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
                 set.add(seqNo);
                 System.out.println("[x] Sent'"+message+"'");
             }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            if (channel!=null){
                channel.close();
            }
            if (connection!=null){
                connection.close();
            }
        }
    }
}

启动生产者:

确认发送消息成功:

消息发送成功:

4.7、使用springBoot 简单的实现AMQP: 4.7.1:创建springboot多模块项目:

4.7.1.1:创建消费者模块、并继承父模块: 4.7.1.2:编写消费者模块配置文件:
#配置端口号
server:
  port: 8002
spring:
  #Rabbitmq生产出配置
  rabbitmq:
    #ip
    host: 127.0.0.1
    #用户名
    username: guest
    #密码
    password: guest
    #端口
    port: 5672
4.7.1.3:编写消费者测试类:
@Component
public class Test {
    @RabbitListener(queues = "hello")
    public void demo(String hello){
        System.out.println(hello);
    }
}
4.7.1.4:编写queue配置文件:
@Configuration
public class RabbitmqConfig {
    @Bean
    public Queue queue(){
        return new Queue("hello");
    }
}
4.7.1.4:启动消费者服务:

启动成功hello列队已存在

4.7.2.1:创建生产者模块、继承父模块: 4.7.2.2:编写生产者配置文件:
#配置端口号
server:
  port: 8001

spring:
  #Rabbitmq生产出配置
  rabbitmq:
    #ip
    host: 127.0.0.1
    #用户名
    username: web
    #密码
    password: web
    #端口
    port: 5672

4.7.2.3:编写生产者测试类:
@Component
public class Test {
    @Autowired
    private RabbitTemplate template;
    public void demo(){
        template.convertAndSend("hello","hello world");
    }
}
4.7.2.4:发送消息:

发送被消费者立即消费

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/4683637.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-07
下一篇 2022-11-07

发表评论

登录后才能评论

评论列表(0条)

保存