谈谈你对 Kafka 幂等的了解?

谈谈你对 Kafka 幂等的了解?,第1张

谈谈你对 Kafka 幂等的了解?

在之前的旧版本中,Kafka只能支持两种语义:At most once和At least once。At most once保证消息不会朝服,但是可能会丢失。在实践中,很有有业务会选择这种方式。At least once保证消息不会丢失,但是可能会重复,业务在处理消息需要进行去重。、

 Kafka在0.11.0.0版本支持增加了对幂等的支持。幂等是针对生产者角度的特性。幂等可以保证上生产者发送的消息,不会丢失,而且不会重复

如何实现幂等

HTTP/1.1中对幂等性的定义是:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。也就是说,其任意多次执行对资源本身所产生的影响均与一次执行的影响相同

Methods can also have the property of “idempotence” in that (aside from error or expiration issues) the side-effects of N > 0 identical requests is the same as for a single request.

实现幂等的关键点就是服务端可以区分请求是否重复,过滤掉重复的请求。要区分请求是否重复的有两点:

  • 唯一标识:要想区分请求是否重复,请求中就得有唯一标识。例如支付请求中,订单号就是唯一标识
  • 记录下已处理过的请求标识:光有唯一标识还不够,还需要记录下那些请求是已经处理过的,这样当收到新的请求时,用新请求中的标识和处理记录进行比较,如果处理记录中有相同的标识,说明是重复交易,拒绝掉
Kafka幂等性实现原理

为了实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。

  • PID。每个新的Producer在初始化的时候会被分配一个唯一的PID,这个PID对用户是不可见的。
  • Sequence Numbler。(对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number

Kafka可能存在多个生产者,会同时产生消息,但对Kafka来说,只需要保证每个生产者内部的消息幂等就可以了,所有引入了PID来标识不同的生产者。

对于Kafka来说,要解决的是生产者发送消息的幂等问题。也即需要区分每条消息是否重复。 Kafka通过为每条消息增加一个Sequence Numbler,通过Sequence Numbler来区分每条消息。每条消息对应一个分区,不同的分区产生的消息不可能重复。所有Sequence Numbler对应每个分区

Broker端在缓存中保存了这seq number,对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个Producer对于同一个<Topic, Partition>的Exactly Once语义。不能保证同一个Producer一个topic不同的partion幂等。

实现幂等前后对比

标准实现

 

 

发生重试时

 

实现幂等之后

 

发生重试时

 

幂等性示例

生产者要使用幂等性很简单,只需要增加以下配置即可:

 

enable.idempotence=true

 

Properties props = new Properties();props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");props.put("acks", "all"); // 当 enable.idempotence 为 true,这里默认为 allprops.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer producer = new KafkaProducer(props);producer.send(new ProducerRecord(topic, "test");

Prodcuer 幂等性对外保留的接口非常简单,其底层的实现对上层应用做了很好的封装,应用层并不需要去关心具体的实现细节,对用户非常友好

幂等性实现生产者流程

此流程只展示了涉及生产者幂等性相关的重要 *** 作

 

幂等性-生产者流程.png

这里重点关注幂等性相关的内容,首先,KafkaProducer启动时,会初始化一个 TransactionManager 实例,它的作用有以下几个部分:

  • 记录本地的事务状态(事务性时必须)
  • 记录一些状态信息以保证幂等性,比如:每个 topic-partition 对应的下一个 sequence numbers 和 last acked batch(最近一个已经确认的 batch)的最大的 sequence number 等;
  • 记录 ProducerIdAndEpoch 信息(PID 信息)。

幂等性时,Producer 的发送流程如下: 1)调用kafkaProducer的send方法将数据添加到 RecordAccumulator 中,添加时会判断是否需要新建一个 ProducerBatch,这时这个 ProducerBatch 还是没有 PID 和 sequence number 信息的; 2)Producer 后台发送线程 Sender,在 run() 方法中,会先根据 TransactionManager 的 shouldResetProducerStateAfterResolvingSequences() 方法判断当前的 PID 是否需要重置,重置的原因是因为:如果有topic-partition的batch已经超时还没处理完,此时可能会造成sequence number 不连续。因为sequence number 有部分已经分配出去了,而Kafka服务端没有收到这部分sequence number 的序号,Kafka服务端为了保证幂等性,只会接受同一个pid的sequence number 等于服务端缓存sequence number +1的消息,所有这时候需要重置Pid来保证幂等性

 

 synchronized boolean shouldResetProducerStateAfterResolvingSequences() {                if (isTransactional()) // We should not reset producer state if we are transactional. We will transition to a fatal error instead. return false;        for (Iterator<TopicPartition> iter = partitionsWithUnresolvedSequences.iterator(); iter.hasNext(); ) { TopicPartition topicPartition = iter.next(); if (!hasInflightBatches(topicPartition)) {//没有该分区的消息在发送中     // The partition has been fully drained. At this point, the last ack'd sequence should be once less than     // next sequence destined for the partition. If so, the partition is fully resolved. If not, we should     // reset the sequence number if necessary.          if (isNextSequence(topicPartition, sequenceNumber(topicPartition))) {         // This would happen when a batch was expired, but subsequent batches succeeded.         iter.remove();     } else {         // We would enter this branch if all in flight batches were ultimately expired in the producer.         log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " +      "Going to reset producer state.", topicPartition, lastAckedSequence(topicPartition), sequenceNumber(topicPartition));         return true;     } }        }        return false;    }

3)Sender线程调用maybeWaitForProducerId()方法判断是否要申请Pid,如果需要,会阻塞直到成功申请到Pid

 

