MapReduce实现WordCount
2019-01-12 本文已影响0人
alpha18
实现Mapper函数
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
/* 参数 <KEYIN, VALUEIN, KEYOUT, VALUEOUT>*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] words = line.split(" ");
for (String word: words) {
context.write(new Text(word), new IntWritable(1));
}
}
}
实现Reducer函数
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
/* 参数 <KEYIN, VALUEIN, KEYOUT, VALUEOUT> */
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int count = 0;
for (IntWritable value: values) {
count += value.get();
}
context.write(key, new IntWritable(count));
}
}
Runner函数
public class WordCountRunner {
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
String[] otherArgs = new String[] {"hdfs://localhost:9000/wordcount/input", "hdfs://localhost:9000/wordcount/output"};
Job wcjob = Job.getInstance(configuration);
wcjob.setJarByClass(WordCountRunner.class);
wcjob.setMapperClass(WordCountMapper.class);
wcjob.setReducerClass(WordCountReducer.class);
wcjob.setMapOutputKeyClass(Text.class);
wcjob.setMapOutputValueClass(IntWritable.class);
wcjob.setOutputKeyClass(Text.class);
wcjob.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(wcjob, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(wcjob, new Path(otherArgs[1]));
boolean res = wcjob.waitForCompletion(true);
System.exit(res?0:1);
}
}
详细代码见 https://github.com/freedommay/notebook/tree/master/wordcount