- I. Flume to Kafka
- 1) Configure Flume (flume-kafka.conf)
- 2) Start the Kafka consumer in IDEA
- 3) Go to the Flume root directory and start Flume
- 4) Append data to /opt/module/data/flume.log and watch the Kafka consumer
- II. Why connect Kafka to Flume
- 1. The problem
- III. Kafka with Flume (data classification)
- 1. The code
- 2. Deploy to the server
- 3. Add the classification job file under Flume's job directory
- 4. Start two consumers
- 5. Start Flume
- 6. Open the data-sending port
I. Flume to Kafka

1) Configure Flume (flume-kafka.conf)

```
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source: tail the log file from the beginning
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/data/flume.log
a1.sources.r1.shell = /bin/bash -c

# sink: write events to the Kafka topic "first"
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

2) Start the Kafka consumer in IDEA (a sketch follows below)
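The original only says to run a consumer from IDEA; here is a minimal sketch of what that consumer might look like, assuming the kafka-clients library is on the classpath. The class name FirstTopicConsumer and the group id flume-test are illustrative; the brokers and the topic first come from the config above.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FirstTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
        props.put("group.id", "flume-test"); // illustrative group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Subscribe to the topic the Flume sink writes to
            consumer.subscribe(Collections.singletonList("first"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d, offset=%d, value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```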
3) Go to the Flume root directory and start Flume (-c points at the conf directory, -n names the agent, -f selects the job file):

```
$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
```

4) Append data to /opt/module/data/flume.log and check what the Kafka consumer receives
```
$ echo hello >> /opt/module/data/flume.log
```

II. Why connect Kafka to Flume

1. The problem

The collected logs are consumed by more than one party. With Flume alone, every additional consumer means wiring in another channel, so new business lines cannot be added dynamically. Putting Kafka in between lets business lines be added on the fly (much as Kafka consumers can be added dynamically while the number of replicas stays unchanged).

III. Kafka with Flume (data classification)

1. The code

A custom interceptor classifies each event by setting a header, which the Kafka sink reads to pick the topic: headers.put("topic", "first");
```java
package org.example.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TypeInterceptor implements Interceptor {

    // Collection holding the events after the interceptor has processed them
    private List<Event> addHeaderEvents;

    @Override
    public void initialize() {
        // Initialize the collection for processed events
        addHeaderEvents = new ArrayList<>();
    }

    @Override
    public Event intercept(Event event) {
        // 1. Get the event's headers (an event is header & body)
        Map<String, String> headers = event.getHeaders();
        // Get the event's body
        String body = new String(event.getBody());
        // Decide which header to add based on whether the body contains "hello"
        if (body.contains("hello")) {
            headers.put("topic", "first");
        } else {
            headers.put("topic", "second");
        }
        // Return the event
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // Clear the collection
        addHeaderEvents.clear();
        for (Event event : list) {
            // Delegate each event to the single-event intercept
            addHeaderEvents.add(intercept(event));
        }
        // Return the processed events
        return addHeaderEvents;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
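Before deploying, the routing logic can be sanity-checked locally by pushing a couple of hand-built events through the interceptor. This harness is not part of the original post; it assumes flume-ng-sdk (for EventBuilder) is on the classpath, and the class name TypeInterceptorDemo is made up for illustration.

```java
import java.nio.charset.StandardCharsets;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;
import org.example.interceptor.TypeInterceptor;

public class TypeInterceptorDemo {
    public static void main(String[] args) {
        // Build the interceptor the same way Flume's Builder would
        Interceptor interceptor = new TypeInterceptor.Builder().build();
        interceptor.initialize();

        Event hello = EventBuilder.withBody("hello world", StandardCharsets.UTF_8);
        Event other = EventBuilder.withBody("some other line", StandardCharsets.UTF_8);

        // Expect "first" for bodies containing "hello", "second" otherwise
        System.out.println(interceptor.intercept(hello).getHeaders().get("topic"));
        System.out.println(interceptor.intercept(other).getHeaders().get("topic"));

        interceptor.close();
    }
}
```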
2. Deploy to the server

Build the jar and place it under Flume's lib directory.
3. Add the classification job file under Flume's job directory, as follows
The new configuration properties:
```
# Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source: netcat source listening on localhost:44444
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Interceptor: the TypeInterceptor built above
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.example.interceptor.TypeInterceptor$Builder

# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
# transaction capacity
a1.channels.c1.transactionCapacity = 100

# Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

Note that kafka.topic = first is only the default: when an event arrives with a topic header, the Kafka sink sends it to that topic instead, which is how the interceptor's classification takes effect.

4. Start two consumers
```
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic second
```

(On newer Kafka versions the console consumer takes --bootstrap-server hadoop102:9092 instead of --zookeeper.)

5. Start Flume
```
bin/flume-ng agent -c conf/ -f job/type_kafka.conf -n a1
```

6. Open the data-sending port
```
nc localhost 44444
```
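If nc is not available, the same test data can be sent programmatically. A minimal sketch that opens a TCP connection to the netcat source and writes newline-terminated lines, one Flume event per line (the class name NetcatSender is illustrative):

```java
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class NetcatSender {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("localhost", 44444);
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8),
                     true)) { // autoflush on println
            out.println("hello flume");   // contains "hello" -> topic first
            out.println("plain message"); // no "hello"       -> topic second
        }
    }
}
```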
Check the result: lines containing "hello" should show up on the consumer for topic first, and everything else on the consumer for topic second.