RabbitMQ的添加用户与权限 生产者与消费者

RabbitMQ的添加用户与权限 生产者与消费者,第1张

RabbitMQ的添加用户与权限 生产者消费者 RabbitMq 添加用户

启动RabbitMQ

点击你刚刚创建的amd用户

点了之后会出现下面这张图片的内容


把下面的括号都选上 然后点击close

生产者与消费者

注意生产者与消费者都要在同一网络上

生产者
  this.hosts=['amqp://192.168.199.130'];//生产者的ip地址
        this.index=0;
        this.open = amqp.connect({
            hostname:"192.168.199.130",//IP地址
            protocol:"amqp",
            port:5672,
            username:"amd",// 你刚刚创建的账号
            password:"amd"//与密码
        });
let amqp = require('amqplib');

class RabbitMQ {
    constructor() {
        this.hosts=['amqp://192.168.199.130'];
        this.index=0;
        this.open = amqp.connect({
            hostname:"192.168.199.130",
            protocol:"amqp",
            port:5672,
            username:"amd",
            password:"amd"
        });
    }
    sendQueueMsg(queueName, msg, errCallBack) {
        let self = this;

        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName).then(function (ok) {
                    return channel.sendToQueue(queueName, Buffer.from(msg), {
                        persistent: true
                    });
                })
                    .then(function (data) {
                        if (data) {
                            errCallBack && errCallBack("success");
                            channel.close();
                        }
                    })
                    .catch(function () {
                        setTimeout(() => {
                            if (channel) {
                                channel.close();
                            }
                        }, 500)
                    });
            })
            .catch(function () {
                let num = self.index++;

                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index == 0;
                }
            });
    }
}

module.exports=RabbitMQ 

if循环生成消息

//ptest.js
var RabbitMQ=require("./prd.js")
let mq = new RabbitMQ();
var count=0;
setInterval(()=>{
    count++;
    mq.sendQueueMsg('testQueue', JSON.stringify({kind:"obj",count:count}), (error) => {
        console.log(error)
    })

},1000);
消费者

cus.js

  constructor() {
      this.hosts=['amqp://192.168.199.249']; //生产者的ip地址
      this.index=0;
        this.open = amqp.connect({
         hostname:"192.168.199.249",//生产者的ip地址
         protocol:"amqp",
         port:5672,
         username:"amd",
         password:"amd"//与生产者的RabbitMQ的账号密码
         });
    }
let amqp = require('amqplib');

class RabbitMQ{
    constructor() {
      this.hosts=['amqp://192.168.199.249'];
      this.index=0;
        this.open = amqp.connect({
         hostname:"192.168.199.249",
         protocol:"amqp",
         port:5672,
         username:"admin",
         password:"admin"
         });
    }
    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
        let self = this;
        self.open
            .then(function (conn) {
                return conn.createChannel();
            })
            .then(function (channel) {
                return channel.assertQueue(queueName)
                    .then(function (ok) {
                        return channel.consume(queueName, function (msg) {
                            if (msg !== null) {
                                let data = msg.content.toString();
                                channel.ack(msg);
                                receiveCallBack && receiveCallBack(data);
                            }
                        })
                            .finally(function () {
                                setTimeout(() => {
                                    if (channel) {
                                      //  channel.close();
                                    }
                                }, 500)
                            });
                    })
            })
            .catch(function () {
                let num = self.index++;
                if (num <= self.length - 1) {
                    self.open = amqp.connect(self.hosts[num]);
                } else {
                    self.index = 0;
                    self.open = amqp.connect(self.hosts[0]);
                }
            });
    }
}

module.exports=RabbitMQ 
//ctest.js
var RabbitMQ=require("./cus.js")
let mq = new RabbitMQ();
mq.receiveQueueMsg('testQueue',(msg) => 
{    
   console.log(msg);
})

先运行生产者

在运行消费者

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5635129.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存