// Client usage: create a producer for group "group" and start it.
DefaultMQProducer producer = new DefaultMQProducer("group");
producer.start();

// DefaultMQProducer.java
/**
 * Starts the producer: applies the namespace to the producer group, delegates the real
 * startup to defaultMQProducerImpl.start(), then starts the trace dispatcher if one is set.
 *
 * @throws MQClientException declared by this method; failures of the trace dispatcher are
 *     caught below and only logged
 */
public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    // todo: the core of the producer startup flow is defaultMQProducerImpl.start()
    this.defaultMQProducerImpl.start(); // here
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            // A trace-dispatcher failure does not abort producer startup; it is only logged.
            log.warn("trace dispatcher start failed ", e);
        }
    }
}
DefaultMQProducerImpl中的start()方法是生产者启动的核心方法
核心是三个步骤:检查配置、获取MQClientInstance实例、启动。
// todo: core of the producer startup flow
/**
 * Starts the producer implementation. Only a producer in CREATE_JUST can start. Startup:
 * 1. validates the configuration;
 * 2. obtains (or creates) the MQClientInstance;
 * 3. registers this producer with the instance;
 * 4. optionally starts the instance.
 * In every non-throwing case it then sends a heartbeat to all brokers and schedules the
 * RequestFutureTable expiry scan.
 *
 * @param startFactory whether to also call start() on the MQClientInstance
 * @throws MQClientException if the producer group is already registered, or if the current
 *     state is RUNNING, START_FAILED or SHUTDOWN_ALREADY
 */
