1.消息:字节数据 2.键(key):可选项 3.批次:提供效率,权衡(时间延迟与吞吐量) 4.主题:(数据库中的表),分表-分区 5.分区:(分区器判断去哪个分区)相当于分类,为何分区:提高性能 6.生产者:发消息,消费者:取消息 7.偏移量:消费者取消息到哪个id 8.消费群组:一个分区被一个消费者消费,有某个消费者挂掉,则该分区由其他一个消费者消费 9.broker:独立kafka主机,一个服务器叫broker,集群:用来复制 10.优点:多生产者多消费者,基于磁盘的数据存储,高伸缩性,高性能 11.常见场景:活动跟踪,传递消息,收集指标和日志,提交日志,流处理。
2.创建kafka消费者:
2.1.初始化
Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092");//指定服务 props.put("group.id", "CountryCounter");//消费者的消费组 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//反序列化,将字节数组转换成对象 props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumerconsumer = new KafkaConsumer (props);//实例化消费者对象
2.2.订阅主题
//创建完消费者后订阅主题,只需要通过调用subscribe()方法即可,该方法接收一个主题列表 consumer.subscribe(Collections.singletonList("customerCountries"));//订阅主题 //订阅多个主题,通过正则表达式来匹配多个主题,且订阅后如果又有匹配的新主题,那么这个消费组会立即对其进行消费 consumer.subscribe("test.*");
关于kafka的详细配置参考官方文档:Apache Kafka
2.3.拉取循环
(25条消息) 深入解析Kafka消费者——提交和偏移量_bobozai86的博客-CSDN博客
//只需要循环不断的拉取消息即可,kafka对外poll方法,(内部实现了协作,分区重平衡、心跳、数据拉取等功能) try { while (true) { //步骤1,不断拉取 ConsumerRecordsrecords = consumer.poll(100); //步骤2在100ms内拉取的数据返回消费者端 for (ConsumerRecord record : records) //步骤3遍历拉取的数据记录进行业务处理 { log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %sn", record.topic(), record.partition(), record.offset(), record.key(), record.value()); int updatedCount = 1; if (custCountryMap.countainsValue(record.value())) { updatedCount = custCountryMap.get(record.value()) + 1; } custCountryMap.put(record.value(), updatedCount) JSonObject json = new JSonObject(custCountryMap); System.out.println(json.toString(4)) } } } finally { consumer.close(); //步骤4停止提交offset//原因是卡在了consumer.close()方法里面,它会提交offset信息,如果网络中断或者kafka服务器有问题导致提交不了offset,则consumer.close方法会一直卡住(不停的循环尝试提交offset,永不中断) } //其中,代码中标注了几点,说明如下: //1)这个例子使用无限循环消费并处理数据,这也是使用Kafka最多的一个场景,后面我们会讨论如何更好的退出循环并关闭。 //2)这是上面代码中最核心的一行代码。我们不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡。参数值是一个超时时间,指明线程如果没有数据时等待多长时间,0表示不等待立即返回。 //3)poll()方法返回记录的列表,每条记录包含key/value以及主题、分区、位移信息。 //4)主动关闭可以使得Kafka立即进行重平衡而不需要等待会话过期。
2.4 提交(commit)与位移(offset)
当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到,kafka让消费者自身来管理消费位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit).
提交当前位移为了减少消息重复消费活着避免消息丢失,选择主动提交位移,设置auto.commit.offset为false,那么需要通过调用commitSync()来主动提交位移,会提交poll返回的最后位移。
为了避免消息丢失,在完成业务逻辑后才提交位移。
//自动提交 while (true) { ConsumerRecordsrecords = consumer.poll(100); for (ConsumerRecord record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %sn", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } try { consumer.commitSync(); } catch (CommitFailedException e) { log.error("commit failed", e) } }
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。
异步提交
//异步提交 while (true) { ConsumerRecordsrecords = consumer.poll(100); for (ConsumerRecord record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %sn", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(); }
异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。
//基于这种性质,一般情况下对于异步提交,我们可能会通过回调的方式记录提交结果: while (true) { ConsumerRecords混合同步提交与异步提交records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %sn", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map offsets, Exception exception) { if (e != null) log.error("Commit failed for offsets {}", offsets, e); } }); } //而如果想进行重试同时又保证提交顺序的话,一种简单的办法是使用单调递增的序号。每次发起异步提交时增加此序号,并且将此时的序号作为参数传给回调方法;当消息提交失败回调时,检查参数中的序号值与全局的序号值,如果相等那么可以进行重试提交,否则放弃(因为已经有更新的位移提交了)。
//正常情况下,偶然的提交失败并不是什么大问题,因为后续的提交成功就可以了。但是在某些情况下(例如程序退出、重平衡),我们希望最后的提交成功,因此一种非常普遍的方式是混合异步提交和同步提交,如下所示: try { while (true) { ConsumerRecords提交特定位移records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %sn", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(); } } catch (Exception e) { log.error("Unexpected error", e); } finally { try { consumer.commitSync(); } finally { consumer.close(); } } //在正常处理流程中,我们使用异步提交来提高性能,但最后使用同步提交来保证位移提交成功。
//commitSync()和commitAsync()会提交上一次poll()的最大位移,但如果poll()返回了批量消息,而且消息数量非常多,我们可能会希望在处理这些批量消息过程中提交位移,以免重平衡导致从头开始消费和处理。幸运的是,commitSync()和commitAsync()允许我们指定特定的位移参数,参数为一个分区与位移的map。由于一个消费者可能会消费多个分区,所以这种方式会增加一定的代码复杂度,如下所示: private MapcurrentOffsets = new HashMap<>(); int count = 0; .... while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %sn", record.topic(), record.partition(), record.offset(), record.key(), record.value()); currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndmetadata(record.offset()+1, "no metadata")); if (count % 1000 == 0) consumer.commitAsync(currentOffsets, null); count++; } } //代码中在处理poll()消息的过程中,不断保存分区与位移的关系,每处理1000条消息就会异步提交(也可以使用同步提交)。
优雅退出
//在一般情况下,我们会在一个主线程中循环poll消息并进行处理。当需要退出poll循环时,我们可以使用另一个线程调用consumer.wakeup(),调用此方法会使得poll()抛出WakeupException。如果调用wakup时,主线程正在处理消息,那么在下一次主线程调用poll时会抛出异常。主线程在抛出WakeUpException后,需要调用consumer.close(),此方法会提交位移,同时发送一个退出消费组的消息到Kafka的组协调者。组协调者收到消息后会立即进行重平衡(而无需等待此消费者会话过期)。 //注册JVM关闭时的回调钩子,当JVM关闭时调用此钩子。 Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { System.out.println("Starting exit..."); //调用消费者的wakeup方法通知主线程退出 consumer.wakeup(); try { //等待主线程退出 mainThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }); ... try { // looping until ctrl-c, the shutdown hook will cleanup on exit while (true) { ConsumerRecordsrecords = consumer.poll(1000); System.out.println(System.currentTimeMillis() + "-- waiting for data..."); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = %sn",record.offset(), record.key(), record.value()); } for (TopicPartition tp: consumer.assignment()) System.out.println("Committing offset at position:" + consumer.position(tp)); consumer.commitSync(); } } catch (WakeupException e) { // ignore for shutdown } finally { consumer.close(); System.out.println("Closed consumer and we are done"); }
3.消费者创建实例:
1.一个消费组G1里只有一个消费者
public class MyConsumer { public static void main(String[] args) { Properties prop=new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.195.20:9092"); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000"); prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); //latest收最新的数据 none会报错 earliest最早的数据 prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G1"); //创建消费者 KafkaConsumerconsumer = new KafkaConsumer<>(prop); consumer.subscribe(Collections.singleton("kb09two"));//订阅 while (true){ ConsumerRecords poll = consumer.poll(100); for (ConsumerRecord record : poll) { System.out.println(record.offset()+"t"+record.key()+"t"+record.value()); System.out.println(" "); } } }}
2.一个消费组G2里有多个消费者
public class MyConsumer { public static void main(String[] args) { Properties prop=new Properties(); prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.195.20:9092"); prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class); prop.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000"); prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false"); prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000"); //latest收最新的数据 none会报错 earliest最早的数据 prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); prop.put(ConsumerConfig.GROUP_ID_CONFIG,"G2");//这里改为G2 for (int i = 0; i <3 ; i++) { new Thread(new Runnable() { @Override public void run() { KafkaConsumerconsumer = new KafkaConsumer<>(prop); consumer.subscribe(Collections.singleton("kb09two"));//订阅 while (true){ ConsumerRecords poll = consumer.poll(100); for (ConsumerRecord record : poll) { System.out.println(Thread.currentThread().getName()+"t" +record.offset()+"t"+record.key()+"t"+record.value()); } } } }).start(); } } }
4.生产者实例:
调用kafka生产者Api,需要导入kafka-clients依赖
org.apache.kafka kafka-clients2.0.0
创建生产者
public class MyProducer { public static void main(String[] args) { Properties prop=new Properties(); prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.195.20:9092"); prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class); prop.put(ProducerConfig.ACKS_CONFIG,"-1"); //创建生产者 KafkaProducerproducer=new KafkaProducer (prop); for (int i = 0; i <200 ; i++) { ProducerRecord producerRecord = new ProducerRecord<>("kb09two", "hello world" + i); producer.send(producerRecord); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("gg"); } }
测试:
在moba上启动zookeeper,Kafka;
1)先创建一个topic kafka-topics.sh --create --zookeeper 192.168.195.20:2181 --topic kb09two --partitions 3 --replication-factor 1 (2)查看topic kafka-topics.sh --zookeeper 192.168.195.20:2181 --topic kb09two --describe (3)启动消费者 kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kb09two --from-beginning
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)