RabbitMQ的使用建议

RabbitMQ的使用建议,第1张

RabbitMQ的使用建议

经过4年的使用经验, 发现网上有很多的误解和错误的代码. 为清楚一些错误的观念和使用方式. 特意写了这个文章统一大家的编程规范和命名规范, 方便系统开发和系统集成.
增加了一些其它文章中的常见问题 和使用指南.和命名指南

首先要建立代码连接,然后绑定,然后发送.

代码中与RabbitMQ的连接是保持一打开永久使用的长连接的好呢? 还是每次需要都打开的好?

在数据库 *** 作的时候是建议每次执行查询都打开连接, 然后再关上.
那么在 *** 作RabbitMQ的时候, 是否也应该如此呢?

我的建议如下: 摘抄自 https://www.javaroad.cn/questions/94679

在RabbitMQ中,连接被认为是“昂贵的” - 它们占用TCP / IP端口,需要握手/协商等等 . 虽然这在SQL Server领域似乎微不足道,但当你谈到在RabbitMQ中每秒发送100K消息时,这种开销变得不可行 .

因此,对于RabbitMQ,一般的最佳做法是为每个应用程序实例打开一个连接,并尽可能长时间保持打开 - 如果可以的话最好是跟应用程序实例一样的生命周期,我个人一般设置为static 静态的. .

在app实例中,您可以在RabbitMQ连接之上创建通道(Channels) . 你可以非常快速地创建它们 . 大多数应用程序在RabbitMQ中使用单个通道进行单个 *** 作 . 消息制作人?打开一个 Channels . 从队列中消费?打开一个 Channels . 重新定义队列?打开 Channels 等

另外 - 如果你使用的是具有线程的语言,比如C#,你必须将你的 Channels 限制为一个线程 . 不要跨线程重用通道 . 如果你试图这样做会发生非常糟糕的事情 .

所以根据上面的指南我的消息生产者 发布消息的代码.

package com.qcd.webapi.service;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.*;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;
// import com.qcd.webapi.SocketServe;
import com.qcd.webapi.model.*; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;

import java.net.URLEncoder;
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
// import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.*;
// import java.util;
import java.util.concurrent.TimeoutException;

// import com.weilan.openapi.test.RestTemplate;
// import sun.misc.base64Decoder;

@Slf4j
@Service
public class RabbitMQService {
  

    @Value("${app.RabbitMQ.IP}")
    private String AppRabbitMQIP;
    @Value("${app.RabbitMQ.Port}")
    private String AppRabbitMQPort;
    @Value("${app.RabbitMQ.UserName}")
    private String AppRabbitMQUserName;
    @Value("${app.RabbitMQ.Password}")
    private String AppRabbitMQPassword;
    @Value("${app.RabbitMQ.VirtualHost}")
    private String AppRabbitMQVirtualHost;
    @Value("${app.RabbitMQ.ExchangeName}")
    private String AppRabbitMQExchangeName;

    
    

    static Connection connection = null;
    // static Channel channel = null;
    static Object lockobj = new Object();

    public Connection  getConnection() throws IOException, TimeoutException {
        // 1.创建连接工厂
        if (connection == null) {
            synchronized (lockobj) {
                if (connection == null) {
                    ConnectionFactory factory = new ConnectionFactory( );
                    factory.setAutomaticRecoveryEnabled(true);
                    factory.setHost(AppRabbitMQIP);
                    factory.setPort(Integer.parseInt(AppRabbitMQPort));
                    factory.setUsername(AppRabbitMQUserName);
                    factory.setPassword(AppRabbitMQPassword);
                    factory.setVirtualHost(AppRabbitMQVirtualHost);

                    // 创建与RabbitMQ服务器的TCP连接
                    connection = factory.newConnection();
                }
            }
        }
         return connection;
        // Channel  channel = connection.createChannel(); 
        // return channel;
    }

    // ————————————————
    // 版权声明:本文为CSDN博主「卑微的小白」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    // 原文链接:https://blog.csdn.net/qq_39411354/article/details/109311332

    
    public boolean SendMsg(String routing_key, String msg) {
        Channel  channel =null;
        try {
            connection = getConnection();
            channel = connection.createChannel();
            // 创建一个通道,
            // channel = connection.createChannel();
            channel.exchangeDeclare(AppRabbitMQExchangeName, "topic", true);
            channel.basicPublish(AppRabbitMQExchangeName, routing_key, null, msg.getBytes());
            return true;
        } catch (Exception ex) {
            ex.printStackTrace();
            return false;
        } 
        finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    protected void finalize() {
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}

关于RoutingKey的命名规则.我建议是

E.系统.事件

例如:

E.WebAPI.DataReceived //意思是WebAPI系统触发了,数据已接收事件
E.WebAPI.OrderIsCreate //意思是WebAPI系统触发了,订单已创建事件
E.WebAPI.OrderIsDelete //意思是WebAPI系统触发了,订单已删除事件
E.WebAPI.OrderIsCheck //意思是WebAPI系统触发了,订单已审核事件
E.WebAPI.OrderIsInStock //意思是WebAPI系统触发了,订单已入库事件

在我的概念里, RoutingKey 约等于 事件名. 这个事件名一定要能够区分系统, 区分业务 *** 作.
越大的概念越要放左边. 容易被topic模式统一订阅. 例如我要订阅WebAPI的所有业务日志, 可以在队列绑定的时候 订阅成 Event.WebAPI.*

关于QuneName的命名规则.我的建议

Q.系统.方法.on.系统.事件

Q.ERP.CreateWorkOrder.on.WebAPI.OrderIsCheck
意思是当WebAPI数据已接收, ERP系统开始创建生产工单

Q.ERP.CancelWorkOrder.on.WebAPI.OrderIsDelete
意思是WebAPI系统触发了,ERP系统开始撤销生产工单

Q.ERP.CompleteWorkOrder.on.WebAPI.OrderIsInStock
意思是WebAPI系统触发了,订单已入库事件,ERP系统修改工单为的已完成状态.

2021-12-31
经过使用了一段时间, 发现Channel 如果共用同一个, 似乎会发生粘包的现象. 至于是否由它引起, 目前不可知. 暂时改回 每次发送消息都创建一个通道 Channel, 上文代码已修正.

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5688646.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存