MapReduce Single-Table Join

2017-08-16  月巴巴

For example, given a child-parent table, produce the corresponding grandchild-grandparent table.

Input
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse

Output
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse
Tom Mary
Tom Ben
Jone Mary
Jone Ben

From the single input table we build two tables, a child-parent table and a parent-child table, and their natural join on the shared person gives the result. The key to the program is emitting both tables from the map phase, tagging each record to mark which table it belongs to, as traced below.
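To make this concrete, consider the sample records that involve Lucy. Each input line produces two intermediate pairs: one tagged 1 (key = parent, value = child, the parent-child table) and one tagged 2 (key = child, value = parent, the child-parent table):

Tom Lucy   ->  (Lucy, "1 Tom")   and  (Tom, "2 Lucy")
Jone Lucy  ->  (Lucy, "1 Jone")  and  (Jone, "2 Lucy")
Lucy Mary  ->  (Mary, "1 Lucy")  and  (Lucy, "2 Mary")
Lucy Ben   ->  (Ben, "1 Lucy")   and  (Lucy, "2 Ben")

The reducer invocation for key Lucy therefore receives the values 1 Tom, 1 Jone, 2 Mary, 2 Ben. The values tagged 1 are Lucy's children (grandchild candidates) and the values tagged 2 are Lucy's parents (grandparent candidates); their cross product yields Tom Mary, Tom Ben, Jone Mary and Jone Ben, four of the eight lines in the expected output.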

Code

package com.hy.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

public class SingleJoin {

    public static class JoinMapper
            extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
            // Each input line is "child parent", separated by a single space
            String[] fields = value.toString().split(" ");
            String childName = fields[0];
            String parentName = fields[1];
            // Right table: key = parent, value = child, tagged with "1"
            context.write(new Text(parentName), new Text("1 " + childName));
            // Left table: key = child, value = parent, tagged with "2"
            context.write(new Text(childName), new Text("2 " + parentName));
        }
    }

    public static class JoinReducer
            extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values,
                           Context context
        ) throws IOException, InterruptedException {
            // Values tagged "1" are children of the key person (grandchild candidates);
            // values tagged "2" are parents of the key person (grandparent candidates)
            List<String> grandchild = new ArrayList<String>();
            List<String> grandparent = new ArrayList<String>();
            for (Text val : values) {
                String[] tmp = val.toString().split(" ");
                if (tmp[0].equals("1")) {
                    grandchild.add(tmp[1]);
                } else {
                    grandparent.add(tmp[1]);
                }
            }
            // The cross product of the two lists is the join result for this key
            if (!grandchild.isEmpty() && !grandparent.isEmpty()) {
                for (String gc : grandchild) {
                    for (String gp : grandparent) {
                        context.write(new Text(gc), new Text(gp));
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        // Machine-specific local directory for intermediate MapReduce data
        conf.set("mapreduce.cluster.local.dir", "/Users/hy/hadoop/var");
        Job job = Job.getInstance(conf, "Single join");
        job.setJarByClass(SingleJoin.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileSystem fs = FileSystem.get(conf);
        // If the output directory already exists, delete it before running the job
        if (fs.exists(new Path(args[1]))) {
            fs.delete(new Path(args[1]),true);
        }
        if(!job.waitForCompletion(true))
            System.exit(1);

        // Print the result from part-r-00000 to the console
        BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(new Path(args[1]+"/part-r-00000"))));
        try {
            String line;
            line=br.readLine();
            while (line != null){
                System.out.println(line);
                line = br.readLine();
            }
        } finally {
            br.close();
        }
    }
}
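A typical way to launch the job, assuming the class has been packaged into a jar (the jar name and HDFS paths below are placeholders, not values from the post):

hadoop jar single-join.jar com.hy.hadoop.SingleJoin /user/hy/child-parent.txt /user/hy/single-join-out

The program deletes the output directory if it already exists, runs the job, and then prints the grandchild-grandparent pairs from part-r-00000 to standard output.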