kafka的日志模块log源码解析

kafka的日志模块log源码解析,第1张

kafka的日志模块log源码解析 一、背景

作为一个分布式的消息队列,kafka的日志模块主要用来存储、读取消息,它为kafka集群的高可用、高性能提供了基础。下面结合kafka的源码分析一下日志模块的设计思路。

二、日志格式

kafka的以topic为单位组织消息,为了提高系统的吞吐率,将一个topic分为N个partition。以partition为单位接收、消费消息。一个partition有多个分片,主分片接收生成者发送的消息,同时副分片从主分片出拉取消息,消费者也从主分片出消费消息。其工作原理如下图。

图1-kafka消息流转过程

kafka集群中的每个broker中都会创建对应的日志文件来存储partition中的数据,每个partition都会有一个Log对象来进行管理。Log的定义如下:

class Log(val dir: File,
          @volatile var config: LogConfig,
          @volatile var recoveryPoint: Long = 0L,
          scheduler: Scheduler,
          time: Time = SystemTime) extends Logging with KafkaMetricsGroup {
  
       //segments是一个map结构,记录了这个Log下的所有的LogSegment
        private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
       ......
  
        //topicAndPartition保存了topic和partition
        val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(name)
        ......
}

Log中的topicAndPartition表示这个log对象对应的topic和partition,segments记录了这个log所有的LogSegment。LogSegment是Log的下一级单位,由于一个partition中的消息会非常多,所以系统不会把这些消息记录到一个文件里面,会将一个partition消息存放在多个文件,partition是分段的,每个段叫LogSegment,包括了一个数据文件和一个索引文件,下图是某个partition目录下的文件

kafka是顺序写入,所以这些LogSegment是随着数据的写入而依次产生的。代码如下:

  private val _size = 
    if(isSlice)
      new AtomicInteger(end - start) // don't check the file size if this is just a slice view
    else
      new AtomicInteger(math.min(channel.size().toInt, end) - start)

  
  if (!isSlice)
    
    channel.position(channel.size)
......
def append(messages: ByteBufferMessageSet) {
    val written = messages.writeTo(channel, 0, messages.sizeInBytes)
    _size.getAndAdd(written)
  }
......
}

FileMessageSet 核心字段

file: 指向磁盘上日志文件

channel:FileChannel类型,用于读写对应的日志文件

start:FileMessageSet对象除了表示一个完整的日志文件,还可以表示日志文件的分片,start表示分片的开始位置

end:表示分片的结束位置

isSlice:表示当前FileMessageSet是否为日志文件的分片

_size:FileMessageSet大小,单位是字节,如果是分片则表示分片大小

从上面可以看出,append主要是将messages写入到FileChannel中,并且更新_size的大小。到这里,写入流程就结束了。但是现在messages只写入到内存中,还需要定时将调用函数flush将内存中的消息写入到磁盘,这个主要是LogManager控制的,这里不作介绍。

四、kafka消息的读取

kafka消息的读取,会调用Log对象中的read方法来进行。下面来看一下Log的read方法。

def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): FetchDataInfo = {

    val next = nextOffsetmetadata.messageOffset
   //如果是最新的offset,则无数据读取
    if(startOffset == next)
      return FetchDataInfo(nextOffsetmetadata, MessageSet.Empty)

    //根据startOffset定位位于哪个LogSegment
    var entry = segments.floorEntry(startOffset)
      
    //异常判断,如果startOffset大于当前最大的偏移量或者没有找到具体的LogSegment,则抛出异常
    if(startOffset > next || entry == null)
      throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, segments.firstKey, next))
    
    //调用LogSegment的read方法读取具体的数据
    while(entry != null) {
      val fetchInfo = entry.getValue.read(startOffset, maxOffset, maxLength)
      if(fetchInfo == null) {
        entry = segments.higherEntry(entry.getKey)
      } else {
        return fetchInfo
      }
    }

从上面的代码可以看出,主要分为两个步骤:

      1、根据startOffset定位位于哪个LogSegment,Log中包含多个LogSegment,这些LogSegment保存在segments,segments的定义如下:

 private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

segments是一个ConcurrentSkipListMap,它的内部将key组织成一个跳跃表,segments的key就是LogSegment的baseOffset,即它的第一个消息的offset。 通过segments.floorEntry(startOffset)就能找到不大于当前startOffset的最大的baseOffset对应的entry,然后调用LogSegment的read方法读取对应的消息。LogSegment的read方法如下:

def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): FetchDataInfo = {
    if(maxSize < 0)
      throw new IllegalArgumentException("Invalid max size for log read (%d)".format(maxSize))

    val logSize = log.sizeInBytes // this may change, need to save a consistent copy
    val startPosition = translateOffset(startOffset)  //获取对应offset的读取点位置.

    // if the start position is already off the end of the log, return null
    if(startPosition == null) //没有读取点位置则返回空
      return null

    val offsetmetadata = new LogOffsetmetadata(startOffset, this.baseOffset, startPosition.position) //定义offsetmetadata

    // if the size is zero, still return a log segment but with zero size
    if(maxSize == 0)  //最大读取尺寸是0的话.返回空消息.
      return FetchDataInfo(offsetmetadata, MessageSet.Empty)

    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    val length =   //计算最大读取的消息总长度.
      maxOffset match {
        case None => //未设置maxoffset则使用maxsize.
          // no max offset, just use the max size they gave unmolested
          maxSize
        case Some(offset) => { //如果设置了Maxoffset,则计算对应的消息长度.
          // there is a max offset, translate it to a file position and use that to calculate the max read size
          if(offset < startOffset)  //maxoffset小于startoffset则返回异常
            throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
          val mapping = translateOffset(offset, startPosition.position) //获取相对maxoffset读取点.
          val endPosition = 
            if(mapping == null)
              logSize // the max offset is off the end of the log, use the end of the file
            else
              mapping.position
          min(endPosition - startPosition.position, maxSize) //用maxoffset读取点减去开始的读取点.获取需要读取的数据长度.如果长度比maxsize大则返回maxsize
        }
      }
    FetchDataInfo(offsetmetadata, log.read(startPosition.position, length)) //使用FileMessageSet.read读取相应长度的数据返回FetchDataInfo的封装对象.
  }

读取函数通过映射offset到读取长度.来读取多个offset.

private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {  //用来将offset映射到读取指针位置的函数.
    val mapping = index.lookup(offset) //通过查找index获取对应的指针对象.
    log.searchFor(offset, max(mapping.position, startingFilePosition)) //通过FileMessageSet获取对应的指针位置.
  }

最终还是通过调用FileMessageSet的read方法来进行读取。FileMessageSet的read方法如下:

def read(position: Int, size: Int): FileMessageSet = {
    if(position < 0)
      throw new IllegalArgumentException("Invalid position: " + position)
    if(size < 0)
      throw new IllegalArgumentException("Invalid size: " + size)
    new FileMessageSet(file,
                       channel,
                       start = this.start + position,
                       end = math.min(this.start + position + size, sizeInBytes()))
  }

最终返回一个日志分片,然后组装成一个FetchDataInfo对象给请求端。自此,kafka的消息读取流程结束。

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5705791.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-18

发表评论

登录后才能评论

评论列表(0条)

保存