(一)单表关联--mapreduce关联性操作

2017-08-16  本文已影响0人  elrah
package mr;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyGL {
    private static class MyGLMapper  extends  Mapper<LongWritable, Text, Text, Text>{  
        /*输入类型是LongWritable,Text(上下文);
                               输出类型是Text,Text(也就是Reduce的输入类型)*/
         public void map(LongWritable k1, Text v1, Context context)   
                             throws java.io.IOException, java.lang.InterruptedException
                                 //map()函数是固定模式的,三个参数
         {
             // 1 2
            String[]  lines= v1.toString().split("\t");  
                                              // \t 在同一个缓冲区内横向跳8个空格(Tab键);split()方法用于把一个字符串分割
                                              //成字符串数组;v1指的是一行,把第一行的两个单词存进lines
            if(lines.length!=2 || lines[0].trim().equals("child"))      
                return;    //child parent
        
            String word1=lines[0].trim();   //tom       ->去掉 lines[0]里面的空格符
            String word2=lines[1].trim();   //lucy  
            
            context.write(new Text(word1), new Text("1"+","+word1+","+word2));
                                          //第一个Text对应Mapper的第三个Text;第二个Text对应Mapper的第四个Text
            context.write(new Text(word2), new Text("2"+","+word1+","+word2));
            
            //tom,1+tom+lucy
             System.out.println("map......"+word1+"-"+word2);
         }
        
    }
    
    private static class  MyGLReduce extends Reducer<Text, Text, Text, Text>{
         public void reduce(Text key, Iterable<Text> values, Context context) 
                                       throws java.io.IOException, java.lang.InterruptedException
                                  //context:上下文对象,在整个wordcount运算生命周期内存活
         {
              List<String> grandch=new ArrayList();   //泛型
              List<String> grandpa=new ArrayList();
             
                                        /*  lucy 2+tom+lucy
             lucy 1+lucy+mary
             
             2->split[1]    tom     2的话取1
             1->split[2]    mary    1的话取2
             
             k3=tom  v3=mary   把这两个放在上下文
             */

             Iterator<Text>  it=values.iterator();  
                                                // Iterator<Text>--输进来的第二个值
             while(it.hasNext()){ 
                String lines= it.next().toString();      
                                                                         //2,tom,lucy(对应MyGLMapper的context.write())
                String [] words=lines.split(",");    
                                                                          //劈开 string 数组 ["2","tom","lucy"]
                if(words[0].equals("1")){
                    grandpa.add(words[2]);
                }else if(words[0].equals("2")){
                    grandch.add(words[1]);
                }
                else
                    return;
                
             }
              for(String ch:grandch)
             for(String pa:grandpa)  
             context.write(new Text(ch), new Text(pa));    
             
             System.out.println("reduce......");
         }
         protected void cleanup(Context context) 
                                      throws java.io.IOException, java.lang.InterruptedException{
             
         }
            
    }

    private static String INPUT_PATH="hdfs://master:9000/input/gl.dat";
    private static String OUTPUT_PATH="hdfs://master:9000/output/c/";

    public static void main(String[] args) throws Exception {   
        
        Configuration  conf=new Configuration();
        FileSystem  fs=FileSystem.get(new URI(OUTPUT_PATH),conf);
     
        if(fs.exists(new Path(OUTPUT_PATH)))
                fs.delete(new Path(OUTPUT_PATH));
        
        Job  job=new Job(conf,"myjob");
        
        job.setJarByClass(MyGL.class);
        job.setMapperClass(MyGLMapper.class);
        job.setReducerClass(MyGLReduce.class);
        
         
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //job.setCombinerClass(MyReduce.class);
         
        
        FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        
        job.waitForCompletion(true);

    }

}
image.png

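在 Reduce 中,对 key 为 X 的每条 value 按逗号切分得到 words:
当 words[0]=1 时(X 作为孩子),取 words[2],即 X 的父母,也就是孙辈的祖辈(grandpa);
当 words[0]=2 时(X 作为父母),取 words[1],即 X 的孩子,也就是祖辈的孙辈(grandchild)。
最后把两个列表做笛卡尔积,输出 (grandchild, grandpa)。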
测试数据(每行两列:child 与 parent,列之间以制表符分隔,第一行为表头):

child   parent 
Tom Lucy
Tom Jack
Jone    Lucy
Jone    Jack
Lucy    Mary
Lucy    Ben
Jack    Alice
Jack    Jesse
Terry   Alice
Terry   Jesse
Philip  Terry
Philip  Alma
Mark    Terry
Mark    Alma
上一篇 下一篇

猜你喜欢

热点阅读