导入依赖
2、从kafka读取数据org.scala-lang scala-library2.11.0 org.apache.spark spark-streaming_2.112.3.2 org.apache.spark spark-streaming-flume_2.112.3.2 com.alibaba fastjson1.2.76 org.apache.spark spark-core_2.112.3.2 org.apache.spark spark-streaming-kafka-0-10_2.112.3.2 org.apache.spark spark-sql_2.112.3.2 org.apache.spark spark-sql-kafka-0-10_2.112.3.2 redis.clients jedis2.9.0 org.apache.commons commons-lang33.3.2 com.alibaba fastjson1.2.76 org.apache.kafka kafka_2.112.3.0 org.apache.kafka kafka-clients2.1.0
注意:
1、structured streaming 读取kafka数据是不需要设置group id的。
2、df.selectExpr()中还可以选择kafka的key等
代码如下
val conf = new SparkConf().setMaster("local[*]") val spark = SparkSession .builder() .config(conf) .appName(getClass.getName) .getOrCreate() import spark.implicits._ val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", PropertiesUtils.loadProperties("kafka.broker.list")) .option("subscribe", "cache_tmp")// .option("startingOffsets", "earliest") .load() val re = df.selectExpr("CAST(value AS STRING) ")// .as[(String)]3、写入kafka
注意:
1、写入kafka需要做checkpoint(),checkpoint 中保存着偏移量
2、目的 kafka 即使不存在 也会自己创建。
代码如下
val query = re .writeStream .format("kafka") .option("kafka.bootstrap.servers",PropertiesUtils.loadProperties("kafka.broker.list")) .option("topic","tmp_t") .option("checkpointLocation","E:/t_check_2")//生产环境中要放在hdfs哟。 .start() query.awaitTermination()
这里的checkpoint的目录保存这消费的偏移量等等信息。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)