杂物间我爱编程

HBase那些事

2017-07-13  本文已影响600人  分痴

HBase那些事

@(大数据工程学院)[HBase, Hadoop, 优化, HadoopChen, hbase]

[TOC]

HBase特性

HBase是什么

HBase(Hadoop Database),一个高可靠性、高性能、面向列、可伸缩、 实时读写的分布式数据库。

选择特性

HBase环境搭建

CDH

CM安装HBase比较简单,参照安装步骤即可:

image.png

分布式环境

前提条件

  1. Hadoop集群:hadoop01,hadoop02, hadoop03
  2. 用户互信
  3. HBase安装包
  4. JDK
  5. Zookeeper

安装部署

  1. 解压安装包:sudo tar -zxf hbase-1.2.4-bin.tar.gz -C /usr/local/hbase/
  2. 配置环境变量:vi /etc/profile ; export HBASE_HOME=/usr/local/hbase/
  3. 修改配置文件:$HBASE_HOME/conf/hbase-site.xml
<configuration>
  <property>
    <name>hbase.rootdir</name>
       <value>hdfs://nameservice1/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
  </property>
</configuration>
  1. 修改启动脚本:$HBASE_HOME/conf/hbase-env.sh; export JAVA_HOME=/usr/java/jdk/
  2. 修改HMaster配置:$HBASE_HOME/conf/regionservers
hadoop01
hadoop02
hadoop03
  1. 新增备用HMater: $HBASE_HOME/conf/backup-masters
hadoop02
  1. 启动集群:$HBASE_HOME/bin/start-hbase.sh
  2. 验证环境:
create 'hello', 'cf'
put 'hello', 'one', 'cf:a', 'b'
get 'hello', 'one'

HBase架构

看图说话

HBase采用Master/Slave架构搭建集群,它隶属于Hadoop生态系统,由一下类型节点组成:HMaster节点、HRegionServer节点、ZooKeeper集群,而在底层,它将数据存储于HDFS中,因而涉及到HDFS的NameNode、DataNode等,总体结构如下:

image.png
  1. 管理HRegionServer,实现其负载均衡;
  2. 管理和分配HRegion;实现DDL操作;
  3. 管理namespace和table的元数据;权限控制;
  1. 存放和管理本地HRegion;
  2. 读写HDFS,管理Table中的数据;
  3. Client直接通过HRegionServer读写数据;
  1. 存放整个 HBase集群的元数据以及集群的状态信息;
  2. 实现HMaster主从节点的failover。
  1. HBase表数据按行切分成多个HRegion;
  2. HRegion按照列簇切分成多个Store;
  3. 每个Store由一个MemStore和多个StoreFile(HFile)组成;
  4. 每个HRegion还包含一个HLog文件,用于数据恢复;
  1. RegionServer通过DFS Client读写HDFS数据;
  2. RS和DN尽量保证在统一节点,确保数据本地化,提升读写性能;
  3. MemStore满足一定条件下会Flush到HDFS上的HFile文件。

HBaseTable

HBase表主要包含namespace(命名空间)、tableName(表名)、rowKey(主键)、columnFamily(列簇)、qualifier(列)、cell(值)、timeStamp(版本),用结构化的形式展现如下:

| NameSpace | TableName | RowKey | CF1:Name | CF2:Age
| -------- | -----: | :----: |
| Default | HelloWorld | 001 | Allen | 28
| Default | HelloWorld | 002 | Curry | 26
| Default | HelloWorld | 003 | Brant | 20
| Default | HelloWorld | 004 | Sean | 31

image.png

读流程

HBase表数据所在Region,Region所在RegionServer,均存放在HBase表hbase:meta,由于元数据较小,一个2G的HRegion大概可以支持4PB的数据,所以meta表是不可Split的。Meta表本身是一个HBase表,该表所在RS的地址存放在ZK,于是读数据的流程如下:

  1. 需求:从HellWorld表读取002这条记录;
  2. Client从ZK读取meta表所在RegionServer地址信息;
  3. Client和Meta表所在RS建立连接,获取HelloWorld表当中RowKey=002的记录所在的RS地址;并将该地址缓存在客户端;
  4. Client和002所在RS建立连接,申请查询002记录数据;
  5. 002所在RS根据RowKey获取Region信息,并初始化Scaner,Scaner优先扫描BlockCache,然后扫描MemStore,然后扫描StoreFile(HFile),直到找到002记录为止;
  6. RS返回结果数据给Client。

