我爱编程

MapReduce如何处理不可分割文件

2018-06-21  本文已影响0人  IT_小白

有时会有这样的逻辑需求,一个    map   任务需要处理一个文件中的所有内容或是

把整个文件作为一条记录处理。

即使不分割文件,仍然需要一个    RecordReader    来读取文件的所有内容作为record的值。

Hadoop-version    :    2.7.1

Jdk-version            :    1.8

Maven-version      :    3.3.3

完成此功能需要重写两个类

1.    继承    FileInputFormat    类  重写其中的两个方法以下代码为列子

public class WholeFileInputFormatextends FileInputFormat {

//    createRecordReader    方法是返回一个定制的RecordReader实现

@Override

    public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {

WholeFileRecordReader reader =new WholeFileRecordReader();

reader.initialize(split, context);

return reader;

}

//    isSplitable    方法是说明此文件是否分割    返回 true    为分割    返回 false 为不分割

@Override

    protected boolean isSplitable(JobContext context, Path filename) {

return false;

}

}

2.继承    RecordReader    类    重写其中的三个方法

public class WholeFileRecordReader extends RecordReader {

private FileSplitfileSplit;

private Configurationconf;

private Textkey =new Text();

private BytesWritablevalue =new BytesWritable();

private boolean processed =false;

@Override

    public void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException {

this.fileSplit = (FileSplit) split;

this.conf = context.getConfiguration();

}

/**

*  这个方法会被调用两次

*  第一次读取整个文件流 将其转换为    BytesWritable对象

*  并且将 processed  设置为 true

*  第二次 在  processed 为true时

*  将不会对数据进行加载

*  并且返回    false 代表这个文件已经加载完成

*

    * @return

    * @throws IOException

    * @throws InterruptedException

*/

    @Override

    public boolean nextKeyValue()throws IOException, InterruptedException {

if (!processed){

byte[] contents =new byte[(int)fileSplit.getLength()];

Path file =fileSplit.getPath();

FileSystem fs = file.getFileSystem(conf);

FSDataInputStream in =null;

try {

in = fs.open(file);

IOUtils.readFully(in, contents,0, contents.length);

key.set(file.toString());

value.set(contents,0 ,contents.length);

}finally {

IOUtils.closeStream(in);

}

processed =true;

return true;

}

return false;

}

@Override

    public Text getCurrentKey()throws IOException, InterruptedException {

return key;

}

@Override

    public BytesWritable getCurrentValue()throws IOException, InterruptedException {

return value;

}

@Override

    public float getProgress()throws IOException, InterruptedException {

return processed ?1.0f :0.0f;

}

@Override

    public void close()throws IOException {

}

}

WholeFileRecordReader    类负责将FileSplit转换成一条记录,该记录的键是文件的路径

如果不需要可以不对Key进行初始化返回    Null    即可,值是这个文件的内容,因为只有一条记录

WholeFileRecordReader 要么处理这条数据,要么不处理,所以他维护一个名称为    processed

的布尔变来表示这个文件是否被处理过。如果当    nextKeyValue()    方法被调用时,文件没有被处理过,

就打开文件,产生一个长度是文件长度的字节数组,并用    Hadoop    的    IOUtils    类把文件的内容放入字节数组。

然后在被传递到    next()    方法的    BytesWritable    实例上设置数组,返回    true    则表示成功读取记录。

                以上为所有的代码实例以及说明,下面来说一下所遇到的问题

1.此程序无法    Dubug    也就是说无法调试    (至今未解决)

2.在mapper阶段获取    value    时这个值不能直接使用,因为在调用    nextKeyValue()    方法时已经转换为    BytesWritable

  类型值的长度变长了,这是由于    Hadoop    里面    BytesWritable    的实现机制造成的,BytesWritable    的实现中,

保存了一个    byte[]    存放内容,一个    int size    表示    byte    数组里面前多少位是有效的,后面的是无效的,但是    ByteWritable    的getBytes()    方法返回的确实    byte    数组的全部内容(长度很可能大于size),所以在    Mapper    中进行处理的时候应该只操纵size大小的内容后面的应该无视掉,

转换为String类型如下:String str = new String(value.getBytes(),0,value.getLength());   

或是                                value.setCapacity(value.getLength());

如果文件过大程序将不能调试,会卡死文件晓得话可以调试,具体原因不清楚

刚刚开始写简书,有很多地方还不是很懂,有不好的地方还请您指出!

在此感谢您能看完此篇文章!!!

上一篇下一篇

猜你喜欢

热点阅读