数据写入kafka的分区策略

数据写入kafka的分区策略,第1张

众所周知,kafka有分区的概念,生产者写入数据到kafka,涉及到数据到底写到哪个分区?

kafka api提供了默认的partitioner函数,具体策略如下:

1、如果生产者写入数据时,指定了具体分区,则使用该分区

2、如果生产者没有指定分区,但是提供了key,partitioner函数会对该key取hash再对topic数量取模以确定分区

3、如果生产者既没有指定分区,也没有提供key,则采用round-robin的方式选取分区

项目中用到filebeat采集数据到kafka,如果filebeat配置文件中配置了round-robin,下游在消费数据时,打印recordkey可以发现key=null,每条数据到分区也不一样

(一)消费者和消费者组

1、消费者:订阅并消费kafka消息,从属于消费者组

2、消费者组:一个群组里的消费者订阅的是同一个主题,每个消费者接受主题一部分分区的消息。

注:同一个消费者可以消费不同的partition,但是同一个partition不能被不同消费者消费。

(二)消费者群组和分区再均衡

1、再均衡:分区的消费所有权从一个消费者转移到另一个消费者称为再均衡,为消费者组带来了高可用性和可伸缩性。

注:分区何时重新分配:加入消费者或者消费者崩溃等

2、如何判断消费者崩溃:消费者通过向群组协调器(某broker,不同群组可以有不同的群组协调器)发送心跳(一般在拉取消息或者提交偏移量的时候)表示自己仍旧存活,如果长时间不发送心跳则协调器认为期死亡并进行再均衡。

注:在0101版本中,心跳行为不再和获取消息和提交偏移量绑定在一起,有一个单独的心跳线程。

3、分配分区:消费者加入消费者组是,会像群组协调器发送请求,第一个加入的成为“群主”。群主从协调器那里获取成员列表,并负责给每一个消费者分配分区。完毕之后,将分配结果发送给协调器,协调器再将消息发送给所有的消费者,每个消费者只能看到自己的分配信息。只有群主知道所有的消费信息。

(三)参数配置

1、bootstrapserver:host:port

2、keyserializer:键序列化器

3、valueserializer:值序列化器

注:以上为必须设置的

4、groupid:从属的消费者组

5、fetchminbytes:消费者从服务器获取记录的最小字节数。

6、fetchmaxwaitms:消费者等待消费消息的最大时间

7、maxpartitionfetchbytes:服务器从每个分区返回给消费者的最大字节数(需要比broker的设置maxmessagesize属性配置大,否则有些消息无法消费)

8、sessiontimeoutms:指定该消费者在被认为死亡之前可以与服务器断开连接的时间,默认3秒

9、heartbeatintervalms:制定了poll方法向协调器发送心跳的频率。

注:一般9是8的三分之一

10、autooffsetreset:消费者在读取一个没有偏移量分区或者无效偏移量分区的情况下如何处理(latest:从最新记录开始读取,earliest:从最早的记录开始读取)

11、enableauthcommit:消费者是否自动提交偏移量,默认为true

12、autocommitintervalms:自动提交偏移量的时间间隔

13、partitionassignmentstrategy:分区分配给消费者的策略:

(1)range:会把主题若干个连续分区分配给消费者

(2)roundRobin:会把主题的所有分区逐个分配给消费者

14、clientid:任意字符串,broker用来区分客户端发来的消息

15:maxpollrecords:控制poll方法返回的最大记录数

16:receivebufferbytes/sendbufferbytes:tcp缓冲池读写大小

(四)订阅主题

consumersubscribe(list)

(五)轮训(消费者API的核心)

1、轮训作用: 只要消费者订阅了主题,轮训就会处理所有的细节(群组协调、分区再均衡、发送心跳、获取数据)

(1)获取数据

(2)第一次执行poll时,负责查找协调器,然后加入群组,接受分配的分区

(3)心跳的发送

(4)再均衡也是在轮训期间进行的

2、方法:poll(),消费者缓冲区没有数据时会发生阻塞,可以传一个阻塞时间,避免无限等待。0表示立即返回。

