Kafka源码分析-Server-日志存储(1)

2019-02-11  本文已影响0人  陈阳001

基本概念

Kafka使用日志文件的方式保存生产者发送的消息。每条消息都有一个offset值来表示它在分区的偏移量,这个offset值是逻辑值,并不是消息实际的存放物理地址。offset类似数据库表的主键,主键唯一确定了数据库表中的一条记录,offset唯一确定了分区的一条消息。Kafka存储机制在逻辑上如下图:


日志存储结构.png

为了提高写入的性能,同一个分区中的消息是顺序写入的,这就避免了随机写入带来的性能问题。一个topic可以有n个分区,每个分区也有多个副本。当一个分区的副本(无论是Leader副本还是Follower副本)被划分到某个Broker上时,Kafka就要在此Broker上为此分区建立相应的Log,生产者发送的消息会存储在Log里,然后被消费者拉取消费。
Kafka中存储的数据都是海量的,为了避免日志文件太大,Log并不是直接对应磁盘上的一个日志文件,而是对应磁盘上的一个目录,目录的命名规则是<topic_name>_<partition_id>,Log和分区直接的关系是一一对应的,对应分区的全部消息都存储在这个目录中。
Kafka通过分段的方式将Log分为多个LogSegment,LogSegment是一个逻辑上的概念,一个LogSegment对应磁盘上的一个日志文件和一个索引文件,,日志文件用于记录消息,索引文件保存消息的索引。日志文档到一个阈值时,就会创建新的日志文件继续写入后续的消息和索引信息。日志文件的文件名的命名规则是[baseOffset].log,baseOffset是日志文件中的第一条消息的offset。下面是Log的结构:


Log结构.png
为了提高查询消息的效率,每个日志文件都对应一个索引文件,这个索引文件并没有为每条消息都建索引,而是使用稀疏索引方式为日志文件中的部分消息建立了索引。下面的图展示了所有文件和日志文件的关系:
log的索引.png

FileMessageSet

Kafka使用FileMessageSet管理日志文件,它对应磁盘上一个真正的日志文件。FileMessageSet继承了MessageSet抽象类,如下图:


FileMessageSet类图.png

MessageSet中保存的数据格式有三个部分:8个字节的offset值,4个字节的size表示message data的大小,这两个结合成为LogOverhead, message data保存了消息的数据,逻辑上对应一个Message对象:


FileMessageSet定义的消息结构.png
Kafka使用Message类表示消息,Message类使用ByteBuffer保存数据,格式和各个部分的含义如下:
Message类定义的保存数据.png
/** Write the messages in this set to the given channel starting at the given offset byte. 
    * Less than the complete amount may be written, but no more than maxSize can be. The number
    * of bytes written is returned
    * 将当前MessageSet中的消息写入到Channel中
    * */
  def writeTo(channel: GatheringByteChannel, offset: Long, maxSize: Int): Int
/**
   * Provides an iterator over the message/offset pairs in this set
    * 提供迭代器,顺序读取MessageSet中的消息
   */
  def iterator: Iterator[MessageAndOffset]

这两个方法说明MessageSet具有顺序写入消息和顺序读取的特性。后面介绍FileMessageSet和ByteBufferMessageSet时会说明这两个方法的实现。

分析FileMessageSet实现类

核心字段:
*file: java.io.File类型,指向磁盘上对应的日志文件。
*channel:FileChannel类型,用于读写对应的日志文件。
*start和end:MessageSet对象除了表示一个完整的日志文件,还可以表示日志文件分片(Slice),start和end分别表示分片的起始位置和结束位置。文件分配的相关概念可以找资料了解下。

 * Create a file message set with no slicing, and with initFileSize and preallocate.
   * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
   * with one value (for example 512 * 1024 *1024 ) can improve the kafka produce performance.
   * If it's new file and preallocate is true, end will be set to 0.  Otherwise set to Int.MaxValue.
   */
  def this(file: File, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean) =
      this(file,
        //如果使用preallocate进行预分配,end会初始化为零
        channel = FileMessageSet.openChannel(file, mutable = true, fileAlreadyExists, initFileSize, preallocate),
        start = 0,
        end = ( if ( !fileAlreadyExists && preallocate ) 0 else Int.MaxValue),
        isSlice = false)
/**
   * Open a channel for the given file
   * For windows NTFS and some old LINUX file system, set preallocate to true and initFileSize
   * with one value (for example 512 * 1025 *1024 ) can improve the kafka produce performance.
   * @param file File path
   * @param mutable mutable
   * @param fileAlreadyExists File already exists or not
   * @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
   * @param preallocate Pre allocate file or not, gotten from configuration.
    * FileMessageSet.openChannel()方法的具体实现。
   */
  def openChannel(file: File, mutable: Boolean, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): FileChannel = {
    if (mutable) {//根据mutable参数创建的FileChannel是否可写
      if (fileAlreadyExists)
        new RandomAccessFile(file, "rw").getChannel()
      else {
        if (preallocate) {//进行文件预分配
          val randomAccessFile = new RandomAccessFile(file, "rw")
          randomAccessFile.setLength(initFileSize)
          randomAccessFile.getChannel()
        }
        else
          new RandomAccessFile(file, "rw").getChannel()//创建可读可写的FileChannel
      }
    }
    else
      new FileInputStream(file).getChannel()//创建只读的FileChannel
  }

在FileMessageSet对象初始化的过程中,会移动FileChannel的position指针,原因是为了每次写入的消息都在日志文件的尾部,避免重启服务后的写入操作覆盖之前的操作。对应新创建的且进行了预分配空间的日志文件,其end会初始化为0,所以也是从文件的起始写入数据的。

