5. RocketMQ源码分析之Producer

5. RocketMQ源码分析之Producer,第1张

5. RocketMQ源码分析之Producer RocketMQ源码分析之Producer Producer启动流程
DefaultMQProducer producer = new DefaultMQProducer("group");
producer.start();
// DefaultMQProducer.java
public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    //todo Producer的启动流程核心是defaultMQProducerImpl.start();
    this.defaultMQProducerImpl.start(); // here
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

DefaultMQProducerImpl中的start()方法是生产者启动的核心方法

核心三个方法: 检查、获取MQ ClientInstance实例、启动。

//todo Producer的启动流程核心
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
            //todo 第一次创建一定到这里
        case CREATE_JUST:
            this.serviceState = ServiceState.START_FAILED;
            //检查生产者组是否满足要求
            this.checkConfig(); // here
            //更改当前instanceName为进程ID
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            //todo 获得MQ客户端实例
            //整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表
            //ConcurrentMap factoryTable = new ConcurrentHashMap();
            //同一个clientId只会创建一个MQClientInstance。
            //MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
            this.mQClientFactory = // here
                MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            //注册当前生产者到到MQClientInstance管理中,方便后续调用网路请求
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                                            null);
            }
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            //启动生产者
            if (startFactory) {
                //todo 最终还是调用MQClientInstance
                mQClientFactory.start(); // here
            }

            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                     this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            throw new MQClientException("The producer service state not OK, maybe started once, "
                                        + this.serviceState
                                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                                        null);
        default:
            break;
    }

    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
Producer消息发送流程

主题也是三个步骤

  • 验证消息

  • 查找路由

  • 选择队列

  • 消息发送

DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法

producer.send(msg, new SendCallback() { // send 方法
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    }

    @Override
    public void onException(Throwable e) {
        System.out.printf("%-10d Exception %s %n", index, e);
        e.printStackTrace();
    }
});
// DefaultMQProducer.java
public void send(Message msg, SendCallback sendCallback){
    msg.setTopic(withNamespace(msg.getTopic()));
    this.defaultMQProducerImpl.send(msg, sendCallback); // here
}

public void send(Message msg,
                 SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
    send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}

public void send(final Message msg, final SendCallback sendCallback, final long timeout) {
    final long beginStartTime = System.currentTimeMillis();
    ExecutorService executor = this.getAsyncSenderExecutor();
    try {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime); // here
                    } catch (Exception e) {
                        sendCallback.onException(e);
                    }
                } else {
                    sendCallback.onException(
                        new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                }
            }
        });
    } catch (RejectedExecutionException e) {
        throw new MQClientException("executor rejected ", e);
    }
}

//todo sendDefaultImpl()是生产者消息发送的核心方法
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout) {
    this.makeSureStateOK();
    //todo 生产者消息发送步骤一:校验消息
    Validators.checkMessage(msg, this.defaultMQProducer);
...
    //todo 生产者消息发送步骤二:查找路由
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
...
        //重试次数
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            //todo 生产者消息发送步骤三:选择队列
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    //todo 生产者消息发送步骤四:调用sendKernelImpl进行消息发送
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    //todo 发送正常,使根据发送时长计算broker不可用时长(duration)
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    //todo 发送异常,默认当前发送时长为30000L
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
...
                }
            } else {
                break;
            }
        }

        if (sendResult != null) {
            return sendResult;
        }
        
        throw mqClientException;
    }

    validateNameServerSetting();

    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
验证消息
//todo 生产者消息发送步骤一:校验消息
Validators.checkMessage(msg, this.defaultMQProducer);
// Validators.java
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
    throws MQClientException {
    //判断是否为空
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // 校验主题
    Validators.checkTopic(msg.getTopic());
    Validators.isNotAllowedSendTopic(msg.getTopic());

    // 校验消息体
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }

    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }
    // 不能超过4M
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                                    "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}
查找路由
//todo 生产者消息发送步骤二:查找路由
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// DefaultMQProducerImpl#tryToFindTopicPublishInfo
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //从缓存中获得主题的路由信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    //路由信息为空,则从NameServer获取路由
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }

    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        //如果未找到当前主题的路由信息,则用默认主题继续查找
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}
选择队列
//todo 生产者消息发送步骤三:选择队列
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
// DefaultMQProducerImpl#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
消息发送
// DefaultMQProducerImpl#sendDefaultImpl
// todo 生产者消息发送步骤四:调用sendKernelImpl进行消息发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

