如何让Kafka在保证高性能、高吞吐的同时通过各种机制来保证高可用性?

如何让Kafka在保证高性能、高吞吐的同时通过各种机制来保证高可用性?,第1张

如何让Kafka在保证高性能、高吞吐的同时通过各种机制来保证高可用性? 二、事务 1.场景

幂等性并不能跨多个分区运作,而事务可以弥补这个缺憾,**事务可以保证对多个分区写入 *** 作的原子性。** *** 作的原子性是指多个 *** 作要么全部成功,要么全部失败,不存在部分成功部分失败的可能。

为了实现事务,应用程序必须提供唯一的transactionalId,这个参数通过客户端程序来进行设定。

见代码库:com.heima.kafka.chapter7.ProducerTransactionSend

properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);

2.前期准备

事务要求生产者开启幂等性特性,因此通过将transactional.id参数设置为非空从而开启事务特性的同时需要将ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG设置为true(默认值为true),如果显示设置为false,则会抛出异常。

KafkaProducer提供了5个与事务相关的方法,详细如下:

//初始化事务,前提是配置了transactionalId
public void initTransactions()
//开启事务
public void beginTransaction()
//为消费者提供事务内的位移提交 *** 作
public void sendOffsetsToTransaction(Map offsets, String consumerGroupId)
//提交事务
public void commitTransaction()
//终止事务,类似于回滚
public void abortTransaction()

3.案例解析

见代码库:com.heima.kafka.chapter7.ProducerTransactionSend

消息发送端


public class ProducerTransactionSend {
public static final String topic = “topic-transaction”;
public static final String brokerList = “localhost:9092”;
public static final String transactionId = “transactionId”;

public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);

KafkaProducer producer = new KafkaProducer<> (properties);

producer.initTransactions();
producer.beginTransaction();

try {
//处理业务逻辑并创建ProducerRecord
ProducerRecord record1 = new ProducerRecord<>(topic, “msg1”);
producer.send(record1);
ProducerRecord record2 = new ProducerRecord<>(topic, “msg2”);
producer.send(record2);
ProducerRecord record3 = new ProducerRecord<>(topic, “msg3”);
producer.send(record3);
//处理一些其它逻辑
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.abortTransaction();
}
producer.close();
}
}

模拟事务回滚案例

try {
//处理业务逻辑并创建ProducerRecord
ProducerRecord record1 = new ProducerRecord<>(topic, “msg1”);
producer.send(record1);

//模拟事务回滚案例
System.out.println(1/0);

ProducerRecord record2 = new ProducerRecord<>(topic, “msg2”);
producer.send(record2);
ProducerRecord record3 = new ProducerRecord<>(topic, “msg3”);
producer.send(record3);
//处理一些其它逻辑
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.abortTransaction();
}

从上面案例中,msg1发送成功之后,出现了异常事务进行了回滚,则msg1消费端也收不到消息

三、控制器

在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。

Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:

1.ZooInspector管理
  • 使用zookeeper图形化的客户端工具(ZooInspector)提供的jar来进行管理,启动如下:
  1. 定位到jar所在目录
  2. 运行jar文件 java -jar zookeeper-dev-ZooInspector.jar
  3. 连接Zookeeper

{“version”:1,“brokerid”:0,“timestamp”:“1529210278988”}


其中version在目前版本中固定为1,brokerid表示称为控制器的broker的id编号,timestamp表示竞选称为控制器时的时间戳。

在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取**/controller节点**的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。

Zookeeper中还有一个与控制器有关的/controller_epoch节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为**“控制器的纪元”**。

controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,没选出一个新的控制器就将该字段值加1。每个和控制器交互的请求都会携带上controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么则说明已经有新的控制器当选了。由此可见,Kafka通过controller_epoch来保证控制器的唯一性,进而保证相关 *** 作的一致性。

具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:

  1. 监听partition相关的变化。
  2. 监听topic相关的变化。
  3. 监听broker相关的变化。
  4. 从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。
四、可靠性保证
  1. 可靠性保证:确保系统在各种不同的环境下能够发生一致的行为
  2. Kafka的保证
  • 保证分区消息的顺序
  • 如果使用同一个生产者往同一个分区写入消息,而且消息B在消息A之后写入
  • 那么Kafka可以保证消息B的偏移量比消息A的偏移量大,而且消费者会先读取消息A再读取消息B
  • 只有当消息被写入分区的所有同步副本时(文件系统缓存),它才被认为是已提交
  • 生产者可以选择接收不同类型的确认,控制参数 acks
  • 只要还有一个副本是活跃的,那么已提交的消息就不会丢失
  • 消费者只能读取已经提交的消息
