package cn.itcast.streaming.task;
import cn.itcast.entity.ItcastDataObj;
import cn.itcast.streaming.sink.SrcDataToHbaseSink;
import cn.itcast.streaming.sink.SrcDataToHbaseSinkOptimizer;
import cn.itcast.streaming.sink.VehicleDetailSinkOptimizer;
import cn.itcast.utils.JsonParseUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.concurrent.TimeUnit;
public class KafkaSourceDataTask extends baseTask {
public static void main(String[] args) throws Exception {
//TODO 1) Initialize the Flink streaming execution environment
StreamExecutionEnvironment env = getEnv(KafkaSourceDataTask.class.getSimpleName());
//TODO 6) Add the Kafka consumer to the environment as the source stream
//NOTE: the generic type and right-hand side of this declaration were lost in the original post; a createKafkaStream helper on the parent baseTask class is assumed here
DataStream<String> kafkaStreamSource = createKafkaStream(new SimpleStringSchema());
kafkaStreamSource.printToErr("raw data>>>");
//TODO 7) Convert the JSON strings into JavaBean (ItcastDataObj) objects
//NOTE: the parse method name on JsonParseUtil is assumed
SingleOutputStreamOperator<ItcastDataObj> itcastDataObjStream = kafkaStreamSource.map(JsonParseUtil::parseJsonToObject);
itcastDataObjStream.print("parsed JavaBean objects>>>");
//TODO 8) Filter out the abnormal (error) data
SingleOutputStreamOperator<ItcastDataObj> errorDataStream = itcastDataObjStream.filter(itcastDataObj -> StringUtils.isNotEmpty(itcastDataObj.getErrorData()));
errorDataStream.print("error data>>>");
//TODO 9) Filter out the normal data
SingleOutputStreamOperator<ItcastDataObj> srcDataStream = itcastDataObjStream.filter(itcastDataObj -> StringUtils.isEmpty(itcastDataObj.getErrorData()));
srcDataStream.print("normal data>>>");
//TODO 10) Write the error data to HDFS (StreamingFileSink)
// StreamingFileSink superseded the older BucketingSink as the way to write a data stream into HDFS
//Configure the part file name prefix and suffix
OutputFileConfig outputFileConfig = OutputFileConfig.builder().withPartPrefix("prefix").withPartSuffix(".ext").build();
//The data written to HDFS will later be integrated with Hive, so it is written directly under Hive's default warehouse path
StreamingFileSink<String> errorFileSink = StreamingFileSink.forRowFormat(
new Path(parameterTool.getRequired("hdfsUri") + "/apps/hive/warehouse/ods.db/itcast_error"),
new SimpleStringEncoder<String>("utf-8"))
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyyMMdd"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.SECONDS.toMillis(5)) //roll the part file every 5 seconds
.withInactivityInterval(TimeUnit.SECONDS.toMillis(2)) //roll the part file after 2 seconds without new data
.withMaxPartSize(128 * 1024 * 1024) //roll the part file once it reaches 128 MB
.build()
).withOutputFileConfig(outputFileConfig).build();
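//NOTE: StreamingFileSink only moves part files to their finished state when a checkpoint completes, so checkpointing (enabled in getEnv, see the step-1 comments) is required before the files become visible to Hive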
//Convert each ItcastDataObj into a string that Hive can parse
errorDataStream.map(ItcastDataObj::toHiveString).addSink(errorFileSink);
//TODO 11) Write the normal data to HDFS (StreamingFileSink)
StreamingFileSink<String> srcFileSink = StreamingFileSink.forRowFormat(
new Path(parameterTool.getRequired("hdfsUri") + "/apps/hive/warehouse/ods.db/itcast_src"),
new SimpleStringEncoder<String>("utf-8"))
.withBucketAssigner(new DateTimeBucketAssigner<>("yyyyMMdd"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.SECONDS.toMillis(5)) //roll the part file every 5 seconds
.withInactivityInterval(TimeUnit.SECONDS.toMillis(2)) //roll the part file after 2 seconds without new data
.withMaxPartSize(128 * 1024 * 1024) //roll the part file once it reaches 128 MB
.build()
).withOutputFileConfig(outputFileConfig).build();
srcDataStream.map(ItcastDataObj::toHiveString).addSink(srcFileSink);
//TODO 12) Write the normal data to HBase
//Both the plain and the optimized HBase sink are constructed here; only the optimized one is attached to the stream
SrcDataToHbaseSink srcDataToHbaseSink = new SrcDataToHbaseSink("itcast_src");
SrcDataToHbaseSinkOptimizer srcDataToHbaseSinkOptimizer = new SrcDataToHbaseSinkOptimizer("itcast_src");
srcDataStream.addSink(srcDataToHbaseSinkOptimizer);
//TODO 13) Extract the fields that are frequently used for analysis into a separate HBase table; it has far fewer
// columns than the full wide table, and after integration with Phoenix it can be queried from the Zeppelin visualization pages
VehicleDetailSinkOptimizer vehicleDetailSinkFunction = new VehicleDetailSinkOptimizer("itcastsrc_vehicle_detail");
srcDataStream.addSink(vehicleDetailSinkFunction);
//TODO 14) Start the job and run the task
env.execute();
}
}
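Both tasks in this post extend a baseTask parent class that is not shown here; it supplies getEnv, the parameterTool configuration object and the Kafka source helper. The following is a minimal sketch of what that parent could look like, written under stated assumptions: the helper name createKafkaStream, the conf.properties file and its keys (bootstrap.servers, group.id, kafka.topic) and the 30-second checkpoint interval are illustrative, not the project's actual code.
package cn.itcast.streaming.task;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public abstract class baseTask {
    //configuration shared by all tasks (hdfsUri, kafka addresses, ...), loaded once from the classpath (assumed file name)
    protected static ParameterTool parameterTool;
    protected static StreamExecutionEnvironment env;
    static {
        try {
            parameterTool = ParameterTool.fromPropertiesFile(
                    baseTask.class.getClassLoader().getResourceAsStream("conf.properties"));
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }
    //create the streaming environment, enable event-time processing and register the configuration;
    //the jobName parameter would typically name the checkpoint path in the full project (assumption)
    protected static StreamExecutionEnvironment getEnv(String jobName) {
        env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setGlobalJobParameters(parameterTool);
        env.enableCheckpointing(30 * 1000);
        return env;
    }
    //build a Kafka consumer for the configured topic and add it to the environment as a source
    protected static DataStream<String> createKafkaStream(SimpleStringSchema deserializer) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
        props.setProperty("group.id", parameterTool.getRequired("group.id"));
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<>(parameterTool.getRequired("kafka.topic"), deserializer, props);
        return env.addSource(kafkaConsumer);
    }
}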
package cn.itcast.streaming.task;
import cn.itcast.entity.ElectricFenceModel;
import cn.itcast.entity.ElectricFenceResultTmp;
import cn.itcast.entity.ItcastDataObj;
import cn.itcast.streaming.function.flatmap.ElectricFenceModelFunction;
import cn.itcast.streaming.function.flatmap.ElectricFenceRulesFuntion;
import cn.itcast.streaming.function.window.ElectricFenceWindowFunction;
import cn.itcast.streaming.sink.ElectricFenceMysqlSink;
import cn.itcast.streaming.source.MysqlElectricFenceResultSource;
import cn.itcast.streaming.source.MysqlElectricFenceSouce;
import cn.itcast.streaming.watermark.ElectricFenceWatermark;
import cn.itcast.utils.JsonParseUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.util.HashMap;
public class ElectricFenceTask extends baseTask {
public static void main(String[] args) {
//TODO 1) Initialize the Flink streaming environment (event-time processing, HADOOP_USER_NAME, checkpointing)
StreamExecutionEnvironment env = getEnv(ElectricFenceTask.class.getSimpleName());
//TODO 2) Read the Kafka source (via the parent class helper)
//NOTE: the generic type and right-hand side of this declaration were lost in the original post; the assumed createKafkaStream helper from the parent baseTask class is used here
DataStream<String> kafkaStream = createKafkaStream(new SimpleStringSchema());
kafkaStream.print("raw data consumed from kafka>>>");
//TODO 3) Convert the strings into JavaBean (ItcastDataObj) objects
//NOTE: the parse method name on JsonParseUtil is assumed
SingleOutputStreamOperator<ItcastDataObj> itcastDataObjStream = kafkaStream.map(JsonParseUtil::parseJsonToObject);
//TODO 4) Filter out the normal data
SingleOutputStreamOperator<ItcastDataObj> srcItcastDataObjStream = itcastDataObjStream.filter(
itcastDataObj -> StringUtils.isEmpty(itcastDataObj.getErrorData()));
//TODO 5) Read the electric-fence rules and the vehicles bound to each rule from MySQL, then broadcast them
//NOTE: the declaration was truncated in the original post; the value type and the broadcast() call are assumed
DataStream<HashMap<String, ElectricFenceResultTmp>> electricFenceVinsStream = env.addSource(new MysqlElectricFenceSouce()).broadcast();
//TODO 6) Connect the raw data (consumed from Kafka) with the electric-fence rule data
ConnectedStreams<ItcastDataObj, HashMap<String, ElectricFenceResultTmp>> electricFenceVinsConnectStream =
srcItcastDataObjStream.connect(electricFenceVinsStream);
//NOTE: ElectricFenceRulesFuntion is assumed to be the CoFlatMapFunction that widens each record with its fence rule
SingleOutputStreamOperator<ElectricFenceModel> electricFenceModelStream = electricFenceVinsConnectStream.flatMap(new ElectricFenceRulesFuntion());
electricFenceModelStream.print("raw data widened with the broadcast electric-fence rules>>>");
//TODO 7) Key the merged data, apply a 90-second tumbling window, and use a custom window function to decide
// whether the data of that window lies inside or outside the electric fence
//NOTE: the generic type of this declaration was lost; ElectricFenceModel is assumed as the window output type
SingleOutputStreamOperator<ElectricFenceModel> electricFenceDataStream = electricFenceModelStream
.assignTimestampsAndWatermarks(new ElectricFenceWatermark())
.keyBy(ElectricFenceModel::getVin) //key the stream by the vehicle VIN
.window(TumblingEventTimeWindows.of(Time.seconds(90))) //90-second tumbling event-time window
.apply(new ElectricFenceWindowFunction());
electricFenceDataStream.printToErr("fence enter/exit analysis result>>>");
//TODO 8) Read the existing electric-fence analysis results from MySQL and broadcast them
//NOTE: the declaration was truncated in the original post; the value type of the broadcast stream is assumed
DataStream<HashMap<String, Long>> electricFenceResultStream = env.addSource(new MysqlElectricFenceResultSource()).broadcast();
//TODO 9) Connect the streams produced in step 7 and step 8
ConnectedStreams<ElectricFenceModel, HashMap<String, Long>> electricFenceConnectStream = electricFenceDataStream.connect(electricFenceResultStream);
//TODO 10) Process the connected result of step 9 with a custom function that adds the uuid and sets the inMysql flag
//NOTE: the original declaration was lost; the imported ElectricFenceModelFunction is assumed to be the CoFlatMapFunction used for this step
SingleOutputStreamOperator<ElectricFenceModel> resultDataStream = electricFenceConnectStream.flatMap(new ElectricFenceModelFunction());
//TODO 11) Write the analyzed electric-fence results to MySQL in real time
resultDataStream.addSink(new ElectricFenceMysqlSink());
try {
//TODO 12) Run the job and wait for termination
env.execute();
} catch (Exception exception) {
exception.printStackTrace();
}
}
}
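The watermark assigner used in step 7 (ElectricFenceWatermark) is also not shown in the post. Under the pre-1.11 Flink API used above it could extend BoundedOutOfOrdernessTimestampExtractor; the sketch below is a minimal assumption, and both the 30-second lateness bound and the getTerminalTime() accessor on ElectricFenceModel are illustrative.
package cn.itcast.streaming.watermark;
import cn.itcast.entity.ElectricFenceModel;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
public class ElectricFenceWatermark extends BoundedOutOfOrdernessTimestampExtractor<ElectricFenceModel> {
    public ElectricFenceWatermark() {
        //allow events to arrive up to 30 seconds late before the watermark passes them (assumed bound)
        super(Time.seconds(30));
    }
    @Override
    public long extractTimestamp(ElectricFenceModel element) {
        //use the vehicle's reporting time in milliseconds as the event time; the accessor name is assumed
        return element.getTerminalTime();
    }
}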