Java操作parquet

2018-03-19  本文已影响2540人  明翼

这段时间因为项目,对parquet做了一系列研究,从写入跟踪到合并及spark使用等等场景。
选择parquet来对流数据进行序列化,用于后续离线分析的理由有以下几点:
1、流数据一般格式比较杂乱,可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量。
2、网络流量数据量非常的庞大,即使过滤部分,还是非常吓人,parquet压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码(例如RunLength Encoding和Delta Encoding)进一步节约存储空间。
3、后续分析,只读取需要的列,支持向量运算,能够获取更好的扫描性能

写入parquet的过程,代码部分还是比较简单的

1、创建schema

public void builderSchema(List<Map<String, String>> list) throws IllegalArgumentException, IOException {  
MessageTypeBuilder builder = Types.buildMessage();  
 list.get(0).forEach((key, value) -> {  
builder.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(key);  
  });  
 File file = new File("./conf/parquet.schema");  
  if (file.exists()) {  
 FileUtils.readLines(file).forEach(str -> {  
 builder.optional(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(str);  
 });  
  }  
  MessageType schema = builder.named("aus");  
  hdfsConfig.set("parquet.example.schema", schema.toString());  
  factory = new SimpleGroupFactory(schema);  
 }  
我这个是动态根据传入的数据,自动创建schema的,同时也支持手动配置

2、创建writer

 public void refreshWriter(String path) throws IllegalArgumentException, IOException {  
closeParquetWriter();  
if (factory != null) {  
 writer = new ParquetWriter<>(new Path(path + "/" + new Date().getTime() + ".parquet"), hdfsConfig,  
new GroupWriteSupport());  
 }  
 }  
ps:声明ParquetWriter有非常多的构造,我这个选择的参数最少的。原因后面分析会讲。

3、写入数据

public void writeParquet(List<Map<String, String>> list) throws IOException {  
 for (Map<String, String> map : list) {  
Group group = factory.newGroup();  
map.forEach((key, value) -> {  
group.append(key, value);  
 });  
 writer.write(group);  
  }  
}  
以上的代码便可以实现把数据序列化成parquet格式文件。

在实际开发过程中,遇到过好几个坑,首先开始我选择的是另外一个构造器,详情如下,此构造器可以手动指定block,pagesize等大小。
 public ParquetWriter(  
 Path file,  
ParquetFileWriter.Mode mode,  
 WriteSupport<T> writeSupport,  
 CompressionCodecName compressionCodecName,  
 int blockSize,  
 int pageSize,  
 int dictionaryPageSize,  
 boolean enableDictionary,  
boolean validating,  
WriterVersion writerVersion,  
  Configuration conf) throws IOException {  
  this(file, mode, writeSupport, compressionCodecName, blockSize,  
validating, conf, MAX_PADDING_SIZE_DEFAULT,  
 ParquetProperties.builder()  
 .withPageSize(pageSize)  
  .withDictionaryPageSize(dictionaryPageSize)  
  .withDictionaryEncoding(enableDictionary)  
  .withWriterVersion(writerVersion)  
 .build());  
  }  
在调试中,发现这几个参数死活都不生效,后来经过源码跟踪调试,发现,在parquet的底层他实现这样的功能,因为我HDFS的block大小为128M,而我代码中设置的为4K,所以死活都不生效,看着数据不停地写入,但是hdfs上文件大小没有丝毫变化,原来都是在内存中。
  1. // use the default block size, unless row group size is larger
    2 long dfsBlockSize = Math.max(fs.getDefaultBlockSize(file), rowGroupSize);

  2. 即获取了DFS的block块大小,然后跟设置的值比较取最大值。

    有一种办法可以强制性提交到服务器,那就是调用writer的close方法。但问题是,close当前这个文件之后,下次就不能再打开续写的,parquet只有两种模式,要么创建,要么覆盖。所以对于流数据场景,想要比较好的实时性,那就会创建非常多的小文件,这对hdfs的压力是非常大的。所以,在项目中,我选择了定时刷新writer,意思就是每隔一个小时,或者每隔一天来创建一个writer,这样可以保证一个文件不至于太小,且可以及时关闭掉,好让spark读取。(ps:未close掉的parquet文件,spark是没法加载的,会提示不是parquet格式的文件)

    还有一种办法可以合并parquet小文件,在spark研究中发现,有这样一个特性,coalesce函数可以指定block个数来输出,并且可以加载父目录下全部的parquet文件,所以可实现将多个parquet文件合并成一个文件。


    image.png

    可以看到之前目录下有4001个文件:


    image.png

处理之后,可以发现成功合并:


image.png

至于spark读取parquet文件,进行分析,就非常简单了,用spark-shell做一个简单的演示:


image.png

转个同事写的:http://blog.csdn.net/cyony/article/details/79608261

上一篇下一篇

猜你喜欢

热点阅读