3、关闭:close(),网络连接随之关闭,立即触发再均衡。

4、线程安全:无法让一个线程运行多个消费者,也无法让多个线程公用一个消费者。

(六)提交和偏移量

1、提交:更新分区当前位置的 *** 作

2、如何提交:消费者往一个特殊主题(_consumer_offset)发送消息,消息中包含每个分区中的偏移量。

3、偏移量:分区数据被消费的位置。

4、偏移量作用:当发生再均衡时,消费者可能会分配到不一样的分区,为了继续工作,消费者需要读取到每个分区最后一次提交的偏移量,然后从偏移量的地方继续处理。

5、提交偏移量的方式

(1)自动提交:经过一个时间间隔,提交上一次poll方法返回的偏移量。每次轮训都会检测是否应该提交偏移量。缺陷:可能导致重复消费

(2)手动提交:commitSysn()提交迁移量,最简单也最可靠,提交由poll方法返回的最新偏移量。缺点:忘了提交可能会丢数据,再均衡可能会重复消费

(3)异步提交:同步提交在提交过程中必须阻塞

(4)同步异步提交组合

(5)提交特定的偏移量

(七)再均衡监听器

(八)从特定偏移量读取数据(seek)

1、从分区开始:seekToBegining

2、从分区结束:seekToEnd

3、ConsumerRebalanceListener和seek结合使用

(九)如何退出

1、前言:wakeup方法是唯一安全退出轮训的方法,从poll方法中退出并抛出wakeupException异常。如果没有碰上轮训,则在下一次poll调用时抛出。

2、退出轮训

(1)另一个线程调用consumerwakeup方法

(2)如果循环在主线程里可以在ShutdownHook里面调用该方法

3、退出之前调用close方法:告知协调器自己要离开,出发再均衡,不必等到超时。

(十)独立消费者(assign为自己分配分区)

121 新建保存工作簿(覆盖创建)

获取当前活动工作表的:

以索引值方式获取工作表:

以工作表名获取: wb['工作表名'],注意,此表达方式为切片显示,所以没有成员提示。很少用

循环工作表:很好用,一般用sheetnames

获取所有工作表名:wbsheetnames

获取指定工作表名

修改工作表名称

新建工作表时的默认工作表名:

workbookremove(工作表)

A1 表示法: 工作表['A1'] ,R1C1 表示法:工作表cell(行号,列号)

2工作表['起始行号': '结束行号']或者工作表['起始行号: 结束行号'],此方法是按行读取的数据。

3工作表['起始列号': '结束列号']或者工作表['起始列号: 结束列号'],

此方法是按列读取的数据。

4获取(按行)指定工作表所有已用数据:

list(workbookworksheets[索引值]values)

按行求和(方法 1)

按行求和(方法 )

按列统计平均值

按行获取工作表使用区域数据:worksheetrows

按列获取工作表使用区域数据:worksheetcolumns

获取工作表中最小行号:worksheetmin_row

获取工作表中最小列号:worksheetmin_column

获取工作表中最大行号:worksheetmax_row

获取工作表中最大列号:worksheetmax_column

获取单元格的行号:cellrow

获取单元格的列号:cellcolumn iter

方法获取指定区域:

1按行获取指定工作表单元格区域:worksheetiter_rows(……)

2按列获取指定工作表单元格区域:worksheetiter_cols(……)

可以通过 min_row、min_col、max_col、max_row 这几个参数进行单元格区域的控制

A1 表示法:工作表['A1']=值,R1C1 表示法:工作表cell(行号,列号,值)

在最后一行写入数据:工作表append(列表)

1102 实例应用(九九乘法表)

最后加一列写优秀

一、背景

随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。

使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。

二、现有方法及问题

对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张MySQL 表中,这张中间表对应了业务需要的 Elasticsearch 索引,每一列对应索引中的一个Mapping 字段。通过脚本以 Crontab 的方式,读取 MySQL 中间表中 UTime 大于上一次读取时间的所有数据,即该段时间内的增量,写入Elasticsearch。

所以,一旦业务逻辑中有相应字段的数据变更,需要同时顾及 MySQL 中间表的变更;如果需要 Elasticsearch 中的数据即时性较高,还需要同时写入 Elasticsearch。

