Flink+kafka根据埋点客户计算漏损率

Flink+kafka根据埋点客户计算漏损率,第1张

Flink+kafka根据埋点客户计算漏损率

今天开始有个新的需求:统计pv和uv

使用Flink整合kafka通过埋点计算评估分析客户行为

接下来一起从小白开始玩吧:

1.Flink-kafka-connector 

Kafka中的partition机制和Flink的并行度机制结合,实现数据恢复

Kafka可以作为Flink的source和sink

任务失败,通过设置kafka的offset来恢复应用

依赖



    org.apache.flink

    flink-connector-kafka_2.11

    1.31.1


2.kafka简单 *** 作

启动zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

启动server: nohup bin/kafka-server-start.sh config/server.properties &

创建一个topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

查看topic:bin/kafka-topics.sh --list --zookeeper localhost:2181

发送数据:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

启动一个消费者:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

删除topic: bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topn

3.Flink消费Kafka注意事项
  • setStartFromGroupOffsets()【默认消费策略】

    默认读取上次保存的offset信息
    如果是应用第一次启动,读取不到上次的offset信息,则会根据这个参数auto.offset.reset的值来进行消费数据

  • setStartFromEarliest()
    从最早的数据开始进行消费,忽略存储的offset信息
  • setStartFromLatest()
    从最新的数据进行消费,忽略存储的offset信息
  • setStartFromSpecificOffsets(Map)
    从指定位置进行消费

  • 当checkpoint机制开启的时候,KafkaConsumer会定期把kafka的offset信息还有其他operator的状态信息一块保存起来。当job失败重启的时候,Flink会从最近一次的checkpoint中进行恢复数据,重新消费kafka中的数据。

  • 为了能够使用支持容错的kafka Consumer,需要开启checkpoint
    env.enableCheckpointing(5000); // 每5s checkpoint一次
4.接下来coding吧 搭建Kafka单机环境

我本地安装了一个kafka_2.11-2.1.0版本的kafka

启动Zookeeper和kafka server:

启动zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties  &

启动server: nohup bin/kafka-server-start.sh config/server.properties &

创建一个topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

Kafka作为Flink Sink

向kafka写入数据:

public class KafkaProducer {


    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        FlinkKafkaProducer producer = new FlinkKafkaProducer("test",new SimpleStringSchema(),properties);

        text.addSink(producer);
        env.execute();
    }
}//

实现一个并行度为1的MyNoParalleSource来生产数据,代码如下:

//使用并行度为1的source
public class MyNoParalleSource implements SourceFunction {//1

    //private long count = 1L;
    private boolean isRunning = true;
    
    
    @Override
    public void run(SourceContext ctx) throws Exception {
        while(isRunning){
            //图书的排行榜
            List books = new ArrayList<>();
            books.add("Pyhton从入门到放弃");//10
            books.add("Java从入门到放弃");//8
            books.add("Php从入门到放弃");//5
            books.add("C++从入门到放弃");//3
            books.add("Scala从入门到放弃");//0-4
            int i = new Random().nextInt(5);
            ctx.collect(books.get(i));

            //每2秒产生一条数据
            Thread.sleep(2000);
        }
    }
    //取消一个cancel的时候会调用的方法
    @Override
    public void cancel() {
        isRunning = false;
    }
}

代码实现了一个发送器,来发送书名等...

然后右键运行程序,控制台输出如下:

开始源源不断的生产数据了。

然后用命令去查看一下 kafka test这个topic:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

输出如下:

Kafka作为Flink Source

直接上代码:

public class KafkaConsumer {

    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
        //从最早开始消费
        consumer.setStartFromEarliest();
        DataStream stream = env
                .addSource(consumer);
        stream.print();
        //stream.map();
        env.execute();

    }
}//

控制台输出如下:

 好了,实际开发中有很多配置参数,后期有时间再更新哈

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5677494.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存