写流程

  1. 需求:写入005到HelloWorld表;
  2. Client从ZK去读Meta表所在RS地址;
  3. Client读取Meta表数据,确认005应该写入RS地址;
  4. Client将005数据写入RS02的HLog,用于灾备恢复,然后写入RegionB的memStore;此刻写操作已完成,反馈给client;
  5. 当memStore满足一定条件下会Flush到hdfs,hdfs再根据写策略复制副本。

Split和Compaction

HBase扩展和负载均衡的基本单位是Region。Region从本质上说是行的集合。当Region的大小达到一定的阈值,该Region会自动分裂(split),或者当该Region的HFile数太多,也会自动合并(compaction)。

对于一张表(HTable)而言,初始时只会有一个Region。表的数据量不断增加,系统会监控此表以确保数据量不会超过一个配置的阈值。如果系统发现表容量超过了限制,该Region会被一分为二。分裂主要看行键(row key),从Region正中的键开始分裂,并创建容量大致相等的两个Region。

根据上述写流程会发现HBase是一种Log-Structured Merge Tree架构模式,用户数据写入先写WAL,再写缓存,满足一定条件后缓存数据会执行flush操作真正落盘,形成一个数据文件HFile。随着数据写入不断增多,flush次数也会不断增多,进而HFile数据文件就会越来越多。然而,太多数据文件会导致数据查询IO次数增多,因此HBase尝试着不断对这些文件进行合并,这个合并过程称为Compaction。

BlockCache

上述读流程中RS会优先扫描BlockCache,BlockCache是一个读缓存,即“引用局部性”原理(也应用于CPU,分空间局部性和时间局部性,空间局部性是指CPU在某一时刻需要某个数据,那么有很大的概率在一下时刻它需要的数据在其附近;时间局部性是指某个数据在被访问过一次后,它有很大的概率在不久的将来会被再次的访问),将数据预读取到内存中,以提升读的性能。

HBase数据按照block块存储,默认是64K,HBase中Block分为四种类型:

HBase中提供两种BlockCache的实现:默认on-heap LruBlockCache和BucketCache(通常是off-heap)。通常BucketCache的性能要差于LruBlockCache,然而由于GC的影响,LruBlockCache的延迟会变的不稳定,而BucketCache由于是自己管理BlockCache,而不需要GC,因而它的延迟通常比较稳定,这也是有些时候需要选用BucketCache的原因。

image.png image.png image.png

MemStore

HBase写入数据会先写WAL,再写缓存,WAL是用于RS故障后的数据恢复,而缓存MemStore则是为了提高写入数据的性能。但MemStore是基于内存,一方面空间有限,一方面数据容易丢失,所以RegionServer会在满足一定条件下讲MemStore数据Flush到HDFS,条件如下:

  1. Memstore级别限制:当Region中任意一个MemStore的大小达到了上限(hbase.hregion.memstore.flush.size,默认128MB),会触发Memstore刷新。

  2. Region级别限制:当Region中所有Memstore的大小总和达到了上限(hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size,默认 2* 128M = 256M),会触发memstore刷新。

  3. Region Server级别限制:当一个Region Server中所有Memstore的大小总和达到了上限(hbase.regionserver.global.memstore.upperLimit * hbase_heapsize,默认 40%的JVM内存使用量),会触发部分Memstore刷新。Flush顺序是按照Memstore由大到小执行,先Flush Memstore最大的Region,再执行次大的,直至总体Memstore内存使用量低于阈值(hbase.regionserver.global.memstore.lowerLimit * hbase_heapsize,默认 38%的JVM内存使用量);HBase1.0后使用新参数hbase.regionserver.global.memstore.size。

  4. 当一个Region Server中HLog数量达到上限(可通过参数hbase.regionserver.maxlogs配置)时,系统会选取最早的一个 HLog对应的一个或多个Region进行flush

  5. HBase定期刷新Memstore:默认周期为1小时,确保Memstore不会长时间没有持久化。为避免所有的MemStore在同一时间都进行flush导致的问题,定期的flush操作有20000左右的随机延时。

  6. 手动执行flush:用户可以通过shell命令 flush ‘tablename’或者flush ‘region name’分别对一个表或者一个Region进行flush。

