【Structed Streaming】 *** 作 Streaming DataFrameDataSet

【Structed Streaming】 *** 作 Streaming DataFrameDataSet,第1张

【Structed Streaming】 *** 作 Streaming DataFrame/DataSet

在streaming Dataframes/Datasets上应用各种 *** 作.主要分两种:

  1. 直接执行 sql
  2. DSL(api)
1. 基本 *** 作

在静态的DF/DS 上大多数通用 *** 作都支持作用在 Streaming Dataframe/Streaming DataSet 上

1.1 演示数据

一会要处理的数据 people.json 内容:

{"name": "Michael","age": 29,"sex": "female"}
{"name": "Andy","age": 30,"sex": "male"}
{"name": "Justin","age": 19,"sex": "male"}
{"name": "Lisi","age": 18,"sex": "male"}
{"name": "zs","age": 10,"sex": "female"}
{"name": "zhiling","age": 40,"sex": "female"}
1.2 弱类型 api(了解)

弱类型api就是StreamingDataframe的API,特点是在算子中用的都是字段名;

import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
import org.apache.spark.sql.{Dataframe, SparkSession}


object BasicOperation {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("BasicOperation")
            .getOrCreate()
            
        val peopleSchema: StructType = new StructType()
            .add("name", StringType)
            .add("age", LongType)
            .add("sex", StringType)
        val peopleDF: Dataframe = spark.readStream
            .schema(peopleSchema)
            .json("/Users/lzc/Desktop/data")
        
        // 弱类型 api
        val df: Dataframe = peopleDF.select("name","age", "sex").where("age > 20") 
        df.writeStream
            .outputMode("append")
            .format("console")
            .start
            .awaitTermination()
    }
}


1.select(String* field)
2.where(String condition)

1.3 强类型 api(了解)

强类型就是DS,需要对DF设置强类型;
查看DataSetAPI

import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{Dataframe, Dataset, SparkSession}

 
object BasicOperation2 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("BasicOperation")
            .getOrCreate()
        import spark.implicits._
        
        val peopleSchema: StructType = new StructType()
            .add("name", StringType)
            .add("age", LongType)
            .add("sex", StringType)
        val peopleDF: Dataframe = spark.readStream
            .schema(peopleSchema)
            .json("/Users/lzc/Desktop/data")
        
        val peopleDS: Dataset[People] = peopleDF.as[People] // 转成 ds
        
        
        val df: Dataset[String] = peopleDS.filter(_.age > 20).map(_.name)
        df.writeStream
            .outputMode("append")
            .format("console")
            .start
            .awaitTermination()
    }
}

case class People(name: String, age: Long, sex: String)

1.4 sql(重要)
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{Dataframe, Dataset, SparkSession}


object BasicOperation3 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("BasicOperation")
            .getOrCreate()
        import spark.implicits._
        
        val peopleSchema: StructType = new StructType()
            .add("name", StringType)
            .add("age", LongType)
            .add("sex", StringType)
        val peopleDF: Dataframe = spark.readStream
            .schema(peopleSchema)
            .json("/Users/lzc/Desktop/data")
        
        peopleDF.createOrReplaceTempView("people") // 创建临时表
        val df: Dataframe = spark.sql("select * from people where age > 20")
        
        df.writeStream
            .outputMode("append")
            .format("console")
            .start
            .awaitTermination()
    }
}

2. 基于 event-time 的窗口 *** 作 2.1 event-time 窗口理解

在 Structured Streaming 中, 可以按照事件发生时的时间对数据进行聚合 *** 作, 即基于 event-time 进行 *** 作.
在这种机制下, 即不必考虑 Spark 陆续接收事件的顺序是否与事件发生的顺序一致, 也不必考虑事件到达 Spark 的时间与事件发生时间的关系.
因此, 它在提高数据处理精度的同时, 大大减少了开发者的工作量.

