启动RabbitMQ
点击你刚刚创建的amd用户
点了之后会出现下面这张图片的内容
把下面的括号都选上 然后点击close
注意生产者与消费者都要在同一网络上
// Producer
// Excerpt of the producer's constructor body: the settings to adapt to your
// own environment before running the example.
this.hosts = ['amqp://192.168.199.130']; // IP address of the producer
this.index = 0;
this.open = amqp.connect({
  hostname: "192.168.199.130", // IP address
  protocol: "amqp",
  port: 5672,
  username: "amd", // the account you just created
  password: "amd" // and its password
});
let amqp = require('amqplib');

/**
 * Minimal RabbitMQ producer built on amqplib's promise API.
 *
 * A single connection promise (`this.open`) is created in the constructor and
 * reused by every call to `sendQueueMsg`; a fresh channel is opened per message.
 */
class RabbitMQ {
  constructor() {
    // Broker URLs used for reconnect attempts after a failure.
    // NOTE(review): these URLs carry no credentials, unlike the initial
    // connect() below — confirm the broker accepts them on reconnect.
    this.hosts = ['amqp://192.168.199.130'];
    // Index into `hosts` of the next reconnect candidate.
    this.index = 0;
    // Promise for the connection; every send chains off it.
    this.open = amqp.connect({
      hostname: "192.168.199.130",
      protocol: "amqp",
      port: 5672,
      username: "amd",
      password: "amd"
    });
  }

  /**
   * Publish one persistent message to a queue, asserting the queue first.
   *
   * @param {string} queueName - Queue to assert and publish to.
   * @param {string} msg - Message body; converted with `Buffer.from(msg)`.
   * @param {Function} [errCallBack] - Called with the string "success" when
   *   `sendToQueue` returns a truthy value, or with the error object when
   *   publishing or connecting fails.
   * @returns {void} Nothing; the work continues asynchronously.
   */
  sendQueueMsg(queueName, msg, errCallBack) {
    let self = this;
    self.open
      .then(function (conn) {
        return conn.createChannel();
      })
      .then(function (channel) {
        return channel.assertQueue(queueName)
          .then(function () {
            return channel.sendToQueue(queueName, Buffer.from(msg), { persistent: true });
          })
          .then(function (data) {
            if (data) {
              errCallBack && errCallBack("success");
            }
            // Close the per-message channel whether or not sendToQueue
            // returned true, so a falsy result no longer leaks the channel.
            channel.close();
          })
          .catch(function (err) {
            // Report the failure instead of swallowing it silently.
            errCallBack && errCallBack(err);
            // Delay the close slightly, as the original code did.
            setTimeout(() => {
              if (channel) {
                channel.close();
              }
            }, 500)
          });
      })
      .catch(function (err) {
        errCallBack && errCallBack(err);
        // Connection or channel creation failed: move to the next host.
        let num = self.index++;
        // Fixed: the bound is hosts.length (self.length was undefined, which
        // made this comparison always false).
        if (num <= self.hosts.length - 1) {
          self.open = amqp.connect(self.hosts[num]);
        } else {
          // Fixed: assign (the original used `==`) and reconnect to the first
          // host so later calls do not keep reusing a rejected promise.
          self.index = 0;
          self.open = amqp.connect(self.hosts[0]);
        }
      });
  }
}

module.exports=RabbitMQ
使用定时器(setInterval)每秒生成一条消息
// ptest.js
// Demo driver for the producer: publishes one JSON message per second to
// 'testQueue' and logs whatever the producer passes to the callback.
const RabbitMQ = require("./prd.js");

const mq = new RabbitMQ();
let count = 0;

setInterval(() => {
  count++;
  mq.sendQueueMsg('testQueue', JSON.stringify({ kind: "obj", count: count }), (status) => {
    console.log(status);
  });
}, 1000);

// --- Consumer ---
cus.js
constructor() { this.hosts=['amqp://192.168.199.249']; //生产者的ip地址 this.index=0; this.open = amqp.connect({ hostname:"192.168.199.249",//生产者的ip地址 protocol:"amqp", port:5672, username:"amd", password:"amd"//与生产者的RabbitMQ的账号密码 }); }
let amqp = require('amqplib');

/**
 * Minimal RabbitMQ consumer built on amqplib's promise API.
 *
 * A single connection promise (`this.open`) is created in the constructor and
 * used by `receiveQueueMsg` to open a channel and subscribe to a queue.
 */
class RabbitMQ {
  constructor() {
    // Broker URLs used for reconnect attempts after a failure.
    // NOTE(review): these URLs carry no credentials, unlike the initial
    // connect() below — confirm the broker accepts them on reconnect.
    this.hosts = ['amqp://192.168.199.249'];
    // Index into `hosts` of the next reconnect candidate.
    this.index = 0;
    // Fixed: use the "amd" account this tutorial creates and that the cus.js
    // constructor excerpt shows; the original had "admin"/"admin" here.
    this.open = amqp.connect({
      hostname: "192.168.199.249",
      protocol: "amqp",
      port: 5672,
      username: "amd",
      password: "amd"
    });
  }

  /**
   * Subscribe to a queue and hand each message body to a callback.
   *
   * Each non-null message is acknowledged before `receiveCallBack` runs.
   * After a failure this method only replaces `this.open`; it does not
   * subscribe again, so the caller has to call it again to resume consuming.
   *
   * @param {string} queueName - Queue to assert and consume from.
   * @param {Function} [receiveCallBack] - Called with the message content
   *   converted to a string.
   * @param {Function} [errCallBack] - Called with the error object when
   *   connecting, creating the channel, asserting the queue or subscribing
   *   fails.
   * @returns {void} Nothing; the work continues asynchronously.
   */
  receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
    let self = this;
    self.open
      .then(function (conn) {
        return conn.createChannel();
      })
      .then(function (channel) {
        return channel.assertQueue(queueName)
          .then(function () {
            // The channel stays open on purpose: closing it would cancel the
            // subscription (the original close call was commented out).
            return channel.consume(queueName, function (msg) {
              if (msg !== null) {
                let data = msg.content.toString();
                channel.ack(msg);
                receiveCallBack && receiveCallBack(data);
              }
            });
          });
      })
      .catch(function (err) {
        // Fixed: errCallBack was accepted but never invoked.
        errCallBack && errCallBack(err);
        let num = self.index++;
        // Fixed: the bound is hosts.length (self.length was undefined, which
        // made this comparison always false).
        if (num <= self.hosts.length - 1) {
          self.open = amqp.connect(self.hosts[num]);
        } else {
          self.index = 0;
          self.open = amqp.connect(self.hosts[0]);
        }
      });
  }
}

module.exports=RabbitMQ
// ctest.js
// Demo driver for the consumer: subscribes to 'testQueue' and prints every
// message body it receives.
const RabbitMQ = require("./cus.js");

const mq = new RabbitMQ();

mq.receiveQueueMsg('testQueue', (msg) => {
  console.log(msg);
});
先运行生产者
再运行消费者
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)