Flink Stream Splitting: Filter vs. Split vs. SideOutput


Use case:

In production we often run into scenarios where an input source has to be split as needed: for example, splitting an order stream by order amount, or splitting user access logs by the visitor's geographic location. How should such requirements be handled?

Generally speaking, there are three ways to split a stream, depending on the scenario:

  1. Splitting with Filter
  2. Splitting with Split
  3. Splitting with SideOutput

1. Splitting with Filter

We can apply the filter operator multiple times, once per condition, to carve the records we need into separate streams.

Code example:

import org.apache.flink.streaming.api.scala._

object filterStreamExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)

    // Sample rows; the first field is the row id:
    // 1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
    // 2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
    val inputStream: DataStream[String] = env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/day.csv")

    // Each filter call traverses the full input stream once
    val littleStream = inputStream.filter(_.split(",")(0).toInt < 500)
    val bigStream = inputStream.filter(_.split(",")(0).toInt >= 500)

    // Print the results
    littleStream.print("little------")
    bigStream.print("big------")
    env.execute()
  }
}

Output:

little------:13> 49,2011-02-18,1,0,2,0,5,1,1,0.521667,0.511983,0.516667,0.264925,579,2348,2927
big------:11> 687,2012-11-17,4,1,11,0,6,0,1,0.325,0.326383,0.545417,0.179729,1313,4316,5629
big------:8> 552,2012-07-05,3,1,7,0,4,1,1,0.8275,0.761367,0.457917,0.194029,1405,4836,6241
big------:11> 688,2012-11-18,4,1,11,0,0,0,1,0.3425,0.337746,0.692917,0.227612,922,3747,4669
little------:2> 279,2011-10-06,4,0,10,0,4,1,1,0.494167,0.480425,0.620833,0.134954,639,4126,4765
big------:11> 689,2012-11-19,4,1,11,0,1,1,2,0.380833,0.375621,0.623333,0.235067,449,5050,5499
...

Summary:

The drawback of the Filter approach is obvious: to obtain the streams we need, the original stream must be traversed once per filter, which quietly wastes cluster resources.


2. Splitting with Split

Split is another method Flink provides for splitting a stream. You pass an OutputSelector to the split operator and override its select method to tag records of different types; you then call select on the returned SplitStream to pick out the records carrying a given tag.

Code example:

import java.lang
import java.util

import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.scala._

object splitStreamExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)

    // Sample rows; the first field is the row id:
    // 1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
    // 2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
    val inputStream: DataStream[String] = env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/day.csv")

    // Tag every record in the OutputSelector; select() later pulls the
    // tagged records back out of the SplitStream
    val splitStream: SplitStream[String] = inputStream.split(new OutputSelector[String] {
      override def select(out: String): lang.Iterable[String] = {
        val tags = new util.ArrayList[String]()
        if (out.split(",")(0).toInt < 500) {
          tags.add("littleStream")
        } else {
          tags.add("bigStream")
        }
        tags
      }
    })

    // Print the results
    splitStream.select("littleStream").print("little------")
    splitStream.select("bigStream").print("big------")

    env.execute()
  }
}

Output:

little------:14> 49,2011-02-18,1,0,2,0,5,1,1,0.521667,0.511983,0.516667,0.264925,579,2348,2927
big------:12> 687,2012-11-17,4,1,11,0,6,0,1,0.325,0.326383,0.545417,0.179729,1313,4316,5629
little------:5> 369,2012-01-04,1,1,1,0,3,1,2,0.1075,0.119337,0.414583,0.1847,95,2273,2368
big------:12> 688,2012-11-18,4,1,11,0,0,0,1,0.3425,0.337746,0.692917,0.227612,922,3747,4669
little------:14> 50,2011-02-19,1,0,2,0,6,0,1,0.399167,0.391404,0.187917,0.507463,532,1103,1635
...

Summary:

Note that a stream produced by the split operator cannot be split a second time. If the littleStream or bigStream selected above is passed through split again, the console throws the following exception:

Exception in thread "main" java.lang.IllegalStateException: Consecutive multiple splits are not supported. Splits are deprecated. Please use side-outputs.

This is because the split API is deprecated; the newer SideOutput mechanism is the recommended way to split streams.
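
For reference, a minimal sketch of the offending call, continuing the splitStream example above (the second-level tag names "tiny" and "small" and the < 100 threshold are purely illustrative); running a job containing this produces the exception shown:

// Re-splitting a stream selected from a SplitStream is rejected by Flink.
// Tag names "tiny"/"small" are hypothetical, for illustration only.
val reSplit = splitStream.select("littleStream").split(new OutputSelector[String] {
  override def select(out: String): lang.Iterable[String] = {
    val tags = new util.ArrayList[String]()
    if (out.split(",")(0).toInt < 100) tags.add("tiny") else tags.add("small")
    tags
  }
})
reSplit.select("tiny").print("tiny------") // the job fails with the IllegalStateException above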

3. Splitting with SideOutput

SideOutput is the newest and most recommended stream-splitting method Flink provides. Using it takes the following steps:
• Define an OutputTag
• Split the data inside one of these functions:

  1. ProcessFunction (used in this example)
  2. KeyedProcessFunction
  3. CoProcessFunction
  4. KeyedCoProcessFunction
  5. ProcessWindowFunction
  6. ProcessAllWindowFunction

Code example:

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object sideOutStreamExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.setParallelism(1)

    // Sample rows; the first field is the row id:
    // 1,2011-01-01,1,0,1,0,6,0,2,0.344167,0.363625,0.805833,0.160446,331,654,985
    // 2,2011-01-02,1,0,1,0,0,0,2,0.363478,0.353739,0.696087,0.248539,131,670,801
    val inputStream: DataStream[String] = env.readTextFile("/home/rjxy/zlp/Code/CodePro/GuoSai/Task01/src/main/resources/day.csv")

    // Define one OutputTag per side output
    val littleOutTag = new OutputTag[String]("littleStream")
    val bigOutTag = new OutputTag[String]("bigStream")

    // Route each record to the matching side output in processElement
    val processStream = inputStream.process(new ProcessFunction[String, String] {
      override def processElement(i: String,
                                  context: ProcessFunction[String, String]#Context,
                                  out: Collector[String]): Unit = {
        if (i.split(",")(0).toInt < 500) {
          context.output(littleOutTag, i)
        } else {
          context.output(bigOutTag, i)
        }
      }
    })

    // Retrieve each side output as a regular DataStream
    val littleStream = processStream.getSideOutput(littleOutTag)
    val bigStream = processStream.getSideOutput(bigOutTag)

    // Print the results
    littleStream.print("little------")
    bigStream.print("big------")
    env.execute()
  }
}

Output:

little------:1> 95,2011-04-05,2,0,4,0,2,1,2,0.414167,0.39835,0.642083,0.388067,167,1628,1795
little------:6> 323,2011-11-19,4,0,11,0,6,0,1,0.329167,0.324483,0.502083,0.224496,943,2720,3663
big------:11> 552,2012-07-05,3,1,7,0,4,1,1,0.8275,0.761367,0.457917,0.194029,1405,4836,6241
little------:6> 324,2011-11-20,4,0,11,0,0,0,2,0.463333,0.457058,0.684583,0.18595,787,2733,3520
big------:11> 553,2012-07-06,3,1,7,0,5,1,1,0.828333,0.752533,0.450833,0.146142,1366,4841,6207
little------:6> 325,2011-11-21,4,0,11,0,1,1,3,0.4475,0.445062,0.91,0.138054,220,2545,2765
...

Summary:

A stream split with SideOutput can be split again as many times as needed, with no risk of the exception above.
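
For example, here is a minimal sketch of a second-level split, continuing the sideOutStreamExample above (the tinyStream tag and the < 100 threshold are hypothetical, chosen just for illustration):

// Second-level split of littleStream; this is exactly what split/select forbids.
val tinyOutTag = new OutputTag[String]("tinyStream")
val secondPass = littleStream.process(new ProcessFunction[String, String] {
  override def processElement(i: String,
                              ctx: ProcessFunction[String, String]#Context,
                              out: Collector[String]): Unit = {
    if (i.split(",")(0).toInt < 100) ctx.output(tinyOutTag, i) // rows 1..99
    else out.collect(i)                                        // rows 100..499 stay on the main output
  }
})
secondPass.getSideOutput(tinyOutTag).print("tiny------")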