event-time 窗口应用场景案例:
我们现在想计算 10 分钟内的单词, 每 5 分钟更新一次, 也就是说计算在 10 分钟窗口 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20等之间收到的单词量. 注意, 12:00 - 12:10 表示数据在 12:00 之后 12:10 之前到达.
现在,考虑一下在 12:07 收到的单词。该单词应该增加对应于两个窗口12:00 - 12:10和12:05 - 12:15的计数。因此,计数将由分组键(即单词)和窗口(可以从事件时间计算)索引。
统计后的结果应该是这样的:


案例分析:
每5min,统计最近十分钟的每个单词的个数

  • 窗口:窗口大小 10min 滑动步长5min
  • 粒度:group by window,word

案例实现:

import java.sql.Timestamp

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{Dataframe, Dataset, Row, SparkSession}


object WordCountWindow {
    def main(args: Array[String]): Unit = {
        
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("WordCount1")
            .getOrCreate()
        
        import spark.implicits._
        //todo 1.source
        val lines: Dataframe = spark.readStream
            .format("socket") 
            .option("host", "localhost")
            .option("port", 10000)
            .option("includeTimestamp", true) // 给产生的数据自动添加时间戳
            .load
        
        //todo 2.process
        // 把行切割成单词, 保留时间戳
        val words: Dataframe = lines.as[(String, Timestamp)].flatMap(line => {
            line._1.split(" ").map((_, line._2))
        }).toDF("word", "timestamp")
        
        import org.apache.spark.sql.functions._
    
        // 按照窗口和单词分组, 并且计算每组的单词的个数
        val wordCounts: Dataset[Row] = words.groupBy(
            //调用 window 函数, 返回的是一个 Column 参数 1: df 中表示时间戳的列 参数 2: 窗口长度 参数 3: 滑动步长
            window($"s", "10 minutes", "5 minutes"),
            $"word"
        ).count().orderBy($"window")  // 计数, 并按照窗口排序
        
        val query: StreamingQuery = wordCounts.writeStream
            .outputMode("complete")
            .format("console")
            .option("truncate", "false")  // 不截断.为了在控制台能看到完整信息, 最好设置为 false
            .start
        query.awaitTermination()
    }
}

2.2 event-time 窗口生成规则
 The windows are calculated as below:
   * maxNumOverlapping <- ceil(windowDuration / slideDuration)
   * for (i <- 0 until maxNumOverlapping)
   *   windowId <- ceil((timestamp - startTime) / slideDuration)
   *   windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
   *   windowEnd <- windowStart + windowDuration
   *   return windowStart, windowEnd
2.2.1 窗口是左闭右开的

Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05).

2.2.2 window返回值是一个column 2.2.3 窗口个数
ceil(windowDuration / slideDuration)

这里要明白一个事情,一条数据根据其event-time所计算出来的窗口个数是遵循这个公式的,但是至于这条数据能不能全部进入这些窗口是不一定的!除非能整除,肯定所有窗口都能进,比如下面这个例子:

  * This behaves as follows for the given parameters for the time: 12:05. The valid windows are
  * marked with a +, and invalid ones are marked with a x. The invalid ones are filtered using the
  * Filter operator.
  * window: 12m, slide: 5m, start: 0m :: window: 12m, slide: 5m, start: 2m
  *     11:55 - 12:07 +                      11:52 - 12:04 x
  *     12:00 - 12:12 +                      11:57 - 12:09 +
  *     12:05 - 12:17 +                      12:02 - 12:14 +
2.2.4 startTime

相对于1970-01-01 00:00:00 UTC的偏移量,用来启动窗口间隔。

2.2.5 窗口id
 ceil((timestamp - startTime) / slideDuration)

就是说:从当前event-time到 1970-01-01 00:00:00 + startTime 这个范围内,经历了多少个slideDuration

2.2.6 windowStart
 windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
2.2.7 windowEnd
windowStart + windowDuration


由图中可以看出来,对于第一个窗口而言,该条数据还真不一定能进入这个窗口!
反正窗口的windowStart - startTime 一定是duration的整数倍

2.2.8 模拟窗口生成
package StructedStreaming._03_流式DF和DS的 *** 作._02_eventTime;

