MapReduce分区组件
2020-08-03 本文已影响0人
羋学僧
MapReduce中分区组件
需求:
根据单词的长度给单词出现的次数的结果存储到不同文件中,以便于在快速查询
思路:
1、定义Mapper逻辑
2、定义Reducer逻辑
3、自定义分区Partitioner
4、主调度入口
MapReduce编程流程

首先看Mapper:WordMapper.java
/**
* Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* KEYIN:是指框架读取到的数据的key的类型,在默认的InputFormat下,读取的key就是一行文本的起始的偏移量。所以key的类型Long
* VALUEIN:是指框架读取到的数据的value的类型,在默认的InputFormat下,读取的value就是一行文本。所以value的类型String
* KEYOUT:是指用户自定义的逻辑方法返回的数据中key的类型,由用户根据业务逻辑自己决定,在我们wordcount程序中,我们输出的key是单词,所以类型是String
* VALUEOUT:是指用户自定义的逻辑方法返回的数据中value的类型,由用户根据业务逻辑自己决定,在我们wordcount程序中,我们输出的value是次数,所以类型是Long
*
* 但是,String、Long 是jdk中自带的数据类型,在序列化的时候,效率低。
* hadoop为了提高效率,自定义了一套序列化的框架
* 在hadoop程序当中,如果要进行序列化(写磁盘、网络传输数据),一定要用hadoop实现的序列化的数据类型
*
* Long ====》 LongWritable
* String ====》 Text
* Integer ====》 IntWritable
* Null ====》 NullWritable
*
*/
public class WordMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
/**
*
* @param key 就是偏移量
* @param value 一行文本
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、单词的切分
String[] words = value.toString().split(" ");
//2、计数一次,帮单词转换成<hello,1>这样的key value类型对外输出
for (String word : words) {
//3、写入到上下文中
context.write(new Text(word),new LongWritable(1));
}
}
}
其次看Shuffle自定义分区Partitioner:MyPartitioner.java
/**
* Partitioner<KEY, VALUE>
* KEY: 单词
* VALUE 数
*/
public class MyPartitioner extends Partitioner<Text,LongWritable> {
@Override
public int getPartition(Text text, LongWritable longWritable, int i) {
if (text.toString().length() >= 5){
return 0;
}else {
return 1;
}
}
}
再看Reducer:WordReducer.java
/**
* Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
* KEYIN:map阶段的一个输出的key
* VALUEIN:LongWritable类型的数字
* KEYOUT:最终的结果单词,Text类型
* VALUEOUT:最终的单词的次数,LongWritable类型
*/
public class WordReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
/**
*
* @param key map阶段的一个输出的key
* @param values hello <1,1>
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
//1、定义一个统计的变量
long count = 0;
//2、迭代
for (LongWritable value : values) {
count += value.get();
}
//3、写入到上下文
context.write(key,new LongWritable(count));
}
}
最后看Driver:JobMain.java
/**
* 主类:将map和reduce串接起来,并提供了运行入口
*/
public class JobMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
//一、初始化一个Job
Job job = Job.getInstance(configuration, "partitioner");
//二、设置Job的相关信息,配置8个小步骤
//1、设置输入路径
job.setInputFormatClass(TextInputFormat.class);
//TextInputFormat.addInputPath(job,new Path("D://input/test2.txt"));
TextInputFormat.addInputPath(job,new Path("hdfs://bigdata02:9000/wordcount.txt"));
//2、设置Mapper类,设置k2 v2
job.setMapperClass(WordMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//3 4 5 6 Shuffle阶段
//3、分区
job.setPartitionerClass(MyPartitioner.class);
//7、设置Reducer类,并设置k3 v3
job.setReducerClass(WordReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置ReducerTask的个数
job.setNumReduceTasks(2);
//8、设置输出的路径
job.setOutputFormatClass(TextOutputFormat.class);
//TextOutputFormat.setOutputPath(job,new Path("D://word_out2"));
TextOutputFormat.setOutputPath(job,new Path("hdfs://bigdata02:9000/word_out2"));
//三、等待完成
boolean b = job.waitForCompletion(true);
System.out.println(b);
System.exit(b ? 0 : 1);
}
}


