Flink Troubleshooting Notes (4): How to Read Sequence Files
2020-05-11
ZYvette
Problem: How do you read an HDFS sequence file in Flink 1.8?
Solution:
1. Put the jars matching your Flink and Hadoop versions into Flink's lib directory:
flink-hadoop-compatibility_2.11-1.8.0.jar
flink-shaded-hadoop-2-uber-2.6.5-8.0.jar
2. The project's pom file must also declare the corresponding Hadoop dependencies.
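For reference, the pom entries might look like the fragment below. The artifact names and versions are assumptions chosen to match the jars listed above; the provided scope on the Hadoop client reflects that the shaded uber jar already sits in Flink's lib directory at runtime:

```xml
<!-- Assumed versions, matching the jars placed in Flink's lib directory -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.8.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.5</version>
    <scope>provided</scope>
</dependency>
```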
3. The code is as follows:
```java
// Key imports (note: the new MapReduce API, not the old mapred one):
//   org.apache.flink.hadoopcompatibility.HadoopInputs
//   org.apache.hadoop.mapreduce.Job
//   org.apache.hadoop.mapreduce.lib.input.FileInputFormat
//   org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
// Assumes a StreamExecutionEnvironment named env and a String path
// pointing at the sequence file on HDFS.
Job job = Job.getInstance();
FileInputFormat.addInputPath(job, new Path(path));

DataStream<String> dataStream = env.createInput(
        HadoopInputs.createHadoopInput(
                new SequenceFileInputFormat<IntWritable, Text>(),
                IntWritable.class, Text.class, job))
        .flatMap(new FlatMapFunction<Tuple2<IntWritable, Text>, String>() {
            @Override
            public void flatMap(Tuple2<IntWritable, Text> value, Collector<String> collector) {
                // value.f0 is the record key (IntWritable), value.f1 the value (Text)
                collector.collect(String.valueOf(value.f1));
            }
        });
```
Note: to convert Text to a String, use String.valueOf(value.f1) directly. Converting the Text to bytes first and then building a String from them can cause encoding problems.
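The note above can be illustrated with plain JDK types. Hadoop's Text keeps its content in a reusable byte buffer whose capacity can exceed the valid content length, so decoding the whole buffer (which a naive bytes-then-String conversion does) picks up stale trailing bytes, whereas toString() (what String.valueOf triggers) decodes only the valid range. The buffer and length below are hypothetical stand-ins for Text.getBytes() and Text.getLength():

```java
import java.nio.charset.StandardCharsets;

public class TextBufferPitfall {
    public static void main(String[] args) {
        // Simulate a reused buffer: 10 bytes of capacity, only 5 bytes valid.
        byte[] buffer = "helloXXXXX".getBytes(StandardCharsets.UTF_8);
        int validLength = 5; // stand-in for Text.getLength()

        // Decoding the whole buffer keeps the stale trailing bytes.
        String wrong = new String(buffer, StandardCharsets.UTF_8);
        // Decoding only the valid range recovers the intended content.
        String right = new String(buffer, 0, validLength, StandardCharsets.UTF_8);

        System.out.println(wrong); // helloXXXXX
        System.out.println(right); // hello
    }
}
```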