Flink实战之写Hive性能问题
2020-12-26 本文已影响0人
〇白衣卿相〇
上一篇中写了Kafka to Hive的样例,在实际测试过程中,发现性能比较差。
问题1
我使用的是Flink1.11.1版本,这个版本有个性能上的问题,见FLINK-19121。该问题已经在1.11.3版本中修复,
在HiveTableSink中沿用了FileSystemTableSink的TableRollingPolicy,再该policy中每条数据都会调用
@Override
public boolean shouldRollOnEvent(
PartFileInfo<String> partFileState,
RowData element) throws IOException {
return partFileState.getSize() > rollingFileSize;
}
在PartFileInfo对应Hivesink的实现是HadoopPathBasedPartFileWriter,最终会调用
@Override
public long getSize() throws IOException {
return fs.getFileStatus(inProgressPath).getLen();
}
每次都会取请求HDFS来获取文件的大小,这就使得很大的开销花费在和HDFS交互上,同时对HDFS造成了很大的IO压力。
问题2
将上面的bug修复之后进行测试,发现写数据的速度仍然要落后kafka生产速度,任务运行30分钟后就已经达到3分钟的落后,持续运行下去落后将越来越大。
官网有这样一个配置参数table.exec.hive.fallback-mapred-writer
默认是true 使用MR writer
设为false则使用flink native writer,即StreamingFileSink的BulkWriter。
所以将该参数设为false之后进行测试,写入速度基本和生产速度一致了。
既然MR writer相比BulkWriter性能差很多,为什么不默认使用BulkWriter主要有两个原因:
- 支持的数据类型没有MR writer的全
- BulkWriter 支持parquet和orc,但只支持orc的最新版本,写入低版本有兼容性问题