相关视频参考(来自动力节点):https://www.bilibili.com/video/BV1Ap4y1D7tU
相关资料下载:http://www.bjpowernode.com/?csdn
工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。
我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。
使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展。
生产者发送消息这里模拟耗时任务,发送的消息中,每个点使工作进程暂停一秒钟,例如"Hello…"将花费3秒钟来处理
package rabbitmq.workqueue; import java.util.Scanner; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Test1 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setPort(5672); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); //参数:queue,durable,exclusive,autoDelete,arguments ch.queueDeclare("helloworld", false,false,false,null); while (true) { //控制台输入的消息发送到rabbitmq System.out.print("输入消息: "); String msg = new Scanner(System.in).nextLine(); //如果输入的是"exit"则结束生产者进程 if ("exit".equals(msg)) { break; } //参数:exchage,routingKey,props,body ch.basicPublish("", "helloworld", null, msg.getBytes()); System.out.println("消息已发送: "+msg); } c.close(); } }消费者接收消息
package rabbitmq.workqueue; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class Test2 { public static void main(String[] args) throws Exception { ConnectionFactory f = new ConnectionFactory(); f.setHost("192.168.64.140"); f.setUsername("admin"); f.setPassword("admin"); Connection c = f.newConnection(); Channel ch = c.createChannel(); ch.queueDeclare("helloworld",false,false,false,null); System.out.println("等待接收数据"); //收到消息后用来处理消息的回调对象 DeliverCallback callback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); System.out.println("收到: "+msg); //遍历字符串中的字符,每个点使进程暂停一秒 for (int i = 0; i < msg.length(); i++) { if (msg.charAt(i)=='.') { try { Thread.sleep(1000); } catch (InterruptedException e) { } } } System.out.println("处理结束"); } }; //消费者取消时的回调对象 CancelCallback cancel = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { } }; ch.basicConsume("helloworld", true, callback, cancel); } }运行测试
运行:
- 一个生产者
- 两个消费者
生产者发送多条消息,如: 1,2,3,4,5. 两个消费者分别收到:
- 消费者一: 1,3,5
- 消费者二::2,4
rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)