大数据 - 给你的Hbase提提速写入性能吧、产生环境级别
背景
之前我们的线上业务一直使用的是Hbase的单条put操作,为了提高程序的写入性能我们还针对业务进行了修改,将日志批量化,也就是hbase的put多条操作,后面发现hbase的客户端是支持本地批量操作,而且还挺多配置的,这次主要针对的是Hbase2.x
的版本进行演示说明,1.X
不在本文章的范围内,因为线上的Hbase没有这个版本,作为同龄的小伙伴们是知道 大猪佩琪 不会去线上安装这样一个版本来演示
开始直播
案发现场还原
说实话,这样子使用Hbase来写操作其实是最多人的,之前也包括我们在内:
val connection: Connection = ConnectionFactory.createConnection()
val table = connection.getTable(TableName.valueOf("logTable"))
table.put(单条/多条)
但是,这不是你不想加速
不想进步
的理由,看了 大猪佩琪 这篇文章之后 ,不想进步都难,又多了一个跟我抢饭碗的,看我下面煮的饭:
//①
val params = new BufferedMutatorParams(TableName.valueOf("logTable"))
//②
params.setWriteBufferPeriodicFlushTimeoutMs(TimeUnit.SECONDS.toMillis(3))
//③
params.setWriteBufferPeriodicFlushTimerTickMs(100)
//④
params.maxKeyValueSize(10485760)
//⑤
params.writeBufferSize(1024 * 1024 * 2)
//⑥
val table = connection.getBufferedMutator(params)
看到这几句,是不是有种相见恨晚的感觉?不闹了,容 大猪佩琪 慢慢道来:
-
这句太难解释,PASS。
-
设置自动提交的间隔,如果不设置,那你就要付出代价了,那就是手动:
table.flush()
,不信?看大猪从Hbase
拿出来的源码的默认配置,就在楼下。 -
这又是个什么鬼,又臭又长,名还跟第2句差不多一样,既然跟第一句差不多肯定是有联系的嘛,其实就是检查什么时候到
Flush
的定时器,默认是1秒检查一次,如果加上自身配置上面的配置至少要4秒或者大小为2M才能自动提交一次。 -
KV单条的最大Size,如果你像 大猪佩琪 上面一样配置为1,信不信分分钟钟让你体会
KeyValue size too large 案发现场KeyValue size too large
异常,就是下面的案发现场:
不设置就默认为 10M 大小。 -
看到情人了,一看就是我们最想要的设置,可以看楼下的
hbase writeBufferSizedoFlush
源码,
有三个地方会调用到这个方法:mutate
、close
、flush
只有第一个是会按照writeBufferSize
来大小来自动计算,其它两个都是flushAll
操作,属于不黄但很暴力的那种。
-
这句跟第1句一样看不懂,肥家。
使用
上面的解析已经完了,相信大家都会用了,还不会的话,请看下图 + 例子:
val row = new Put(Bytes.toBytes(rowkey))
row.addColumn(Bytes.toBytes("info"), Bytes.toBytes('name"), Bytes.toBytes("大猪佩琪"))
table.mutate(row)
// 程序结束前请调用
table.close()
源码
Hbase 2.x 配置
org.apache.hadoop.hbase.client.ConnectionConfiguration
头部默认配置
public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT = 0; // 0 == Disabled
public static final long WRITE_BUFFER_PERIODIC_FLUSH_TIMERTICK_MS_DEFAULT = 1000L; // 1 second
public static final int MAX_KEYVALUE_SIZE_DEFAULT = 10485760;
put验证
public static void validatePut(Put put, int maxKeyValueSize) throws IllegalArgumentException {
if (put.isEmpty()) {
throw new IllegalArgumentException("No columns to insert");
}
if (maxKeyValueSize > 0) {
for (List<Cell> list : put.getFamilyCellMap().values()) {
for (Cell cell : list) {
if (KeyValueUtil.length(cell) > maxKeyValueSize) {
throw new IllegalArgumentException("KeyValue size too large");
}
}
}
}
}
doFlush
private void doFlush(boolean flushAll) throws InterruptedIOException,
RetriesExhaustedWithDetailsException {
List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>();
while (true) {
if (!flushAll && currentWriteBufferSize.get() <= writeBufferSize) {
// There is the room to accept more mutations.
break;
}
AsyncRequestFuture asf;
try (QueueRowAccess access = createQueueRowAccess()) {
if (access.isEmpty()) {
// It means someone has gotten the ticker to run the flush.
break;
}
asf = ap.submit(createTask(access));
}
// DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't
// be released.
asf.waitUntilDone();
if (asf.hasError()) {
errors.add(asf.getErrors());
}
}
RetriesExhaustedWithDetailsException exception = makeException(errors);
if (exception == null) {
return;
} else if(listener == null) {
throw exception;
} else {
listener.onException(exception, this);
}
}