1. 失效副本

怎么样判定一个分区是否有副本是处于同步失效状态的呢?从Kafka 0.9.x版本开始通过唯一的一个参数replica.lag.time.max.ms(默认大小为10,000)来控制,当ISR中的一个follower副本滞后leader副本的时间超过参数replica.lag.time.max.ms指定的值时即判定为副本失效,需要将此follower副本剔出除ISR之外。具体实现原理很简单,当follower副本将leader副本的LEO(Log End Offset,每个分区最后一条消息的位置)之前的日志全部同步时,则认为该follower副本已经追赶上leader副本,此时更新该副本的lastCaughtUpTimeMs标识。Kafka的副本管理器(ReplicaManager)启动时会启动一个副本过期检测的定时任务,而这个定时任务会定时检查当前时间与副本的lastCaughtUpTimeMs差值是否大于参数replica.lag.time.max.ms指定的值。千万不要错误的认为follower副本只要拉取leader副本的数据就会更新lastCaughtUpTimeMs,试想当leader副本的消息流入速度大于follower副本的拉取速度时,follower副本一直不断的拉取leader副本的消息也不能与leader副本同步,如果还将此follower副本置于ISR中,那么当leader副本失效,而选取此follower副本为新的leader副本,那么就会有严重的消息丢失。

2.副本复制

Kafka 中的每个主题分区都被复制了 n 次,其中的 n 是主题的复制因子(replication factor)。这允许Kafka 在集群服务器发生故障时自动切换到这些副本,以便在出现故障时消息仍然可用。Kafka 的复制是以分区为粒度的,分区的预写日志被复制到 n 个服务器。 在 n 个副本中,一个副本作为 leader,其他副本成为 followers。顾名思义,producer 只能往 leader 分区上写数据(读也只能从 leader 分区上进行),followers 只按顺序从 leader 上复制日志。

一个副本可以不同步Leader有如下几个原因 慢副本:在一定周期时间内follower不能追赶上leader。最常见的原因之一是I / O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。 卡住副本:在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。

新启动副本:当用户给主题增加副本因子时,新的follower不在同步副本列表中,直到他们完全赶上了leader日志。

如何确定副本是滞后的

replica.lag.max.messages=4


在服务端现在只有一个参数需要配置replica.lag.time.max.ms。这个参数解释replicas响应partition leader的最长等待时间。检测卡住或失败副本的探测——如果一个replica失败导致发送拉取请求时间间隔超过replica.lag.time.max.ms。Kafka会认为此replica已经死亡会从同步副本列表从移除。检测慢副本机制发生了变化——如果一个replica开始落后leader超过replica.lag.time.max.ms。Kafka会认为太缓慢并且会从同步副本列表中移除。除非replica请求leader时间间隔大于replica.lag.time.max.ms,因此即使leader使流量激增和大批量写消息。Kafka也不会从同步副本列表从移除该副本。

1.Leader Epoch引用

数据丢失场景

数据出现不一致场景

2.Kafka 0.11.0.0.版本解决方案

造成上述两个问题的根本原因在于HW值被用于衡量副本备份的成功与否以及在出现failture时作为日志截断的依据,但HW值的更新是异步延迟的,特别是需要额外的FETCH请求处理流程才能更新,故这中间发生的任何崩溃都可能导致HW值的过期。鉴于这些原因,Kafka 0.11引入了leader epoch来取代HW值。Leader端多开辟一段内存区域专门保存leader的epoch信息,这样即使出现上面的两个场景也能很好地规避这些问题。

所谓leader epoch实际上是一对值:(epoch,offset)。epoch表示leader的版本号,从0开始,当leader变更过1次时epoch就会+1,而offset则对应于该epoch版本的leader写入第一条消息的位移。因此假设有两对值:

  1. (0, 0)
  2. (1, 120)

则表示第一个leader从位移0开始写入消息;共写了120条[0, 119];而第二个leader版本号是1,从位移120处开始写入消息。

leader broker中会保存这样的一个缓存,并定期地写入到一个checkpoint文件中。

避免数据丢失:

避免数据不一致
0开始写入消息;共写了120条[0, 119];而第二个leader版本号是1,从位移120处开始写入消息。

leader broker中会保存这样的一个缓存,并定期地写入到一个checkpoint文件中。

避免数据丢失:
[外链图片转存中…(img-BuxUJWDR-1640195980373)]
避免数据不一致

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5680666.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存