MapReduce API (3): Friend Recommendation (FOF)

2018-11-23  geekAppke

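Is friend recommendation simply the set difference of two friend lists?
Who are the top N friends to recommend, and how should they be ranked?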

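Approach:
Two people who are recommended to each other must share one or more friends.
Scan every friend list in the data set for pairwise relationships.
Remove the pairs that are already direct friends.
Count how many times each remaining pair occurs.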
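
As a cross-check of the approach above, the same counts can be computed in memory by intersecting friend sets. This is a minimal sketch in plain Java, not the MapReduce job itself; the class name CommonFriendsSketch and its methods are hypothetical, and it assumes every person appears as the first name on some line of the friend data used in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical in-memory cross-check; the article's real solution is the MapReduce job.
class CommonFriendsSketch {

    // Builds "a:b" with the two names in lexicographic order.
    static String pairKey(String a, String b) {
        return a.compareTo(b) <= 0 ? a + ":" + b : b + ":" + a;
    }

    // Each line is "person friend1 friend2 ...".
    // Returns pair -> number of common friends, for pairs that are not direct friends.
    static Map<String, Integer> recommend(List<String> lines) {
        Map<String, Set<String>> friends = new TreeMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            Set<String> set = friends.computeIfAbsent(parts[0], k -> new TreeSet<>());
            for (int i = 1; i < parts.length; i++) {
                set.add(parts[i]);
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        List<String> people = new ArrayList<>(friends.keySet());
        for (int i = 0; i < people.size(); i++) {
            for (int j = i + 1; j < people.size(); j++) {
                String a = people.get(i);
                String b = people.get(j);
                // Skip pairs that are already direct friends.
                if (friends.get(a).contains(b) || friends.get(b).contains(a)) {
                    continue;
                }
                Set<String> common = new TreeSet<>(friends.get(a));
                common.retainAll(friends.get(b));
                if (!common.isEmpty()) {
                    result.put(pairKey(a, b), common.size());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "tom hello hadoop cat",
                "world hadoop hello hive",
                "cat tom hive",
                "mr hive hello",
                "hive cat hadoop world hello mr",
                "hadoop tom hive world",
                "hello tom world hive mr");
        recommend(lines).forEach((pair, count) -> System.out.println(pair + " " + count));
    }
}
```

Comparing every pair of people is quadratic in the number of people, which is why the article counts pairs inside each friend list with MapReduce instead.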

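API:
map: for each friend list, emit its pairwise relationships
reduce: sum the occurrences of each pair
Then design a second MR job to generate the detailed report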
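
The article does not show the second job. The following plain-Java sketch illustrates one way the report step could work, assuming its input is the first job's output in the form "a:b count". The class name FofReportSketch, the method topN, and the "name(count)" report format are all hypothetical. The first loop plays the role of the map step (each pair is emitted once per person) and the second loop plays the role of the reduce step (sort and keep the first N).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the second stage: per-person top-N recommendations.
class FofReportSketch {

    // firstStage lines look like "cat:hadoop 2".
    // Returns person -> up to n candidates formatted as "name(count)".
    static Map<String, List<String>> topN(List<String> firstStage, int n) {
        // "Map" step: emit every pair twice, once keyed by each person.
        Map<String, List<String[]>> byPerson = new TreeMap<>();
        for (String line : firstStage) {
            String[] kv = line.trim().split("\\s+");
            String[] pair = kv[0].split(":");
            byPerson.computeIfAbsent(pair[0], k -> new ArrayList<>())
                    .add(new String[] {pair[1], kv[1]});
            byPerson.computeIfAbsent(pair[1], k -> new ArrayList<>())
                    .add(new String[] {pair[0], kv[1]});
        }
        // "Reduce" step: sort by count descending, then by name, and keep the first n.
        Map<String, List<String>> report = new TreeMap<>();
        for (Map.Entry<String, List<String[]>> e : byPerson.entrySet()) {
            List<String[]> candidates = e.getValue();
            candidates.sort((x, y) -> {
                int byCount = Integer.compare(Integer.parseInt(y[1]), Integer.parseInt(x[1]));
                return byCount != 0 ? byCount : x[0].compareTo(y[0]);
            });
            List<String> top = new ArrayList<>();
            for (int i = 0; i < Math.min(n, candidates.size()); i++) {
                top.add(candidates.get(i)[0] + "(" + candidates.get(i)[1] + ")");
            }
            report.put(e.getKey(), top);
        }
        return report;
    }

    public static void main(String[] args) {
        List<String> firstStage = Arrays.asList(
                "cat:hadoop 2", "cat:hello 2", "cat:mr 1", "cat:world 1",
                "hadoop:hello 3", "hadoop:mr 1", "hive:tom 3",
                "mr:tom 1", "mr:world 2", "tom:world 2");
        topN(firstStage, 2).forEach((person, top) -> System.out.println(person + " " + top));
    }
}
```

In a real Hadoop job the sorting would more likely be handled with a composite key and a custom comparator, but that design is outside what the article covers.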

Friend data (each line is a person followed by that person's friends)

tom hello hadoop cat
world hadoop hello hive
cat tom hive
mr hive hello
hive cat hadoop world hello mr
hadoop tom hive world
hello tom world hive mr
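Recommend friends of friends

Friend recommendation (FOF)
Reading a single line yields both direct and indirect friend relationships: the first name paired with each friend is a direct relationship, and any two friends on the same line form an indirect (candidate) relationship. Some of these candidate pairs are already direct friends elsewhere in the data, so they must be removed.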
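
To make the single-line case concrete, here is a small plain-Java sketch that lists the records one line produces, using the same 0 (direct) and 1 (candidate) markers as the job in this article. The class name PairEmissionSketch and the "key value" string format are hypothetical; no Hadoop classes are involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: which records does one friend-list line produce?
class PairEmissionSketch {

    // Puts the two names in lexicographic order so a:b and b:a share one key.
    static String pairKey(String a, String b) {
        return a.compareTo(b) > 0 ? b + ":" + a : a + ":" + b;
    }

    // Returns records such as "hello:tom 0" (direct) and "hadoop:hello 1" (candidate).
    static List<String> emit(String line) {
        String[] names = line.trim().split("\\s+");
        List<String> out = new ArrayList<>();
        for (int i = 1; i < names.length; i++) {
            // The list owner and each friend are direct friends.
            out.add(pairKey(names[0], names[i]) + " 0");
            // Any two friends of the same owner share that owner as a common friend.
            for (int j = i + 1; j < names.length; j++) {
                out.add(pairKey(names[i], names[j]) + " 1");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        for (String record : emit("tom hello hadoop cat")) {
            System.out.println(record);
        }
    }
}
```

For the line "tom hello hadoop cat" this gives three direct records (hello:tom, hadoop:tom, cat:tom) and three candidate records (hadoop:hello, cat:hello, cat:hadoop).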

Friend recommendation driver

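import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyFOF {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration(true);

        Job job = Job.getInstance(conf);
        job.setJarByClass(MyFOF.class);

        Path input = new Path("/data/fof/input");
        FileInputFormat.addInputPath(job, input);

        // Delete a stale output directory so the job can be rerun.
        Path output = new Path("/data/fof/output");
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        job.setMapperClass(FMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(FReducer.class);
        // FReducer writes Text keys and Text values.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Report success or failure through the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}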

Map phase

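import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Assumed import: the original does not say which StringUtils it uses.
// Commons Lang's StringUtils.split(String, char) would also work here.
import org.apache.hadoop.util.StringUtils;

public class FMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    Text mkey = new Text();
    IntWritable mval = new IntWritable();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // value: tom hello hadoop cat
        // emits: hello:tom 0 (direct), hadoop:hello 1 (candidate), ...

        String[] strs = StringUtils.split(value.toString(), ' ');

        for (int i = 1; i < strs.length; i++) {
            // The list owner and each friend: direct relationship, marked 0.
            mkey.set(getFof(strs[0], strs[i]));
            mval.set(0);
            context.write(mkey, mval);

            // Every two friends in the same list: candidate relationship, marked 1.
            for (int j = i + 1; j < strs.length; j++) {
                mkey.set(getFof(strs[i], strs[j]));
                mval.set(1);
                context.write(mkey, mval);
            }
        }
    }

    // Orders the two names so that (hello, hadoop) and (hadoop, hello)
    // both produce the same key, "hadoop:hello".
    public static String getFof(String str1, String str2) {
        if (str1.compareTo(str2) > 0) {
            return str2 + ":" + str1;
        }
        return str1 + ":" + str2;
    }
}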

Reduce phase

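import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FReducer extends Reducer<Text, IntWritable, Text, Text> {

    Text rval = new Text();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Example values for one key:
        // hadoop:hello  1
        // hadoop:hello  0
        // hadoop:hello  1
        // hadoop:hello  1
        int sum = 0;
        boolean direct = false;
        for (IntWritable v : values) {
            if (v.get() == 0) {
                // A 0 means the pair is already a direct friendship.
                direct = true;
            }
            sum += v.get();
        }
        if (!direct) { // output indirect relationships only
            rval.set(String.valueOf(sum));
            context.write(key, rval);
        }
    }
}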
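
The reducer's rule can be tried without a cluster. The sketch below restates it in plain Java under the same convention (0 marks a direct friendship, 1 marks one shared friend); the class name ReduceRuleSketch and the use of OptionalInt to represent "no output" are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical restatement of the reduce rule for a single pair key.
class ReduceRuleSketch {

    // Returns the number of common friends, or empty if the pair is already direct friends.
    static OptionalInt commonFriends(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            if (v == 0) {
                // One direct marker is enough to discard the whole pair.
                return OptionalInt.empty();
            }
            sum += v;
        }
        return OptionalInt.of(sum);
    }

    public static void main(String[] args) {
        System.out.println(commonFriends(Arrays.asList(1, 0, 1, 1)).isPresent());
        System.out.println(commonFriends(Arrays.asList(1, 1, 1)).getAsInt());
    }
}
```

Unlike the reducer, this version returns as soon as it sees a 0; the result is the same because a pair with any 0 is never written.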

Output: the number of common friends for each indirect pair

cat:hadoop 2
cat:hello 2
cat:mr 1
cat:world 1
hadoop:hello 3
hadoop:mr 1
hive:tom 3
mr:tom 1
mr:world 2
tom:world 2