Kafka 生产者 Producer 原理及重要参数

Kafka 生产者 Producer 原理及重要参数,第1张

Kafka 生产者 Producer 原理及重要参数

《深入理解kafka:核心设计与实践原理》笔记

一、整体架构

  消息在真正发往 Kafka 之前,有可能需要经过 拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)等一系列作用,下面先来看一下生产者客户端的整体架构。

  拦截器(Interceptor):Kafka 一共有两种拦截器:生产者拦截器和消费者拦截器。生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
  序列化器(Serializer):生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。
  分区器(Partitioner):消息在发往 broker 的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息发送时不指定分区就需要用到分区器,若指定了就不用。

主线程 KafkaProducer 创建消息经处理后,发送给消息累加器

  整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。

消息累加器缓存消息

  RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。

  RecordAccumulator 缓存的大小可以通过生产者客户端参数 buffer.memory 配置,默认值为32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer 的 send() 方法调用要么被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为60000,即60秒。

  主线程中发送过来的消息都会被追加到 RecordAccumulator 的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是 ProducerBatch,即 Deque。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。

  注意 ProducerBatch 不是 ProducerRecord,ProducerBatch 中可以包含一至多个 ProducerRecord。通俗地说,ProducerRecord 是生产者中创建的消息,而ProducerBatch 是指一个消息批次,ProducerRecord 会被包含在 ProducerBatch 中,这样可以使字节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大的 ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。

ByteBuffer 的复用

  ProducerBatch 和消息的具体格式有关。如果生产者客户端需要向很多分区发送消息,则可以将 buffer.memory 参数适当调大以增加整体的吞吐量。消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在 Kafka 生产者客户端中,通过 java.io.ByteBuffer 实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在 RecordAccumulator 的内部还有一个 BufferPool ,它主要用来实现 ByteBuffer 的复用,以实现缓存的高效利用。不过 BufferPool 只针对特定大小的 ByteBuffer 进行管理,而其他大小的 ByteBuffer 不会缓存进 BufferPool 中,这个特定的大小由 batch.size 参数来指定,默认值为116KB。我们可以适当地调大 batch.size 参数以便多缓存一些消息。

ProducerBatch 的大小和 batch.size 参数(ByteBuffer大小)的关系

  ProducerBatch 的大小和 batch.size 参数有着密切的关系。当一条消息(ProducerRecord)流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个 ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的 ProducerBatch。在新建 ProducerBatch 时评估这条消息的大小是否超过 batch.size 参数的大小,如果不超过,那么就以 batch.size 参数的大小来创建 ProducerBatch,这样在使用完这段内存区域之后,可以通过 BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。

Sender线程转换消息累加器中的消息方便网络IO

  Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本 <分区,Deque> 的保存形式转变成

  在转换成 的形式之后,Sender还会进一步封装成 的形式,这样就可以将 Request 请求发往各个Node了,这里的 Request 是指 Kafka 的各种协议请求,对于消息发送而言就是指具体的 ProduceRequest。

  请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中,InFlightRequests 保存对象的具体形式为 Map,它的主要作用是缓存了已经发出去但还没有收到响应的请求(Nodeld是一个String类型,表示节点的id编号)。与此同时,InFlightRequests 还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为 max.in.flight.requests.per.connection,默认值为5,即每个连接最多只能缓存5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque 的 size 与这个参数的大小来判断对应的 Node 中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个Node节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。

二、元数据的更新

  上面提及的 InFlightRequests 还可以获得 leastLoadedNode,即最小负载节点。这里的负载最小是通过每个 Node 在 InFlightRequests 中还未确认的请求决定的,未确认的请求越多则认为负载越大。对于下图中的 InFlightRequests 来说,很明显Node1的负载最小。也就是说,Node1为当前的 leastLoadedNode。选择 leastLoadedNode 发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode 的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。

  当使用如下方式创建一条消息 ProducerRecord 时:

ProducerRecord record = new ProducerRecord<>(topic, "Hello, Kafka!");

  我们只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer 要将此消息追加到指定主题的某个分区所对应的 leader 副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后 KafkaProducer 需要知道目标分区的 leader 副本所在的 broker 节点的地址、端口等信息才能建立连接,最终才能将消息发送到 Kafka,在这一过程中所需要的信息都属于元数据信息。

  因为客户端可以自己发现其他 broker 节点的地址,这一过程属于元数据相关的更新 *** 作。与此同时,分区数量及 leader 副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。

  元数据是指 Kafka 集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

  当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过 metadata.max.age.ms 时间没有更新元数据都会引起元数据的更新 *** 作。
  客户端参数 metadata.max.age.ms 的默认值为300000,即5分钟。元数据的更新 *** 作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出 leastLoadedNode,然后向这个 Node 发送 metadataRequest 请求来获取具体的元数据信息。这个更新 *** 作是由Sender线程发起的,在创建完metadataRequest 之后同样会存入 InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由 Sender 线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过 synchronized 和 final 关键字来保障。

