- Overview
- Getting Started
Hi! The goal this time is to use Spark Streaming (Java) to read data from Kafka and write it into HBase. For how to produce the data into Kafka in the first place, please refer to:
Getting Started

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import java.io.IOException;
import java.util.*;
public class Temp {
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.registerKryoClasses(new Class<?>[]{ConsumerRecord.class});
SparkSession spark = SparkSession.builder()
.appName("DevelopSpark")
.master("local[*]")
.config(sparkConf)
.enableHiveSupport()
.getOrCreate();
JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkContext, Durations.seconds(5));
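// 5-second micro-batch interval: each batch becomes one RDD of Kafka records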
String bootstrapServer = "192.168.165.63:9092,192.168.165.61:9092,192.168.165.62:9092";
// ZooKeeper hosts (used below as the HBase quorum)
String zkQuorum4 = "zk-56,zk-66,zk-67";
// Kafka topic names to subscribe to
List<String> topics = Arrays.asList("[topicName1]","[topicName2]");
String keytabPath = "/Users/keytabs/user.keytab";
// Kafka consumer parameters
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("bootstrap.servers",bootstrapServer);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id","1");
kafkaParams.put("auto.offset.reset","latest");
kafkaParams.put("security.protocol","SASL_PLAINTEXT");
kafkaParams.put("sasl.kerberos.service.name","henghe");
kafkaParams.put("sasl.mechanism","GSSAPI");
kafkaParams.put("sasl.jaas.config","com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"" + keytabPath + "\" principal=\"user@USER.COM\";");
kafkaParams.put("enable.auto.commit",false);
// Read from Kafka
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(javaStreamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
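// Direct stream: Spark partitions map 1:1 to Kafka partitions; PreferConsistent spreads them evenly across executors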
// HBase configuration
Configuration config = HBaseConfiguration.create();
config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000);
config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000);
config.set("hbase.zookeeper.quorum", zkQuorum4);
config.set("zookeeper.znode.parent", "/hbase1");
config.set("hadoop.security.authentication", "kerberos");
config.set("hbase.security.authentication", "kerberos");
config.set("hbase.master.kerberos.principal", "henghe/_HOST@HENGHE.COM");
config.set("hbase.thrift.kerberos.principal", "henghe/_HOST@HENGHE.COM");
config.set("hbase.regionserver.kerberos.principal", "henghe/_HOST@HENGHE.COM");
config.set("hbase.client.keytab.principal", "henghe@HENGHE.COM");
config.set("hbase.client.userprovider.class", "org.apache.hadoop.hbase.security.UserProvider");
config.set("hbase.client.keytab.file", keytabPath);
// Target HBase table name
config.set(TableOutputFormat.OUTPUT_TABLE,"[tableName]");
JobConf jobConf = new JobConf(config);
Job job;
try {
job = Job.getInstance(jobConf);
} catch (IOException e) {
// Fail fast instead of continuing with a null Job
throw new RuntimeException("Failed to create the Hadoop Job for HBase output", e);
}
job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);
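// The Job only carries the Hadoop output configuration (TableOutputFormat plus the target table) that saveAsNewAPIHadoopDataset consumes below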
Job finalJob = job;
// Write to HBase
stream.foreachRDD(rdd -> {
if (!rdd.isEmpty()) {
JavaRDD<Put> putJavaRDD = rdd.map(cr -> {
String mes = cr.value();
// Row key: here the raw message value is used as the row key
Put put = new Put(Bytes.toBytes(mes));
// Column family, column qualifier, and cell value
put.addColumn(Bytes.toBytes("[familyName]"), Bytes.toBytes("[colName]"), Bytes.toBytes(mes));
return put;
});
JavaPairRDD<ImmutableBytesWritable, Put> javaPairRDD = putJavaRDD
.mapToPair((PairFunction<Put, ImmutableBytesWritable, Put>) p -> new Tuple2<>(new ImmutableBytesWritable(), p));
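// TableOutputFormat ignores the key and writes only the Put, so an empty ImmutableBytesWritable is enough here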
javaPairRDD.saveAsNewAPIHadoopDataset(finalJob.getConfiguration());
System.out.println("ok......");
}
});
// Start the streaming computation and block until it is stopped
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}
}
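After the streaming job has processed a few batches, it is worth confirming that the rows actually landed in HBase. Below is a minimal verification sketch using the standard HBase Java client; it assumes the same Kerberos-enabled Configuration built above and reuses the [tableName], [familyName] and [colName] placeholders, so those still need real values for your cluster.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanCheck {
    public static void main(String[] args) throws Exception {
        // Build the same HBase configuration as the streaming job
        // (zookeeper quorum, znode parent, Kerberos settings, keytab).
        Configuration config = HBaseConfiguration.create();
        // ... same config.set(...) calls as in Temp above ...
        try (Connection connection = ConnectionFactory.createConnection(config);
             Table table = connection.getTable(TableName.valueOf("[tableName]"));
             ResultScanner scanner = table.getScanner(new Scan())) {
            for (Result result : scanner) {
                String rowKey = Bytes.toString(result.getRow());
                String value = Bytes.toString(
                        result.getValue(Bytes.toBytes("[familyName]"), Bytes.toBytes("[colName]")));
                System.out.println(rowKey + " -> " + value);
            }
        }
    }
}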