consumer(KafkaConsumer)

consumer(KafkaConsumer),第1张

(一)消费者和消费者组

1、消费者:订阅并消费kafka消息,从属于消费者组

2、消费者组:一个群组里的消费者订阅的是同一个主题,每个消费者接受主题一部分分区的消息。

注:同一个消费者可以消费不同的partition,但是同一个partition不能被不同消费者消费。

(二)消费者群组和分区再均衡

1、再均衡:分区的消费所有权从一个消费者转移到另一个消费者称为再均衡,为消费者组带来了高可用性和可伸缩性。

注:分区何时重新分配:加入消费者或者消费者崩溃等

2、如何判断消费者崩溃:消费者通过向群组协调器(某broker,不同群组可以有不同的群组协调器)发送心跳(一般在拉取消息或者提交偏移量的时候)表示自己仍旧存活,如果长时间不发送心跳则协调器认为期死亡并进行再均衡。

注:在0101版本中,心跳行为不再和获取消息和提交偏移量绑定在一起,有一个单独的心跳线程。

3、分配分区:消费者加入消费者组是,会像群组协调器发送请求,第一个加入的成为“群主”。群主从协调器那里获取成员列表,并负责给每一个消费者分配分区。完毕之后,将分配结果发送给协调器,协调器再将消息发送给所有的消费者,每个消费者只能看到自己的分配信息。只有群主知道所有的消费信息。

(三)参数配置

1、bootstrapserver:host:port

2、keyserializer:键序列化器

3、valueserializer:值序列化器

注:以上为必须设置的

4、groupid:从属的消费者组

5、fetchminbytes:消费者从服务器获取记录的最小字节数。

6、fetchmaxwaitms:消费者等待消费消息的最大时间

7、maxpartitionfetchbytes:服务器从每个分区返回给消费者的最大字节数(需要比broker的设置maxmessagesize属性配置大,否则有些消息无法消费)

8、sessiontimeoutms:指定该消费者在被认为死亡之前可以与服务器断开连接的时间,默认3秒

9、heartbeatintervalms:制定了poll方法向协调器发送心跳的频率。

注:一般9是8的三分之一

10、autooffsetreset:消费者在读取一个没有偏移量分区或者无效偏移量分区的情况下如何处理(latest:从最新记录开始读取,earliest:从最早的记录开始读取)

11、enableauthcommit:消费者是否自动提交偏移量,默认为true

12、autocommitintervalms:自动提交偏移量的时间间隔

13、partitionassignmentstrategy:分区分配给消费者的策略:

(1)range:会把主题若干个连续分区分配给消费者

(2)roundRobin:会把主题的所有分区逐个分配给消费者

14、clientid:任意字符串,broker用来区分客户端发来的消息

15:maxpollrecords:控制poll方法返回的最大记录数

16:receivebufferbytes/sendbufferbytes:tcp缓冲池读写大小

(四)订阅主题

consumersubscribe(list)

(五)轮训(消费者API的核心)

1、轮训作用: 只要消费者订阅了主题,轮训就会处理所有的细节(群组协调、分区再均衡、发送心跳、获取数据

(1)获取数据

(2)第一次执行poll时,负责查找协调器,然后加入群组,接受分配的分区

(3)心跳的发送

(4)再均衡也是在轮训期间进行的

2、方法:poll(),消费者缓冲区没有数据时会发生阻塞,可以传一个阻塞时间,避免无限等待。0表示立即返回。

3、关闭:close(),网络连接随之关闭,立即触发再均衡。

4、线程安全:无法让一个线程运行多个消费者,也无法让多个线程公用一个消费者。

(六)提交和偏移量

1、提交:更新分区当前位置的 *** 作

2、如何提交:消费者往一个特殊主题(_consumer_offset)发送消息,消息中包含每个分区中的偏移量。

3、偏移量:分区数据被消费的位置。

4、偏移量作用:当发生再均衡时,消费者可能会分配到不一样的分区,为了继续工作,消费者需要读取到每个分区最后一次提交的偏移量,然后从偏移量的地方继续处理。

5、提交偏移量的方式

(1)自动提交:经过一个时间间隔,提交上一次poll方法返回的偏移量。每次轮训都会检测是否应该提交偏移量。缺陷:可能导致重复消费

(2)手动提交:commitSysn()提交迁移量,最简单也最可靠,提交由poll方法返回的最新偏移量。缺点:忘了提交可能会丢数据,再均衡可能会重复消费

(3)异步提交:同步提交在提交过程中必须阻塞

(4)同步异步提交组合

(5)提交特定的偏移量

(七)再均衡监听器

(八)从特定偏移量读取数据(seek)

1、从分区开始:seekToBegining

2、从分区结束:seekToEnd

3、ConsumerRebalanceListener和seek结合使用

(九)如何退出

1、前言:wakeup方法是唯一安全退出轮训的方法,从poll方法中退出并抛出wakeupException异常。如果没有碰上轮训,则在下一次poll调用时抛出。

2、退出轮训

(1)另一个线程调用consumerwakeup方法

(2)如果循环在主线程里可以在ShutdownHook里面调用该方法

3、退出之前调用close方法:告知协调器自己要离开,出发再均衡,不必等到超时。

(十)独立消费者(assign为自己分配分区)

Kafka中的消息是存储在磁盘上的,一个分区副本对应一个日志(Log)。为了防止Log过大,Kafka又引入了 日志分段 (LogSegment)的概念,将Log切分为多个LogSegment ,相当于一个 巨型文件被平均分配为多个相对较小的文件 ,这样也便于消息的维护和清理。事实上,Log和LogSegnient 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每个LogSegment对应于磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以txnindex ”为后缀的事务索引文件),下图为topic、partition、副本、log和logSegment之间的关系。