随着业务数据越来越多,MySQL 中间表的数据量越来越大。当需要在 Elasticsearch 的索引中新增 Mapping 字段时,相应的 MySQL 中间表也需要新增列,在数据量庞大的表中,扩展列的耗时是难以忍受的。

而且 Elasticsearch 索引中的 Mapping 字段随着业务发展增多,需要由业务方增加相应的写入 MySQL 中间表方法,这也带来一部分开发成本。

三、方案设计

1 整体思路

现有的一些开源数据同步工具,如阿里的 DataX 等,主要是基于查询来获取数据源,这会存在如何确定增量(比如使用utime字段解决等)和轮询频率的问题,而我们一些业务场景对于数据同步的实时性要求比较高。为了解决上述问题,我们提出了一种基于 MySQL Binlog 来进行 MySQL 数据同步到 Elasticsearch 的思路。Binlog 是 MySQL 通过 Replication 协议用来做主从数据同步的数据,所以它有我们需要写入 Elasticsearch 的数据,并符合对数据同步时效性的要求。

使用 Binlog 数据同步 Elasticsearch,业务方就可以专注于业务逻辑对 MySQL 的 *** 作,不用再关心数据向 Elasticsearch 同步的问题,减少了不必要的同步代码,避免了扩展中间表列的长耗时问题。

经过调研后,我们采用开源项目 go-mysql-elasticsearch 实现数据同步,并针对马蜂窝技术栈和实际的业务环境进行了一些定制化开发。

2 数据同步正确性保证

公司的所有表的 Binlog 数据属于机密数据,不能直接获取,为了满足各业务线的使用需求,采用接入 Kafka 的形式提供给使用方,并且需要使用方申请相应的 Binlog 数据使用权限。获取使用权限后,使用方以 Consumer Group 的形式读取。

这种方式保证了 Binglog 数据的安全性,但是对保证数据同步的正确性带来了挑战。因此我们设计了一些机制,来保证数据源的获取有序、完整。

1) 顺序性

通过 Kafka 获取 Binlog 数据,首先需要保证获取数据的顺序性。严格说,Kafka 是无法保证全局消息有序的,只能局部有序,所以无法保证所有 Binlog 数据都可以有序到达 Consumer。

但是每个 Partition 上的数据是有序的。为了可以按顺序拿到每一行 MySQL 记录的 Binglog,我们把每条 Binlog 按照其 Primary Key,Hash 到各个 Partition 上,保证同一条 MySQL 记录的所有 Binlog 数据都发送到同一个 Partition。

如果是多 Consumer 的情况,一个 Partition 只会分配给一个 Consumer,同样可以保证 Partition 内的数据可以有序的 Update 到 Elasticsearch 中。

2) 完整性

考虑到同步程序可能面临各种正常或异常的退出,以及 Consumer 数量变化时的 Rebalance,我们需要保证在任何情况下不能丢失 Binlog 数据。

利用 Kafka 的 Offset 机制,在确认一条 Message 数据成功写入 Elasticsearch 后,才 Commit 该条 Message 的 Offset,这样就保证了数据的完整性。而对于数据同步的使用场景,在保证了数据顺序性和完整性的情况下,重复消费是不会有影响的。

四、技术实现

1 功能模块

配置解析模块

负责解析配置文件(toml 或 json 格式),或在配置中心(Skipper)配置的 json 字符串。包括 Kafka 集群配置、Elasticsearch 地址配置、日志记录方式配置、MySQL 库表及字段与 Elasticsearch 的 Index 和 Mapping 对应关系配置等。

规则模块

规则模块决定了一条 Binlog 数据应该写入到哪个 Elasticsearch 索引、文档_id 对应的 MySQL 字段、Binlog 中的各个 MySQL 字段与索引 Mapping 的对应关系和写入类型等。

在本地化过程中,根据我们的业务场景,增加了对 MySQL 表各字段的 where 条件判断,来过滤掉不需要的 Binlog 数据。

Kafka 相关模块