ProducerIdAndEpoch producerIdAndEpoch = null;     boolean isTransactional = false; if (transactionManager != null) {//有事务或者启用幂等    //事务是否允许向此分区发送消息        if (!transactionManager.isSendToPartitionAllowed(tp))      break; producerIdAndEpoch = transactionManager.producerIdAndEpoch();     if (!producerIdAndEpoch.isValid())   // we cannot send the batch until we have refreshed the producer id          break; //是否支持事务   isTransactional = transactionManager.isTransactional();        if (!first.hasSequence() && transactionManager.hasUnresolvedSequence(first.topicPartition))       break;         int firstInFlightSequence = transactionManager.firstInFlightSequence(first.topicPartition);        if (firstInFlightSequence != RecordBatch.NO_SEQUENCE && first.hasSequence()          && first.baseSequence() != firstInFlightSequence)break;        }          ProducerBatch batch = deque.pollFirst();            if (producerIdAndEpoch != null && !batch.hasSequence()) {          //设置Batch的sequenceNumber 和isTransactional     batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);    //增加该分区的sequenceNumber,增加值为Batch中消息的个数      transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);    log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +       "{} being sent to partition {}", producerIdAndEpoch.producerId,      producerIdAndEpoch.epoch, batch.baseSequence(), tp);      //加入到发送队列中 transactionManager.addInFlightBatch(batch);        }          batch.close();//关闭此batch,不可追加消息  size += batch.records().sizeInBytes();//累计size         ready.add(batch);//加到集合中,最后一起返回出去batch.drained(now);//更新drainedMs时间戳         

5)最后调用sendProduceRequest方法将消息发送出去

幂等性服务端相关的类Batchmetadata

用来存储Batch的元数据, Batchmetadata类的几个重要的字段

  • lastSeq:Batch中最后一条消息的seq
  • lastOffset: Batch中最后一条消息的offset
  • offsetDelta: 第一条消息和最后一条消息的offset之差 lastSeq-offsetDelta可以得到第一条消息的seq,lastOffset-offsetDelta可以得到第一条消息的offset
ProducerStateEntry

用于存储每个producerId对应的Batch,按照sequence从小到大进行排序,最小的作为头,最大的作为尾 ,每个producerId的队列失踪保持着最多5个Batch,如果超过5个了,就从头开始remove。 ProducerStateEntryl类的重要字段: producerId:生产者id,用服务端生成,生产者发送消息时会带上此字段 batchmetadata:Queue[Batchmetadata]类型,里面存放了服务端收到的该生产者最新的Batch,最多存放5个 producerEpoch: 生产者的年代,默认为-1