虽然一个log被拆为多个分段,但只有最后一个LogSegment(当前活跃的日志分段)才能执行写入 *** 作,在此之前所有的LogSegment都不能写入数据。当满足以下其中任一条件会创建新的LogSegment。

在索引文件切分的时候,Kafka 会关闭当前正在写入的索引文件并置为只读模式,同时以可读写的模式创建新的索引文件,默认大小为1GB。当下次索引切分时才会设置为实际大小。也就是说,之前的segment都是实际大小,活跃segment大小为1G。

索引的主要目的是提高查找的效率。

Kafka采用稀疏索引(sparse index)的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。而是每当写入一定量(由 broker 端参数 logindex intervalbytes 指定,默认4KB )的消息时,索引文件会增加一个索引项。

消息查找过程

间戳索引文件中包含若干时间戳索引项,每个追加的时间戳索引项中的 timestamp 必须大于之前追加的索引项的 timestamp ,否则不予追加。

消息查找过程

Kafka将消息存储在磁盘中,为了控制磁盘占用空间的不断增加就需要对消息做一定的清理 *** 作。Kafka提供了两种日志清理策略。

kafka有专门的任务来周期性删除不符合条件的日志分段文件,删除策略主要以下有3种。

对于有相同key的不同value值,只保留最后一个版本。如果应用只关心key对应的最新value值,则可以开启Kafka的日志压缩功能,Kafka会定期将相同key的消息进行合井,只保留最新的value值。

基于分区和副本集的相关知识,初步了解Kafka的数据存储、同步原理。

对于消息的生产以及消费逻辑不在本文的讨论范畴,主要就Broker的数据存储做以浅显的总结。

首先解释一下常见的相关专业术语:

Partition是作用于具体的Topic而言的,而不是一个独立的概念。Partition能水平扩展客户端的读写性能,是高吞吐量的 保障。

通俗的将,Partition就是一块保存具体数据的空间,本质就是磁盘上存放数据的文件夹,所以Partition是不能跨Broker存在,也不能在同一个Broker上跨磁盘。

对于一个Topic,可以根据需要设定Partition的个数;Kafka默认的Partition个数numpartitions为1($KAFKA_HOME/config/serverproperties),表示该Topic的所有数据均写入至一个文件夹下;用户也可以在新建Topic的时候通过显示的指定--partitions <integer>参数实现自定义Partition个数。

在数据持久化时,每条消息都是根据一定的分区规则路由到对应的Partition中,并append在log文件的尾部(这一点类似于HDFS);在同一个Partition中消息是顺序写入的且始终保持有序性;但是不同Partition之间不能保证消息的有序性(高吞吐量的保障)。

Kafka也支持动态增加一个已存在Topic的Partition个数,但不支持动态减少Partition个数。因为被减少Partition所对应的数据处理是个难题; 由于Kafka的数据写模式的限制,所以如果要把这些Partition的历史数据集追加到有效的Partition的尾部,就会破坏了Kafka在Partition上消息的有序性,显然是不合理的;但如果按照时间戳重新构分区的数据文件,可 *** 作性和难度都将是非常大的,所以目前并不支持动态减少Partition个数。

Partition是用来存储数据的,但并不是最小的数据存储单元。Partition下还可以细分成Segment,每个Partition是由一个或多个Segment组成。每个Segment分别对应两个文件: 一个是以index结尾的索引文件,另一个是以log结尾的数据文件,且两个文件的文件名完全相同。 所有的Segment均存在于所属Partition的目录下。

Segment的必要性:如果以partition作为数据存储的最小单元,那么partition将会是一个很大的数据文件,且数据量是持续递增的;当进行过期数据清理或消费指定offset数据时, *** 作如此的大文件将会是一个很严重的性能问题。

Replication是Kafka架构中一个比较重要的概念,是系统高可用的一种保障。

Replication逻辑上是作用于Topic的,但实际上是体现在每一个Partition上。

例如:有一个Topic,分区(partitions)数为3(分别为a, b, c),副本因子(replication-factor)数也为3;其本质就是该Topic一共有3个a分区,3个b分区,3个c分区。这样的设计在某种意义上就很大程度的提高了系统的容错率。

