Flume-Kafka-SparkStreaming对接案例实 ***

Flume-Kafka-SparkStreaming对接案例实 *** ,第1张

Flume-Kafka-SparkStreaming对接案例实 *** 1.准备工作

①一个java程序,需要有一些 *** 作能够打印特定log日志,并打成jar包;

②在linux服务器上安装flume、zookeeper、kafka;

2.案例思路

当我把jar包程序部署在linux服务器上时,每当我访问特定接口,就会产生特定日志文件,这时候flume监听该日志文件,然后并且对日志进行过滤,一些springboot应用启动日志需要过滤,然后发送给kafka对应主题,最后对接SparkStreaming就可以进行相关业务处理了

3.案例流程图

4.案例实 *** 4.1产生日志的主程序

首先我的java程序使用springboot框架写的,其中对外提供一个查看商品的接口

比如说每次我访问就会在/usr/data/service.log文件下打印一条日志

2022-01-23 14:31:29.718 [http-nio-9000-exec-2] INFO  com.wy.basicservice.controller.SearchController - VIEW_LOG:61d05c0bf18b1d3555701d18|61da461a79862b4f9e9b8e6c

主要构成部分是VIEW_LOG:用户id|商品id。表示用户实时访问了某商品。

打包成jar包放到linux服务器下(打包之前需要把日志输出路径配一下,我这里使用的是/usr/data/service.log),通过java -jar运行。

4.2Flume配置

在job文件夹下创建file-kafka.conf配置文件

基本都打注释,没打注释的大部分可以理解为八股文般的配置,没什么技术含量。

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
# 监听该日志文件
a1.sources.r1.command = tail -f /usr/data/service.log
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=regex_filter
# 定义日志过滤前缀的正则,因为一些其他日志我们不需要,只需要用户实时浏览产生的日志
a1.sources.r1.interceptors.i1.regex=.+VIEW_LOG.+

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = kafka部署服务器的ip:9092
#主题
a1.sinks.k1.kafka.topic = view
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume

 bin/flume-ng agent -n a1 -c conf/ -f job/file-kafka.conf

4.3kafka

其中zookeeper和kafka

bin/zkServer.sh start

bin/kafka-server-start.sh config/server.properties

提前创建一个view主题,用于接受flume采集得到的消息

bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic view
4.4编写SparkStreaming程序

首先创建SparkConf和StreamingContext,其中定义流数据被分批处理的时间间隔为3s,然后对接kafka数据源,从view主题获取数据;

 对从kafka读出的数据进行清洗转换,即

原始的格式

com.wy.basicservice.controller.SearchController - VIEW_LOG:61d05c0bf18b1d3555701d18|61da461a79862b4f9e9b8e6c

转为(userId,productId)格式

(61d05c0bf18b1d3555701d18,61da461a79862b4f9e9b8e6c)

最后打印出数据,当然得到了rdd就可以做各种其他业务逻辑 *** 作了

4.5测试

当访问如下接口时:

控制台打印:

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5710652.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存