Kafka学习之事务

Kafka学习之事务,第1张

Kafka 事务

Kafka 事务
Kafka 从 0.11 版本开始引入了事务支持。事务可以保证 Kafka 在 Exactly Once 语义的基础上,生产和消费可以跨分区和会话,要么全部成功,要么全部失败。

Producer 事务
为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID 和Transaction ID 绑定。这样当Producer 重启后就可以通过正在进行的 TransactionID 获得原来的PID。
为了管理Transaction,Kafka 引入了一个新的组件 Transaction Coordinator。Producer 就是通过和Transaction Coordinator 交互获得 Transaction ID 对应的任务状态。TransactionCoordinator 还负责将事务所有写入 Kafka 的一个内部 Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

Consumer 事务
上述事务机制主要是从 Producer 方面考虑,对于 Consumer 而言,事务的保证就会相对较弱,尤其时无法保证 Commit 的信息被精确消费。这是由于 Consumer 可以通过 offset 访问任意信息,而且不同的 Segment File 生命周期不同,同一事务的消息可能会出现重启后被删除的情况。

Kafka API

Producer API 消息发送流程
Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程,以及一个线程共享变量RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。相关参数:
batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。

异步发送 API

1)导入依赖

org.apache.kafka
kafka-clients
0.11.0.0

2)编写代码,需要用到的类:
KafkaProducer:需要创建一个生产者对象,用来发送数据
ProducerConfig:获取所需的一系列配置参数
ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象

不带回调函数的 API

package com.atguigu.kafka;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
 public static void main(String[] args) throws ExecutionException,
 InterruptedException {
 Properties props = new Properties();
 //kafka 集群,broker-list
 props.put("bootstrap.servers", "node4:9092");
 //ACK应答级别
 props.put("acks", "all");
 //重试次数
 props.put("retries", 1);
 //批次大小,超过即发
 props.put("batch.size", 16384);
 //等待时间,超过即发
 props.put("linger.ms", 1);
 //RecordAccumulator 缓冲区大小 32M
 props.put("buffer.memory", 33554432);
 //key:values的序列化类
 props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
 //创建生产者对象
 Producer producer = new 
KafkaProducer<>(props);
 for (int i = 0; i < 100; i++) {
 //发送数据
 producer.send(new ProducerRecord("first", 
Integer.toString(i), Integer.toString(i)));
 }
 //关闭掉资源
 producer.close();
 } }

带回调函数的 API
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package com.atguigu.kafka;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, 
InterruptedException {
 Properties props = new Properties();
 props.put("bootstrap.servers", "node4:9092");//kafka 集群,broker-list
 props.put("acks", "all");
 props.put("retries", 1);//重试次数
 props.put("batch.size", 16384);//批次大小
 props.put("linger.ms", 1);//等待时间
 props.put("buffer.memory", 33554432);//RecordAccumulator 缓冲区大小
 props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
 Producer producer = new 
KafkaProducer<>(props);
 for (int i = 0; i < 100; i++) {
 producer.send(new ProducerRecord("first", 
Integer.toString(i), Integer.toString(i)), new Callback() {
 //回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
 @Override
 public void onCompletion(RecordMetadata metadata, 
Exception exception) {
 if (exception == null) {
 System.out.println("success->" + 
metadata.offset());
 } else {
 exception.printStackTrace();
 }
 }
 });
 }
 producer.close();
 } }

同步发送 API
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同步发送的效果,只需在调用 Future 对象的 get 方发即可。

package com.atguigu.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, 
InterruptedException {
 Properties props = new Properties();
 props.put("bootstrap.servers", "node4:9092");//kafka 集群,broker-list
 props.put("acks", "all");
 props.put("retries", 1);//重试次数
 props.put("batch.size", 16384);//批次大小
 props.put("linger.ms", 1);//等待时间
 props.put("buffer.memory", 33554432);//RecordAccumulator 缓冲区大小
 props.put("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
 Producer producer = new 
KafkaProducer<>(props);
 for (int i = 0; i < 100; i++) {
 producer.send(new ProducerRecord("first", 
Integer.toString(i), Integer.toString(i))).get();   //区别处.get()
 }
 producer.close();
 } }

