RabbitMQ入门,RabbitMQ的工作队列模式(上篇)

RabbitMQ入门,RabbitMQ的工作队列模式(上篇),第1张

RabbitMQ入门,RabbitMQ的工作队列模式(上篇) RabbitMQ工作模式

相关视频参考(来自动力节点):https://www.bilibili.com/video/BV1Ap4y1D7tU

相关资料下载:http://www.bjpowernode.com/?csdn

 

工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。

我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。

使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展。

生产者发送消息

这里模拟耗时任务,发送的消息中,每个点使工作进程暂停一秒钟,例如"Hello…"将花费3秒钟来处理

package rabbitmq.workqueue;

import java.util.Scanner;



import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;



public class Test1 {

public static void main(String[] args) throws Exception {

ConnectionFactory f = new ConnectionFactory();

f.setHost("192.168.64.140");

f.setPort(5672);

f.setUsername("admin");

f.setPassword("admin");



Connection c = f.newConnection();

Channel ch = c.createChannel();

//参数:queue,durable,exclusive,autoDelete,arguments

ch.queueDeclare("helloworld", false,false,false,null);



while (true) {

    //控制台输入的消息发送到rabbitmq

System.out.print("输入消息: ");

String msg = new Scanner(System.in).nextLine();

//如果输入的是"exit"则结束生产者进程

if ("exit".equals(msg)) {

break;

}

//参数:exchage,routingKey,props,body

ch.basicPublish("", "helloworld", null, msg.getBytes());

System.out.println("消息已发送: "+msg);

}



c.close();

}

}
消费者接收消息
package rabbitmq.workqueue;

import java.io.IOException;

import java.util.concurrent.TimeoutException;



import com.rabbitmq.client.CancelCallback;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DeliverCallback;

import com.rabbitmq.client.Delivery;



public class Test2 {

public static void main(String[] args) throws Exception {

ConnectionFactory f = new ConnectionFactory();

f.setHost("192.168.64.140");

f.setUsername("admin");

f.setPassword("admin");

Connection c = f.newConnection();

Channel ch = c.createChannel();

ch.queueDeclare("helloworld",false,false,false,null);

System.out.println("等待接收数据");



//收到消息后用来处理消息的回调对象

DeliverCallback callback = new DeliverCallback() {

@Override

public void handle(String consumerTag, Delivery message) throws IOException {

String msg = new String(message.getBody(), "UTF-8");

System.out.println("收到: "+msg);



//遍历字符串中的字符,每个点使进程暂停一秒

for (int i = 0; i < msg.length(); i++) {

if (msg.charAt(i)=='.') {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

}

}

}

System.out.println("处理结束");

}

};



//消费者取消时的回调对象

CancelCallback cancel = new CancelCallback() {

@Override

public void handle(String consumerTag) throws IOException {

}

};



ch.basicConsume("helloworld", true, callback, cancel);

}

}
运行测试

运行:

  1. 一个生产者
  2. 两个消费者

生产者发送多条消息,如: 1,2,3,4,5. 两个消费者分别收到:

  1. 消费者一: 1,3,5
  2. 消费者二::2,4

rabbitmq在所有消费者中轮询分发消息,把消息均匀地发送给所有消费者

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5656341.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存