那么问题来了:一个Topic下a分区一共有三个,既然是副本集,那这三个所包含的数据都完全一样吗?作用都一样吗?

说到这就不得不引出两个概念:Leader Replica & Follower Replica

 leader partition(主分区) & leader replica(主副本集)其实这两个概念是一回事,因为副本集策略只是一种机制,是为了提高可用性而生的。

这种策略就是作用于partition上的,通俗的说增加副本集个数其实就是增加同一个partition的备份个数,同样的对于主分区而言,就是同一个partition下所有备份中的主副本集。

同一个topic下的不同partition之间是没有主次之分,都是同等重要且存储不同数据的。

当新建一个topic,并指定partition个数后,会在logdirs参数($KAFKA_HOME/config/serverproperties)所指定的目录下创建对应的分区目录,用来存储落到该分区上的数据。

分区目录的命名格式为:topic名称 + 短横线 + 分区序号;序号默认从0开始,最大为分区数 - 1。

为了尽可能的提升服务的可用性和容错率,Kafka遵循如下的分区分配原则:

如集群中有四个节点,均在统一机架上,新建一个topic:demoTopic,指定分区个数为4,副本因子为3,则对应的partition目录分别为:demoTopic-0、demoTopic-1、demoTopic-2、demoTopic-3。

因为集群未跨机架,所以在这里主要验证一下前两条分区分配原则:四个主分区分别位于四个不同的broker上,且另外两个replica也随机分配到除leader所在节点以外的其他三个broker上。

每个Partition全局的第一个Segment文件名均是从0开始,后续每个Segment的文件名为上一个Segment文件中最后一条消息的offset值;数据的大小为64位,20位数字字符的长度,未用到的用0填充。同一个Segment的index文件和log文件的文件名完全相同。

这种命名格式的好处在于可以有效的规避单文件数据量过大导致的 *** 作难问题,不仅如此,还可以方便、快速的定位数据。

例如:要实现从指定offset处开始读取数据,只需要根据给定的offset值与对应Partition下的segment文件名所比对,就可以快速的定位目标数据所在的segment文件,然后根据目标segment的index文件查找给定offset值所对应的实际磁盘偏移量,即可快速在log中读取目标数据。

在Kafka 01010以后,对于每个Segment文件,在原有的index和log文件的基础上,新增加一个timeindex文件,通过该索引文件 可以实现基于时间戳 *** 作消息的功能,具体实现详见Kafka Timestamp。

Kafka中所说的Offset本质上是一个逻辑值,代表的是目标数据对应在Partition上的偏移量;而数据在磁盘上的实际偏移量是存储在对应Segment的index文件中。

通过简单介绍replica之间的offset的变化和更新逻辑,来初步了解Kafka的数据同步机制。

首先引入几个概念:Offset & Replica相关概念

清楚LEO、HW和ISR之间的相互关系是了解Kafka底层数据同步的关键:

Kafka取Partition所对应的ISR中最小的LEO作为整个Partition的HW;

每个Partition都会有自己独立的HW,与此同时leader和follower都会负责维护和更新自己的HW。

对于leader新写入的消息,Consumer不能立刻被发现并进行消费,leader会等待该消息被ISR中所有的replica同步更新HW后,此时leader才会更新该partition的HW为之前新写入消息的offset,此时该消息对外才可见。

在分布式架构中,服务的可用性和数据的一致性是一个绕不开的话题,Kafka也不例外。

如上文所说:当leader接受到一条消息后,需要等待ISR中所有的replica都同步复制完成以后,该消息才能被消费。

如果在同步的过程中,ISR中如果有follower replica的同步落后延迟超过了阈值,则会被leader从ISR中剔除;只要ISR中所有的replica均同步成功,则该消息就一定不会丢失。

从数据的角度出发,这种方式很契合一致性的需求,但是当集群的节点数较多,ISR队里的副本数变大时,每条消息的同步时长可能并不是所有业务场景所能容忍的,所以Kafka在Producer阶段通过 requestrequiredacks 参数提供了不同类型的应答机制以方便用户在系统吞吐量和一致性之间进行权衡:

假如一个Partition有两个Replica,A(Leader)中包含的数据为a, b, c, d, e,LEO为5;B(Follower)包含的数据为a, b, c,LEO为3;此时该Partition的HW为3,Consumer可见的消息为a, b, c,系统对外表示正常;

当follower还未来得及同步消息d、e时,leader挂了,此时B变成Leader,并且Producer重新发了两条消息f和g;因为此时系统中只有B一个存活,所以Partition对外的HW这会更新为5没有问题,Consumer可见的内容为a, b, c, f, g;此时A被唤醒并作为Follower开始从Leader中拉取数据,因为follower自身的HW等于Leader的HW,所以B没有拉去到任何数据,当Producer继续发送消息时,就会导致副本A、B的数据集不一致。