选择队列源码细究 默认选择队列策略

​ 采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

// MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //todo 默认不走这里:Broker故障延迟机制
    if (this.sendLatencyFaultEnable) {
        try {
            //todo 遍历队列,判断该Broker是否可用(),删除一段时间内不可用的Broker
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            //对消息队列轮询获取一个队列
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                //基于index和队列数量取余,确定位置
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0) {
                    pos = 0;
                }
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 测试可用性
                    return mq;
                }
            }
            //todo 如果预测的所有broker都不可用,则随机选择一个broker,随机选择该Broker下一个队列进行发送
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); // 听天由命了
            //获得Broker的写队列集合
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    //获得一个队列,指定broker和队列ID并返回
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    //todo 默认走这里
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

// TopicPublishInfo#selectoneMessageQueue(java.lang.String)
// 默认不启用Broker故障延迟机制
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //todo lastBrokerName 是每次发送一条消息时保存的临时变量
    //第一次选择队列(非重试)
    if (lastBrokerName == null) { // 为空说明是第一次发送
        return selectOneMessageQueue();
    } else {
        //重试发送时的消息发送
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.getAndIncrement();
            //遍历消息队列集合
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0) {
                pos = 0;
            }
            //重试中,规避上次Broker队列
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        //如果没有集群,或者没得选,还是按照第一次选择队列
        return selectOneMessageQueue();
    }
}
// TopicPublishInfo#selectoneMessageQueue()
//todo 默认选择选择队列
public MessageQueue selectOneMessageQueue() {
    //todo sendWhichQueue自增(这里使用的 ThreadLocal)
    int index = this.sendWhichQueue.getAndIncrement();
    //对队列大小取模
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0) {
        pos = 0;
    }
    //返回对应的队列
    return this.messageQueueList.get(pos);
}

​ 这种算法只要消息投递过程中没有发生重试的话,基本上可以保证每一个Queue队列的消息投递数量尽可能均匀。

​ 当然如果投递中发生问题,比如第一次投递就失败,那么很大的可能性是集群状态下的一台Broker挂了,所以在重试发送中进行规避。这样设置也是比较合理的。

​ 这里地方有一个注意的地方就是计数器使用了线程的ThreadLocal

//基于线程上下文的计数递增,用于轮询目的
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
// 
public class ThreadLocalIndex {
    private final ThreadLocal threadLocalIndex = new ThreadLocal();
    private final Random random = new Random();

    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            this.threadLocalIndex.set(index);
        }
        index = Math.abs(index + 1);
        this.threadLocalIndex.set(index);
        return index;
    }

    @Override
    public String toString() {
        return "ThreadLocalIndex{" + "threadLocalIndex=" + threadLocalIndex.get() + '}';
    }
}

​ 因为本身消息的生产就可以多线程进行,所以当然要基于线程的上下文来计数递增

选择队列策略增强版(故障延迟机制)

​ 默认的投递方式比较简单,但是也暴露了一个问题,就是有些Queue队列可能由于自身数量积压等原因,可能在投递的过程比较长,对于这样的Queue队列会影响后续投递的效果。

​ 基于这种现象,RocketMQ在每发送一个MQ消息后,都会统计一下消息投递的时间延迟,根据这个时间延迟,可以知道往哪些Queue队列投递的速度快。

​ 在这种场景下,会优先使用消息投递延迟最小的策略,如果没有生效,再使用Queue队列轮询的方式。

​ 具体的话实现使用了一个策略类:
​ 统计一下消息投递的时间延迟:org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem的实现

​ 记录的地方还是“消息发送流程”中核心方法中DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法

// 更具发送时常计算broker不可用时常
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
// DefaultMQProducerImpl#updateFaultItem
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}

这里的 *** 作大概如下:

  1. 根据消息发送时长(currentLatency),计算broker不可用时长(duration),即如果消息发送时间越久,mq会认为broker不可用的时长越久,broker不可用时长是个经验值,如果传入isolation为true,表示默认当前发送时长为30000L,即broker不可用时长为600000L

  2. 调用latencyFaultTolerance.updateFaultItem更新broker异常容错信息。

