Without further ado, straight to the point.
Background:
Scala code was written beforehand that inserts data into MySQL on a timer, to simulate a real-time data scenario.
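For context, here is a minimal sketch of what that timed-insert script can look like. The host, database, table, user, and password match the Flume configuration used later in this post; the column names of the behavior table are my own assumption and must be adapted to the real schema.

import java.sql.DriverManager

object TimedMysqlInsert {
  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver") // optional with JDBC 4 drivers, harmless otherwise
    val conn = DriverManager.getConnection(
      "jdbc:mysql://leader:3306/user_behavior", "root", "123456")
    // The column list here is an assumption; adapt it to the actual `behavior` table.
    val stmt = conn.prepareStatement(
      "insert into behavior (user_id, action, ts) values (?, ?, ?)")
    try {
      var userId = 0
      while (true) {
        stmt.setInt(1, userId)
        stmt.setString(2, "click")
        stmt.setLong(3, System.currentTimeMillis())
        stmt.executeUpdate()
        userId += 1
        Thread.sleep(1000) // one row per second simulates a live stream
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }
}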
Tool versions:
Flume 1.9.0, Kafka 2.11-2.3.1, MySQL 5.6.13
Basic architecture:
MySQL is the only data source, so the Flume collector sits on the same server as MySQL.
Kafka, the consuming side, is deployed as a cluster.
| host | server |
| --- | --- |
| leader | MySQL, ZooKeeper, Kafka, Flume |
| follower1 | ZooKeeper, Kafka |
| follower2 | ZooKeeper, Kafka |
- 1. Install and configure Kafka
- 2. Install and configure Flume
- 3. Integrate Flume + Kafka
- 4. Integrate MySQL + Flume
Prerequisites: JDK installed (JDK 8 is used here), with ZooKeeper and HDFS clusters already configured.
Kafka is likewise installed from a tar archive.
- Edit the configuration file
Edit the server.properties file under Kafka's config directory:
vi $KAFKA_HOME/config/server.properties
Two parameters mainly need to be changed:
# Unique broker id; every machine must use a different value (e.g. 001 on leader, 002 on follower1, 003 on follower2)
broker.id=001
# Listening address and port; leader here is this machine's hostname, mapped to its IP (e.g. in /etc/hosts)
listeners=PLAINTEXT://leader:9092
- Test Kafka
Go into Kafka's bin directory.
#Create the topic kafka_netcat_test
[root@leader bin]# ./kafka-topics.sh --create --zookeeper leader:2181,follower1:2181,follower2:2181 --replication-factor 1 --partitions 1 --topic kafka_netcat_test
#Check whether the topic was created successfully
[root@leader bin]# ./kafka-topics.sh --describe --zookeeper leader:2181,follower1:2181,follower2:2181 --topic kafka_netcat_test
#Parameter meanings
--replication-factor 1 #number of replicas (1 here)
--partitions 1 #number of partitions (1 here)
--topic #topic name
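Optionally, the new topic can also be smoke-tested from code rather than from the shell. The Scala sketch below is not part of the original steps; it assumes a kafka-clients dependency matching the 2.3.1 brokers is on the classpath.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TopicSmokeTest {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "leader:9092,follower1:9092,follower2:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // send() is asynchronous; get() blocks until the broker acknowledges the write
    producer.send(new ProducerRecord[String, String]("kafka_netcat_test", "hello kafka")).get()
    producer.close()
  }
}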
Reference for this section: https://blog.csdn.net/qq_32066409/article/details/81198488
Install Flume via tar as well.
Create a new configuration file named kafka_netcat.conf in Flume's bin directory, with the following content:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# If you send data programmatically through the Flume API, change netcat to avro (a sketch follows this config block)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = kafka_netcat_test
a1.sinks.k1.kafka.bootstrap.servers = leader:9092,follower1:9092,follower2:9092
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
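As the avro comment in the source section notes, events can also be pushed from code: switch the source type to avro (same bind address and port) and use Flume's RPC client. Below is a minimal Scala sketch assuming the flume-ng-sdk artifact is on the classpath.

import java.nio.charset.StandardCharsets
import org.apache.flume.api.RpcClientFactory
import org.apache.flume.event.EventBuilder

object FlumeRpcSketch {
  def main(args: Array[String]): Unit = {
    // Works against an avro source; a netcat source does not speak the RPC protocol.
    val client = RpcClientFactory.getDefaultInstance("localhost", 44444)
    try {
      client.append(EventBuilder.withBody("hello flume", StandardCharsets.UTF_8))
    } finally {
      client.close()
    }
  }
}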
The configuration file has three main parts:
1. Name the components (sources, sinks, channels) that the agent uses.
2. Describe the concrete implementation of each source, sink, and channel:
the source uses netcat, with the host and port to bind to;
the sink is configured for Kafka.
Note that KafkaSink in the type value starts with a capital letter; otherwise an error is thrown.
3. Bind the source and the sink to the channel.
- Start a Kafka consumer
Current session (session 1): start a console consumer on the topic (the Kafka brokers should already be running):
[root@leader bin]# ./kafka-console-consumer.sh --bootstrap-server leader:9092,follower1:9092,follower2:9092 --topic kafka_netcat_test --from-beginning
- Start Flume
New session (session 2): start Flume.
(Watch the information printed to the terminal at startup and look out for ERROR messages.)
[root@leader bin]# ./flume-ng agent -n a1 -c ../conf/ -f kafka_netcat.conf -Dflume.root.logger=INFO,console
- Check the result
New session (session 3):
[root@leader bin]# nc localhost 44444
Type any data and press Enter; OK is displayed.
Session 1 (the Kafka consumer) shows the data typed in session 3.
The integration works.
The section above follows: https://blog.csdn.net/a_drjiaoda/article/details/85003929
Changing Flume's source to MySQL
- Modify the source section of kafka_netcat.conf
#example.conf: A single-node flume configuration
#Test Kafka sink with a SQL source
#Name the components on this agent
a1.sources = src-1
a1.sinks = k1
a1.channels = c1
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://leader:3306/user_behavior
# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = 123456
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=5000
a1.sources.src-1.status.file.path = /home/leader/flume-1.9.0/flume_status
a1.sources.src-1.status.file.name = sql-source.status
# Custom query
a1.sources.src-1.start.from = 0
a1.sources.src-1.custom.query = select * from behavior
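# Note: for incremental reads, the keedio flume-ng-sql-source also supports a $@$
# placeholder in custom.query, replaced by the last value saved in the status file,
# e.g. select * from behavior where id > $@$ order by id asc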
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=50
#Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = kafka_netcat_test
a1.sinks.k1.kafka.bootstrap.servers = leader:9092,follower1:9092,follower2:9092
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Bind the source and sink to the channel
a1.sources.src-1.channels = c1
a1.sinks.k1.channel = c1
- Add JAR packages
Put the following JARs into Flume's lib directory
(your versions do not have to match mine, but the versions below are known to work):
- flume-ng-sql-source-1.4.3.jar
- mysql-connector-java-5.1.32-bin.jar
- Startup
- Start the Kafka consumer
Session 1: start the console consumer.
[root@leader bin]# ./kafka-console-consumer.sh --bootstrap-server leader:9092,follower1:9092,follower2:9092 --topic kafka_netcat_test --from-beginning
- Start Flume
Session 2: start Flume.
[root@leader bin]# ./flume-ng agent -n a1 -c ../conf/ -f kafka_netcat.conf -Dflume.root.logger=INFO,console
A MySQL permission error may appear at this point. If so, enter MySQL and grant privileges to the user named in the error (using the database, user, and password from the Flume config):
mysql> grant all privileges on user_behavior.* to 'root'@'leader' identified by '123456';
mysql> flush privileges;
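mysql> show grants for 'root'@'leader'; -- optional: confirm the grant is in place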
Once both Flume and Kafka have started successfully, run the timed-insert script (sketched at the top of this post).
Rows appear in session 1: data is flowing through the whole pipeline.
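To double-check the pipeline from code instead of the console consumer, a minimal Scala consumer sketch could look like the following (same kafka-clients assumption as the producer sketch above; the group id is arbitrary):

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConverters._

object PipelineCheck {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "leader:9092,follower1:9092,follower2:9092")
    props.put("group.id", "pipeline-check") // arbitrary consumer group name
    props.put("auto.offset.reset", "earliest") // start from the oldest record
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("kafka_netcat_test"))
    try {
      while (true) {
        // poll(Duration) is the current overload in Kafka 2.x clients
        for (record <- consumer.poll(Duration.ofSeconds(1)).asScala)
          println(record.value())
      }
    } finally consumer.close()
  }
}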