package com.jia.utils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RabbitMQUtils { //获取连接对象 public static Connection getConnection(){ try { ConnectionFactory connectionFactory = new ConnectionFactory(); //设置rabbit主机 端口 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); //设置虚拟主机 connectionFactory.setVirtualHost("/vh01"); //设置拥有改虚拟主机权限的用户 connectionFactory.setUsername("LIANG"); connectionFactory.setPassword("123456"); return connectionFactory.newConnection(); }catch (Exception e){ e.printStackTrace(); } return null; } //关闭连接 public static void closeConnectAndChanel(Channel channel,Connection connection){ try { if(channel!=null) { channel.close(); } if(connection!=null) { connection.close(); } }catch (Exception e){ e.printStackTrace(); } } }生产者
public class Provider { @Test public void testSendMsg() throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置rabbit主机 端口 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); //设置虚拟主机 connectionFactory.setVirtualHost("/vh01"); //设置拥有改虚拟主机权限的用户 connectionFactory.setUsername("LIANG"); connectionFactory.setPassword("123456"); //获取连接对象 Connection connection = connectionFactory.newConnection(); //获取通道 Channel channel = connection.createChannel(); //通道绑定消息队列 channel.queueDeclare("msg",false,false,false,null); //发布消息 channel.basicPublish("","msg", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes()); //channel.basicPublish("","msg",null,"hello rabbitmq".getBytes()); //关闭通道 channel.close(); //关闭连接 connection.close(); } }消费者
package com.jia; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置rabbit主机 端口 connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); //设置虚拟主机 connectionFactory.setVirtualHost("/vh01"); //设置拥有改虚拟主机权限的用户 connectionFactory.setUsername("LIANG"); connectionFactory.setPassword("123456"); //获取连接对象 Connection connection = connectionFactory.newConnection(); //获取通道 Channel channel = connection.createChannel(); //绑定队列 channel.queueDeclare("msg",false,false,false,null); channel.basicConsume("msg",true, new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(new String(body)); } } ); //channel.close(); //connection.close(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)