今天开始有个新的需求:统计pv和uv
使用Flink整合kafka通过埋点计算评估分析客户行为
接下来一起从小白开始玩吧:
1.Flink-kafka-connectorKafka中的partition机制和Flink的并行度机制结合,实现数据恢复
Kafka可以作为Flink的source和sink
任务失败,通过设置kafka的offset来恢复应用
依赖2.kafka简单 *** 作org.apache.flink flink-connector-kafka_2.111.31.1
启动zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
启动server: nohup bin/kafka-server-start.sh config/server.properties &
创建一个topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic:bin/kafka-topics.sh --list --zookeeper localhost:2181
发送数据:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
启动一个消费者:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
删除topic: bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topn
3.Flink消费Kafka注意事项-
setStartFromGroupOffsets()【默认消费策略】
默认读取上次保存的offset信息
如果是应用第一次启动,读取不到上次的offset信息,则会根据这个参数auto.offset.reset的值来进行消费数据
- setStartFromEarliest()
从最早的数据开始进行消费,忽略存储的offset信息
- setStartFromLatest()
从最新的数据进行消费,忽略存储的offset信息
-
setStartFromSpecificOffsets(Map
)
从指定位置进行消费 -
当checkpoint机制开启的时候,KafkaConsumer会定期把kafka的offset信息还有其他operator的状态信息一块保存起来。当job失败重启的时候,Flink会从最近一次的checkpoint中进行恢复数据,重新消费kafka中的数据。
- 为了能够使用支持容错的kafka Consumer,需要开启checkpoint
env.enableCheckpointing(5000); // 每5s checkpoint一次
我本地安装了一个kafka_2.11-2.1.0版本的kafka
启动Zookeeper和kafka server:
启动zk:nohup bin/zookeeper-server-start.sh config/zookeeper.properties & 启动server: nohup bin/kafka-server-start.sh config/server.properties &
创建一个topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKafka作为Flink Sink
向kafka写入数据:
public class KafkaProducer { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSourcetext = env.addSource(new MyNoParalleSource()).setParallelism(1); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE); FlinkKafkaProducer producer = new FlinkKafkaProducer("test",new SimpleStringSchema(),properties); text.addSink(producer); env.execute(); } }//
实现一个并行度为1的MyNoParalleSource来生产数据,代码如下:
//使用并行度为1的source public class MyNoParalleSource implements SourceFunction{//1 //private long count = 1L; private boolean isRunning = true; @Override public void run(SourceContext ctx) throws Exception { while(isRunning){ //图书的排行榜 List books = new ArrayList<>(); books.add("Pyhton从入门到放弃");//10 books.add("Java从入门到放弃");//8 books.add("Php从入门到放弃");//5 books.add("C++从入门到放弃");//3 books.add("Scala从入门到放弃");//0-4 int i = new Random().nextInt(5); ctx.collect(books.get(i)); //每2秒产生一条数据 Thread.sleep(2000); } } //取消一个cancel的时候会调用的方法 @Override public void cancel() { isRunning = false; } }
代码实现了一个发送器,来发送书名
然后右键运行程序,控制台输出如下:
开始源源不断的生产数据了。
然后用命令去查看一下 kafka test这个topic:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
输出如下:
Kafka作为Flink Source直接上代码:
public class KafkaConsumer { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); FlinkKafkaConsumerconsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); //从最早开始消费 consumer.setStartFromEarliest(); DataStream stream = env .addSource(consumer); stream.print(); //stream.map(); env.execute(); } }//
控制台输出如下:
好了,实际开发中有很多配置参数,后期有时间再更新哈
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)