public void start(final boolean startFactory) throws MQClientException {
    switch (this.serviceState) {
        // todo: a freshly created producer always lands here
        case CREATE_JUST:
            // Marked START_FAILED until startup completes; reset to CREATE_JUST or set to RUNNING below.
            this.serviceState = ServiceState.START_FAILED;
            // Check that the producer group meets the requirements
            this.checkConfig(); // here
            // Change the current instanceName to the process ID
            if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                this.defaultMQProducer.changeInstanceNameToPID();
            }
            // todo: obtain the MQ client instance
            // Only one MQClientManager exists per JVM; it maintains a cache of MQClientInstance:
            //   ConcurrentMap<String /* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<>();
            // Only one MQClientInstance is created per clientId.
            // MQClientInstance wraps RocketMQ's networking API; it is the channel through which
            // producers and consumers talk to the NameServer and brokers.
            this.mQClientFactory = // here
                MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
            // Register this producer with the MQClientInstance so later network calls can use it
            boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
            if (!registerOK) {
                // Roll the state back so the caller may retry with another group name.
                this.serviceState = ServiceState.CREATE_JUST;
                throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                    null);
            }
            // Seed the route cache with an empty entry for the createTopicKey topic.
            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
            // Start the producer
            if (startFactory) {
                // todo: ultimately this calls MQClientInstance.start()
                mQClientFactory.start(); // here
            }
            log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                this.defaultMQProducer.isSendMessageWithVIPChannel());
            this.serviceState = ServiceState.RUNNING;
            break;
        case RUNNING:
        case START_FAILED:
        case SHUTDOWN_ALREADY:
            // Starting again (already running, after a failed start, or after shutdown) is rejected.
            throw new MQClientException("The producer service state not OK, maybe started once, "
                + this.serviceState
                + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                null);
        default:
            break;
    }
    this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    // Periodically call RequestFutureTable.scanExpiredRequest(); arguments 1000 * 3 and 1000 are
    // presumably the initial delay and period in ms (confirm the type of this.timer).
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                RequestFutureTable.scanExpiredRequest();
            } catch (Throwable e) {
                log.error("scan RequestFutureTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}
消息发送的主体流程分为四个步骤:

- 验证消息
- 查找路由
- 选择队列
- 消息发送
DefaultMQProducerImpl中的sendDefaultImpl()是生产者消息发送的核心方法
producer.send(msg, new SendCallback() { // the send method
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
    }

    @Override
    public void onException(Throwable e) {
        System.out.printf("%-10d Exception %s %n", index, e);
        e.printStackTrace();
    }
});

// DefaultMQProducer.java
// Applies the namespace to the message topic, then delegates to the producer implementation.
public void send(Message msg, SendCallback sendCallback){
    msg.setTopic(withNamespace(msg.getTopic()));
    this.defaultMQProducerImpl.send(msg, sendCallback); // here
}

// Producer implementation (presumably DefaultMQProducerImpl.java, since it reads this.defaultMQProducer)
// Async send using the producer's configured send timeout.
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {
    send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}

/**
 * Submits the send to the async sender executor. Time spent waiting in the executor counts
 * against the timeout: if it is already used up when the task runs, the callback receives a
 * RemotingTooMuchRequestException instead of a send being attempted. Exceptions thrown by
 * sendDefaultImpl on the executor thread are reported through sendCallback.onException.
 * A RejectedExecutionException from the executor is rethrown as MQClientException.
 */
public void send(final Message msg, final SendCallback sendCallback, final long timeout) {
    final long beginStartTime = System.currentTimeMillis();
    ExecutorService executor = this.getAsyncSenderExecutor();
    try {
        executor.submit(new Runnable() {
            @Override
            public void run() {
                long costTime = System.currentTimeMillis() - beginStartTime;
                if (timeout > costTime) {
                    try {
                        sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime); // here
                    } catch (Exception e) {
                        sendCallback.onException(e);
                    }
                } else {
                    sendCallback.onException(
                        new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
                }
            }
        });
    } catch (RejectedExecutionException e) {
        throw new MQClientException("executor rejected ", e);
    }
}

// todo: sendDefaultImpl() is the core method of the producer's message-send flow
// (excerpt: "..." marks code omitted by the article)
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout) {
    this.makeSureStateOK();
    // todo: send step 1 - validate the message
    Validators.checkMessage(msg, this.defaultMQProducer);
    ...
    // todo: send step 2 - look up the topic route
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        ...
        // Total number of attempts: SYNC gets 1 + retryTimesWhenSendFailed, any other mode gets 1
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            // Broker of the previous attempt (null on the first one), so a retry can avoid it
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            // todo: send step 3 - select a queue
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    if (times > 0) {
                        //Reset topic with namespace during resend.
                        msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                    }
                    // Time already spent since the first attempt; stop retrying once it exceeds the timeout
                    long costTime = beginTimestampPrev - beginTimestampFirst;
                    if (timeout < costTime) {
                        callTimeout = true;
                        break;
                    }
                    // todo: send step 4 - send the message via sendKernelImpl with the remaining time budget
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                    endTimestamp = System.currentTimeMillis();
                    // todo: send succeeded - compute the broker's not-available duration from this send's latency
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                // Optionally try again (next loop iteration) when the status is not SEND_OK
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    endTimestamp = System.currentTimeMillis();
                    // todo: send failed - isolation=true, so the latency is treated as 30000L
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                    ...
                }
            } else {
                break;
            }
        }
        if (sendResult != null) {
            return sendResult;
        }
        throw mqClientException;
    }
    // No usable route information for the topic.
    validateNameServerSetting();
    throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
        null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}

验证消息
//todo 生产者消息发送步骤一:校验消息 Validators.checkMessage(msg, this.defaultMQProducer); // Validators.java public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { //判断是否为空 if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // 校验主题 Validators.checkTopic(msg.getTopic()); Validators.isNotAllowedSendTopic(msg.getTopic()); // 校验消息体 if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } // 不能超过4M if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } }查找路由
//todo 生产者消息发送步骤二:查找路由 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // DefaultMQProducerImpl#tryToFindTopicPublishInfo private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) { //从缓存中获得主题的路由信息 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); //路由信息为空,则从NameServer获取路由 if (null == topicPublishInfo || !topicPublishInfo.ok()) { this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); topicPublishInfo = this.topicPublishInfoTable.get(topic); } if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { return topicPublishInfo; } else { //如果未找到当前主题的路由信息,则用默认主题继续查找 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); topicPublishInfo = this.topicPublishInfoTable.get(topic); return topicPublishInfo; } }选择队列
//todo 生产者消息发送步骤三:选择队列 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // DefaultMQProducerImpl#selectOneMessageQueue public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); }消息发送
// DefaultMQProducerImpl#sendDefaultImpl // todo 生产者消息发送步骤四:调用sendKernelImpl进行消息发送 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);选择队列源码细究 默认选择队列策略
默认策略采用了最简单的轮询算法,这种算法有一个很好的特性:能保证每一个Queue队列的消息投递数量尽可能均匀。
// DefaultMQProducerImpl#selectOneMessageQueue: delegates to the fault strategy
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