这个问题在01100中通过 leader epoch机制 来消除该问题,可以把epoch理解为代(版本)的概念,即每一次的leader对应一个唯一的epoch,如果leader更换,则对应的epoch值也会随之更换,而过期的epoch请求则都会被忽略。

Kafka——broker宕机后无法消费问题

>

大数据的由来

对于“大数据”(Big data)研究机构Gartner给出了这样的定义。“大数据”是需要新处理模式才能具有更强的决策力、洞察发现力和流程优化能力来适应海量、高增长率和多样化的信息资产。

1

麦肯锡全球研究所给出的定义是:一种规模大到在获取、存储、管理、分析方面大大超出了传统数据库软件工具能力范围的数据集合,具有海量的数据规模、快速的数据流转、多样的数据类型和价值密度低四大特征。

大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。换而言之,如果把大数据比作一种产业,那么这种产业实现盈利的关键,在于提高对数据的“加工能力”,通过“加工”实现数据的“增值”。

从技术上看,大数据与云计算的关系就像一枚硬币的正反面一样密不可分。大数据必然无法用单台的计算机进行处理,必须采用分布式架构。它的特色在于对海量数据进行分布式数据挖掘。但它必须依托云计算的分布式处理、分布式数据库和云存储、虚拟化技术。

大数据需要特殊的技术,以有效地处理大量的容忍经过时间内的数据。适用于大数据的技术,包括大规模并行处理(MPP)数据库、数据挖掘、分布式文件系统、分布式数据库、云计算平台、互联网和可扩展的存储系统。

最小的基本单位是bit,按顺序给出所有单位:bit、Byte、KB、MB、GB、TB、PB、EB、ZB、YB、BB、NB、DB。

大数据的应用领域

大数据无处不在,大数据应用于各个行业,包括金融、 汽车 、餐饮、电信、能源、体能和 娱乐 等在内的 社会 各行各业都已经融入了大数据的印迹。

制造业,利用工业大数据提升制造业水平,包括产品故障诊断与预测、分析工艺流程、改进生产工艺,优化生产过程能耗、工业供应链分析与优化、生产计划与排程。

金融行业,大数据在高频交易、社交情绪分析和信贷风险分析三大金融创新领域发挥重大作用。

汽车 行业,利用大数据和物联网技术的无人驾驶 汽车 ,在不远的未来将走入我们的日常生活。

互联网行业,借助于大数据技术,可以分析客户行为,进行商品推荐和针对性广告投放。

电信行业,利用大数据技术实现客户离网分析,及时掌握客户离网倾向,出台客户挽留措施。

能源行业,随着智能电网的发展,电力公司可以掌握海量的用户用电信息,利用大数据技术分析用户用电模式,可以改进电网运行,合理设计电力需求响应系统,确保电网运行安全。

物流行业,利用大数据优化物流网络,提高物流效率,降低物流成本。

城市管理,可以利用大数据实现智能交通、环保监测、城市规划和智能安防。

体育 娱乐 ,大数据可以帮助我们训练球队,决定投拍哪种 题财的 影视作品,以及预测比赛结果。

安全领域,政府可以利用大数据技术构建起强大的国家安全保障体系,企业可以利用大数据抵御网络攻击,警察可以借助大数据来预防犯罪。

个人生活, 大数据还可以应用于个人生活,利用与每个人相关联的“个人大数据”,分析个人生活行为习惯,为其提供更加周到的个性化服务。

大数据的价值,远远不止于此,大数据对各行各业的渗透,大大推动了 社会 生产和生活,未来必将产生重大而深远的影响。

大数据方面核心技术有哪些?

大数据技术的体系庞大且复杂,基础的技术包含数据的采集、数据预处理、分布式存储、NoSQL数据库、数据仓库、机器学习、并行计算、可视化等各种技术范畴和不同的技术层面。首先给出一个通用化的大数据处理框架,主要分为下面几个方面:数据采集与预处理、数据存储、数据清洗、数据查询分析和数据可视化。

数据采集与预处理

对于各种来源的数据,包括移动互联网数据、社交网络的数据等,这些结构化和非结构化的海量数据是零散的,也就是所谓的数据孤岛,此时的这些数据并没有什么意义,数据采集就是将这些数据写入数据仓库中,把零散的数据整合在一起,对这些数据综合起来进行分析。数据采集包括文件日志的采集、数据库日志的采集、关系型数据库的接入和应用程序的接入等。在数据量比较小的时候,可以写个定时的脚本将日志写入存储系统,但随着数据量的增长,这些方法无法提供数据安全保障,并且运维困难,需要更强壮的解决方案。

Flume NG

Flume NG作为实时日志收集系统,支持在日志系统中定制各类数据发送方,用于收集数据,同时,对数据进行简单处理,并写到各种数据接收方(比如文本,HDFS,Hbase等)。Flume NG采用的是三层架构:Agent层,Collector层和Store层,每一层均可水平拓展。其中Agent包含Source,Channel和 Sink,source用来消费(收集)数据源到channel组件中,channel作为中间临时存储,保存所有source的组件信息,sink从channel中读取数据,读取成功之后会删除channel中的信息。

