Recently I've been planning to use Flink to process historical data: have Flink read the data back out of ClickHouse, replay it, and roll the data granularity up through aggregation.
1. Build a custom ClickHouse source
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.types.Row;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class SourceFormClickhouse extends RichSourceFunction<Row> {

    PreparedStatement ps = null;
    ResultSet result = null;
    Connection conn = null;
    private volatile boolean flag = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Assign to the field (not a local variable), otherwise close() sees null
        conn = JdbcUtil.getClickhouseConnection();
        StringBuilder sql = new StringBuilder();
        sql.append("SELECT\n" +
                "\tdate_time,\n" +
                "\ths_security_id,\n" +
                "\tsecurity_id,\n" +
                "\tpre_close_px,\n" +
                "\topen_px,\n" +
                "\thigh_px,\n" +
                "\tlow_px,\n" +
                "\tlast_px,\n" +
                "\tnum_trades,\n" +
                "\tvolume,\n" +
                "\tamount,\n" +
                "\tphase_code,\n" +
                "\tbid_price,\n" +
                "\tbid_qty,\n" +
                "\toffer_price,\n" +
                "\toffer_qty\n" +
                "FROM\n" +
                "xxx");
        ps = conn.prepareStatement(sql.toString());
        super.open(parameters);
    }

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        // Execute the query once: this is a one-shot replay of historical data.
        // (Re-running it inside a while(flag) loop would replay duplicates endlessly.)
        result = ps.executeQuery();
        while (flag && result.next()) {
            Row row = new Row(16);
            row.setField(0, result.getString("date_time"));
            row.setField(1, result.getString("hs_security_id"));
            row.setField(2, result.getString("security_id"));
            row.setField(3, result.getLong("pre_close_px"));
            row.setField(4, result.getLong("open_px"));
            row.setField(5, result.getLong("high_px"));
            row.setField(6, result.getLong("low_px"));
            row.setField(7, result.getLong("last_px"));
            row.setField(8, result.getLong("num_trades"));
            row.setField(9, result.getLong("volume"));
            row.setField(10, result.getLong("amount"));
            row.setField(11, result.getLong("phase_code"));
            // Array columns are read as strings
            row.setField(12, result.getString("bid_price"));
            row.setField(13, result.getString("bid_qty"));
            row.setField(14, result.getString("offer_price"));
            row.setField(15, result.getString("offer_qty"));
            ctx.collect(row);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

    @Override
    public void close() throws Exception {
        JdbcUtil.close(conn, ps, result);
    }
}
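The source above leans on a JdbcUtil helper that isn't shown here. A minimal sketch of what it could look like, assuming the classic ClickHouse JDBC driver (ru.yandex.clickhouse.ClickHouseDriver); the URL and credentials are placeholders, not values from the original job:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class JdbcUtil {

    // Placeholder connection settings: adjust host/port/database/credentials
    private static final String URL = "jdbc:clickhouse://127.0.0.1:8123/default";
    private static final String USER = "default";
    private static final String PASSWORD = "";

    public static Connection getClickhouseConnection() throws Exception {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        return DriverManager.getConnection(URL, USER, PASSWORD);
    }

    // Close in reverse order of acquisition, ignoring failures on shutdown
    public static void close(Connection conn, PreparedStatement ps, ResultSet rs) {
        try { if (rs != null) rs.close(); } catch (Exception ignored) { }
        try { if (ps != null) ps.close(); } catch (Exception ignored) { }
        try { if (conn != null) conn.close(); } catch (Exception ignored) { }
    }
}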
2. Wire the source into a job and process the data

Note how keyBy is used here. To key by a single field, you just read that field off the object in a key selector. To group by several fields, keyBy("field1", "field2", "field3") used to work, but I'm on Flink 1.13.1 and that method is deprecated.
The new approach is to use a tuple to hold the fields you want to group by:
dataStreamSource.keyBy(t -> Tuple2.of(t.getHsSecurityId(), t.getSecurityId()))
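One caveat: in Java, a lambda that returns Tuple2 loses its generic parameters to type erasure, and Flink can reject the job with an InvalidTypesException. If that happens with the keyBy in the job below, pass the key type explicitly; a minimal sketch, assuming MarketSnapshot is the POJO type on the stream (a hypothetical name, see the job below):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;

// Same grouping as above, but with the key TypeInformation supplied so
// type erasure on the lambda cannot break keyBy
dataStreamSource.keyBy(
        t -> Tuple2.of(t.getHsSecurityId(), t.getSecurityId()),
        Types.TUPLE(Types.STRING, Types.STRING));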
import java.io.Serializable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class OfflineDataAggregation implements JobRunner, Serializable {

    @Override
    public void run(String[] args) throws Throwable {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // MarketSnapshot stands in for the POJO emitted by SourceFormClickhouse2;
        // the concrete type name was lost when this snippet was published
        DataStreamSource<MarketSnapshot> dataStreamSource =
                env.addSource(new SourceFormClickhouse2());

        dataStreamSource
                .keyBy(t -> Tuple2.of(t.getHsSecurityId(), t.getSecurityId()))
                .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                .process(new ProcessWindowFunction<MarketSnapshot, Row, Tuple2<String, String>, TimeWindow>() {
                    @Override
                    public void process(Tuple2<String, String> key,
                                        Context context,
                                        Iterable<MarketSnapshot> elements,
                                        Collector<Row> out) {
                        // The original snippet is cut off here: aggregate the elements
                        // of this 15-second window per key and emit the rolled-up Row(s)
                    }
                });

        env.execute("offline data aggregation");
    }
}
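One thing the job above glosses over: TumblingEventTimeWindows only fire when timestamps and watermarks are assigned upstream; otherwise the windows buffer forever and emit nothing. A minimal sketch of the missing piece, assuming the POJO exposes its date_time as epoch milliseconds via a hypothetical getTs() getter:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// Assign event timestamps plus a 5s out-of-orderness bound before keyBy/window
DataStream<MarketSnapshot> withTimestamps = dataStreamSource.assignTimestampsAndWatermarks(
        WatermarkStrategy.<MarketSnapshot>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner((snap, recordTs) -> snap.getTs()));

Since this is a bounded replay, Flink also emits a final watermark when the source's run() returns, which flushes whatever windows are still open at the end of the data.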