import org.apache.spark.sql.sources.In;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;

public class windowsplit {



    private static HashMap getWindows(double timestamp,double windowDuration,double slideDuration,double startTime){
        double num = Math.ceil(windowDuration/slideDuration);
        System.out.println("窗口个数:" + num);
        double windowId = Math.ceil((timestamp - startTime) / slideDuration);
        System.out.println("窗口id:" + windowId);



        HashMap result = new HashMap<>();
        HashMap result2 = new HashMap<>();


        for(int i = 0;i windows = getWindows(1634551127649.0, 10 * 60 * 1000, 3 * 60 * 1000, 0);

        for (Map.Entry doubleDoubleEntry : windows.entrySet()) {
            System.out.println(doubleDoubleEntry);
        }
    }
}
3 基于 Watermark 处理延迟数据

Structured Streaming 引入 Watermark 机制, 主要是为了解决以下两个问题:
1.处理聚合中的延迟数据
2.减少内存中维护的聚合状态.

1.延迟数据的定义
在数据分析系统中, Structured Streaming 可以持续的按照 event-time 聚合数据, 然而在此过程中并不能保证数据按照时间的先后依次到达. 例如: 当前接收的某一条数据的 event-time 可能远远早于之前已经处理过的 event-time. 在发生这种情况时, 往往需要结合业务需求对延迟数据进行过滤.
2.延迟数据的处理方式
现在考虑如果事件延迟到达会有哪些影响. 假如, 一个单词在 12:04(event-time) 产生, 在 12:11 到达应用. 应用应该使用 12:04 来在窗口(12:00 - 12:10)中更新计数, 而不是使用 12:11. 这些情况在我们基于窗口的聚合中是自然发生的, 因为结构化流可以长时间维持部分聚合的中间状态


3.中间状态的维护
但是, 如果这个查询运行数天, 系统很有必要限制内存中累积的中间状态的数量. 这意味着系统需要知道何时从内存状态中删除旧聚合, 因为应用不再接受该聚合的后期数据。为了实现这个需求, 从 spark2.1, 引入了 watermark(水印), 使用引擎可以自动的跟踪当前的事件时间, 并据此尝试删除旧状态.
4.waterMark的定义方式
通过指定 event-time 列和预估事件的延迟时间上限来定义一个查询的 watermark.

watermark = MaxEventTime - Threshhod 

waterMark是一个水位线,窗口闭合时间小于水位线的,全部闭合,不再接受迟到数据,并且该窗口的数据会从内存中删除;比如:
一个以时间 T 结束的窗口, 引擎会保留此窗口的状态和允许延迟时间直到(waterMark > T). 换句话说, 延迟时间在上限内的被聚合, 延迟时间超出上限的开始被丢弃.

5.waterMark的特点

  • watermark只能逐渐增加, 不能减少
  • waterMark所引用的字段必须是timestamp类型
4. 不同outmode的waterMark

在不同输出模式(complete, append, update)中, Watermark 会产生不同的影响.

4.1 update 模式下使用 watermark
  • 此模式下,窗口的输出条件:
    windowEndTime > waterMark && window中有新的或新增的数据.
    满足此条件,可将窗口数据输出;
  • windowEndTime > waterMark && window中没有update的数据
    满足此条件,窗口数据不输出,但是保留在内存中
  • windowEndTime < waterMark
    满足此条件,不管有没有数据更新,都会从内存中被删除,并且不会被输出
package StructedStreaming._03_流式DF和DS的 *** 作._03_waterMark

import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.{Dataframe, Dataset, Row, SparkSession}

import java.sql.Timestamp


object _02_WordCountWaterMark1 {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("WordCountWatermark1")
      .getOrCreate()

    import spark.implicits._
    val lines: Dataframe = spark.readStream
      .format("socket")
      .option("host", "192.168.196.128")
      .option("port", 9999)
      .load

