Big Data Project: Flink Real-Time Data Warehouse (DWD/DIM Layer)


Previous post >>> Big Data Project: Flink Real-Time Data Warehouse (Data Collection / ODS Layer)
Continuing from there:
The previous article covered data collection and the ODS layer of the real-time warehouse; this one builds the DWD layer.
DWD layer approach: read the user behavior data and business data from the Kafka ODS layer, apply light processing, and write the results back to the Kafka DWD layer.

User Log Data (DWD): Analyzing the User Logs

First, look at the user log data in the ODS layer. It contains three record types with different structures: page logs, start-up logs, and display (exposure) logs. These need to be split apart and written back to Kafka as the log DWD layer.
Page logs go to the main stream, start-up logs to a start-up side output, and display logs to a display side output.

The Kafka utility class MyKafkaUtil provides a method for obtaining a Kafka consumer:
    // properties is a static field of MyKafkaUtil, assumed to be initialized
    // elsewhere in the class with the Kafka bootstrap servers
    public static FlinkKafkaConsumer<String> getKafkaConsumer(String topic, String groupId) {
        // add the consumer group to the shared configuration object
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // build the Kafka source, deserializing each record as a plain string
        return new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
    }
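BaseLogApp below also calls MyKafkaUtil.getKafkaProducer(topic), which this post never shows; a minimal sketch, assuming the same shared properties field, could look like:
    // Hypothetical sketch, not the post's exact code: a producer wrapper for
    // a fixed topic, reusing the shared properties object
    public static FlinkKafkaProducer<String> getKafkaProducer(String topic) {
        return new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), properties);
    }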
Flink then uses this consumer to read and process the data:

Data flow: web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd)
Programs:  mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> baseLogApp -> Kafka

        //TODO 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO 2. Consume the ods_base_log topic and create a stream
        String sourceTopic = "ods_base_log";
        String groupId = "base_log_app_210325";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

        //TODO 3. Convert each line to a JSON object (filtering out dirty data)
        OutputTag<String> outputTag = new OutputTag<String>("Dirty") {};
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.process(new ProcessFunction<String, JSONObject>() {
            @Override
            public void processElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
                try {
                    // parse the line as JSON
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    // on failure, route the raw line to the dirty-data side output
                    ctx.output(outputTag, value);
                }
            }
        });
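        // e.g. a truncated line such as {"common":{"mid" fails JSON.parseObject
        // and lands in the "Dirty" side output instead of failing the job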
        //TODO 4. New-vs-returning user check (keyed state)
        SingleOutputStreamOperator<JSONObject> jsonObjWithNewFlagDS = jsonObjDS.keyBy(jsonObj -> jsonObj.getJSONObject("common").getString("mid"))
                .map(new RichMapFunction<JSONObject, JSONObject>() {
                    private ValueState<String> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        valueState = getRuntimeContext().getState(new ValueStateDescriptor<>("value-state", String.class));
                    }

                    @Override
                    public JSONObject map(JSONObject value) throws Exception {
                        // read the "is_new" flag from the record
                        String isNew = value.getJSONObject("common").getString("is_new");
                        // only records that claim to be new need checking
                        if ("1".equals(isNew)) {
                            // fetch the state for this mid
                            String state = valueState.value();
                            if (state != null) {
                                // device was seen before: correct the flag
                                value.getJSONObject("common").put("is_new", "0");
                            } else {
                                // first visit: remember the device
                                valueState.update("1");
                            }
                        }
                        return value;
                    }
                });
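        // Example: the first event from device mid_001 arrives with is_new=1;
        // its state is empty, so the flag is kept and the state is seeded. Any
        // later mid_001 event still claiming is_new=1 is rewritten to is_new=0.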
        //TODO 5. Split the stream: pages to the main stream, start-up and
        //        display logs to side outputs
        OutputTag<String> startTag = new OutputTag<String>("start") {};
        OutputTag<String> displayTag = new OutputTag<String>("display") {};
        SingleOutputStreamOperator<String> pageDS = jsonObjWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {
            @Override
            public void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {
                // check for the start-up log field
                String start = value.getString("start");
                if (start != null && start.length() > 0) {
                    // start-up log: write to the start side output
                    ctx.output(startTag, value.toJSONString());
                } else {
                    // page log: write to the main stream
                    out.collect(value.toJSONString());
                    // extract any display (exposure) records carried by the page
                    JSONArray displays = value.getJSONArray("displays");
                    if (displays != null && displays.size() > 0) {
                        // the page id the displays belong to
                        String pageId = value.getJSONObject("page").getString("page_id");
                        for (int i = 0; i < displays.size(); i++) {
                            JSONObject display = displays.getJSONObject(i);
                            // attach the page id to each display record
                            display.put("page_id", pageId);
                            // write to the display side output
                            ctx.output(displayTag, display.toJSONString());
                        }
                    }
                }
            }
        });
       
        //TODO 6. Extract the side outputs
        DataStream<String> startDS = pageDS.getSideOutput(startTag);
        DataStream<String> displayDS = pageDS.getSideOutput(displayTag);

        //TODO 7. Print the three streams and write each to its Kafka topic
        startDS.print("Start>>>>>>>>>>>");
        pageDS.print("Page>>>>>>>>>>>");
        displayDS.print("Display>>>>>>>>>>>>");

        startDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_start_log"));
        pageDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_page_log"));
        displayDS.addSink(MyKafkaUtil.getKafkaProducer("dwd_display_log"));

        //TODO 8. Start the job
        env.execute("baseLogApp");
Business Data (DWD): Analyzing the Business Data

Changes in the business data are captured with Flink CDC, but Flink CDC writes everything into a single topic, and that data mixes fact data with dimension data. The DWD job reads from the Kafka business-data ODS layer, and after processing saves the dimension data to HBase (via Phoenix) and writes the fact data to Kafka as the business-data DWD layer.
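Which table routes where is driven by a MySQL configuration table, table_process, consumed as a broadcast stream below; its columns mirror the TableProcess entity shown further down. As an illustration (values assumed, not from the post), one row might be: source_table = base_trademark, operate_type = insert, sink_type = hbase, sink_table = dim_base_trademark, sink_columns = id,tm_name, sink_pk = id.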

The utility class also wraps a Kafka sink that takes a serialization schema:
    // properties is the same shared static field; DEFAULT_TOPIC is a fallback
    // topic assumed to be defined elsewhere in the class
    public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
        // keep the transaction timeout under the broker's default
        // transaction.max.timeout.ms (15 minutes)
        properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5 * 60 * 1000 + "");
        return new FlinkKafkaProducer<>(DEFAULT_TOPIC,
                kafkaSerializationSchema,
                properties,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
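This schema-based variant exists because the plain producer pins one topic per sink; a KafkaSerializationSchema builds the ProducerRecord itself, so each record can pick its own target topic. Step 8 of BaseDBApp below uses exactly this to route records by their sinkTable field. Note also that Semantic.EXACTLY_ONCE writes inside Kafka transactions, which is why the transaction timeout is lowered above: Flink's default of 1 hour exceeds the broker's default limit.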
BaseDBApp consumes the Kafka data and processes it into the business-data DWD layer:
        //TODO 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO 2. Consume the ods_base_db topic and create a stream
        String sourceTopic = "ods_base_db";
        String groupId = "base_db_app_210325";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

        //TODO 3. Convert each line to a JSON object and drop deletes (main stream)
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
                .filter(new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        // read the operation type of the change record
                        String type = value.getString("type");
                        return !"delete".equals(type);
                    }
                });

        //TODO 4. Use Flink CDC to consume the configuration table as a broadcast stream
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("master")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("mall2021_realtime")
                .tableList("mall2021_realtime.table_process")
                .startupOptions(StartupOptions.initial())
                .deserializer(new CustomerDeserialization())
                .build();
        DataStreamSource<String> tableProcessStrDS = env.addSource(sourceFunction);
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        BroadcastStream<String> broadcastStream = tableProcessStrDS.broadcast(mapStateDescriptor);

        //TODO 5. Connect the main stream and the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);

        //TODO 6. Split: broadcast-stream records update the config, main-stream
        //        records are routed according to it
        OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};
        SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new TableProcessFunction(hbaseTag, mapStateDescriptor));

        //TODO 7. Extract the Kafka-bound stream and the HBase-bound stream
        DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);

        //TODO 8. Write the Kafka data to Kafka topics and the HBase data to Phoenix tables
        kafka.print("Kafka>>>>>>>>");
        hbase.print("Hbase>>>>>>>>");

        hbase.addSink(new DimSinkFunction());
        kafka.addSink(MyKafkaUtil.getKafkaSinkBySchema(new KafkaSerializationSchema<JSONObject>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(JSONObject element, @Nullable Long timestamp) {
                // route each record to the topic named by its sinkTable field
                return new ProducerRecord<>(element.getString("sinkTable"),
                        element.getString("after").getBytes());
            }
        }));

        //TODO 9. Start the job
        env.execute("baseDBApp");

Note the entity class used here:

@Data
public class TableProcess {
    // dynamic-routing sink-type constants
    public static final String SINK_TYPE_HBASE = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse"; // not used yet
    // source table
    String sourceTable;
    // operation type: insert, update, delete
    String operateType;
    // sink type: hbase or kafka
    String sinkType;
    // sink table (or topic)
    String sinkTable;
    // columns to keep
    String sinkColumns;
    // primary-key column
    String sinkPk;
    // extra clauses appended to the CREATE TABLE statement
    String sinkExtend;
}
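@Data is Lombok; it generates the getters and setters (getSinkType(), getSourceTable(), and so on) that TableProcessFunction relies on below.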
Common configuration constants (GmallConfig):
    // Phoenix schema (database) name
    public static final String HBASE_SCHEMA = "MALL2021_REALTIME";
    // Phoenix JDBC driver
    public static final String PHOENIX_DRIVER = "org.apache.phoenix.jdbc.PhoenixDriver";
    // Phoenix connection URL (ZooKeeper quorum)
    public static final String PHOENIX_SERVER = "jdbc:phoenix:master,slave,slave1:2181";
TableProcessFunction
    private OutputTag<JSONObject> objectOutputTag;
    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
    private Connection connection;

    public TableProcessFunction(OutputTag<JSONObject> objectOutputTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.objectOutputTag = objectOutputTag;
        this.mapStateDescriptor = mapStateDescriptor;
    }
TableProcessFunction-open
    @Override
    public void open(Configuration parameters) throws Exception {
        // register the Phoenix JDBC driver and open a connection
        Class.forName(GmallConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
    }
TableProcessFunction-processBroadcastElement

//value: {"database":"","tableName":"","before":{},"after":{},"type":""}  (change-record format)

    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        // 1. Parse the change record and extract the config row
        JSONObject jsonObject = JSON.parseObject(value);
        String data = jsonObject.getString("after");
        TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);
        // 2. For HBase sinks, make sure the Phoenix table exists
        if (TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())) {
            checkTable(tableProcess.getSinkTable(),
                    tableProcess.getSinkColumns(),
                    tableProcess.getSinkPk(),
                    tableProcess.getSinkExtend());
        }
        // 3. Put the config into broadcast state, keyed by "sourceTable-operateType"
        BroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = tableProcess.getSourceTable() + "-" + tableProcess.getOperateType();
        broadcastState.put(key, tableProcess);
    }
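For the sample configuration row given earlier, the key written to broadcast state would be "base_trademark-insert", which matches the key that processElement rebuilds from each change record.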
TableProcessFunction-checkTable

create table if not exists db.tn(id varchar primary key, tm_name varchar) xxx; // DDL template

    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
        PreparedStatement preparedStatement = null;
        try {
            // default primary key and extension clause
            if (sinkPk == null) {
                sinkPk = "id";
            }
            if (sinkExtend == null) {
                sinkExtend = "";
            }
            StringBuffer createTableSQL = new StringBuffer("create table if not exists ")
                    .append(GmallConfig.HBASE_SCHEMA)
                    .append(".")
                    .append(sinkTable)
                    .append("(");
            String[] fields = sinkColumns.split(",");
            for (int i = 0; i < fields.length; i++) {
                String field = fields[i];
                // the primary-key column gets the PRIMARY KEY clause
                if (sinkPk.equals(field)) {
                    createTableSQL.append(field).append(" varchar primary key ");
                } else {
                    createTableSQL.append(field).append(" varchar ");
                }
                // append "," after every field except the last
                if (i < fields.length - 1) {
                    createTableSQL.append(",");
                }
            }
            createTableSQL.append(")").append(sinkExtend);
            // print the generated DDL
            System.out.println(createTableSQL);
            // prepare and execute it against Phoenix
            preparedStatement = connection.prepareStatement(createTableSQL.toString());
            preparedStatement.execute();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to create Phoenix table " + sinkTable + "!");
        } finally {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
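For the sample row given earlier, the printed DDL would be roughly: create table if not exists MALL2021_REALTIME.dim_base_trademark(id varchar primary key,tm_name varchar).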
TableProcessFunction-processElement

value: {"database":"","tableName":"","before":{},"after":{},"type":""}

    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        // 1. Look up the config for this table/operation combination
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = value.getString("tableName") + "-" + value.getString("type");
        TableProcess tableProcess = broadcastState.get(key);
        if (tableProcess != null) {
            // 2. Keep only the configured columns
            JSONObject data = value.getJSONObject("after");
            filterColumn(data, tableProcess.getSinkColumns());
            // 3. Route the record: attach the sink table/topic name first
            value.put("sinkTable", tableProcess.getSinkTable());
            String sinkType = tableProcess.getSinkType();
            if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
                // fact data goes to the main (Kafka) stream
                out.collect(value);
            } else if (TableProcess.SINK_TYPE_HBASE.equals(sinkType)) {
                // dimension data goes to the HBase side output
                ctx.output(objectOutputTag, value);
            }
        } else {
            System.out.println("No config found for key: " + key);
        }
    }
TableProcessFunction-filterColumn

// @param data        e.g. {"id":"11","tm_name":"keven","logo_url":"aaa"}
// @param sinkColumns e.g. id,tm_name
// result: data becomes {"id":"11","tm_name":"keven"}

    private void filterColumn(JSONObject data, String sinkColumns) {
        String[] fields = sinkColumns.split(",");
        List<String> columns = Arrays.asList(fields);
        // remove every entry whose key is not in the configured column list
        data.entrySet().removeIf(next -> !columns.contains(next.getKey()));
    }
BaseDBApp calls TableProcessFunction to split the stream, exactly as shown in steps 5 through 7 above.
Dimension Data (DIM): Analyzing the Dimension Data



DimSinkFunction extends RichSinkFunction, and this function runs on two timelines.
One is the open operation, executed once at task start-up; that is where one-off connection initialization belongs.
The other is invoke(), executed repeatedly as each record arrives; it saves the data by assembling an upsert SQL statement from the record and submitting it to HBase via Phoenix.

Writing DimSinkFunction:
public class DimSinkFunction extends RichSinkFunction<JSONObject> {
    private Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        // register the Phoenix driver and open an auto-commit connection
        Class.forName(GmallConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
        connection.setAutoCommit(true);
    }

    //value: {"sinkTable":"dim_base_trademark","database":"mall-210325-flink","before":{"tm_name":"keven","id":12},"after":{"tm_name":"Keven","id":12},"type":"update","tableName":"base_trademark"}
    //SQL:   upsert into db.tn(id,tm_name) values('...','...')
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        PreparedStatement preparedStatement = null;
        try {
            // build the upsert statement from the record
            String sinkTable = value.getString("sinkTable");
            JSONObject after = value.getJSONObject("after");
            String upsertSql = genUpsertSql(sinkTable, after);
            System.out.println(upsertSql);
            // prepare and execute it
            preparedStatement = connection.prepareStatement(upsertSql);
            preparedStatement.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (preparedStatement != null) {
                preparedStatement.close();
            }
        }
    }

    //data: {"tm_name":"kevenhe","id":12}
    //SQL:  upsert into db.tn(id,tm_name,aa,bb) values('...','...','...','...')
    private String genUpsertSql(String sinkTable, JSONObject data) {
        Set<String> keySet = data.keySet();
        Collection<Object> values = data.values();
        // StringUtils.join(keySet, ",")  =>  "id,tm_name"
        return "upsert into " + GmallConfig.HBASE_SCHEMA + "." + sinkTable + "(" +
                StringUtils.join(keySet, ",") + ") values('" +
                StringUtils.join(values, "','") + "')";
    }
}
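Since keySet() and values() iterate the same underlying map, the column list and the value list stay aligned. Every value is quoted as a string, which is consistent with checkTable creating every column as varchar.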
 
Sinking the Split Business Data to Kafka

The sink method is the getKafkaSinkBySchema wrapper already shown above; step 8 of BaseDBApp passes it a KafkaSerializationSchema that routes each record to the topic named by its sinkTable field.
Summary (DWD)

The core of the real-time DWD computation is stream splitting, plus state-based identification. The operators used along the way include RichMapFunction, ProcessFunction, and RichSinkFunction. For choosing among them, the comparison below helps:

| Function | Transforms records | Filters records | Side outputs | open() method | Can use state | Outputs to |
| --- | --- | --- | --- | --- | --- | --- |
| MapFunction | yes | no | no | no | no | downstream operator |
| FilterFunction | no | yes | no | no | no | downstream operator |
| RichMapFunction | yes | no | no | yes | yes | downstream operator |
| RichFilterFunction | no | yes | no | yes | yes | downstream operator |
| ProcessFunction | yes | yes | yes | yes | yes | downstream operator |
| SinkFunction | yes | yes | no | no | no | external system |
| RichSinkFunction | yes | yes | no | yes | yes | external system |

As the table shows, the Rich functions are powerful and ProcessFunction is the most capable of all, but the more complete a function's feature set, the more boilerplate it takes to use.
