Source Code Analysis: How KafkaApis Handles ProducerRequest

The KafkaApis module implements the concrete handling logic for the different request types in Kafka. This article walks through how KafkaApis handles a ProducerRequest.

This type of request occurs both when a producer sends messages to be stored in the Kafka cluster and when a high-level consumer sends offsets to be stored in the cluster; the latter can be viewed as a special kind of message whose payload is the offsets of a specific topic. When the Broker Server receives a ProducerRequest, it mainly performs two operations: 1) persist the messages, and 2) assemble the response. The detailed implementation is as follows:

def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
    val (produceRequest, offsetCommitRequestOpt) =
      //if sent by a high-level consumer, wrap it as a ProducerRequest
      if (request.requestId == RequestKeys.OffsetCommitKey) {
        val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
        OffsetCommitRequest.changeInvalidTimeToCurrentTime(offsetCommitRequest)
        (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
      } else {
        (request.requestObj.asInstanceOf[ProducerRequest], None)
      }
   
   //validate requiredAcks: the only valid ack values are 1, 0 and -1
   if (produceRequest.requiredAcks > 1 || produceRequest.requiredAcks < -1) {
      warn(("Client %s from %s sent a produce request with request.required.acks of %d, which is now deprecated and will " +
            "be removed in next release. Valid values are -1, 0 or 1. Please consult Kafka documentation for supported " +
            "and recommended configuration.").format(produceRequest.clientId, request.remoteAddress, produceRequest.requiredAcks))
    }
    //persist the messages to the local log
    val sTime = SystemTime.milliseconds
    val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)
    
    //find the first error code, if any
    val firstErrorCode = localProduceResults.find(_.errorCode != ErrorMapping.NoError).map(_.errorCode).getOrElse(ErrorMapping.NoError)

    //count how many partitions hit an error
    val numPartitionsInError = localProduceResults.count(_.error.isDefined)
    if(produceRequest.requiredAcks == 0) {
    
      if (numPartitionsInError != 0) {
        //with ack=0, if persisting failed, proactively close the connection
        info(("Send the close connection response due to error handling produce request " +
          "[clientId = %s, correlationId = %s, topicAndPartition = %s] with Ack=0")
          .format(produceRequest.clientId, produceRequest.correlationId, produceRequest.topicPartitionMessageSizeMap.keySet.mkString(",")))
        requestChannel.closeConnection(request.processor, request)
      } else {

        if (firstErrorCode == ErrorMapping.NoError)
          //no errors: store the offsets sent by the high-level consumer in offsetManager's in-memory cache
          offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo))

        if (offsetCommitRequestOpt.isDefined) {
         //for a high-level consumer, return the persistence result even though ack=0
          val response = offsetCommitRequestOpt.get.responseFor(firstErrorCode, config.offsetMetadataMaxSize)
          requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
        } else
          requestChannel.noOperation(request.processor, request)
      }
    } 
    else if (produceRequest.requiredAcks == 1 ||
        produceRequest.numPartitions <= 0 ||
        numPartitionsInError == produceRequest.numPartitions) {
      //ack=1, or there are no valid target partitions, or every partition failed: return the concrete result
      if (firstErrorCode == ErrorMapping.NoError) {
        //no errors: store the offsets sent by the high-level consumer in offsetManager's in-memory cache
        offsetCommitRequestOpt.foreach(ocr => offsetManager.putOffsets(ocr.groupId, ocr.requestInfo) )
      }
      //return the per-partition results to the client
      val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
      val response = offsetCommitRequestOpt.map(_.responseFor(firstErrorCode, config.offsetMetadataMaxSize))
                                           .getOrElse(ProducerResponse(produceRequest.correlationId, statuses))

      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
    } 
    else {
      // ack=-1: delay the response until the ISR replicas have replicated the data (subject to min.insync.replicas)
      val producerRequestKeys = produceRequest.data.keys.toSeq
      val statuses = localProduceResults.map(r =>
        r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
      val delayedRequest =  new DelayedProduce(
        producerRequestKeys,
        request,
        produceRequest.ackTimeoutMs.toLong,
        produceRequest,
        statuses,
        offsetCommitRequestOpt)

      //check whether the request can already be answered; if so respond now, otherwise delay the response
      val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
      if (satisfiedByMe)
        producerRequestPurgatory.respond(delayedRequest)
    }

    // we do not need the data anymore
    produceRequest.emptyData()
  }

Here produceRequest.requiredAcks, i.e. the ack setting, has three valid values, 1, 0 and -1 (a client-side configuration sketch follows the list below).

1) ack=1: put simply, the producer considers the push successful as soon as it is notified that one partition replica has written the message successfully. Note that this replica must be the leader replica: only once the leader replica has written the message does the producer treat the send as successful.

Note that the default value of ack is 1; this default is a compromise between throughput and reliability. In production you can tune it to the actual situation: for example, if you chase high throughput, you have to give up some reliability.

2) ack=0: put simply, the producer does not care whether the Broker Server persisted the message successfully; it only needs the Broker to have received it.

3) ack=-1: put simply, the producer only considers the push successful once it has been notified that every replica of the partition has written the message successfully.
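For reference, here is a minimal sketch of how a client would pick between these three values with the 0.8-era Scala producer API; the broker address, topic name and message are placeholders, and request.required.acks maps directly to produceRequest.requiredAcks on the broker side:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object AckConfigSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092") // placeholder broker
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    // becomes produceRequest.requiredAcks on the broker: 0, 1 or -1
    props.put("request.required.acks", "-1")
    // becomes produceRequest.ackTimeoutMs, i.e. the DelayedProduce timeout
    props.put("request.timeout.ms", "10000")

    val producer = new Producer[String, String](new ProducerConfig(props))
    producer.send(new KeyedMessage[String, String]("test-topic", "hello"))
    producer.close()
  }
}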

ack=0 and ack=1 are easy enough to understand, but when ack=-1, how does Kafka delay the reply to the request? It uses a strategy called Purgatory, whose implementation is introduced below.

DelayedProduce is defined as follows:

class DelayedProduce(override val keys: Seq[TopicAndPartition],
                     override val request: RequestChannel.Request,
                     override val delayMs: Long,
                     val produce: ProducerRequest,
                     val partitionStatus: Map[TopicAndPartition, DelayedProduceResponseStatus],
                     val offsetCommitRequestOpt: Option[OffsetCommitRequest] = None)
  extends DelayedRequest(keys, request, delayMs) with Logging {

DelayedProduce carries a keys field, which classifies the DelayedProduce by each distinct TopicAndPartition in the current ProducerRequest; a toy illustration follows.
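The illustration below (topic name and sizes are hypothetical) shows how one request that writes to two partitions yields two watch keys, mirroring `val producerRequestKeys = produceRequest.data.keys.toSeq` in the handler above:

import kafka.common.TopicAndPartition

// hypothetical request payload touching two partitions of topic "orders"
val topicPartitionMessageSizeMap = Map(
  TopicAndPartition("orders", 0) -> 1024,
  TopicAndPartition("orders", 3) -> 2048)
// one key per partition; the resulting DelayedProduce is watched under both
val producerRequestKeys = topicPartitionMessageSizeMap.keys.toSeq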

Let's look at the definition of RequestPurgatory:

abstract class RequestPurgatory[T <: DelayedRequest](
        brokerId: Int = 0, 
        purgeInterval: Int = 1000)
        extends Logging with KafkaMetricsGroup {

  //for each distinct TopicAndPartition in a DelayedProduce, create a Watchers;
  //each Watchers holds the DelayedProduce requests for that TopicAndPartition
  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))

  
  private val expiredRequestReaper = new ExpiredRequestReaper
  private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)

Inside RequestPurgatory, each distinct TopicAndPartition gets its own Watchers, and the requests list inside a Watchers holds the DelayedProduce requests that share that TopicAndPartition. Their relationship is modeled below.
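Since the original diagram is not reproduced here, the following self-contained toy model (a simplification for illustration, not the real RequestPurgatory and Watchers classes) sketches that layout: one Watchers per key, and a request with N keys registered in N Watchers.

import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ListBuffer

class PurgatoryModel[K, R] {
  // mirrors watchersForKey: one Watchers per TopicAndPartition
  private val watchersForKey = TrieMap.empty[K, Watchers]

  class Watchers {
    // the delayed requests watching this key
    val requests = ListBuffer.empty[R]
  }

  // register a delayed request under every key it covers
  def watch(keys: Seq[K], request: R): Unit =
    keys.foreach { k =>
      watchersForKey.getOrElseUpdate(k, new Watchers).requests += request
    }

  // when something happens on a key (e.g. the partition's HighWaterMark
  // advances), only the requests watching that key need to be re-checked
  def requestsToRecheck(key: K): List[R] =
    watchersForKey.get(key).map(_.requests.toList).getOrElse(Nil)
}

A DelayedProduce that covers three TopicAndPartitions therefore appears in three Watchers, and progress on any one of those partitions re-triggers its isSatisfied check.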

Next, let's look at what triggers the Broker Server to answer a DelayedProduce: only when the HighWaterMark of a corresponding partition on this Broker Server changes does it check whether the conditions are met. If they are, the response is sent; otherwise the request keeps waiting until it times out. The check is implemented as follows:

def isSatisfied(replicaManager: ReplicaManager) = {
   
    partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
     
      // is this partition still waiting for other Broker Servers to fetch its data?
      if (fetchPartitionStatus.acksPending) {
         //look up the corresponding Partition object
        val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
        //check whether enough replicas of this partition have reached the required offset
        val (hasEnough, errorCode) = partitionOpt match {
          case Some(partition) =>
            partition.checkEnoughReplicasReachOffset(
              fetchPartitionStatus.requiredOffset,
              produce.requiredAcks)
          case None =>
            (false, ErrorMapping.UnknownTopicOrPartitionCode)
        }
        //on error, record the error code and stop waiting for acks
        if (errorCode != ErrorMapping.NoError) {
          fetchPartitionStatus.acksPending = false
          fetchPartitionStatus.responseStatus.error = errorCode
        } else if (hasEnough) {
          //enough replicas have caught up: clear acksPending
          fetchPartitionStatus.acksPending = false
          fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError
        }
      }
    }

    // satisfied only when every key of this DelayedProduce is satisfied
    val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
    satisfied
  }

Only when every TopicAndPartition among the DelayedProduce's keys is satisfied is the request considered answerable. Whether a single TopicAndPartition is satisfied is decided by the following code:

def checkEnoughReplicasReachOffset(requiredOffset: Long, requiredAcks: Int): (Boolean, Short) = {
    leaderReplicaIfLocal() match {
      case Some(leaderReplica) =>
        //take a snapshot of the current ISR
        val curInSyncReplicas = inSyncReplicas
         //count the ISR replicas whose log end offset has reached the required offset
        val numAcks = curInSyncReplicas.count(r => {
          if (!r.isLocal)
            r.logEndOffset.messageOffset >= requiredOffset
          else
            true 
        })
        //read min.insync.replicas from the log config
        val minIsr = leaderReplica.log.get.config.minInSyncReplicas
        //ack=-1: check whether the high watermark has reached the required offset
        if (requiredAcks < 0 && leaderReplica.highWatermark.messageOffset >= requiredOffset) {
          //check that the current ISR is still at least minIsr
          if (minIsr <= curInSyncReplicas.size) {
            (true, ErrorMapping.NoError)
          } else {
            (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
          }
        } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
          (true, ErrorMapping.NoError)
        } else
          (false, ErrorMapping.NoError)
      case None =>
        (false, ErrorMapping.NotLeaderForPartitionCode)
    }
  }

In short, Partition checks whether the leader replica's HighWaterMark has reached the required offset, which means every replica in the ISR has fetched enough data, and whether the ISR size is at least the configured minIsr; when both hold, the Broker Server can answer the delayed request (if the ISR has shrunk below minIsr, the request is still completed, but with NotEnoughReplicasAfterAppendCode).
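To make the acks=-1 branch concrete, here is a standalone sketch of the same decision (a simplification of the source above, with error codes replaced by strings), followed by a few worked cases assuming 3 replicas and min.insync.replicas = 2:

// simplified restatement of the requiredAcks < 0 branch above
def ackAllDecision(leaderHwm: Long, requiredOffset: Long,
                   isrSize: Int, minIsr: Int): (Boolean, String) =
  if (leaderHwm >= requiredOffset) {
    // every ISR replica has fetched up to requiredOffset
    if (minIsr <= isrSize) (true, "NoError")
    else (true, "NotEnoughReplicasAfterAppend") // satisfied, but with an error
  } else
    (false, "NoError") // not satisfied yet: keep waiting or time out

// ackAllDecision(105L, 100L, isrSize = 3, minIsr = 2) => (true, "NoError")
// ackAllDecision(105L, 100L, isrSize = 1, minIsr = 2) => (true, "NotEnoughReplicasAfterAppend")
// ackAllDecision( 95L, 100L, isrSize = 3, minIsr = 2) => (false, "NoError")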