    // 输入的数据中包含时间戳, 而不是自动添加的时间戳
    val words: Dataframe = lines.as[String].flatMap(line => {
      val split = line.split(",")
      split(1).split(" ").map((_, Timestamp.valueOf(split(0))))
    }).toDF("word", "timestamp")

    import org.apache.spark.sql.functions._


    val wordCounts: Dataset[Row] = words
      .withWatermark("timestamp", "2 minutes")
      .groupBy(window($"timestamp", "10 minutes", "2 minutes"), $"word")
      .count()

    val query: StreamingQuery = wordCounts.writeStream
      .outputMode("update")
      .trigger(Trigger.ProcessingTime(1000))
      .format("console")
      .option("truncate", "false")
      .start
    query.awaitTermination()

    //1.输入数据:2019-08-14 10:55:00,dog
    //(1)当前waterMark = 0
    //(2)计算所有窗口,将未过期的窗口(窗口的endTime > waterMark) 并且 在 update 模式下,只输出结果表中涉及更新或新增的窗口
    //+------------------------------------------+----+-----+
    //|window                                    |word|count|
    //+------------------------------------------+----+-----+
    //|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |
    //|[2019-08-14 10:52:00, 2019-08-14 11:02:00]|dog |1    |
    //|[2019-08-14 10:50:00, 2019-08-14 11:00:00]|dog |1    |
    //|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |
    //|[2019-08-14 10:54:00, 2019-08-14 11:04:00]|dog |1    |
    //+------------------------------------------+----+-----+
    //(3) 然后根据当前批次中最大的 event-time, 计算出来下次使用的 watermark.
    // 本批次只有一个数据(10:55), 所有: watermark = 10:55 - 2min = 10:53


    // 2. 输入数据:2019-08-14 11:00:00,dog
    //(1)当前waterMark = 10:53
    //(2)计算所有窗口,将未过期的窗口并且窗口的endTime > waterMark的 并且 在 update 模式下,只输出结果表中涉及更新或新增的窗口
    //+------------------------------------------+----+-----+
    //|window                                    |word|count|
    //+------------------------------------------+----+-----+
    //|[2019-08-14 11:00:00, 2019-08-14 11:10:00]|dog |1    |
    //|[2019-08-14 10:52:00, 2019-08-14 11:02:00]|dog |2    |
    //|[2019-08-14 10:58:00, 2019-08-14 11:08:00]|dog |1    |
    //|[2019-08-14 10:54:00, 2019-08-14 11:04:00]|dog |2    |
    //|[2019-08-14 10:56:00, 2019-08-14 11:06:00]|dog |1    |
    //+------------------------------------------+----+-----+
    //其中: count 是 2 的表示更新, count 是 1 的表示新增. 没有变化的就没有显示.(但是内存中仍然保存着)
    //(3) 然后根据当前批次中最大的 event-time, 计算出来下次使用的 watermark. 10:58


    //3.输入数据:2019-08-14 10:55:00,dog
    //(1) 此数据对应的五个窗口中,当前内存中有两个窗口的结束时间已经低于 10: 58.
    //|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |
    //|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |
    //(2) 则立即删除这两个窗口在内存中的维护状态. 同时, 当前批次中新加入的数据所划分出来的窗口, 如果窗口结束时间低于 10:58, 则窗口会被过滤掉.
    //所以这次输出结果:
    //+------------------------------------------+----+-----+
    //|window                                    |word|count|
    //+------------------------------------------+----+-----+
    //|[2019-08-14 10:52:00, 2019-08-14 11:02:00]|dog |3    |
    //|[2019-08-14 10:50:00, 2019-08-14 11:00:00]|dog |2    |
    //|[2019-08-14 10:54:00, 2019-08-14 11:04:00]|dog |3    |
    //+------------------------------------------+----+-----+
    // 第三个批次的数据处理完成后, 立即计算: watermark= 10:55 - 2min = 10:53, 
    // 这个值小于当前的 watermask(10:58), 所以保持不变.(因为 watermask 只能增加不能减少)
  }
}
4.2 append 模式下使用 wartermark


把前一个案例中的update改成append即可.

