Kafka开源代码学习之旅(四)- Producer核心流程

Kafka开源代码学习之旅(四)- Producer核心流程,第1张

Kafka开源代码学习之旅(四)- Producer核心流程 一、概述

上一P简单的阅读了下KafkaProducer 的流程,这一P主要探索异步发送消息的业务罗技

Producer流程 1.调用代码:
//todo: 异步发送
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new
DemoCallBack(startTime, messageNo,
messageStr));
2.send 方法逻辑
public Future
send(ProducerRecord record, Callback
callback) {
// intercept the record, which can be
potentially modified; this method does not
throw exceptions
ProducerRecord
interceptedRecord = this.interceptors == null
? record : this.interceptors.onSend(record);
//todo:关键代码
return doSend(interceptedRecord,
callback);
}
//底层是通过调用了doSend这个核心的方法实现消息的
异步发送
3.同步等待拉取元数据
clusterAndWaitTime =
waitOnmetadata(record.topic(),
record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new
KafkaException("Producer closed while send
in progress", e);
throw e;
}
//clusterAndWaitTime.waitedonmetadataMs 代表
的是拉取元数据用了多少时间。
//maxBlockTimeMs -用了多少时间 =
还剩余多少时间可以使用
long remainingWaitMs =
Math.max(0, maxBlockTimeMs -
clusterAndWaitTime.waitedOnmetadataMs);
Cluster cluster =
clusterAndWaitTime.cluster;
4.对消息的key和value进行序列化
byte[] serializedKey;
try {
serializedKey =
keySerializer.serialize(record.topic(),
record.headers(), record.key());
} catch (ClassCastException cce)
{
throw new
SerializationException("Can't convert key of
class " + record.key().getClass().getName()
+
" to class " +
producerConfig.getClass(ProducerConfig.KEY_S
ERIALIZER_CLASS_CONFIG).getName() +
" specified in
key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue =
valueSerializer.serialize(record.topic(),
record.headers(), record.value());
} catch (ClassCastException cce)
{
throw new
SerializationException("Can't convert value
of class " +
record.value().getClass().getName() +
" to class " +
producerConfig.getClass(ProducerConfig.VALUE
_SERIALIZER_CLASS_CONFIG).getName() +
" specified in
value.serializer", cce);
}
5.根据分区器选择消息应该发送的分区
int partition = partition(record,
serializedKey, serializedValue, cluster);
6.根据元数据来封装分区对象
tp = new TopicPartition(record.topic(),
partition);
7.判断要发送的消息有没有超过消息大小的最大值
//计算消息的大小,后面要进行大小的
判断
int serializedSize =
AbstractRecords.estimateSizeInBytesUpperBoun
d(apiVersions.maxUsableProduceMagic(),
compressionType,
serializedKey, serializedValue, headers);
//计算消息的大小,后面要进行大小的
判断
ensurevalidRecordSize(serializedSize);
8.消息绑定回调函数
Callback interceptCallback = new
InterceptorCallback<>(callback,
this.interceptors, tp);
9.消息缓存到RecordAccumulator中
RecordAccumulator.RecordAppendResult result
= accumulator.append(tp, timestamp,
serializedKey,serializedValue, headers,
interceptCallback, remainingWaitMs);
10.唤醒sender线程发送消息
// 如果批次满了,或者是新创建的一个批次
if (result.batchIsFull ||
result.newBatchCreated) {
log.trace("Waking up the
sender since topic {} partition {} is either
full or getting a new batch", record.topic(),
partition);

this.sender.wakeup();
}
三、生产者源码之生产者源码精华总结

(摘自开课吧)
总结下KafkaProducer生产者源码哪些地方值得我们学习
1、kafka网络部分的设计绝对是一个亮点,kafka自己基于NIO封装了一套自己的网络通信框架,支持一个客户端与多个broker建立连接
2、处理拆包和粘包的的思路和代码,绝对是教科书级别的,大家可以把代码复制粘贴下来直接用到自己的线上的项目去。
3、RecordAccumulator封装消息的batchs,使用的自己封装的数据结构CopyOnWriteMap,采用读写分离的思想,用来面对高并发的场景(读多,写少)提升整个流程的性能。
4、同时封装消息的时候设计的内存缓冲池,这极大的减少了GC的次数。
5、RecordAccumulator封装批次代码中采用的是分段加锁的思想,极大的高了性能,看得出来作者确实编程功底很深厚。
6、个人感觉Kafka的异常体系也是设计得比较清晰的,在核心流程捕获异常,底层抛异常。如果编程经验少一些的同学可以学习借鉴一下。
7、生产者发送消息的两种策略

Referance: Kafka源码解析与实战 - 王亮 机械工业出版社多角度剖析kafka底层源码 - 开课吧

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5575916.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-14
下一篇 2022-12-14

发表评论

登录后才能评论

评论列表(0条)

保存