KSO-在NETCore中RabbitMQ的使用以及相关代码

KSO-在NETCore中RabbitMQ的使用以及相关代码,第1张

KSO-在NETCore中RabbitMQ的使用以及相关代码 安装与配置

下载地址
Erlang https://www.erlang-solutions.com/resources/download.html
rabbitMQ http://www.rabbitmq.com/download.html
其实坑很多的,要找到对应的版本号

配置方法:
• 安装完事儿后要记得配置一下系统的环境变量。
此电脑–>鼠标右键“属性”–>高级系统设置–>环境变量–>“新建”系统环境变量

变量名:ERLANG_HOME
变量值就是刚才erlang的安装地址,点击确定。
然后双击系统变量path

点击“新建”,将%ERLANG_HOME%bin加入到path中。
• 最后windows键+R键,输入cmd,再输入erl,看到版本号就说明erlang安装成功了。

• 双击下载后的.exe文件,安装过程与erlang的安装过程相同。
• RabbitMQ安装好后接下来安装RabbitMQ-Plugins。打开命令行cd,输入RabbitMQ的sbin目录。
我的目录是:D:Program FilesRabbitMQ Serverrabbitmq_server-3.7.3sbin
然后在后面输入rabbitmq-plugins enable rabbitmq_management命令进行安装
等几秒钟看到这个界面后,访问http://localhost:15672
默认用户名和密码都是guest

生产者代码
生产: 
ConnectionFactory connection = new ConnectionFactory();
            connection.HostName = "localhost";
            connection.UserName = "guest";
            connection.Password = "guest";
            using (var models= connection.CreateConnection())
            {
                //创建一个信道
                using (IModel channel = models.CreateModel())
                {
                    //删除一下
                    channel.ExchangeDelete("kso");
                    channel.QueueDelete("queuekso1");
                    channel.QueueDelete("queuekso2");

                    // 创建交换机
                    channel.ExchangeDeclare("kso",ExchangeType.Direct);
                    //创建队列
                    channel.QueueDeclare("queuekso1",true,false,false);
                    channel.QueueDeclare("queuekso2",true,false,false);

                    //绑定
                    channel.QueueBind("queuekso1","kso","advancd");
                    channel.QueueBind("queuekso2","kso","advancd");
                    Console.BackgroundColor = ConsoleColor.Blue;

                    for (int i = 0; i < 1000; i++)
                    {
                        IBasicProperties basicProperties = channel.CreateBasicProperties();
                        basicProperties.Persistent = true;
                        channel.BasicPublish("kso",
                            "advancd", basicProperties,
                            Encoding.UTF8.GetBytes("这是一个生产这生产的消息"+i)
                            ) ;
                        Console.WriteLine("这是一个生产这生产的消息" + i);
                    }
                    while (true)
                    {
                        Console.WriteLine("输入消息!");
                        string name = Console.ReadLine();
                        IBasicProperties basicProperties = channel.CreateBasicProperties();
                        basicProperties.Persistent = true;


                        channel.BasicPublish("kso","advancd",basicProperties,Encoding.UTF8.GetBytes("这是自己输入的消息"+name));
                        Console.WriteLine("这是自己输入的消息"+name);


                    }

消费者代码
  public void Customtion()
        {
            var factory = new ConnectionFactory();
            factory.HostName = "localhost";//RabbitMQ服务在本地运行
            factory.UserName = "guest";//用户名
            factory.Password = "guest";//密码 

            using (var connet = factory.CreateConnection())
            {
                using (var channel = connet.CreateModel())
                {
                    //创建交换机
                    channel.ExchangeDeclare("kso", type: ExchangeType.Direct, durable: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare("queuekso1", true, false, false, null);
                    channel.QueueDeclare("queuekso2", true, false, false, null);

                    channel.QueueBind("queuekso1", "kso", "advancd", null);
                    channel.QueueBind("queuekso2", "kso", "advancd", null);

                    RabbitMQ.Client.Events.EventingBasicConsumer eventingBasicConsumer = new RabbitMQ.Client.Events.EventingBasicConsumer(channel);
                    eventingBasicConsumer.Received += (sender, e) =>
                    {
                        var boy = e.Body;
                        Console.WriteLine(Encoding.UTF8.GetString(boy.ToArray()));
                    };
                    channel.BasicConsume(queue: "queuekso1", autoAck: true, consumer: eventingBasicConsumer);
                    channel.BasicConsume(queue: "queuekso2", autoAck: true, consumer: eventingBasicConsumer);
                    Console.WriteLine(" 结束了");
                    Console.ReadKey();
                }
            }
        }

RabbitMQ事务支持
 生产者的简单事务代码
   channel./confirm/iSelect();
  channel.BasicPublish("kso", item.LogType, properties, item.Msg);
                        Console.WriteLine(Encoding.UTF8.GetString(item.Msg));
                        if (channel.WaitFor/confirm/is()) //如果一条消息或多消息都确认发送,
                        {
                            Console.WriteLine($"【{item}】发送到Broke成功!");
                        }
                        else
                        {
                            //可以记录个日志,重试一下;
                        }
                channel.WaitFor/confirm/isOrDie(); 这个代表着发送消息如果失败则直接关闭信道不能再次尝试

 
消费者事务支持
//如果在这里处理消息的手,异常了呢? 
                        //Console.WriteLine($"接收到消息:{message}"); ; 
                        if (i < 50)
                        {
                            //手动确认  消息正常消费  告诉Broker:你可以把当前这条消息删除掉了
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                            Console.WriteLine(message);
                        }
                        else
                        {
                            //否定:告诉Broker,这个消息我没有正常消费;  requeue: true:重新写入到队列里去; false:你还是删除掉;
                            channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                        }



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5699884.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存