val query: StreamingQuery = wordCounts.writeStream
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(0))
    .format("console")
    .option("truncate", "false")
    .start

在 append 模式中, 仅输出闭合的窗口的数据;

  • 窗口输出条件: windowEnd < waterMark
    注意:不是单纯的内存中的窗口的windowEnd < waterMark,就会将此window输出;而是由延迟数据,触发这个窗口的输出;

测试:
1.输入数据:2019-08-14 10:55:00,dog
(1)初始waterMark = 0
(2)计算此数据所属的所有窗口
按照window($“timestamp”, “10 minutes”, “2 minutes”)得到 5 个窗口.

[2019-08-14 10:46:00, 2019-08-14 10:56:00]
[2019-08-14 10:52:00, 2019-08-14 11:02:00]
[2019-08-14 10:50:00, 2019-08-14 11:00:00]
[2019-08-14 10:48:00, 2019-08-14 10:58:00]
[2019-08-14 10:54:00, 2019-08-14 11:04:00]

(3)窗口endTime和waterMark进行比较
此时初始 watermark=0, 当前批次中所有窗口的结束时间均大于 watermark.
但是 Structured Streaming 无法确定后续批次的数据中是否会更新当前批次的内容. 因此, 基于 Append 模式的特点, 这时并不会输出任何数据(因为输出后数据就无法更改了), 直到某个窗口的结束时间小于 watermark, 即可以确定后续数据不会再变更该窗口的聚合结果时才会将其输出, 并移除内存中对应窗口的聚合状态.
(4)输出:

+------+----+-----+
|window|word|count|
+------+----+-----+
+------+----+-----+

(5) 更新waterMark = 10:53

2.输入数据:2019-08-14 11:00:00,dog
(1)waterMark = 10:53
(2)计算此数据所属的所有窗口

[2019-08-14 11:00:00, 2019-08-14 11:10:00]
[2019-08-14 10:52:00, 2019-08-14 11:02:00]
[2019-08-14 10:58:00, 2019-08-14 11:08:00]
[2019-08-14 10:54:00, 2019-08-14 11:04:00]
[2019-08-14 10:56:00, 2019-08-14 11:06:00]

(3)窗口endTime和waterMark进行比较
均大于waterMark,所以不输出
(4) 更新waterMark = 10:58
注意,此时内存中有两个窗口已经过期了,但是不会被输出

|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |
|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |

3.输入数据:2019-08-14 10:55:00,dog
(1)此时的 watermark = 10:58
(2)当前内存中有两个窗口的结束时间已经低于 10: 58.

