-
路径
-
step1:构建ProducerRecord对象
-
step2:调用KafkaProducer的send方法将数据写入Kafka
-
-
实施
package bigdata.itcast.cn.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerTestClient { public static void main(String[] args) { // todo:1-构建连接 // 构建一个配置对象 Properties props = new Properties(); //指定Kafka服务端地址 props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); props.put("acks", "all"); //定义写入Kafka的KV序列化的类型 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 构建一个连接对象,指定KV的类型,加载配置 KafkaProducer
producer = new KafkaProducer<>(props); // todo:2-实现 *** 作 for (int i = 0; i < 10; i++){ //调用连接对象方法将数据写入Kafka //ProducerRecord:每一条写入的数据必须封装为ProducerRecord对象 //方式一:指定Topic、Key、Value // producer.send(new ProducerRecord ("bigdata01", i+"", "itcast"+i)); //方式二:指定Topic、Value // producer.send(new ProducerRecord ("bigdata01", "itcast"+i)); //方式三:指定Topic、Partition、Key、Value producer.send(new ProducerRecord ("bigdata01",0,i+"","itcast"+i)); } // todo:3-释放连接 producer.close(); } }
-
路径
-
step1:消费者订阅Topic
-
step2:调用poll方法从Kafka中拉取数据,获取返回值
-
step3:从返回值中输出:Topic、Partition、Offset、Key、Value
-
-
实施
-
package bigdata.itcast.cn.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerTestClient { public static void main(String[] args) { // todo:1-构建消费者连接 Properties props = new Properties(); // 指定服务端地址 props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092"); // 指定这个消费者属于哪个消费者组,每个消费者组都会有1个id props.setProperty("group.id", "test01"); // 开启自动提交 props.setProperty("enable.auto.commit", "true"); // 自动提交的时间间隔 props.setProperty("auto.commit.interval.ms", "1000"); // 消费者读取数据KV的反序列化类型 props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 构建一个消费者连接对象,指定KV类型以及加载配置 KafkaConsumer
consumer = new KafkaConsumer<>(props); // todo:2-订阅、消费、处理 //订阅Topic consumer.subscribe(Arrays.asList("bigdata01")); //消费&处理 while (true) { // 消费:从Kafka拉取数据,拉取到的所有数据放在ConsumerRecords集合对象中 ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); // 处理:取出每一条数据 for (ConsumerRecord record : records){ String topic = record.topic();//这条数据属于哪个topic int part = record.partition();//这条数据属于这个Topic的哪个分区 long offset = record.offset();//这条数据在这个分区中的offset是什么 String key = record.key();//获取数据中的Key String value = record.value();//获取数据中的Value //输出 System.out.println(topic+"t"+part+"t"+offset+"t"+key+"t"+value); } } // todo:3-实时计算中:消费者是不停的 } }
-
路径
-
step1:消费者是如何消费Topic中的数据的?
-
step2:如果消费者故障重启,消费者怎么知道自己上次消费的位置的?
-
-
实施
-
Kafka中消费者消费数据的规则
-
消费者消费Kafka中的Topic根据每个分区的Offset进行消费,每次从上一次的位置继续消费
-
第一次消费规则:由属性决定
auto.offset.reset = latest | earliest | none latest:默认的值,从Topic每个分区的最新的位置开始消费 earliest:从最早的位置开始消费,每个分区的offset为0开始消费 none:如果是第一次消费,这个属性为none,Kafka会抛出异常 如果不是第一次消费:上面这个属性不起作用
-
第二次消费开始:根据上一次消费的Offset位置+1继续进行消费
-
-
假如上一次消费到的offset=3,那么第二次就该消费offset+1的数据了。
-
-
问题1:消费者如何知道上一次消费的位置是什么?
-
消费者每次成功消费会在自己的内存中记录offset的值,下次直接请求上一次消费的位置
-
Consumer Offset:表示当前消费者组已经消费到的位置
-
Commit Offset:表示下一次要消费的位置,等于Consumer Offset+1
-
-
问题2
-
只有1个消费:如果因为一些原因,消费者故障了,重启消费者,相当于一个新的消费者,原来内存中offset就没有了,消费者怎么知道上一次消费的位置?
-
如果多个消费者:因为一些原因,某个消费者故障,这个消费者本来负责的分区,Kafka会交给别的消费者来继续消费,这个接盘侠它怎么知道之前的那个消费者消费到的位置?
-
如果不知道上一次的位置,就无法接着上次的位置继续消费
-
要么重头消费:数据重复
-
要么最新消费:数据丢失
-
-
原因:offset只放在内存中,进程故障,内存的数据丢失,Offset也就丢失了
-
解决:将offset持久化存储并且共享
-
-
-
Kafka Offset偏移量管理
-
Kafka将每个消费者消费的Commit Offset主动记录在一个Topic中:__consumer_offsets
-
如果下次消费者没有给定请求offset,kafka就根据自己记录的offset来提供消费的位置
-
-
提交的规则:根据时间自动提交
props.setProperty("enable.auto.commit", "true");//是否自动提交offset props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
-
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)