ProducerStateEntryl类的核心方法:addBatch()方法,往ProducerStateEntry中添加Batch,此方法首先会判断是否要更新epoch,如果epoch不一样,则会清空batchmetadata队列并更新最新的epoch,然后加batch添加到batchmetadata中,添加前也会先校验batchmetadata的元素个数是否等于ProducerStateEntry.NumBatchesToRetain,如果相等就剔除掉头部的Batchmetadata。代码如下:

 

  def addBatch(producerEpoch: Short, lastSeq: Int, lastOffset: Long, offsetDelta: Int, timestamp: Long): Unit = {    maybeUpdateEpoch(producerEpoch)//更新producerEpoch,如果producerEpoch不一样就清空队列中的Batch    addBatchmetadata(Batchmetadata(lastSeq, lastOffset, offsetDelta, timestamp))  }  def maybeUpdateEpoch(producerEpoch: Short): Boolean = {    if (this.producerEpoch != producerEpoch) {      batchmetadata.clear()//清空Batch      this.producerEpoch = producerEpoch//更新producerEpoch      true    } else {      false    }  }  private def addBatchmetadata(batch: Batchmetadata): Unit = {    if (batchmetadata.size == ProducerStateEntry.NumBatchesToRetain)      batchmetadata.dequeue()//去掉头部的Batch    batchmetadata.enqueue(batch)  }

findDuplicateBatch()方法用于校验新产生的消息是否是重复发送。遍历batchmetadata,如果新产生的Batch的firstSeq和lastSeq都和batchmetadata中缓存的某个Batch一样,说明是重复的,代码如下:

 

  def findDuplicateBatch(batch: RecordBatch): Option[Batchmetadata] = {    if (batch.producerEpoch != producerEpoch)       None    else      batchWithSequenceRange(batch.baseSequence, batch.lastSequence)  }  def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[Batchmetadata] = {    val duplicate = batchmetadata.filter { metadata =>      firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq    }    duplicate.headOption  }
ProducerAppendInfo

ProducerAppendInfo用于在追加的消息写到Log之前进行校验,主要对epoch、sequence number进行校验 currentEntry:ProducerStateEntry类型,就是pid对应的ProducerStateEntry中batchmetadata尾部对象,用于跟新追加的Batch做比较 validationType:校验的方式。不同的类型,校验的规则不一样

  • ValidationType.None:什么也不用校验。如果请求来自非客户端(Kakfa内部),则就是这种类型
  • ValidationType.EpochOnly:只校验Epoch。如果Topic是__consumer_offsets就是这种校验类型
  • ValidationType.Full:检查ProducerEpoch和sequence number

核心方法就是maybevalidateAppend(),根据validationType做不同的校验

 

  private def maybevalidateAppend(producerEpoch: Short, firstSeq: Int) = {    validationType match {      case ValidationType.None =>      case ValidationType.Epochonly =>        checkProducerEpoch(producerEpoch)      case ValidationType.Full =>        checkProducerEpoch(producerEpoch)        checkSequence(producerEpoch, firstSeq)    }  }

checkProducerEpoch方法检查ProducerEpoch是否合法

 

  private def checkProducerEpoch(producerEpoch: Short): Unit = {    if (producerEpoch < updatedEntry.producerEpoch) {      throw new ProducerFencedException(s"Producer's epoch is no longer valid. There is probably another producer " +        s"with a newer epoch. $producerEpoch (request epoch), ${updatedEntry.producerEpoch} (server epoch)")    }  }

checkSequence方法是一个跟幂等性很重要的方法,此方法就是校验sequence number的。有以下几个判断规则

  • 1)如果producerEpoch更新了,则追加的Batch里的appendFirstSeq必须是0
  • 2)当currentLastSeq为-1时,说明此生产者还没有成功追加过消息,appendFirstSeq也必须是0
  • 3)appendFirstSeq = currentLastSeq+1,或者当currentLastSeq达到Int的最大值Int.MaxValue时,appendFirstSeq为0

 