// MQFaultStrategy#selectOneMessageQueue
/**
 * Chooses the queue for the next send attempt.
 * With sendLatencyFaultEnable: round-robin over all queues and return the first one whose broker
 * latencyFaultTolerance reports as available; if none is, fall back to a broker from
 * pickOneAtLeast(). Otherwise (the default): TopicPublishInfo#selectOneMessageQueue(lastBrokerName).
 */
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // todo: not taken by default - broker fault-latency mechanism
    if (this.sendLatencyFaultEnable) {
        try {
            // todo: walk the queues and return the first one whose broker is currently considered
            // available; brokers still inside their not-available window are skipped
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            // Round-robin over the message queues
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                // Position = index modulo the queue count
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                // Math.abs(Integer.MIN_VALUE) is still negative, hence the clamp
                if (pos < 0) {
                    pos = 0;
                }
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // availability check
                    return mq;
                }
            }
            // todo: every broker is predicted unavailable - pick one broker anyway (pickOneAtLeast;
            // the article describes this as random) and a queue on it. Best effort ("leave it to fate").
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            // Write-queue count of that broker
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                // NOTE(review): notBestBroker has already been passed to getQueueIdByBroker above,
                // so this null check comes too late to guard that call.
                if (notBestBroker != null) {
                    // Point the queue at the chosen broker and queue id, then return it.
                    // NOTE(review): tpInfo.selectOneMessageQueue() returns messageQueueList.get(pos)
                    // (see TopicPublishInfo#selectOneMessageQueue() below), so these setters mutate the
                    // shared route-table entry rather than a copy - confirm this is intended.
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                // That broker has no write queues here: remove it from latencyFaultTolerance
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    // todo: the default path
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

// TopicPublishInfo#selectoneMessageQueue(java.lang.String)
// Used by default, when the broker fault-latency mechanism is disabled.
/**
 * Round-robin queue selection that, on a retry, avoids the broker used by the previous attempt.
 *
 * @param lastBrokerName broker of the previous attempt for this message; null on the first attempt
 */
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // todo: lastBrokerName is the temporary variable kept for each message being sent
    // First selection (not a retry)
    if (lastBrokerName == null) { // null means this is the first attempt
        return selectOneMessageQueue();
    } else {
        // Selection for a retry
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.getAndIncrement();
            // Walk the message queue list
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0) {
                pos = 0;
            }
            // While retrying, skip queues on the previous attempt's broker
            MessageQueue mq = this.messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        // No other broker to choose (e.g. a single broker): fall back to the first-attempt selection
        return selectOneMessageQueue();
    }
}

// TopicPublishInfo#selectoneMessageQueue()
// todo: default queue selection - plain round-robin
public MessageQueue selectOneMessageQueue() {
    // todo: advance sendWhichQueue (a ThreadLocal-based counter)
    int index = this.sendWhichQueue.getAndIncrement();
    // Modulo the queue count
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0) {
        pos = 0;
    }
    // Return the corresponding queue
    return this.messageQueueList.get(pos);
}
这种算法只要消息投递过程中没有发生重试的话,基本上可以保证每一个Queue队列的消息投递数量尽可能均匀。
当然如果投递中发生问题,比如第一次投递就失败,那么很大的可能性是集群状态下的一台Broker挂了,所以在重试发送中进行规避。这样设置也是比较合理的。
这里有一个需要注意的地方:计数器使用了ThreadLocal,即每个线程维护自己的计数。
// Per-thread counter increment, used for round-robin. In TopicPublishInfo:
//   private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

