Computing Time-Window Averages over Streaming Data with Flink 1.10.1 + Kafka (Java Version)

1. Create a Maven project in IDEA and add the dependencies

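    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.10.1</flink.version>
        <log4j.version>1.2.17</log4j.version>
        <slf4j.version>1.7.7</slf4j.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- ${project.build.scope} is not defined in this snippet; define it
             (for example in a profile) or remove the <scope> lines. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>${project.build.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.10.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>

    </dependencies>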

The Scala version used here is 2.11. The flink-runtime_2.11 dependency must be added when running the job inside the IDEA development environment.

2. Add the main code
package com.demo;


import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class FlinkWindowAvgKafkaStreaming {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // enable checkpointing every 5000 ms
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("flink-topic", new SimpleStringSchema(), props);
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        DataStream<Tuple3<String, Long, Long>> keyedStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple3<String, Long, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                        long sum = 0L;
                        int count = 0;
                        for (Tuple2<String, Long> record : input) {
                            sum += record.f1;
                            count++;
                        }
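                        Tuple2<String, Long> temp = input.iterator().next();

                        // Emit the result as a 3-tuple: (key, average, window end time).
                        // sum / count is integer division, so the average is truncated.
                        Tuple3<String, Long, Long> result = new Tuple3<>(temp.f0, sum / count, window.getEnd());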

                        out.collect(result);
                    }
                });
        
        keyedStream.print("output");
        env.execute("Flink-Kafka demo");
    }

}

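The job reads data from Kafka, transforms each message, groups the transformed records by key, and then applies a time window. Within each window it computes the average and emits that average together with the window end time.

The window size is 10 seconds.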
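To make the windowing arithmetic concrete, here is a minimal plain-Java sketch (no Flink involved) that reproduces what the pipeline should compute for the sample messages in section 3. It assumes a single key (machine-1) and epoch-aligned tumbling windows with no offset; the class and method names are hypothetical and exist only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java illustration of tumbling-window averaging; this is not Flink code. */
final class WindowAverageSketch {

    static final long WINDOW_SIZE_MS = 10_000L;

    /** The sample messages from section 3 as {timestamp, value} pairs. */
    static final long[][] SAMPLE = {
        {1643685175905L, 5436289024L}, {1643685176920L, 5422505984L},
        {1643685177924L, 5431537664L}, {1643685178935L, 5425504256L},
        {1643685179940L, 5430718464L}, {1643685180947L, 5437231104L},
        {1643685181960L, 5522214912L}, {1643685182965L, 5745750016L},
        {1643685183976L, 5746868224L},
    };

    /** Start of the epoch-aligned tumbling window that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Maps each window end time to the truncated (integer) average of its values. */
    static Map<Long, Long> averages(long[][] records, long sizeMs) {
        Map<Long, long[]> acc = new LinkedHashMap<>(); // window end -> {sum, count}
        for (long[] record : records) {
            long end = windowStart(record[0], sizeMs) + sizeMs;
            long[] sumAndCount = acc.computeIfAbsent(end, k -> new long[2]);
            sumAndCount[0] += record[1];
            sumAndCount[1]++;
        }
        Map<Long, Long> result = new LinkedHashMap<>();
        for (Map.Entry<Long, long[]> e : acc.entrySet()) {
            result.put(e.getKey(), e.getValue()[0] / e.getValue()[1]); // integer division
        }
        return result;
    }

    public static void main(String[] args) {
        averages(SAMPLE, WINDOW_SIZE_MS)
            .forEach((end, avg) -> System.out.println("windowEnd=" + end + " avg=" + avg));
    }
}
```

For the nine sample messages this yields two windows: the first five records fall into the window ending at 1643685180000, and the remaining four into the window ending at 1643685190000.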

3. Simulated Kafka messages
1643685175905,machine-1,5436289024
1643685176920,machine-1,5422505984
1643685177924,machine-1,5431537664
1643685178935,machine-1,5425504256
1643685179940,machine-1,5430718464
1643685180947,machine-1,5437231104
1643685181960,machine-1,5522214912
1643685182965,machine-1,5745750016
1643685183976,machine-1,5746868224

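Each message has the form timestamp (milliseconds),machine name,value. The sample data can be typed in manually through a Kafka console producer, or generated by a separate Java Maven demo program that writes messages to Kafka.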
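As a rough illustration of the message format, the following plain-Java sketch builds lines in the same `timestamp,host,value` layout. It only produces the strings; sending them to Kafka is left out. The class and method names are hypothetical, and the fixed interval is an assumption (the sample timestamps above are spaced slightly irregularly).

```java
import java.util.ArrayList;
import java.util.List;

/** Builds test messages in the "timestamp,host,value" format used in this article. */
final class SampleMessageSketch {

    /** One line per value, with timestamps spaced intervalMs apart starting at startTs. */
    static List<String> buildMessages(long startTs, long intervalMs, String host, long[] values) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            long timestamp = startTs + i * intervalMs;
            lines.add(timestamp + "," + host + "," + values[i]);
        }
        return lines;
    }

    public static void main(String[] args) {
        long[] values = {5436289024L, 5422505984L, 5431537664L};
        // Print the lines so they can be pasted into a console producer.
        buildMessages(System.currentTimeMillis(), 1000L, "machine-1", values)
            .forEach(System.out::println);
    }
}
```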

4. Helper code (MessageWaterEmitter)
package com.demo;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {
    //@Nullable
    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        if (lastElement != null && lastElement.contains(",")) {
            String[] parts = lastElement.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        if (element != null && element.contains(",")) {
            String[] parts = element.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0L;
    }
}

This class defines how event timestamps and watermarks are derived: both are taken from the first field of each message.
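Because the watermark here is simply the timestamp of each message, a window can only fire after a message from a later window has arrived. The plain-Java sketch below simulates this under the assumption that an event-time window fires once the watermark reaches the window end minus one millisecond, which is how Flink's event-time trigger behaves as far as I understand it. The names are hypothetical and the code does not use Flink.

```java
/** Simulates when an event-time window fires if every element emits its timestamp as a watermark. */
final class WatermarkFiringSketch {

    /** Timestamps of the sample messages from section 3. */
    static final long[] SAMPLE_TIMESTAMPS = {
        1643685175905L, 1643685176920L, 1643685177924L, 1643685178935L, 1643685179940L,
        1643685180947L, 1643685181960L, 1643685182965L, 1643685183976L,
    };

    /**
     * Returns the index of the first element whose watermark closes the window
     * ending at windowEndMs, or -1 if no element in the input does.
     */
    static int firingIndex(long[] timestamps, long windowEndMs) {
        long watermark = Long.MIN_VALUE;
        for (int i = 0; i < timestamps.length; i++) {
            // Watermarks never move backwards, so keep the maximum seen so far.
            watermark = Math.max(watermark, timestamps[i]);
            if (watermark >= windowEndMs - 1) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firingIndex(SAMPLE_TIMESTAMPS, 1643685180000L));
        System.out.println(firingIndex(SAMPLE_TIMESTAMPS, 1643685190000L));
    }
}
```

With the sample data, the first window is closed by the sixth message (index 5), while the second window stays open until a message with a timestamp of at least 1643685189999 arrives. This also means the watermark leaves no slack for out-of-order data: a record that arrives after its window has fired is treated as late.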

5. Helper code (MessageSplitter)
package com.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class MessageSplitter implements FlatMapFunction<String, Tuple2<String, Long>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        if (value != null && value.contains(",")) {
            String[] parts = value.split(",");
            out.collect(new Tuple2<>(parts[1], Long.parseLong(parts[2])));
        }
    }
}
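MessageSplitter only checks that the line contains a comma, so a malformed line such as `a,b` or a non-numeric value would throw an exception inside flatMap. A more defensive parse could look like the following plain-Java sketch; the `Reading` type and `parse` method are hypothetical names introduced for illustration, and silently skipping bad lines is one possible policy rather than the article's.

```java
import java.util.Optional;

/** Defensive parsing of "timestamp,host,value" lines; malformed input yields Optional.empty(). */
final class MessageParseSketch {

    /** Parsed host name and metric value. */
    static final class Reading {
        final String host;
        final long value;

        Reading(String host, long value) {
            this.host = host;
            this.value = value;
        }
    }

    static Optional<Reading> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.split(",");
        if (parts.length != 3) {
            return Optional.empty(); // wrong number of fields
        }
        try {
            Long.parseLong(parts[0].trim()); // validate the timestamp field
            return Optional.of(new Reading(parts[1].trim(), Long.parseLong(parts[2].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty(); // non-numeric timestamp or value
        }
    }

    public static void main(String[] args) {
        parse("1643685175905,machine-1,5436289024")
            .ifPresent(r -> System.out.println(r.host + " -> " + r.value));
        System.out.println(parse("a,b").isPresent());
    }
}
```

Inside a FlatMapFunction, the same idea would amount to calling out.collect only when the parse succeeds.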
6. Run the program and check the output

As the output shows, one set of window averages is emitted for every 10-second window.

Original article (内存溢出): http://outofmemory.cn/zaji/5718885.html