HBase客户端

HBase提供了许多客户端,例如HBase Shell、JAVA API、REST、Thrift、Avro等等,另外MapReduce、Hive、Spark等分布式计算引擎均有接口读写HBase数据,详细细节参考《HBase In Action》或者《HBase权威指南》。

HBase优化

了解了HBase的基本概念之后,在使用过程中针对每个环节均有优化策略,最常见的优化策略如下。

调整GC策略

HBase内存使用情况参考上文,当吞吐量徒增或者运行一定时长后,大部分内存被BlockCache和MemStore占据,一旦FGC则会“stop the world”,影响RS的正常工作,如果GC时间超过ZK的心跳TimeOut时间,该服务会被ZK标记为BAD状态。所以合理高效的GC策略非常重要。

GC基本知识请参考《深入理解Java虚拟机》,针对HBase这里给出参考JVM参数:

-Xms10g -Xmx10g -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:MaxGCPauseMillis=60000 -XX:CMSInitiatingOccupancyFraction=70 -XX:PermSize=64m -XX:MaxPermSize=256m  -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=3072m -XX:+CMSIncrementalMode

BucketCache Off-Heap

如果存在批量读取HBase数据的请求,BlockCache可能会被迅速占满,GC不及时RS又要悲剧,所以采用BucketCache+Off-Heap是一个不错的优化点,或者不顾性能问题,关闭BlockCache。

<property>     
   <name>hbase.bucketcache.size</name>    
   <value>3072</value> 
</property>
<property>
    <name>hbase.bucketcache.ioengine</name>
    <value>offheap</value> 
</property>
CombinedBlockCache策略下还需要设置L1缓存大小:
<property>
    <name>hbase.bucketcache.combinedcache.enabled</name>
    <value>true</value> 
</property>
<property>
 <name>hfile.block.cache.size</name>
 <value>0.2</value>
</property>
jmap -heap pid  -- 内存使用情况
jmap -histo pid -- 内存粗略统计
jmap -F -dump:format=b,file=dump.file pid  -- dump文件
jhat dump.file -- 通过http://dshbase01:7000查看dump文件,但端口白名单导致无法访问

线程并发数

HBase是一个高并发的分布式数据库,牵扯到ZK、RegionServer、DataNode等组件,所以性能提升跟这些组件的并发线程数的控制离不开关系。

hbase.regionserver.handler.count=300
hbase.regionserver.metahandler.count=300
该配置定义了每个Region Server上的RPC Handler的数量。Region Server通过RPC Handler接收外部请求并加以处理。所以提升RPC Handler的数量可以一定程度上提高HBase接收请求的能力。
maxClientCnxns=60
dfs.datanode.max.transfer.threads=8192
dfs.datanode.handler.count=100

Balancer

HBase是一种支持自动负载均衡的分布式KV数据库,在开启balance的开关(balance_switch)后,HBase的HMaster进程会自动根据 指定策略 挑选出一些Region,并将这些Region分配给负载比较低的RegionServer上。官方目前支持两种挑选Region的策略,一种叫做DefaultLoadBalancer,另一种叫做StochasticLoadBalancer。由于HBase的所有数据(包括HLog/Meta/HStoreFile等)都是写入到HDFS文件系统中的, 因此HBase的Region移动其实非常轻量级。在做Region移动的时候,保持这个Region对应的HDFS文件位置不变,只需要将Region的Meta数据分配到相关的RegionServer即可,整个Region移动的过程取决于RegionClose以及RegionOpen的耗时,这个时间一般都很短。

Region切分

Region Split的依据是最大Region Size,调大该参数可以减少Region数量,同时也减少了MemStore数量和总空间。

hbase.hregion.max.filesize=2G

非本地化部署

HBase读取数据时会从HDFS上的HFile加载数据到BlockCache,如果是本地数据则非常高效,如果不同机器或不同机架,则会带来网络消耗,同样写流程当中的Flush过程一样会本地化的问题,所以建议DataNode和RegionServer部署在同一台机器,因为RS调用hdfs client去读写数据,hdfs client优先会读写本地磁盘。另外也可以通过HBase Web UI观察数据本地化情况。

