HBase

HBase 1.2.0源码分析:HRegionServer启动

2018-12-17  本文已影响1人  Alex90

RegionServer 实现类org.apache.hadoop.hbase.regionserver.HRegionServer.

类描述:HRegionServer 管理一些 HRegion,使其对 Client 可用。需要与 HMaster 通信,通知状态。(HRegionServer makes a set of HRegions available to clients. It checks in with the HMaster. There are many HRegionServers in a single HBase deployment.

0. HRegionServer 初始化

构造方法需要参数:
conf 对应配置文件,csm 是一个协调服务,提供启停 Server 等方法

public HRegionServer(Configuration conf) throws IOException, InterruptedException {
    // 构建默认 CoordinatedStateManager
    this(conf, CoordinatedStateManagerFactory.getCoordinatedStateManager(conf));
}

public HRegionServer(Configuration conf, CoordinatedStateManager csm)
      throws IOException, InterruptedException {
    
    // ...
    
    // 测试 hbase.regionserver.codecs 配置的编码方式是可用的
    checkCodecs(this.conf);
    
    // 初始化 userProvider 用于权限认证
    this.userProvider = UserProvider.instantiate(conf);
    
    // 设置 short circuit read buffer,即短路本地读
    // 校验是否跳过 checksum,默认是 false,不推荐跳过
    // dfs.client.read.shortcircuit.skip.checksum
    FSUtils.setupShortCircuitRead(this.conf);
    
    // 设置一系列配置项
    // 设置客户端连接重试次数 hbase.client.retries.number(31)
    this.numRetries = this.conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
    // 线程执行周期 hbase.server.thread.wakefrequency(10 * 1000)    
    // 周期内会执行 CompactionChecker
    this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
    // RegionServer 发消息给 Master 时间间隔,单位是毫秒
    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
    this.sleeper = new Sleeper(this.msgInterval, this);

    // 设置 Nonce 标志,初始化 nonceManager
    // 客户端的每次申请及重复申请使用同一个 nonce 进行描述,解决 Client 重复操作提交的情况
    boolean isNoncesEnabled = conf.getBoolean(HConstants.HBASE_RS_NONCES_ENABLED, true);
    this.nonceManager = isNoncesEnabled ? new ServerNonceManager(this.conf) : null;

    // 向 Master 进行 report 的 region 个数
    // HBase Client 操作的 Timeout 时间
    // HBase rpc 短操作的 Timeout 时间
    // ...

    // 创建 RegionServer 的 RPC 服务端
    rpcServices = createRpcServices();
    
    // 工厂类实例化
    rpcControllerFactory = RpcControllerFactory.instantiate(this.conf);
    rpcRetryingCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);

    // 用来记录 region server 中所有的 memstore 所占大小
    regionServerAccounting = new RegionServerAccounting();

    // 表示使用 zk 管理状态,并不会对Region状态进行持久化
    // 默认是 true,hbase.assignment.usezk
    useZKForAssignment = ConfigUtil.useZKForAssignment(conf);

    // 初始化 fs,封装hdfs
    this.fs = new HFileSystem(this.conf, useHBaseChecksum);
    
    // 初始化 htable meta 信息
    this.tableDescriptors = new FSTableDescriptors(
      this.conf, this.fs, this.rootDir, !canUpdateTableDescriptor(), false);

    // 初始化rs处理任务的线程池,同master的任务线程池
    // 具体任务类型参考 ExecutorType
    service = new ExecutorService(getServerName().toShortString());

    if (!conf.getBoolean("hbase.testing.nocluster", false)) {
        // 创建当前rs与zk的连接
        zooKeeper = new ZooKeeperWatcher(conf, getProcessName() + ":" +
          rpcServices.isa.getPort(), this, canCreateBaseZNode());

        // csm在分布式环境下用来协调集群所有server的状态信息
        this.csm = (BaseCoordinatedStateManager) csm;
        this.csm.initialize(this);
        this.csm.start();

        // 基于zk实现的分布式的锁管理器,用于锁表
        tableLockManager = TableLockManager.createTableLockManager(
          conf, zooKeeper, serverName);

        // 创建master跟踪器,等待master的启动
        // 在zk节点上注册,zookeeper.znode.master
        masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this);
        masterAddressTracker.start();

        // 创建cluster的跟踪器,等待cluster的启动
        // master注册clusterid到zk节点(zookeeper.znode.state),表示集群已经启动
        clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this);
        clusterStatusTracker.start();
    }

    // 启动 Region Server RPC服务
    rpcServices.start();
    // 启动 jetty,Region Server WebUI
    putUpWebUI();
    
    // 负责当前 rs 上所有 wal 的log roll(滚动并清理日志)
    this.walRoller = new LogRoller(this, this);
    // 封装了线程池,负责周期性的调度任务
    // 心跳、检查compact、compact完成后hfile清理...
    this.choreService = new ChoreService(getServerName().toString(), true);

}

