RabbitMQ——RabbitMQ的HelloWorld消息模型(直连)
1、HelloWorld消息模型之发布者发布消息2、HelloWorld消息模型之消费者消费消息3、连接工具类的封装
RabbitMQ——RabbitMQ的HelloWorld消息模型(直连)AMPQ模型
AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。
AMPQ结构图为:
1、HelloWorld消息模型之发布者发布消息HelloWorld消息模型结构
P:生产者,要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
新建一个maven项目,导入依赖
com.rabbitmq amqp-client5.7.2
创建虚拟主机ems和用户ems,并把虚拟主机添加到用户中
消息生产者的开发
package com.cheng.helloworld; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; //消息生产者,消息生产者需要与rabbitmq建立连接 public class Provider { public static void main(String[] args) throws IOException, TimeoutException { //创建连接rabbitmq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接rabbitmq位于的主机 connectionFactory.setHost("121.199.53.150"); //设置端口号,应用访问端口 connectionFactory.setPort(5672); //设置虚拟主机 connectionFactory.setVirtualHost("/ems"); //设置可以访问虚拟主机用户的用户名和密码 connectionFactory.setUsername("ems"); connectionFactory.setPassword("ems"); //获取连接对象 Connection connection = connectionFactory.newConnection(); //获取连接中的通道对象 Channel channel = connection.createChannel(); //通道对象要与消息队列建立连接,也可以说是绑定,绑定了才知道消息要发送到哪个队列 //创建队列:参数1:队列名称,如果队列不存在会自动创建 //参数二:队列是否要持久化,false不持久化,true持久化,如果不持久化,当rabbitmq重启服务后,队列会丢失,队列中未消费的消息自然也会丢失;如果持久化,当rabbitmq重启服务后,队列不会丢失,但队列中未消费的消息还会丢失 //参数三:是否连接独占队列,false不独占 //参数四:在消费完消息后是否自动删除队列 //参数五:额外参数 channel.queueDeclare("hello",true,false,false,null); //发布消息到队列queue中 //参数一:交换机名称,直连模式没有交换机 //参数二:队列名称 //参数三:传递消息额外设置,MessageProperties.PERSISTENT_TEXT_PLAIN:消息持久化,当rabbitmq重启服务后,未消费的消息不会丢失 //参数四:消息具体内容 channel.basicPublish("","hello",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes()); //关闭通道和连接 channel.close(); connection.close(); } }
运行后,查看rabbitmq的管理控制页面:
Queues里面多了一个名为hello队列,里面有一条消息,HelloWorld消息模型消息发布成功。
2、HelloWorld消息模型之消费者消费消息消息消费者的开发
package com.cheng.helloworld; import com.rabbitmq.client.*; import sun.rmi.transport.Endpoint; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //创建连接rabbitmq的连接工厂对象 ConnectionFactory connectionFactory = new ConnectionFactory(); //连接rabbitmq位于的主机 connectionFactory.setHost("121.199.53.150"); //设置端口号,应用访问端口 connectionFactory.setPort(5672); //设置虚拟主机 connectionFactory.setVirtualHost("/ems"); //设置可以访问虚拟主机用户的用户名和密码 connectionFactory.setUsername("ems"); connectionFactory.setPassword("ems"); //获取连接对象 Connection connection = connectionFactory.newConnection(); //获取连接中的通道对象 Channel channel = connection.createChannel(); //通道对象要与消息队列建立连接,也可以说是绑定,绑定了才知道消息要从哪个队列获取 //创建队列:参数1:队列名称,如果队列不存在会自动创建 //参数二:定义队列名称是否要持久化,false不持久化 //参数三:是否连接独占队列 //参数四:在消费完消息后是否自动删除队列 //参数五:额外参数 channel.queueDeclare("hello",true,false,false,null); //发布消息到队列中 //参数一:交换机名称,直连模式没有交换机 //参数二:开启消息的自动确认机制 //参数三:消费者消费时的回调接口 channel.basicConsume("hello",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("new String(body) = "+ new String(body)); } }); //不关闭通道和连接,让消息消费者一直监听队列 } }
运行,查看控制台输出:
消费者消费完消息后,回调方法成功执行。
并且hello队列里的消息数量变为0:
注意:这里消费者会消费队列里所有的消息。
3、连接工具类的封装public class ConnectUtils { private static ConnectionFactory connectionFactory; static { connectionFactory = new ConnectionFactory(); } public static Connection getConnection(String host,Integer port,String virtualHost,String username,String pwd){ try { connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setVirtualHost(virtualHost); connectionFactory.setUsername(username); connectionFactory.setPassword(pwd); return connectionFactory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return null; } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)