作为一个分布式的消息队列,kafka的日志模块主要用来存储、读取消息,它为kafka集群的高可用、高性能提供了基础。下面结合kafka的源码分析一下日志模块的设计思路。
二、日志格式kafka的以topic为单位组织消息,为了提高系统的吞吐率,将一个topic分为N个partition。以partition为单位接收、消费消息。一个partition有多个分片,主分片接收生成者发送的消息,同时副分片从主分片出拉取消息,消费者也从主分片出消费消息。其工作原理如下图。
图1-kafka消息流转过程
kafka集群中的每个broker中都会创建对应的日志文件来存储partition中的数据,每个partition都会有一个Log对象来进行管理。Log的定义如下:
class Log(val dir: File, @volatile var config: LogConfig, @volatile var recoveryPoint: Long = 0L, scheduler: Scheduler, time: Time = SystemTime) extends Logging with KafkaMetricsGroup { //segments是一个map结构,记录了这个Log下的所有的LogSegment private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] ...... //topicAndPartition保存了topic和partition val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name) ...... }
Log中的topicAndPartition表示这个log对象对应的topic和partition,segments记录了这个log所有的LogSegment。LogSegment是Log的下一级单位,由于一个partition中的消息会非常多,所以系统不会把这些消息记录到一个文件里面,会将一个partition消息存放在多个文件,partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件
kafka是顺序写入,所以这些LogSegment是随着数据的写入而依次产生的。代码如下:
private val _size = if(isSlice) new AtomicInteger(end - start) // don't check the file size if this is just a slice view else new AtomicInteger(math.min(channel.size().toInt, end) - start) if (!isSlice) channel.position(channel.size) ...... def append(messages: ByteBufferMessageSet) { val written = messages.writeTo(channel, 0, messages.sizeInBytes) _size.getAndAdd(written) } ...... }
FileMessageSet 核心字段
file: 指向磁盘上日志文件
channel:FileChannel类型,用于读写对应的日志文件
start:FileMessageSet对象除了表示一个完整的日志文件,还可以表示日志文件的分片,start表示分片的开始位置
end:表示分片的结束位置
isSlice:表示当前FileMessageSet是否为日志文件的分片
_size:FileMessageSet大小,单位是字节,如果是分片则表示分片大小
从上面可以看出,append主要是将messages写入到FileChannel中,并且更新_size的大小。到这里,写入流程就结束了。但是现在messages只写入到内存中,还需要定时将调用函数flush将内存中的消息写入到磁盘,这个主要是LogManager控制的,这里不作介绍。
四、kafka消息的读取kafka消息的读取,会调用Log对象中的read方法来进行。下面来看一下Log的read方法。
def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = { val next = nextOffsetmetadata.messageOffset //如果是最新的offset,则无数据读取 if(startOffset == next) return FetchDataInfo(nextOffsetmetadata, MessageSet.Empty) //根据startOffset定位位于哪个LogSegment var entry = segments.floorEntry(startOffset) //异常判断,如果startOffset大于当前最大的偏移量或者没有找到具体的LogSegment,则抛出异常 if(startOffset > next || entry == null) throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next)) //调用LogSegment的read方法读取具体的数据 while(entry != null) { val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength) if(fetchInfo == null) { entry = segments.higherEntry(entry.getKey) } else { return fetchInfo } }
从上面的代码可以看出,主要分为两个步骤:
1、根据startOffset定位位于哪个LogSegment,Log中包含多个LogSegment,这些LogSegment保存在segments,segments的定义如下:
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
segments是一个ConcurrentSkipListMap,它的内部将key组织成一个跳跃表,segments的key就是LogSegment的baseOffset,即它的第一个消息的offset。 通过segments.floorEntry(startOffset)就能找到不大于当前startOffset的最大的baseOffset对应的entry,然后调用LogSegment的read方法读取对应的消息。LogSegment的read方法如下:
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = { if(maxSize < 0) throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize)) val logSize = log.sizeInBytes // this may change, need to save a consistent copy val startPosition = translateOffset(startOffset) //获取对应offset的读取点位置. // if the start position is already off the end of the log, return null if(startPosition == null) //没有读取点位置则返回空 return null val offsetmetadata = new LogOffsetmetadata(startOffset, this.baseOffset, startPosition.position) //定义offsetmetadata // if the size is zero, still return a log segment but with zero size if(maxSize == 0) //最大读取尺寸是0的话.返回空消息. return FetchDataInfo(offsetmetadata, MessageSet.Empty) // calculate the length of the message set to read based on whether or not they gave us a maxOffset val length = //计算最大读取的消息总长度. maxOffset match { case None => //未设置maxoffset则使用maxsize. // no max offset, just use the max size they gave unmolested maxSize case Some(offset) => { //如果设置了Maxoffset,则计算对应的消息长度. // there is a max offset, translate it to a file position and use that to calculate the max read size if(offset < startOffset) //maxoffset小于startoffset则返回异常 throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset)) val mapping = translateOffset(offset, startPosition.position) //获取相对maxoffset读取点. val endPosition = if(mapping == null) logSize // the max offset is off the end of the log, use the end of the file else mapping.position min(endPosition - startPosition.position, maxSize) //用maxoffset读取点减去开始的读取点.获取需要读取的数据长度.如果长度比maxsize大则返回maxsize } } FetchDataInfo(offsetmetadata, log.read(startPosition.position, length)) //使用FileMessageSet.read读取相应长度的数据返回FetchDataInfo的封装对象. }
读取函数通过映射offset到读取长度.来读取多个offset.
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = { //用来将offset映射到读取指针位置的函数. val mapping = index.lookup(offset) //通过查找index获取对应的指针对象. log.searchFor(offset, max(mapping.position, startingFilePosition)) //通过FileMessageSet获取对应的指针位置. }
最终还是通过调用FileMessageSet的read方法来进行读取。FileMessageSet的read方法如下:
def read(position: Int, size: Int): FileMessageSet = { if(position < 0) throw new IllegalArgumentException("Invalid position: " + position) if(size < 0) throw new IllegalArgumentException("Invalid size: " + size) new FileMessageSet(file, channel, start = this.start + position, end = math.min(this.start + position + size, sizeInBytes())) }
最终返回一个日志分片,然后组装成一个FetchDataInfo对象给请求端。自此,kafka的消息读取流程结束。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)