NDC

Logstash

Logstash是开源的服务器端数据处理管道,能够同时从多个来源采集数据、转换数据,然后将数据发送到您最喜欢的 “存储库” 中。一般常用的存储库是Elasticsearch。Logstash 支持各种输入选择,可以在同一时间从众多常用的数据来源捕捉事件,能够以连续的流式传输方式,轻松地从您的日志、指标、Web 应用、数据存储以及各种 AWS 服务采集数据。

Sqoop

Sqoop,用来将关系型数据库和Hadoop中的数据进行相互转移的工具,可以将一个关系型数据库(例如Mysql、Oracle)中的数据导入到Hadoop(例如HDFS、Hive、Hbase)中,也可以将Hadoop(例如HDFS、Hive、Hbase)中的数据导入到关系型数据库(例如Mysql、Oracle)中。Sqoop 启用了一个 MapReduce 作业(极其容错的分布式并行计算)来执行任务。Sqoop 的另一大优势是其传输大量结构化或半结构化数据的过程是完全自动化的。

流式计算

流式计算是行业研究的一个热点,流式计算对多个高吞吐量的数据源进行实时的清洗、聚合和分析,可以对存在于社交网站、新闻等的数据信息流进行快速的处理并反馈,目前大数据流分析工具有很多,比如开源的strom,spark streaming等。

Strom集群结构是有一个主节点(nimbus)和多个工作节点(supervisor)组成的主从结构,主节点通过配置静态指定或者在运行时动态选举,nimbus与supervisor都是Storm提供的后台守护进程,之间的通信是结合Zookeeper的状态变更通知和监控通知来处理。nimbus进程的主要职责是管理、协调和监控集群上运行的topology(包括topology的发布、任务指派、事件处理时重新指派任务等)。supervisor进程等待nimbus分配任务后生成并监控worker(jvm进程)执行任务。supervisor与worker运行在不同的jvm上,如果由supervisor启动的某个worker因为错误异常退出(或被kill掉),supervisor会尝试重新生成新的worker进程。

Zookeeper

Zookeeper是一个分布式的,开放源码的分布式应用程序协调服务,提供数据同步服务。它的作用主要有配置管理、名字服务、分布式锁和集群管理。配置管理指的是在一个地方修改了配置,那么对这个地方的配置感兴趣的所有的都可以获得变更,省去了手动拷贝配置的繁琐,还很好的保证了数据的可靠和一致性,同时它可以通过名字来获取资源或者服务的地址等信息,可以监控集群中机器的变化,实现了类似于心跳机制的功能。

数据存储

Hadoop作为一个开源的框架,专为离线和大规模数据分析而设计,HDFS作为其核心的存储引擎,已被广泛用于数据存储。

HBase

HBase,是一个分布式的、面向列的开源数据库,可以认为是hdfs的封装,本质是数据存储、NoSQL数据库。HBase是一种Key/Value系统,部署在hdfs上,克服了hdfs在随机读写这个方面的缺点,与hadoop一样,Hbase目标主要依靠横向扩展,通过不断增加廉价的商用服务器,来增加计算和存储能力。

Phoenix

Phoenix,相当于一个Java中间件,帮助开发工程师能够像使用JDBC访问关系型数据库一样访问NoSQL数据库HBase。

Yarn

Yarn是一种Hadoop资源管理器,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。Yarn由下面的几大组件构成:一个全局的资源管理器ResourceManager、ResourceManager的每个节点代理NodeManager、表示每个应用的Application以及每一个ApplicationMaster拥有多个Container在NodeManager上运行。

Mesos

Mesos是一款开源的集群管理软件,支持Hadoop、ElasticSearch、Spark、Storm 和Kafka等应用架构。

Redis

Redis是一种速度非常快的非关系数据库,可以存储键与5种不同类型的值之间的映射,可以将存储在内存的键值对数据持久化到硬盘中,使用复制特性来扩展性能,还可以使用客户端分片来扩展写性能。

Atlas

Atlas是一个位于应用程序与MySQL之间的中间件。在后端DB看来,Atlas相当于连接它的客户端,在前端应用看来,Atlas相当于一个DB。Atlas作为服务端与应用程序通讯,它实现了MySQL的客户端和服务端协议,同时作为客户端与MySQL通讯。它对应用程序屏蔽了DB的细节,同时为了降低MySQL负担,它还维护了连接池。Atlas启动后会创建多个线程,其中一个为主线程,其余为工作线程。主线程负责监听所有的客户端连接请求,工作线程只监听主线程的命令请求。

Kudu

