structured streaming 入门级初使用(一)

structured streaming 入门级初使用(一),第1张

structured streaming 入门级初使用(一) 1、准备

导入依赖

		
            org.scala-lang
            scala-library
            2.11.0
        
        
            org.apache.spark
            spark-streaming_2.11
            2.3.2
        
        
            org.apache.spark
            spark-streaming-flume_2.11
            2.3.2
        

        
            com.alibaba
            fastjson
            1.2.76
        

        
            org.apache.spark
            spark-core_2.11
            2.3.2
        
        
        
            org.apache.spark
            spark-streaming-kafka-0-10_2.11
            2.3.2
        
        
        
            org.apache.spark
            spark-sql_2.11
            2.3.2
        
	     
            org.apache.spark
            spark-sql-kafka-0-10_2.11
            2.3.2
        
        
        
            redis.clients
            jedis
            2.9.0
        

        
            org.apache.commons
            commons-lang3
            3.3.2
        
        
        
            com.alibaba
            fastjson
            1.2.76
        
        
     
            org.apache.kafka
            kafka_2.11
            2.3.0
        
        
        
            org.apache.kafka
            kafka-clients
            2.1.0
        
2、从kafka读取数据

注意:

1、structured streaming 读取kafka数据是不需要设置group id的。
2、df.selectExpr()中还可以选择kafka的key等

代码如下

    val conf = new SparkConf().setMaster("local[*]")
    val spark =  SparkSession
      .builder()
      .config(conf)
      .appName(getClass.getName)
      .getOrCreate()
 import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", PropertiesUtils.loadProperties("kafka.broker.list"))
      .option("subscribe", "cache_tmp")//
      .option("startingOffsets", "earliest")
      .load()
    val re = df.selectExpr("CAST(value AS STRING) ")//
      .as[(String)]
3、写入kafka

注意:

1、写入kafka需要做checkpoint(),checkpoint 中保存着偏移量
2、目的 kafka 即使不存在 也会自己创建。

代码如下

   val query = re

      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers",PropertiesUtils.loadProperties("kafka.broker.list"))
      .option("topic","tmp_t")
      .option("checkpointLocation","E:/t_check_2")//生产环境中要放在hdfs哟。
      .start()
    query.awaitTermination()

这里的checkpoint的目录保存这消费的偏移量等等信息。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5638567.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存