数据去重

2019-01-02  本文已影响0人  小月半会飞

要求实现功能:

将File1和File2合并成一个文件,将重复的部分去除
File1:

2017-3-1 a
2017-3-2 b
2017-3-3 c
2017-3-4 d
2017-3-5 a
2017-3-6 b
2017-3-7 c
2017-3-3 c

File2:

2017-3-1 b
2017-3-2 a
2017-3-3 b
2017-3-4 d
2017-3-5 a
2017-3-6 c
2017-3-7 d
2017-3-3 c

结果:

2017-3-1 a  
2017-3-1 b  
2017-3-2 a  
2017-3-2 b  
2017-3-3 b  
2017-3-3 c  
2017-3-4 d  
2017-3-5 a  
2017-3-6 b  
2017-3-6 c  
2017-3-7 c  
2017-3-7 d  

DistinctMapper部分代码:

package com.neusoft;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class DistinctMapper extends Mapper<LongWritable,Text,Text,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(new Text(value),new Text(""));
    }
}

DistinctReducer:

传过来的数据,key是不重复的,唯一的

package com.neusoft;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class DistinctReducer extends Reducer<Text,Text,Text,Text>{

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        context.write(key,new Text(""));
    }
}

DistinctDriver :

package com.neusoft;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistinctDriver {

    public static void main(String[] args) throws Exception {
        System.setProperty("HADOOP_USER_NAME", "root") ;
        System.setProperty("hadoop.home.dir", "e:/hadoop-2.8.3");
        if (args == null || args.length == 0) {
            return;
        }
        FileUtil.deleteDir(args[1]);
        //该对象会默认读取环境中的 hadoop 配置。当然,也可以通过 set 重新进行配置
        Configuration conf = new Configuration();

        //job 是 yarn 中任务的抽象。
        Job job = Job.getInstance(conf);

        /*job.setJar("/home/hadoop/wc.jar");*/
        //指定本程序的jar包所在的本地路径
        job.setJarByClass(DistinctDriver.class);

        //指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReducer.class);

        //指定mapper输出数据的kv类型。需要和 Mapper 中泛型的类型保持一致
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //指定最终输出的数据的kv类型。这里也是 Reduce 的 key,value类型。
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        //指定Combiner,Reducer可以当做Combiner使用
//        job.setCombinerClass(DistinctReducer.class);

        //指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job的输出结果所在目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
        // true表示一直等待map和reduce任务执行完成
        /*job.submit();*/
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}

上一篇 下一篇

猜你喜欢

热点阅读