除了正常的消息发送和消费,在使用 Kafka 的过程中难免会遇到一些其他高级应用类的需求,比如消费回溯,这个可以通过原生 Kafka 提供的 KafkaConsumer.seek() 方法来实现,然而类似延时队列、消息轨迹等应用需求在原生 Kafka 中就没有提供了。我们在使用其他消息中间件时,比如 RabbitMQ,使用到了延时队列、消息轨迹的功能,如果我们将应用直接切换到 Kafka 中,那么只能选择舍弃它们。但这也不是绝对的,我们可以通过一定的手段来扩展 Kafka,从本节开始讲述的就是如何实现这类扩展的高级应用。
过期时间(TTL)我们在《图解Kafka之实战指南》中讲述消费者拦截器用法的时候就使用了消息 TTL(Time To Live,过期时间),其中通过消息的 timestamp 字段和 ConsumerInterceptor 接口的 onConsume() 方法来实现消息的 TTL 功能。
消息超时之后不是只能如案例中的那样被直接丢弃,因为从消息可靠性层面而言这些消息就丢失了,消息超时可以配合死信队列(后面会讲到)使用,这样原本被丢弃的消息可以被再次保存起来,方便应用在此之后通过消费死信队列中的消息来诊断系统的运行概况。
案例中有一个局限,就是每条消息的超时时间都是一样的,都是固定的 EXPIRE_INTERVAL 值的大小。如果要实现自定义每条消息TTL的功能,那么应该如何处理呢?
这里还可以沿用消息的 timestamp 字段和拦截器 ConsumerInterceptor 接口的 onConsume() 方法,不过我们还需要消息中的 headers 字段来做配合。我们可以将消息的 TTL 的设定值以键值对的形式保存在消息的 headers 字段中,这样消费者消费到这条消息的时候可以在拦截器中根据 headers 字段设定的超时时间来判断此条消息是否超时,而不是根据原先固定的 EXPIRE_INTERVAL 值来判断。
下面我们来通过一个具体的示例来演示自定义消息 TTL 的实现方式。这里使用了消息的 headers 字段,而 headers 字段涉及 Headers 和 Header 两个接口,Headers 是对多个 Header 的封装,Header 接口表示的是一个键值对,具体实现如下:
package org.apache.kafka.common.header; public interface Header { String key(); byte[] value(); }
我们可以自定义实现 Headers 和 Header 接口,但这样未免过于烦琐,这里可以直接使用 Kafka 提供的实现类 org.apache.kafka.common.header.internals.RecordHeaders 和 org.apache.kafka.common.header.internals.RecordHeader。这里只需使用一个 Header,key 可以固定为“ttl”,而 value 用来表示超时的秒数,超时时间一般用 Long 类型表示,但是 RecordHeader 中的构造方法 RecordHeader(String key, byte[] value) 和 value() 方法的返回值对应的 value 都是 byte[] 类型,这里还需要一个小工具实现整型类型与 byte[] 的互转,具体实现如下:
public class BytesUtils { public static byte[] longToBytes(long res) { byte[] buffer = new byte[8]; for (int i = 0; i < 8; i++) { int offset = 64 - (i + 1) * 8; buffer[i] = (byte) ((res >> offset) & 0xff); } return buffer; } public static long bytesToLong(byte[] b) { long values = 0; for (int i = 0; i < 8; i++) { values <<= 8; values|= (b[i] & 0xff); } return values; } }
下面我们向 Kafka 中发送3条 TTL 分别为20秒、5秒和30秒的3条消息,主要代码如代码清单18-1所示。
代码清单18-1 发送自定义TTL消息的主要代码 ProducerRecordrecord1 = new ProducerRecord<>(topic, 0, System.currentTimeMillis(), null, "msg_ttl_1",new RecordHeaders().add(new RecordHeader("ttl", BytesUtils.longToBytes(20)))); ProducerRecord record2 = //超时的消息 new ProducerRecord<>(topic, 0, System.currentTimeMillis()-5*1000, null, "msg_ttl_2",new RecordHeaders().add(new RecordHeader("ttl", BytesUtils.longToBytes(5)))); ProducerRecord record3 = new ProducerRecord<>(topic, 0, System.currentTimeMillis(), null, "msg_ttl_3",new RecordHeaders().add(new RecordHeader("ttl", BytesUtils.longToBytes(30)))); producer.send(record1).get(); producer.send(record2).get(); producer.send(record3).get();
ProducerRecord 中包含 Headers 字段的构造方法只有2个,具体如下:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterableheaders) public ProducerRecord(String topic, Integer partition, K key, V value, Iterable headers)
代码清单18-1中指定了分区编号为0和消息 key 的值为 null,其实这个示例中我们并不需要指定这2个值,但是碍于 ProducerRecord 中只有2种与 Headers 字段有关的构造方法。其实完全可以扩展 ProducerRecord 中的构造方法,比如添加下面这个方法:
//add by myself public ProducerRecord(String topic, Long timestamp, V value, Iterableheaders) { this(topic, null, timestamp, null, value, headers); }
这样就可以修改代码清单18-1中 ProducerRecord 的构建方式,类似下面这种写法:
ProducerRecordrecord1 = new ProducerRecord<>(topic, System.currentTimeMillis(), "msg_ttl_1", new RecordHeaders().add(new RecordHeader("ttl", BytesUtils.longToBytes(20))));
回归正题,很显然代码清单18-1中的第2条消息 record2 是故意被设定为超时的,因为这条消息的创建时间为 System.currentTimeMillis()-5×1000,往前推进了5秒,而这条消息的超时时间也为5秒。如果在发送这3条消息的时候也开启了消费者,那么经过拦截器处理后应该只会收到“msg_ttl_1”和“msg_ttl_3”这两条消息。
我们再来看一下经过改造之后拦截器的具体实现,如代码清单18-2所示。
代码清单18-2 自定义TTL的拦截器关键代码实现 @Override public ConsumerRecordsonConsume( ConsumerRecords records) { long now = System.currentTimeMillis(); Map >> newRecords = new HashMap<>(); for (TopicPartition tp : records.partitions()) { List > tpRecords = records.records(tp); List > newTpRecords = new ArrayList<>(); for (ConsumerRecord record : tpRecords) { Headers headers = record.headers(); long ttl = -1; for (Header header : headers) {//判断headers中是否有key为“ttl”的Header if (header.key().equalsIgnoreCase("ttl")) { ttl = BytesUtils.bytesToLong(header.value()); } } //消息超时判定 if (ttl > 0 && now - record.timestamp() < ttl * 1000) { newTpRecords.add(record); } else {//没有设置TTL,不需要超时判定 newTpRecords.add(record); } } if (!newTpRecords.isEmpty()) { newRecords.put(tp, newTpRecords); } } return new ConsumerRecords<>(newRecords); }
代码清单18-2中判断每条消息的 headers 字段中是否包含 key 为“ttl”的 Header,如果包含则对其进行超时判定;如果不包含,则不需要超时判定,即无须拦截处理。
使用这种方式实现自定义消息 TTL 时同样需要注意的是:使用类似中这种带参数的位移提交的方式,有可能会提交错误的位移信息。在一次消息拉取的批次中,可能含有最大偏移量的消息会被消费者拦截器过滤。不过这个也很好解决,比如在过滤之后的消息集中的头部或尾部设置一个状态消息,专门用来存放这一批消息的最大偏移量。
到目前为止,无论固定消息 TTL,还是自定义消息 TTL,都是在消费者客户端通过拦截器来实现的,其实这个功能也可以放在 Kafka 服务端来实现,而且具体实现也并不太复杂。不过这样会降低系统的灵活性和扩展性,并不建议这么做,通过扩展客户端就足以应对此项功能。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)