项目最后更新时间为2016年,其中使用了hive streaming的api
项目中主要使用canal工具读取mysql日志,主要定义了处理binlog日志的逻辑,然后将处理完的数据打入kafka中供spark streaming进行消费
项目中的主要spark逻辑定义在了如下的函数中
项目主要使用了spark streaming做数据处理,设置15s为一个周期,取出kafka中的数据,然后做数据清洗和处理,然后调用DStreamtoHiveBatchUtilLoan将数据更新到hive中
在更新到hive的过程中,针对Hive中没有主键的概念,Hive使用事务Id+批次Id来唯一标识一条记录,对Hive中每条记录的增删改 *** 作,Hive都会分配唯一的事务Id+桶Id+批次Id。 当发生增删改 *** 作时,需要根据Mysql表的主键,查找到Hive表中对应的记录,即查找到对应的事务Id+批次Id。
问题:
当对Hive进行插入数据 *** 作之后,需要存储事务Id+桶Id+批次Id,Hive原生的方法是将此组合Id存储在Hive表中,这样会造成查找时候的效率低下,严重影响增删改 *** 作的性能。
解决方案:
将Hive的事务Id+桶Id+批次Id存储在Hbase中,Hbase支持实时查询,这样可以大幅减少查找的时间。
使用HCatalog Streaming Mutation API向hive2中进行实时增删改数据.
1.创建hive和hbase的连接
2.解析数据,匹配insert *** 作,调用hive的insert *** 作,并将存储 数据ID:事务Id+桶Id+批次Id 数据put到hbase
3.匹配update *** 作,在hbase中取出数据ID的数据,并取出批次Id进行解析,然后添加到updateArray中
4.insert *** 作和update *** 作需要开启两个完全独立的Transaction和Coordinator,update *** 作在所有insert *** 作完成之后进行,hive对所有数据进行更新
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)