Kudu是围绕Hadoop生态圈建立的存储引擎,Kudu拥有和Hadoop生态圈共同的设计理念,它运行在普通的服务器上、可分布式规模化部署、并且满足工业界的高可用要求。其设计理念为fast analytics on fast data。作为一个开源的存储引擎,可以同时提供低延迟的随机读写和高效的数据分析能力。Kudu不但提供了行级的插入、更新、删除API,同时也提供了接近Parquet性能的批量扫描 *** 作。使用同一份存储,既可以进行随机读写,也可以满足数据分析的要求。Kudu的应用场景很广泛,比如可以进行实时的数据分析,用于数据可能会存在变化的时序数据应用等。

在数据存储过程中,涉及到的数据表都是成千上百列,包含各种复杂的Query,推荐使用列式存储方法,比如parquent,ORC等对数据进行压缩。Parquet 可以支持灵活的压缩选项,显著减少磁盘上的存储。

数据清洗

MapReduce作为Hadoop的查询引擎,用于大规模数据集的并行计算,”Map(映射)”和”Reduce(归约)”,是它的主要思想。它极大的方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统中。

随着业务数据量的增多,需要进行训练和清洗的数据会变得越来越复杂,这个时候就需要任务调度系统,比如oozie或者azkaban,对关键任务进行调度和监控。

Oozie

Oozie是用于Hadoop平台的一种工作流调度引擎,提供了RESTful API接口来接受用户的提交请求(提交工作流作业),当提交了workflow后,由工作流引擎负责workflow的执行以及状态的转换。用户在HDFS上部署好作业(MR作业),然后向Oozie提交Workflow,Oozie以异步方式将作业(MR作业)提交给Hadoop。这也是为什么当调用Oozie 的RESTful接口提交作业之后能立即返回一个JobId的原因,用户程序不必等待作业执行完成(因为有些大作业可能会执行很久(几个小时甚至几天))。Oozie在后台以异步方式,再将workflow对应的Action提交给hadoop执行。

Azkaban

Azkaban也是一种工作流的控制引擎,可以用来解决有多个hadoop或者spark等离线计算任务之间的依赖关系问题。azkaban主要是由三部分构成:Relational Database,Azkaban Web Server和Azkaban Executor Server。azkaban将大多数的状态信息都保存在MySQL中,Azkaban Web Server提供了Web UI,是azkaban主要的管理者,包括project的管理、认证、调度以及对工作流执行过程中的监控等;Azkaban Executor Server用来调度工作流和任务,记录工作流或者任务的日志。

流计算任务的处理平台Sloth,是网易首个自研流计算平台,旨在解决公司内各产品日益增长的流计算需求。作为一个计算服务平台,其特点是易用、实时、可靠,为用户节省技术方面(开发、运维)的投入,帮助用户专注于解决产品本身的流计算需求

数据查询分析

Hive

Hive的核心工作就是把SQL语句翻译成MR程序,可以将结构化的数据映射为一张数据库表,并提供 HQL(Hive SQL)查询功能。Hive本身不存储和计算数据,它完全依赖于HDFS和MapReduce。可以将Hive理解为一个客户端工具,将SQL *** 作转换为相应的MapReduce jobs,然后在hadoop上面运行。Hive支持标准的SQL语法,免去了用户编写MapReduce程序的过程,它的出现可以让那些精通SQL技能、但是不熟悉MapReduce 、编程能力较弱与不擅长Java语言的用户能够在HDFS大规模数据集上很方便地利用SQL 语言查询、汇总、分析数据。

Hive是为大数据批量处理而生的,Hive的出现解决了传统的关系型数据库(MySql、Oracle)在大数据处理上的瓶颈 。Hive 将执行计划分成map->shuffle->reduce->map->shuffle->reduce…的模型。如果一个Query会被编译成多轮MapReduce,则会有更多的写中间结果。由于MapReduce执行框架本身的特点,过多的中间过程会增加整个Query的执行时间。在Hive的运行过程中,用户只需要创建表,导入数据,编写SQL分析语句即可。剩下的过程由Hive框架自动的完成。

Impala

Impala是对Hive的一个补充,可以实现高效的SQL查询。使用Impala来实现SQL on Hadoop,用来进行大数据实时查询分析。通过熟悉的传统关系型数据库的SQL风格来 *** 作大数据,同时数据也是可以存储到HDFS和HBase中的。Impala没有再使用缓慢的Hive+MapReduce批处理,而是通过使用与商用并行关系数据库中类似的分布式查询引擎(由Query Planner、Query Coordinator和Query Exec Engine三部分组成),可以直接从HDFS或HBase中用SELECT、JOIN和统计函数查询数据,从而大大降低了延迟。Impala将整个查询分成一执行计划树,而不是一连串的MapReduce任务,相比Hive没了MapReduce启动时间。

Hive 适合于长时间的批处理查询分析,而Impala适合于实时交互式SQL查询,Impala给数据人员提供了快速实验,验证想法的大数据分析工具,可以先使用Hive进行数据转换处理,之后使用Impala在Hive处理好后的数据集上进行快速的数据分析。总的来说:Impala把执行计划表现为一棵完整的执行计划树,可以更自然地分发执行计划到各个Impalad执行查询,而不用像Hive那样把它组合成管道型的map->reduce模式,以此保证Impala有更好的并发性和避免不必要的中间sort与shuffle。但是Impala不支持UDF,能处理的问题有一定的限制。