这个方法最终会往一个ConcurrentHashMap表中写每台broker的延时、key是brokerName,value是currentLatency(延时)

// MQFaultStrategy#updateFaultItem
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        //获取不可用持续时长,在这个时间内,Broker将被规避(默认就是本次发送的延迟)
        //这里计算的
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
// MQFaultStrategy#computeNotAvailableDuration
//todo 这里是一个根据发送延时来定义故障规避的时间
//发送延时{50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//故障规避{0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
//假如:消息发送时长为100毫秒,则mq预计broker的不可用时长为0毫秒
//假如:消息发送时长为600毫秒,则mq预计broker的不可用时长为30000毫秒
//假如:消息发送时长为4000毫秒,则mq预计broker的不可用时长为18000毫秒
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i]) {
            return this.notAvailableDuration[i];
        }
    }
    return 0;
}
//todo 阿里的经验值
//发送延时
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//故障规避
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

其关键点在于设置startTimestamp(意味broker预计可用的时间),这里使用的阿里的经验值

broker的预计恢复正常时间为:当前时间+不可用时长,即System.currentTimeMillis() + notAvailableDuration

// LatencyFaultToleranceImpl#updateFaultItem
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        //todo 设置broker的可用时间
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}
// LatencyFaultToleranceImpl
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance {
    private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap(16);
}

​ updateFaultItem的实现,一个broker对应一个faultItem,faultItem内容包含broker名称、消息发送时长、broker恢复正常的时间startTimestamp。

​ 其关键点在于设置startTimestamp(意味broker预计可用的时间),什么意思呢,假设某次消息发送时长为4000毫秒,则mq预计broker的不可用时长为18000L(根据latencyMax数组,notAvailableDuration数组对应关系得到),则broker的预计恢复正常时间为:当前时间+不可用时长,即System.currentTimeMillis() + notAvailableDuration

​ 因此LatencyFaultToleranceImpl#isAvailable判断broker是否预计可用的实现也很清晰了,只要当前时间>startTimestamp,即表示该broker正常了(逻辑意义上的正常,预计broker会在这个时间点后恢复正常)

// MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //todo 默认不走这里:Broker故障延迟机制
    if (this.sendLatencyFaultEnable) {
        try {
            //todo 遍历队列,判断该Broker是否可用(),删除一段时间内不可用的Broker
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            //对消息队列轮询获取一个队列
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                //基于index和队列数量取余,确定位置
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0) {
                    pos = 0;
                }
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // here
                    return mq;
                }
            }
        }
    }
    ...
}
@Override
public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable(); // here
    }
    return true;
}

// 这其实是一个经验计算值, 不是发起网络测试了连通性
public boolean isAvailable() {
    return (System.currentTimeMillis() - startTimestamp) >= 0;
}

整体实现思路:

  1. 在消息发送失败,mq根据消息发送耗时来预测该broker不可用的时长,并将broker名称,及**”预计恢复时长**“,存储于ConcurrentHashMap faultItemTable中

  2. 在开启消息容错后,选择消息队列时,会根据当前时间与FaultItem中该broker的预计恢复时间做比较,若(System.currentTimeMillis() - startTimestamp) >= 0,则预计该broker恢复正常,选择该broker的消息队列

  3. 若所有的broker都预计不可用,随机选择一个不可用的broker,从路由信息中选择下一个消息队列,重置其brokerName,queueId,进行消息发送

选择队列策略的对比

​ 在默认队列选择机制下,会随机选择一个MessageQueue,若发送失败,轮询队列重新进行重试发送(屏蔽单次发送中不可用的broker),同步模式下默认失败时重试发送2次,但它的问题就是消息发送很大可能再次失败,引发再次重复失败,带来不必要的性能损耗。

​ 在开启故障延迟机制后,消息队列选择时,会在一段时间内过滤掉RocketMQ认为不可用的broker,以此来避免不断向宕机的broker发送消息,从而实现消息发送高可用。

​ 这两个策略没有绝对的好与坏,如果工作中选择,应该是看网络环境和服务器的环境。

如果是网络和服务器环境比较好,那么我推荐默认策略,毕竟重试的次数和几率比较小。(环境好)的话使用默认模式)

如果是网络和服务器环境压力比较大,推荐使用故障延迟机制。(容易故障时候开启故障模式)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5636386.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存