大数据之 Flume 对 接 Kafka 完整使用 (第六章)

大数据之 Flume 对 接 Kafka 完整使用 (第六章),第1张

数据之 Flume 对 接 Kafka 完整使用 (第六章)

大数据之 Flume 对 接 Kafka 完整使用
  • 一、Flume 对 接 Kafka
    • 1)配置 flume(flume-kafka.conf)
    • 2) 启动 kafkaIDEA 消费者
    • 3) 进入 flume 根目录下,启动 flume
    • 4) 向 /opt/module/data/flume.log 里追加数据,查看 kafka 消费者消费情况
  • 二、为什么要kafka对接Flume
    • 1、问题
  • 三、kafka对接Flume (数据分类)
    • 1、编码
    • 2、丢到服务器
    • 3、在flume 的job里面新增分类文件如下
    • 4、启动两个消费者
    • 5、启动flume
    • 6、开启发送数据端口

一、Flume 对 接 Kafka 1)配置 flume(flume-kafka.conf)
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F -c +0 /opt/module/data/flume.log
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 
hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) 启动 kafkaIDEA 消费者 3) 进入 flume 根目录下,启动 flume
$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
4) 向 /opt/module/data/flume.log 里追加数据,查看 kafka 消费者消费情况
$ echo hello >> /opt/module/data/flume.log
二、为什么要kafka对接Flume 1、问题

采集日志给多个人使用
如果使用flume、那就的再多加一个channel、不能动态加业务线
增加业务线动态增加(类似消费者可以动态增加、副本数不变)

三、kafka对接Flume (数据分类) 1、编码

监听头部信息 headers.put(“topic”, “first”);

package org.example.interceptor;


import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;


public class TypeInterceptor implements Interceptor {


    //声明一个集合、用于存放拦截器处理后的事件
    private List addHeaderEvents;

    @Override
    public void initialize() {
        //初始化集合用于存放拦截器处理后的事件
        addHeaderEvents = new ArrayList<>();
    }


    
    @Override
    public Event intercept(Event event) {
        //1、获取事件中的头部信息 header & body
        Map headers = event.getHeaders();

        //获取事件中的body信息
        String body = new String(event.getBody());

        //根据body中是否有hello 来决定添加怎样的头部信息
        if (body.contains("hello")) {
            headers.put("topic", "first");

        } else {
            headers.put("topic", "second");
        }

        //返回数据
        return event;
    }


    
    @Override
    public List intercept(List list) {
        //清空集合
        addHeaderEvents.clear();
        for (Event event : list) {
            //交给单个Event 处理
            addHeaderEvents.add(intercept(event));
        }

        //返回数据
        return addHeaderEvents;
    }

    @Override
    public void close() {


    }


    
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TypeInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }

}

2、丢到服务器

位置是flume的lib下

3、在flume 的job里面新增分类文件如下


新增配置属性

#Name
a1.sources = r1
a1.channels = c1
a1.sinks = k1


# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = org.example.interceptor.TypeInterceptor$Builder


#Chabbel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
#事务容量
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


4、启动两个消费者
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic second
5、启动flume
bin/flume-ng agent -c conf/ -f job/type_kafka.conf -n a1
6、开启发送数据端口
nc localhost 44444

查看效果

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5480362.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-12
下一篇 2022-12-12

发表评论

登录后才能评论

评论列表(0条)

保存