Spark

Spark拥有Hadoop MapReduce所具有的特点,它将Job中间输出结果保存在内存中,从而不需要读取HDFS。Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像 *** 作本地集合对象一样轻松地 *** 作分布式数据集。

Nutch

Nutch 是一个开源Java 实现的搜索引擎。它提供了我们运行自己的搜索引擎所需的全部工具,包括全文搜索和Web爬虫。

Solr

Solr用Java编写、运行在Servlet容器(如Apache Tomcat或Jetty)的一个独立的企业级搜索应用的全文搜索服务器。它对外提供类似于Web-service的API接口,用户可以通过>

消费者负责从订阅的主题上拉取消息,消费组是逻辑概念。一个消费者只属于一个消费组,一个消费组包一个或多个消费者。当消息发布到主题后,会被投递到每个消费组,但每个消费组中只有一个消费者能消费给消息。

消费者如何知道该消费哪个分区?当消费组内消费者个数发生变化时,分区分配是如何变化的呢?

按照消费者总数和分区总数进行整除运算来获得一个跨度,然后将分区按照跨度进行平均分配, 以保证分区尽可能均匀地分配给所有的消费者。对于 每一个主题 该策略会将消费组内所有的消费者按照名称的字典序排序然后为每个消费者划分固定的分区范围,如果不够平均分配,那么字典序靠前的消费者会被多分配一个分区。

假设n=分区数/消费者数量,m=分区数%消费者数量,那么前m个消费者每个分配n+1分区,后面的每个消费者分配n个分区。

如图所示主题中共有7个分区,此时消费组内只有一个消费者C0,C0订阅7个分区。

随着消费组内消费者不断加入,分区逐渐从C0分配到C1~C6,当最后一个消费者C7加入后,此时总共有8个消费者但是只有7个分区,因此C7由于分配不到分区而无法消费任何消息。

消费者并非越多越好,消费者数量应小于等于分区数量,否则会造成资源的浪费

缺点:

当一个消费组订阅两个分别包含四个分区的主题时,分区分配结果如下,比较均匀。

但当两个主题各有3个分区时,则会出现如下分区不均的问题。类似情况扩大的话,可能出现消费者过载问题。

将消费组内所有消费者及消费者订阅的所有主题的分区按照字典序排序,然后通过轮询方式将分区依次分配给每个消费者。如果消费组内消费者的订阅信息都是相同的,那么分区分配会比较均匀。如一个消费组两个消费者,分别订阅两个都有3的分区的主题,如图。

但是当消费组内消费者的订阅信息不同时,则会出现分配不均问题。如图,假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,分区结果将会如下图所示。

后来引入的策略,主要目的:

假设三个消费者,订阅了4个主题,每个主题有两个分区,那么初始分区分配结果如下:

乍一看,跟RoundRobin分配策略结果相同,但此时如果C1下线,那么消费组会执行再均衡 *** 作,重新分配消息分区。如果是RoundRobin策略,分配结果如下:

而如果是Sticky分配策略,则结果如下:

StickyAssignor保留了上一次对C0和C2的分配结果,将C1的分区分配给C0和C2使其均衡。

如果发生分区重分配,那么对于同一个分区而 ,有可能之前的消费者和新指派的消费者不是同一个,之前消费者进行到一半的处理还要在新指派的消费者中再次复现一遍,造成重复消费。StickyAssignor分配策略如同其名称中的"sticky"一 样,让分配策略具备的“黏性”,尽可能地让前后两次分配相同,进而减少系统资源的损耗及其他异常情况的发生。

再来看下,消费者订阅信息不相同的情况,拿RoundRobinAssignor中的实例来说。

假设消费组内有三个消费者,主题1/2/3分别有1/2/3个分区,C0订阅主题1,C1订阅主题1和2,C2订阅主题1/2/3,RoundRobinAssignor分区结果将会如下图所示。

而采用StickyAssignor时,分区分配结果如下:

若此时C0下线,RoundRobinAssignor重分配的结果如下:

而StickyAssignor重分配结果如下:

综上:

StickyAssignor分配策略的优点就是可以使分区重分配具备 “黏性”,减少不必要的分区移动(一个分区剥离之前的消费者 ,转而分配给另一个新的消费者)。

Kafka中的消息消费是基于拉模式。

Kafka每次拉取一组消息,每条消息的格式如下:

在每次拉取方法时,它返回的是还没有被消费过的消息集。要实现这个功能,就需要知道上次消费时的消费位移,消费者在消费完消息后要进行消费位移提交动作,且消费位移要进行持久化,消费位移保存在__consumer_offsets主题中。

当前拉取消息的最大offset为x,消费者消费完成提交位移的是offset其实为x+1,表示下次拉取消息的起始位置。

自动提交