什么是短路本地读?

在 HDFS 中,读取操作都是通过 Datanode 来进行的。
当客户端向 Datanode 发起读取文件请求时,Datanode 从磁盘读取文件,并且通过 TCP socket 发送给客户端。
所谓的“短路”就是不通过 Datanode,允许客户端直接读取文件。
显然,这仅在客户端与数据位于同一位置的情况下才有可能。短路读取能让许多应用性能显著提升。

什么是Nonce机制

客户端发送 RPC 请求给服务器后,如果响应超时,那么客户端会重复发送请求,直到达到参数配置的重试次数上限。
客户端第一次发送和以后重发请求时,会附带相同的 nonce,服务端只要根据 nonce 进行判断,就能得知是否为同一请求,
并根据之前请求处理的结果,决定是等待、拒绝还是直接处理。

1. HRegionServer 运行

/**
 * The HRegionServer sticks in this loop until closed.
 */
@Override
public void run() {
    
    // 向HMaster注册之前完成一些初始化工作   
    // 在ZK节点 /hbase/rs 下创建当前region server信息的节点,HMaster 监听这个路径
    preRegistrationInitialization();

    try {
        if (!isStopped() && !isAborted()) {
            // 在ZK节点 /hbase/rs 下创建当前region server信息的节点
            createMyEphemeralNode();
            // 加载的 coprocessor,提供 coprocessor 的运行环境
            this.rsHost = new RegionServerCoprocessorHost(this, this.conf);
        }

        while (keepLooping()) { // !this.stopped && isClusterUp();
            // 通知master,region server启动成功
            RegionServerStartupResponse w = reportForDuty();
            if (w == null) {
                this.sleeper.sleep();
            } else {
                // Sets up wal and starts up all server threads.
                handleReportForDutyResponse(w);
                break;
            }
        }
  
        // 启动 rspmHost \ rsQuotaManager
        // ...

        // The main run loop.
        while (!isStopped() && isHealthy()) {
            // 监控ZK节点(zookeeper.znode.state)
            if (!isClusterUp()) {
                // .. 处理集群 down 的情况
                // 关闭所有的 user region
            }
            long now = System.currentTimeMillis();
            // 定期报告心跳
            if ((now - lastMsg) >= msgInterval) {
                tryRegionServerReport(lastMsg, now);
                lastMsg = System.currentTimeMillis();
                doMetrics();
            }
          
            // ...  
        } 
    } catch (Throwable t) {
        // ...
    }
    
    // Run shutdown ...
    // 关闭连接、服务、region、WAL、proxy
    // 清除ZK节点 /hbase/rs (强制删除 znode 存储文件)
    // 关闭rpcClient、rpcservice、monitor、线程池、Zookeeper
    // ...
}

1.1 preRegistrationInitialization

预初始化

