import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.{FilterList, SingleColumnValueFilter}

// Kryo is already enabled via --conf on the spark-shell command below; this property
// only takes effect if the session has not been created yet.
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

val ss = SparkSession.builder().enableHiveSupport().getOrCreate()
val sc = ss.sparkContext

val hbaseColumns = Array("c:tt", "c:url")
val queryColumns = hbaseColumns.mkString(" ")

val hbaseConfiguration = HBaseConfiguration.create()
hbaseConfiguration.set(TableInputFormat.INPUT_TABLE, "bidword_ad_pps_daily") // table in HBase
// Note: once TableInputFormat.SCAN is set below, SCAN_COLUMNS is ignored by
// TableInputFormat; column selection would have to go on the Scan object itself.
hbaseConfiguration.set(TableInputFormat.SCAN_COLUMNS, queryColumns)

// Add filter conditions: keep rows whose c:campaign_source equals either value.
val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE)
val scan = new Scan()
val filter1 = new SingleColumnValueFilter(Bytes.toBytes("c"), Bytes.toBytes("campaign_source"),
  CompareOp.EQUAL, Bytes.toBytes("PPS-SHOPPING"))
val filter2 = new SingleColumnValueFilter(Bytes.toBytes("c"), Bytes.toBytes("campaign_source"),
  CompareOp.EQUAL, Bytes.toBytes("PPS-APP"))
filterList.addFilter(filter1)
filterList.addFilter(filter2)
scan.setFilter(filterList)
hbaseConfiguration.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

val sqlContext = ss.sqlContext
import sqlContext.implicits._

val hbaseRDD = sc.newAPIHadoopRDD(hbaseConfiguration, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result]).repartition(40)

val dataRDD = hbaseRDD.map { case (_, result) =>
  val item_id = Bytes.toString(result.getRow)
  val url = Bytes.toString(result.getValue(Bytes.toBytes("c"), Bytes.toBytes("url")))
  val title = Bytes.toString(result.getValue(Bytes.toBytes("c"), Bytes.toBytes("tt")))
  (item_id, url, title)
}

dataRDD.toDF().show(20)
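If the result needs to be queried with Spark SQL afterwards, the tuples can be given column names and registered as a temporary view. A minimal sketch; the column and view names are illustrative assumptions, not part of the original job:

// Sketch only: column and view names are assumptions for illustration.
val df = dataRDD.toDF("item_id", "url", "title")
df.createOrReplaceTempView("pps_items_tmp")
ss.sql("SELECT item_id, title FROM pps_items_tmp WHERE url IS NOT NULL LIMIT 20").show()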
The snippet above is run in a spark-shell launched as follows:

spark-shell --name "fetch_hbase_test" \
  --master yarn-client \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 3G \
  --driver-memory 5G \
  --conf spark.driver.maxResultSize=10g \
  --conf spark.yarn.executor.memoryOverhead=10000 \
  --conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
  --conf spark.shuffle.memoryFraction=0.3 \
  --conf spark.sql.shuffle.partitions=1000 \
  --conf spark.default.parallelism=1000 \
  --jars $path/hbase-client-1.0.2.jar \
  --conf spark.hbase.obtainToken.enabled=true \
  --conf spark.yarn.security.credentials.hbase.enabled=true \
  --files $path/conf/hdfs-site.xml
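Note that the command only ships hdfs-site.xml; HBaseConfiguration.create() still needs to locate the cluster, which normally comes from an hbase-site.xml on the classpath. If that file is not available, the ZooKeeper quorum can be set explicitly on the configuration. A minimal sketch with placeholder host names (assumptions, not from the original setup):

// Sketch only: replace the placeholder hosts with the real ZooKeeper quorum.
hbaseConfiguration.set("hbase.zookeeper.quorum", "zk-host1,zk-host2,zk-host3")
hbaseConfiguration.set("hbase.zookeeper.property.clientPort", "2181")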
Referencing hbase-client-1.0.2.jar is usually enough, but in some environments, for reasons that are unclear, the job only works when the following jars are referenced instead:
--jars $path/spark-hbase_2.11-2.3.2.jar,$path/hbase-hadoop-compat-1.3.1.jar,$path/hbase-common-1.3.1.jar,$path/hbase-client-1.3.1-6407.jar