下载地址
Erlang https://www.erlang-solutions.com/resources/download.html
rabbitMQ http://www.rabbitmq.com/download.html
其实坑很多的,要找到对应的版本号
配置方法:
• 安装完事儿后要记得配置一下系统的环境变量。
此电脑–>鼠标右键“属性”–>高级系统设置–>环境变量–>“新建”系统环境变量
变量名:ERLANG_HOME
变量值就是刚才erlang的安装地址,点击确定。
然后双击系统变量path
点击“新建”,将%ERLANG_HOME%bin加入到path中。
• 最后windows键+R键,输入cmd,再输入erl,看到版本号就说明erlang安装成功了。
• 双击下载后的.exe文件,安装过程与erlang的安装过程相同。
• RabbitMQ安装好后接下来安装RabbitMQ-Plugins。打开命令行cd,输入RabbitMQ的sbin目录。
我的目录是:D:Program FilesRabbitMQ Serverrabbitmq_server-3.7.3sbin
然后在后面输入rabbitmq-plugins enable rabbitmq_management命令进行安装
等几秒钟看到这个界面后,访问http://localhost:15672
默认用户名和密码都是guest
生产: ConnectionFactory connection = new ConnectionFactory(); connection.HostName = "localhost"; connection.UserName = "guest"; connection.Password = "guest"; using (var models= connection.CreateConnection()) { //创建一个信道 using (IModel channel = models.CreateModel()) { //删除一下 channel.ExchangeDelete("kso"); channel.QueueDelete("queuekso1"); channel.QueueDelete("queuekso2"); // 创建交换机 channel.ExchangeDeclare("kso",ExchangeType.Direct); //创建队列 channel.QueueDeclare("queuekso1",true,false,false); channel.QueueDeclare("queuekso2",true,false,false); //绑定 channel.QueueBind("queuekso1","kso","advancd"); channel.QueueBind("queuekso2","kso","advancd"); Console.BackgroundColor = ConsoleColor.Blue; for (int i = 0; i < 1000; i++) { IBasicProperties basicProperties = channel.CreateBasicProperties(); basicProperties.Persistent = true; channel.BasicPublish("kso", "advancd", basicProperties, Encoding.UTF8.GetBytes("这是一个生产这生产的消息"+i) ) ; Console.WriteLine("这是一个生产这生产的消息" + i); } while (true) { Console.WriteLine("输入消息!"); string name = Console.ReadLine(); IBasicProperties basicProperties = channel.CreateBasicProperties(); basicProperties.Persistent = true; channel.BasicPublish("kso","advancd",basicProperties,Encoding.UTF8.GetBytes("这是自己输入的消息"+name)); Console.WriteLine("这是自己输入的消息"+name); }消费者代码
public void Customtion() { var factory = new ConnectionFactory(); factory.HostName = "localhost";//RabbitMQ服务在本地运行 factory.UserName = "guest";//用户名 factory.Password = "guest";//密码 using (var connet = factory.CreateConnection()) { using (var channel = connet.CreateModel()) { //创建交换机 channel.ExchangeDeclare("kso", type: ExchangeType.Direct, durable: false, autoDelete: false, arguments: null); channel.QueueDeclare("queuekso1", true, false, false, null); channel.QueueDeclare("queuekso2", true, false, false, null); channel.QueueBind("queuekso1", "kso", "advancd", null); channel.QueueBind("queuekso2", "kso", "advancd", null); RabbitMQ.Client.Events.EventingBasicConsumer eventingBasicConsumer = new RabbitMQ.Client.Events.EventingBasicConsumer(channel); eventingBasicConsumer.Received += (sender, e) => { var boy = e.Body; Console.WriteLine(Encoding.UTF8.GetString(boy.ToArray())); }; channel.BasicConsume(queue: "queuekso1", autoAck: true, consumer: eventingBasicConsumer); channel.BasicConsume(queue: "queuekso2", autoAck: true, consumer: eventingBasicConsumer); Console.WriteLine(" 结束了"); Console.ReadKey(); } } }RabbitMQ事务支持
生产者的简单事务代码 channel./confirm/iSelect(); channel.BasicPublish("kso", item.LogType, properties, item.Msg); Console.WriteLine(Encoding.UTF8.GetString(item.Msg)); if (channel.WaitFor/confirm/is()) //如果一条消息或多消息都确认发送, { Console.WriteLine($"【{item}】发送到Broke成功!"); } else { //可以记录个日志,重试一下; } channel.WaitFor/confirm/isOrDie(); 这个代表着发送消息如果失败则直接关闭信道不能再次尝试 消费者事务支持 //如果在这里处理消息的手,异常了呢? //Console.WriteLine($"接收到消息:{message}"); ; if (i < 50) { //手动确认 消息正常消费 告诉Broker:你可以把当前这条消息删除掉了 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); Console.WriteLine(message); } else { //否定:告诉Broker,这个消息我没有正常消费; requeue: true:重新写入到队列里去; false:你还是删除掉; channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true); }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)