- Introduction
- Caveats
Introduction
Flink provides partitionCustom, a method for applying a user-defined partitioning strategy to a DataStream.
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K): DataStream[T]
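It takes two parameters:
partitioner: org.apache.flink.api.common.functions.Partitioner[K]
fun: scala.Function1[T, K]
The first parameter is the partitioner, which maps a key to the index of the target partition.
The second parameter is the key selector function, which determines what value of each element the partitioning decision is based on.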
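To make the two roles concrete, here is a minimal plain-Scala sketch with no Flink dependency. `Order`, `keyOf`, `choosePartition`, and `route` are hypothetical names introduced only for illustration. `choosePartition` merely mirrors the shape of `Partitioner.partition(key, numPartitions)`, and the composition in `route` is an assumption about how the two arguments cooperate, not Flink's own code.

```scala
object PartitionCustomRoles {
  // Hypothetical element type T of the stream.
  final case class Order(userId: Int, amount: Double)

  // Plays the role of the second argument (fun: T => K): extracts the key.
  val keyOf: Order => Int = _.userId

  // Plays the role of the first argument: same shape as
  // Partitioner.partition(key, numPartitions), returning a partition index.
  def choosePartition(key: Int, numPartitions: Int): Int =
    Math.floorMod(key, numPartitions)

  // Assumed composition: extract the key first, then pick the partition.
  def route(order: Order, numPartitions: Int): Int =
    choosePartition(keyOf(order), numPartitions)

  def main(args: Array[String]): Unit = {
    val orders = Seq(Order(7, 9.5), Order(8, 1.0), Order(-3, 4.2))
    orders.foreach(o => println(s"$o -> partition ${route(o, 4)}"))
  }
}
```

Keeping the key extraction separate from the partition choice means the same partitioner can be reused for any element type that exposes a key of type K.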
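import org.apache.flink.streaming.api.scala._

// Even values go to partition 0, odd values go to partition 1
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4) // set the parallelism
env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
  .partitionCustom((k: Int, i: Int) => {
    if (k % 2 == 0) 0 else 1
  }, (x: Int) => x)
  .print()
env.execute() // submit the job; without this call nothing runs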
The output is:
1> 2
1> 4
1> 6
2> 1
2> 3
2> 5
2> 7
1> 8
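As the output shows, even though the parallelism is 4, the records are distributed to only two partitions, exactly as our custom rule dictates. The prefix printed by print() is the subtask index plus one, so partition 0 appears as 1> and partition 1 appears as 2>.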
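The mapping from partition index to printed prefix can be traced with a small plain-Scala simulation. This is only a sketch: `partition`, `prefix`, and `grouped` are hypothetical helpers, no Flink classes are involved, and the real job interleaves the lines of the two subtasks in a nondeterministic order.

```scala
object PrintPrefixSketch {
  // Same rule as the partitioner in the example: even keys -> 0, odd keys -> 1.
  def partition(key: Int, numPartitions: Int): Int =
    if (key % 2 == 0) 0 else 1

  // print() labels each line with the 1-based subtask index
  // when the parallelism is greater than 1.
  def prefix(partitionIndex: Int): Int = partitionIndex + 1

  // Groups the elements by the prefix they would be printed with.
  def grouped(elements: Seq[Int], parallelism: Int): Map[Int, Seq[Int]] =
    elements.groupBy(x => prefix(partition(x, parallelism)))

  def main(args: Array[String]): Unit =
    grouped(1 to 8, 4).toSeq.sortBy(_._1).foreach { case (p, xs) =>
      println(s"$p> ${xs.mkString(", ")}")
    }
}
```

Prefixes 3 and 4 never show up because the rule only ever returns 0 or 1, so the other two subtasks receive no records.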
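Caveats
When defining the partitioner, make sure the number of partitions of the data stream (its parallelism) is not smaller than the number of distinct partition indices the partitioner can return; otherwise an index-out-of-bounds exception is thrown.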
For example, suppose I set the parallelism to 1 at the environment level:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1) // set the parallelism
The job then fails with this exception:
... 6 more
Caused by: java.lang.RuntimeException: Index 1 out of bounds for length 1
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
at org.apache.flink.streaming.api.functions.source.FromElementsFunction.run(FromElementsFunction.java:231)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
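One way to avoid this failure is to fold the computed index into the range allowed by the numPartitions argument the partitioner receives. The following is a minimal sketch of that idea in plain Scala, not the original author's fix; `safePartition` and `isValid` are hypothetical names.

```scala
object SafePartitionerSketch {
  // Desired rule: even keys -> 0, odd keys -> 1, then folded into
  // [0, numPartitions) so the index is valid for any parallelism >= 1.
  def safePartition(key: Int, numPartitions: Int): Int =
    Math.floorMod(key, 2) % numPartitions

  // An index is usable only if it addresses an existing partition.
  def isValid(index: Int, numPartitions: Int): Boolean =
    index >= 0 && index < numPartitions

  def main(args: Array[String]): Unit = {
    // With parallelism 1 the original rule returns 1 for odd keys,
    // which is exactly the out-of-range index reported in the stack trace.
    println(isValid(1, 1))                // false
    println(isValid(safePartition(3, 1), 1)) // true
  }
}
```

In the job above, the same expression could be passed as the first argument, for example `(k: Int, n: Int) => Math.floorMod(k, 2) % n`. With parallelism 1 every record then lands in partition 0 instead of failing.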