我们在生产实践中经常会遇到这样的场景,需把输入源按照需要进行拆分,比如我期望把订单流按照金额大小进行拆分,或者把用户访问日志按照访问者的地理位置进行拆分等。面对这样的需求该如何 *** 作呢?
通常来说针对不同的场景,有以下三种办法进行流的拆分。
- Filter 分流
- Split 分流
- SideOutPut 分流
我们可以通过做多次 filter 算子,把需要的不同数据生成不同的流。
代码示例:
object filterStreamExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
//1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
//2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
val inputStream: DataStream[String] = env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/day.csv")
val littleStream = inputStream.filter(_.split(",")(0).toInt < 500)
val bigStream = inputStream.filter(_.split(",")(0).toInt >= 500)
//打印结果
littleStream.print("little------")
bigStream.print("big------")
env.execute()
}
}
结果:
little------:13> 49,2011-02-18,1,0,2,0,5,1,1,0.521667,0.511983,0.516667,0.264925,579,2348,2927
big------:11> 687,2012-11-17,4,1,11,0,6,0,1,0.325,0.326383,0.545417,0.179729,1313,4316,5629
big------:8> 552,2012-07-05,3,1,7,0,4,1,1,0.8275,0.761367,0.457917,0.194029,1405,4836,6241
big------:11> 688,2012-11-18,4,1,11,0,0,0,1,0.3425,0.337746,0.692917,0.227612,922,3747,4669
little------:2> 279,2011-10-06,4,0,10,0,4,1,1,0.494167,0.480425,0.620833,0.134954,639,4126,4765
big------:11> 689,2012-11-19,4,1,11,0,1,1,2,0.380833,0.375621,0.623333,0.235067,449,5050,5499
...
总结:
Filter 的弊端是显而易见的,为了得到我们需要的流数据,需要多次遍历原始流,这样无形中浪费了我们集群的资源。
2. Split 分流
Split 也是 Flink 提供给我们将流进行切分的方法,需要在 split 算子中定义 OutputSelector,然后重写其中的 select 方法,将不同类型的数据进行标记,最后对返回的 SplitStream 使用 select 方法将对应的数据选择出来。
代码示例:
object splitStreamExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// env.setParallelism(1)
//1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
//2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
val inputStream: DataStream[String] = env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/day.csv")
val splitStream: SplitStream[String] = inputStream.split(new OutputSelector[String] {
override def select(out: String): lang.Iterable[String] = {
val tags = new util.ArrayList[String]()
if (out.split(",")(0).toInt < 500) {
tags.add("littleStream")
} else if (out.split(",")(0).toInt >= 500) {
tags.add("bigStream")
}
return tags
}
})
//打印结果
splitStream.select("littleStream").print("little------")
splitStream.select("bigStream").print("big------")
env.execute()
}
}
结果:
little------:14> 49,2011-02-18,1,0,2,0,5,1,1,0.521667,0.511983,0.516667,0.264925,579,2348,2927
big------:12> 687,2012-11-17,4,1,11,0,6,0,1,0.325,0.326383,0.545417,0.179729,1313,4316,5629
little------:5> 369,2012-01-04,1,1,1,0,3,1,2,0.1075,0.119337,0.414583,0.1847,95,2273,2368
big------:12> 688,2012-11-18,4,1,11,0,0,0,1,0.3425,0.337746,0.692917,0.227612,922,3747,4669
little------:14> 50,2011-02-19,1,0,2,0,6,0,1,0.399167,0.391404,0.187917,0.507463,532,1103,1635
...
总结:
需要注意,使用 split 算子切分过的流,是不能进行二次切分的,假如把上述切分出来的 littleStream 和 bigStream 流再次调用 split 切分,控制台会抛出以下异常。
Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.
3. SideOutPut 分流是因为该方式已经 废弃 并且建议使用最新的 SideOutPut 进行分流 *** 作。
SideOutPut 是 Flink 框架为我们提供的最新的也是最为推荐的分流方法,在使用 SideOutPut 时,需要按照以下步骤进行:
• 定义 OutputTag
• 调用特定函数进行数据拆分
- ProcessFunction (本次使用该函数)
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
代码示例:
object sideOutStreamExample {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// env.setParallelism(1)
//1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
//2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
val inputStream: DataStream[String] = env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/day.csv")
//定义两个OutTag
val littleOutTag = new OutputTag[String]("littleStream")
val bigOutTag = new OutputTag[String]("bigStream")
val processStream = inputStream.process(new ProcessFunction[String, String] {
override def processElement(i: String,
context: ProcessFunction[String, String]#Context,
out: Collector[String]): Unit = {
if (i.split(",")(0).toInt < 500) {
context.output(littleOutTag, i)
} else if (i.split(",")(0).toInt >= 500) {
context.output(bigOutTag, i)
}
}
})
val littleStream = processStream.getSideOutput(littleOutTag)
val bigStream = processStream.getSideOutput(bigOutTag)
littleStream.print("little------")
bigStream.print("big------")
env.execute()
}
}
结果:
little------:1> 95,2011-04-05,2,0,4,0,2,1,2,0.414167,0.39835,0.642083,0.388067,167,1628,1795
little------:6> 323,2011-11-19,4,0,11,0,6,0,1,0.329167,0.324483,0.502083,0.224496,943,2720,3663
big------:11> 552,2012-07-05,3,1,7,0,4,1,1,0.8275,0.761367,0.457917,0.194029,1405,4836,6241
little------:6> 324,2011-11-20,4,0,11,0,0,0,2,0.463333,0.457058,0.684583,0.18595,787,2733,3520
big------:11> 553,2012-07-06,3,1,7,0,5,1,1,0.828333,0.752533,0.450833,0.146142,1366,4841,6207
little------:6> 325,2011-11-21,4,0,11,0,1,1,3,0.4475,0.445062,0.91,0.138054,220,2545,2765
...
总结:
Flink 最新提供的 SideOutPut 方式拆分流是可以多次进行拆分的,无需担心会报出异常。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)