在上一篇文章中,已经介绍了初始化 KafkaProducer 基本流程。当客户端对 KafkaProducer 完成完成后,可以调用 send() 方法将数据发送至kafka broker集群。
图中描述了当客户端调用#send方法后的一系列逻辑处理。本文将主要对客户端发送消息的主要流程进行梳理,核心区域(元数据获取、分区 *** 作、Re cordAccumlator、Sender线程唤醒)等核心代码会在后续的文章中单独分析。
当向broker中发送数据时,极其重要的一步是获取到集群的元数据信息。以至于我们能确定当前待发消息的 Topic 对应集群节点、分区等分布情况,因此在Kafka内部采取了同步等待的方式去获取。
try {
// 等待集群元数据{clusterInstance,更新元数据的时间}
clusterAndWaitTime = waitOnMetadata(
record.topic(), // 消息
record.partition(), // 分区
nowMs, // 当前时间
maxBlockTimeMs // 等待元数据更新的最长时间
);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
消息序列化
获取到集群元数据后,消息传递需要跨网络传输。
/* Key、Value序列化 *** 作 */
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
计算分区位置
Kafka 是一款高性能分布式消息系统,分区可以实现负载均衡,对于消费端可以提高并发度来提高效率。对于客户端来说需要确定好发送消息分区的位置。
/* 计算消息插入的分区位置(指定优先) */
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);// Topic和分区对象
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
/*
* 如果指定分区,按照指定分区配置
* 未指定分区,按照粘性分区处理(随机地选择一个分区并会尽可能地坚持使用该分区)
*/
return partition != null ?
partition :
partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
消息大小计算验证粘性分区器:通过选择单个分区来发送所有非键记录,解决了将没有键的记录分散成较小批次的问题。 一旦该分区的批次被填满或以其他方式完成,粘性分区程序会随机选择并“粘”到一个新分区。 这样,在更长的时间内,记录大致均匀地分布在所有分区中,同时获得更大批量的额外好处。
max.request.size
限制单条消息最大(默认1M),buffer.memory
限制缓存区大小(默认32M)。先计算序列化后的消息大小,再确定消息是否有超过限制大小。
// 计算序列化后的消息大小
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(
apiVersions.maxUsableProduceMagic(),
compressionType,
serializedKey,
serializedValue,
headers);
// 验证消息是否过大,保证数据大小能够传输(序列化后的 压缩后的)
ensureValidRecordSize(serializedSize);
private void ensureValidRecordSize(int size) {
// 单条信息最大值 maxRequestSize 1m
if (size > maxRequestSize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
// totalMemorySize 缓存大小 默认32m
if (size > totalMemorySize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the total memory buffer you have configured with the " +
ProducerConfig.BUFFER_MEMORY_CONFIG +
" configuration.");
}
消息缓存器追加消息
消息传递不是直接由客户端发送到对应的 Broker 服务器上,会经过客户端的缓存达到配置指定的batch.size
(默认16k)批次大小后,再一批次一批次到服务器。
linger.ms
控制消息刷新时间间隔。同时设置batch.size和 linger.ms,哪个条件先满足都会将消息发送出去
/* 向accumulator缓存中追加数据。result表示是否添加成功的结果 */
RecordAccumulator.RecordAppendResult result = accumulator.append(
tp, // 这条记录被发送到的主题/分区
timestamp, // 记录的时间戳
serializedKey, // 序列化key
serializedValue, // 序列化value
headers, // 记录header
interceptCallback, // 请求完成时,执行的用户提供的回调
remainingWaitMs, // 剩余最大阻塞时间 =(总共 - 已用)
true, // 保证可以添加一个新的批次,如果正在运行的批次无法找到对应的Topic+分区
nowMs); // 当前时间
唤醒Sender线程
当消息已经添加到消息缓存区 RecordAccumulator 不是真正的把消息发送到 Broker,在初始化 KafkaProducer 可以看到初始化的时候有一个后台线程 Sender 专门用于将缓存区数据发送到服务器。当消息批次大小满足或者是一个新批次建立起来,我们需要唤醒 Sender 线程。
// 批次大小已经满了,或者一个新批次创建
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
// 唤醒发送线程
this.sender.wakeup();
}
最后,盗用大哥一张图,读者更幸福!
专注于java大数据领域离线、实时技术干货定期分享!个人网站 www.lllpan.top
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)