目录
核心组件
运行原理
实现步骤
构建Gradle项目
入门案例(简单模式)
生产者(代码)
消费者(代码)
绑定交换机和队列
发布/订阅模式(Publish/Subscribe)
生产者
消费者
路由模式(routing)
生产者
消费者
主题模式(topics)
生产者
消费者
工作模式(Work queue)
轮询模式
公平分配模式
核心组件
- Server:又称Broker接受客户端的连接,实现AMQP实体服务。安装rabbitmq-server Connection:连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手
- Channel:网络信道,几乎所有的 *** 作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各 Channel,每个Channel代表一个会话任务。
- Message:消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。
- VirtualHost虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和 Queueu,同一个虚拟王机里面不能有相同名字的Exchange
- Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)
- Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.
- Routingkey:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。
- Queue:队列:也成为MessageQueue,消息队列,保存消息并将它们转发给消费者。
运行原理 实现步骤 1、JDK11
2、构建一个gradle项目
3、导入RabbitMQ的相关依赖
4、启动rabbitmq-server服务
5、定义生产者
6、定义消费者
7、查看消息在rabbitmq-server服务中的过程
Maven 同样引入相应的依赖
implementation group: 'com.rabbitmq', name: 'amqp-client', version: '5.10.0'入门案例(简单模式) 生产者(代码)
package com.any.rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) { //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp // ip port // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); String queueName = "queue1"; // 4、通过通道交换机,队列,绑定关系,路由key,发送消息,和接收消息 channel.queueDeclare(queueName,false,false,true,null); // 5、准备消息 String message = "hello,anyboot"; // 6、发送消息给 queue channel.basicPublish("",queueName,null,message.getBytes()); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }消费者(代码)
package com.any.rabbitmq.simple; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static void main(String[] args) { //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp // ip port // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); String queueName = "queue1"; channel.basicConsume(queueName, true, new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到的消息:" + new String(message.getBody(), "UTF-8")); } }, new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败"); } }); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }绑定交换机和队列
该例子以生产者为例
package com.any.rabbitmq.all; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class Producer { public static void main(String[] args) { //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp // ip port // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取信道Channel channel = connection.createChannel(); // 定义交换机名称 String exchangeName = "direct-message-exchange"; // 定义交换机类型 String exchangeType = "direct"; // 定义 路由key String routingKey = "message"; channel.exchangeDeclare(exchangeName,exchangeType,true); channel.queueDeclare("queue5",true,false,false,null); channel.queueDeclare("queue6",true,false,false,null); channel.queueDeclare("queue7",true,false,false,null); channel.queueBind("queue5",exchangeName,routingKey); channel.queueBind("queue6",exchangeName,routingKey); channel.queueBind("queue7",exchangeName,"email"); // 5、准备消息 String message = "hello,message-exchange"; // 6、发送消息给 queue channel.basicPublish(exchangeName,routingKey,null,message.getBytes()); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }发布/订阅模式(Publish/Subscribe)
生产者
package com.any.rabbitmq.routing.fanout; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class Producer { // 发布订阅模式 public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); // 定义 队列名称 String queueName = "queue1"; // 定义交换机名称 String exchangeName = "fanout-exchange"; // 定义交换机类型 String exchangeType = "fanout"; // 定义 路由key String routingKey = ""; // 4、通过通道交换机,队列,绑定关系,路由key,发送消息,和接收消息 //channel.queueDeclare(queueName,true,false,true,null); // 5、准备消息 String message = "hello,anyboot"; // 6、发送消息给 queue channel.basicPublish(exchangeName,routingKey,null,message.getBytes()); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }消费者
package com.any.rabbitmq.routing.fanout; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static Runnable runnable = new Runnable() { @Override public void run() { //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp // ip port // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); channel.basicConsume( queueName , true, new DeliverCallback() { // 成功处理 @Override public void handle(String consumerTag, Delivery message) throws IOException { // 获取队列中消息的数量 System.out.println(message.getEnvelope().getDeliveryTag()); System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8")); } }, new CancelCallback() { // 失败处理 @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败"); } }); System.out.println(queueName+"-开始接受消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(runnable,"queue1").start(); new Thread(runnable,"queue2").start(); new Thread(runnable,"queue3").start(); } }路由模式(routing) 生产者
package com.any.rabbitmq.routing.direct; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class Producer { public static void main(String[] args) { //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp // ip port // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); // 定义 队列名称 String queueName = "queue1"; // 定义交换机名称 String exchangeName = "direct-exchange"; // 定义交换机类型 String exchangeType = "direct"; // 定义 路由key String routingKey = "email"; // 4、通过通道交换机,队列,绑定关系,路由key,发送消息,和接收消息 //channel.queueDeclare(queueName,true,false,true,null); // 5、准备消息 String message = "hello,anyboot"; // 6、发送消息给 queue channel.basicPublish(exchangeName,routingKey,null,message.getBytes()); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }消费者
package com.any.rabbitmq.routing.direct; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static Runnable runnable = new Runnable() { @Override public void run() { //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp // ip port // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); channel.basicConsume( queueName , true, new DeliverCallback() { // 成功处理 @Override public void handle(String consumerTag, Delivery message) throws IOException { // 获取队列中消息的数量 System.out.println(message.getEnvelope().getDeliveryTag()); System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8")); } }, new CancelCallback() { // 失败处理 @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败"); } }); System.out.println(queueName+"-开始接受消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(runnable,"queue1").start(); new Thread(runnable,"queue2").start(); new Thread(runnable,"queue3").start(); } }主题模式(topics) 生产者
package com.any.rabbitmq.routing.topics; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class Producer { public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); // 定义 队列名称 String queueName = "queue1"; // 定义交换机名称 String exchangeName = "topic-exchange"; // 定义交换机类型 String exchangeType = "topic"; // 定义 路由key String routingKey = "com.course.order"; // 4、通过通道交换机,队列,绑定关系,路由key,发送消息,和接收消息 //channel.queueDeclare(queueName,true,false,true,null); // 5、准备消息 String message = "hello,anyboot"; // 6、发送消息给 queue channel.basicPublish(exchangeName,routingKey,null,message.getBytes()); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }消费者
package com.any.rabbitmq.routing.topics; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static Runnable runnable = new Runnable() { @Override public void run() { //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp // ip port // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); channel.basicConsume( queueName , true, new DeliverCallback() { // 成功处理 @Override public void handle(String consumerTag, Delivery message) throws IOException { // 获取队列中消息的数量 System.out.println(message.getEnvelope().getDeliveryTag()); System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8")); } }, new CancelCallback() { // 失败处理 @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败"); } }); System.out.println(queueName+"-开始接受消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(runnable,"queue1").start(); new Thread(runnable,"queue2").start(); new Thread(runnable,"queue3").start(); } }工作模式(Work queue)
轮询模式
生产者
package com.any.rabbitmq.work.polling; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class Producer { public static void main(String[] args) { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); // 定义 队列名称 String queueName = "queue8"; // 4、发送消息给 queue for (int i = 0; i < 20; i++) { String message = "hello,work-rabbitmq - " + i; channel.basicPublish("",queueName,null,message.getBytes()); } } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
work1
package com.any.rabbitmq.work.polling; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeUnit; public class Work1 { public static Runnable runnable = new Runnable() { @Override public void run() { // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); channel.basicConsume( queueName , true, new DeliverCallback() { // 成功处理 @Override public void handle(String consumerTag, Delivery message) throws IOException { // 获取队列中消息的数量 System.out.println(message.getEnvelope().getDeliveryTag()); System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8")); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { // 失败处理 @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败"); } }); System.out.println(queueName+"-开始接受消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(runnable,"queue8").start(); } }
work2
package com.any.rabbitmq.work.polling; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeUnit; public class Work2 { public static Runnable runnable = new Runnable() { @Override public void run() { //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp // ip port // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); //channel. channel.basicConsume( queueName , true, new DeliverCallback() { // 成功处理 @Override public void handle(String consumerTag, Delivery message) throws IOException { // 获取队列中消息的数量 System.out.println(message.getEnvelope().getDeliveryTag()); System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8")); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }, new CancelCallback() { // 失败处理 @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败"); } }); System.out.println(queueName+"-开始接受消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(runnable,"queue8").start(); } }公平分配模式
package com.any.rabbitmq.work.fair; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; public class Producer { public static void main(String[] args) { //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp // ip port // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); // 定义 队列名称 String queueName = "queue8"; // 4、发送消息给 queue for (int i = 0; i < 20; i++) { String message = "hello,work-rabbitmq - " + i; channel.basicPublish("",queueName,null,message.getBytes()); } } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
work1
package com.any.rabbitmq.work.fair; import com.rabbitmq.client.*; import lombok.SneakyThrows; import java.io.IOException; import java.util.concurrent.TimeUnit; public class Work1 { public static Runnable runnable = new Runnable() { @Override public void run() { //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp // ip port // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); Channel finalChannel = channel; // 确认指标,一次性处理多少条消息 finalChannel.basicQos(1); finalChannel.basicConsume( queueName , false, new DeliverCallback() { // 成功处理 @SneakyThrows @Override public void handle(String consumerTag, Delivery message) throws IOException { // 获取队列中消息的数量 System.out.println(message.getEnvelope().getDeliveryTag()); System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8")); Thread.sleep(1000); // 设置手动应答 finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false); } }, new CancelCallback() { // 失败处理 @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败"); } }); System.out.println(queueName+"-开始接受消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(runnable,"queue8").start(); } }
work2
package com.any.rabbitmq.work.fair; import com.rabbitmq.client.*; import lombok.SneakyThrows; import java.io.IOException; import java.util.concurrent.TimeUnit; public class Work2 { public static Runnable runnable = new Runnable() { @Override public void run() { //所有中间件的技术都是基于tcp/ip 协议的基础之上构建新型的协议规范,只不过rabbitmq 遵循的amqp // ip port // 1、创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("101.35.118.177"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/"); // 虚拟访问节点 final String queueName = Thread.currentThread().getName(); Connection connection = null; Channel channel = null; try { // 2、创建连接Connection connection = factory.newConnection("生产者"); // 3、通过连接获取通道Channel channel = connection.createChannel(); Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume( queueName , false, new DeliverCallback() { // 成功处理 @SneakyThrows @Override public void handle(String consumerTag, Delivery message) throws IOException { // 获取队列中消息的数量 System.out.println(message.getEnvelope().getDeliveryTag()); System.out.println(queueName + "-收到的消息:" + new String(message.getBody(), "UTF-8")); Thread.sleep(2000); // 设置手动应答 finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false); } }, new CancelCallback() { // 失败处理 @Override public void handle(String consumerTag) throws IOException { System.out.println("消息接受失败"); } }); System.out.println(queueName+"-开始接受消息"); System.in.read(); } catch (Exception e) { e.printStackTrace(); } finally { if(channel !=null && channel.isOpen()){ try { // 7、关闭连接 channel.close(); } catch (Exception e) { e.printStackTrace(); } } if(connection!=null && connection.isOpen()){ try { // 8、关闭通道 connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }; public static void main(String[] args) { new Thread(runnable,"queue8").start(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)