MemStore Flush

MemStore的频繁Flush会消耗大量服务器资源,根据Flush条件合理控制Flush次数是一个经常要观察的优化点。

<property>
 <name>hbase.regionserver.global.memstore.size</name>
 <value>0.6</value>
</property>
hbase.regionserver.maxlogs=500

压缩

压缩通常会带来较好的性能,因为CPU压缩和解压消耗的时间比从磁盘中读取和写入数据消耗的时间更短,HBase支持多种压缩方式,例如snappy、lzo、gzip等等,不同压缩算法压缩比和效率有一定差异。开启表压缩的方式如下:

disable 'MILESTONE'
alter 'MILESTONE',{NAME=>'PUSH',COMPRESSION=>'snappy'}
alter 'MILESTONE',{NAME=>'BEHAVIOR',COMPRESSION=>'snappy'}
enable 'MILESTONE'

MSLAB

HBase内存回收策略CMS一定程度上降低了Stop时间,但却避免不了内存碎片的问题,HBase提供了以下几个参数,防止堆中产生过多碎片,大概思路也就是参考TLAB,叫做MSLAB,MemStore-Local Allocation Buffer。

一个regionserver的内存里各个region的数据混合在一起,当某个region被flush到磁盘时,就会形成很多堆碎片。其实这跟java中gc模型的假设是冲突的,同一时间创建的对象,会在同一时间消亡。这个问题可以通过Arena Allocation来解决,即每次分配内存时都是在一个更大的叫做arena的内存区域分配。一个典型的实现是TLAB,即每个线程维护自己的arena,每个线程用到的对象都在自己的arena区域分配。其实,jvm早已经实现了TLAB,但是这个对于hbase不适用,因为hbase的regionserver使用一个单独的线程来处理所有region的请求,就算这个线程用arena方式分配还是会把所有region的数据混在一起。因此hbase自己实现了MSLAB,即每个region的memStore自己实现了arena,使各个region的数据分开,就不会形成太细的碎片。Arena里存放的是KeyValue对象,如果这些KeyValue对象是一样大的,不会导致严重碎片,相反这些KeyValue对象引用的字节数组才是引起碎片的主因,因此要做的就是把这些字节数组分配在一起。

hbase.hregion.memstore.mslab.enabled=true // 开启MSALB
hbase.hregion.memstore.mslab.chunksize=2m // chunk的大小,越大内存连续性越好,但内存平均利用率会降低
hbase.hregion.memstore.mslab.max.allocation=256K // 通过MSLAB分配的对象不能超过256K,否则直接在Heap上分配,256K够大了

RowKey设计

HBase是根据Rowkey来进行检索的,所以合理的Rowkey设计可以提高查询效率。

  1. 数据的持久化文件HFile中是按照KeyValue存储的,如果Rowkey过长比如100个字节,1000万列数据光Rowkey就要占用100*1000万=10亿个字节,将近1G数据,这会极大影响HFile的存储效率;

  2. MemStore将缓存部分数据到内存,如果Rowkey字段过长内存的有效利用率会降低,系统将无法缓存更多的数据,这会降低检索效率。因此Rowkey的字节长度越短越好。

  3. 目前操作系统是都是64位系统,内存8字节对齐。控制在16个字节,8字节的整数倍利用操作系统的最佳特性。

客户端调优

Hive On HBase

NoSQL种类繁多,各有优势,HBase其中一个优势就是和HDFS的集成,想象下如果实时事务数据需要结合历史统计数据,则需要将Hive离线跑批的数据T+1日导入HBase,这个数据搬迁的过程,针对Redis\MongoDB\cassandra,则需要定制开发ETL工具。对于HBase,Hive提供了HBaseStorageHandle解决方案:

-- KV映射
CREATE TABLE helloWorld(key int, value string)
  STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
  TBLPROPERTIES ("hbase.table.name" = "helloWorld");
-- 列簇映射
CREATE TABLE helloWorld(key int, cf1 map<string,string>, cf2 map<string, string>)
  STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:")
  TBLPROPERTIES ("hbase.table.name" = "helloWorld");

然后基于HQL插入数据即可。

Spark On HBase