Consumer API
Consumer 消费数据时的可靠性是很容易保证的,因为数据在 Kafka 中是持久化的,故不用担心数据丢失问题。
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。所以 offset 的维护是 Consumer 消费数据是必须考虑的问题

自动提交 offset

1)导入依赖

org.apache.kafka
kafka-clients
0.11.0.0

2)编写代码
需要用到的类:
KafkaConsumer:需要创建一个消费者对象,用来消费数据
ConsumerConfig:获取所需的一系列配置参数
ConsuemrRecord:每条数据都要封装成一个 ConsumerRecord 对象
自动提交 offset 的相关参数:
enable.auto.commit:是否开启自动提交 offset 功能
auto.commit.interval.ms:自动提交 offset 的时间间隔
以下为自动提交 offset 的代码:
package com.atguigu.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
 Properties props = new Properties();
 props.put("bootstrap.servers", "node4:9092");
 props.put("group.id", "test");
 //配置项
 props.put("enable.auto.commit", "true");
 props.put("auto.commit.interval.ms", "1000");
 
 props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
 KafkaConsumer consumer = new 
KafkaConsumer<>(props);
 consumer.subscribe(Arrays.asList("first"));
 while (true) {
 ConsumerRecords records = 
consumer.poll(100);
 for (ConsumerRecord record : records)
 System.out.printf("offset = %d, key = %s, value 
= %s%n", record.offset(), record.key(), record.value());
 }
 } }

手动提交 offset

手动提交 offset 的方法有两种:分别是 commitSync(同步提交)和 commitAsync(异步提交)。两者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交;不同点是,commitSync 阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而 commitAsync 则没有失败重试机制,故有可能提交失败。

同步提交 offset
package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class CustomComsumer {
 public static void main(String[] args) {
 Properties props = new Properties();
 props.put("bootstrap.servers", "node4:9092");   //Kafka 集群
 props.put("group.id", "test");   //消费者组,只要 group.id 相同,就属于同一个消费者组
 props.put("enable.auto.commit", "false");	//关闭自动提交 offset
 props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
 KafkaConsumer consumer = new 
KafkaConsumer<>(props);
 consumer.subscribe(Arrays.asList("first")); //消费者订阅主题
 while (true) {
//消费者拉取数据
 ConsumerRecords records = 
consumer.poll(100);
 for (ConsumerRecord record : records) {
 System.out.printf("offset = %d, key = %s, value 
= %s%n", record.offset(), record.key(), record.value());
 }
 consumer.commitSync();  //同步提交,当前线程会阻塞直到 offset 提交成功
 }
 } }

异步提交 offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。

package com.atguigu.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
public class CustomConsumer {
 public static void main(String[] args) {
 Properties props = new Properties();
 props.put("bootstrap.servers", "node4:9092");   //Kafka 集群
 props.put("group.id", "test");   //消费者组,只要 group.id 相同,就属于同一个消费者组
 props.put("enable.auto.commit", "false");    //关闭自动提交 offset
 props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
 KafkaConsumer consumer = new 
KafkaConsumer<>(props);
 consumer.subscribe(Arrays.asList("first"));	//消费者订阅主题
 while (true) {
 ConsumerRecords records = 
consumer.poll(100);	//消费者拉取数据
 for (ConsumerRecord record : records) {
 System.out.printf("offset = %d, key = %s, value 
= %s%n", record.offset(), record.key(), record.value());
 }
 异步提交
 	consumer.commitAsync(new OffsetCommitCallback() {
 		@Override
 		public void onComplete(Map offsets, Exception exception) {
			 if (exception != null) {
 				System.err.println("Commit failed for" + offsets);
 				}
 		}
	 });
 ///
 }
 } }

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/742764.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-28
下一篇 2022-04-28

发表评论

登录后才能评论

评论列表(0条)

保存