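Repartitioning operations: in the DataStream class you will find many classes whose names contain "Partitioner" (for example ShufflePartitioner, RebalancePartitioner and GlobalPartitioner).
Among these methods, partitionCustom(...) is the one used for custom repartitioning.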
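To show what a custom partitioner decides, here is a minimal plain-Java sketch. It does not depend on Flink: SimplePartitioner is a local stand-in that only mirrors the method shape of Flink's Partitioner<K> (int partition(K key, int numPartitions)). The routing rule (reserve partition 0 for the hot key "sensor_1", hash everything else into the remaining partitions) and the names SimplePartitioner, CustomPartitionDemo and HOT_KEY_ISOLATING are hypothetical, chosen only for illustration.

```java
import java.util.List;

// Local stand-in with the same method shape as Flink's
// org.apache.flink.api.common.functions.Partitioner<K>, so this compiles without Flink.
interface SimplePartitioner<K> {
    int partition(K key, int numPartitions);
}

final class CustomPartitionDemo {
    // Hypothetical rule: reserve partition 0 for the hot key "sensor_1" and spread
    // every other sensor id over partitions 1..numPartitions-1 by hash.
    static final SimplePartitioner<String> HOT_KEY_ISOLATING = (key, numPartitions) -> {
        if (numPartitions == 1 || "sensor_1".equals(key)) {
            return 0;
        }
        // floorMod keeps the result non-negative even when hashCode() is negative
        return 1 + Math.floorMod(key.hashCode(), numPartitions - 1);
    };

    public static void main(String[] args) {
        for (String id : List.of("sensor_1", "sensor_6", "sensor_7", "sensor_10")) {
            System.out.println(id + " -> partition " + HOT_KEY_ISOLATING.partition(id, 4));
        }
    }
}
```

In a real job, the same logic would live in a class implementing Flink's Partitioner<String> and be applied with something like dataStream.partitionCustom(partitioner, SensorReading::getId), where the key selector extracts the sensor id.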
Java code:
package apitest.transform;

import apitest.beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest6_Partition {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism = 4
        env.setParallelism(4);

        // Read data from a file (backslashes must be escaped inside a Java string literal)
        DataStream<String> inputStream = env.readTextFile("F:\\JAVA\\bigdata2107\\zch\\flink\\src\\main\\resources\\Sensor.txt");

        // Convert each line into a SensorReading
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        // With no explicit partitioner, Flink distributes records round-robin (rebalance)
        // when upstream and downstream parallelism differ, and uses forward when they are equal
        dataStream.print("input");

        // 1. shuffle: unlike a batch shuffle, records are not collected first and then shuffled;
        //    each record is sent to a random partition as soon as it arrives
        DataStream<String> shuffleStream = inputStream.shuffle();
        shuffleStream.print("shuffle");

        // 2. keyBy: hash the key, then take it modulo the parallelism
        dataStream.keyBy(SensorReading::getId).print("keyBy");

        // 3. global: send every record to the first partition (only used in a few special cases)
        dataStream.global().print("global");

        env.execute();
    }
}
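The strategies described in the comments of the program can be simulated without a cluster. The sketch below is plain Java, not Flink code: each method returns the 0-based downstream subtask index a strategy would pick for one record. It simplifies two details on purpose: Flink's rebalance starts from a random channel (here it starts at 0), and Flink's keyBy first maps a key to a key group and then the key group to a subtask (here it is a direct hash modulo parallelism). The class name PartitionStrategies is hypothetical.

```java
import java.util.List;
import java.util.Random;

// Plain-Java simulation (NOT Flink code): each method returns the 0-based
// downstream subtask index that a strategy would pick for one record.
final class PartitionStrategies {
    private final int parallelism;
    private final Random random;
    // Simplification: Flink's rebalance starts at a random channel; this sketch starts at 0
    private int nextChannel = 0;

    PartitionStrategies(int parallelism, long seed) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        this.parallelism = parallelism;
        this.random = new Random(seed);
    }

    // shuffle: every record independently picks a uniformly random subtask
    int shuffle() {
        return random.nextInt(parallelism);
    }

    // rebalance: cycle through the subtasks in round-robin order
    int rebalance() {
        int channel = nextChannel;
        nextChannel = (nextChannel + 1) % parallelism;
        return channel;
    }

    // keyBy (simplified): hash the key, then take it modulo the parallelism
    int keyBy(String key) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // global: every record goes to the first subtask
    int global() {
        return 0;
    }

    public static void main(String[] args) {
        PartitionStrategies p = new PartitionStrategies(4, 42L);
        List<String> ids = List.of("sensor_1", "sensor_6", "sensor_1", "sensor_7", "sensor_10");
        for (String id : ids) {
            // Flink's print() labels output with subtask index + 1, so add 1 here too
            System.out.printf("%-9s shuffle:%d rebalance:%d keyBy:%d global:%d%n",
                    id, p.shuffle() + 1, p.rebalance() + 1, p.keyBy(id) + 1, p.global() + 1);
        }
    }
}
```

Running it shows the same pattern as the real job: the two sensor_1 records always get the same keyBy number, global is always 1, rebalance cycles 1, 2, 3, 4, and shuffle varies with the random seed.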
Output:
input:3> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
input:3> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
input:1> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
input:1> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
shuffle:2> sensor_6,1547718201,15.4
shuffle:1> sensor_1,1547718199,35.8
input:4> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
input:4> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
shuffle:1> sensor_1,1547718207,36.3
shuffle:2> sensor_1,1547718209,32.8
global:1> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
keyBy:3> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}
global:1> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
keyBy:3> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}
keyBy:3> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
keyBy:3> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
global:1> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}
shuffle:1> sensor_7,1547718202,6.7
global:1> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}
shuffle:2> sensor_10,1547718205,38.1
input:2> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
global:1> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
keyBy:4> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}
keyBy:2> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
global:1> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}
shuffle:1> sensor_1,1547718212,37.1
keyBy:3> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
global:1> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}
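To read the keyBy and global results more systematically, the following plain-Java sketch parses the keyBy and global records copied from the output above and collects, for each sensor id, the set of subtask numbers that printed it. The class name OutputCheck and the regular expression are assumptions for illustration; they only rely on the "prefix:N> SensorReading{id='...'" shape of the printed lines.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class OutputCheck {
    // keyBy and global records copied from the printed output above
    static final List<String> SAMPLE = List.of(
        "global:1> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}",
        "keyBy:3> SensorReading{id='sensor_1', timestamp=1547718199, temperature=35.8}",
        "global:1> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}",
        "keyBy:3> SensorReading{id='sensor_6', timestamp=1547718201, temperature=15.4}",
        "keyBy:3> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}",
        "keyBy:3> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}",
        "global:1> SensorReading{id='sensor_1', timestamp=1547718207, temperature=36.3}",
        "global:1> SensorReading{id='sensor_1', timestamp=1547718209, temperature=32.8}",
        "global:1> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}",
        "keyBy:4> SensorReading{id='sensor_7', timestamp=1547718202, temperature=6.7}",
        "keyBy:2> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}",
        "global:1> SensorReading{id='sensor_10', timestamp=1547718205, temperature=38.1}",
        "keyBy:3> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}",
        "global:1> SensorReading{id='sensor_1', timestamp=1547718212, temperature=37.1}");

    // Captures the print prefix, the 1-based subtask number and the sensor id
    private static final Pattern LINE =
        Pattern.compile("^(\\w+):(\\d+)> SensorReading\\{id='([^']+)'");

    // For one print prefix, map each sensor id to the set of subtasks that printed it
    static Map<String, Set<Integer>> subtasksById(List<String> lines, String prefix) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            if (m.find() && m.group(1).equals(prefix)) {
                result.computeIfAbsent(m.group(3), id -> new TreeSet<>())
                      .add(Integer.parseInt(m.group(2)));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // prints: keyBy:  {sensor_1=[3], sensor_10=[2], sensor_6=[3], sensor_7=[4]}
        System.out.println("keyBy:  " + subtasksById(SAMPLE, "keyBy"));
        // prints: global: {sensor_1=[1], sensor_10=[1], sensor_6=[1], sensor_7=[1]}
        System.out.println("global: " + subtasksById(SAMPLE, "global"));
    }
}
```

Every id maps to exactly one subtask under keyBy, and everything lands on subtask 1 under global. Note that sensor_1 and sensor_6 share subtask 3: keyBy guarantees that the same key always goes to the same subtask, not that different keys go to different subtasks.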
Feel free to share; please credit the source when reposting: 内存溢出