某些业务场景下面,需要结合多个数据源的数据,比如redis和hbase的数据进行join。第一个念头则是Spark DataFrame或DataSet,DF支持多种数据源,同时还提供了SparkSQL语法用于统计分析。

// 配置文件
val conf = new SparkConf()
      .setAppName(jobName)
      .setMaster("local[*]")
      .set("redis.host", redisHost)
      .set("redis.port", redisPort)
      .set("redis.db", redisDB)
      .set("redis.timeout", redisTimeOut)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

// RDD读取   
var redisRDD = sc.fromRedisSet(groupID)
// 注册DF
val redisSchemaString = "USER_ID"
val redisSchema = StructType(redisSchemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = redisRDD.map(p => Row(p.toString.trim))
var redisDF = sqlContext.createDataFrame(rowRDD, redisSchema)
redisDF.registerTempTable(redisDFTableName)
  1. newAPIHadoopRDD:Spark官方提供的HDFS文件读写接口;
// 配置文件
    val hconf = HBaseConfiguration.create()
    hconf.set("hbase.zookeeper.property.clientPort", zkPort)
    hconf.set("hbase.zookeeper.quorum", zkQuorum)
    
// RDD读取
    hconf.set(TableInputFormat.INPUT_TABLE, tagTableName)
    hconf.set(TableInputFormat.SCAN_COLUMNS, hbaseColumns)
    val hbaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])...

// 注册DF:注册之前需要将HBaseRDD转换成RowRDD,此处省略
    val hbaseDF = sqlContext.createDataFrame(hbaseRDD, hbaseStruct)
    hbaseDF.registerTempTable(hbaseDFTableName)
  1. SHC :hortonworks提供的Spark读写HBase的方案,具体Demo见Git,SHC只支持spark2.0+版本;
// Schema定义
    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"tag"},
         |"rowkey":"key",
         |"columns":{
         |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
         |"col1":{"cf":"behavior", "col":"activate", "type":"string"}}
        }""".stripMargin

// 读取HBase数据
    def withCatalog(cat: String): DataFrame = {
          sqlContext
            .read
            .options(Map(HBaseTableCatalog.tableCatalog -> cat))
            .format("org.apache.spark.sql.execution.datasources.hbase")
            .load()
        }

// 注册DF
    val df = withCatalog(catalog)
    df.registerTempTable("userBehaviorTime")
  1. nerdammer : 写入操作必须是tunpleRDD,但Scala的tunple最大长度22,如果需要查询的HBase列数超过22,则无法使用该方案;
// nerdammer方案读取HBase数据:结果是tuple,长度有限制22.
val hbaseRDD = sc.hbaseTable[(Option[String], Option[String], Option[String], Option[String], Option[String],
Option[String], Option[String], Option[String], Option[String], Option[String], Option[String], Option[String],
Option[String])]("tag").select(columnStringArray: _*).inColumnFamily("behavior")

// DF只接收RowRDD
    val hbaseRowRDD = hbaseRDD.map(
      i => {
        var record = new ArrayBuffer[String]()
        i.productIterator.foreach { col => {
          record += col.asInstanceOf[Option[String]].get
        }
        }
        Row(record.toArray: _*)
      })

// RowRDD转成DF
    val hbaseDF = sqlContext.createDataFrame(hbaseRowRDD, hbaseSchema.struct)
    hbaseDF.registerTempTable("userBehaviorTime")
  1. unicredit太小众,坑太多,不建议使用;

  2. Phoenix : 两大优势Sql On HBase,HBase二级索引实现。HBase神器,但无法关联Redis数据和HBase数据。

以上解决方案均是采用传统的JOIN方式,这样会带来一个问题就是HBase全表扫描,性能低下。解决思路是先将RedisRDD进行repartition操作,针对每个partition形成HBase的List[Get]对象,进行批量读取。但同样批量读取需要考虑内存激增的情况,所以需要结合上面的优化点使用。

Phoenix On HBase

Phoenix是构建在HBase上的一个SQL层,能让我们用标准的JDBC API而不是HBase客户端API来增删查改HBase数据。相对原生HBase接口,Phoenix提供了以下特性:

有兴趣者查阅Phoenix官网。

上一篇 下一篇

猜你喜欢

热点阅读