自定义输入格式

2019-03-11  本文已影响0人  苏坡闷

首先,我们需要弄懂为什么要为什么要用RecordReader,还要知道为什么要自定义RecordReader。

在我们对文件切片之后,我们需要将切片后的文件转为key-value的键值对。RecordReader的作用就是这个。
RecordReader是一个抽象类,在实际运用中,常见的子类有LineRecordReader、SequenceFileRecordReader、CombineFileRecordReader、KeyValueLineRecordReader、DBRecordReader

1.LineRecordReader:是用每行的偏移量作为map的key,每行的内容作为map的value
2.CombineFileRecordReader:处理CombineInputSplit里的每一个chunk的RecordReader,CombineInputSplit包含不同的小文件chunk信息
3.KeyValueLineRecordReader:根据指定的分割符却切分每一行数据,如果没有指定分割符,那么key就是整行的文本,value就是空
4.DBRecordReader:从数据库表中读取数据
上述的这些内置的RecordReader能解决部分问题,但是在实际运用中还会有许多特殊的使用场景,此时我们就需要自定义RecordReader来处理问题。

1.自定义RecordReader:

继承抽象类RecordReader,实现RecordReader的一个实例;
实现自定义InputFormat类,重写InputFormat中createRecordReader()方法,返回值是自定义的RecordReader实例;
配置job.setInputFormatClass()设置自定义的InputFormat实例;


image.png

2.RecordReader类为一个抽象类,共有下面这些抽象方法,

//实现了java7的新特性AuotCloseable
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {

    //初始化RecordReader,只能被调用一次。
    public abstract void initialize(InputSplit split,
                                  TaskAttemptContext context
                                  ) throws IOException, InterruptedException;
    //读取下一个key/value键值对
    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    //获取当前的key
    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
  
    //获取当前的vlaue
    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
  
    //进度
    public abstract float getProgress() throws IOException, InterruptedException;
  
    //关闭RecordReader
    public abstract void close() throws IOException;
}

3.案例

将多个小文件合并成一个SequenceFile文件(SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式),SequenceFile里面存储着多个文件,存储的形式为文件路径+名称为key,文件内容为value。

// 定义类继承FileInputFormat
public class WholeFileInputformat extends FileInputFormat<Text, BytesWritable>{
    
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)   throws IOException, InterruptedException {
        
        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split, context);
        
        return recordReader;
    }
}

public class WholeRecordReader extends RecordReader<Text, BytesWritable>{

    private Configuration configuration;
    private FileSplit split;
    
    private boolean isProgress= true;
    private BytesWritable value = new BytesWritable();
    private Text k = new Text();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        //InputSplit类里面没有split这个切片的详细信息,经查看在他的子类FileSplit中有这个属性,所以要强转
        this.split = (FileSplit)split;
        configuration = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        
        if (isProgress) {
            // 1 定义缓存区
            byte[] contents = new byte[(int)split.getLength()];
            
            FileSystem fs = null;
            FSDataInputStream fis = null;
            
            try {
                // 2 获取文件系统
                Path path = split.getPath();
                fs = path.getFileSystem(configuration);
                
                // 3 读取数据
                fis = fs.open(path);
                
                // 4 读取文件内容
                IOUtils.readFully(fis, contents, 0, contents.length);
                
                // 5 输出文件内容
                value.set(contents, 0, contents.length);

                // 6 获取文件路径及名称
                String name = split.getPath().toString();

                 // 7 设置输出的key值
                 k.set(name);

            } catch (Exception e) {
                
            }finally {
                IOUtils.closeStream(fis);
            }
            
            isProgress = false;
            
            return true;
        }
        
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    @Override
    public void close() throws IOException {
    }
}

public class SequenceFileMapper extends Mapper<Text, BytesWritable, Text, BytesWritable>{
    
    @Override
    protected void map(Text key, BytesWritable value,           Context context)        throws IOException, InterruptedException {

        context.write(key, value);
    }
}

public class SequenceFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context)        throws IOException, InterruptedException {

        context.write(key, values.iterator().next());
    }
}

public class SequenceFileDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        
       // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args = new String[] { "e:/input/inputinputformat", "e:/output1" };

       // 1 获取job对象
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

       // 2 设置jar包存储位置、关联自定义的mapper和reducer
        job.setJarByClass(SequenceFileDriver.class);
        job.setMapperClass(SequenceFileMapper.class);
        job.setReducerClass(SequenceFileReducer.class);

       // 7设置输入的inputFormat
        job.setInputFormatClass(WholeFileInputformat.class);
        // 8设置输出的outputFormat
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
       
        // 3 设置map输出端的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        
       // 4 设置最终输出端的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

       // 5 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

       // 6 提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

上一篇下一篇

猜你喜欢

热点阅读