SparkStreaming(java)读取Kafka(kerberos)写入Hbase(kerberos)

SparkStreaming(java)读取Kafka(kerberos)写入Hbase(kerberos),第1张

SparkStreaming(java)读取Kafka(kerberos)写入Hbase(kerberos)
  • 概述
  • 开始

概述

你好!本文的目标是使用 Spark Streaming(Java)读取启用了 Kerberos 认证的 Kafka 中的数据,并写入同样启用了 Kerberos 认证的 HBase。关于向 Kafka 内写数据的知识,请参考相关文章(原文此处的链接已缺失)。

开始
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.io.IOException;
import java.util.*;

/**
 * Spark Streaming job that reads string records from Kerberos-secured Kafka topics and writes
 * each record into an HBase table through {@link TableOutputFormat}.
 *
 * <p>Values wrapped in square brackets ({@code [topicName1]}, {@code [tableName]},
 * {@code [familyName]}, {@code [colName]}) are placeholders that must be replaced before running.
 */
public class Temp {
    /**
     * Builds the Spark, Kafka and HBase configuration, wires the Kafka stream to the HBase sink,
     * then starts the streaming context and blocks until it terminates.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // Build the Class<?>[] directly: List.toArray() returns Object[], and casting that to
        // Class<?>[] fails with ClassCastException at runtime.
        sparkConf.registerKryoClasses(new Class<?>[] {ConsumerRecord.class});
        SparkSession spark = SparkSession.builder()
                .appName("DevelopSpark")
                .master("local[*]")
                .config(sparkConf)
                .enableHiveSupport()
                .getOrCreate();

        JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
        JavaStreamingContext javaStreamingContext =
                new JavaStreamingContext(sparkContext, Durations.seconds(5));

        String bootstrapServer = "192.168.165.63:9092,192.168.165.61:9092,192.168.165.62:9092";
        // ZooKeeper host names used by HBase
        String zkQuorum4 = "zk-56,zk-66,zk-67";
        // Topic names go here
        List<String> topics = Arrays.asList("[topicName1]", "[topicName2]");
        String keytabPath = "/Users/keytabs/user.keytab";

        // Read from Kafka
        Map<String, Object> kafkaParams = buildKafkaParams(bootstrapServer, keytabPath);
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                javaStreamingContext,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topics, kafkaParams));

        // Write to HBase
        Configuration outputConf = buildHBaseOutputConf(zkQuorum4, keytabPath);
        // NOTE(review): enable.auto.commit is false and offsets are never committed here, so with
        // auto.offset.reset=latest a restart resumes from the newest offsets — confirm intended.
        stream.foreachRDD(rdd -> {
            // isEmpty() avoids a full count of the batch just to test for emptiness.
            if (!rdd.isEmpty()) {
                JavaPairRDD<ImmutableBytesWritable, Put> javaPairRDD = rdd.mapToPair(
                        (PairFunction<ConsumerRecord<String, String>, ImmutableBytesWritable, Put>)
                                Temp::toHBaseWrite);
                javaPairRDD.saveAsNewAPIHadoopDataset(outputConf);
                System.out.println("ok......");
            }
        });

        // Without start()/awaitTermination() the registered output operation never runs and
        // main() returns immediately.
        javaStreamingContext.start();
        try {
            javaStreamingContext.awaitTermination();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers/shutdown hooks can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Builds the Kafka consumer parameters for a SASL_PLAINTEXT / GSSAPI (Kerberos) cluster.
     *
     * @param bootstrapServer comma-separated {@code host:port} list of Kafka brokers
     * @param keytabPath path to the keytab file referenced from the JAAS configuration
     * @return mutable map of consumer parameters
     */
    private static Map<String, Object> buildKafkaParams(String bootstrapServer, String keytabPath) {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", bootstrapServer);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParams.put("sasl.kerberos.service.name", "henghe");
        kafkaParams.put("sasl.mechanism", "GSSAPI");
        kafkaParams.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"" + keytabPath + "\" principal=\"user@USER.COM\";");
        kafkaParams.put("enable.auto.commit", false);
        return kafkaParams;
    }

    /**
     * Builds the Hadoop configuration used by {@code saveAsNewAPIHadoopDataset} to write
     * {@link Put} objects into HBase via {@link TableOutputFormat}.
     *
     * @param zkQuorum comma-separated ZooKeeper host names for {@code hbase.zookeeper.quorum}
     * @param keytabPath path to the keytab file set as {@code hbase.client.keytab.file}
     * @return the job configuration carrying HBase connection and output-format settings
     * @throws IllegalStateException if the Hadoop {@link Job} cannot be created
     */
    private static Configuration buildHBaseOutputConf(String zkQuorum, String keytabPath) {
        Configuration config = HBaseConfiguration.create();
        config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000);
        config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000);

        config.set("hbase.zookeeper.quorum", zkQuorum);
        config.set("zookeeper.znode.parent", "/hbase1");
        config.set("hadoop.security.authentication", "kerberos");
        config.set("hbase.security.authentication", "kerberos");
        config.set("hbase.master.kerberos.principal", "henghe/_HOST@HENGHE.COM");
        config.set("hbase.thrift.kerberos.principal", "henghe/_HOST@HENGHE.COM");
        config.set("hbase.regionserver.kerberos.principal", "henghe/_HOST@HENGHE.COM");
        config.set("hbase.client.keytab.principal", "henghe@HENGHE.COM");
        config.set("hbase.client.userprovider.class", "org.apache.hadoop.hbase.security.UserProvider");
        config.set("hbase.client.keytab.file", keytabPath);
        // Table name goes here
        config.set(TableOutputFormat.OUTPUT_TABLE, "[tableName]");

        Job job;
        try {
            job = Job.getInstance(new JobConf(config));
        } catch (IOException e) {
            // Fail fast with the cause attached instead of continuing with a null job.
            throw new IllegalStateException("Failed to create Hadoop job for HBase output", e);
        }
        job.setOutputFormatClass(org.apache.hadoop.hbase.mapreduce.TableOutputFormat.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        return job.getConfiguration();
    }

    /**
     * Converts one Kafka record into the key/value pair written to HBase.
     *
     * <p>The record value is used both as the row key and as the cell value; the key of the pair
     * is an empty {@link ImmutableBytesWritable}.
     *
     * @param cr Kafka record whose value is written
     * @return pair of an empty writable key and the {@link Put} for this record
     */
    private static Tuple2<ImmutableBytesWritable, Put> toHBaseWrite(ConsumerRecord<String, String> cr) {
        String mes = cr.value();
        // Row key goes here
        Put put = new Put(Bytes.toBytes(mes));
        // Column family, column qualifier and cell data go here
        put.addColumn(Bytes.toBytes("[familyName]"), Bytes.toBytes("[colName]"), Bytes.toBytes(mes));
        return new Tuple2<>(new ImmutableBytesWritable(), put);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/787029.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-05
下一篇 2022-05-05

发表评论

登录后才能评论

评论列表(0条)

保存