[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |
[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |

则意味着这两个窗口的数据不会再发生变化, 此时输出这个两个窗口的聚合结果, 并在内存中清除这两个窗口的状态.
所以这次输出结果:

+------------------------------------------+----+-----+
|window                                    |word|count|
+------------------------------------------+----+-----+
|[2019-08-14 10:46:00, 2019-08-14 10:56:00]|dog |1    |
|[2019-08-14 10:48:00, 2019-08-14 10:58:00]|dog |1    |
+------------------------------------------+----+-----+

第三个批次的数据处理完成后, 立即计算: watermark= 10:55 - 2min = 10:53, 这个值小于当前的 watermask(10:58), 所以保持不变.(因为 watermask 只能增加不能减少)

4.3 watermark 机制总结

1.watermark 在用于基于时间的状态聚合 *** 作时, 该时间可以基于窗口, 也可以基于 event-time本身.
2.输出模式必须是append或update. 在输出模式是complete的时候(必须有聚合), 要求每次输出所有的聚合结果. 我们使用 watermark 的目的是丢弃一些过时聚合数据, 所以complete模式使用wartermark无效也无意义.
3.在输出模式是append时, 必须设置 watermask 才能使用聚合 *** 作. 其实, watermark 定义了 append 模式中何时输出聚合聚合结果(状态), 并清理过期状态.
4.在输出模式是update时, watermark 主要用于过滤过期数据并及时清理过期状态.
5.watermark 会在处理当前批次数据时更新, 并且会在处理下一个批次数据时生效使用. 但如果节点发送故障, 则可能延迟若干批次生效.
6.withWatermark 必须使用与聚合 *** 作中的时间戳列是同一列.df.withWatermark(“time”, “1 min”).groupBy(“time2”).count() 无效
7.withWatermark 必须在聚合之前调用.
f.groupBy(“time”).count().withWatermark(“time”, “1 min”) 无效

5.Types of time windows

Spark supports three types of time windows: tumbling (fixed), sliding and session.

  • Tumbling windows are a series of fixed-sized, non-overlapping and contiguous time intervals. An input can only be bound to a single window.

  • Sliding windows are similar to the tumbling windows from the point of being “fixed-sized”, but windows can overlap if the duration of slide is smaller than the duration of window, and in this case an input can be bound to the multiple windows.

  • Tumbling and sliding window use window function, which has been described on above examples.

  • Session windows have different characteristic compared to the previous two types. Session window has a dynamic size of the window length, depending on the inputs. A session window starts with an input, and expands itself if following input has been received within gap duration. For static gap duration, a session window closes when there’s no input received within gap duration after receiving the latest input.

  • Session window uses session_window function. The usage of the function is similar to the window function.

val sessionizedCounts = events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window($"timestamp", "5 minutes"),
        $"userId")
    .count()

Note that there are some restrictions when you use session window in streaming query, like below:

“Update mode” as output mode is not supported.
There should be at least one column in addition to session_window in grouping key.

waterMark清除聚合状态的条件

It is important to note that the following conditions must be satisfied for the watermarking to clean the state in aggregation queries (as of Spark 2.1.1, subject to change in the future).

  • Output mode must be Append or Update. Complete mode requires all aggregate data to be preserved, and hence cannot use watermarking to drop intermediate state.

  • The aggregation must have either the event-time column, or a window on the event-time column.

  • watermark绑定的timestamp字段必须和聚合所使用的的字段是同一个.
    For example, df.withWatermark(“time”, “1 min”).groupBy(“time2”).count() is invalid in Append output mode, as watermark is defined on a different column from the aggregation column.

  • withWatermark must be called before the aggregation for the watermark details to be used. For example, df.groupBy(“time”).count().withWatermark(“time”, “1 min”) is invalid in Append output mode.

5.流数据去重
1,2019-09-14 11:50:00,dog
2,2019-09-14 11:51:00,dog
1,2019-09-14 11:50:00,dog
3,2019-09-14 11:53:00,dog
1,2019-09-14 11:50:00,dog
4,2019-09-14 11:45:00,dog

import java.sql.Timestamp
import org.apache.spark.sql.{Dataframe, Dataset, Row, SparkSession}

object StreamDropDuplicate {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("Test")
            .getOrCreate()
        import spark.implicits._
        
        val lines: Dataframe = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 10000)
            .load()
        
        val words: Dataframe = lines.as[String].map(line => {
            val arr: Array[String] = line.split(",")
            (arr(0), Timestamp.valueOf(arr(1)), arr(2))
        }).toDF("uid", "ts", "word")
        
        val wordCounts: Dataset[Row] = words
            .withWatermark("ts", "2 minutes")
            .dropDuplicates("uid")  // 去重重复数据 uid 相同就是重复.  可以传递多个列
        
        wordCounts.writeStream
            .outputMode("append")
            .format("console")
            .start
            .awaitTermination()
        
    }
}
5.1 测试

1.第一批: 1,2019-09-14 11:50:00,dog

  (1) wm = 0
  (2) timestamp > wm 并且 [wm,~) 无重复  所以输出
    +---+-------------------+----+
    |uid|                 ts|word|
    +---+-------------------+----+
    |  1|2019-09-14 11:50:00| dog|
    +---+-------------------+----+
  (3) 更新wm=48

2.第 2 批: 2,2019-09-14 11:51:00,dog

(1) wm=48
(2) timestamp > wm 并且 [wm,~) 无重复  所以输出
    +---+-------------------+----+
    |uid|                 ts|word|
    +---+-------------------+----+
    |  2|2019-09-14 11:51:00| dog|
    +---+-------------------+----+
