这是一个计算每个驾驶员乘坐次数的示例。
关键类TaxiRide 出租车乘车事件抽象类DataGenerator 模拟数据生成器Tuple 数据类型
元组是包含固定数量的各种类型的字段的复合类型。Java API 提供从Tuple1最高到Tuple25. 元组的每个字段都可以是任意 Flink 类型,包括更多元组,从而产生嵌套元组。tuple.f4可以使用字段名称 as或使用通用 getter 方法 直接访问元组的字段tuple.getField(int position)。字段索引从 0 开始。请注意,这与 Scala 元组相反,但它更符合 Java 的一般索引。
RideCountExample 代码
public class RideCountExample { public static void main(String[] args) throws Exception { // set up streaming execution environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 增加源数据 DataStreamrides = env.addSource(new TaxiRideGenerator()); // 聚合源数据成一个元素数据流 DataStream > tuples = rides.map(new MapFunction >() { @Override public Tuple2 map(TaxiRide ride) { return Tuple2.of(ride.driverId, 1L); } }); // 根据driverId对数据源进行分区 KeyedStream , Long> keyedByDriverId = tuples.keyBy(t -> t.f0); // 聚合计算司机数量 sum(1) 中的参数是分区号 DataStream > rideCounts = keyedByDriverId.sum(1); // we could, in fact, print out any or all of these streams rideCounts.print(); // run the cleansing pipeline env.execute("Ride Count"); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)