RabbitMQ-Java-01-简单队列

RabbitMQ-Java-01-简单队列,第1张

RabbitMQ-Java-01-简单队列 说明
  • RabbitMQ-Java-01-简单队列
  • 本案例是一个Maven项目
  • 官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/
*** 作步骤 》安装RabbitMQ
  • 可自行安装,也可参考我的相关教程(CentOS7离线安装RabbitMq),本章假设你已经安装好了RabbitMQ。
》搭建环境
  • idea创建一个空项目
  • 创建一个Maven管理的module
  • pom.xml添加插件:指定JDK编译版本(为了支持lambda表达式,如果不手动添加后期idea报错根据提示会自动添加好)
    
        org.apache.maven.plugins
        maven-compiler-plugin
        
            8
            8
        
    
    
  • pom.xml添加依赖:RabbitMQ相关
    
        com.rabbitmq
        amqp-client
        5.13.1
    
    
    
    
        commons-io
        commons-io
        2.11.0
    
    
    
》简单案例
  • 说明
    • 我将代码分成三部分:初始化、消费者、生产者。多一层拆分思路更清晰明朗便于理解。
  • 代码组成
    • 初始化类:Initialization
      package cn.cnyasin.rabbit.hello;
      
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      
      
      public class Initialization {
          // 交换机名
          public static final String EXCHANGE_NAME = "exchange01";
      
          // 队列名
          public static final String QUEUE_NAME = "queue01";
      
          // 路由key
          public static final String ROUTING_KEY = "routing01";
      
          public static void main(String[] args) throws Exception {
              // 创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
      
              // 配置
              factory.setHost("192.168.3.202");
              factory.setPort(5672);
              factory.setUsername("admin");
              factory.setPassword("123456");
              factory.setVirtualHost("/");
      
              // 获取连接
              Connection connection = factory.newConnection();
      
              // 获取信道
              Channel channel = connection.createChannel();
      
              // 声明交换机
              channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
      
              // 声明队列
              channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      
              // 绑定队列、交换机、路由key
              channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
      
              connection.close();
      
              System.out.println("初始化成功。。。");
          }
      }
      
      
    • 消费者类:Consumer
      package cn.cnyasin.rabbit.hello;
      
      import com.rabbitmq.client.*;
      
      
      public class Consumer {
      
          // 队列名
          public static final String QUEUE_NAME = "queue01";
      
          public static void main(String[] args) throws Exception {
              // 创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
      
              factory.setHost("192.168.3.202");
              factory.setPort(5672);
              factory.setUsername("admin");
              factory.setPassword("123456");
              factory.setVirtualHost("/");
      
              // 创建连接
              Connection connection = factory.newConnection();
      
              // 获取信道
              Channel channel = connection.createChannel();
      
              // 消费队列中的消息
              channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
          }
      
          // 接收消息回调方法
          public static DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
              System.out.println(" [*] 成功处理消息:" + new String(message.getBody()));
          };
      
          // 拒绝消息回调方法
          public static CancelCallback cancelCallback = (String consumerTag) -> {
              System.out.println("消费消息失败");
          };
      }
      
    • 生产者类:Producer
      package cn.cnyasin.rabbit.hello;
      
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      import com.rabbitmq.client.ConnectionFactory;
      
      
      public class Producer {
      
          // 交换机名
          public static final String EXCHANGE_NAME = "exchange01";
      
          // 路由key
          public static final String ROUTING_KEY = "routing01";
      
          public static void main(String[] args) throws Exception {
              // 创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
      
              // 配置信息
              factory.setHost("192.168.3.202");
              factory.setPort(5672);
              factory.setUsername("admin");
              factory.setPassword("123456");
              factory.setVirtualHost("/");
      
              // 创建连接
              Connection connection = factory.newConnection();
      
              // 获取信道
              Channel channel = connection.createChannel();
      
              // 发送消息
              channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "hello.".getBytes());
      
              connection.close();
      
              System.out.println("消息发送成功。。。");
          }
      }
      
  • 运行初始化:Initialization -> main -> run
  • 运行消费者:Consumer -> main -> run
  • 运行生产者:Producer -> main -> run
备注
  • 该教程部分内容收集自网络,感谢原作者。
附录

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5700095.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存