HBase 源码阅读-put操作(region server 端

2018-12-10  本文已影响0人  pcqlegend

HBase的源代码还是比较长的,为了方便大家找到入口,这儿简单列列举client的调用栈,然后主要看server端的代码

client端

1 org.apache.hadoop.hbase.client.HTable#put(org.apache.hadoop.hbase.client.Put)
2 org.apache.hadoop.hbase.client.BufferedMutatorImpl#backgroundFlushCommits
3 org.apache.hadoop.hbase.client.AsyncProcess#submit
4 org.apache.hadoop.hbase.client.AsyncProcess#submitMultiActions
5 org.apache.hadoop.hbase.client.AsyncProcess#createAsyncRequestFuture
6 org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl#sendMultiAction
org.apache.hadoop.hbase.client.MultiServerCallable#call
7org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl.SingleServerRequestRunnable#run
8 org.apache.hadoop.hbase.client.MultiServerCallable#call
9 org.apache.hadoop.hbase.client.RegionServerCallable#getStub
10 org.apache.hadoop.hbase.regionserver.RSRpcServices#mutate

server 端

11 org.apache.hadoop.hbase.regionserver.HRegion#put(org.apache.hadoop.hbase.client.Put)
首先看下put主要逻辑

 */
  OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
    boolean initialized = false;
    Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
    startRegionOperation(op);
    try {
      while (!batchOp.isDone()) {
        if (!batchOp.isInReplay()) {
          checkReadOnly();
        }
        checkResources();

        if (!initialized) {
          this.writeRequestsCount.add(batchOp.operations.length);
          if (!batchOp.isInReplay()) {
            doPreMutationHook(batchOp);
          }
          initialized = true;
        }
        doMiniBatchMutation(batchOp);
        long newSize = this.getMemstoreSize();
        if (isFlushSize(newSize)) {
          requestFlush();
        }
      }
    } finally {
      closeRegionOperation(op);
    }
    return batchOp.retCodeDetails;
  }

以下是主要的doMiniBatchMutation代码逻辑,代码很长,直接翻译的代码中的英文(代码注释还是很多的 ).

代码很长,这儿我只把源码中的几个步骤拿出来说一下,总共9步骤
STEP1 尽可能多的获取行锁,至少获取一个
STEP2 更新每个cell的时间戳
STEP3 创建WAL日志
STEP4 将最后的修改添加到wal中,但是不做同步
STEP5 写入memorystore
STEP6 释放行锁
STEP7 同步wal
STEP8 更新mvcc
STEP9 运行协处理器post hooks,必须在wal同步之后才能执行(协处理器参考https://blog.csdn.net/m0_37636453/article/details/79284138

然后再看下flush的request

  @Override
  public void requestFlush(Region r, boolean forceFlushAllStores) {
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has no delay so it will be added at the top of the flush
        // queue.  It'll come out near immediately.
        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      }
    }
  }

加入到flust队列,然后看下flush的线程org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushHandler#run

 @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          if (fqe == null || fqe instanceof WakeupFlushThread) {
            if (isAboveLowWaterMark()) {
              LOG.debug("Flush thread woke up because memory above low water="
                  + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
              if (!flushOneForGlobalPressure()) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }

然后调用了flushRegion方法
org.apache.hadoop.hbase.regionserver.MemStoreFlusher#flushRegion(org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushRegionEntry)
这会有两个逻辑,如果region下的store files文件太多,则会将flush加入队列,否则直接进行flush,

  private boolean flushRegion(final Region region, final boolean emergencyFlush,
      boolean forceFlushAllStores) {
    long startTime = 0;
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null) {
        startTime = fqe.createTime;
      }
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
     }
    }
    if (startTime == 0) {
      // Avoid getting the system time unless we don't have a FlushRegionEntry;
      // shame we can't capture the time also spent in the above synchronized
      // block
      startTime = EnvironmentEdgeManager.currentTime();
    }
    lock.readLock().lock();
    try {
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flush(forceFlushAllStores);
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = ((HRegion)region).checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        server.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" + (region != null ? (" for region " +
          Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
    }
    return true;
  }
flush主要逻辑

首先获取memStore的读锁, lock.readLock().lock();
进行flush FlushResult flushResult = region.flush(forceFlushAllStores);
判断是否需要compaction boolean shouldCompact = flushResult.isCompactionNeeded();
判断是否需要split则调用split的线程的requestSplit方法 this.server.compactSplitThread.requestSplit(region);
如果是需要进行合并则调用合并算法
server.compactSplitThread.requestSystemCompaction

上一篇 下一篇

猜你喜欢

热点阅读