数客联盟

hbase create table过程解析

2018-06-08  本文已影响45人  tinyMonkey

程序入口

在org.apache.hadoop.hbase.master.HMaster中定义了MasterRpcServices提供rpc服务
org.apache.hadoop.hbase.master.MasterRpcServices实现了接口MasterProtos.MasterService.BlockingInterface
其中使用了google的protobuf rpc通信,可以参见另一篇文章:hbase与客户端的通信过程解析,最终实现了createTable接口:

@Override
  public CreateTableResponse createTable(RpcController controller, CreateTableRequest req)
  throws ServiceException {
    HTableDescriptor hTableDescriptor = HTableDescriptor.convert(req.getTableSchema());
    byte [][] splitKeys = ProtobufUtil.getSplitKeysArray(req);
    try {
      long procId =
          master.createTable(hTableDescriptor, splitKeys, req.getNonceGroup(), req.getNonce());
      return CreateTableResponse.newBuilder().setProcId(procId).build();
    } catch (IOException ioe) {
      throw new ServiceException(ioe);
    }
  }

habse的procedure框架

 @Override
  public long createTable(
      final HTableDescriptor hTableDescriptor,
      final byte [][] splitKeys,
      final long nonceGroup,
      final long nonce) throws IOException {
    ...

    return MasterProcedureUtil.submitProcedure(
      new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
      @Override
      protected void run() throws IOException {
        ...
        ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch();
        submitProcedure(new CreateTableProcedure(
          procedureExecutor.getEnvironment(), hTableDescriptor, newRegions, latch));
        latch.await();
        ...
      }
    });
  }

这里引用了两个概念,一个是NonceProcedureRunnable,意思是以下的代码只能同时执行一次,避免重复执行同一个create table操作;另一个是Procedure,对一些必须保证事务性的操作,hbase实现了一套Procedure操作,方便rollback;

org.apache.hadoop.hbase.procedure2.ProcedureExecutor

定义了几个核心的方法:

org.apache.hadoop.hbase.procedure2.Procedure
org.apache.hadoop.hbase.procedure2.StateMachineProcedure

实现了一个按照state去以此执行的procedure

对procedure框架的使用:

具体的create table过程

@Override
  protected Flow executeFromState(final MasterProcedureEnv env, final CreateTableState state)
      throws InterruptedException {
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + " execute state=" + state);
    }
    try {
      switch (state) {
        case CREATE_TABLE_PRE_OPERATION:
          // Verify if we can create the table
          boolean exists = !prepareCreate(env);
          ProcedurePrepareLatch.releaseLatch(syncLatch, this);

          if (exists) {
            assert isFailed() : "the delete should have an exception here";
            return Flow.NO_MORE_STATE;
          }

          preCreate(env);
          setNextState(CreateTableState.CREATE_TABLE_WRITE_FS_LAYOUT);
          break;
        case CREATE_TABLE_WRITE_FS_LAYOUT:
          newRegions = createFsLayout(env, hTableDescriptor, newRegions);
          setNextState(CreateTableState.CREATE_TABLE_ADD_TO_META);
          break;
        case CREATE_TABLE_ADD_TO_META:
          newRegions = addTableToMeta(env, hTableDescriptor, newRegions);
          setNextState(CreateTableState.CREATE_TABLE_ASSIGN_REGIONS);
          break;
        case CREATE_TABLE_ASSIGN_REGIONS:
          assignRegions(env, getTableName(), newRegions);
          setNextState(CreateTableState.CREATE_TABLE_UPDATE_DESC_CACHE);
          break;
        case CREATE_TABLE_UPDATE_DESC_CACHE:
          updateTableDescCache(env, getTableName());
          setNextState(CreateTableState.CREATE_TABLE_POST_OPERATION);
          break;
        case CREATE_TABLE_POST_OPERATION:
          postCreate(env);
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    } catch (HBaseException|IOException e) {
      LOG.error("Error trying to create table=" + getTableName() + " state=" + state, e);
      setFailure("master-create-table", e);
    }
    return Flow.HAS_MORE_STATE;
  }

入代码所示,流程如下:

上一篇 下一篇

猜你喜欢

热点阅读