Consuming Kafka


The KafkaSourceDataTask class below shows how to consume Kafka data with Flink: it loads its configuration from conf.properties, enables checkpointing backed by RocksDB on HDFS, sets a restart strategy, creates a FlinkKafkaConsumer011 source, and prints the resulting stream.

package cn.itcast.streaming.task;

import cn.itcast.entity.ItcastDataObj;
import cn.itcast.streaming.sink.SrcDataToHbaseSink;
import cn.itcast.streaming.sink.SrcDataToHbaseSinkOptimizer;
import cn.itcast.streaming.sink.VehicleDetailSinkFunction;
import cn.itcast.utils.FlinkUtil;
import cn.itcast.utils.JsonParseUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;


public class KafkaSourceDataTask {
     
    public static void main(String[] args) throws Exception {
        
        //Load the conf.properties configuration file and return a ParameterTool utility object
        ParameterTool parameterTool = ParameterTool.fromPropertiesFile(KafkaSourceDataTask.class.getClassLoader().getResourceAsStream("conf.properties"));

        //TODO 1) Initialize the Flink streaming execution environment
        System.setProperty("HADOOP_USER_NAME", "root");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Register the parameters as global job parameters
        env.getConfig().setGlobalJobParameters(parameterTool);
        //TODO 2) Process data by event time (both windowing and watermarks require event time)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //TODO 3) Enable checkpointing
        //TODO 3.1: Trigger a checkpoint every 30 seconds
        env.enableCheckpointing(30*1000);
        //TODO 3.2: Set the checkpointing mode to exactly-once so records are not consumed twice
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //TODO 3.3: Set the minimum pause between two checkpoints to avoid checkpointing so frequently that throughput drops
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(20000);
        //TODO 3.4: Set the checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(20000);
        //TODO 3.5: Set the maximum number of checkpoints that may be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //TODO 3.6: Retain the checkpoint when the job is cancelled (by default it is deleted on cancellation)
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //TODO 3.7: Do not fail the job when a checkpoint cannot be saved
        env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
        //TODO 3.8: Store checkpoints with RocksDBStateBackend (local state plus HDFS, supports incremental checkpoints)
        String bashHdfsUri = parameterTool.getRequired("hdfsUri");
        try {
            env.setStateBackend(new RocksDBStateBackend(bashHdfsUri+"/flink/checkpoint/"+KafkaSourceDataTask.class.getSimpleName()));
        } catch (IOException e) {
            e.printStackTrace();
        }
        //TODO 4) Set the job restart strategy (fixed-delay, failure-rate, or no restart)
        env.setRestartStrategy(RestartStrategies.noRestart());

        //TODO 5) Create the Flink Kafka consumer and specify the Kafka connection properties
        Properties props = new Properties();
        //TODO     5.1: Kafka broker addresses
        props.setProperty("bootstrap.servers", parameterTool.getRequired("bootstrap.servers"));
        //TODO     5.2: Consumer group id
        props.setProperty("group.id", "KafkaSourceDataTask04");
        //TODO     5.3: Enable partition discovery (periodically detect new partitions)
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        //TODO     5.4: Offset reset policy when no committed offset is available
        props.setProperty("auto.offset.reset", parameterTool.get("auto.offset.reset", "earliest"));
        //TODO     5.5: Do not auto-commit offsets; let Flink's checkpoint mechanism handle them
        props.setProperty("enable.auto.commit", parameterTool.get("enable.auto.commit", "false"));
        //TODO     5.6: Create the Kafka consumer instance
        FlinkKafkaConsumer011<String> kafkaConsumer = new FlinkKafkaConsumer011<>(
                parameterTool.getRequired("kafka.topic"),
                new SimpleStringSchema(), props
        );
        //TODO     5.7: Commit offsets back to Kafka when a checkpoint completes
        kafkaConsumer.setCommitOffsetsOnCheckpoints(true);

        //TODO 6) Add the Kafka consumer as a source to the environment
        DataStream<String> dataStreamSource = env.addSource(kafkaConsumer);

        //Print the stream for testing
        dataStreamSource.print();

        //TODO 7) Start and run the job
        env.execute();
    }
}
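
For reference, here is a minimal conf.properties sketch covering the keys this task reads (hdfsUri, bootstrap.servers, kafka.topic, auto.offset.reset, enable.auto.commit). The host names, ports, and topic name below are placeholders for illustration, not values from the original project:

# Kafka broker list (placeholder hosts)
bootstrap.servers=node01:9092,node02:9092,node03:9092
# Topic consumed by KafkaSourceDataTask (placeholder name)
kafka.topic=vehicledata
# Where to start reading when no committed offset exists
auto.offset.reset=earliest
# Offsets are managed by Flink checkpoints, not auto-committed
enable.auto.commit=false
# HDFS URI used to build the RocksDB checkpoint path (placeholder)
hdfsUri=hdfs://node01:8020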
