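The KafkaApis module contains the concrete handling logic for each type of request in Kafka. This article walks through how KafkaApis processes a ProducerRequest.
This kind of request occurs both when a producer sends messages to be stored in the Kafka cluster and when a high-level consumer sends offsets to be stored in the cluster. The latter can be treated as a special kind of message whose payload is the offsets of a particular topic. When the Broker Server receives a ProducerRequest, it performs two main operations: 1) persisting the messages, and 2) assembling the response. The detailed implementation is as follows: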
def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
  val (produceRequest, offsetCommitRequestOpt) =
    // A request sent by a high-level consumer has to be wrapped into a ProducerRequest
    if (request.requestId == RequestKeys.OffsetCommitKey) {
      val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
      OffsetCommitRequest.changeInvalidTimeToCurrentTime(offsetCommitRequest)
      (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
    } else {
      (request.requestObj.asInstanceOf[ProducerRequest], None)
    }

  // Validate requiredAcks: only 1, 0 and -1 are valid
  if (produceRequest.requiredAcks > 1 || produceRequest.requiredAcks < -1) {
    warn(("Client %s from %s sent a produce request with request.required.acks of %d, which is now deprecated and will " +
      "be removed in next release. Valid values are -1, 0 or 1. Please consult Kafka documentation for supported " +
      "and recommended configuration.").format(produceRequest.clientId, request.remoteAddress, produceRequest.requiredAcks))
  }

  // Persist the messages to the local log
  val sTime = SystemTime.milliseconds
  val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)

  // Take the first error code, if there is one
  val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)

  // Count the partitions for which the append failed
  val numPartitionsInError = localProduceResults.count(_.error.isDefined)

  if (produceRequest.requiredAcks == 0) {
    if (numPartitionsInError != 0) {
      // With ack=0, close the connection if persisting the messages failed
      info(("Send the close connection response due to error handling produce request " +
        "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
        .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
      requestChannel.closeConnection(request.processor, request)
    } else {
      if (firstErrorCode == ErrorMapping.NoError)
        // No error: store the offsets sent by the high-level consumer in the offsetManager's memory
        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
      if (offsetCommitRequestOpt.isDefined) {
        // For a high-level consumer the persistence result is returned even when ack=0
        val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetmetadataMaxSize)
        requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
      } else
        requestChannel.noOperation(request.processor, request)
    }
  } else if (produceRequest.requiredAcks == 1 ||
             produceRequest.numPartitions <= 0 ||
             numPartitionsInError == produceRequest.numPartitions) {
    // ack=1, or an invalid number of target partitions, or every append failed:
    // the concrete result has to be returned right away
    if (firstErrorCode == ErrorMapping.NoError) {
      // No error: store the offsets sent by the high-level consumer in the offsetManager's memory
      offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))
    }
    // Return the result of the request to the client
    val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
    val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetmetadataMaxSize))
                                         .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))
    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
  } else {
    // ack=-1: the response is delayed until the replicas in the ISR have replicated the data
    val producerRequestKeys = produceRequest.data.keys.toSeq
    val statuses = localProduceResults.map(r =>
      r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
    val delayedRequest = new DelayedProduce(
      producerRequestKeys,
      request,
      produceRequest.ackTimeoutMs.toLong,
      produceRequest,
      statuses,
      offsetCommitRequestOpt)

    // Check whether the request can already be answered; if so respond now, otherwise delay the response
    val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
    if (satisfiedByMe)
      producerRequestPurgatory.respond(delayedRequest)
  }

  // we do not need the data anymore
  produceRequest.emptyData()
}
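The branching in the handler can be condensed into a small decision function. What follows is a minimal sketch, not Kafka code: the names ProduceOutcome, AckDecision and decide are hypothetical, and the inputs are reduced to the four values the handler actually branches on.

```scala
// Hypothetical model of the branch taken after the local append.
// None of these names exist in Kafka; they only label the four outcomes.
sealed trait ProduceOutcome
case object CloseConnection extends ProduceOutcome // acks=0 and at least one partition failed
case object NoResponse extends ProduceOutcome      // acks=0, plain produce request, no failure
case object RespondNow extends ProduceOutcome      // the result is sent back immediately
case object DelayResponse extends ProduceOutcome   // the request is handed to the purgatory

object AckDecision {
  def decide(requiredAcks: Int,
             numPartitions: Int,
             numPartitionsInError: Int,
             isOffsetCommit: Boolean): ProduceOutcome =
    if (requiredAcks == 0) {
      if (numPartitionsInError != 0) CloseConnection
      else if (isOffsetCommit) RespondNow // offset commits are always answered
      else NoResponse
    } else if (requiredAcks == 1 || numPartitions <= 0 || numPartitionsInError == numPartitions) {
      RespondNow
    } else {
      DelayResponse
    }
}
```

For example, AckDecision.decide(-1, 2, 0, false) yields DelayResponse, while the same call with requiredAcks = 1 yields RespondNow.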
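Here, produceRequest.requiredAcks (ack for short) has three valid values: 1, 0 and -1.
1) ack=1: put simply, the producer considers the send successful as soon as it is notified that one replica of the partition has written the message. Note that this replica must be the leader replica. Only after the leader replica has written the message does the producer treat the send as successful.
Note that the default value of ack is 1. This default is a compromise between throughput and reliability. In production it can be adjusted to fit the situation; for example, pursuing higher throughput means giving up some reliability.
2) ack=0: put simply, the producer does not wait to learn whether the Broker Server persisted the message. As the handler shows, a plain produce request with ack=0 gets no response at all, and the Broker closes the connection if persisting fails.
3) ack=-1: put simply, the producer considers the send successful only after all replicas in the partition's ISR have written the message.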
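On the producer side the acknowledgement level is a plain configuration value. The sketch below is only an illustration: the key request.required.acks is taken from the warning message in the handler, while ProducerAckConfig and withAcks are hypothetical helper names, and every other setting a real producer needs is left out.

```scala
import java.util.Properties

object ProducerAckConfig {
  // Builds a Properties object that carries only the acknowledgement level.
  // Assumption: the client reads "request.required.acks", as named in the
  // broker's warning message; broker list, serializers etc. are omitted.
  def withAcks(acks: Int): Properties = {
    require(acks >= -1 && acks <= 1, s"acks must be -1, 0 or 1, got $acks")
    val props = new Properties()
    props.setProperty("request.required.acks", acks.toString)
    props
  }
}
```

Rejecting values outside -1, 0 and 1 on the client mirrors the validity check the Broker performs at the top of the handler.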
The cases ack=0 and ack=1 are easy to understand. But how does Kafka delay the response when ack=-1? It uses a mechanism called Purgatory, whose implementation is described below.
DelayedProduce is defined as follows:
class DelayedProduce(override val keys: Seq[TopicAndPartition],
                     override val request: RequestChannel.Request,
                     override val delayMs: Long,
                     val produce: ProducerRequest,
                     val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
                     val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
  extends DelayedRequest(keys, request, delayMs) with Logging {
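DelayedProduce has a field keys that holds the TopicAndPartition entries of the current ProducerRequest. The DelayedProduce is grouped under each of these keys.
Now let's look at the definition of RequestPurgatory: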
abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000)
  extends Logging with KafkaMetricsGroup {

  // One Watchers instance is created per TopicAndPartition key of a DelayedProduce;
  // each Watchers holds the DelayedProduce requests registered under that key
  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))

  private val expiredRequestReaper = new ExpiredRequestReaper
  private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)
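Inside RequestPurgatory, each distinct TopicAndPartition gets its own Watchers, and the requests list inside a Watchers holds the DelayedProduce instances registered for that TopicAndPartition. In other words, the relationship is: RequestPurgatory, then watchersForKey (TopicAndPartition to Watchers), then requests (the DelayedProduce list).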
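This bookkeeping can be modelled in a few lines. The following is a simplified, single-threaded sketch under stated assumptions, not the real RequestPurgatory: the classes Delayed and MiniPurgatory are hypothetical, keys are plain strings instead of TopicAndPartition, and expiration is ignored.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a delayed request: it waits on several keys and
// carries a condition that tells whether it can be answered.
final class Delayed(val keys: Seq[String], check: () => Boolean) {
  var completed = false
  def isSatisfied: Boolean = check()
}

final class MiniPurgatory {
  // key -> the delayed requests watching that key
  private val watchersForKey = mutable.Map.empty[String, mutable.ListBuffer[Delayed]]

  // Returns true if the request is already satisfied; otherwise registers it under all its keys.
  def checkAndMaybeWatch(d: Delayed): Boolean =
    if (d.isSatisfied) {
      d.completed = true
      true
    } else {
      d.keys.foreach(k => watchersForKey.getOrElseUpdate(k, mutable.ListBuffer.empty[Delayed]) += d)
      false
    }

  // Called when something changed for `key`; returns the requests that became satisfied.
  def update(key: String): Seq[Delayed] = {
    val watchers = watchersForKey.getOrElse(key, mutable.ListBuffer.empty[Delayed])
    val ready = watchers.filter(d => !d.completed && d.isSatisfied).toList
    ready.foreach(d => d.completed = true)
    // Drop completed requests from this key; other keys drop them on their next update.
    watchers --= watchers.filter(_.completed).toList
    ready
  }

  // Number of requests currently registered under `key`.
  def watched(key: String): Int = watchersForKey.get(key).map(_.size).getOrElse(0)
}
```

A request that spans two partitions is registered under both keys, but it is returned only once, by whichever update call first finds it satisfied.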
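Next, let's look at when the Broker Server is triggered to respond to a DelayedProduce. The check is performed only when the HighWaterMark of the corresponding partition on this Broker Server changes. If the condition is satisfied the response is returned; otherwise the request keeps waiting until it times out. The code that decides whether the request is satisfied is as follows: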
def isSatisfied(replicaManager: ReplicaManager) = {
  partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
    // Check whether this partition is still waiting for other Broker Servers to fetch its data
    if (fetchPartitionStatus.acksPending) {
      // Look up the corresponding partition object
      val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
      // Check whether enough replicas of the partition have reached the required offset
      val (hasEnough, errorCode) = partitionOpt match {
        case Some(partition) =>
          partition.checkEnoughReplicasReachOffset(
            fetchPartitionStatus.requiredOffset,
            produce.requiredAcks)
        case None =>
          (false, ErrorMapping.UnknownTopicOrPartitionCode)
      }
      // If an error occurred, record it and stop waiting
      if (errorCode != ErrorMapping.NoError) {
        fetchPartitionStatus.acksPending = false
        fetchPartitionStatus.responseStatus.error = errorCode
      } else if (hasEnough) {
        // hasEnough is true: set acksPending to false
        fetchPartitionStatus.acksPending = false
        fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError
      }
    }
  }

  // The DelayedProduce is satisfied only when none of its keys is still pending
  val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
  satisfied
}
The request is ready to be answered only when every TopicAndPartition in the keys of the DelayedProduce is satisfied. The check for a single TopicAndPartition is as follows:
def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
  leaderReplicaIfLocal() match {
    case Some(leaderReplica) =>
      // Get the ISR list
      val curInSyncReplicas = inSyncReplicas
      // Count the ISR replicas whose log end offset has reached the required offset
      // (the local replica always counts)
      val numAcks = curInSyncReplicas.count(r => {
        if (!r.isLocal)
          r.logEndOffset.messageOffset >= requiredOffset
        else
          true
      })
      // Get min.insync.replicas
      val minIsr = leaderReplica.log.get.config.minInSyncReplicas

      // ack < 0 and the high watermark has reached the required offset
      if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) {
        // Check whether minIsr is no larger than the current ISR size
        if (minIsr <= curInSyncReplicas.size) {
          (true, ErrorMapping.NoError)
        } else {
          (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
        }
      } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
        (true, ErrorMapping.NoError)
      } else
        (false, ErrorMapping.NoError)
    case None =>
      (false, ErrorMapping.NotLeaderForPartitionCode)
  }
}
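Partition mainly checks whether the Leader Replica's HighWaterMark has reached the required offset, which means that every replica in the ISR has fetched enough data. Once that holds, the Broker Server can respond to the delayed request. It then also checks whether the ISR size is at least the configured minIsr: if so the response carries no error, otherwise it carries NotEnoughReplicasAfterAppendCode.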
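Why the HighWaterMark check is enough becomes clearer with a small model. The sketch below rests on one assumption that the code above does not show: the leader's HighWaterMark is taken to be the smallest log end offset among the ISR replicas. The names HighWatermarkSketch, highWatermark and checkAcksAll are hypothetical, and the error is represented as an optional string rather than a Kafka error code.

```scala
object HighWatermarkSketch {
  // Assumption: the high watermark is the minimum log end offset across the ISR.
  def highWatermark(isrLogEndOffsets: Seq[Long]): Long = {
    require(isrLogEndOffsets.nonEmpty, "the ISR always contains at least the leader")
    isrLogEndOffsets.min
  }

  // Mirrors the ack=-1 branch: returns (canRespond, optional error name).
  def checkAcksAll(requiredOffset: Long,
                   isrLogEndOffsets: Seq[Long],
                   minIsr: Int): (Boolean, Option[String]) =
    if (highWatermark(isrLogEndOffsets) >= requiredOffset) {
      // Every ISR replica has reached requiredOffset; only the ISR size is left to check.
      if (minIsr <= isrLogEndOffsets.size) (true, None)
      else (true, Some("NotEnoughReplicasAfterAppend"))
    } else {
      (false, None) // keep waiting
    }
}
```

With ISR log end offsets 12, 10 and 11 and a required offset of 10, the minimum is 10, so the request can be answered. If one replica were still at 9, the request would stay in the purgatory.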