private def checkSequence(producerEpoch: Short, appendFirstSeq: Int): Unit = {        if (producerEpoch != updatedEntry.producerEpoch) {      if (appendFirstSeq != 0) {        if (updatedEntry.producerEpoch != RecordBatch.NO_PRODUCER_EPOCH) {          throw new OutOfOrderSequenceException(s"Invalid sequence number for new epoch: $producerEpoch " + s"(request epoch), $appendFirstSeq (seq. number)")        } else {          throw new UnknownProducerIdException(s"Found no record of producerId=$producerId on the broker. It is possible " + s"that the last message with the producerId=$producerId has been removed due to hitting the retention limit.")        }      }    } else {      val currentLastSeq = if (!updatedEntry.isEmpty)        updatedEntry.lastSeq      else if (producerEpoch == currentEntry.producerEpoch)        currentEntry.lastSeq      else        RecordBatch.NO_SEQUENCE      //currentLastSeq为-1时,说明该生产者还没有上送成功过任何消息,appendFirstSeq必须从0开始      if (currentLastSeq == RecordBatch.NO_SEQUENCE && appendFirstSeq != 0) {        // the epoch was bumped by a control record, so we expect the sequence number to be reset        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: found $appendFirstSeq " +          s"(incoming seq. number), but expected 0")      } else if (!inSequence(currentLastSeq, appendFirstSeq)) {//校验Sequence是否是连续的        throw new OutOfOrderSequenceException(s"Out of order sequence number for producerId $producerId: $appendFirstSeq " +          s"(incoming seq. number), $currentLastSeq (current end sequence number)")      }    }  }  private def inSequence(lastSeq: Int, nextSeq: Int): Boolean = {    nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)  }
ProducerStateManager

用来管理Producer的状态,里面存储了各个生产者与ProducerStateEntry的对应关系。每个ProducerStateManager对应一个TopicPartition

producers:用于存储生产者与ProducerStateEntry的对应关系,key为pid,value为ProducerStateEntry prepareUpdate()方法返回ProducerAppendInfo对象,用于在写到Log之前校验消息

 

  def prepareUpdate(producerId: Long, isFromClient: Boolean): ProducerAppendInfo = {    val validationToPerform =      if (!isFromClient)        ValidationType.None      else if (topicPartition.topic == Topic.GROUP_metaDATA_TOPIC_NAME)        ValidationType.Epochonly      else        ValidationType.Full    //从队列中取出最近的ProducerStateEntry    val currentEntry = lastEntry(producerId).getOrElse(<u>ProducerStateEntry.empty(producerId)</u>)    new ProducerAppendInfo(producerId, currentEntry, validationToPerform)  }

当消息写入到Log后,调用update方法,更新生产者状态信息

 

 def update(appendInfo: ProducerAppendInfo): Unit = {    if (appendInfo.producerId == RecordBatch.NO_PRODUCER_ID)      throw new IllegalArgumentException(s"Invalid producer id ${appendInfo.producerId} passed to update " +        s"for partition $topicPartition")    trace(s"Updated producer ${appendInfo.producerId} state to $appendInfo")    val updatedEntry = appendInfo.toEntry    producers.get(appendInfo.producerId) match {      case Some(currentEntry) =>        currentEntry.update(updatedEntry)      case None =>        producers.put(appendInfo.producerId, updatedEntry)    }    appendInfo.startedTransactions.foreach { txn =>      ongoingTxns.put(txn.firstOffset.messageOffset, txn)    }  }
幂等性实现服务端流程

幂等性-服务端流程.png

如前面途中所示,当 Broker 收到 ProduceRequest 请求之后,会通过 handleProduceRequest() 做相应的处理,其处理流程如下(这里只讲述关于幂等性相关的内容):

1)如果请求是事务请求,检查是否对 TXN.id 有 Write 权限,没有的话返回TRANSACTIONAL_ID_AUTHORIZATION_FAILED; 2)如果请求设置了幂等性,检查是否对 ClusterResource 有 IdempotentWrite 权限,没有的话返回 CLUSTER_AUTHORIZATION_FAILED; 3)验证对 topic 是否有 Write 权限以及 Topic 是否存在,否则返回 TOPIC_AUTHORIZATION_FAILED 或 UNKNOWN_TOPIC_OR_PARTITION 异常; 4)检查是否有 PID 信息,没有的话走正常的写入流程; 5)LOG 对象会在 analyzeAndValidateProducerState() 方法先根据 batch 的 sequence number 信息检查这个 batch 是否重复(server 端会缓存 PID 对应这个 Topic-Partition 的最近5个 batch 信息),如果有重复,这里当做写入成功返回(不更新 LOG 对象中相应的状态信息,比如这个 replica 的 the end offset 等); 6)有了 PID 信息,并且不是重复 batch 时,在更新 producer 信息时,会做以下校验:

  • 检查该 PID 是否已经缓存中存在(主要是在 ProducerStateManager 对象中检查);
  • 如果不存在,那么判断 sequence number 是否 从0 开始,是的话,在缓存中记录 PID 的 meta(PID,epoch, sequence number),并执行写入 *** 作,否则返回 UnknownProducerIdException(PID 在 server 端已经过期或者这个 PID 写的数据都已经过期了,但是 Client 还在接着上次的 sequence number 发送数据);
  • 如果该 PID 存在,先检查 PID epoch 与 server 端记录的是否相同;
  • 如果不同并且 sequence number 不从 0 开始,那么返回 OutOfOrderSequenceException 异常;
  • 如果不同并且 sequence number 从 0 开始,那么正常写入;
  • 如果相同,那么根据缓存中记录的最近一次 sequence number(currentLastSeq)检查是否为连续(会区分为 0、Int.MaxValue 等情况),不连续的情况下返回 OutOfOrderSequenceException 异常。

 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/4880055.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-11
下一篇 2022-11-11

发表评论

登录后才能评论

评论列表(0条)

保存