SlideLive是一款PPT在线播放和分享的网站,目前已收录1w+的PPT文档。在SlideLive网站中,我们需要统计出每个PPT详情页总的浏览量和昨日新增浏览次数。由于页面浏览事件请求量大,直接将用户行为事件写入到mysql中会对数据库造成性能上的影响。因此,我们引入了Kafka Stream作为流式计算引擎帮助实时计算各幻灯片的浏览次数。本文将简要介绍Kafka Stream在SlideLive网站中的使用。
具体的功能体验可以参阅官网:SlideLive:分享和发现知识
需求SlideLive对于Kafka有两个层面的需求:
-
消息队列
-
实时统计
消息队列
SlideLive网站需要将用户的搜索行为数据记录下来,然后交给Learning to Rank算法训练精排模型,由于用户的搜索数据大,因此我们使用Kafka最为缓冲队列。
实时统计
SlideLive网站需要统计如下指标
-
幻灯片详情页的浏览次数
-
幻灯片的分享次数
-
幻灯片的收藏次数
统计的维度:
-
幻灯片级别:总的次数、昨日新增
-
网站级别:总的次数、昨日新增
实时性:高
技术实现 Kafka Stream 介绍Kafka Stream类似于Spark Stream和Flink,提供了对存储在Kafka内的数据进行流式处理和分析的功能。它具有如下特点:
-
Kafka Stream 提供了一个非常简单而轻量的类库,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
-
除了Kafka外,无任何外部依赖
-
充分利用Kafka分区机制实现水平扩展和顺序性保证
-
提供记录级的处理能力,从而实现毫秒级的低延迟
-
支持基于事件时间的窗口 *** 作
Kafka Stream 原理这里我只简单列举了关键几点,详情请参阅:Apache Kafka
在Kafka Stream计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,通过先定义目标计算并在数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。
总体结构
整个架构中,包含三个处理程序和两类 Kafka Topic。
-
Event Producer是消息的发布者,它接受前端发送的用户行为事件(Events),并将Events写入到指定的Kafka Topic中;
-
Event Count Stream Processor是整个流式计算的核心,它负责从Input Topic实时读取数据,并汇统计浏览、分享、收藏事件的总次数以及昨日新增次数,并将统计结果实时写入到Output Topic中;
-
Event Count Consumer是消息的消费者,它从Output Topic中读取统计消息,并将结果写入到mysql数据库或者缓存中便于SlideLive前端读取。
Serde总结slideEventSerde = Serdes.serdeFrom(new SlideEventSerializer(), new SlideEventDeserializer()); Map streamProps = this.kafkaProperties.getStreamProps(); Properties properties = new Properties(); properties.putAll(streamProps); StreamsBuilder builder = new StreamsBuilder(); KStream source = builder.stream(Constants.KAFKA_INPUT_TOPIC, Consumed.with(Serdes.Long(), slideEventSerde)); KStream viewFilterStream = source.filter((key, slideEvent) -> slideEvent.getType().equals("VIEW")); KStream shareFilterStream = source.filter((key, slideEvent) -> slideEvent.getType().equals("SHARE")); KStream collectFilterStream = source.filter((key, slideEvent) -> slideEvent.getType().equals("COLLECT")); // slide级别: 总的 viewFilterStream .groupByKey(Grouped.with(Serdes.Long(), slideEventSerde)) .count(Materialized.as("level.slide.view-event-count")) .toStream() .to("level.slide.view-event-count"); collectFilterStream .groupByKey(Grouped.with(Serdes.Long(), slideEventSerde)) .count(Materialized.as("level.slide.collect-event-count")) .toStream() .to("level.slide.collect-event-count"); shareFilterStream .groupByKey(Grouped.with(Serdes.Long(), slideEventSerde)) .count(Materialized.as("level.slide.share-event-count")) .toStream() .to("level.slide.share-event-count"); // slide级别:每日 viewFilterStream .groupByKey(Grouped.with(Serdes.Long(), slideEventSerde)) .windowedBy(TimeWindows.of(1 * 3600 * 1000 * 24)) .count(Materialized.as("level.slide.view-event-count-day")) .toStream() .to("level.slide.view-event-count-day"); collectFilterStream .groupByKey(Grouped.with(Serdes.Long(), slideEventSerde)) .windowedBy(TimeWindows.of(1 * 3600 * 1000 * 24)) .count(Materialized.as("level.slide.collect-event-count-day")) .toStream() .to("level.slide.collect-event-count-day"); shareFilterStream .groupByKey(Grouped.with(Serdes.Long(), slideEventSerde)) .windowedBy(TimeWindows.of(1 * 3600 * 1000 * 24)) .count(Materialized.as("level.slide.share-event-count-day")) .toStream() .to("level.slide.share-event-count-day"); Topology topology = builder.build(); System.out.println(topology.describe()); final KafkaStreams streams = new KafkaStreams(topology, properties); streams.start();
SlideLive是一款PPT在线播放和分享的网站,其采用轻量级的流式计算引擎Kafka Stream对用户行为事件进行统计分析。更多内容,请访问SlideLive。SlideLive地址:SlideLive:分享和发现知识
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)