spark 2.3读写Hbase

spark 2.3读写Hbase,第1张

spark 2.3读写Hbase

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录
  • 1. 简述
  • 1.官方支持
  • 2.SHC使用
    • 2.1. 下载源码、编译、上传
    • 2.2. 项目中引用
    • 2.3. 项目实战使用
      • 2.3.1.写入Hbase
      • 2.3.2. 读取Hbase


1. 简述

Spark 读写 Hbase 的官方参考资料较少,当前 Spark On Hbase 的驱动主要有两种实现,官方驱动和hortonworks的shc。

官方的也是基于hortonworks的进行开发的,而且好像对spark的Dataframe支持不是很友好,而且对Hbase的版本支持的也很低。所以本文主要针对hortonworks的shc的进行描述。

hortonworks 贡献的驱动,与官方驱动相比,又加了一些封装,使得使用方面与其他内置驱动(jdbc 等)基本相同,对 Dataframe 的支持较为友好。


1.官方支持

官方的可参考官网文档
https://hbase.apache.org/book.html#_bulk_load,其中对Hbase使用spark的部分描述的较为详细。

2.SHC使用

由于这个插件是hortonworks提供的,maven的中央仓库并没有直接可下载的版本。需要用户下载源码自己编译打包,如果有maven私库,可以上传到自己的maven私库里面。具体的步骤可以参考如下:

2.1. 下载源码、编译、上传

去官网github下载对应版本的源码:https://github.com/hortonworks-spark/shc。

下载完成后,点击旁边的maven插件deploy发布工程,如果只想打成jar包,那就直接install就可以了,然后在本地的maven仓库中就可以看到生成的对应的jar包了。

2.2. 项目中引用

在pom.xml中引入:


    com.hortonworks
    shc-core
    1.1.2-2.2-s_2.11-SNAPSHOT

2.3. 项目实战使用 2.3.1.写入Hbase
object Application {
	def main(args: Array[String]): Unit = {
		val spark = SparkSession.builder().master("local").appName("normal").getOrCreate()
	    spark.sparkContext.setLogLevel("warn")
		val data = (0 to 255).map { i =>  HbaseRecord(i, "extra")}

	    val df:Dataframe = spark.createDataframe(data)
	    df.write
	      .mode(SaveMode.Overwrite)
	      .options(Map(HbaseTableCatalog.tableCatalog -> catalog))
	      .format("org.apache.spark.sql.execution.datasources.hbase")
	      .save()
	}
	def catalog = s"""{
                   |"table":{"namespace":"rec", "name":"user_rec"},
                   |"rowkey":"key",
                   |"columns":{
                   |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                   |"col1":{"cf":"t", "col":"col1", "type":"boolean"},
                   |"col2":{"cf":"t", "col":"col2", "type":"double"},
                   |"col3":{"cf":"t", "col":"col3", "type":"float"},
                   |"col4":{"cf":"t", "col":"col4", "type":"int"},
                   |"col5":{"cf":"t", "col":"col5", "type":"bigint"},
                   |"col6":{"cf":"t", "col":"col6", "type":"smallint"},
                   |"col7":{"cf":"t", "col":"col7", "type":"string"},
                   |"col8":{"cf":"t", "col":"col8", "type":"tinyint"}
                   |}
                   |}""".stripMargin
}
case class HbaseRecord(
                  col0: String,
                  col1: Boolean,
                  col2: Double,
                  col3: Float,
                  col4: Int,
                  col5: Long,
                  col6: Short,
                  col7: String,
                  col8: Byte)

object HbaseRecord
{
  def apply(i: Int, t: String): HbaseRecord = {
    val s = s"""row${"%03d".format(i)}"""
    HbaseRecord(s,
      i % 2 == 0,
      i.toDouble,
      i.toFloat,
      i,
      i.toLong,
      i.toShort,
      s"String$i: $t",
      i.toByte)
  }
}
2.3.2. 读取Hbase
    Map map = new HashMap();
    map.put(HbaseTableCatalog.tableCatalog(), catalog);
    map.put(HbaseTableCatalog.newTable(), "5");
	Dataset df = spark.read().options(map).format("org.apache.spark.sql.execution.datasources.hbase").load()

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5676370.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存