三、重要的生产者参数 Kafka生产者客户端必要参数配置

在Kafka生产者客户端 KafkaProducer 中有3个参数是必填的。

  • bootstrap.servers:该参数用来指定生产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的broker地址,因为生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要设置两个以上的broker 地址信息,当其中任意一个容机时,生产者仍然可以连接到 Kafka 集群上。

  • key.serializer 和 value.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在。生产者把消息发往 broker 之前需要将消息中对应的 key 和 value 做相应的序列化 *** 作来转换成字节数组。key.serializer 和 value.serializer 这两个参数分别用来指定 key 和 value 序列化 *** 作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名,如org.apache.kafka.common.serialization.StringSerializer,单单指定 StringSerializer 是错误的。

acks

  这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。
  acks是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks参数有3种类型的值(都是字符串类型)。

  • acks = “1”(默认值)

  生产者发送消息之后,只要分区的 leader 副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入 leader 副本,比如在leader副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入 leader 副本并返回成功响应给生产者,且在被其他 follower 副本拉取之前 leader 副本崩溃,那么此时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应的消息。acks设置为 “1”,是消息可靠性和吞吐量之间的折中方案。

  • acks = "0"

  生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入 Kafka 的过程中出现某些异常,导致Kafka 并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks设置为0可以达到最大的吞吐量。

  • acks = “-1” 或 acks = "all"

  生产者在消息发送之后,需要等待 ISR 中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为 -1(all) 可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为 ISR(同步副本)中可能只有 leader 副本,这样就退化成了 acks=1 的情况。要获得更高的消息可靠性需要配合 min.insync.replicas 等参数的联动,具体内容可以参考原书8.3节。

max.request.size

  这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1MB。一般情况下,这个默认值就可以满足大多数的应用场景了。不建议盲目地增大这个参数的配置值,尤其是在对 Kafka 整体脉络没有足够把控的时候。
  因为这个参数还涉及一些其他参数的联动,比如 broker 端的 message.max.bytes 参数,如果配置错误可能会引起一些不必要的异常。比如将 broker 端的 message.max.bytes 参数配置为10,而 max.request.size 参数配置为20,那么当我们发送一条大小为15B的消息时,生产者客户端就会报出如下的异常:

org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
retires 和 retry.backoff.ms

  retries 参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不重试。
  消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置retries大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。
  不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过 max.request.size 参数配置的值时,这种方式就不可行了。

  重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为100,用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retries 和 retry.backoff.ms 之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。

  Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。对于某些应用来说,顺序性非常重要,比如 MySQL 的 binlog 传输,如果出现错误就会造成非常严重的后果。
  如果将 acks 参数配置为非零值,并且 max.in.flight.requests.per.connection(每个客户端与Node之间的连接最多缓存的请求数)参数配置为大于1的值,那么就会出现错序的现象:如果第一批次消息写入失败,而第二批次消息写入成功,那么生产者会重试发送第一批次的消息,此时如果第一批次的消息写入成功,那么这两个批次的消息就出现了错序。一般而言,在需要保证消息顺序的场合建议把参数 max.in.flight.requests.per.connection 配置为1,而不是把 acks 配置为0,不过这样也会影响整体的吞吐。

compression.type

  这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。还可以配置为“gzip”“snappy”和“lz4”。对消息进行压缩可以极大地减少网络传输量、降低网络I/O,从而提高整体的性能。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。

connections.max.idle.ms

  用来指定在多久之后关闭闲置的连接,默认值是 540000(ms),即9分钟。

linger.ms

  这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入 ProducerBatch 的时间,默认值为0。生产者客户端会在 ProducerBatch 被填满或等待时间超过 linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。

receive.buffer.bytes 与 send.buffer.bytes

  这个参数用来设置 Socket 接收消息缓冲区(SO_RECBUF)的大小,默认值为32KB。如果设置为 -1,则使用 *** 作系统的默认值。如果 Producer 与 Kafka 处于不同的机房,则可以适地调大这个参数值。

  这个参数用来设置 Socket 发送消息缓冲区(SO_SNDBUF)的大小,默认值为128KB。与 receive.buffer.bytes 参数一样,如果设置为 -1,则使用 *** 作系统的默认值。

request.timeout.ms

  这个参数用来配置 Producer 等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择进行重试。注意这个参数需要比 broker 端参数 replica.lag.time.max.ms 的值要大,这样可以减少因客户端重试而引起的消息重复的概率。
  注:Kafka 判断 ISR 中的 follower 和 leader 同步的根据是参数 replica.lag.time.max.ms 默认是10s。就是说如果follower 落后 leader 10s,则认为它失效了,会被踢出 ISR(同步副本)集合。

  更多参数请参考原书或其它资料。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5574179.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存