Spark从hbase 并发取数据,并用spark-shell提交

Spark从hbase 并发取数据,并用spark-shell提交,第1张

Spark从hbase 并发取数据,并用spark-shell提交
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{ FilterList, SingleColumnValueFilter}

// Reads rows from the HBase table "bidword_ad_pps_daily" in parallel through
// TableInputFormat, keeps only rows whose c:campaign_source is "PPS-SHOPPING"
// or "PPS-APP", and previews (row key, url, title) as a DataFrame.
// Written to be pasted into spark-shell, so no statement relies on a
// leading-dot continuation line.

// Request Kryo through the builder instead of a JVM system property: a system
// property set after the SparkContext already exists (as in spark-shell) is
// never read.
val sessionBuilder = SparkSession.builder().config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val ss: SparkSession = sessionBuilder.enableHiveSupport().getOrCreate()
val sc = ss.sparkContext
import ss.implicits._

val hbaseConfiguration = HBaseConfiguration.create()
hbaseConfiguration.set(TableInputFormat.INPUT_TABLE, "bidword_ad_pps_daily") // source HBase table

// Column selection is made on the Scan itself: once TableInputFormat.SCAN is
// set, TableInputFormat ignores TableInputFormat.SCAN_COLUMNS.
val scan = new Scan()
scan.addColumn(Bytes.toBytes("c"), Bytes.toBytes("tt"))
scan.addColumn(Bytes.toBytes("c"), Bytes.toBytes("url"))
// The filtered column has to be part of the scan, otherwise the filter would
// treat it as missing on every row.
scan.addColumn(Bytes.toBytes("c"), Bytes.toBytes("campaign_source"))

// Builds an equality filter on c:campaign_source. filterIfMissing is enabled
// because SingleColumnValueFilter otherwise lets through rows that do not
// have the column at all.
def campaignSourceEquals(expected: String): SingleColumnValueFilter = {
  val filter = new SingleColumnValueFilter(Bytes.toBytes("c"), Bytes.toBytes("campaign_source"),
    CompareOp.EQUAL, Bytes.toBytes(expected))
  filter.setFilterIfMissing(true)
  filter
}

// MUST_PASS_ONE == logical OR over the added filters.
val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
Seq("PPS-SHOPPING", "PPS-APP").foreach(source => filterList.addFilter(campaignSourceEquals(source)))
scan.setFilter(filterList)

// TableInputFormat expects the Scan as a Base64-encoded protobuf string.
hbaseConfiguration.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

val hbaseRDD = sc.newAPIHadoopRDD(hbaseConfiguration, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

// Convert to plain strings BEFORE repartitioning: Hadoop record readers may
// reuse Writable instances, so the raw (ImmutableBytesWritable, Result) pairs
// should not be passed directly to a shuffle. The closure deliberately uses
// only literals so it captures no driver-side values (Configuration and Scan
// are not serializable).
// Bytes.toString returns null for a missing cell, which becomes a null column value.
val dataRDD = hbaseRDD.map({ case (_, result) =>
    val item_id = Bytes.toString(result.getRow)
    val url = Bytes.toString(result.getValue(Bytes.toBytes("c"), Bytes.toBytes("url")))
    val title = Bytes.toString(result.getValue(Bytes.toBytes("c"), Bytes.toBytes("tt")))
    (item_id, url, title)
}).repartition(40)

dataRDD.toDF("item_id", "url", "title").show(20)

# Launch spark-shell on YARN in client mode with the HBase client jar shipped
# to the executors. $path must be set to the directory that holds the jar and
# conf/hdfs-site.xml. Every line except the last ends with "\" so that the
# shell reads the whole thing as ONE command.
# "--master yarn --deploy-mode client" replaces the deprecated "yarn-client" master.
spark-shell \
  --name "fetch_hbase_test" \
  --master yarn \
  --deploy-mode client \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 3G \
  --driver-memory 5G \
  --conf spark.driver.maxResultSize=10g \
  --conf spark.yarn.executor.memoryOverhead=10000 \
  --conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
  --conf spark.shuffle.memoryFraction=0.3 \
  --conf spark.sql.shuffle.partitions=1000 \
  --conf spark.default.parallelism=1000 \
  --jars $path/hbase-client-1.0.2.jar \
  --conf spark.hbase.obtainToken.enabled=true \
  --conf spark.yarn.security.credentials.hbase.enabled=true \
  --files $path/conf/hdfs-site.xml

通常只需引用 hbase-client-1.0.2.jar 即可;但在某些环境下(具体原因未明)仅引用它会报错,此时需要改为引用以下几个 jar 包:

# --jars takes ONE comma-separated list; if the option is repeated, only the
# last occurrence is used, so all jars are listed in a single value.
--jars $path/spark-hbase_2.11-2.3.2.jar,$path/hbase-hadoop-compat-1.3.1.jar,$path/hbase-common-1.3.1.jar,$path/hbase-client-1.3.1-6407.jar

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5575931.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存