离线计算组件篇-MapReduce-自定义InputFormat

2022-11-26  本文已影响0人  CoderInsight

5.自定义 inputFotmat

通过自定义 inputFormat 来实现“将小文件批量合并成 SequenceFIle 格式的单个文件(文件内容是:文件名 bytes流(二进制流))!”

(1),MyInputFormat

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

/**
 * 自定义 InputFormat
 * 这里的 key value 类型的数据是 NullWritable, ByteWritable(byte[] 数组)
 */
public class MyInputFormat extends FileInputFormat<NullWritable, ByteWritable> {


    @Override
    public RecordReader<NullWritable, ByteWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // 调用自定义读取文件的类
        MyRecordReader myRecordReader = new MyRecordReader();
        // 初始化 RecordReader
        myRecordReader.initialize(split, context);
        // 将自定义RecorderReader返回
        return myRecordReader;
    }


    /**
     * 注意这个方法,是决定我们的文件是否切分的,如果不切分直接返回false
     * 那么在读取这个文件的时候,就会一次性的将文件中的内容全部读取出来
     * @param context
     * @param filename
     * @return
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }
}

(2).MyRecordReader

核心实现方法,主要实现思路就是根据依次读取每个文件,而且一次将文件中的内容全部读完;读取的方式是以二进制流的方式进行读取。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MyRecordReader extends RecordReader {

    private FileSplit fileSplit;  // 文件切分的类
    private Configuration configuration;
    private BytesWritable bytesWritable;

    // 定义读取文件是否结束的标志位
    private boolean flag = false;


    /**
     * 初始化方法: 只在程序初始化的时候调用一次,只要拿到了文件的切片,就拿到了文件的内容
     * @param split
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        // 初始化文件切片对象
        this.fileSplit = (FileSplit) split;
        // 初始化配置文件对象
        this.configuration = context.getConfiguration();
        // 初始化流对象
        bytesWritable = new BytesWritable();
    }

    /**
     * 读取数据
     * 返回值是 boolean 类型,如果返回 true,表示文件已经读完了,不能再继续往下读了
     * 如果返回是 false ,表示文件没有读取完成,继续读取
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (!flag){
            long length = fileSplit.getLength();
            byte[] bytes = new byte[(int) length];

            // 先去获取指定路径下的文件输入流
            Path path = fileSplit.getPath();
            FileSystem fileSystem = path.getFileSystem(configuration);
            FSDataInputStream open = fileSystem.open(path);

            // 将流当中的数据拷贝到字节数组当中(inputStream  -->  bytes[])
            IOUtils.readFully(open, bytes, 0 ,(int)length);
            // bytes[]  --->  BytesWritable
            bytesWritable.set(bytes, 0, (int)length);

            flag = true;
            return true;
        }
        return false;
    }

    /**
     * 获取当前的Key1 : hadoop序列化的空
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public Object getCurrentKey() throws IOException, InterruptedException {

        return NullWritable.get();
    }

    /**
     * 获取当前的value1 : hadoop序列化的字节数组
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public Object getCurrentValue() throws IOException, InterruptedException {
        return bytesWritable;
    }

    /**
     * 读取文件的进度,我们反正要么不读,要么全部读完,所以此时没用
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return flag ? 1.0f : 0.0f;
    }

    /**
     * 关闭资源的连接啥的,此时我们也不用关闭,所以直接默认实现一个空方法即可
     * @throws IOException
     */
    @Override
    public void close() throws IOException {
    }
}

(3).MyInputFormatMapper

主要逻辑是 直接采用读取文件切分的类,依次将所有的文件读取到map过程,然后在map过程中实现自定义逻辑,也就是实现需求中,文件和文件内容合并存储为SequenceFile格式的文件

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;


import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class MyInputFormatMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // 创建文件切分的类
        FileSplit inputSplit = (FileSplit) context.getInputSplit();

        String name = inputSplit.getPath().getName();
        context.write(new Text(name), value);
    }
}

(4).MyInputFormatMain

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyInputFormatMain  extends Configured implements Tool {


    @Override
    public int run(String[] args) throws Exception {

        Job job = Job.getInstance(super.getConf(),"mergeSmallFile");
        // 此时我们调用的输入格式是自定义的读取小文件的格式化类
        job.setInputFormatClass(MyInputFormat.class);
        MyInputFormat.addInputPath(job, new Path("file:///D:\\BigData\\testFileDir\\in"));

        job.setMapperClass(MyInputFormatMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);

        // 此时没有Reduce过程,所以没有设置Reduce函数
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setOutputPath(job, new Path("file:///D:\\BigData\\testFileDir\\out"));

        boolean b = job.waitForCompletion(true);

        return b ? 0 : 1;
    }


    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new MyInputFormatMain(), args);

        System.exit(run);
    }
}

上一篇 下一篇

猜你喜欢

热点阅读