Previous post >>> Big Data Project: Flink Real-Time Data Warehouse (Data Collection / ODS Layer)
In the previous article we set up real-time data collection and the ODS layer; this article builds the DWD layer.
DWD layer approach: read the user behavior data and the business data from the Kafka ODS layer, apply some light processing, and write the results to the Kafka DWD layer.
First, look at the user log data in the ODS layer. It contains page logs, start (app launch) logs, and display (exposure) logs. The three types have different structures, so the stream has to be split and each part written back to Kafka as the log DWD layer:
page logs go to the main stream, start logs to a start side output, and display logs to a display side output.
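For reference, this is roughly the shape of the three log types, as implied by the fields the splitting code below reads; the concrete values, and anything beyond the common, start, page and displays fields, are illustrative assumptions rather than samples from the source:

Start log: {"common":{"mid":"mid_001","is_new":"1"}, "start":{...}}
Page log: {"common":{"mid":"mid_001","is_new":"0"}, "page":{"page_id":"home"}, "displays":[{...}, {...}]}

Display records are nested inside page logs and are written out individually with the parent's page_id attached.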
First, wrap the Kafka consumer in a helper method (in MyKafkaUtil):

public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
    // Add the consumer group to the shared configuration object
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    // Build the KafkaSource
    return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
}
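The helper relies on a class-level properties object that is defined elsewhere in MyKafkaUtil and is not shown in the post. A minimal sketch of what it presumably contains (the broker list is an assumption based on the master/slave/slave1 hosts used elsewhere in this project):

// Assumed class-level fields of MyKafkaUtil (not shown in the original post)
private static final String KAFKA_SERVER = "master:9092,slave:9092,slave1:9092"; // assumed broker list
private static final Properties properties = new Properties();

static {
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER);
}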
Flink then calls the consumer to read the data and process it:
Data flow: web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd)
Programs: mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> baseLogApp -> Kafka
//TODO 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

//TODO 2. Consume the ods_base_log topic and create a stream
String sourceTopic = "ods_base_log";
String groupId = "base_log_app_210325";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

//TODO 3. Convert each line to a JSON object (filter out dirty data)
OutputTag<String> outputTag = new OutputTag<String>("Dirty") {};
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
    @Override
    public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        try {
            // Parse the value as JSON
            JSONObject jsonObject = JSON.parseObject(value);
            out.collect(jsonObject);
        } catch (Exception e) {
            // On a parse failure, route the record to the dirty-data side output
            ctx.output(outputTag, value);
        }
    }
});

//TODO 4. New/returning-user check with keyed state
SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = jsonObjDS
    .keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"))
    .map(new RichMapFunction<JSONObject, JSONObject>() {
        private ValueState<String> valueState;

        @Override
        public void open(Configuration parameters) throws Exception {
            valueState = getRuntimeContext().getState(new ValueStateDescriptor<>("value-state", String.class));
        }

        @Override
        public JSONObject map(JSONObject value) throws Exception {
            // Read the "is_new" flag from the record
            String isNew = value.getJSONObject("common").getString("is_new");
            // Only records marked as new need to be checked against state
            if ("1".equals(isNew)) {
                // Read the state for this mid
                String state = valueState.value();
                if (state != null) {
                    // The mid has been seen before, so correct the flag
                    value.getJSONObject("common").put("is_new", "0");
                } else {
                    valueState.update("1");
                }
            }
            return value;
        }
    });

//TODO 5. Split the stream: page -> main stream, start -> side output, display -> side output
OutputTag<String> startTag = new OutputTag<String>("start") {};
OutputTag<String> displayTag = new OutputTag<String>("display") {};
SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
    @Override
    public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
        // Check for the start-log field
        String start = value.getString("start");
        if (start != null && start.length() > 0) {
            // Start log -> start side output
            ctx.output(startTag, value.toJSONString());
        } else {
            // Page log -> main stream
            out.collect(value.toJSONString());
            // Extract the display (exposure) records nested in the page log
            JSONArray displays = value.getJSONArray("displays");
            if (displays != null && displays.size() > 0) {
                // Get the page id
                String pageId = value.getJSONObject("page").getString("page_id");
                for (int i = 0; i < displays.size(); i++) {
                    JSONObject display = displays.getJSONObject(i);
                    // Attach the page id to each display record
                    display.put("page_id", pageId);
                    // Write the record to the display side output
                    ctx.output(displayTag, display.toJSONString());
                }
            }
        }
    }
});

//TODO 6. Extract the side outputs
DataStream<String> startDS = pageDS.getSideOutput(startTag);
DataStream<String> displayDS = pageDS.getSideOutput(displayTag);

//TODO 7. Print the three streams and write them to the corresponding Kafka topics
startDS.print("Start>>>>>>>>>>>");
pageDS.print("Page>>>>>>>>>>>");
displayDS.print("Display>>>>>>>>>>>>");
startDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_start_log"));
pageDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_page_log"));
displayDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_display_log"));

//TODO 8. Start the job
env.execute("baseLogApp");
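TODO 7 above writes the three streams out through MyKafkaUtil.getKafkaProducer, which the post does not show. A minimal sketch of such a topic-based producer helper, reusing the assumed properties object sketched earlier (an assumption, not necessarily the author's exact code); BaseDBApp later calls a variant that takes a KafkaSerializationSchema, which corresponds to the getKafkaSinkBySchema helper shown further down:

// Assumed helper in MyKafkaUtil: a plain string producer bound to one topic
public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
    return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), properties);
}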
Business Data (DWD)
Analyzing the business data:
Changes in the business tables are captured with FlinkCDC, but FlinkCDC writes everything into a single topic, and that data mixes fact data with dimension data. This layer therefore reads from the business-data ODS topic in Kafka, processes the records, saves the dimension data to HBase, and writes the fact data back to Kafka as the business-data DWD layer.
Wrap a method that sinks data to Kafka using a serialization schema:

public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
    // Keep the producer transaction timeout (5 minutes) within the broker's limit for exactly-once writes
    properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5 * 60 * 1000 + "");
    return new FlinkKafkaProducer<>(DEFAULT_TOPIC, kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}

Consuming the Kafka data and processing it into the business-data DWD layer:
//TODO 1. Get the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

//TODO 2. Consume the ods_base_db topic and create a stream
String sourceTopic = "ods_base_db";
String groupId = "base_db_app_210325";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

//TODO 3. Convert each line to a JSON object and filter out deletes (main stream)
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
    .filter(new FilterFunction<JSONObject>() {
        @Override
        public boolean filter(JSONObject value) throws Exception {
            // Read the operation type and drop delete records
            String type = value.getString("type");
            return !"delete".equals(type);
        }
    });

//TODO 4. Use FlinkCDC to consume the configuration table and turn it into a broadcast stream
DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
    .hostname("master")
    .port(3306)
    .username("root")
    .password("000000")
    .databaseList("mall2021_realtime")
    .tableList("mall2021_realtime.table_process")
    .startupOptions(StartupOptions.initial())
    .deserializer(new CustomerDeserialization())
    .build();
DataStreamSource<String> tableProcessStrDS = env.addSource(sourceFunction);
MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
BroadcastStream<String> broadcastStream = tableProcessStrDS.broadcast(mapStateDescriptor);

//TODO 5. Connect the main stream with the broadcast stream
BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);

//TODO 6. Split the stream: process the broadcast (config) data and route the main-stream data accordingly
OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};
SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new TableProcessFunction(hbaseTag, mapStateDescriptor));

//TODO 7. Extract the Kafka stream and the HBase stream
DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);

//TODO 8. Write the Kafka data to Kafka topics and the HBase data to Phoenix tables
kafka.print("Kafka>>>>>>>>");
hbase.print("Hbase>>>>>>>>");
hbase.addSink(new DimSinkFunction());
kafka.addSink(MyKafkaUtil.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(JSONObject element, @Nullable Long timestamp) {
        // Route each record to the topic named in its sinkTable field
        return new ProducerRecord<>(element.getString("sinkTable"), element.getString("after").getBytes());
    }
}));

//TODO 9. Start the job
env.execute("baseDBApp");
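The CDC source above uses CustomerDeserialization, which is not shown in the post. Below is a minimal sketch of what such a custom Debezium deserializer could look like, assuming it emits the JSON shape used throughout this article ({"database":..., "tableName":..., "before":..., "after":..., "type":...}); the field handling here is an assumption rather than the author's exact code, and the DebeziumDeserializationSchema import path depends on the flink-cdc-connectors version:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

// Assumed sketch of the custom deserializer for the Flink CDC (Debezium) source
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();

        // The record topic has the form <source-name>.<database>.<table>
        String[] parts = sourceRecord.topic().split("\\.");
        result.put("database", parts[1]);
        result.put("tableName", parts[2]);

        Struct value = (Struct) sourceRecord.value();

        // "before" row image (null for inserts)
        JSONObject beforeJson = new JSONObject();
        Struct before = value.getStruct("before");
        if (before != null) {
            for (Field field : before.schema().fields()) {
                beforeJson.put(field.name(), before.get(field));
            }
        }
        result.put("before", beforeJson);

        // "after" row image (null for deletes)
        JSONObject afterJson = new JSONObject();
        Struct after = value.getStruct("after");
        if (after != null) {
            for (Field field : after.schema().fields()) {
                afterJson.put(field.name(), after.get(field));
            }
        }
        result.put("after", afterJson);

        // Operation type; Debezium reports "create", which is normalized to "insert"
        String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }
        result.put("type", type);

        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}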
Note that an entity class is used here for the configuration rows:
@Data
public class TableProcess {
    // Dynamic-split sink-type constants
    public static final String SINK_TYPE_Hbase = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse"; // not used yet
    // Source table
    String sourceTable;
    // Operation type: insert, update, delete
    String operateType;
    // Sink type: hbase or kafka
    String sinkType;
    // Sink table (or topic)
    String sinkTable;
    // Sink columns
    String sinkColumns;
    // Primary-key column
    String sinkPk;
    // Table-creation extension clause
    String sinkExtend;
}
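To make the dynamic routing concrete, one row of the mall2021_realtime.table_process configuration table could look like this (a hypothetical example, chosen to match the base_trademark sample record used later in DimSinkFunction):

sourceTable = base_trademark, operateType = insert, sinkType = hbase, sinkTable = dim_base_trademark, sinkColumns = id,tm_name, sinkPk = id, sinkExtend = null

With this row in broadcast state, an insert on base_trademark is matched under the key "base_trademark-insert", only the id and tm_name columns are kept, and the record is routed to the HBase side output for upsert into the Phoenix table dim_base_trademark.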
Wrapping common configuration constants:

//Phoenix schema
public static final String Hbase_SCHEMA = "MALL2021_REALTIME";
//Phoenix driver
public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
//Phoenix connection URL
public static final String PHOENIX_SERVER = "jdbc:phoenix:master,slave,slave1:2181";

TableProcessFunction
public TableProcessFunction(OutputTag<JSONObject> objectOutputTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
    this.objectOutputTag = objectOutputTag;
    this.mapStateDescriptor = mapStateDescriptor;
}

TableProcessFunction-open
public void open(Configuration parameters) throws Exception {
    // Load the Phoenix driver and open the connection once when the task starts
    Class.forName(GmallConfig.PHOENIX_DRIVER);
    connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
}

TableProcessFunction-processBroadcastElement
//Record format -- value: {"db":"","tn":"","before":{},"after":{},"type":""}
public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
    //1. Get and parse the config record
    JSONObject jsonObject = JSON.parseObject(value);
    String data = jsonObject.getString("after");
    TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);

    //2. Create the Phoenix table if the sink type is HBase
    if (TableProcess.SINK_TYPE_Hbase.equals(tableProcess.getSinkType())) {
        checkTable(tableProcess.getSinkTable(),
                tableProcess.getSinkColumns(),
                tableProcess.getSinkPk(),
                tableProcess.getSinkExtend());
    }

    //3. Put the config into broadcast state, keyed by "sourceTable-operateType"
    BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
    String key = tableProcess.getSourceTable() + "-" + tableProcess.getOperateType();
    broadcastState.put(key, tableProcess);
}

TableProcessFunction-checkTable
//Target DDL shape: create table if not exists db.tn(id varchar primary key,tm_name varchar) xxx;
private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
    PreparedStatement preparedStatement = null;
    try {
        if (sinkPk == null) {
            sinkPk = "id";
        }
        if (sinkExtend == null) {
            sinkExtend = "";
        }
        StringBuffer createTableSQL = new StringBuffer("create table if not exists ")
                .append(GmallConfig.Hbase_SCHEMA)
                .append(".")
                .append(sinkTable)
                .append("(");
        String[] fields = sinkColumns.split(",");
        for (int i = 0; i < fields.length; i++) {
            String field = fields[i];
            // Mark the primary-key column
            if (sinkPk.equals(field)) {
                createTableSQL.append(field).append(" varchar primary key ");
            } else {
                createTableSQL.append(field).append(" varchar ");
            }
            // Append a comma unless this is the last column
            if (i < fields.length - 1) {
                createTableSQL.append(",");
            }
        }
        createTableSQL.append(")").append(sinkExtend);
        // Print the DDL for debugging
        System.out.println(createTableSQL);
        // Prepare the statement
        preparedStatement = connection.prepareStatement(createTableSQL.toString());
        // Execute it
        preparedStatement.execute();
    } catch (SQLException e) {
        throw new RuntimeException("Failed to create Phoenix table " + sinkTable + "!");
    } finally {
        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
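For example, with sinkTable = dim_base_trademark, sinkColumns = id,tm_name, sinkPk = id and sinkExtend = null, checkTable builds and executes:

create table if not exists MALL2021_REALTIME.dim_base_trademark(id varchar primary key ,tm_name varchar )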
TableProcessFunction-processElement
//Record format -- value: {"db":"","tn":"","before":{},"after":{},"type":""}
public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
    //1. Look up the config for this record in broadcast state
    ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
    String key = value.getString("tableName") + "-" + value.getString("type");
    TableProcess tableProcess = broadcastState.get(key);

    if (tableProcess != null) {
        //2. Keep only the configured sink columns
        JSONObject data = value.getJSONObject("after");
        filterColumn(data, tableProcess.getSinkColumns());

        //3. Split the stream: attach the sink table/topic name to the record
        value.put("sinkTable", tableProcess.getSinkTable());
        String sinkType = tableProcess.getSinkType();
        if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
            // Kafka (fact) data goes to the main stream
            out.collect(value);
        } else if (TableProcess.SINK_TYPE_Hbase.equals(sinkType)) {
            // HBase (dimension) data goes to the side output
            ctx.output(objectOutputTag, value);
        }
    } else {
        System.out.println("No config found for key: " + key + "!");
    }
}
TableProcessFunction-filterColumn
/**
 * @param data        e.g. {"id":"11","tm_name":"keven","logo_url":"aaa"}
 * @param sinkColumns e.g. id,tm_name
 *                    result after filtering: {"id":"11","tm_name":"keven"}
 */
private void filterColumn(JSONObject data, String sinkColumns) {
    String[] fields = sinkColumns.split(",");
    List<String> columns = Arrays.asList(fields);
    // Remove every key that is not in the configured column list
    data.entrySet().removeIf(next -> !columns.contains(next.getKey()));
}

Calling TableProcessFunction in baseDBApp to split the stream
//TODO 5. Connect the main stream with the broadcast stream
BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);

//TODO 6. Split the stream: process the broadcast (config) data and route the main-stream data accordingly
OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};
SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new TableProcessFunction(hbaseTag, mapStateDescriptor));

//TODO 7. Extract the Kafka stream and the HBase stream
DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);

Dimension Data (DIM)
Analyzing the dimension data:
DimSink extends RichSinkFunction, and this function runs on two timelines. One is open(), executed once when the task starts, which is where the connection initialization is done. The other is invoke(), executed repeatedly as each record arrives; it performs the actual save, essentially by assembling an upsert SQL statement from the record and submitting it to HBase through Phoenix.
public class DimSinkFunction extends RichSinkFunction<JSONObject> {

    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(GmallConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
        // Phoenix does not auto-commit by default, so enable it for single upserts
        connection.setAutoCommit(true);
    }

    //value: {"sinkTable":"dim_base_trademark","database":"mall-210325-flink","before":{"tm_name":"keven","id":12},"after":{"tm_name":"Keven","id":12},"type":"update","tableName":"base_trademark"}
    //SQL: upsert into db.tn(id,tm_name) values('...','...')
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        PreparedStatement preparedStatement = null;
        try {
            // Build the SQL statement
            String sinkTable = value.getString("sinkTable");
            JSONObject after = value.getJSONObject("after");
            String upsertSql = genUpsertSql(sinkTable, after);
            System.out.println(upsertSql);
            // Prepare and execute the upsert
            preparedStatement = connection.prepareStatement(upsertSql);
            preparedStatement.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        }
    }

    //data: {"tm_name":"kevenhe","id":12}
    //SQL: upsert into db.tn(id,tm_name,aa,bb) values('...','...','...','...')
    private String genUpsertSql(String sinkTable, JSONObject data) {
        Set<String> keySet = data.keySet();
        Collection<Object> values = data.values();
        // The original post is truncated here; the completion below is an assumption that
        // simply joins the column names and values (commons-lang3 StringUtils) into a Phoenix upsert.
        return "upsert into " + GmallConfig.Hbase_SCHEMA + "." + sinkTable + "("
                + StringUtils.join(keySet, ",") + ") values('"
                + StringUtils.join(values, "','") + "')";
    }
}

Split-stream sink: saving the business data to Kafka
Wrap the sink method (this is the same getKafkaSinkBySchema helper shown earlier):

public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
    properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5 * 60 * 1000 + "");
    return new FlinkKafkaProducer<>(DEFAULT_TOPIC, kafkaSerializationSchema, properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
Summary (DWD)
The core of the DWD layer's real-time computation is stream splitting, plus state-based identification (the new-user check). The operators used include RichMapFunction, ProcessFunction, and RichSinkFunction. On choosing among them:
Rich functions are powerful, since they add the open()/close() lifecycle and access to the runtime context and state, and ProcessFunction is more powerful still, additionally exposing side outputs and timers; but the more capable the function, the more verbose it is to use.