离线计算组件篇-MapReduce-自定义outputFormat

2022-12-02  本文已影响0人  CoderInsight

10.自定义 outputFormat

(1). 需求

(2). 分析

(3). 实现

(4). 代码实现

第一步:编写自定义输出类: MyOutputFormat

package top.wangyq.myoutputFormat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 1, 继承接口 FileOutputFormat<K,V>
 * 2, 重写方法: getRecordWriter()
 * 3, 自定义 RecordWriter 类, 可以使用内部类的形式定义,并且继承自 RecordWriter
 * 4, 根据我们要自定义输出的文件个数初始化基本 文件输出流 (FSDataOutputStream) 对象
 * 5, 重写 write() 方法 和 close() 方法;分别用来指定写入文件的内容 和 关闭 文件输出流对象
 */
/**
 * Custom OutputFormat that splits records into two files: one for good
 * reviews (rating field == "0") and one for medium/bad reviews.
 *
 * Implementation notes:
 * 1. Extend {@link FileOutputFormat} and override {@link #getRecordWriter}.
 * 2. Define the {@link RecordWriter} as a static nested class.
 * 3. One {@link FSDataOutputStream} is opened per target file.
 * 4. {@code write()} routes each record to a stream; {@code close()} releases both.
 */
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {

    /**
     * Creates the per-task record writer.
     *
     * The output files are derived from the job's configured output directory
     * and the current task id, rather than hard-coded absolute paths: with
     * more than one reduce task, hard-coded names would make every task
     * fs.create() the same file and silently overwrite the others' output.
     *
     * @param context task context supplying the configuration and task id
     * @return a writer that splits records into "good" and "bad" files
     * @throws IOException if the output streams cannot be created
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {

        FileSystem fs = FileSystem.get(context.getConfiguration());

        // Unique per-task file names under the job output directory.
        Path outputDir = FileOutputFormat.getOutputPath(context);
        String taskId = context.getTaskAttemptID().getTaskID().toString();
        Path goodComment = new Path(outputDir, "mygood/" + taskId);
        Path badComment = new Path(outputDir, "mybad/" + taskId);

        FSDataOutputStream goodOutputStream = fs.create(goodComment);
        FSDataOutputStream badOutputStream = fs.create(badComment);

        return new MyRecorderWriter(goodOutputStream, badOutputStream);
    }

    /** Writes each record to the good or bad stream based on field 10 (index 9). */
    static class MyRecorderWriter extends RecordWriter<Text, NullWritable> {

        private final FSDataOutputStream goodStream;
        private final FSDataOutputStream badStream;

        public MyRecorderWriter(FSDataOutputStream goodStream, FSDataOutputStream badStream) {
            this.goodStream = goodStream;
            this.badStream = badStream;
        }

        /**
         * Routes one record: field index 9 equal to "0" means a good review.
         * Lines with fewer than 10 tab-separated fields are treated as bad
         * instead of throwing ArrayIndexOutOfBoundsException.
         */
        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            String line = key.toString();
            String[] fields = line.split("\t");
            if (fields.length > 9 && fields[9].equals("0")) {
                // Good review.
                goodStream.write(line.getBytes());
                goodStream.write("\r\n".getBytes());
            } else {
                // Medium or bad review (or malformed line).
                badStream.write(line.getBytes());
                badStream.write("\r\n".getBytes());
            }
        }

        /** Closes both output streams; each close is guarded so one failure path cannot skip the other null check. */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (badStream != null) {
                badStream.close();
            }
            if (goodStream != null) {
                goodStream.close();
            }
        }
    }
}

第二步:编写Main函数

package top.wangyq.myoutputFormat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Driver for the custom-OutputFormat job: a map-only pipeline whose records
 * are split into "good" and "bad" files by {@code MyOutputFormat}.
 *
 * Usage: {@code hadoop jar ... MyOwnOutputFormatMain <inputPath> <outputPath>}
 */
public class MyOwnOutputFormatMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf, MyOwnOutputFormatMain.class.getSimpleName());

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(MyOwnMapper.class);
        job.setMapOutputKeyClass(Text.class);
        // Bug fix: this must be setMapOutputValueClass, not setOutputValueClass;
        // the original set the job-level output value class twice and left the
        // map output value class at its default.
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(MyOutputFormat.class);
        MyOutputFormat.setOutputPath(job, new Path(args[1]));

        // Two reduce tasks: the identity reducer runs, so MyOutputFormat must
        // produce per-task file names or the tasks will clobber each other.
        job.setNumReduceTasks(2);
        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }

    /** Emits each input line unchanged as the key; the value is a NullWritable placeholder. */
    public static class MyOwnMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            context.write(value, NullWritable.get());
        }
    }

    /**
     * Entry point: runs the job through ToolRunner so generic Hadoop options
     * (-D, -files, ...) are parsed into the Configuration.
     */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int exitCode = ToolRunner.run(configuration, new MyOwnOutputFormatMain(), args);
        System.exit(exitCode);
    }
}
上一篇 下一篇

猜你喜欢

热点阅读