- 知识点
- 0. 介绍
- 1. HDFS
- 1.1 读数据
- 1.2 写数据
- 1.3 块大小
- 1.4 Yarn调度
- 1.4.1 Job提交流程
- 1.4.2 调度器
- 2. MapReduce、Hive
- 2.1 运行过程
- 2.1 切片大小
- 2.2 CombineTextInputFormat
- 2.3 压缩方式
- 2.5 数据倾斜
- 2.6优化
- 2.5.1 慢的原因
- 2.5.2 优化方案
- 2.7 如何设置maptask、reducetask个数
- 2.8 笛卡尔积的优化
- 2.9 四种排序
- 2.10 UDF、UDAF、UDTF 的区别
- 2.11 行列转换
- 2.12 开窗函数
- 3.Zookeeper
- 3.1架构
- 3.2 选举机制
- 3.3 ZAB协议
- 3.4 Watch机制
- 3.5 脑裂
- 4.Flume
- 4.1 数据丢失
- 4.2 拦截器
- 4.3 Channel选择器
- 4.4 优化
- 4.5 监控
- 4.6 HDFS Sink小文件的处理
- 4.7 和KafaKa的比较
- 5.Kafka
- 5.1 架构
- 5.2 分区
- 5.2.1 分区数量
- 5.2.2 分区副本
- 5.2.3 分区分配策略
- 5.3 KafKa监控
- 5.4 数据丢失和重复消费
- 5.4.1 问题
- 5.4.2 解决方案
- 5.5 存储机制
- 5.6 宕机怎么办
- 5.7 Kafka为什么不支持读写分离
- 5.8 Kafka 的数据 offset 读取流程
- 5.9 KafKa消费能力不足
- 5.10 Pulsar
- 6.Hbase
- 7.Sqoop
- 8.Spark
- 9.Flink
- 10.数仓建模及分层
- 10.1 数仓建模
- 10.2 数仓分层
- 11.数据中台
- 11.1 概念
- 11.2 问题及中台的解决方案
2.项目 基本法项目 代理人基本管理办法,目的是追踪代理人的发展状况,反哺公司政策。 项目技术流程上是这样。 业务数据:sqoop 同步到数仓的ods层,然后在数仓中在数据加工,最后将数据推送到应用端,如果pg、kylin、oracle等 埋点数据:flume 采集日志做简单清洗将日志数据写到kafka,再用flume消费kafka写到hdfs,再用hive做加工 难点: (1)业务逻辑复杂,打比方; (2)时效要求高,但上游出数晚; 方案:a) 要上游落地批次任务的状态日志表,然后Sqoop脚本轮询读取任务批次的写入状态 b) 调优要求高。 c) 超过几点还没有完成,取T+2数据 问他的: 1)mapjoin内存溢出,如何合理设定内存大小 2)上游硬删除问题 增量同步 3)当前使用的技术框架 分层: 1) ods:sqoop 增量、全量同步 2) dwd:事实明细表 命名规范:体现一级二级主题域、运行频次、事实内容 字段规范:码值标准化、词根标准化、时间格式标准化、数值格式标准化 存储格式:ORCFile 分区规范:时间分区(日、月、年) 长度、命名 数据处理:主键空值数据过滤、缺省值补充 3) dws:事实逻辑表 维度建模:星型模型 事实表+维度 主题宽表 4) sub:轻度汇总指标表 宽表/纵表 5) agg:多维度指标宽表 6) app:应用层 kylin druid pg Oracle1. HDFS 1.1 读数据
1.Client向NameNode发起读请求 2.NameNode检查文件是否存在,Client是否有权限读等,然后根据元数据中存储的Block信息,将存储Block块的DataNode地址返回给Client(选择Block规则:网络拓扑图中最近的DataNode+心跳机制) 3.Client通过地址访问对应的DataNode进行数据并行读取,并对返回的数据检查checksum(数据写入的会记录数量) 4.传输完成后,在Clinet端进行block块数据合并1.2 写数据
1.Client向NameNode发起写请求 2.NameNode检查写入的文本是否存在,Client是否有权限写入等,NameNode根据机架感知和网络拓扑及副本机制进行文件分配,告知Client DataNode的地址 3.Client向DataNode1,2,3依次建立Pipeline管道通信,将数据以packet为单位(64k)添加到队列等待写入,数据依次写入3个节点并且3个节点反向依次返回ack确认包,如果成功则继续写入第二个block块,所有块写完后关闭流1.3 块大小
本地模式:32M Hadoop1.X: 64M Hadoop2.X: 128M1.4 Yarn调度 1.4.1 Job提交流程
1. 客户端向Resource Manager提交job运行的请求 2. Resource Manager进行检查,没有问题后向客户端返回一个共享资源路径和job_id 3. 客户端将资源放到共享路径下 4. 客户端向Resource Manager反馈,资源已经放入共享路径下,并申请运行mrAppMaster 5. Resource Manager将用户的请求初始化为一个Task,并放入调度队列当中 6. Node Manager领取到Task 7. Resource Manager在这个节点上启动一个Container并启动MRAppmaster 8. MRAppMaster到共享资源路径下下载共享资源(主要是分片和job) 9. MRAppMaster向Resource Manager申请maptask的运行资源 10. 当Node Manager领取到maptask的资源后,Resource Manager向MRAppMaster返回资源节点信息 (返回节点时,有就近原则,优先返回当前的maptask所处理切片的实际节点,数据处处理的时候可以做到数据的本地化处理。如果是多副本的时候就在多副本的任意节点。而reducetask任务在任意不忙的节点上启动) 11. MRAppMaster发送启动脚本到对应节点上,该节点启动一个Container,并启动maptask任务,定时向MRAppMaster汇报自己的运行状态和进度 12. maptask任务到对应的共享资源路径下下载相应的资源,当一个maptask运行成功后,MRAppMaster就会向Resource Manager申请reducetask的运行资源,并启动reducetask 13. reducetask从map中拉取数据,当所有的maptask运行结束后,启动reduce任务进行计算 14. 当所有的任务全部结束后,MRAppMaster向Resource Manager申请注销自己并释放资源1.4.2 调度器
1.FIFO 先进先出,同一时间队列只有一个任务在执行 2.Capacity Scheduler apache版本默认调度器,多队列,同一时间,一个队列只有一个任务在执行 3.Fair Scheduler CDH默认调度器,多队列,每个队列内部按照缺额大小分配资源启动任务2. MapReduce、Hive 2.1 运行过程
1.Client获取待处理数据的信息,形成任务分配的规划 2.提交切片信息到Yarn 3.Yarn创建MrAppMaster,根据切片信息计算出MapTask数量,并启动MapTask 4.Mapper读取要处理的数据(InputFormat,默认实现TextInputFormat,实际上是RecordReader,可自定义) 5.Mapper逻辑处理 6.然后将Mapper的每条结果进行collect收集,在collect中,会先对其分区,默认是HashPartitioner(规则是 key.hashCode() % numReducerTasks),可自定义。 7.接下来将kv数据写入到内存中,即环形缓冲区中(默认大小100M),环形缓冲区一侧写kv数据,一侧写数据索引及元数据信息(Partition、KeyStarPos、ValueStarPos、ValueLength),写到80%后开始溢写,并对不同的分区分别对key做字典排序(快排)。 8.如果有设置Combiner,那么在溢写前会先做Combiner局部汇总,将key相同的value加起来,减少溢写数据量。Combiner父类就是Reduce,和Reduce的区别在于运行的位置不同,Combiner是对每一个MapTask运行,Reduce是对接收到的所有Mapper输出结果运行。Combiner能应用的前提是不影响最终的业务逻辑,输出的KV和Reduce的输入对应起来。如:求平均数就不可以;求和可以; 9.每次溢写都会在磁盘上生成一个临时文件,多次溢写的话在数据处理结束后将对一些文件做Merge合并(归并排序),如果设置了输出压缩,还会将数据压缩后再溢写到磁盘。 10.所有MapTask任务完成后,启动相应数量的ReduceTask(分区数),来自不同的MapTask数据会被拉取分发到一个Reduce,如果数据超过内存大小,数据会溢写到磁盘,并且Reduce会合并文件(归并排序)。 11.Reduce处理逻辑,并输出(OutputFormat,默认TextOutputFormat)到HDFS上(part-r-***文件)2.1 切片大小
//maptask的并行度由切片个数决定,FileInputFormat的默认切片规则如下 //1.简单来说就是取三者的中位数,默认即块大小 long splitSize = computeSplitSize(blockSize, minSize, maxSize) //minSize:1 //maxSize: Long.MaxValue //blockSize: 块大小 pretected long computeSplitSize(long blockSize, long minSize, long maxSize){ return Max.max(minSize,Max.min(maxSize, blockSize)); } //注:整个切片的核心过程在getSplit()方法中完成,inputSplit只记录了切片的元数据信息,比如起始位置、长度以及所在的节点列表。切花规划文件会提交到Yarn上,Yarn上的MrAppMaster就可以根据切片规划文件计算开启MapTask的个数。
block设置的依据
寻址时间为传输时间的1%时,为最佳状态;HDFS中平均寻址时间大概为10ms,换算后最佳传输时间为1s,普通磁盘的传输数据为100M/S,所以设置为128M。根据磁盘的实际传输速度来设定。2.2 CombineTextInputFormat
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下 1、应用场景: CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。 2、虚拟存储切片最大值设置 CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m 3、切片机制 生成切片过程包括:虚拟存储过程和切片过程二部分。 (1)虚拟存储过程: 将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。 例如setMaxInputSplitSize值为4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为4.02M,如果按照4M逻辑划分,就会出现0.02M的小的虚拟存储文件,所以将剩余的4.02M文件切分成(2.01M和2.01M)两个文件。 (2)切片过程: (a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。 (b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。 (c)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为: 1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M) 最终会形成3个切片,大小分别为: (1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M2.3 压缩方式
支持Split:bzip2、lzo 不支持Split:snappy、gzip 即使MapReduce的输入输出文件都是未压缩的文件,你仍然可以对Map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到Reduce节点,对其压缩可以提高很多性能2.5 数据倾斜
现象:查看日志,看到reduce跑到99%一直跑不动 原因:本质原因是key分布不均匀,某一个reduce被分发的数据过多,导致节点性能处理不了这么大的数据 SQL上:count(distinct column)、 笛卡尔积、join *** 作导致 业务上:业务数据本身的特性 解决方案: count(distinct)导致: 使用count + group by处理 order by ... 笛卡尔积:避免使用笛卡尔积,或者参数直接限定 join *** 作导致:: (1)大表join小表开启mapjoin (2)开启map端聚合,combiner (3)大表join大表:分区裁剪 + 字段做分桶;分桶的话,两张都要是分桶表,且对同一字段分桶,分桶个数要是倍数关系。CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS 表示按照 userid 来分桶,按照viewtime来进行桶内排序,分成多少个桶 (3)对join的key值做处理 a.某一key值数量过多:加盐处理(二次group by 局部汇总+ 全局汇总),参数也可以设置: b.key的null值很多:加随机数或者用union all ,null值单独处理+ 其他正常处理 a join b on case when a.user_id is null then concat(‘jd-hive’,rand() ) else a.user_id end = b.user_id; c.join的数据类型不一样:string join int : 做cast(col as string) d.hive.groupby.skewindata=true; hive.skewjoin.key=100000; hive.optimize.skewjoin=false;2.6优化 2.5.1 慢的原因
1.计算机性能 CPU、内存、磁盘、网络 2.I/O *** 作优化 (1)数据倾斜 (2)Map和Reduce个数不合理 (3)小文件过多 (4)大量不可分割的超大文件 (5)spill、meger次数过多2.5.2 优化方案
数据输入、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数 1.数据输入 (1)合并小文件:大量在小文件会启动多个Map,增大Map任务的load次数,比较耗时 (2)采用CombineTextInputFormat来输入,解决输入端大量小文件场景 2.Map阶段 (1)减少spill次数:增大触发spill的内存上限,减少spill次数,从而减少磁盘IO (2)减少meger次数:增大触发merge的文件数目 (3)不影响业务逻辑的前提下,先进行Combine处理 (4)增加环形缓存区内存大小 3.Reduce阶段 (1)合理设置Reduce个数:min(数据总量/设定每个reduce的处理数量(256M),每个job的reduce个数的最大个数);直接设置reduce个数。reduce不是越多越好:有多少reduce就有多个文件输出,小文件 (2)设置map和reduce共存,没有依赖关系的可以map和reduce,reduce可以先运行 (3)规避使用Reduce (4)根据内存情况,设置Reduce直接从内存中拉取数据,而不是从溢写到磁盘的数据拉取数据 4.IO传输 (1)使用数据压缩的方式,减少网路IO。Snappy、Lzo (2)使用SequenceFile二进制文件 5.数据倾斜 见上 6.hive优化 建表、语法、架构(常规) Hive建表方面: 1)分区表、 2)分通表、 3)选择合适的存储格式(TextFile、SequenceFile、ORCFile、ParquetFile)、 4)选择合适的压缩格式(snappy、gzip、bzip2、lzo)(压缩率、压缩解压速度、支持split) Hive语法方面: 5)列裁剪、 6)谓词下推、 7)分区裁剪、 8)map输入端合并小文件、 9)map输出端和reduce输出端文件合并、 10)合理设置maptask并行度(maptask过多输出过多小文件,创建和初始化map开销大;maptask过小,当输入文件很大时,maptask处理的很慢,并发度太小)、 11)合理设置reduceTask并行度(reducetask过多输出过多小文件,初始化和创建reduce开销大;reduce过小可能会出现倾斜问题或者内存溢出) 12)Join优化(mapjoin、多表关联尽量相同的连接字段、null值处理、开启倾斜优化参数、CBO优化、不得不笛卡尔积的规避方案) 13)map端聚合combiner 14)负载均衡hive.groupby.skewindata=true; 15)order by 优化(最好使用distribute by + sort by + reduce代替) 16)count(distinct cloumns) 使用group by + count代替 17)in/exists 使用left semi join代替 18)一次读取,多次插入 from ... insert into .. 19)启动中间结果压缩 Hive架构层面: 20)启用本地抓取(select * 、 limit 、 where partition = ''不走mr) 21)本地模型(数据量较小的时候没必要分布式模式计算) 22)jvm重用 对于小文件Job,可以开启JVM重用,一个Map运行在一个JVM上,开启重用的话,该Map在JVM上运行完毕后,JVM继续运行其他Map 23)推测执行(默认打开) 95%以上的task都已经完成了,但是少数的几个task进度很慢,开启推测执行后,会为这些任务开启一个备份任务,同时运行,谁先跑完,就采用谁的结果 24)并行执行:不相互依赖的stage可以并行进行 25)hive严格模型(分区表必须有where分区裁剪、order by必须有limit、限制笛卡尔积、分区表必须有静态分区列)2.7 如何设置maptask、reducetask个数
maptask
1.增加 MapTask个数 可以设置 mapred.map.tasks 为一个较大的值。这个值只有大于default_mapper_num才生效。设置经验为: Num = (DataNode_num * 0.95) * M(倍数),目的是为了合理利用整个集群 default_mapper_num = file_total_size / block_size 2.减少MapTask个数 如果输入端是大量小文件,可以参数设置合并小文件 设置 maperd.min.split.size 最小切片大小,调成大于默认block_size大小且是blocksize的整数倍即可。
reducetask
参数1:hive.exec.reducers.bytes.per.reducer (默认256M) 参数2:hive.exec.reducers.max (默认为1009) 参数3:mapreduce.job.reduces (默认值为-1,表示没有设置,那么就按照以上两个参数 进行设置) nums_reducetask = Math.min(参数2,总输入数据大小 / 参数1) 即默认nums = Math.min(1009,file_total_size / 256M)2.8 笛卡尔积的优化
小表 join 大表的笛卡尔积优化 原理:规避笛卡尔积 具体 *** 作:给两表添加一列为join key,将小表复制数倍。key可以设定为随机数,随机数的数量就是小表复制的倍数,每份数据的每一行数据key不同。大表每一行数据随机分配key。 这种情况join后,reduce数量就是随机数的数量,不再是一个reduce了。2.9 四种排序
1.order by:全局有序,一个reduce 2.sort by :单个reduce有序 3.cluster by :对同一字段分桶并排序,不能sort by 联用 4.distribute by + sort by:分桶 + 每个reduce结果有序2.10 UDF、UDAF、UDTF 的区别
UDF:单行进入,单行输出 自定义UDF:继承org.apache.hadoop.hive.ql.exec.UDF,实现evaluate函数 UDAF:多行进入,单行输出 UDTF:单行输入,多行输出2.11 行列转换
行转列
COLLECT_SET(col):函数只接受基本数据类型,它的主要作用是将某字段的值进行去重汇总,产生array类型字段 concat_ws('|', collect_set(t1.name)
列转行
EXPLODE(col):将hive一列中复杂的array或者map结构拆分成多行。 LATERAL VIEW 用法:LATERAL VIEW udtf(expression) tableAlias AS columnAlias 解释:用于和split, explode等UDTF一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。 select movie, category_name from movie_info lateral view explode(category) table_tmp as category_name;2.12 开窗函数
常用 row_number() over(partition by ...order by ) rn -- 1 2 3 rank() over(partition by ... order by ) rk -- 1 1 3 dense_rank() over(partition by .. order by )rk -- 1 1 2 lag(column,n) over(partition by ..order by ) lg sum(column) over(partition by ..order by ) sm min.. max.. avg..3.Zookeeper
为分布式应用提供一致性服务的软件,是Hadoop和Hbase的重要组件。本质上是一个分布式文件系统, 适合存放小文件,也可以理解为一个数据库
3.1架构Zookeeper集群是一个基于主从架构的高可用集群。组员有: Client:发送请求 Leader:写 *** 作;一个Zookeeper集群同一时间只会有一个实际工作的Leader,它会发起并维护与各 Follwer及Observer间的心跳。所有的写 *** 作必须要通过Leader完成再由Leader将写 *** 作广 播给其它服务器。 Follower:一个Zookeeper集群可能同时存在多个Follower,它会响应Leader的心跳。 Follower可直接处理并返回客户端的读请求,同时会将写请求转发给Leader处理,并且负责在Leader处理写请求时对请求进行投票。 Observer:角色与Follower类似,但是无投票权。3.2 选举机制
Leader选举是保证分布式数据一致性的关键所在。当Zookeeper集群中的一台服务器出现以下 两种情况之一时,需要进入Leader选举。 (1)服务刚启动 (2)Leader挂掉 选举过程:过半机制 每个Server广播一个投票 每个Server处理投票 每个Server统计投票,超过一半便认为可以成为Leader 该Server广播自己为Leader,其他Server改变自己状态为Follower3.3 ZAB协议
保持数据的一致性 - 所有写 *** 作都必须通过leader完成 - leader先将 *** 作写入本地日志后再广播到follower节点 - 一旦leader节点无法工作,zookeeper可以通过选举机制选出一个新leader - 在选举新leader后,保证已经Commit的数据不会丢失,未被Commit的数据对客户端不可见,follower中大于leader的max_zxid的事务会被删除3.4 Watch机制
类似于数据库中的触发器,对某个Znode设置了Watcher后,Znode发生更新删除等 *** 作后,对应的Watcher会得到通知.只会被触发一次.3.5 脑裂
1.Zookeeper 集群节点为什么要部署成奇数 为了以最大容错服务器个数的条件下,能节省资源。比如,最大容错为2的情况下,对应的zookeeper服务数,奇数为5,而偶数为6,也就是6个zookeeper服务的情况下最多能宕掉2个服务,所以从节约资源的角度看,没必要部署6(偶数)个zookeeper服务节点。 2.脑裂 由于心跳超时(网络原因导致的)认为leader死了,但其实leader还存活着。由于假死会发起新的leader选举,选举出一个新的leader,但旧的leader网络又通了,导致出现了两个leader ,有的客户端连接到老的leader,而有的客户端则连接到新的leader。 实际上Zookeeper集群中是不会轻易出现脑裂问题的,原因在于过半机制(>一半而非>=一半)。4.Flume
组件:Source、Channel、Sink
4.1 数据丢失Flume数据不会丢失,数据传输自身有事务,Channel可以选择为FileChannel或者KafkaChannel4.2 拦截器
(1)静态拦截器:往采集到的Event的header里面添加k-v键值对,后序存放数据的时候可以提取出来kv值。 (2)时间拦截器、主机拦截器、正则过滤拦截器等 (3)自定义拦截器 a.实现interceptor接口 b.重写四个方法:initialize、intercept(event)、intercept(List4.3 Channel选择器)、close c.实现静态内部类 interceptor.Builder
-- Channel Selectors可以让不同的项目日志通过不同的Channel到不同的Sink中 Replicating Channel Selector(default):将source过来的events发送到所有channel Multiplexing Channel Selector:将source过来的events选择发送到哪些channel4.4 优化
1.通过配置datadir指向多个路径,不同路径指向不同的硬盘,提高文件通道的性能 2.checkpointDir和backCheckpointDir也尽量配置到不同的硬盘上,提高容错4.5 监控
自研,因为我们要做邮件告警和电话告警功能 Ganglia4.6 HDFS Sink小文件的处理
配置参数 hdfs.rollInterval 单位:秒 间隔多长时间将临时文件滚动成最终目标文件 hdfs.rollSize 单位:bytes 达到大小 hdfs.roundValue 时间上的舍弃 hdfs.roundUnit 时间上的舍弃的单位 hdfs.roundCount event的数量4.7 和KafaKa的比较
1.如果数据被多个系统消费的话,使用Kafka,如果数据是设计给hadoop、hbase使用,使用flume 2.flume没有副本机制,节点奔溃的话需要恢复磁盘才能恢复数据 3.flume可以使用拦截器实时处理数据,kafaka需要外部的流处理系统才能做到 4.flume采集日志是通过流的方式直接将日志收集到存储层,而kafka是将数据缓存在kafka集群,待后期可以采集到存储层5.Kafka
KafKa是一个分布式消息队列
为什么要使用Kafka?
(1)缓冲和削峰
上游数据时由流量剧增的情况,下游没有足够的机器来处理(如果以峰值来投入资源是巨大的浪费),kafka在中间可以起到一个缓冲的作用,把消息暂存在kafka中,下游服务器按照自己的节奏处理。
(2)解耦和扩展性
消息队列可以用为一个接口层,解耦重要的业务流程。只需要遵守约定,就可以独立的扩展两边的处理过程。
(3)异步通信
很多时候,用户不想也不需要立即处理消息。消息队列提供了异 步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多 少消息就放多少,然后在需要的时候再去处理它们。
(4)健壮性
消息队列可以堆积请求,所以消费端业务即使短时间死掉,也不会影响主要业务的正常进行。
5.1 架构Producer:采用推的方式,将数据发送都Broker Broker: Comsumer:采用拉的方式,将数据从Broker拉取数据并消费 Zookeeper:分布式协调系统,组件要向ZK注册 Topic: Kafka 中消息以主题为单位进行归类,每个主题都有一个 Topic Name,生产者根据 Topic Name 将消息发送到特定的 Topic,消费者则同样根据 Topic Name 从对应的 Topic 进行消费。 Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列 Consumer group: 每个消费组负责一套完整的数据,组中每个消费者消费的数据不重复5.2 分区 5.2.1 分区数量
1.数据分区内有序,写入什么顺序,消费就是什么顺序,这也是为什么kafka数据存储在磁盘,但是依然很快的原因,因为不用随机读写。 2.如果要保证一个topic下的数据有序,那么就是一个主题一个分区,分区数目在创建topic的时候设置 3.分区数并不是越大越好,分区数不要超过集群机器的数量。 4.分区数决定了每个消费组中并发消费者的数量,因为一个分区只能被同组一个消费者消费,当上游压力过大的时候,可以增加分区数或提高消费者消费能力 生产者默认按轮询规则发送消息到各个broker5.2.2 分区副本
副本数:<= broker个数,一般设置两个 副本主从: leader负责读和写,follower只负责冲leader拉取数据做副本,不做读写(保证数据的一致性) leadr宕机后,会从follower中选择一个当leader,且不会再创建一个新的副本(这会影响kafka的吞吐) 当leader和follower的数据差异超过4k条或者超过延迟时间,follower会从ISR(In-Sync Relipcas)列表中移除,存入OSR(Out-Sync Relipcas )5.2.3 分区分配策略
1.Range 范围分配 分区个数/消费者线程总数,除不尽的依次让前面的消费者线程多分一个分区 如:11/3 =3 分配 4 4 3 2.RoundRobin 轮询分配 将所有主题分区组成列表,然后将列表按照hashcode进行排序,轮询分配给消费者,没有订阅该主题的消费者就跳过 前提:同一个消费者里面所有消费者订阅的主题必须相同.每个主题的消费者实例是相同的 https://blog.csdn.net/qq_39907763/article/details/82697211 有partition按partition,有key按 key,都没有轮询5.3 KafKa监控
自研、KafKaManager5.4 数据丢失和重复消费 5.4.1 问题
1.什么时候修改offset 默认是数据处理完后再修改offset 2.什么时候出现丢失 消费者异步提交的时候,offset已提交,但是数据消费失败 3.什么时候出现重复 异步提交的时候,数据已经消费,但是offset提交失败了5.4.2 解决方案
数据不丢失
- 生产者生产数据不丢失
ACK确认机制,生产者发送数据到Broker,Broker需要返回ACK响应码 0: 发完即提交offset,不关心丢失与否 1:需要leader收到数据 2:需要leader和follower都收到数据 数据发送方式: 同步:发送一批数据,等待kafka返回结果 异步:将数据保存在生产者的Buffer中(2w条),满足数据大小阈值或者时间阈值就发送数据。如果满了还没有发送出去,有个选项可以配置是否清空buffer,可以设置为-1,永久阻塞。
- Broker数据不丢失
通过副本因子机制保证数据不丢失
- Consumer数据不丢失
手工提交offset,记录好offset即可保证数据不丢失 High/Low Level API High:0.8版本前由Zookeeper帮助管理offset,相对安全,缺点是每次消费者都要去zookeeper获取offset Low:由Kafka创建一个Topic维护offset,控制灵活,如跳读,重读,精确读取.缺点:维护成本高
kafka将数据写到磁盘,一般是不会丢失的。但是在重启kafka过程中,如果有消费者消费消息,可能会导致数据的不准确(丢失或者从重复消费)
5.5 存储机制1.存储 一个topic有多个分区文件,按照topic_name-partition_num命名. 如果Topic-0.文件下有多个segment,segment由index和log文件组成. 索引文件的命名为分区的最大offset值,存储了数据的偏移量;索引文件没有为每条消息都创建索引,而是采用的稀疏存储,节省空间,从而可以将索引文件放在内存中.缺点是没有建立索引的消息数据需要做第二次的顺序扫描 log文件存储是数据文件,最大值为1G,超过了1G则创建第二个segment; 2.清除 默认删除超过7天的数据 3.压缩 将k-v中k相同的数据合并,保留最新的那一条数据,丢弃旧数据5.6 宕机怎么办
kafka的副本冗余机制保证了如果只是某个节点宕机是不影响运行。kafka会从ISR队列中选举出一个新的leader出来服务。 如果超过一半节点都宕机了,那kafka会变得不可用。需要通过日志分析来查看宕机原因,从而解决问题,恢复节点5.7 Kafka为什么不支持读写分离
在Kafka中,生产者写入消息,消费者读取消息都是与Leader进行交互的,从而实现主写主读的生产消费模型。 主写从读的问题: (1)数据一致性问题 (2)数据延迟问题5.8 Kafka 的数据 offset 读取流程
从_consumer_offsets主题中读取5.9 KafKa消费能力不足
1.增加topic分区数,同时增加消费组内消费者数量,消费者数量 = 分区数,两者缺一不可 2.提交每批次拉取的数量5.10 Pulsar
Pulsar流式处理和队列的合体,最主要的优点就是plusar扩容,增加节点时比kafka更快更方便,且数据处理速度上也比kafka更上一层楼。6.Hbase
架构
Hbase是主从架构 主节点是HMaster,从节点是HRegionServer HRegionServer= 1个HLog + N个region region = 1个MemStore + N个StoreFile
读写
读: 1.Client连接到ZK,获取元数据信息的RegionServer地址 2.连接RegionServer,获取查询rowkey所在的region信息 3.连接Region,找到对应的Store 4.首先判断在不知memoryStore中,再通过布尔过滤器判断rowkey所在的StoreFile中 5.将数据返回 写: 1.Client连接ZK,获取元数据的RegionServer地址 2.Client连接RegionServer,获取插入rowkey的所在region 3.Client连接Region,找到对应的Store,现将数据写到Hlog中,再往MemoryStore中写 4.达到MemoryStore阈值(64、 128)后,则将数据溢写到StoreFile 5.StoreFile过多时,就会合并这里StoreFile,并对过期数据删除,数据以HFile的格式存在HDFS上 6.当Region超过10G的时候,Region会被切分。新Region被HMaster分配到新的RegionServer下。
Rowkey设计原则
1.散列原则 如果是按照时间戳方式的递增,那么建议高位作为散列字段(将数据均衡分布在每个RegionServer中,防止热点问题),低位放时间字段 2.唯一原则 字典排序 3.长度原则 建议不超过16个字节.减少持久化到HFile占用的空间;减少MEMStore内存的占用
Rowkey如何设计
-- 防止热点问题 原因:rowkey是按照字典排序,这样有利于scan *** 作,但这也是热点问题的源头.当大量的Client的集中访问集群中一个Region,超出了Region所在单个机器的承受能力,会引起性能下降甚至不可用 -- 解决方案 - 加盐 :rowkey前生产随机数 - 时间戳反转 :把时间戳反转添加到key的末尾,可以快速获取数据的最新版本 - 哈希 :使同一行永远用一个前缀加盐。哈希也可以使负载分散到整个集群,但是读却是可以预测的7.Sqoop
本质还是将命令翻译成MR来实现,没有Reduce。在翻译出的 mapreduce 中主要是对 inputformat 和 outputformat 进行定制。 从RDBMS到Hive ods使用的是增量合并、全量同步两种方式8.Spark
Spark为什么比MR快
1.Spark将中间结果存储在内存中,内存不够才会存储在磁盘上 2.Spark的task是线程,MR的task是进程,进程创建和销毁开销比较大 3.DAG有向无环图 4.RDD lineage血缘9.Flink
Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。
Flink基于流的世界观,一切都是由流组成的,离线数据是有界的流;实时数据是无界的流;
特点
(1)支持事件时间和处理时间语义
(2)精确一次的状态一致性保证
(3)毫秒级低延迟
VS SparkStream
本质上的区别:flink是流计算,sparksteam是微批次计算
架构及作业提交流程
架构: 组件主要是资源管理器、任务管理器、作业管理器、分发器 提交流程: 1.客户端提交应用 2.分发器启动并提交应用 3.任务管理器向资源管理器请求资源 4.资源管理器启动作业管理器,作业管理器提供资源给任务 5.执行任务
窗口
滚动窗口、滑动窗口、会话窗口
时间语义
事件时间、摄入事假、处理时间10.数仓建模及分层 10.1 数仓建模
维度建模
维度建模又分为雪花模型和星型模型: 星型模型和雪花模型主要区别就是对维度表的拆分,对于雪花模型,维度表的涉及更加规范,一般符合3NF;而星型模型,一般采用降维的 *** 作,利用冗余来避免模型过于复杂,提高易用性和分析效率,性能高一些。 维度建模将数仓中的表分为事实表和维度表 设计步骤: 1)确认业务过程,强业务参与。主题层>数据域>业务过程L0 L1 L2 2)识别事实,将事实归纳到对应的业务过程 3)选择粒度 4)识别维度 5)输出总线矩阵文档 6)根据以上信息+其他分层规范 建模
ER实体建模
将事物抽象成实体、属性、关系来建模,常用于OLTP数据库建模
dataValue建模
Data Vault是在ER模型的基础上衍生而来,Data Vault模型是一种中心辐射式模型。基本机构是:中心表、连接表、卫星表。 中心表:实体主键 链接表:主外键 卫星表:实体属性
Anchor建模
Anchor是对Data Vault模型做了更近一步的规范会处理,达到了6NF的标准。由于过度规范化,使用中牵涉到太多的join *** 作,很少用。10.2 数仓分层
1.ods:贴源层 2.dwd:事实明细层,事实表和维度表 3.dws:轻度汇总层,主题宽表 4.sub:细粒度指标层,纵表偏多.分区规范:统计日期(day,month,year)、统计频次(当期、月累计、季累计、半年累计、年累计) 5.agg:维度下钻指标层,指标宽表 6.app:应用层 sqoop》mysql/oracled/pg、kylin、druid11.数据中台 11.1 概念
数据中台是指通过数据技术对海量数据进行采集、计算、存储,同时统一标准和口径,形成全域级、可复用的数据资产中心和数据存储能力中心,形成大数据资产层,进而为客户提供高效的服务。
狭义上的数据中台是一套实现数据资产化的工具,广义上的数据中台是一套利用数据帮助企业实现数字化转型的机制和方法。
数据中台的核心理念是让一切业务数据化,一切数据业务化,数据取之业务,用之于业务。
11.2 问题及中台的解决方案问题
(1)指标口径不一致:业务口径不一致、数据源不一致、加工逻辑不一致
(2)数据重复建设:烟囱式开发
(3)取数效率低:表过多,无法快速找到要的数据
(4)数据质量差:没有稽核任务,数据可靠性低
(5)数据成本线性增加:上述问题导致
解决方案
方法论给你统一数据入口和出口,即OneData、OneService
具体 *** 作:
(1)构建指标资产管理平台:消除指标二义性,避免重复建设
(2)构建数据地图:提交取数效率
(3)构建数据质量监控平台:确保指标数据准确性
(4)构建元数据管理中心:血缘关系,价值链等
(5)构建自助分析可视化平台:可视化+自助分析
(6)统一取数方式:数据应该通过API接口的方式被访问,把数据和应用链接起来,也方便横向扩展
体系
(1)计算和存储基础设施:hadoop
(2)工具产品:bdata、linkdo等
(3)数据治理模:元数据中心、数据地图、数仓设计、数据质量、指标系统、成本优化
(4)数据服务:向下提供了应用和表的访问关系,统一数据访问API
(5)数据产品:大屏报表、自助分析平台、标签工厂等
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)