Flink之keyBy开窗实战

Flink之keyBy开窗实战,第1张

Flink之keyBy开窗实战 前言

最近打算用fink 处理历史数据,既是flink 读取clickhouse数据,做数据回放,数据力度向上汇聚。

1.自定义clickhouse数据源

自定义数据源

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;


public class SourceFormClickhouse extends RichSourceFunction {

    
    PreparedStatement ps = null;
    
    ResultSet result = null;
    
    Connection conn = null;
    
    private boolean flag = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        Connection conn = JdbcUtil.getClickhouseConnection();
        StringBuilder sql = new StringBuilder();
        sql.append("SELECtn" +
                "tdate_time,n" +
                "ths_security_id,n" +
                "tsecurity_id,n" +
                "tpre_close_px,n" +
                "topen_px,n" +
                "thigh_px,n" +
                "tlow_px,n" +
                "tlast_px,n" +
                "tnum_trades,n" +
                "tvolume,n" +
                "tamount,n" +
                "tphase_code,n" +
                "tbid_price,n" +
                "tbid_qty,n" +
                "toffer_price,n" +
                "toffer_qty n" +
                "FROMn" +
                "xxx");
        ps = conn.prepareStatement(sql.toString());
        super.open(parameters);
    }

    @Override
    public void run(SourceContext ctx) throws Exception {
        while (flag) {
            result = ps.executeQuery();
            while (result.next()) {
                Row row = new Row(16);
                row.setField(0, result.getString("date_time"));
                row.setField(1, result.getString("hs_security_id"));
                row.setField(2, result.getString("security_id"));
                row.setField(3, result.getLong("pre_close_px"));
                row.setField(4, result.getLong("open_px"));
                row.setField(5, result.getLong("high_px"));
                row.setField(6, result.getLong("low_px"));
                row.setField(7, result.getLong("last_px"));
                row.setField(8, result.getLong("num_trades"));
                row.setField(9, result.getLong("volume"));
                row.setField(10, result.getLong("amount"));
                row.setField(11, result.getLong("phase_code"));
                // 数组用字符串接收
                row.setField(12, result.getString("bid_price"));
                row.setField(13, result.getString("bid_qty"));
                row.setField(14, result.getString("offer_price"));
                row.setField(15, result.getString("offer_qty"));
                ctx.collect(row);
            }
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        JdbcUtil.close(conn, ps, result);
    }
}


2. 引入数据源开始处理数据

这里注意 keyBy 数据的使用,keyBy 一个字段 只需在里面遍历对象获取即可,如果是想keyBy后多个字段进行分组,keyby(“field1,field2,field3”)这种方式之前可用,但是我使用的是flink 版本是1.13.1 这个方法已经废弃。
现在新的处理方式 可用用元组来定义你需要排序的字段。

 dataStreamSource.keyBy(t-> Tuple2.of(t.getHsSecurityId(),t.getSecurityId()))

public class OfflineDataAggregation implements JobRunner, Serializable {
    @Override
    public void run(String[] args) throws Throwable {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource dataStreamSource = env.addSource(new SourceFormClickhouse2());
        dataStreamSource.keyBy(t-> Tuple2.of(t.getHsSecurityId(),t.getSecurityId()))
                .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                .process(new ProcessWindowFunction, TimeWindow>() {
                    @Override
                    public void process(Tuple2 stringStringTuple2, Context context, Iterable elements, Collector out) throws Exception {

                    }
                });
        dataStreamSource.print();
        env.execute("快照数据读取");
    }
}

					
										


					

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/3975702.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-10-21
下一篇 2022-10-21

发表评论

登录后才能评论

评论列表(0条)