大数据分析Hbase 实战大数据,机器学习,人工智能

大数据 - 给你的Hbase提提速写入性能吧、产生环境级别

2019-03-24  本文已影响3人  大猪大猪

背景

之前我们的线上业务一直使用的是Hbase的单条put操作,为了提高程序的写入性能我们还针对业务进行了修改,将日志批量化,也就是hbase的put多条操作,后面发现hbase的客户端是支持本地批量操作,而且还挺多配置的,这次主要针对的是Hbase2.x的版本进行演示说明,1.X不在本文章的范围内,因为线上的Hbase没有这个版本,作为同龄的小伙伴们是知道 大猪佩琪 不会去线上安装这样一个版本来演示

Hbase 2.X 写性能优化 批量处理

开始直播

案发现场还原
说实话,这样子使用Hbase来写操作其实是最多人的,之前也包括我们在内:

val connection: Connection = ConnectionFactory.createConnection()
val table = connection.getTable(TableName.valueOf("logTable"))
table.put(单条/多条)

但是,这不是你不想加速不想进步的理由,看了 大猪佩琪 这篇文章之后 ,不想进步都难,又多了一个跟我抢饭碗的,看我下面煮的饭:

//①
val params = new BufferedMutatorParams(TableName.valueOf("logTable"))
//②
params.setWriteBufferPeriodicFlushTimeoutMs(TimeUnit.SECONDS.toMillis(3))
//③
params.setWriteBufferPeriodicFlushTimerTickMs(100)
//④
params.maxKeyValueSize(10485760)
//⑤
params.writeBufferSize(1024 * 1024 * 2)
//⑥
val table = connection.getBufferedMutator(params)

看到这几句,是不是有种相见恨晚的感觉?不闹了,容 大猪佩琪 慢慢道来:

  1. 这句太难解释,PASS。

  2. 设置自动提交的间隔,如果不设置,那你就要付出代价了,那就是手动:table.flush(),不信?看大猪从Hbase拿出来的源码的默认配置,就在楼下。

  3. 这又是个什么鬼,又臭又长,名还跟第2句差不多一样,既然跟第一句差不多肯定是有联系的嘛,其实就是检查什么时候到Flush的定时器,默认是1秒检查一次,如果加上自身配置上面的配置至少要4秒或者大小为2M才能自动提交一次。

  4. KV单条的最大Size,如果你像 大猪佩琪 上面一样配置为1,信不信分分钟钟让你体会 KeyValue size too large 异常,就是下面的案发现场:

    KeyValue size too large 案发现场
    不设置就默认为 10M 大小。
  5. 看到情人了,一看就是我们最想要的设置,可以看楼下的doFlush源码,
    有三个地方会调用到这个方法:mutatecloseflush
    只有第一个是会按照writeBufferSize来大小来自动计算,其它两个都是flushAll操作,属于不黄但很暴力的那种。

    hbase writeBufferSize
  6. 这句跟第1句一样看不懂,肥家。

使用

上面的解析已经完了,相信大家都会用了,还不会的话,请看下图 + 例子:

Hbase 批量写入
val row = new Put(Bytes.toBytes(rowkey))
row.addColumn(Bytes.toBytes("info"), Bytes.toBytes('name"), Bytes.toBytes("大猪佩琪"))
table.mutate(row)
// 程序结束前请调用
table.close()

源码

Hbase 2.x 配置
org.apache.hadoop.hbase.client.ConnectionConfiguration 头部默认配置

public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT = 0; // 0 == Disabled
public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;

put验证

public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
    if (put.isEmpty()) {
      throw new IllegalArgumentException("No columns to insert");
    }
    if (maxKeyValueSize > 0) {
      for (List<Cell> list : put.getFamilyCellMap().values()) {
        for (Cell cell : list) {
          if (KeyValueUtil.length(cell) > maxKeyValueSize) {
            throw new IllegalArgumentException("KeyValue size too large");
          }
        }
      }
    }
  }

doFlush

private void doFlush(boolean flushAll) throws InterruptedIOException,
      RetriesExhaustedWithDetailsException {
    List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>();
    while (true) {
      if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) {
        // There is the room to accept more mutations.
        break;
      }
      AsyncRequestFuture asf;
      try (QueueRowAccess access = createQueueRowAccess()) {
        if (access.isEmpty()) {
          // It means someone has gotten the ticker to run the flush.
          break;
        }
        asf = ap.submit(createTask(access));
      }
      // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't
      // be released.
      asf.waitUntilDone();
      if (asf.hasError()) {
        errors.add(asf.getErrors());
      }
    }

    RetriesExhaustedWithDetailsException exception = makeException(errors);
    if (exception == null) {
      return;
    } else if(listener == null) {
      throw exception;
    } else {
      listener.onException(exception, this);
    }
  }
上一篇下一篇

猜你喜欢

热点阅读