package cn.itcast.streaming.task;
import cn.itcast.entity.ItcastDataObj;
import cn.itcast.streaming.sink.SrcDataToHbaseSink;
import cn.itcast.streaming.sink.SrcDataToHbaseSinkOptimizer;
import cn.itcast.streaming.sink.VehicleDetailSinkFunction;
import cn.itcast.utils.FlinkUtil;
import cn.itcast.utils.JsonParseUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class KafkaSourceDataTask {
public static void main(String[] args) throws Exception {
// Load conf.properties from the classpath and wrap it in a ParameterTool
ParameterTool parameterTool = ParameterTool.fromPropertiesFile(KafkaSourceDataTask.class.getClassLoader().getResourceAsStream("conf.properties"));
//TODO 1) Initialize the Flink streaming execution environment
System.setProperty("HADOOP_USER_NAME", "root");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Register the parameters globally so every operator can read them
env.getConfig().setGlobalJobParameters(parameterTool);
//TODO 2) Process data by event time (both windowing and watermarks require event time)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
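// Note: setting the time characteristic alone does not produce watermarks; any later
// event-time windowing also needs timestamps/watermarks assigned on the stream
// (e.g. via assignTimestampsAndWatermarks), which is outside this snippet.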
//TODO 3) Enable checkpointing
//TODO 3.1: Trigger a checkpoint every 30 seconds
env.enableCheckpointing(30 * 1000);
//TODO 3.2: Use exactly-once mode so records are processed exactly once and never duplicated
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//TODO 3.3: Enforce a minimum pause between two checkpoints so frequent checkpointing does not degrade throughput
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);
//TODO 3.4: Set the checkpoint timeout
env.getCheckpointConfig().setCheckpointTimeout(20000);
//TODO 3.5: Limit how many checkpoints may be in flight at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//TODO 3.6: Retain externalized checkpoints when the job is cancelled (by default they are deleted on cancellation)
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//TODO 3.7: Do not fail the job when a checkpoint cannot be completed
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
//TODO 3.8: Store state in a RocksDBStateBackend (local RocksDB + HDFS) with incremental checkpoints
String bashHdfsUri = parameterTool.getRequired("hdfsUri");
try {
// The second constructor argument enables incremental checkpoints, matching step 3.8
env.setStateBackend(new RocksDBStateBackend(bashHdfsUri + "/flink/checkpoint/" + KafkaSourceDataTask.class.getSimpleName(), true));
} catch (IOException e) {
e.printStackTrace();
}
//TODO 4) Set the job restart strategy (fixed-delay, failure-rate, or no restart)
env.setRestartStrategy(RestartStrategies.noRestart());
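// noRestart() is convenient while debugging. For production, a fixed-delay strategy is
// a common alternative; a minimal sketch with illustrative values (not from the original):
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
//         3,                                                // up to 3 restart attempts
//         org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS))); // 10s between attempts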
//TODO 5) Create the Flink Kafka consumer and configure its connection properties
Properties props = new Properties();
//TODO 5.1: Kafka broker addresses
props.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
//TODO 5.2: Consumer group id
props.setProperty("group.id", "KafkaSourceDataTask04");
//TODO 5.3: Enable dynamic partition discovery
props.setProperty("flink.partition-discovery.interval-millis", "30000");
//TODO 5.4: Where to start reading when no committed offset exists
props.setProperty("auto.offset.reset", parameterTool.get("auto.offset.reset", "earliest"));
//TODO 5.5: Disable Kafka auto-commit; offsets are committed through Flink's checkpoints instead
props.setProperty("enable.auto.commit", parameterTool.get("enable.auto.commit", "false"));
//TODO 5.6: Create the Kafka consumer instance
FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>(
parameterTool.getRequired("kafka.topic"),
new SimpleStringSchema(), props
);
//TODO 5.7: Commit the consumed offsets back to Kafka as part of each checkpoint
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
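// The consumer can also be pinned to an explicit start position; both calls below are
// part of the FlinkKafkaConsumer API, shown here only as a sketch:
// kafkaConsumer.setStartFromGroupOffsets(); // default: resume from committed group offsets
// kafkaConsumer.setStartFromEarliest();     // or re-read the topic from the beginning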
//TODO 6) Add the Kafka consumer as a source to the environment
DataStream<String> dataStreamSource = env.addSource(kafkaConsumer);
// Print the raw stream as a quick smoke test
dataStreamSource.print();
//TODO 13) Submit and run the job
env.execute();
}
}
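The job resolves all of its endpoints from conf.properties on the classpath. A minimal sketch of that file, with placeholder hosts and topic names (the real values are not part of this listing):

bootstrap.servers=node01:9092,node02:9092,node03:9092
kafka.topic=vehicledata
hdfsUri=hdfs://node01:8020
auto.offset.reset=earliest
enable.auto.commit=false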