Let's look at the DefaultMQProducerImpl.sendDefaultImpl() method:
    private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        ...
        // Fetch the topic's publish (routing) info
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // Total attempts: SYNC sends get 1 + retryTimesWhenSendFailed (default 2), other modes get 1
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                // Broker used by the previous attempt:
                // null on the first iteration, non-null on every retry
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // Select a MessageQueue
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    ...
                }
                ...
            }
            ...
        }
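Here is a small standalone sketch of the attempt count and the per-attempt bookkeeping. `Mode`, `RetrySketch`, `attemptBrokers` and the success predicate are hypothetical names used only for illustration; they are not RocketMQ APIs. Brokers are simply taken in turn here. The real loop also checks timeouts and decides, per exception type, whether to retry at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for RocketMQ's CommunicationMode
enum Mode { SYNC, ASYNC, ONEWAY }

final class RetrySketch {
    // Mirrors the timesTotal expression: only SYNC sends get extra attempts in this loop
    static int timesTotal(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    // Simulates the for loop: try brokers in turn until one "succeeds" or attempts run out,
    // recording every attempted broker the way brokersSent does
    static List<String> attemptBrokers(Mode mode, int retryTimesWhenSendFailed,
                                       List<String> brokers, Predicate<String> succeeds) {
        int total = timesTotal(mode, retryTimesWhenSendFailed);
        List<String> brokersSent = new ArrayList<>();
        for (int times = 0; times < total; times++) {
            String broker = brokers.get(times % brokers.size());
            brokersSent.add(broker);
            if (succeeds.test(broker)) {
                break;
            }
        }
        return brokersSent;
    }
}
```

With the default retryTimesWhenSendFailed = 2, a SYNC send that keeps failing makes three attempts. ASYNC and ONEWAY sends pass through this loop only once.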
Here a MessageQueue is selected for the send. When the request header is built, that queue's queueId is passed in:
requestHeader.setQueueId(mq.getQueueId());
Now let's see how the MessageQueue is selected:
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }
Here mqFaultStrategy is an instance of MQFaultStrategy:
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // If the fault-latency mechanism is enabled
        // (it is disabled by default)
        if (this.sendLatencyFaultEnable) {
            ...
        }
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
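The fault-latency mechanism is off by default (sendLatencyFaultEnable is false). The call therefore goes straight to tpInfo.selectOneMessageQueue(lastBrokerName):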
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // On the first attempt, lastBrokerName is null
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                // Take the counter modulo messageQueueList.size() to get an index
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                // Skip queues on the same broker as the previous attempt,
                // to avoid the broker where the last send failed
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
On the first attempt of the outer loop, lastBrokerName is null, so the no-argument overload is used:
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.incrementAndGet();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
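The method increments this.sendWhichQueue and takes the result modulo the size of messageQueueList to get an index. It then returns the MessageQueue at that index.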
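The increment-and-modulo step can be tried on its own. This sketch assumes a plain list of queue names and uses a shared AtomicInteger instead of the per-thread ThreadLocalIndex. `RoundRobinSketch` is a hypothetical helper, not client code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinSketch {
    // Shared counter for simplicity; the client keeps a per-thread counter instead
    private final AtomicInteger counter;
    private final List<String> queues;

    RoundRobinSketch(List<String> queues, int seed) {
        this.queues = queues;
        this.counter = new AtomicInteger(seed);
    }

    // Same arithmetic as selectOneMessageQueue(): increment, abs, modulo
    String next() {
        int index = counter.incrementAndGet();
        int pos = Math.abs(index) % queues.size();
        if (pos < 0) {
            // Math.abs(Integer.MIN_VALUE) is still negative in Java, hence this guard
            pos = 0;
        }
        return queues.get(pos);
    }
}
```

Because the counter only moves forward, consecutive calls cycle through the queues in list order.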
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
sendWhichQueue, in turn, is a ThreadLocalIndex:
    public class ThreadLocalIndex {
        private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
        private final Random random = new Random();

        public int incrementAndGet() {
            Integer index = this.threadLocalIndex.get();
            if (null == index) {
                index = Math.abs(random.nextInt());
                this.threadLocalIndex.set(index);
            }
            this.threadLocalIndex.set(++index);
            return Math.abs(index);
        }

        @Override
        public String toString() {
            return "ThreadLocalIndex{" +
                "threadLocalIndex=" + threadLocalIndex.get() +
                '}';
        }
    }
So sendWhichQueue effectively lives in a ThreadLocal and is private to each thread. On first access it is seeded with a random number. After that it is simply incremented.
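The per-thread behaviour can be checked with a reduced copy of ThreadLocalIndex. `PerThreadIndex` is a hypothetical name. The Random is injected only so the sketch is reproducible; otherwise the logic follows the class above.

```java
import java.util.Random;

final class PerThreadIndex {
    // One counter per thread; threads never see each other's value
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<>();
    // java.util.Random is thread-safe, so one instance can seed every thread
    private final Random random;

    PerThreadIndex(Random random) {
        this.random = random;
    }

    int incrementAndGet() {
        Integer index = threadLocalIndex.get();
        if (index == null) {
            // First use on this thread: start from a random non-negative value
            index = Math.abs(random.nextInt());
        }
        index = index + 1;
        threadLocalIndex.set(index);
        return Math.abs(index);
    }
}
```

Because each thread keeps its own counter, the index needs no locking or atomic updates on the send path. The trade-off is that the queue rotation is balanced per thread rather than across the whole producer.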
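Back to the flow above. On the first attempt, lastBrokerName is null. If that attempt fails, the for loop tries again. For synchronous sends there are 3 attempts in total by default (1 plus retryTimesWhenSendFailed = 2).
From the second attempt on, lastBrokerName is no longer null, and a new MessageQueue is selected: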
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // On the first attempt, lastBrokerName is null
        if (lastBrokerName == null) {
            ...
        } else {
            // Retries take this branch
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.incrementAndGet();
                // Take the counter modulo messageQueueList.size() to get an index
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                // Skip queues on the same broker as the previous attempt,
                // to avoid the broker where the last send failed
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
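As before, sendWhichQueue is incremented and taken modulo the list size to get an index, but with one extra check. If the selected queue is on the same broker as the previous attempt, it is skipped and selection continues.

This is a retry, so the previous target broker is assumed to be unavailable, and the request is steered away from it. If every queue in the list belongs to that broker, the method falls back to plain round-robin via selectOneMessageQueue().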
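A minimal model of the retry branch follows. It assumes queues are plain (brokerName, queueId) pairs and uses a single-threaded counter. `RetryQueue` and `AvoidLastBrokerSketch` are hypothetical names, not client classes.

```java
import java.util.List;

// Hypothetical minimal queue type: which broker, which queue on it
final class RetryQueue {
    final String brokerName;
    final int queueId;

    RetryQueue(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }
}

final class AvoidLastBrokerSketch {
    private final List<RetryQueue> list;
    private int counter; // single-threaded stand-in for sendWhichQueue

    AvoidLastBrokerSketch(List<RetryQueue> list, int seed) {
        this.list = list;
        this.counter = seed;
    }

    private int nextPos() {
        int pos = Math.abs(++counter) % list.size();
        return pos < 0 ? 0 : pos;
    }

    // Mirrors selectOneMessageQueue(lastBrokerName)
    RetryQueue select(String lastBrokerName) {
        if (lastBrokerName == null) {
            return list.get(nextPos()); // first attempt: plain round-robin
        }
        for (int i = 0; i < list.size(); i++) {
            RetryQueue mq = list.get(nextPos());
            if (!mq.brokerName.equals(lastBrokerName)) {
                return mq; // first queue found on a different broker
            }
        }
        // Every probed queue was on lastBrokerName: fall back to plain round-robin
        return list.get(nextPos());
    }
}
```

With queues spread over two brokers, a retry after a failure on broker-a always lands on broker-b. With a single broker, the fallback still returns a queue rather than failing.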
In short: round-robin within each thread.
---------------------------------------------------------------------------------------------------------------------------------
RocketMQ also provides a fault-latency mechanism.
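When a producer sends a message, it must target a specific MessageQueue. If the send fails, the broker hosting that MessageQueue has most likely run into some problem, so the next message (or the retry) should avoid the previously failing broker wherever possible. In RocketMQ, MQFaultStrategy is responsible for this.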
————————————————
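Copyright notice: this paragraph is from an original article by CSDN blogger "pumpkin_pk", licensed under CC 4.0 BY-SA. When reposting, please include the original source link and this notice.
Original link: https://blog.csdn.net/yuxiuzhiai/article/details/103740627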
MQFaultStrategy holds a field:
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
latencyFaultTolerance, in turn, maintains a map:
    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
This map holds the fault information for each broker, keyed by brokerName:
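    class FaultItem implements Comparable<FaultItem> {
        // brokerName
        private final String name;
        // Latency of the last send: time elapsed until the send completed or threw
        private volatile long currentLatency;
        // Time from which the broker is usable again; before this timestamp
        // the broker is considered faulty and unavailable
        private volatile long startTimestamp;
        ...
    }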
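pickOneAtLeast(), used in the selection code that follows, works on a sorted list of these items. The sketch below shows one plausible ordering, with availability evaluated at an explicit time. It assumes available items sort first, then lower currentLatency, then earlier startTimestamp. Treat `FaultItemSketch` and this ordering as assumptions, not as the client's exact compareTo.

```java
import java.util.Comparator;

final class FaultItemSketch {
    final String name;
    final long currentLatency;  // last measured latency (ms)
    final long startTimestamp;  // epoch ms before which the broker is treated as unavailable

    FaultItemSketch(String name, long currentLatency, long startTimestamp) {
        this.name = name;
        this.currentLatency = currentLatency;
        this.startTimestamp = startTimestamp;
    }

    // Available once the isolation window has passed
    boolean isAvailable(long now) {
        return now - startTimestamp >= 0;
    }

    // Assumed "best first" ordering: available items first (false sorts before true),
    // then lower latency, then earlier recovery time
    static Comparator<FaultItemSketch> bestFirst(long now) {
        return Comparator
                .comparing((FaultItemSketch f) -> !f.isAvailable(now))
                .thenComparingLong(f -> f.currentLatency)
                .thenComparingLong(f -> f.startTimestamp);
    }
}
```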
Now the main flow, when a MessageQueue is selected:
    // If the fault-latency mechanism is enabled
    // (it is disabled by default)
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // Is the broker owning this MessageQueue currently available?
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            // Reaching here means every broker is currently marked unavailable.
            // Pick a broker from the first half of the sorted fault list (the "least bad" ones)
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            // Number of write queues this topic has on notBestBroker
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                // notBestBroker still has writable queues for this topic:
                // take the next queue by round-robin ...
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                // ... and redirect it to notBestBroker by overwriting its brokerName
                // and choosing a queueId in [0, writeQueueNums) on that broker.
                // Note: mq is the object held in messageQueueList, so this mutates the shared entry
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            }
            // writeQueueNums <= 0: getQueueIdByBroker(notBestBroker) looks the broker up in the
            // topic's route data and returns -1 if it is not found there, i.e. the broker no longer
            // offers writable queues for this topic, so drop it from the fault table
            else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        // Fallback: plain round-robin
        return tpInfo.selectOneMessageQueue();
    }
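The whole branch can be modelled without RocketMQ types. This is a simplified, single-threaded sketch under several assumptions:

- Fault state is kept in two plain maps: broker to recovery time, and broker to latency.
- The "least bad" broker is picked from the lower-latency half.
- writeQueueNums is approximated by counting that broker's entries in the list.

Unlike the snippet above, it returns a new queue object instead of mutating a shared list entry. `FaultAwareSelector` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

final class FaultAwareSelector {
    static final class Queue {
        final String broker;
        final int queueId;

        Queue(String broker, int queueId) {
            this.broker = broker;
            this.queueId = queueId;
        }
    }

    private final List<Queue> queues;                 // assumed non-empty
    private final Map<String, Long> unavailableUntil; // broker -> epoch ms when usable again
    private final Map<String, Long> latency;          // broker -> last measured latency (ms)
    private int counter;                              // single-threaded stand-in for sendWhichQueue

    FaultAwareSelector(List<Queue> queues, Map<String, Long> unavailableUntil, Map<String, Long> latency) {
        this.queues = queues;
        this.unavailableUntil = unavailableUntil;
        this.latency = latency;
    }

    private boolean isAvailable(String broker, long now) {
        Long until = unavailableUntil.get(broker);
        return until == null || now - until >= 0; // no record means no known fault
    }

    Queue select(long now) {
        int index = ++counter;
        // 1) Round-robin, accepting only queues whose broker is available
        //    (floorMod avoids the negative-index edge case the original guards with pos < 0)
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(index++, queues.size()));
            if (isAvailable(q.broker, now)) {
                return q;
            }
        }
        // 2) All isolated: rank recorded brokers by latency and pick from the better half
        List<String> ranked = new ArrayList<>(unavailableUntil.keySet());
        ranked.sort(Comparator.comparingLong((String b) -> latency.getOrDefault(b, Long.MAX_VALUE)));
        int half = ranked.size() / 2;
        String notBest = half <= 0 ? ranked.get(0) : ranked.get(Math.floorMod(++counter, half));
        // 3) Count that broker's queues (stand-in for writeQueueNums) and build a fresh queue on it
        int writeQueueNums = 0;
        for (Queue q : queues) {
            if (q.broker.equals(notBest)) {
                writeQueueNums++;
            }
        }
        if (writeQueueNums > 0) {
            return new Queue(notBest, Math.floorMod(++counter, writeQueueNums));
        }
        // 4) Fallback: plain round-robin
        return queues.get(Math.floorMod(++counter, queues.size()));
    }
}
```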
The code above consults the fault information when choosing a queue. So where is that information maintained? Back to the send path:
    private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // Fetch the topic's publish (routing) info
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            ...
            for (; times < timesTotal; times++) {
                // Broker used by the previous attempt:
                // null on the first iteration, non-null on every retry
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // Select a MessageQueue
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        ...
                        // Perform the actual send
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        log.info("send result: {}", sendResult);
                        endTimestamp = System.currentTimeMillis();
                        // Success: record the latency, isolation = false
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        ...
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        ...
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        ...
                    } catch (MQBrokerException e) {
                        log.info("send failed:", e);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        ...
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        // Interrupted: the latency is recorded, but the broker is not isolated
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        ...
                    }
                } else {
                    break;
                }
            }
            ...
        }
        validateNameServerSetting();
        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }
--
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); }
--
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        // isolation = true on failure;
        // isolation = false on success or on InterruptedException
        if (this.sendLatencyFaultEnable) {
            // Compute how long the broker should be treated as unavailable.
            // This happens whether or not the send actually failed:
            // on failure, 30000 is passed in, which maps to the largest duration;
            // otherwise the duration is derived from the measured send latency
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            // Update the fault table
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }
--
    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
        return 0;
    }
--
    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
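The arrays show that a send which is merely slow, not failed, also triggers the mechanism. A latency of 550 ms or more makes the broker unavailable for at least 30 s. A latency of 15000 ms or more makes it unavailable for 10 minutes, and so does any failure, since a failure is treated as 30000 ms. Latencies below 550 ms map to 0, i.e. no isolation.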
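The mapping is easy to tabulate. The sketch copies the two arrays and the lookup loop, and adds the `isolation ? 30000 : currentLatency` substitution from updateFaultItem. The class and method names are hypothetical.

```java
final class NotAvailableDuration {
    private static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down; the first threshold reached decides the duration
    static long compute(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    // A failed send (isolation = true) is treated as a 30000 ms latency
    static long forSend(long currentLatency, boolean isolation) {
        return compute(isolation ? 30000 : currentLatency);
    }
}
```

For example, 120 ms maps to 0, 550 ms to 30000 ms, 1500 ms to 60000 ms and 3000 ms to 180000 ms. Any failed send maps to 600000 ms, i.e. ten minutes.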
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            // No entry for this broker yet
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            // Another thread may have inserted concurrently; putIfAbsent then returns its item
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            // An entry already exists: refresh it
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }
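The upsert can be exercised with an injected clock. `FaultTable`, its nested `Item` and the LongSupplier clock are hypothetical additions for testability. The get / putIfAbsent / overwrite sequence follows the snippet above. `isAvailable` assumes a broker with no entry counts as available.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

final class FaultTable {
    static final class Item {
        final String name;
        volatile long currentLatency;
        volatile long startTimestamp; // epoch ms from which the broker is usable again

        Item(String name) {
            this.name = name;
        }
    }

    private final ConcurrentHashMap<String, Item> table = new ConcurrentHashMap<>(16);
    private final LongSupplier clock;

    FaultTable(LongSupplier clock) {
        this.clock = clock;
    }

    // Same get / putIfAbsent pattern as updateFaultItem above
    void update(String name, long currentLatency, long notAvailableDuration) {
        Item old = table.get(name);
        if (old == null) {
            Item fresh = new Item(name);
            fresh.currentLatency = currentLatency;
            fresh.startTimestamp = clock.getAsLong() + notAvailableDuration;
            old = table.putIfAbsent(name, fresh);
            if (old == null) {
                return; // our item was inserted
            }
        }
        // Either an entry already existed or another thread inserted one first: overwrite it
        old.currentLatency = currentLatency;
        old.startTimestamp = clock.getAsLong() + notAvailableDuration;
    }

    boolean isAvailable(String name) {
        Item item = table.get(name);
        return item == null || clock.getAsLong() - item.startTimestamp >= 0;
    }

    int size() {
        return table.size();
    }
}
```

A successful send below 550 ms is also recorded, with a zero-length window. While the mechanism is on, the table therefore holds an entry for every broker the producer has sent to, not just the failed ones.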