【HDFS】--增量块汇报[IBR](1)

2021-03-12  本文已影响0人  小北觅

前言:
这是增量块汇报的第一篇文章,不讲述增量块汇报的NameNode侧的处理逻辑。主要介绍Datanode侧的逻辑。

我将带着如下问题去阅读源码:
1)什么时候会触发增量块汇报?
2)发送增量块汇报的处理逻辑是什么?
3)增量块汇报的内容是什么?

官方文档上有个参数:
dfs.blockreport.incremental.intervalMsec,默认值是0。单位ms。
这个参数的描述信息如下:

If set to a positive integer, the value in ms to wait between sending incremental block reports from the Datanode to the Namenode.
如果这个参数的值设置为一个正整数,那么就代表每次DN向NN发送增量块汇报时要中间间隔一段时间。

我们从逆向出发,在代码中搜索这个配置项,发现这个配置项最后是用来构造
org.apache.hadoop.hdfs.server.datanode.BPServiceActor#ibrManager这个成员变量了。

ibrManager这个成员变量的类型是IncrementalBlockReportManager。顾名思义,是增量块汇报管理者,用来统一管理增量块汇报的相关操作。

进到IncrementalBlockReportManager这个类,看一下都有什么方法,看方法名跟发送增量块汇报相关的方法我在图中圈出来了:

sendIBRs代码如下:

  /** Send IBRs to namenode. */
  void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
      String bpid) throws IOException {
    // Generate a list of the pending reports for each storage under the lock
    final StorageReceivedDeletedBlocks[] reports = generateIBRs();
    if (reports.length == 0) {
      // Nothing new to report.
      return;
    }

    // Send incremental block reports to the Namenode outside the lock
    if (LOG.isDebugEnabled()) {
      LOG.debug("call blockReceivedAndDeleted: " + Arrays.toString(reports));
    }
    boolean success = false;
    final long startTime = monotonicNow();
    try {
      namenode.blockReceivedAndDeleted(registration, bpid, reports);
      success = true;
    } finally {

      if (success) {
        dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime);
        lastIBR = startTime;
      } else {
        // If we didn't succeed in sending the report, put all of the
        // blocks back onto our queue, but only in the case where we
        // didn't put something newer in the meantime.
        putMissing(reports);
      }
    }
  }

关注一下putMissing方法:

If we didn't succeed in sending the report, put all of the blocks back onto our queue, but only in the case where we didn't put something newer in the meantime.

putMissing代码如下:

  //这个方法是个同步方法,会占用锁
  private synchronized void putMissing(StorageReceivedDeletedBlocks[] reports) {
    for (StorageReceivedDeletedBlocks r : reports) {
      pendingIBRs.get(r.getStorage()).putMissing(r.getBlocks());
    }
    if (reports.length > 0) {
      readyToSend = true;
    }
  }

putMissing主要就是把IBR根据Block放入PerStorageIBR#blocks这个Map类型的成员变量中。当然这里面有个判断,如果blocks中已经有了相应的块信息,那么就忽略掉。因为此时blocks中保存的对应块的IBR信息才是最新的。也就对应了上面putMissing方法注释中说明的情况。

sendImmediately方法的代码如下,这个方法的作用是判断是否需要立即发送IBR。用到了readyToSend变量、ibrInterval变量、lastIBR变量。这三个变量看下面的注释也很好理解。


  /**
   * If this flag is set then an IBR will be sent immediately by the actor
   * thread without waiting for the IBR timer to elapse.
   */
  private volatile boolean readyToSend = false;

  /** The time interval between two IBRs. */
  private final long ibrInterval;

  /** The timestamp of the last IBR. */
  private volatile long lastIBR;

  boolean sendImmediately() {
    return readyToSend && monotonicNow() - ibrInterval >= lastIBR;
  }

triggerIBR代码如下,这个方法的功能是用来触发IBR。首先设置volatile变量readyToSend的值为true,然后根据传入的force参数判断是否强制触发,如果是强制触发,则更新lastIBR的值为monotonicNow() - ibrInterval,也就是说人为更改上一次IBR的时间用以超过配置的增量块汇报的间隔。然后根据上面提到的sendImmediately函数的返回值决定是否notify在等待的线程,这里抛出个问题后面解答,什么线程在等待呢?

  synchronized void triggerIBR(boolean force) {
    readyToSend = true;
    if (force) {
      lastIBR = monotonicNow() - ibrInterval;
    }
    if (sendImmediately()) {
      notifyAll();
    }
  }

接下来解答上面的问题,什么线程wait了?
刚才截图类的structure时,除了上面这个三个看起来像IBR的函数外, 还有一个waitTillNextIBR方法,继续顾名思义,wait直到下一次IBR。先来看一下这个方法的实现,然后再看一下这个方法的调用栈,分析一下是什么位置调用了这个函数。

  synchronized void waitTillNextIBR(long waitTime) {
    if (waitTime > 0 && !sendImmediately()) {
      try {
        wait(ibrInterval > 0 && ibrInterval < waitTime? ibrInterval: waitTime);
      } catch (InterruptedException ie) {
        LOG.warn(getClass().getSimpleName() + " interrupted");
      }
    }
  }

wait函数里用了条件判断表达式,这里的ibrInterval我们按照默认值0来计算,所以直接取传入的waitTime参数的值。如果说传入waitTime大于0并且不需要立即发送IBR,那么线程就进行wait,时间为waitTime(ms)。

接着看waitTillNextIBR的调用点,来到了org.apache.hadoop.hdfs.server.datanode.BPServiceActor#offerService方法里。这个方法是BPServiceActor类的run方法中调用的方法,只要DN运行,这个offerService方法就不断循环的执行。offerService方法中按顺序进行heartbeat、增量块汇报、全量块汇报操作。

到这里基本上可以回答文章开头的两个问题了:

DataNode的BPServiceActor线程在DN启动后会一直执行,不断循环的发送heartbeat、IBR(增量块汇报)、FBR(全量块汇报)。然后根据增量块汇报会更新各种记录时间的变量用来辅助调用IBR。比如强制触发IBR等。

还有一个问题是:IBR中的内容是什么?回答这个问题需要看generateIBRs方法:

可以看到,本质上发送的内容来自于pendingIBRs这个Map,此Map的key是DatanodeStorage代表了Datanode的一个Storage,可以理解为一个磁盘;此Map的value是PerStorageIBR,代表了每个Storage上的IBR。PerStorageIBR这个类我们前面遇到过,还得他的blocks变量么?就是用来保存新增的IBR和处理失败的IBR的那个blocks呀!

好,本文完。下一篇将学习IBR在NN侧的处理过程。

上一篇下一篇

猜你喜欢

热点阅读