/**
 * Per-thread round-robin counter used to pick message queues.
 *
 * <p>Each thread starts from a random offset and then increments its own value, so producer
 * threads never share a counter. Returned values are always non-negative, so callers can use
 * them directly with {@code %}.
 */
public class ThreadLocalIndex {
    /** Clears the sign bit so values stay non-negative, even when the counter overflows. */
    private static final int POSITIVE_MASK = 0x7FFFFFFF;

    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    /**
     * Advances this thread's counter and returns the new value. Despite the name, the
     * incremented value is returned, as in the original implementation.
     *
     * @return a non-negative index; after Integer.MAX_VALUE it wraps around to 0
     */
    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            // First use on this thread: start from a random position.
            index = random.nextInt();
        }
        // Masking instead of Math.abs: Math.abs(Integer.MIN_VALUE) is negative, so the old
        // Math.abs(index + 1) returned a negative index once the counter overflowed.
        int next = (index + 1) & POSITIVE_MASK;
        this.threadLocalIndex.set(next);
        return next;
    }

    @Override
    public String toString() {
        return "ThreadLocalIndex{" +
            "threadLocalIndex=" + threadLocalIndex.get() +
            '}';
    }
}
因为消息的生产本身就可以由多个线程并发进行,所以计数要基于线程上下文递增:每个线程各自计数,互不干扰。
选择队列策略增强版(故障延迟机制)

默认的投递方式比较简单,但也暴露了一个问题:有些Queue队列可能由于自身消息积压等原因,投递耗时比较长,这样的Queue队列会影响后续投递的效果。
基于这种现象,RocketMQ在每发送一个MQ消息后,都会统计一下消息投递的时间延迟,根据这个时间延迟,可以知道往哪些Queue队列投递的速度快。
在这种场景下,会优先使用消息投递延迟最小的策略,如果没有生效,再使用Queue队列轮询的方式。
具体实现使用了一个策略类MQFaultStrategy:
统计一下消息投递的时间延迟:org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem的实现
记录的位置仍然是“消息发送流程”中的核心方法,即DefaultMQProducerImpl#sendDefaultImpl():
// 更具发送时常计算broker不可用时常 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); // DefaultMQProducerImpl#updateFaultItem public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); }
这里的操作大致如下(仅在开启sendLatencyFaultEnable时生效):

