引入依赖
第一种模型(直连)com.rabbitmq amqp-client5.7.2
在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
开发生产者
//创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("10.15.0.9"); connectionFactory.setPort(5672); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123"); connectionFactory.setVirtualHost("/ems"); Connection connection = connectionFactory.newConnection(); //创建通道 Channel channel = connection.createChannel(); //参数1: 是否持久化 参数2:是否独占队列 参数3:是否自动删除 参数4:其他属性 channel.queueDeclare("hello",true,false,false,null); channel.basicPublish("","hello", null,"hello rabbitmq".getBytes()); channel.close(); connection.close();开发消费者
//创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("10.15.0.9"); connectionFactory.setPort(5672); connectionFactory.setUsername("ems"); connectionFactory.setPassword("123"); connectionFactory.setVirtualHost("/ems"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("hello", true, false, false, null); channel.basicConsume("hello",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } });参数的说明
channel.queueDeclare("hello",true,false,false,null); '参数1':用来声明通道对应的队列 '参数2':用来指定是否持久化队列 '参数3':用来指定是否独占队列 '参数4':用来指定是否自动删除队列 '参数5':对队列的额外配置编写rabbitmq工具类
可以明显发现生产者和消费者的代码有冗余,所以构建出工具类编写获取连接和关闭连接的方法
package com.demo.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitMQUtils { public static Connection getRabbitConnection() throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("10.15.0.9"); connectionFactory.setPort(5672); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/ems"); Connection connection = connectionFactory.newConnection(); return connection; } public static void closeRabbitConnection(Channel channel , Connection connection){ try { if (channel!=null) { channel.close(); } if (connection!=null) { connection.close(); } } catch (IOException | TimeoutException e) { e.printStackTrace(); } } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)