该模块负责连接 Kafka 集群,获取 Binlog 数据。

Binlog 数据解析模块

原项目中的 Binlog 数据解析针对的是原始的 Binlog 数据,包含了解析 Replication 协议的实现。在我们的使用场景中,Binlog 数据已经是由 canal 解析成的 json 字符串,所以对该模块的功能进行了简化。

binlog json字符串示例

上面是一个简化的 binlog json 字符串,通过该条 binlog 的 database 和 table 可以命中一条配置规则,根据该配置规则,把 Data 中的 key-value 构造成一个与对应 Elasticsearch 索引相匹配的 key-value map,同时包括一些数据类型的转换:

Elasticsearch相关模块

Binlog 数据解析模块生成的 key-value map,由该模块拼装成请求_bulk 接口的 update payload,写入 Elasticsearch。考虑到 MySQL 频繁更新时对 Elasticsearch 的写入压力,key-value map 会暂存到一个 slice 中,每 200ms 或 slice 长度达到一定长度时(可以通过配置调整),才会调用 Elasticsearch 的_bulk 接口,写入数据。

2 定制化开发

1) 适应业务需求

upsert

业务中使用的索引数据可能是来自多个不同的表,同一个文档的数据来自不同表的时候,先到的数据是一条 index,后到的数据是一条 update,在我们无法控制先后顺序时,需要实现 upsert 功能。在_bulk 参数中加入

Filter

实际业务场景中,可能业务需要的数据只是某张表中的部分数据,比如用 type 字段标识该条数据来源,只需要把 type=1或2的数据同步到 Elasticsearch 中。我们扩展了规则配置,可以支持对 Binlog 指定字段的过滤需求,类似:

2)快速增量

数据同步一般分为全量和增量。接入一个业务时,首先需要把业务现有的 历史 MySQL 数据导入到 Elasticsearch 中,这部分为全量同步。在全量同步过程中以及后续增加的数据为增量数据。

在全量数据同步完成后,如果从最旧开始消费 Kafka,队列数据量很大的情况下,需要很长时间增量数据才能追上当前进度。为了更快的拿到所需的增量 Binlog,在 Consumer Group 消费 Kafka 之前,先获取各个 Topic 的 Partition 在指定时间的 offset 值,并 commit 这些 offset,这样在 Consumer Group 连接 Kafka 集群时,会从刚才提交的 offset 开始消费,可以立即拿到所需的增量 Binlog。

3) 微服务和配置中心

项目使用马蜂窝微服务部署,为新接入业务提供了快速上线支持,并且在业务 Binlog 数据突增时可以方便快速的扩容 Consumer。

马蜂窝配置中心支持了各个接入业务的配置管理,相比于开源项目中的 toml 格式配置文件,使用配置中心可以更方便的管理不同业务不同环境的配置。

五、日志与监控

从上图中可以看出,订单各个表的数据同步延时平均在 1s 左右。把延时数据接入 ElastAlert,在延时数据过多时发送报警通知。

另一个监控指标是心跳检测,单独建立一张独立于业务的表,crontab 脚本每分钟修改一次该表,同时检查上一次修改是否同步到了指定的索引,如果没有,则发送报警通知。该心跳检测,监控了整个流程上的 Kafka、微服务和 ES,任何一个会导致数据不同步的环节出问题,都会第一个接到通知。

六、结语

目前接入的最重要业务方是电商的订单索引,数据同步延时稳定在 1s 左右。这次的开源项目本地化实践,希望能为一些有 Elasticsearch 数据同步需求的业务场景提供帮助。

这个简单,假如你的表格id为table,表格第一列为checkbox,然后在checkbox选中的情况下将表格所有第二列中的数据放到一个数组中

var data = [];$(function(){ $("#table")find(":checkbox:checked")each(function(){ var val = $(this)parent()next()text(); datapush(val); });});

以上就是关于数据写入kafka的分区策略全部的内容,包括:数据写入kafka的分区策略、consumer(KafkaConsumer)、python处理excel完整版等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/9617770.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023-04-30
下一篇 2023-04-30

发表评论

登录后才能评论

评论列表(0条)

保存