经过4年的使用经验, 发现网上有很多的误解和错误的代码. 为清楚一些错误的观念和使用方式. 特意写了这个文章统一大家的编程规范和命名规范, 方便系统开发和系统集成.
增加了一些其它文章中的常见问题 和使用指南.和命名指南
首先要建立代码连接,然后绑定,然后发送.
代码中与RabbitMQ的连接是保持一打开永久使用的长连接的好呢? 还是每次需要都打开的好?
在数据库 *** 作的时候是建议每次执行查询都打开连接, 然后再关上.
那么在 *** 作RabbitMQ的时候, 是否也应该如此呢?
我的建议如下: 摘抄自 https://www.javaroad.cn/questions/94679
在RabbitMQ中,连接被认为是“昂贵的” - 它们占用TCP / IP端口,需要握手/协商等等 . 虽然这在SQL Server领域似乎微不足道,但当你谈到在RabbitMQ中每秒发送100K消息时,这种开销变得不可行 .
因此,对于RabbitMQ,一般的最佳做法是为每个应用程序实例打开一个连接,并尽可能长时间保持打开 - 如果可以的话最好是跟应用程序实例一样的生命周期,我个人一般设置为static 静态的. .
在app实例中,您可以在RabbitMQ连接之上创建通道(Channels) . 你可以非常快速地创建它们 . 大多数应用程序在RabbitMQ中使用单个通道进行单个 *** 作 . 消息制作人?打开一个 Channels . 从队列中消费?打开一个 Channels . 重新定义队列?打开 Channels 等
另外 - 如果你使用的是具有线程的语言,比如C#,你必须将你的 Channels 限制为一个线程 . 不要跨线程重用通道 . 如果你试图这样做会发生非常糟糕的事情 .
所以根据上面的指南我的消息生产者 发布消息的代码.
package com.qcd.webapi.service; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.*; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; // import com.qcd.webapi.SocketServe; import com.qcd.webapi.model.*; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.MessageProperties; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.stereotype.Service; import org.springframework.web.bind.annotation.*; import java.net.URLEncoder; import java.io.*; import java.net.ServerSocket; import java.net.Socket; // import java.net.URLEncoder; import java.text.SimpleDateFormat; import java.util.*; // import java.util; import java.util.concurrent.TimeoutException; // import com.weilan.openapi.test.RestTemplate; // import sun.misc.base64Decoder; @Slf4j @Service public class RabbitMQService { @Value("${app.RabbitMQ.IP}") private String AppRabbitMQIP; @Value("${app.RabbitMQ.Port}") private String AppRabbitMQPort; @Value("${app.RabbitMQ.UserName}") private String AppRabbitMQUserName; @Value("${app.RabbitMQ.Password}") private String AppRabbitMQPassword; @Value("${app.RabbitMQ.VirtualHost}") private String AppRabbitMQVirtualHost; @Value("${app.RabbitMQ.ExchangeName}") private String AppRabbitMQExchangeName; static Connection connection = null; // static Channel channel = null; static Object lockobj = new Object(); public Connection getConnection() throws IOException, TimeoutException { // 1.创建连接工厂 if (connection == null) { synchronized (lockobj) { if (connection == null) { ConnectionFactory factory = new ConnectionFactory( ); factory.setAutomaticRecoveryEnabled(true); factory.setHost(AppRabbitMQIP); factory.setPort(Integer.parseInt(AppRabbitMQPort)); factory.setUsername(AppRabbitMQUserName); factory.setPassword(AppRabbitMQPassword); factory.setVirtualHost(AppRabbitMQVirtualHost); // 创建与RabbitMQ服务器的TCP连接 connection = factory.newConnection(); } } } return connection; // Channel channel = connection.createChannel(); // return channel; } // ———————————————— // 版权声明:本文为CSDN博主「卑微的小白」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 // 原文链接:https://blog.csdn.net/qq_39411354/article/details/109311332 public boolean SendMsg(String routing_key, String msg) { Channel channel =null; try { connection = getConnection(); channel = connection.createChannel(); // 创建一个通道, // channel = connection.createChannel(); channel.exchangeDeclare(AppRabbitMQExchangeName, "topic", true); channel.basicPublish(AppRabbitMQExchangeName, routing_key, null, msg.getBytes()); return true; } catch (Exception ex) { ex.printStackTrace(); return false; } finally { if (channel != null) { try { channel.close(); } catch (Exception e) { e.printStackTrace(); } } } } protected void finalize() { if (connection != null) { try { connection.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
关于RoutingKey的命名规则.我建议是
E.系统.事件
例如:
E.WebAPI.DataReceived //意思是WebAPI系统触发了,数据已接收事件
E.WebAPI.OrderIsCreate //意思是WebAPI系统触发了,订单已创建事件
E.WebAPI.OrderIsDelete //意思是WebAPI系统触发了,订单已删除事件
E.WebAPI.OrderIsCheck //意思是WebAPI系统触发了,订单已审核事件
E.WebAPI.OrderIsInStock //意思是WebAPI系统触发了,订单已入库事件
在我的概念里, RoutingKey 约等于 事件名. 这个事件名一定要能够区分系统, 区分业务 *** 作.
越大的概念越要放左边. 容易被topic模式统一订阅. 例如我要订阅WebAPI的所有业务日志, 可以在队列绑定的时候 订阅成 Event.WebAPI.*
关于QuneName的命名规则.我的建议
Q.系统.方法.on.系统.事件
Q.ERP.CreateWorkOrder.on.WebAPI.OrderIsCheck
意思是当WebAPI数据已接收, ERP系统开始创建生产工单
Q.ERP.CancelWorkOrder.on.WebAPI.OrderIsDelete
意思是WebAPI系统触发了,ERP系统开始撤销生产工单
Q.ERP.CompleteWorkOrder.on.WebAPI.OrderIsInStock
意思是WebAPI系统触发了,订单已入库事件,ERP系统修改工单为的已完成状态.
2021-12-31
经过使用了一段时间, 发现Channel 如果共用同一个, 似乎会发生粘包的现象. 至于是否由它引起, 目前不可知. 暂时改回 每次发送消息都创建一个通道 Channel, 上文代码已修正.
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)