默认采用自动提交,默认每隔5s会将拉取到的每个分区的最大的消息位移进行提交。真正的提交动作是在拉取消息的逻辑完成,每次拉取消息前会判断是否可以进行位移提交,如果可以则提交上一次的位移。这里会有两个问题,如下图所示。

重复消费:当前拉取消息x+2,x+7,当前消费到X+5,在提交消费位移前,消费者宕机;新的消费者还是会从X+2开始拉取消息, 因此导致重复消费。

消息丢失:当前拉取消息x+2,x+7,当前消费X+5,到下次拉取的时候,消费位移已经提交为X+8,若此时消费者宕机,新的消费者会从X+8处开始消费,导致X+5 ~ X+7的消息没有被消费,导致消息的丢失。

手动提交

同步提交和异步提交。

同步提交默认提交本次拉取分区消息的最大偏移量,如本次拉取X+2,X+7的消息,同步提交默认提交X+8的位置;当时同步提交也可指定提交的偏移量,比如消费一条提交1次,因为提交本身为同步 *** 作,所以会耗费一定的性能。

同步提交也会导致重复消费的问题,如消费完成后,提交前消费者宕机。

异步提交消费者线程不会被阻塞,使性能得到增强,但异步提交失败重试可能会导致提交位移被覆盖的问题,如本次异步提交offset=X失败,下次异步提交offset=X+y成功;此时前一次提交重试再次提交offset=x,如果业务上没有重试校验,会导致offset被覆盖,最终导致重复消费。

当新的消费组建立、消费者订阅新的主题或之前提交的位移信息因为过期被删除等,此时查不到纪录的消费位移。Kafka可配置从最新或从最早处开始消费。

Kafka还支持从特定位移处开始消费,可以实现回溯消费,Kafka内部提供了Seek()方法,来重置消费位移。

当需要回溯指定时间后的消息时,可先用offsetsForTimes方法查到指定时间后第一条消息的位移,然后再用seek重置位移。

分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除或添加消费者。

Kfaka提供了组协调器(GroupCoordinator)和消费者协调器(ConsumerCoordinator),前者负责管理消费组,后者负责与前者交互,两者最重要的职责就是负责再均衡的 *** 作。

举例说明,当消费者加入消费组时,消费者、消费组和组协调器之间一般会经历以下几个阶段。

第一阶段(FIND COORDINATOR)

消费者需要确定它所属的消费组对应的GroupCoordinator所在的broker并创建与该broker 相互通信的网络连接。

消费者会向集群中的某个节点发送FindCoordinatorRequest请求来查找对应的组协调器。

Kafka根据请求中的coordinator_key(也就是groupld )的哈希值计算__consumer_offsets中的分区编号,如下图所示。找到对应的分区之后,在寻找此分区leader副本所在的broker节点,该节点即为当前消费组所在的组协调器节点。

消费组最终的分区分配方案及组内消费者所提交的消费位移信息都会发送给该broker节点。该broker节点既扮演GroupCoordinato的角色又扮演保存分区分配方案和组内消费者位移的角色,这样可以省去很多不必要的中间轮转所带来的开销。

第二阶段(JOIN GROUP)

在成功找到消费组所对应的GroupCoordinator之后就进入加入消费组的阶段,在此阶段的 消费者会向GroupCoordinator发送JoinGroupRequest请求,并处理响应。

组协调器内部主要做了以下几件事:

选举消费组的leader

如果当前组内没有leader,那么第一个加入消费组的则为leader。如果leader挂掉,组协调器会从内部维护的HashMap(消费者信息,key为member_id)中选择第一个key作为新的leader。

选举分区分配策略

前面说的每个消费者可能会上报多个分区分配策略,选举过程如下:

第三阶段(SYNC GROUP)

leader消费者根据在第二阶段中得到的分区分配策略来实施分区分配,然后将分配结果同步到组协调器。各个消费者会向组协调器发送SyncGroupRequest请求来同步分配方案。

请求结构如图,leader发送的请求才会有group_assignment。

其中包含了各个消费者对应的具体分配方案,member_id表示消费者的唯一标识,而 member_assignment是与消费者对应的分配方案,如图

消费者收到具体的分区分配方案后,会开启心跳任务,定期向组协调器发送心跳请求确定彼此在线。

第四阶段(HEARTBEAT)

在正式消费之前,消费者还需要确定拉取消息的起始位置。假设之前已经将最后的消费位移提交成功,那么消费者会请求获取上次提交的消费位移并从此处继续消费。

心跳线程是一个独立的线程,可以在轮询消息的空档发送。如果消费者停发送心跳的时间足够长,组协调器会认为这个消费者已经死亡,则触发一次再均衡行为。

以上就是关于consumer(KafkaConsumer)全部的内容,包括:consumer(KafkaConsumer)、Kafka数据存储、【kafka】kafka理论之partition & replication等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/sjk/9436614.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-28
下一篇 2023-04-28

发表评论

登录后才能评论

评论列表(0条)

保存