com.rabbitmq amqp-client5.7.3
1.简单模式连接(1个消费者获取一个队列):发布者代码:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Providers { public static void main(String[] args) throws IOException, TimeoutException { //创建连接 ConnectionFactory factory = new ConnectionFactory(); //地址 factory.setHost("192.168.17.8"); //用户的【Can access virtual hosts】值 factory.setVirtualHost("/"); //服务器端口 factory.setPort(5672); //用户名 factory.setUsername("guest"); //密码 factory.setPassword("llh123456"); //获取连接 Connection connection = factory.newConnection(); //创建频道 Channel channel = connection.createChannel(); //申明队列(对列名称,是否持久化,是否独占频道,是否自动销毁,其他参数Map集合) channel.queueDeclare("First_llh",true,false,false,null); //设置消息 String massges = "hellow rabbitmq"; //发送消息(对列名称,BasicProperties基础配置,消息的字节数组) channel.basicPublish("","First_llh",null,massges.getBytes()); //关闭频道 channel.close(); //关闭连接 connection.close(); } }接收者代码:
import com.rabbitmq.client.*; import util.RabbitConnection; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Conmmons { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection geust = RabbitConnection.getConection("192.168.17.8", "/", 5672, "guest", "llh123456"); //创建频道 Channel channel = geust.createChannel(); //接受消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { //获取消息(delivery.getBody() = [B@4322ce5)然后设置编码UTF_8编码,在转换string类型 String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("massges:[" + message +"]"); }; //对列名称,是否持久化, channel.basicConsume("First_llh",true,deliverCallback,(consumerTag)->{}); } }考虑到创建连接的代码冗余:
写一个连接的工具类
package util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class RabbitConnection { public static Connection getConection(String host,String SSL,Integer port,String user,String pwd) throws IOException, TimeoutException { //创建连接 ConnectionFactory factory = new ConnectionFactory(); //地址 factory.setHost(host); //用户的【Can access virtual hosts】值 factory.setVirtualHost(SSL); //服务器端口 factory.setPort(port); //用户名 factory.setUsername(user); //密码 factory.setPassword(pwd); //获取连接 Connection connection = factory.newConnection(); return connection; } }
调用工具类获取连接
//获取连接 Connection geust = RabbitConnection.getConection("192.168.17.8", "/", 5672, "guest", "llh123456");注:接收者启动会监视队列是否发送了新的消息,实时刷新的
2.工作模式连接(2个消费者获取一个队列):接收者1类代码
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback; import util.RabbitConnection; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class ConmmonsTwo { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection geust = RabbitConnection.getConection("192.168.17.8", "/", 5672, "guest", "llh123456"); //创建频道 Channel channel = geust.createChannel(); //接受消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { //获取消息(delivery.getBody() = [B@4322ce5)然后设置编码UTF_8编码,在转换string类型 String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("massges:[" + message +"]"); }; //对列名称,是否持久化, channel.basicConsume("First_llh",true,deliverCallback,(consumerTag)->{}); } }接收者2类代码
import com.rabbitmq.client.*; import util.RabbitConnection; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException; public class Conmmons { public static void main(String[] args) throws IOException, TimeoutException { //获取连接 Connection geust = RabbitConnection.getConection("192.168.17.8", "/", 5672, "guest", "llh123456"); //创建频道 Channel channel = geust.createChannel(); //接受消息 DeliverCallback deliverCallback = (consumerTag, delivery) -> { //获取消息(delivery.getBody() = [B@4322ce5)然后设置编码UTF_8编码,在转换string类型 String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("massges:[" + message +"]"); }; //对列名称,是否持久化, channel.basicConsume("First_llh",true,deliverCallback,(consumerTag)->{}); } }注:工作模式下对列会均匀分配资源
1.同时运行 接收者1类 和 接收者2类
发送10条数据
读取者1的结果
读取者2的结果
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)