/* if this is not a slice, update the file pointer to the end of the file
  * 将position移动到最后一个字节,之后从此position开始写消息,这样防止重启后覆盖之前的操作
  *
  * */
  if (!isSlice)
    /* set the file position to the last byte in the file */
    channel.position(math.min(channel.size.toInt, end))

介绍完FileMessageSet的构造过程,下面介绍其读写过程。FileMessageSet.append()方法实现了写日志文件的功能,其参数必须是ByteBufferMessageSet对象,下面是FileMessageSet.append()方法的代码:

/**
   * Append these messages to the message set
   */
  def append(messages: ByteBufferMessageSet) {
    val written = messages.writeFullyTo(channel)//写文件
    _size.getAndAdd(written)//修改FileMessageSet的大小
  }
/** Write the messages in this set to the given channel 
    * 下面是 ByteBufferMessageSet.writeFullyTo()方法
    * */
  def writeFullyTo(channel: GatheringByteChannel): Int = {
    buffer.mark()
    var written = 0
    while (written < sizeInBytes)//将ByteBufferMessageSet中的数据全部写入文件
      written += channel.write(buffer)
    buffer.reset()
    written
  }

查找指定消息的功能在FileMessageSet.searchFor()方法中实现。searchFor()方法的逻辑是从指定的startPosition开始逐条遍历FileMessageSet中的消息,并将每个消息的offset和targetOffset,最后返回查找到的offset。在遍历过程中不会将消息的key和value读取到内存,只是只读取LogOverhead(即offset和size),并通过size定位到下一条消息的开始位置。FileMessageSet.searchFor()方法代码如下:

/**
   * Search forward for the file position of the last offset that is greater than or equal to the target offset
   * and return its physical position. If no such offsets are found, return null.
   * @param targetOffset The offset to search for.
   * @param startingPosition The starting position in the file to begin searching from.
   */
  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
    var position = startingPosition //起始位置
    //创建用于读取 LogOverhead(即offset和size)的ByteBuffer(长度12)
    val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
    val size = sizeInBytes()//当前FileMessageSet的大小,单位是字节
    //从position开始逐条消息遍历
    while(position + MessageSet.LogOverhead < size) {
      buffer.rewind()//重置ByteBuffer的position指针,准备读入数据
      //读取LogOverhead。这里会确保startingPosition位于一个消息的开头,否则
      //读取到的并不是 LogOverhead,这个条件的保证会在后面提到
      channel.read(buffer, position)
      if(buffer.hasRemaining)//未读取到12个字节的LogOverhead,抛出异常
        throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
                                        .format(targetOffset, startingPosition, file.getAbsolutePath))
      buffer.rewind()//重置ByteBuffer的position指针,准备从ByteBuffer中读取数据
      val offset = buffer.getLong()//读取消息的offset,8个字节
      if(offset >= targetOffset)//判断是否符合退出条件
        return OffsetPosition(offset, position)//得到消息的位置
      val messageSize = buffer.getInt()//获取消息的size,4个字节
      if(messageSize < Message.MinMessageOverhead)
        throw new IllegalStateException("Invalid message size: " + messageSize)
      //移动Position,准备读取下个消息
      position += MessageSet.LogOverhead + messageSize
    }
    null//找不到offset大于等于targetOffset,则返回Null
  }

FileMessageSet.writeTo()方法是将FileMessageSet中的数据写入指定的其他Channel中,这里先了解此方法的功能,具体实现会在后面介绍“零拷贝”的时候一起介绍。FileMessageSet.read*()方法是从FileMessageSet中读取数据,可以将FileMessageSet中的数据读入到别的ByteBuffer中返回,也可以按照指定位置和长度形成分片的FileMessageSet对象返回。FileMessageSet.delete()方法是将整个日志文件删除。
FileMessageSet还有一个truncateTo()方法,主要负责将日志文件截断到targetSize大小。此方法在后面介绍分区中Leader副本切换时还会提到。下面是truncateTo()方法的具体实现:

/**
   * Truncate this file message set to the given size in bytes. Note that this API does no checking that the
   * given size falls on a valid message boundary.
   * In some versions of the JDK truncating to the same size as the file message set will cause an
   * update of the files mtime, so truncate is only performed if the targetSize is smaller than the
   * size of the underlying FileChannel.
   * It is expected that no other threads will do writes to the log when this function is called.
   * @param targetSize The size to truncate to. Must be between 0 and sizeInBytes.
   * @return The number of bytes truncated off
   */
  def truncateTo(targetSize: Int): Int = {
    val originalSize = sizeInBytes
    if(targetSize > originalSize || targetSize < 0)//检测targetSize的有效性
      throw new KafkaException("Attempt to truncate log segment to " + targetSize + " bytes failed, " +
                               " size of this log segment is " + originalSize + " bytes.")
    if (targetSize < channel.size.toInt) {
      channel.truncate(targetSize)//裁剪文件
      channel.position(targetSize)//移动position
      _size.set(targetSize)//修改_size
    }
    originalSize - targetSize//返回剪裁掉的字节数
  }

FileMessageSet还实现了iterator()方法,返回一个迭代器。FileMessageSet迭代器读取消息的逻辑是:先读取消息的LogOverhead部分,然后按照size分配合适的ByteBuffer,再读取message data部分,最后将message data和offset封装成MessageOffset对象返回。迭代器的实现和searchFor()方法类似。

上一篇下一篇

猜你喜欢

热点阅读