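- Source version
- Entry point of the message-sending source
- Core message-sending source
- Producer send retry mechanism
- Broker failover mechanism
- MessageQueue selection source
- Summary
- About me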
Source version
4.8.0
Entry point of the message-sending source
The simplest message-sending code:

    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    Message msg = new Message(
        "TopicTest",   // topic
        "TagA",        // tag
        "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET)   // body
    );
    SendResult sendResult = producer.send(msg);
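Today's focus is how RocketMQ selects a MessageQueue when sending a message and the high-availability mechanism around that choice, so we skip producer.start() and go straight into the source of producer.send(msg).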
Stepping into it, we reach the core of message sending.
The core method is org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl:
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // Check that the producer is in a usable state
        this.makeSureStateOK();
        // Validate the message (the caller has already validated it once)
        Validators.checkMessage(msg, this.defaultMQProducer);
        // Random id used to correlate the log lines of this invocation
        final long invokeID = random.nextLong();
        // Start timestamps
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // Look up the topic route in the local cache; on a miss, query the NameServer
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // Total number of attempts: a SYNC send is retried retryTimesWhenSendFailed
            // times (2 by default) on failure; other modes get a single attempt in this loop
            int timesTotal = communicationMode == CommunicationMode.SYNC
                ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed()
                : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // Choose which MessageQueue to send to
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            // Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        // Time already spent since the first attempt
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }

                        // The actual send
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback,
                            topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        // Handle the result according to the communication mode
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                // Not SEND_OK: retry on another broker if configured to do so
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s",
                            invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s",
                            invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s",
                            invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        switch (e.getResponseCode()) {
                            case ResponseCode.TOPIC_NOT_EXIST:
                            case ResponseCode.SERVICE_NOT_AVAILABLE:
                            case ResponseCode.SYSTEM_ERROR:
                            case ResponseCode.NO_PERMISSION:
                            case ResponseCode.NO_BUYER_ID:
                            case ResponseCode.NOT_IN_CURRENT_UNIT:
                                continue;
                            default:
                                if (sendResult != null) {
                                    return sendResult;
                                }
                                throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s",
                            invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }

            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }

            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }

            throw mqClientException;
        }

        validateNameServerSetting();

        throw new MQClientException("No route info of this topic: " + msg.getTopic()
            + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }
The whole core flow is long. The line we care about today is the one that decides which MessageQueue on which Broker the message is sent to:

    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

Producer send retry mechanism
In the sending code we can see the following:

    // DefaultMQProducer.retryTimesWhenSendFailed
    private int retryTimesWhenSendFailed = 2;

    // Total number of attempts: a SYNC send is retried 2 times on failure by default;
    // other modes get a single attempt in this loop
    int timesTotal = communicationMode == CommunicationMode.SYNC
        ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed()
        : 1;
    int times = 0;
    for (; times < timesTotal; times++) {
        // send the message
    }
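So for a synchronous send the producer retries up to two times by default, three attempts in total, and the count is controlled by the retryTimesWhenSendFailed parameter. Other communication modes get a single attempt in this loop. Retrying is the first way RocketMQ copes with a failed send.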
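To make the attempt budget concrete, here is a minimal sketch of the same arithmetic outside RocketMQ. The class RetrySketch, its Mode enum, and the failuresBeforeSuccess parameter are hypothetical names invented for illustration; only the timesTotal expression mirrors the source quoted above.

```java
/** Illustrative stand-in for the retry loop in sendDefaultImpl; not RocketMQ code. */
final class RetrySketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    /** Mirrors the timesTotal expression: only SYNC sends get extra attempts. */
    static int timesTotal(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    /**
     * Returns how many attempts were used before a send succeeded,
     * or -1 if the budget ran out. failuresBeforeSuccess is a made-up
     * knob that simulates a broker failing that many times in a row.
     */
    static int attemptsUsed(Mode mode, int retryTimesWhenSendFailed, int failuresBeforeSuccess) {
        int total = timesTotal(mode, retryTimesWhenSendFailed);
        for (int times = 0; times < total; times++) {
            boolean sendOk = times >= failuresBeforeSuccess; // simulated send result
            if (sendOk) {
                return times + 1;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("SYNC attempts allowed: " + timesTotal(Mode.SYNC, 2));
        System.out.println("ONEWAY attempts allowed: " + timesTotal(Mode.ONEWAY, 2));
        System.out.println("SYNC, two failures first: " + attemptsUsed(Mode.SYNC, 2, 2));
    }
}
```

With the default of 2 retries, a synchronous send survives two consecutive failures; a third failure exhausts the budget and the caller sees an exception.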
Broker failover mechanism
In the sending source above, every catch block calls updateFaultItem. RemotingException, MQClientException and MQBrokerException pass isolation = true, while InterruptedException and the success path pass false:

    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

    // -----------------------------------------------------
    // org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        // Only record faults when latency fault tolerance is enabled
        if (this.sendLatencyFaultEnable) {
            // When isolating, the latency is treated as 30 s when computing the unavailable duration
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    // ------------------------------------------------------
    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);

    // org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem
    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        // Look the broker up in the fault table
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            // The broker counts as available again from this timestamp on
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                // Another thread inserted first: update its entry instead
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

In other words, a Broker on which a send failed is recorded in a ConcurrentHashMap, together with its latency and the time from which it may be used again. That table is consulted later when checking whether a Broker is available.
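The bookkeeping can be sketched in a few lines. This is a simplified model, not the RocketMQ classes: FaultTableSketch and its methods are hypothetical, the clock is passed in explicitly so the behaviour is deterministic, and the two threshold arrays are an assumption about the defaults in MQFaultStrategy (computeNotAvailableDuration is not quoted in this article), so check them against your client version.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Simplified model of the fault table; names and structure are illustrative only. */
final class FaultTableSketch {
    // ASSUMPTION: believed to match the MQFaultStrategy defaults, not shown in this article.
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // brokerName -> timestamp (ms) from which the broker counts as available again
    private final ConcurrentHashMap<String, Long> availableFrom = new ConcurrentHashMap<>();

    /** Maps an observed latency to how long the broker should be avoided. */
    static long computeNotAvailableDuration(long latencyMs) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (latencyMs >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    /** Records a send result; isolation=true treats the latency as 30 s, as in the quoted source. */
    void update(String broker, long latencyMs, boolean isolation, long nowMs) {
        long duration = computeNotAvailableDuration(isolation ? 30000L : latencyMs);
        availableFrom.put(broker, nowMs + duration);
    }

    /** A broker with no entry, or whose avoidance window has passed, is available. */
    boolean isAvailable(String broker, long nowMs) {
        Long start = availableFrom.get(broker);
        return start == null || nowMs >= start;
    }

    public static void main(String[] args) {
        FaultTableSketch table = new FaultTableSketch();
        table.update("broker-a", 10L, true, 1_000L);
        System.out.println("broker-a usable right after failure: " + table.isAvailable("broker-a", 2_000L));
        System.out.println("unknown broker usable: " + table.isAvailable("broker-b", 2_000L));
    }
}
```

Under the assumed thresholds, an isolated Broker is avoided for the longest window in the table, while a fast successful send maps to a duration of zero and leaves the Broker immediately usable.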
MessageQueue selection source
Having covered the producer's retry and the way problem Brokers are recorded, we now turn to the core method that selects the MessageQueue:

    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

Following selectOneMessageQueue down, we arrive at this source:
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Is Broker latency fault tolerance enabled?
        if (this.sendLatencyFaultEnable) {
            try {
                // Simple round-robin: an auto-incrementing index kept in a ThreadLocal
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    // index modulo the number of queues in the route table
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    // The candidate queue for this round
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // Check the queue, which in practice means checking its Broker
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }

                // Every Broker is being avoided: pick a relatively good one from the
                // avoided Brokers; returns null if none is found
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                // Number of writable queues on that Broker
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                // If it has writable queues, pick one of them
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    // No writable queues: remove the Broker from the fault table
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            // Fall back to plain round-robin selection
            return tpInfo.selectOneMessageQueue();
        }

        // Fault tolerance disabled: only avoid the Broker used by the previous attempt
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
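One check in it is central:

    private boolean sendLatencyFaultEnable = false;

    if (this.sendLatencyFaultEnable)

sendLatencyFaultEnable is the key parameter behind RocketMQ's send-side high availability. It is off by default; once enabled, the producer automatically avoids Brokers that have recently failed or responded slowly.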
Let's first look at the simplest case, with sendLatencyFaultEnable turned off:

    return tpInfo.selectOneMessageQueue(lastBrokerName);

    // org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String)
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            // First attempt: plain round-robin
            return selectOneMessageQueue();
        } else {
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int index = this.sendWhichQueue.getAndIncrement();
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                // Skip queues on the Broker that failed last time
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            // Every queue is on lastBrokerName: fall back to plain round-robin
            return selectOneMessageQueue();
        }
    }
This avoidance algorithm is very simple: any queue will do as long as its Broker is not the one that failed on the previous attempt.
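A standalone sketch of that rule follows. SkipLastBrokerSketch and its nested Queue class are hypothetical names; the counter is a plain AtomicInteger rather than RocketMQ's ThreadLocal-backed index, and Math.floorMod is swapped in for the Math.abs plus pos < 0 guard in the quoted source (Math.abs(Integer.MIN_VALUE) is still negative, which is presumably why that guard exists).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Round-robin that skips the broker used by the previous attempt; illustrative only. */
final class SkipLastBrokerSketch {
    /** Hypothetical minimal queue: a broker name plus a queue id. */
    static final class Queue {
        final String broker;
        final int id;

        Queue(String broker, int id) {
            this.broker = broker;
            this.id = id;
        }
    }

    private final List<Queue> queues;
    private final AtomicInteger counter = new AtomicInteger(0);

    SkipLastBrokerSketch(List<Queue> queues) {
        this.queues = queues;
    }

    /** Plain round-robin step; floorMod keeps the position non-negative after overflow. */
    private Queue next() {
        return queues.get(Math.floorMod(counter.getAndIncrement(), queues.size()));
    }

    /** Returns the next queue whose broker differs from lastBrokerName, if any. */
    Queue select(String lastBrokerName) {
        if (lastBrokerName == null) {
            return next();
        }
        for (int i = 0; i < queues.size(); i++) {
            Queue q = next();
            if (!q.broker.equals(lastBrokerName)) {
                return q;
            }
        }
        return next(); // every queue lives on lastBrokerName
    }

    public static void main(String[] args) {
        SkipLastBrokerSketch selector = new SkipLastBrokerSketch(Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1),
            new Queue("broker-b", 0), new Queue("broker-b", 1)));
        Queue first = selector.select(null);
        Queue retry = selector.select(first.broker);
        System.out.println(first.broker + " -> " + retry.broker);
    }
}
```

Note the limitation this makes visible: the rule only remembers the single previous Broker, so with three or more Brokers a retry can land on a Broker that failed two attempts ago.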
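Now let's focus on the code inside the if (this.sendLatencyFaultEnable) { ... } branch:
- Get the messageQueueList.
- Walk the list with a simple round-robin whose index is kept in a ThreadLocal.
- Use faultItemTable to check whether the Broker that owns the candidate MessageQueue is available. If it is, return that queue; if not, move on to the next MessageQueue.
- If every Broker is unavailable, pick one with pickOneAtLeast, a "random, but not the worst" strategy. If that Broker has writable MessageQueues, return one of them; if it has none, remove the Broker from the fault table.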
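The four steps above can be sketched as follows. This is a simplified model under stated assumptions: LatencyAwareSelectSketch, Queue and Fault are hypothetical names, the clock is passed in explicitly, and pickOneAtLeast here only sorts by latency and rotates over the better half, which approximates the "not the worst" idea rather than reproducing RocketMQ's actual ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified latency-aware queue selection; illustrative, not the RocketMQ implementation. */
final class LatencyAwareSelectSketch {
    static final class Queue {
        final String broker;
        final int id;
        Queue(String broker, int id) { this.broker = broker; this.id = id; }
    }

    static final class Fault {
        final long latencyMs;
        final long availableFromMs;
        Fault(long latencyMs, long availableFromMs) {
            this.latencyMs = latencyMs;
            this.availableFromMs = availableFromMs;
        }
    }

    final Map<String, Fault> faults = new ConcurrentHashMap<>();
    private final AtomicInteger index = new AtomicInteger();
    private final AtomicInteger betterHalfIndex = new AtomicInteger();

    boolean isAvailable(String broker, long nowMs) {
        Fault f = faults.get(broker);
        return f == null || nowMs >= f.availableFromMs;
    }

    /** Sort avoided brokers by latency and rotate over the better half; null if none recorded. */
    String pickOneAtLeast() {
        List<Map.Entry<String, Fault>> sorted = new ArrayList<>(faults.entrySet());
        if (sorted.isEmpty()) {
            return null;
        }
        sorted.sort((a, b) -> Long.compare(a.getValue().latencyMs, b.getValue().latencyMs));
        int half = sorted.size() / 2;
        int i = half <= 0 ? 0 : Math.floorMod(betterHalfIndex.getAndIncrement(), half);
        return sorted.get(i).getKey();
    }

    Queue select(List<Queue> queues, long nowMs) {
        // Steps 1-3: round-robin over the queues, returning the first one on an available broker
        int start = index.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(start + i, queues.size()));
            if (isAvailable(q.broker, nowMs)) {
                return q;
            }
        }
        // Step 4: all brokers avoided, so settle for one that is not the worst
        String fallback = pickOneAtLeast();
        if (fallback != null) {
            for (Queue q : queues) {
                if (q.broker.equals(fallback)) {
                    return q;
                }
            }
            faults.remove(fallback); // no queue left on that broker
        }
        return queues.get(Math.floorMod(index.getAndIncrement(), queues.size()));
    }

    public static void main(String[] args) {
        LatencyAwareSelectSketch s = new LatencyAwareSelectSketch();
        List<Queue> queues = Arrays.asList(new Queue("broker-a", 0), new Queue("broker-b", 0));
        s.faults.put("broker-a", new Fault(3000L, 10_000L));
        System.out.println("picked: " + s.select(queues, 0L).broker);
    }
}
```

The design point worth noticing is the fallback: when everything is marked faulty the selector still returns a queue instead of failing outright, preferring the Broker with the better recorded latency.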
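That concludes the source walkthrough of MessageQueue selection and high availability in RocketMQ message sending.
Summary
RocketMQ's MessageQueue selection and high availability on the sending side rest on two things:
- The producer retries failed sends.
- Faulty Brokers are recorded in memory and avoided.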
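Why is client-side avoidance needed at all? The NameServer detects an unavailable Broker only after a delay, at least one heartbeat-check interval (10 s). In addition, the NameServer does not push a notification to producers as soon as it detects that a Broker is down. Instead, producers refresh their route information every 30 s, so it can take up to 30 s before a producer sees the latest routes. A mechanism that temporarily excludes a Broker from queue selection after a single failed send while that Broker is down closes this gap, and that is what the fault-avoidance logic analyzed above provides.
About me
I blog purely about hands-on technical content. Original writing takes effort, so if you found this article useful, please follow me.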