【源码篇】Kafka客户端发送消息

【源码篇】Kafka客户端发送消息,第1张

在上一篇文章中,已经介绍了初始化 KafkaProducer 基本流程。当客户端对 KafkaProducer 完成完成后,可以调用 send() 方法将数据发送至kafka broker集群。

图中描述了当客户端调用#send方法后的一系列逻辑处理。本文将主要对客户端发送消息的主要流程进行梳理,核心区域(元数据获取、分区 *** 作、Re cordAccumlator、Sender线程唤醒)等核心代码会在后续的文章中单独分析。

获取集群元数据

当向broker中发送数据时,极其重要的一步是获取到集群的元数据信息。以至于我们能确定当前待发消息的 Topic 对应集群节点、分区等分布情况,因此在Kafka内部采取了同步等待的方式去获取。

try {
    // 等待集群元数据{clusterInstance,更新元数据的时间}
    clusterAndWaitTime = waitOnMetadata(
            record.topic(),         // 消息
            record.partition(),     // 分区
            nowMs,                  // 当前时间
            maxBlockTimeMs          // 等待元数据更新的最长时间
    );
} catch (KafkaException e) {
    if (metadata.isClosed())
        throw new KafkaException("Producer closed while send in progress", e);
    throw e;
}
消息序列化

获取到集群元数据后,消息传递需要跨网络传输。

/* Key、Value序列化 *** 作 */
byte[] serializedKey;
try {
    serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in key.serializer", cce);
}
byte[] serializedValue;
try {
    serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
    throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
            " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
            " specified in value.serializer", cce);
}
计算分区位置

Kafka 是一款高性能分布式消息系统,分区可以实现负载均衡,对于消费端可以提高并发度来提高效率。对于客户端来说需要确定好发送消息分区的位置。

/* 计算消息插入的分区位置(指定优先) */
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);// Topic和分区对象
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    /*
     * 如果指定分区,按照指定分区配置
     * 未指定分区,按照粘性分区处理(随机地选择一个分区并会尽可能地坚持使用该分区)
     */
    return partition != null ?
            partition :
            partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

粘性分区器:通过选择单个分区来发送所有非键记录,解决了将没有键的记录分散成较小批次的问题。 一旦该分区的批次被填满或以其他方式完成,粘性分区程序会随机选择并“粘”到一个新分区。 这样,在更长的时间内,记录大致均匀地分布在所有分区中,同时获得更大批量的额外好处。

消息大小计算验证

max.request.size 限制单条消息最大(默认1M),buffer.memory限制缓存区大小(默认32M)。先计算序列化后的消息大小,再确定消息是否有超过限制大小。

// 计算序列化后的消息大小
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(
        apiVersions.maxUsableProduceMagic(),
        compressionType,
        serializedKey,
        serializedValue,
        headers);

// 验证消息是否过大,保证数据大小能够传输(序列化后的  压缩后的)
ensureValidRecordSize(serializedSize);
private void ensureValidRecordSize(int size) {
    // 单条信息最大值 maxRequestSize 1m
    if (size > maxRequestSize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than " + maxRequestSize + ", which is the value of the " +
                ProducerConfig.MAX_REQUEST_SIZE_CONFIG + " configuration.");
    // totalMemorySize  缓存大小 默认32m
    if (size > totalMemorySize)
        throw new RecordTooLargeException("The message is " + size +
                " bytes when serialized which is larger than the total memory buffer you have configured with the " +
                ProducerConfig.BUFFER_MEMORY_CONFIG +
                " configuration.");
}
消息缓存器追加消息

消息传递不是直接由客户端发送到对应的 Broker 服务器上,会经过客户端的缓存达到配置指定的batch.size(默认16k)批次大小后,再一批次一批次到服务器。

linger.ms 控制消息刷新时间间隔。同时设置batch.size和 linger.ms,哪个条件先满足都会将消息发送出去

/*  向accumulator缓存中追加数据。result表示是否添加成功的结果 */
RecordAccumulator.RecordAppendResult result = accumulator.append(
    tp,                     // 这条记录被发送到的主题/分区
    timestamp,              // 记录的时间戳
    serializedKey,          // 序列化key
    serializedValue,        // 序列化value
    headers,                // 记录header
    interceptCallback,      // 请求完成时,执行的用户提供的回调
    remainingWaitMs,        // 剩余最大阻塞时间 =(总共 - 已用)
    true,   // 保证可以添加一个新的批次,如果正在运行的批次无法找到对应的Topic+分区
    nowMs);                 // 当前时间
唤醒Sender线程

当消息已经添加到消息缓存区 RecordAccumulator 不是真正的把消息发送到 Broker,在初始化 KafkaProducer 可以看到初始化的时候有一个后台线程 Sender 专门用于将缓存区数据发送到服务器。当消息批次大小满足或者是一个新批次建立起来,我们需要唤醒 Sender 线程。

// 批次大小已经满了,或者一个新批次创建
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    // 唤醒发送线程
    this.sender.wakeup();
}

最后,盗用大哥一张图,读者更幸福!

专注于java大数据领域离线、实时技术干货定期分享!个人网站 www.lllpan.top

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/723109.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-26
下一篇 2022-04-26

发表评论

登录后才能评论

评论列表(0条)

保存