kafka生产者、消费者参数

kafka生产者、消费者参数,第1张

kafka生产者、消费者参数 生产者 bootstrap.servers

该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查询其他broker的信息。不过最少提供2个broker的信息,一旦其中一个宕机,生产者仍能连接到集群上。
格式如下:192.168.40.150:9092,192.168.40.151:9092,192.168.40.152:9092

key.serializer

key值的序列化方式,常见的序列化方式有StringSerializer,ByteArraySerializer、ByteBufferSerializer、BytesSerializer、Long(Double Integer String)Serializer,要自定义序列化的话需要实现Serializer接口。

value.serializer

value指的序列化方式,常见方式如上。

acks

这个参数中是用来指定分区中必须要有多少个副本收到这条消息,之后生产者猜会认为这条消息是成功写入的。acks是生产者客户端的一个非常重要的参数。它涉及到消息的可靠性和吞吐量之间的权衡。

ack=1:默认值为1.只要leader副本成功写入数据,那么它就会收到服务端的成功响应。是消息可靠性和吞吐量的一个这种方案。如果无法写入leader副本,比如在leader副本奔溃,重新选举新的leader副本的过程中,那么生产者就会收到一个错误的响应。为了避免消息丢失,生产者可以选择重发消息。如果成功写入leader副本并成功返回给响应者,且在被其他follower副本拉取之前leader副本奔溃,那么此时消息还是会丢失,因为新选举的leader副本中并没有这条对应的消息。

ack=0:只要消息发送后,不等待服务端的任何响应。在其他配置相同的情况下可以达到最大的吞吐量。如果消息从发送到写入kafka的过程中出现异常,导致kafka并没有收到这条消息,那么生产者也是无从得知,消息也就丢失。

ack=-1:等待所有的ISR副本都成功写入数据后,才返回成功响应。只要ISR中的副本数量大于1个,就一定能保证消息不会丢失。具有最大的可可靠性。配置min.insync.replicas大于1,将能保证消息不会丢失。

buffer.memory

默认值:33554432(32M),生产者客户端用于缓存消息的缓存区的大小。

batch.size

默认值,16384(16KB),当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去,即指定ProducerBatch可以复用内存区域大小

max.request.size

这个参数的用来限制生产者客户端能够发送的消息最大值。默认值为:1048576B,即1M。

retires

retires参数是用来配置生产者重试的次数,默认值为0。即不进行重试,即在发送消息的时候不进行任何的重试动作。消息从生产者写入到broker的时候可能会发生一些临时性的异常,比如网络抖动,leader副本选举等,这种异常往往是可以自行恢复的。

retry.backoff.ms

设定两次重试之间的时间间隔,避免无效的频繁重试,默认值为100。

linger.ms

生产者发送ProducerBatch之前等待更多消息(ProducerRecoder)加入ProducerBatch的时间,默认值为0。生产者客户端会在ProducerBatch被填满或者等待时间超过linger.ms时发送出去。增大这个参数的值会增加消息的延迟,但同时会提高吞吐量。

receive.buffer.bytes和send.buffer.bytes:

指定TCP socket接受和发送数据包的缓存区大小。如果它们被设置为-1,则使用 *** 作系统的默认值。如果生产者或消费者处在不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

partitioner.class:

显示配置使用哪个分区器。

interceptor.classes:

指定自定义拦截器,多个传List集合。

max.block.ms

最大阻塞时间,RecordAccumulator缓存不足时或者没有可用的元数据时,KafkaProducer的send()方法调用要么被阻塞,要么抛出异常,此参数的默认值为60000,即60s。

compression.type

指定消息的压缩方式,默认值为"none",可以配置为"gzip",“snappy”和“lz4”。

connections.max.idle.ms

用来指定多久之后关闭闲置的连接,默认值540000(ms),即9min

消费者 bootstrap.servers

该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查询其他broker的信息。不过最少提供2个broker的信息,一旦其中一个宕机,生产者仍能连接到集群上。
格式如下:192.168.40.150:9092,192.168.40.151:9092,192.168.40.152:9092

key.deserializer

key值的反序列化方式,反序列化类型同上生产者

value.deserializer

value指的反序列化方式,常见方式如上。

group.id

该参数指定的是 consumer group 的名字。它能够唯一标识一个 consumer group。它能够唯一标识一个 consumer group该参数是有默认值的,即一个空字符串 。 但在开发 consumer 程序时我们依然要显式指定 group.id ,否则 consumer 端会抛出 InvalidGroupldException 异常 。

max.poll.records

一次调用poll() *** 作时返回的最大记录数,默认值为500,即最大拉取500条消息。

auto.offset.reset

earliest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest
当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none
topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

session.timeout.ms

用于通过心跳机制检测使用者故障.使用者心跳线线程必须在session.timeout.ms时间到期之前将心跳线发送给代理.否则,被kafka视为死者的消费者就会触发重新平衡.默认(默认 30s)

poll.timeout

例如1000ms,kafka消费者会阻塞1000ms,去获取records(消息)。

auto.commit.interval.ms

自动提交位移的时间间隔,默认值为5000毫秒(5秒)

enable.auto.commit

指定了消费者是否自动提交偏移量,默认值是true,为了尽量避免重复数据和数据丢失,可以把它设置为false,有自己控制合适提交偏移量,如果设置为true, 可以通过设置 auto.commit.interval.ms属性来控制提交的频率。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5688464.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存