Receiving Real-Time Data: MySQL + Flume + Kafka Configuration and Integration


Without further ado, let's get straight to the point.
Background:
Scala code has already been written to insert data into MySQL on a schedule, simulating a real-time data scenario.
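For reference, a minimal sketch of such a simulator is shown below. The database user_behavior and table behavior come from the Flume configuration later in this post; the column names (user_id, action, ts) are assumptions for illustration only, so adjust them to your actual schema.

import java.sql.DriverManager

// Minimal sketch of the periodic-insert simulator (column names are assumed).
object BehaviorProducer {
  def main(args: Array[String]): Unit = {
    Class.forName("com.mysql.jdbc.Driver")  // Connector/J 5.x driver class
    val conn = DriverManager.getConnection(
      "jdbc:mysql://leader:3306/user_behavior", "root", "123456")
    val stmt = conn.prepareStatement(
      "INSERT INTO behavior (user_id, action, ts) VALUES (?, ?, NOW())")
    val rnd = new scala.util.Random
    try {
      while (true) {                 // insert one row every 5 seconds
        stmt.setInt(1, rnd.nextInt(1000))
        stmt.setString(2, if (rnd.nextBoolean()) "click" else "view")
        stmt.executeUpdate()
        Thread.sleep(5000)
      }
    } finally {
      stmt.close()
      conn.close()
    }
  }
}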

Tool versions:
Flume 1.9.0, Kafka 2.11-2.3.1, MySQL 5.6.13

Basic architecture:
The only data source is MySQL, so the collector (Flume) runs on the same server as MySQL.
For the consumer side, Kafka is deployed as a cluster.

host        services
leader      MySQL, ZooKeeper, Kafka, Flume
follower1   ZooKeeper, Kafka
follower2   ZooKeeper, Kafka

Approach
      • 1. Install and configure Kafka
      • 2. Install and configure Flume
      • 3. Integrate Flume + Kafka
      • 4. Integrate MySQL + Flume

1. Install and Configure Kafka

Prerequisites: JDK installed (JDK 8 here), with the ZooKeeper and HDFS clusters already configured.
Kafka is likewise installed from a tar archive.

  • Edit the configuration file
    Edit the server.properties file under Kafka's config directory
  vi $KAFKA_HOME/config/server.properties

Two parameters mainly need to be changed:

# Unique broker id; must be different on each machine.
broker.id=001
# Listener address and port; here "leader" is this machine's hostname, mapped in /etc/hosts
listeners=PLAINTEXT://leader:9092
  • Test Kafka
    Go to Kafka's bin directory
# Create the topic kafka_netcat_test
[root@leader bin]# ./kafka-topics.sh --create --zookeeper leader:2181,follower1:2181,follower2:2181 --replication-factor 1 --partitions 1 --topic kafka_netcat_test
# Check whether the topic was created successfully
[root@leader bin]# ./kafka-topics.sh --describe --zookeeper leader:2181,follower1:2181,follower2:2181 --topic kafka_netcat_test

# Parameter meanings
--replication-factor 1   # number of replicas for the topic (1 here)
--partitions 1           # number of partitions (1 here)
--topic                  # topic name
Reference for this part: https://blog.csdn.net/qq_32066409/article/details/81198488
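
As an aside, the topic can also be created from code instead of the shell script. The following is a minimal Scala sketch using Kafka's AdminClient API (available in Kafka 2.x); the bootstrap address and topic settings simply mirror the kafka-topics.sh command above.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopic {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "leader:9092")
    val admin = AdminClient.create(props)
    try {
      // 1 partition, replication factor 1, same as the command above
      val topic = new NewTopic("kafka_netcat_test", 1, 1.toShort)
      admin.createTopics(Collections.singletonList(topic)).all().get()
    } finally {
      admin.close()
    }
  }
}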

2. Install and Configure Flume

Install Flume from a tar archive.
Create a new configuration file in Flume's bin directory named
kafka_netcat.conf
with the following content:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# If sending data from a Java/Scala program via the Flume RPC API, change netcat to avro (see the sketch after the explanation below)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = kafka_netcat_test
a1.sinks.k1.kafka.bootstrap.servers = leader:9092,follower1:9092,follower2:9092
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The configuration file has three main parts:
1. An overall declaration of the sources, sinks, and channels used by the agent;
2. The detailed configuration of each source, sink, and channel:
the source uses netcat, with the bind host and port specified;
the sink is configured for Kafka.
Note that KafkaSink in the type value must be capitalized exactly as shown, otherwise Flume reports an error;
3. The channel binds the source and sink together.
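
As noted in the source comment above, data can also be pushed into Flume from code rather than through netcat by switching the source type to avro. Below is a minimal Scala sketch using Flume's RPC client API; it assumes the source has been changed to a1.sources.r1.type = avro and is reachable at leader:44444 (adjust the host, port, and bind address to your setup).

import java.nio.charset.StandardCharsets
import org.apache.flume.api.RpcClientFactory
import org.apache.flume.event.EventBuilder

object FlumeAvroSender {
  def main(args: Array[String]): Unit = {
    // Connects to an avro source; assumes a1.sources.r1.type = avro, listening on leader:44444
    val client = RpcClientFactory.getDefaultInstance("leader", 44444)
    try {
      val event = EventBuilder.withBody("hello from scala", StandardCharsets.UTF_8)
      client.append(event)   // throws EventDeliveryException if delivery fails
    } finally {
      client.close()
    }
  }
}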

3. Integrate Flume + Kafka
  1. Start a Kafka console consumer

Current session (session 1): with the Kafka brokers already running, start a console consumer on the test topic

[root@leader bin]#./kafka-console-consumer.sh --bootstrap-server leader:9092,follower1:9092,follower2:9092 --topic kafka_netcat_test --from-beginning 
  2. Start Flume
    New session (session 2): start Flume.
    (Watch the messages printed to the terminal during startup and look out for ERROR lines.)
[root@leader bin]# ./flume-ng agent -n a1 -c ../conf/ -f kafka_netcat.conf  -Dflume.root.logger=INFO,console
  3. Check the result
    New session (session 3):
[root@leader bin]# nc localhost 44444

Type any data and press Enter; Flume's netcat source replies with OK.
Session 1 (the Kafka consumer) displays the data entered in session 3.
The integration is successful.
Reference for this part: https://blog.csdn.net/a_drjiaoda/article/details/85003929
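
If you prefer to verify the topic from code rather than with kafka-console-consumer.sh, a minimal Scala consumer sketch is shown below (the group id and poll interval are arbitrary choices for illustration).

import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import scala.collection.JavaConverters._

object TopicChecker {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "leader:9092,follower1:9092,follower2:9092")
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "flume-check")          // arbitrary group id
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")    // read from the beginning
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("kafka_netcat_test"))
    try {
      while (true) {
        val records = consumer.poll(Duration.ofSeconds(1))
        records.asScala.foreach(r => println(s"offset=${r.offset()} value=${r.value()}"))
      }
    } finally {
      consumer.close()
    }
  }
}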

4. Integrate MySQL + Flume

The Flume source now becomes MySQL (read via the flume-ng-sql-source plugin).

  1. Modify the source section of kafka_netcat.conf
# example.conf: A single-node Flume configuration
# Test Kafka sink with a SQL source
 
#Name the components on this agent
a1.sources = src-1
a1.sinks = k1
a1.channels = c1
 
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
# Hibernate database connection properties
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://leader:3306/user_behavior
a1.sources.src-1.hibernate.connection.user = root
a1.sources.src-1.hibernate.connection.password = 123456
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
# com.mysql.jdbc.Driver matches the Connector/J 5.x JAR used below; with Connector/J 8.x use com.mysql.cj.jdbc.Driver
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=5000
a1.sources.src-1.status.file.path = /home/leader/flume-1.9.0/flume_status
a1.sources.src-1.status.file.name = sql-source.status
# Custom query
a1.sources.src-1.start.from = 0
a1.sources.src-1.custom.query = select * from behavior
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=50
 
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = kafka_netcat_test
a1.sinks.k1.kafka.bootstrap.servers = leader:9092,follower1:9092,follower2:9092
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
 
#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
 
#Bind the source and sink to the channel
a1.sources.src-1.channels = c1
a1.sinks.k1.channel = c1

  2. Add JAR files
    Place the following JARs in Flume's lib directory
    (your versions need not match mine exactly, but the ones below are known to work)
  • flume-ng-sql-source-1.4.3.jar
  • mysql-connector-java-5.1.32-bin.jar
  3. Start everything
  • Start the Kafka consumer
    Session 1: start the Kafka console consumer
[root@leader bin]#./kafka-console-consumer.sh --bootstrap-server leader:9092,follower1:9092,follower2:9092 --topic kafka_netcat_test --from-beginning 
  • Start Flume
    Session 2: start Flume
[root@leader bin]# ./flume-ng agent -n a1 -c ../conf/ -f kafka_netcat.conf  -Dflume.root.logger=INFO,console

A MySQL privilege error may appear. If so, log in to MySQL and grant privileges to the user named in the error (root@leader here), using that user's password ('123456' here):

mysql> grant all privileges on <database_name>.* to root@leader identified by '123456';
mysql> flush privileges;

Once Flume and Kafka have both started successfully, run the script that inserts data into the database in real time.
Output appears in session 1, confirming that data flows through the whole pipeline.

