上一P简单的阅读了下KafkaProducer 的流程,这一P主要探索异步发送消息的业务罗技
Producer流程 1.调用代码://todo: 异步发送 producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr));2.send 方法逻辑
public Future3.同步等待拉取元数据send(ProducerRecord record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record); //todo:关键代码 return doSend(interceptedRecord, callback); } //底层是通过调用了doSend这个核心的方法实现消息的 异步发送
clusterAndWaitTime = waitOnmetadata(record.topic(), record.partition(), maxBlockTimeMs); } catch (KafkaException e) { if (metadata.isClosed()) throw new KafkaException("Producer closed while send in progress", e); throw e; } //clusterAndWaitTime.waitedonmetadataMs 代表 的是拉取元数据用了多少时间。 //maxBlockTimeMs -用了多少时间 = 还剩余多少时间可以使用 long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnmetadataMs); Cluster cluster = clusterAndWaitTime.cluster;4.对消息的key和value进行序列化
byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_S ERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); } byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE _SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); }5.根据分区器选择消息应该发送的分区
int partition = partition(record, serializedKey, serializedValue, cluster);6.根据元数据来封装分区对象
tp = new TopicPartition(record.topic(), partition);7.判断要发送的消息有没有超过消息大小的最大值
//计算消息的大小,后面要进行大小的 判断 int serializedSize = AbstractRecords.estimateSizeInBytesUpperBoun d(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); //计算消息的大小,后面要进行大小的 判断 ensurevalidRecordSize(serializedSize);8.消息绑定回调函数
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);9.消息缓存到RecordAccumulator中
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);10.唤醒sender线程发送消息
// 如果批次满了,或者是新创建的一个批次 if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); }三、生产者源码之生产者源码精华总结
(摘自开课吧)
总结下KafkaProducer生产者源码哪些地方值得我们学习
1、kafka网络部分的设计绝对是一个亮点,kafka自己基于NIO封装了一套自己的网络通信框架,支持一个客户端与多个broker建立连接
2、处理拆包和粘包的的思路和代码,绝对是教科书级别的,大家可以把代码复制粘贴下来直接用到自己的线上的项目去。
3、RecordAccumulator封装消息的batchs,使用的自己封装的数据结构CopyOnWriteMap,采用读写分离的思想,用来面对高并发的场景(读多,写少)提升整个流程的性能。
4、同时封装消息的时候设计的内存缓冲池,这极大的减少了GC的次数。
5、RecordAccumulator封装批次代码中采用的是分段加锁的思想,极大的高了性能,看得出来作者确实编程功底很深厚。
6、个人感觉Kafka的异常体系也是设计得比较清晰的,在核心流程捕获异常,底层抛异常。如果编程经验少一些的同学可以学习借鉴一下。
7、生产者发送消息的两种策略
Referance: Kafka源码解析与实战 - 王亮 机械工业出版社多角度剖析kafka底层源码 - 开课吧
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)