Kafka3.0学习—外部系统集成

Kafka3.0学习—外部系统集成,第1张

Kafka3.0学习—外部系统集成
  • 一、集成Flume
    • 1.Flume生产者
    • 2.Flume消费者
  • 二、集成Flink
    • 1.Flink生产者
    • 2.Flink消费者
  • 三、集成SpringBoot
    • 1.生产者
    • 2.消费者

一、集成Flume

Flume经常和Kafka一起使用,即可以作为生产者将数据发送到Kafka中,也可以作为消费者从Kafka中获取数据。

1.Flume生产者

场景:
使用Flume监控Linux某个文件夹下的文件,然后将监控到的数据发送到Kafka中。

这里最主要的就是Flume的配置:

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

直接在Flume中运行这个配置文件即可,然后在/opt/module/applog目录下对文件新增内容,Kafka就可以接收到消息。

2.Flume消费者

场景:
Kafka中的某个主题需要通过Flume传输到别的地方比如打印到控制台。

最重要的也是配置Flume:

# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = logger

启动Flume和Kafka的生产者,向Kafka中输入数据,就可以在控制台看到打印的数据。

二、集成Flink

在Flink中经常需要和Kafka配合使用,当然Flink也可以作为Kafka的生产者和消费者。

1.Flink生产者

代码如下:

public class FlinkKafkaProducer1 {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        // 1 读取集合中数据
        ArrayList<String> wordsList = new ArrayList<>();
        wordsList.add("hello");
        wordsList.add("world");
        DataStreamSource<String> streamSource = env.fromCollection(wordsList);

        //2 创建Kafka生产者
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>("first",new SimpleStringSchema(),properties);

        streamSource.addSink(kafkaProducer);

        env.execute();
    }
}

2.Flink消费者

代码如下:

public class FlinkKafkaConsumer1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);


        //创建Kafka消费者
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("first",new SimpleStringSchema(),properties);

        DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);

        streamSource.print();

        env.execute();
    }
}

这里不写group.id不会报错,最好还是写上。

三、集成SpringBoot

SpringBoot也可以用于Kafka的生产者和消费者。

1.生产者

SpringBoot中使用Kafka模板类KafkaTemplate来向Kafka发送消息,主要代码:

@RestController
public class ProducerController {

    //Kafka模板用来向Kafka发送数据
    @Autowired
    KafkaTemplate<String,String> kafkaTemplate;

    @RequestMapping("/ssl")
    public String date(String msg){
        kafkaTemplate.send("first",msg);
        return "ok";
    }
}

SpringBoot的配置文件如下:

#SpringBoot应用名称
spring.application.name=ssl_springboot_kafka

#Kafka的连接地址
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

#Key和Value的序列化方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringDeserializer
2.消费者

SpringBoot主要使用@KafkaListener这个注解来实现对Kafka的消费,主要代码如下:

@Configuration
public class KafkaConsumer {
    
    //指定要坚挺的topic
    @KafkaListener(topics = "first")
    public void consumerTopic(String msg){
        System.out.println("收到的信息: "+msg);
    }
}

SpringBoot消费者配置文件内容如下:

#SpringBoot应用名称
spring.application.name=ssl_springboot_kafka

#Kafka的连接地址
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

#Key和Value的反序列化方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#指定消费者组的id
spring.kafka.consumer.group-id=ssl

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/871135.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-13
下一篇 2022-05-13

发表评论

登录后才能评论

评论列表(0条)

保存