- 多流转换
无论是基本的简单转换和聚合, 还是基于窗口的计算,都是针对一条流上的数据进行 处理的。在实际应用中, 可能需要将不同来源的数据连接合并在一起处理, 也有可能需要将 一条流拆分开, 所以经常会有对多条流进行处理的场景。简单划分,多流转换可以分为“分流”和“合流”两大类。目前分流的 *** 作一般是通 过侧输出流(side output) 来实现,而合流的算子比较丰富,根据不同的需求可以调用 union、 connect、join 以及 coGroup 等接口进行连接合并 *** 作。
-
侧输出流
简单来说,只需要调用上下文 ctx的output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取, 都离不开一个“输出标签”(OutputTag),指定了侧输出流的 id 和类型。
代码示例:package com.company.flink.demo; import com.company.flink.data.ClickSource; import com.company.flink.entity.Event; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.OutputTag; public class SplitStreamDemo { // 定义侧输出流 private static OutputTag
> zhangsanTag = new OutputTag >("zhangsan-pv"){}; private static OutputTag > lisiTag = new OutputTag >("lisi-pv"){}; public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); SingleOutputStreamOperator stream = env .addSource(new ClickSource()); SingleOutputStreamOperator outPutStream = stream.process( new ProcessFunction () { @Override public void processElement(Event value, Context ctx, Collector out) { if (value.user.equals("zhangsan")) { ctx.output(zhangsanTag, new Tuple3<>(value.user, value.url, value.timestamp)); } else if (value.user.equals("lisi")) { ctx.output(lisiTag, new Tuple3<>(value.user, value.url, value.timestamp)); } else { out.collect(value); } } }); DataStream > zhangsanSideOutput = outPutStream.getSideOutput(zhangsanTag); zhangsanSideOutput.print("zhangsan pv"); DataStream > lisiSideOutput = outPutStream.getSideOutput(lisiTag); lisiSideOutput.print("lisi pv"); outPutStream.print("else"); env.execute(); } } -
合流 *** 作
Flink 中合流的 *** 作会更加普遍,对应的 API 也更加丰富。
1)联合(Union)
最简单的合流 *** 作, 就是直接将多条流合在一起,叫作流“联合”(union)。联合 *** 作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素, 数据类型不变。2)连接(Connect)
流的联合虽然简单,不过受限于数据类型不能改变,灵活性不足,实践中较少使用。除了联合(union),Flink 还提供了另外一种合流 *** 作就是连接(connect)。这种 *** 作就是直接把两条流像接线一样对接起来。
为了处理更加灵活,连接 *** 作允许流的数据类型不同。但一个 DataStream 中的数据类型是唯一的(所以需要(co-process)转换 *** 作) - 基于时间的合流——双流联结(Join)
对于两条流的合并,很多情况并不是简单地将所有数据放在一起,而是希望根据某个字段的值将它们联结起来“配对”做处理。 这种需求与关系型数据库中表的join *** 作非常相似。Flink 中两条流 的 connect *** 作,就可以通过 keyBy 指定键进行分组后合并,实现了类似于 SQL 中的 join *** 作; 另外 connect 支持处理函数,可以使用自定义状态和 TimerService 灵活实现各种需求。
1)窗口联结(Window Join)
窗口联结首先需要调用 DataStream 的.join()方法来合并两条流, 得到一 个 、JoinedStreams ;接着通过 .where() 和.equalTo() 方法指定两条流中联结的 key ;然后通 过.window()开窗口, 并调用.apply()传入联结窗口函数进行处理计算。调用形式如下:SingleOutputStreamOperator
eventStream = env.addSource(new ClickSource()) .join(DataStream otherStream) .where(KeySelector keySelector) .equalTo(KeySelector keySelector) .window(WindowAssigner assigner) .apply(JoinFunction function) 上面代码中.where()的参数是键选择器(KeySelector), 用来指定第一条流中的 key ;而.equalTo()传入的 KeySelector 则指定了第二条流中的 key。两者相同的元素,如果在同一窗口中, 就可以匹配起来, 并通过一个“联结函数”(JoinFunction) 进行处理。这里需要注意, JoinFunciton 并不是真正的“窗口函数”,它只是定义了窗口函数在调用时 对匹配数据的具体处理逻辑。
两条流的数据到来之后, 首先会按照 key 分组、进入对应的窗口中存储;当到达窗口结束时间时,算子会先统计出窗口内两条流的数据的所有组合,也就是对两条流中的数据做一个笛卡尔积(相当于表的交叉连接, cross join),然后进行遍历,把每一对匹配的数据, 作为参数 (first ,second)传入 JoinFunction 的.join()方法进行计算处理。所以窗口中每有一对数据成功联结匹配, JoinFunction 的.join()方法就会被调用一次, 并输 出一个结果。窗口join的调用和SQL中表的join非常相似。SQL的 inner join ... on本身表示的是两张表基于 id 的“内连接”(inner join)。而 Flink 中的 window join,同样类似于 inner join。也就是说,最后 处理输出的,只有两条流中数据按 key 配对成功的那些;如果某个窗口中一条流的数据没有任 何另一条流的数据匹配, 那么就不会调用 JoinFunction 的.join()方法, 也就没有任何输出。
代码示列:package com.company.flink.demo; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; public class FlinkWindowJoinDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream
> stream1 = env .fromElements( Tuple2.of("a", 1000L), Tuple2.of("a", 2000L), Tuple2.of("b", 1000L), Tuple2.of("b", 2000L), Tuple2.of("c", 5000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy . >forMonotonousTimestamps().withTimestampAssigner( (SerializableTimestampAssigner >) (stringLongTuple2, l) -> stringLongTuple2.f1 ) ); DataStream > stream2 = env .fromElements( Tuple2.of("a", 3000L), Tuple2.of("a", 4000L), Tuple2.of("b", 3000L), Tuple2.of("b", 4000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy . >forMonotonousTimestamps().withTimestampAssigner( (SerializableTimestampAssigner >) (stringLongTuple2, l) -> stringLongTuple2.f1 ) ); stream1 .join(stream2) .where(r -> r.f0) .equalTo(r -> r.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply((JoinFunction , Tuple2 , String>) (left, right) -> left + "=>" + right) .print(); env.execute(); } } 2)间隔联结(Interval Join)
在一些场景下, 处理的时间间隔可能并不是固定的。比如, 在交易系统中, 需要实时地对每一笔交易进行核验,保证两个账户转入转出数额相等,也就是所谓的“实时对账”。 两次转账的数据可能写入了不同的日志流,它们的时间戳相差不大,所以可以考虑只统计一段时间内是否有出账入账的数据匹配。这时显然不能用滚动窗口或滑动窗口来处理因为匹配的两个数据有可能刚好在窗口边缘两侧,这时窗口内就都没有匹配了;会话窗口虽然时间不固定,但也明显不适合这个场景。Flink 为这种场景提供了一种叫作“间隔联结”(interval join) 的合流 *** 作,间隔联结的思路就是针对一条流的每条数据,开辟出其时间戳前后的一段时间间隔, 看这期间是否有来自另一条流的数据与之匹配。
间隔连接原理:
给定两个时间点,分别叫作间隔的“上界”(upperBound) 和“下界”(lowerBound);对于一条流A 中的任意一个数据元素 a,就可以 开辟一段时间间隔: [a.timestamp + lowerBound, a.timestamp + upperBound)把这段时间作为可以匹配另一条流数据 的“窗口”范围。所以对于另一条流 B 中的数据元素 b,如果它的时间戳落在了这 个区间范围内, a 和 b 就可以成功配对,进而进行计算输出结果。所以匹配的条件为:a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound 这里需要注意, 做间隔联结的两条流 A 和 B,也必须基于相同的 key; 间隔联结目前只支持事件时间语义。如图所示:
可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join 做匹配的时间段是基于流中数据的并不确定; 而且流 B 中的数据可以不只在一个区间内被匹配。
代码示列:
package com.company.flink.demo; import com.company.flink.entity.Event; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; // 基于间隔的 join public class FlinkIntervalJoinDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator
> orderStream = env.fromElements( Tuple3.of("Mary", "order-1", 5000L), Tuple3.of("Alice", "order-2", 5000L), Tuple3.of("Bob", "order-3", 20000L), Tuple3.of("Alice", "order-4", 20000L), Tuple3.of("Cary", "order-5", 51000L) ).assignTimestampsAndWatermarks(WatermarkStrategy. >forMonotonousTimestamps() .withTimestampAssigner((SerializableTimestampAssigner >) (element, recordTimestamp) -> element.f2) ); SingleOutputStreamOperator clickStream = env.fromElements( new Event("Bob", "./cart", 2000L), new Event("Alice", "./prod?id=100", 3000L), new Event("Alice", "./prod?id=200", 3500L), new Event("Bob", "./prod?id=2", 2500L), new Event("Alice", "./prod?id=300", 36000L), new Event("Bob", "./home", 30000L), new Event("Bob", "./prod?id=1", 23000L), new Event("Bob", "./prod?id=3", 33000L) ).assignTimestampsAndWatermarks(WatermarkStrategy. forMonotonousTimestamps() .withTimestampAssigner((SerializableTimestampAssigner ) (element, recordTimestamp) -> element.timestamp) ); orderStream.keyBy(data -> data.f0) .intervalJoin(clickStream.keyBy(data -> data.user)) .between(Time.seconds(-5), Time.seconds(10)) .process(new ProcessJoinFunction , Event, String>() { @Override public void processElement(Tuple3 left, Event right, Context ctx, Collector out) throws Exception { out.collect(right + " => " + left); } }) .print(); env.execute(); } } 3)窗口同组联结(Window CoGroup)
窗口同组联结(window coGroup)的用法跟 window join 非常类似, 也是将两条流合并之后开窗处理匹配的元素,调用时只需要将.join()换为.coGroup()就可以了。
与 window join的区别在于,调用 .apply() 方法定义 具体 *** 作 时,传入的是一个CoGroupFunction。它是一个函数类接口public interface CoGroupFunction
extends Function, Serializable { void coGroup(Iterable first, Iterable second, Collector out) throws Exception; } coGroup()方法,有些类似于 FlatJoinFunction中 join()的形式, 同样有三个参数, 分别代表两条流中的数据以及用于输出的收集器(Collector)。不同的是,这里的前两个参数 不再是单独的每一组“配对”数据了, 而是传入了可遍历的数据集合。也就是说不会再去计算窗口中两条流数据集的笛卡尔积,而是直接把收集到的所有数据一次性传入,至于要怎样配对完全是自定义。这样coGroup()方法只会被调用一次, 而且即使一条流的数据没有任何另一条流的数据匹配, 也可以出现在集合中、当然也可以定义输出结果了。能够看出 coGroup *** 作比窗口的 join 更加通用,不仅可以实现类似 SQL 中的“内 连接”(inner join),也可以实现左外连接(left outer join)、右外连接(right outer join) 和全外 连接(full outer join)。事实上, 窗口join 的底层,也是通过 coGroup 来实现的。
代码示例:package com.company.flink.demo; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class FlinkCoGroupDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStream
> stream1 = env .fromElements( Tuple2.of("a", 1000L), Tuple2.of("a", 2000L), Tuple2.of("b", 1000L), Tuple2.of("b", 2000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy . >forMonotonousTimestamps() .withTimestampAssigner( (SerializableTimestampAssigner >) (stringLongTuple2, l) -> stringLongTuple2.f1 ) ); DataStream > stream2 = env .fromElements( Tuple2.of("a", 3000L), Tuple2.of("b", 3000L), Tuple2.of("b", 4000L), Tuple2.of("a", 4000L), Tuple2.of("c", 4000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy . >forMonotonousTimestamps() .withTimestampAssigner( (SerializableTimestampAssigner >) (stringLongTuple2, l) -> stringLongTuple2.f1 ) ); stream1 .coGroup(stream2) .where(r -> r.f0) .equalTo(r -> r.f0) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .apply(new CoGroupFunction , Tuple2 , String>() { @Override public void coGroup(Iterable > iter1, Iterable > iter2, Collector collector) { collector.collect(iter1 + "=>" + iter2); } }) .print(); env.execute(); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)