- 根据消息发送时长(currentLatency)计算broker不可用时长(duration):消息发送时间越久,mq认为broker不可用的时长就越久。broker不可用时长是个经验值;如果传入的isolation为true,则当前发送时长按30000L计算,对应的broker不可用时长为600000L。
- 调用latencyFaultTolerance.updateFaultItem更新broker的异常容错信息。
这个方法最终会往一个ConcurrentHashMap(faultItemTable)中写入每台broker的延时信息:key是brokerName,value是FaultItem,其中包含currentLatency(延时)和startTimestamp(预计恢复可用的时间)。
// MQFaultStrategy#updateFaultItem public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { //获取不可用持续时长,在这个时间内,Broker将被规避(默认就是本次发送的延迟) //这里计算的 long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } } // MQFaultStrategy#computeNotAvailableDuration //todo 这里是一个根据发送延时来定义故障规避的时间 //发送延时{50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; //故障规避{0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; //假如:消息发送时长为100毫秒,则mq预计broker的不可用时长为0毫秒 //假如:消息发送时长为600毫秒,则mq预计broker的不可用时长为30000毫秒 //假如:消息发送时长为4000毫秒,则mq预计broker的不可用时长为18000毫秒 private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax.length - 1; i >= 0; i--) { if (currentLatency >= latencyMax[i]) { return this.notAvailableDuration[i]; } } return 0; } //todo 阿里的经验值 //发送延时 private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; //故障规避 private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
其关键点在于设置startTimestamp(意味着broker预计可用的时间),这里使用的是阿里的经验值。
broker的预计恢复正常时间为:当前时间+不可用时长,即System.currentTimeMillis() + notAvailableDuration
// LatencyFaultToleranceImpl#updateFaultItem public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { FaultItem old = this.faultItemTable.get(name); if (null == old) { final FaultItem faultItem = new FaultItem(name); faultItem.setCurrentLatency(currentLatency); //todo 设置broker的可用时间 faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); old = this.faultItemTable.putIfAbsent(name, faultItem); if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } // LatencyFaultToleranceImpl public class LatencyFaultToleranceImpl implements LatencyFaultTolerance{ private final ConcurrentHashMap faultItemTable = new ConcurrentHashMap (16); }
updateFaultItem的实现,一个broker对应一个faultItem,faultItem内容包含broker名称、消息发送时长、broker恢复正常的时间startTimestamp。
其关键点在于设置startTimestamp(意味着broker预计可用的时间)。什么意思呢?假设某次消息发送时长为4000毫秒,则mq预计broker的不可用时长为180000L(根据latencyMax数组与notAvailableDuration数组的对应关系得到:4000 ≥ 3000,对应180000),则broker的预计恢复正常时间为:当前时间+不可用时长,即System.currentTimeMillis() + notAvailableDuration。
因此LatencyFaultToleranceImpl#isAvailable判断broker是否预计可用的实现也很清晰了:只要当前时间 ≥ startTimestamp,即表示该broker已恢复正常(逻辑意义上的正常,即预计broker会在这个时间点之后恢复正常)。
// MQFaultStrategy#selectOneMessageQueue public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { //todo 默认不走这里:Broker故障延迟机制 if (this.sendLatencyFaultEnable) { try { //todo 遍历队列,判断该Broker是否可用(),删除一段时间内不可用的Broker int index = tpInfo.getSendWhichQueue().getAndIncrement(); //对消息队列轮询获取一个队列 for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { //基于index和队列数量取余,确定位置 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) { pos = 0; } MessageQueue mq = tpInfo.getMessageQueueList().get(pos); if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // here return mq; } } } } ... } @Override public boolean isAvailable(final String name) { final FaultItem faultItem = this.faultItemTable.get(name); if (faultItem != null) { return faultItem.isAvailable(); // here } return true; } // 这其实是一个经验计算值, 不是发起网络测试了连通性 public boolean isAvailable() { return (System.currentTimeMillis() - startTimestamp) >= 0; }
整体实现思路:
- 每次消息发送后(无论成功还是失败),mq根据消息发送耗时来预测该broker不可用的时长,并将broker名称及**“预计恢复时间”**存储于ConcurrentHashMap faultItemTable中;发送异常时,耗时按30000L计算
- 在开启故障延迟机制后,选择消息队列时,会根据当前时间与FaultItem中该broker的预计恢复时间做比较,若(System.currentTimeMillis() - startTimestamp) >= 0,则预计该broker已恢复正常,选择该broker的消息队列
- 若所有的broker都预计不可用,则随机选择一个不可用的broker,从路由信息中选择下一个消息队列,重置其brokerName、queueId,进行消息发送
在默认队列选择机制下,会以轮询方式选择一个MessageQueue(每个线程的起始位置是随机的);若发送失败,则轮询队列重新进行重试发送(规避上一次发送所用的broker),同步模式下默认失败时重试发送2次。但它的问题在于:消息发送很可能再次失败,引发重复失败,带来不必要的性能损耗。
在开启故障延迟机制后,消息队列选择时,会在一段时间内过滤掉RocketMQ认为不可用的broker,以此来避免不断向宕机的broker发送消息,从而实现消息发送高可用。
这两个策略没有绝对的好与坏,如果工作中选择,应该是看网络环境和服务器的环境。
如果网络和服务器环境比较好,那么我推荐默认策略,毕竟重试的次数和几率比较小。(环境好的话使用默认模式)
如果是网络和服务器环境压力比较大,推荐使用故障延迟机制。(容易故障时候开启故障模式)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)