1.利用spark提供的 newAPIHadoopRDD api 对hbase进行读写
2.SparkOnHbase,这种方式其实是利用Cloudera-labs开旦山源的一个HbaseContext的工具类来支持spark用RDD的方式批量读写hbase
hbase 表格式如下:
部分数据集如下:
文改迟塌中的spark 的版本为2.3.2,hbase 的版本为1.2.6
因为hbase数据集的数据都是序列化的,所以spark 默认读取Hbase的数据时会报数据序列化的错误,不管哪种方式,在读取hbase数据之前,为spark配置序列化方式,如图所示:
主要是利用TableInputFormat,TableOutPutFormat的方式对hbase进行读写。
下边是对hbase进行读
运行结果如图:
通过maven 将hbase-spark jar 报 导入
由于hbase-spark 运用的spark 版本为1.6 而实际的spark 版本为2.3.2,所以执行spark 任务会报 没有 org.apache.spark.logging 类没有定义,这是因为 spark 2.3.2 这个类名已经改变,因此需要重新构造这个类并打成jar包放入到spark 的jar目录里即可
以下为读方式:
sparkOnHbase 对于第一种方式的优势在于:
1>无缝的使用Hbase connection
2>和Kerberos无缝集成
3>通过get或者scan直接核圆生成rdd
4>利用RDD支持hbase的任何组合 *** 作
5>为通用 *** 作提供简单的方法,同时通过API允许不受限制的未知高级 *** 作
6>支持java和scala
7>为spark和 spark streaming提供相似的API
如何使用scala+spark读写Hbase软件版本如下:
scala2.11.8
spark2.1.0
hbase1.2.0
公司有一些实时数据处理的项目,存储用的是hbase,提供实时的检索,当然hbase里面存储的数据模型都是简单的,复杂的多维检索的结果是在es里面存储的,公司也正在引入Kylin作为OLAP的数据分析引擎,这块后续有空在研究下。
接着上面说的,hbase存储着一些实时的数据,前两周新需求需要对hbase里面指定表的数据做一次全量的update以满足业务的发展,平时 *** 作hbase都是单条的curd,或者插入一个批量的list,用的都是hbase的java api比较简单,但这次涉及全量update,所以如果再用原来那种单线程的 *** 作api,势必速度回慢上许多。
关于批量 *** 作Hbase,一般我们都会用MapReduce来 *** 作,这样可以大大加快处理效率,原来也写过MR *** 作Hbase,过程比较繁琐,最近一直在余早用scala做spark的相关开发,所以就直接使用scala+spark来搞定这件事了,当然底层用的还是Hbase的TableOutputFormat和TableOutputFormat这个和MR是一芦行样的,在spark里面把从hbase里面读取的数据集转成rdd了,然后做一些简单的过滤,转化,最终在把结果写入到hbase里面。
整个流程如下:
(1)全量读取hbase表的数据
(2)做一系列的ETL
(3)把全量数据再写回hbase
核心代码如下:
//获取conf
val conf=HBaseConfiguration.create() //设置读取的表
conf.set(TableInputFormat.INPUT_TABLE,tableName) //设置写入的表
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)//创建sparkConf
val sparkConf=new SparkConf() //设置spark的任务名
sparkConf.setAppName("read and write for hbase ") //创建spark上下文
val sc=new SparkContext(sparkConf)
//为job指定输出格式和输出表名
val newAPIJobConfiguration1 = Job.getInstance(conf)
newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName)
newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
//全量读取hbase表
val rdd=sc.newAPIHadoopRDD(conf,classOf[TableInputFormat]
,classOf[ImmutableBytesWritable]
,classOf[Result]
)
//过滤空数据,然后对每一个记录做更新,并转换成写入的格式
val final_rdd= rdd.filter(checkNotEmptyKs).map(forDatas)
//转换后的结果,再次竖哗雀做过滤
val save_rdd=final_rdd.filter(checkNull)
//最终在写回hbase表
save_rdd.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration)
sc.stop()
从上面的代码可以看出来,使用spark+scala *** 作hbase是非常简单的。下面我们看一下,中间用到的几个自定义函数:
第一个:checkNotEmptyKs
作用:过滤掉空列簇的数据
def checkNotEmptyKs(f:((ImmutableBytesWritable,Result))):Boolean={val r=f._2val rowkey=Bytes.toString(r.getRow)val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(Bytes.toBytes("ks")).asScalaif(map.isEmpty) false else true
}
第二个:forDatas
作用:读取每一条数据,做update后,在转化成写入 *** 作
def forDatas(f: (ImmutableBytesWritable,Result)): (ImmutableBytesWritable,Put)={ val r=f._2 //获取Result
val put:Put=new Put(r.getRow) //声明put
val ks=Bytes.toBytes("ks") //读取指定列簇
val map:scala.collection.mutable.Map[Array[Byte],Array[Byte]]= r.getFamilyMap(ks).asScala
map.foreach(kv=>{//遍历每一个rowkey下面的指定列簇的每一列的数据做转化
val kid= Bytes.toString(kv._1)//知识点id
var value=Bytes.toString(kv._2)//知识点的value值
value="修改后的value"
put.addColumn(ks,kv._1,Bytes.toBytes(value)) //放入put对象
}
)if(put.isEmpty) null else (new ImmutableBytesWritable(),put)
}
第三个:checkNull 作用:过滤最终结果里面的null数据
def checkNull(f:((ImmutableBytesWritable,Put))):Boolean={if(f==null) false else true
}
上面就是整个处理的逻辑了,需要注意的是对hbase里面的无效数据作过滤,跳过无效数据即可,逻辑是比较简单的,代码量也比较少。
除了上面的方式,还有一些开源的框架,也封装了相关的处理逻辑,使得spark *** 作hbase变得更简洁,有兴趣的朋友可以了解下,github链接如下:
Date:2020/12/22Version:Spark 3.0java 1.8.0_221Hbase 1.3.6Scala 2.12.11
1、首先是pom.xml,注释了一些东西,比如 不用 添加hbase-client和hbase-server,java中写MapReduce *** 作hbase需要这两个,scala写spark *** 作hbase不需要这两个,程序跑不起来,sc无法创建历悔拿。
2、将hbase的lib中的以下jar文件添前蔽加进来。(to IDEA小白 :可以新建一个文件夹保存这些jar文件,在IDEA中添加一个java的library指向这个文件夹)
3、将hbase中的配置文件hbase-site.xml添加到项目中的resources文件夹中
4、测试肢搭spark连接hbase
运行后的结果:
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)