C# RabbitMQ使用

C# RabbitMQ使用,第1张

// 简单使用(一对一)
    class RabbitMQSimple
    {
        // 1. RabbitMQ实例broker重启时,所有未申明durable的交换器和队列都会被删除,交换器未声明durable不会影响队列的持久化。
        // 2. RabbitMQ中消息都被保存在队列中,所以如果队列被删除,消息不管有没有设置deliveryMode=2都会被删除。
        static void rabbitMQDeclare() {
            IModel channel = RabbitMQConnection.GetIConnection().CreateModel();

            // durable:     队列在broker重启后是否还存在。
            // exclusive:   1.仅声明的连接(连接下所有信道可访问)可访问;2.连接(不是信道)断开时队列会自动删除。
            // autoDelete:  是否自动删除,所有消费者断开连接之后队列是否自动被删除。
            // arguments:   配置队列中的消息什么时候会自动被删除。
            channel.QueueDeclare("XXXXXX", true, false, false, null);
        }

        static void rabbitMQProducer()
        {
            IModel channel = RabbitMQConnection.GetIConnection().CreateModel();
            for (int i = 0; i < 20; i++)
            {
                try
                {
                    // 配置消息属性(是否持久化等)
                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2; // 1非持久化,2持久化

                    channel.BasicPublish("", "XXXXX", properties, Encoding.UTF8.GetBytes(i.ToString()));
                    Console.WriteLine(string.Format("send:{0}", i.ToString()));
                }
                catch (Exception ex)
                {
                    // NOTE 考虑rabbitmq服务重启等异常处理
                    Console.WriteLine(ex.Message);
                    Thread.Sleep(1000);
                }
                Thread.Sleep(100);
            }
        }

        static void rabbitMQConsumer()
        {
            IModel channel = RabbitMQConnection.GetIConnection().CreateModel();
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                try
                {
                    string msgBody = Encoding.UTF8.GetString(ea.Body.ToArray());
                    Console.WriteLine(string.Format("recive:{0}", msgBody));

                    // 应答消息:支持应答该消费者先前接收未ack的所有消息(multiple=true),应答后消息被队列删除。
                    channel.BasicAck(ea.DeliveryTag, false);

                    // 拒绝消息:消费端告诉队列这个消息我拒绝、不处理,一次只能处理一个消息,可控制删除或重新放回队列。
                    // requeue=true消息重新入队列,该消费者还是会消费到该消息;requeue=false消息丢弃或者进入死信队列。
                    // channel.BasicReject(ea.DeliveryTag, true);

                    // 否定消息:与BasicReject区别是支持nack该消费者先前接收未ack的所有消息(multiple=true)。
                    // channel.BasicNack(ea.DeliveryTag, true, true);

                    // 恢复消息:重新投递还未被确认的消息,requeue=true则尽可能分配给其他消费者,false则消息会重新被投递给自己。
                    // channel.BasicRecover(true);
                }
                catch (Exception ex)
                {
                    // NOTE 考虑rabbitmq服务重启等异常处理
                    Console.WriteLine(ex.Message);
                    Thread.Sleep(1000);
                }

                Thread.Sleep(200);
            };

            // 消费端限流配置,非自动确认消息(channel.BasicConsume->autoAck=false)前提下生效,
            // 如果一定数目(channel.BasicQos->prefetchCount)的消息是未确认状态,则队列不分配新的消息给该消费端。
            // prefetchSize:    单条消息大小限制(0代表不限制,RabbitMQ尚未实现)。
            // prefetchCount:   如果有N个消息还没有ack,则该consumer将block掉,直到有消息ack(no_ask=false下生效)。
            // global:          true->设置应用于channel级别,false->设置应用于consumer级别(RabbitMQ的channel级别尚未实现)。
            channel.BasicQos(0, 10, false);

            // autoAck:自动应答,消息被消费者取出就会从队列中删除(消息处理异常会丢失数据)。
            channel.BasicConsume("XXXXXX", false, consumer);
        }

        public static void Start()
        {
            // 声明队列
            rabbitMQDeclare();

            // 生产者
            Thread productThread = new Thread(rabbitMQProducer);
            // productThread.Start();

            // 消费者
            rabbitMQConsumer();
            Console.ReadKey();
        }
    }

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/799015.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-06
下一篇 2022-05-06

发表评论

登录后才能评论

评论列表(0条)

保存