kafka系列文章之python-API的使用。
在使用kafka-python时候需要注意,一定要版本兼容,否则在使用生产者会报 无法更新元数据的错误。
在本片测试中java版本为如下,kafka版本为0.10.0,kafka-python版本为1.3.1,目前最新的版本为1.4.4
[root@test2 bin]# java -versionjava version "1.7.0_79"Java(TM) SE Runtime Environment (build 1.7.0_79-b15)Java HotSpot(TM) 64-Bit Server VM (build 24.79-b02, mixed mode)
从官网下载kafka-python,源码安装即可!https://pypi.org/project/kafka-python/1.3.1/
安装完成之后一个简易的测试:
= KafkaProducer(bootstrap_servers=[,b<kafka.producer.future.FutureRecordMetadata at >#我们向scIEnce主题发送了一个“Hello world”消息。可以在控制台使用消费者查看如下[root@test3 bin]# ./kafka-console-consumer.sh --zookeeper=10.0.102.204:2181 --topic scIEnce --from-beginningHello world
上面只是一个简单的实例,主要用来验证当前的python API是否可以使用;下面会详细说明python-kafka的使用。
kafka生成者一个应用程序在很多情况下需要往kafka写入消息:记录用户的活动(用于审计和分析),记录度量指标,保存日志消息,与其他应用程序进行异步通信,缓冲即将写入到数据库的数据,等等。
尽管生产者API使用起来很简单,但消息的发送过程还是比较复杂,如下图(摘自kafka权威指南)
首先从创建一个ProducerRecord对象开始,ProducerRecord对象需要包含目标主题和要发送的内容。我们还可以指定键和分区。在发送ProducerRecord对象时,生产者首先要把键和值对象进行序列化,这样他们才能在网络上传输。python3.x中需要序列化为bytes类型,才能传输。
然后,数据被传给分区器。如果之前在ProducerRecord对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据ProducerRecord对象来选择一个分区。选好分区之后,生成者就指知道该往哪个主题和分区发送这条记录。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的broker上。
服务器在收到消息之后会返回一个响应。如果消息成功写入kafka,就返回一个RecodMetaDate对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。
要往kafka写入消息,首先要创建一个生产者对象,并设置一些属性。下面介绍一些kafka的属性。
--: [], : , : None, : None, : , : None, : , : , : , :<kafka.partitioner.default.DefaultPartitioner at >, : , : , : , : , : , : , : , : None, : None, : [(, , )], : , : , : , : None, : True, : None, : None, : None, : None, : (, ), : , : [], : , : , :<class >, : None, : None, =<class >===all,只有当所有参与复制的节点全部收到消息时,生成者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器 发生崩溃,整个集群仍然可以运作。不过,它的延迟比acks=.-----.就是用 *** 作系统的默认值。如果生产者或 消费者与broker处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。kafka消费者
应用程序从kafkaconsumer向kafka订阅主题,并从订阅的主题上接收消息。从kafka读取数据不同于从其他消息系统读取数据,它涉及一些概念,需要先理解一下!
消费者和消费者群组设想一种情况:应用程序从kafka订阅主题,读取消息,但是我们知道生产者在向主题写入消息时,可以是多个生产者并发写入的,这时候生产者向主题写入消息的速度超过了应用程序验证数据的速度,这个时候该怎么处理?如果只使用单个消费者处理消息,应用程序会远远跟不上消息的生成速度。显然,此时很有必要对消费者进行横向伸缩,就像多个生产者向相同主题写入消息一样,我们也可以使用多个消费者从同一个逐日读取消息,对消息进行分流。
kafka消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。如果往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。
往群组里添加消费者是横向伸缩消费能力的主要方式。kafka消费者经常会做一些高延迟的 *** 作,比如把数据写到数据库或HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让他们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。
kafka设计的主要目标之一,就是要让kafka主题里的数据能够满足企业各种应用场景的需求。在这些场景中,每个应用程序可以获取到所有的消息,而不只是其中的一部分。只要保证每个应用程序有自己的消费者群组,就可以让他们获取到主题的所有的消息。不同于传统的消息系统,横向伸缩kafka消费者和消费者群组并不会对性能造成负面影响。【每个消费者群组得道的是所有的消息,而不是部分的消息】
消费者群组与分区再均衡主要说一些概念性的东西
一个新的悄费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩愤时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时, 比如管理员添加了新的分区,会发生分区重分配。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要, 它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除梢费者),不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。我们将在本章讨论如何进行安全的再均衡,以及如何避免不必要的再均衡。
消费者通过向被指派为群组协调器的broker(不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。如果一个消费者发生崩愤,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。
在0. 10.1 版本里, Kafka 社区引入了一个独立的心跳线程,可以在轮均消息的空档发送心跳。这样一来,发送心跳的频率(也就是消费者群纽用于检测发生崩溃的消费者或不再发送心跳的消费者的时间)与消息轮询的频率(由处理消息所花费的时间未确定)之间就是相互独立的。在新版本的Kafka 里,可以指定消费者在离开群纽并触发再均衡之前可以有多长时间不进行消息轮询,这样可以避免出现活锁(livel ock ) ,比如有时候应用程序并没有崩溃,只是由于某些原因导致无法正常运行。这个配直与
session.timeout.ms 是相互独立的,后者用于控制检测消费者发生崩溃的时间和停止发送心跳的时间。
当消费者要加入群组时,它会向群组协调器发送一个JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个悄费者分配分区。它使用一个实现了partitionAssignor接口的类来决定哪些分区应该被分配给哪个消费者。
上面介绍了消费者和消费者群组的一些理论性东西,下面来简单创建一个消费者!
= KafkaConsumer(, bootstrap_servers=[], auto_offset_reset= i
上面print打印出的一个数值如下:
ConsumerRecord(topic=, partition=, offset=, timestamp=, timestamp_type=, key=None, value=b, checksum=, serialized_key_size=-, serialized_value_size=)#返回的是一个consumer对象,包含了一些元数据信息。
消息轮询是消费者API的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调,分区再均衡,发送心跳和获取数据,开发者只需要使用一组简单的API来处理从分区返回的数据。上面的代码是一个简单的利用for循环的轮询。
消费者的参数配置1. fetch.min.bytes:该属性指定了消费者从服务器获取记录的最小字节数。broker在收到消费者的数据请求时,如果可用的数据量小于fetch.min.bytes 指定的大小,那么它会等到
有足够的可用数据时才把它返回给消费者。这样可以降低消费者和broker的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有
很多可用数据,但消费者的cpu 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低broker 的工作负载。2. fetch.max.wait.ms:我们通过fetch.min.bytes告诉Kafka ,等到有足够的数据时才把它返回给消费者。而fetch.max.wait.ms 则用于指定broker 的等待时间,默认是500ms
,如果没有足够的数据流入Kafka ,消费者获取最小数据量的要求就得不到满足,最终导致500ms 的延迟。如果要降低潜在的延迟(为了满足SLA ),可以把该参数值设置得小一些。
如果fetch.max.wait.ms被设为lOOms ,并且fetch.min.bytes被设为1MB ,那么Kafka 在收到消费者的请求后,要么返回1MB 数据,要么在1OOms 后返回所有可用的数据,
就看哪个条件先得到满足。3. max.partition.fetch.bytes:该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是1MB,也就是说kafkaconsumer.poll() 方法从每个分区里返回的记录最
多不超过max.partition.fetch.bytes指定的字节。如果一个主题有20 个分区和5 个消费者,那么每个消费者需要至少4MB 的可用内存来接收记录。在为消费者分配内存时,
可以给它们多分配一些,因为如果群组里有消费者发生崩愤,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes的值必须比broker 能够接收的最大消息的字节
数(通过max.message.size属性配置)大, 否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。
消费者需要频繁调用poll()方法来避免会话过期和发生分区再均衡,如果单次调用poll()返回的数据太多,消费者需要更多的时间来处理,可能无怯及时进行下一个轮询来避免会
话过期。如果出现这种情况, 可以把max.partition.fetch.bytes值改小,或者延长会i舌过期时间。4. session.timeout.ms:该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s 。如果消费者没有在session.timeout.ms 指定的时间内发送心跳给群组协调
器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与heartbeat.interval.ms紧密相关。heartbeat.interval.ms指定了poll()
方法向协调器发送心跳的频率, session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms必须比
session.timeout.ms 小, 一般是session.timeout.ms 的三分之一。如果session.timeout.ms 是3s ,那么heartbeat.interval.ms应该是1s 。
把session.timeout.ms 值设得比默认值小,可以更快地检测和恢复崩愤的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设 置得大一些,可以减少意外的再均衡,不过检测节点崩愤-需要更长的时间。5.auto.offset.reset: 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时井被删除)该作何处理。它的默认
值是latest , 意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是earList,意思是说,在偏移量无效的情况下,
消费者将从起始位置读取分区的记录。6.enable.auto.commit: 该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设为false ,由自己控制何时提交偏移量。如果把
它设为true ,还可以通过配置auto.commit.interval.ms属性来控制提交的频率。7.partition.assignment.strategy: 我们知道,分区会被分配给群组里的消费者。PartitionAssignor根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。
Kafka 有两个默认的分配策略。
Range:该策略会把主题的若干个连续的分区分配给消费者。假设悄费者c1 和消费者c2 同时订阅了主题t1 和主题t2 ,井且每个主题有3 个分区。那么消费者c1有可能分配到这两个主题的分区0
和分区1 ,而消费者C2 分配到这两个主题的分区2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了Range
策略,而且分区数量无怯被消费者数量整除,就会出现这种情况。RoundRobin:该策略把主题的所有分区逐个分配给消费者。如果使用RoundRobin 策略来给消费者c1和消费者c2 分配分区,那么消费者c1 将分到主题T1的分区0 和分区2 以及主题t2的分区1 ,消费
者C2 将分配到主题t1 的分区1 以及主题t2的分区0 和分区2 。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见) , RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。
可以通过设置partition.assignment.strategy来选择分区策略。默认使用的是org.apache.kafka.clIEnts.consumer.RangeAssignor,这个类实现了Range策略,不过也可以把
它改为org.apache.kafka.clIEnts.consumer.RoundRobinAssignor。还可以自定义策略,在这种情况下,partition.assignment,strategy属性的值就是自定义类的名字。8.clIEnt.ID:该属性可以是任意字符串, broker 用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。
9.max.poll.records:该属性用于控制单次调用call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。10. receive.buffer.bytes和send.buffer.bytes: socket 在读写数据时用到的TCP 缓冲区也可以设置大小。如果它们被设为-1 ,就使用 *** 作系统的默认值。如果生产者或消费者
与broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。
上面介绍了kafka-python中生成者与消费者的理论概念,下一篇博文会给出怎么使用kafka的API接口!
总结
以上是内存溢出为你收集整理的python-kafka之理论篇全部内容,希望文章能够帮你解决python-kafka之理论篇所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)