(3)  更新wm=49

3.第 3 批: 1,2019-09-14 11:50:00,dog

 (1) wm=49
 (2) timestamp > wm  没过期 但是 [wm,~) id 重复  所以无输出
 (3) wm=49

4.第 4 批: 3,2019-09-14 11:53:00,dog

 (1) wm=49
 (2) timestamp > wm 并且 [wm,~) id 重复无输出
    +---+-------------------+----+
    |uid|                 ts|word|
    +---+-------------------+----+
    |  3|2019-09-14 11:53:00| dog|
    +---+-------------------+----+
 (3) watermark=51

5.第 5 批: 1,2019-09-14 11:50:00,dog

(1) wm=51
(2)timestamp < wm 数据过期    所以无输出

第 6 批 4,2019-09-14 11:45:00,dog 数据过期, 所以无输出
7. 第7批 1,2019-09-14 11:52:00,dog
测试wm>51的还输出不输出

  • append模式下去重的特点:
    来一条数据时,其timestamp > wm 并且 在[wm,~)范围内没有重复过的数据;
  • 如何判断一条数据有没有过期,从内存中移除?
    当timestamp <= waterMark的时候,该条数据就会从内存中移除
5.2 使用注意事项
  1. dropDuplicates 不可用在聚合之后, 即通过聚合得到的 df/ds 不能调用dropDuplicates

  2. 使用watermark - 如果重复记录的到达时间有上限,则可以在事件时间列上定义水印,并使用uid和事件时间列进行重复数据删除。该查询将使用水印从过去的记录中删除旧的状态数据,这些记录不会再当做重复。这限制了查询必须维护的状态量。

  3. 没有使用watermark - 由于重复记录可能到达时没有界限,查询将来自所有过去记录的数据存储为状态

7. join *** 作

Structured Streaming 支持 :
(1)streaming DataSet/Dataframe 与静态的DataSet/Dataframe 进行 join
(2)也支持 streaming DataSet/Dataframe与另外一个streaming DataSet/Dataframe 进行 join.

API:

df1.join(df2,Seq(fields),joinType)

第二个参数是关联字段;

join 的结果也是持续不断的生成, 类似于前面学习的 streaming 的聚合结果.

7.1 static-Stream join
package StructedStreaming._03_流式DF和DS的 *** 作._05_Join

import org.apache.spark.sql.{Dataframe, SparkSession}



object StreamingStatic {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("StreamingStatic")
      .getOrCreate()
    import spark.implicits._
    // 1. 静态 df
    val arr = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"));
    var staticDF: Dataframe = spark.sparkContext.parallelize(arr).toDF("name", "sex")

    // 2. streaming df
    val lines: Dataframe = spark.readStream
      .format("socket")
      .option("host", "fantasy")
      .option("port", 9999)
      .load()

    val streamDF: Dataframe = lines.as[String].map(line => {
      val arr = line.split(",")
      (arr(0), arr(1).toInt)
    }).toDF("name", "age")

    // 3. inner join
    //val joinResult: Dataframe = streamDF.join(staticDF, "name")

    //lisi,20
    //zhiling,40
    //ww,30

    //+-------+---+------+
    //|   name|age|   sex|
    //+-------+---+------+
    //|zhiling| 40|female|
    //|   lisi| 20|  male|
    //+-------+---+------+
    // a join b  a 的字段在前面

    // 4.left out join
    val joinResult: Dataframe = streamDF.join(staticDF, Seq("name"), "left")
    // +-------+---+------+
    //|   name|age|   sex|
    //+-------+---+------+
    //|zhiling| 40|female|
    //|     ww| 30|  null|
    //|   lisi| 20|  male|
    //+-------+---+------+


