- 一、集成Flume
- 1.Flume生产者
- 2.Flume消费者
- 二、集成Flink
- 1.Flink生产者
- 2.Flink消费者
- 三、集成SpringBoot
- 1.生产者
- 2.消费者
Flume经常和Kafka一起使用,即可以作为生产者将数据发送到Kafka中,也可以作为消费者从Kafka中获取数据。
场景:
使用Flume监控Linux某个文件夹下的文件,然后将监控到的数据发送到Kafka中。
这里最主要的就是Flume的配置:
# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# 5 拼接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
直接在Flume中运行这个配置文件即可,然后在/opt/module/applog
目录下对文件新增内容,Kafka就可以接收到消息。
场景:
Kafka中的某个主题需要通过Flume传输到别的地方比如打印到控制台。
最重要的也是配置Flume:
# 1 组件定义
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# 2 配置 source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 50
a1.sources.r1.batchDurationMillis = 200
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092
a1.sources.r1.kafka.topics = first
a1.sources.r1.kafka.consumer.group.id = custom.g.id
# 3 配置 channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 4 配置 sink
a1.sinks.k1.type = logger
启动Flume和Kafka的生产者,向Kafka中输入数据,就可以在控制台看到打印的数据。
二、集成Flink在Flink中经常需要和Kafka配合使用,当然Flink也可以作为Kafka的生产者和消费者。
1.Flink生产者代码如下:
public class FlinkKafkaProducer1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
// 1 读取集合中数据
ArrayList<String> wordsList = new ArrayList<>();
wordsList.add("hello");
wordsList.add("world");
DataStreamSource<String> streamSource = env.fromCollection(wordsList);
//2 创建Kafka生产者
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>("first",new SimpleStringSchema(),properties);
streamSource.addSink(kafkaProducer);
env.execute();
}
}
2.Flink消费者
代码如下:
public class FlinkKafkaConsumer1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
//创建Kafka消费者
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("first",new SimpleStringSchema(),properties);
DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);
streamSource.print();
env.execute();
}
}
这里不写group.id不会报错,最好还是写上。
三、集成SpringBootSpringBoot也可以用于Kafka的生产者和消费者。
1.生产者SpringBoot中使用Kafka模板类KafkaTemplate来向Kafka发送消息,主要代码:
@RestController
public class ProducerController {
//Kafka模板用来向Kafka发送数据
@Autowired
KafkaTemplate<String,String> kafkaTemplate;
@RequestMapping("/ssl")
public String date(String msg){
kafkaTemplate.send("first",msg);
return "ok";
}
}
SpringBoot的配置文件如下:
#SpringBoot应用名称
spring.application.name=ssl_springboot_kafka
#Kafka的连接地址
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#Key和Value的序列化方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringDeserializer
2.消费者
SpringBoot主要使用@KafkaListener
这个注解来实现对Kafka的消费,主要代码如下:
@Configuration
public class KafkaConsumer {
//指定要坚挺的topic
@KafkaListener(topics = "first")
public void consumerTopic(String msg){
System.out.println("收到的信息: "+msg);
}
}
SpringBoot消费者配置文件内容如下:
#SpringBoot应用名称
spring.application.name=ssl_springboot_kafka
#Kafka的连接地址
spring.kafka.bootstrap-servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#Key和Value的反序列化方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#指定消费者组的id
spring.kafka.consumer.group-id=ssl
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)