- RabbitMQ-Java-01-简单队列
- 本案例是一个Maven项目
- 官方文档已包含绝大多数本案例内容。请移步:https://docs.spring.io/spring-amqp/docs/current/reference/html/
- 可自行安装,也可参考我的相关教程(CentOS7离线安装RabbitMq),本章假设你已经安装好了RabbitMQ。
- idea创建一个空项目
- 创建一个Maven管理的module
- pom.xml添加插件:指定JDK编译版本(为了支持lambda表达式,如果不手动添加后期idea报错根据提示会自动添加好)
org.apache.maven.plugins maven-compiler-plugin8 - pom.xml添加依赖:RabbitMQ相关
com.rabbitmq amqp-client5.13.1 commons-io commons-io2.11.0
- 说明
- 我将代码分成三部分:初始化、消费者、生产者。多一层拆分思路更清晰明朗便于理解。
- 代码组成
- 初始化类:Initialization
package cn.cnyasin.rabbit.hello; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Initialization { // 交换机名 public static final String EXCHANGE_NAME = "exchange01"; // 队列名 public static final String QUEUE_NAME = "queue01"; // 路由key public static final String ROUTING_KEY = "routing01"; public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 配置 factory.setHost("192.168.3.202"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); // 获取连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); // 声明交换机 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); // 声明队列 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 绑定队列、交换机、路由key channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY); connection.close(); System.out.println("初始化成功。。。"); } }
- 消费者类:Consumer
package cn.cnyasin.rabbit.hello; import com.rabbitmq.client.*; public class Consumer { // 队列名 public static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.3.202"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); // 创建连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); // 消费队列中的消息 channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } // 接收消息回调方法 public static DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> { System.out.println(" [*] 成功处理消息:" + new String(message.getBody())); }; // 拒绝消息回调方法 public static CancelCallback cancelCallback = (String consumerTag) -> { System.out.println("消费消息失败"); }; }
- 生产者类:Producer
package cn.cnyasin.rabbit.hello; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { // 交换机名 public static final String EXCHANGE_NAME = "exchange01"; // 路由key public static final String ROUTING_KEY = "routing01"; public static void main(String[] args) throws Exception { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 配置信息 factory.setHost("192.168.3.202"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); factory.setVirtualHost("/"); // 创建连接 Connection connection = factory.newConnection(); // 获取信道 Channel channel = connection.createChannel(); // 发送消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "hello.".getBytes()); connection.close(); System.out.println("消息发送成功。。。"); } }
- 初始化类:Initialization
- 运行初始化:Initialization -> main -> run
- 运行消费者:Consumer -> main -> run
- 运行生产者:Producer -> main -> run
- 该教程部分内容收集自网络,感谢原作者。
- 无
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)