private void preRegistrationInitialization(){
    try {
        // 初始化 clusterConnection、metaTableLocator
        setupClusterConnection();
    
        // Health checker thread.
        if (isHealthCheckerConfigured()) {
            int sleepTime = this.conf.getInt(HConstants.HEALTH_CHORE_WAKE_FREQ,
            HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
            healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
        }
        this.pauseMonitor = new JvmPauseMonitor(conf);
        pauseMonitor.start();
    
        // 初始化ZK连接
        initializeZooKeeper();
        if (!isStopped() && !isAborted()) {
            // 初始化线程
            initializeThreads();
        }
    } catch (Throwable t) {
        // ...
    }
}

/**
 * 初始化一系列线程、monitor、和连接
 */
private void initializeThreads() throws IOException {
    // Cache flushing thread.
    this.cacheFlusher = new MemStoreFlusher(conf, this);

    // Compaction thread
    this.compactSplitThread = new CompactSplitThread(this);

    // check for compactions
    this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
    
    this.periodicFlusher = new PeriodicMemstoreFlusher(this.threadWakeFrequency, this);
    
    // creat lease monitor
    this.leases = new Leases(this.threadWakeFrequency);

    // Create the thread to clean the moved regions list
    movedRegionsCleaner = MovedRegionsCleaner.create(this);

    if (this.nonceManager != null) {
        // Create the scheduled chore that cleans up nonces.
        nonceManagerChore = this.nonceManager.createCleanupScheduledChore(this);
    }

    // Setup the Quota Manager
    rsQuotaManager = new RegionServerQuotaManager(this);
    
    // Setup RPC client for master communication
    rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
        rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());

    boolean onlyMetaRefresh = false;
    int storefileRefreshPeriod = conf.getInt(
        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD
      , StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
    if (storefileRefreshPeriod == 0) {
        storefileRefreshPeriod = conf.getInt(
          StorefileRefresherChore.REGIONSERVER_META_STOREFILE_REFRESH_PERIOD,
          StorefileRefresherChore.DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD);
        onlyMetaRefresh = true;
    }
    if (storefileRefreshPeriod > 0) {
        this.storefileRefresher = new StorefileRefresherChore(storefileRefreshPeriod,
          onlyMetaRefresh, this, this);
    }
    registerConfigurationObservers();
}

1.2 handleReportForDutyResponse

启动 WAL 和线程

// response包含conf:
// hbase.regionserver.hostname.seen.by.master
// fs.default.name
// hbase.rootdir
protected void handleReportForDutyResponse(final RegionServerStartupResponse c)
  throws IOException {
    try {
        for (NameStringPair e : c.getMapEntriesList()) {
            String key = e.getName();
            // The hostname the master sees us as.
            if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
                // master 为rs重新定义 hostname
                // rs得到新的 serverName
                String hostnameFromMasterPOV = e.getValue();
                this.serverName = ServerName.valueOf(hostnameFromMasterPOV,
                  rpcServices.isa.getPort(), this.startcode);
            
                // 校验 hostname 
                continue;
            }
            // 覆盖原有 conf
            String value = e.getValue();
            this.conf.set(key, value);
        }

        // ZK节点写到磁盘,用于处理程序异常情况
        ZNodeClearer.writeMyEphemeralNodeOnDisk(getMyEphemeralNodePath());

        // 创建 cacheConfig、walFactory
        this.cacheConfig = new CacheConfig(conf);
        this.walFactory = setupWALAndReplication();
        this.metricsRegionServer = new MetricsRegionServer(new MetricsRegionServerWrapperImpl(this));

        // 启动前一步创建的线程,启动ExecutorService
        startServiceThreads();
        startHeapMemoryManager();

        // 通知其他监听线程 rs online
        synchronized (online) {
            online.set(true);
            online.notifyAll();
        }
    } catch (Throwable e) {
        // ...
    }
}

2. HRegionServer主要干以下事情:

上一篇 下一篇

猜你喜欢

热点阅读