NameNode EditLog Writing: logEdit and logSync

2020-08-31  小北觅

After the NameNode starts and has loaded the fsimage and editlog files, any subsequent operation that modifies metadata is recorded in the editlog. This article walks through the logic for writing the editlog.

The entry point is the logEdit method. Its parameter is of type FSEditLogOp, which represents a single kind of operation, e.g. DeleteOp, AddBlockOp, MkdirOp and so on; these roughly correspond to the RPC operations.
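For orientation, here is roughly how a caller builds such an op and hands it to logEdit. This is a trimmed sketch based on FSEditLog.logMkDir; ACL/XAttr handling is omitted and the exact setters may differ between Hadoop versions.

  // Sketch: how a caller constructs an FSEditLogOp and passes it to logEdit
  // (trimmed from FSEditLog.logMkDir; details vary by Hadoop version).
  public void logMkDir(String path, INode newNode) {
    PermissionStatus permissions = newNode.getPermissionStatus();
    MkdirOp op = MkdirOp.getInstance(cache.get())   // ops are reused from a per-thread cache
        .setInodeId(newNode.getId())
        .setPath(path)
        .setTimestamp(newNode.getModificationTime())
        .setPermissionStatus(permissions);
    logEdit(op);
  }

With that in mind, here is logEdit itself: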

  /**
   * Write an operation to the edit log.
   * <p/>
   * Additionally, this will sync the edit log if required by the underlying
   * edit stream's automatic sync policy (e.g. when the buffer is full, or
   * if a time interval has elapsed).
   * In short: this method writes one operation into the edit log and, if the
   * underlying edit stream's automatic sync policy requires it, also syncs
   * the edit log.
   */
  void logEdit(final FSEditLogOp op) {
    boolean needsSync = false;
    synchronized (this) {
      // assert that the edit log is open for write
      assert isOpenForWrite() :
        "bad state: " + state;
      
      // wait if an automatic sync is scheduled
      waitIfAutoSyncScheduled();

      // check if it is time to schedule an automatic sync
      needsSync = doEditTransaction(op);
      if (needsSync) {
        // This is the only place where isAutoSyncScheduled is set to true;
        // once it is true, waitIfAutoSyncScheduled() above will make other
        // writer threads wait until the scheduled sync has started.
        isAutoSyncScheduled = true;
      }
    }

    // Sync the log if an automatic sync is required.
    if (needsSync) {
      // perform a log sync
      logSync();
    }
  }

Let's go through the logEdit logic in a bit more detail.

needsSync is initialized to false. If isAutoSyncScheduled is true, an automatic sync of the edit log has already been scheduled, so the current thread has to wait inside waitIfAutoSyncScheduled.

The waitIfAutoSyncScheduled method is shown below.
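It is essentially a wait loop on the isAutoSyncScheduled flag. The following is a sketch based on the behavior described in this article; the exact body may differ slightly across Hadoop versions.

  // Wait while an automatic sync has been scheduled but has not yet swapped
  // the buffers; doneWithAutoSyncScheduling() clears the flag and notifies
  // the waiting threads. (Sketch; not copied verbatim from the source.)
  synchronized void waitIfAutoSyncScheduled() {
    try {
      while (isAutoSyncScheduled) {
        this.wait(1000);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // restore the interrupt status
    }
  }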

Next, needsSync is updated with the return value of doEditTransaction. Let's look at doEditTransaction:
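The following is a sketch of doEditTransaction, close to the Hadoop 3.x source but trimmed (beginTransaction assigns a new txid and stores it in the thread-local myTransactionId; endTransaction records timing statistics):

  synchronized boolean doEditTransaction(final FSEditLogOp op) {
    // begin the transaction: allocate a new txid and remember it in the
    // current thread's ThreadLocal myTransactionId
    long start = beginTransaction();
    op.setTransactionId(txid);

    try {
      // write the op into the edit stream's in-memory (double) buffer
      editLogStream.write(op);
    } catch (IOException ex) {
      // All journals failed; this is handled later in logSync.
    } finally {
      op.reset();
    }

    endTransaction(start);

    // true if the in-memory buffer is full enough to force a sync
    return shouldForceSync();
  }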

As you can see, between beginning and ending the transaction, doEditTransaction writes the op into the edit log stream, i.e. it calls editLogStream.write(op). We won't dive into write() here; it just appends the serialized op to the output stream's in-memory buffer. Finally it returns shouldForceSync(); that return value becomes needsSync and in turn determines isAutoSyncScheduled. shouldForceSync() simply checks whether the data buffered in the current stream has reached the initial (default) buffer size; if so it returns true, meaning enough data has accumulated and a sync should be scheduled.
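For completeness, the shouldForceSync() chain looks roughly like this (trimmed from FSEditLog, EditLogFileOutputStream and EditsDoubleBuffer; details may vary by version):

  // FSEditLog: delegate to the current edit log stream
  private boolean shouldForceSync() {
    return editLogStream.shouldForceSync();
  }

  // EditLogFileOutputStream: delegate to the double buffer
  @Override
  public boolean shouldForceSync() {
    return doubleBuf.shouldForceSync();
  }

  // EditsDoubleBuffer: force a sync once the current write buffer has grown
  // past the configured initial buffer size
  public boolean shouldForceSync() {
    return bufCurrent.size() >= initBufferSize;
  }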

Back to the main flow: if needsSync is true, isAutoSyncScheduled is set to true and logSync is called. isAutoSyncScheduled has to be updated because a sync is about to run, and other writers should wait (in waitIfAutoSyncScheduled) until that scheduled sync has started.

Finally, let's look at logSync(); this is the important method. Before diving in, two pieces of background are needed: the double buffer and logSync's synchronization strategy. At runtime, many threads write to the editlog concurrently. To keep concurrency high, HDFS does not serialize the whole write-and-flush path behind one coarse-grained lock; instead it uses a double buffer plus its own synchronization protocol.

Let's start with the double buffer, which is implemented by the EditsDoubleBuffer class.
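A trimmed sketch of the class follows (fields plus the two core methods; see EditsDoubleBuffer in the Hadoop source for the full version):

  public class EditsDoubleBuffer implements Closeable {

    private TxnBuffer bufCurrent; // current buffer for writing
    private TxnBuffer bufReady;   // buffer ready for flushing
    private final int initBufferSize;

    public EditsDoubleBuffer(int defaultBufferSize) {
      initBufferSize = defaultBufferSize;
      bufCurrent = new TxnBuffer(initBufferSize);
      bufReady = new TxnBuffer(initBufferSize);
    }

    // swap the two buffers; only legal once the previous flush has finished
    public void setReadyToFlush() {
      assert isFlushed() : "previous data not flushed yet";
      TxnBuffer tmp = bufReady;
      bufReady = bufCurrent;
      bufCurrent = tmp;
    }

    // write bufReady out to the underlying stream and clear it
    public void flushTo(OutputStream out) throws IOException {
      bufReady.writeTo(out);
      bufReady.reset();
    }
    // ... remaining methods omitted
  }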

As the comments make clear, bufCurrent is the buffer currently being written to, and bufReady is the buffer being flushed to disk. The benefit of the double buffer is that while one buffer is being synced to disk, other threads can keep writing into bufCurrent. The idea is a bit like the survivor 0 / survivor 1 spaces of the JVM young generation. Note, however, that swapping the roles of the two buffers does require holding the lock.

With the double buffer covered, let's look at the synchronization strategy that logSync uses. There are three points:

  1. Every operation is written, under the lock, into the in-memory EditLogOutputStream and assigned a unique txid.
  2. When a thread wants to sync the stream's contents to disk, logSync reads the txid this thread needs synced from the ThreadLocal variable myTransactionId. If that txid is greater than the last txid already synced (tracked in the synctxid field), the thread's edits have not been persisted yet, so it proceeds with the sync. If the thread's txid is less than or equal to synctxid, its edits have already been synced by another thread, so it simply returns.
  3. logSync uses isSyncRunning, a volatile boolean, to indicate whether some thread is currently performing a sync to disk. If a thread about to sync finds this flag set to true, it waits.

With the background in place, let's move on to the logSync source:
 /**
   * Sync all modifications done by this thread.
   *
   * The internal concurrency design of this class is as follows:
   *   - Log items are written synchronized into an in-memory buffer,
   *     and each assigned a transaction ID.
   *   - When a thread (client) would like to sync all of its edits, logSync()
   *     uses a ThreadLocal transaction ID to determine what edit number must
   *     be synced to.
   *   - The isSyncRunning volatile boolean tracks whether a sync is currently
   *     under progress.
   *
   * The data is double-buffered within each edit log implementation so that
   * in-memory writing can occur in parallel with the on-disk writing.
   *
   * Each sync occurs in three steps:
   *   1. synchronized, it swaps the double buffer and sets the isSyncRunning
   *      flag.
   *   2. unsynchronized, it flushes the data to storage
   *   3. synchronized, it resets the flag and notifies anyone waiting on the
   *      sync.
   *
   * The lack of synchronization on step 2 allows other threads to continue
   * to write into the memory buffer while the sync is in progress.
   * Because this step is unsynchronized, actions that need to avoid
   * concurrency with sync() should be synchronized and also call
   * waitForSyncToFinish() before assuming they are running alone.
   */
    // The Javadoc above describes the synchronization strategy; it is worth reading carefully.
  public void logSync() {
    // Fetch the transactionId of this thread.
    // Pass in the txid field of this thread's ThreadLocal myTransactionId object.
    logSync(myTransactionId.get().txid);
  }
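
For reference, myTransactionId is a ThreadLocal defined in FSEditLog; beginTransaction() (called from doEditTransaction) stores the newly assigned txid in it, so each thread remembers the last txid it wrote. A sketch, close to the actual source:

  // stores the most recent transactionId written by this thread
  private static class TransactionId {
    public long txid;

    TransactionId(long value) {
      this.txid = value;
    }
  }

  private static final ThreadLocal<TransactionId> myTransactionId =
      new ThreadLocal<TransactionId>() {
        @Override
        protected synchronized TransactionId initialValue() {
          return new TransactionId(Long.MAX_VALUE);
        }
      };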

Following the call into the parameterized logSync(long mytxid):

protected void logSync(long mytxid) {
    long syncStart = 0;
    boolean sync = false;
    // Number of edits batched into this sync; recorded by metrics.
    long editsBatchedInSync = 0;
    try {
      EditLogOutputStream logStream = null;
      synchronized (this) {
        try {
          printStatistics(false);

          // if somebody is already syncing, then wait
          while (mytxid > synctxid && isSyncRunning) {
            try {
              wait(1000);
            } catch (InterruptedException ie) {
            }
          }
  
          //
          // If this transaction was already flushed, then nothing to do
          //
          if (mytxid <= synctxid) {
            return;
          }

          // now, this thread will do the sync.  track if other edits were
          // included in the sync - ie. batched.  if this is the only edit
          // synced then the batched count is 0
          editsBatchedInSync = txid - synctxid - 1;
          // record the current highest txid
          syncStart = txid;
          // set isSyncRunning to true; the double buffers are swapped below
          isSyncRunning = true;
          sync = true;

          // swap buffers
          try {
            if (journalSet.isEmpty()) {
              throw new IOException("No journals available to flush");
            }
            // swap the two buffers; we are still inside the synchronized block
            editLogStream.setReadyToFlush();
          } catch (IOException e) {
            final String msg =
                "Could not sync enough journals to persistent storage " +
                "due to " + e.getMessage() + ". " +
                "Unsynced transactions: " + (txid - synctxid);
            LOG.error(msg, new Exception());
            synchronized(journalSetLock) {
              IOUtils.cleanupWithLogger(LOG, journalSet);
            }
            terminate(1, msg);
          }
        } finally {
          // Prevent RuntimeException from blocking other log edit write 
          doneWithAutoSyncScheduling();
        }
        //editLogStream may become null,
        //so store a local variable for flush.
        logStream = editLogStream;
      }
      
      // do the sync
      // This part is not synchronized: it only touches the flush buffer,
      // so other threads can keep writing into the in-memory buffer.
      long start = monotonicNow();
      try {
        if (logStream != null) {
          // flush the buffered edits to disk
          logStream.flush();
        }
      } catch (IOException ex) {
        synchronized (this) {
          final String msg =
              "Could not sync enough journals to persistent storage. "
              + "Unsynced transactions: " + (txid - synctxid);
          LOG.error(msg, new Exception());
          synchronized(journalSetLock) {
            IOUtils.cleanupWithLogger(LOG, journalSet);
          }
          terminate(1, msg);
        }
      }
      long elapsed = monotonicNow() - start;
  
      if (metrics != null) { // Metrics non-null only when used inside name node
        metrics.addSync(elapsed);
        metrics.incrTransactionsBatchedInSync(editsBatchedInSync);
        numTransactionsBatchedInSync.addAndGet(editsBatchedInSync);
      }
      
    } finally {
      // Prevent RuntimeException from blocking other log edit sync 
      synchronized (this) {
        if (sync) {
          // if the sync succeeded, update synctxid (stores the last synced transactionId)
          synctxid = syncStart;
          for (JournalManager jm : journalSet.getJournalManagers()) {
            /**
             * {@link FileJournalManager#lastReadableTxId} is only meaningful
             * for file-based journals. Therefore the interface is not added to
             * other types of {@link JournalManager}.
             */
            if (jm instanceof FileJournalManager) {
              ((FileJournalManager)jm).setLastReadableTxId(syncStart);
            }
          }
          isSyncRunning = false;
        }
        this.notifyAll();
     }
    }
  }
