学习flink-1.14 自定义分区

学习flink-1.14 自定义分区,第1张

flink-1.14自定义分区
  • 前言
      • 注意点


前言

flink提供了可以实现的自定义分区设置 partitionCustom

def partitionCustom[K](partitioner : org.apache.flink.api.common.functions.Partitioner[K], fun : scala.Function1[T, K])(implicit evidence$6 : org.apache.flink.api.common.typeinfo.TypeInformation[K]) : org.apache.flink.streaming.api.scala.DataStream[T] = { /* compiled code */ }

两个参数:
org.apache.flink.api.common.functions.Partitioner[K],
fun : scala.Function1[T, K]

第一个参数是分区选择器
第二参数选择函数,也就是根据什么内容跟选择

	// 偶数的数据到2号分区,奇数的数据到2 号分区
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)  //设置并行度
    env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
      .partitionCustom((k: Int, i: Int) => {
        if (k % 2 == 0) 0 else 1
      }, (x: Int) => x)
      .print()

输出的结果是

1> 2
1> 4
1> 6
2> 1
2> 3
2> 5
2> 7
1> 8

可以看出并行度根据我们自定义的方式,分发到两个分区

注意点

设置分区选择器的时候,该数据流的分区数不能小于选择器中分区个数,不然会抛出下标越界的异常

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)  //设置并行度

比如我在最顶层设置数据并行度是1
最后抛出这个异常

	... 6 more
Caused by: java.lang.RuntimeException: Index 1 out of bounds for length 1
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
	at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:231)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/728861.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存