    joinResult.writeStream
      .outputMode("append")
      .format("console")
      .start
      .awaitTermination()
  }
}
7.2 Stream-stream Joins
  • 在 Spark2.3, 开始支持 stream-stream join.
  • Spark 会自动维护两个流的状态, 以保障后续流入的数据能够和之前流入的数据发生 join *** 作, 但这会导致状态无限增长. 因此, 在对两个流进行 join *** 作时, 依然可以用 watermark 机制来消除过期的状态, 避免状态无限增长.
  • 对 2 个流式数据进行 join *** 作. 输出模式仅支持append模式
7.2.1 不带waterMark
package StructedStreaming._03_流式DF和DS的 *** 作._05_Join

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{Dataframe, SparkSession}

import java.sql.Timestamp



object StreamingStreamingInnerJoinWithoutWaterMark {
  def main(args: Array[String]): Unit = {
    //对 2 个流式数据进行 join  *** 作. 输出模式仅支持append模式
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("StreamStream1")
      .getOrCreate()

    import spark.implicits._
    // 第 1 个 stream
    val nameSexStream: Dataframe = spark.readStream
      .format("socket")
      .option("host", "fantasy")
      .option("port", 9999)
      .load
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1), Timestamp.valueOf(arr(2)))
      }).toDF("name", "sex", "ts1")


    // 第 2 个 stream
    val nameAgeStream: Dataframe = spark.readStream
      .format("socket")
      .option("host", "hadoop201")
      .option("port", 20000)
      .load
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
      }).toDF("name", "age", "ts2")


    // join  *** 作
    val joinResult: Dataframe = nameSexStream.join(nameAgeStream, "name")

    joinResult.writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }

  //第 1 个数据格式: 姓名,年龄,事件时间
  //lisi,female,2019-09-16 11:50:00
  //zs,male,2019-09-16 11:51:00
  //ww,female,2019-09-16 11:52:00
  //zhiling,female,2019-09-16 11:53:00
  //fengjie,female,2019-09-16 11:54:00
  //yifei,female,2019-09-16 11:55:00


  //第 2 个数据格式: 姓名,性别,事件时间
  //lisi,18,2019-09-16 11:50:00
  //zs,19,2019-09-16 11:51:00
  //ww,20,2019-09-16 11:52:00
  //zhiling,22,2019-09-16 11:53:00
  //yifei,30,2019-09-16 11:54:00
  //fengjie,98,2019-09-16 11:55:00


  //+-------+------+-------------------+---+-------------------+
  //|   name|   sex|                ts1|age|                ts2|
  //+-------+------+-------------------+---+-------------------+
  //|zhiling|female|2019-09-16 11:53:00| 22|2019-09-16 11:53:00|
  //|     ww|female|2019-09-16 11:52:00| 20|2019-09-16 11:52:00|
  //|  yifei|female|2019-09-16 11:55:00| 30|2019-09-16 11:54:00|
  //|     zs|  male|2019-09-16 11:51:00| 19|2019-09-16 11:51:00|
  //|fengjie|female|2019-09-16 11:54:00| 98|2019-09-16 11:55:00|
  //|   lisi|female|2019-09-16 11:50:00| 18|2019-09-16 11:50:00|
  //+-------+------+-------------------+---+-------------------+
}
8. Streaming DF/DS 不支持的 *** 作

到目前, DF/DS 的有些 *** 作 Streaming DF/DS 还不支持.
1.多个Streaming 聚合(例如在 DF 上的聚合链)目前还不支持
2.limit 和取前 N 行还不支持
3.distinct 也不支持
4.仅仅支持对 complete 模式下的聚合 *** 作进行排序 *** 作
5.仅支持有限的外连接
6.有些方法不能直接用于查询和返回结果, 因为他们用在流式数据上没有意义.

•count() 不能返回单行数据, 必须是s.groupBy().count()
•foreach() 不能直接使用, 而是使用: ds.writeStream.foreach(…)
•show() 不能直接使用, 而是使用 console sink
如果执行上面 *** 作会看到这样的异常: operation XYZ is not supported with streaming Dataframes/Datasets.

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5071117.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-16
下一篇 2022-11-16

发表评论

登录后才能评论

评论列表(0条)

保存