Offline Computing Components: MapReduce Custom Partitioning
2022-11-29
CoderInsight
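8, MapReduce partitioning -- Partitioner
(1). Requirement
Write the aggregated traffic statistics to different files according to the province each phone number belongs to.
(2). Analysis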
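- In MapReduce, the key-value pairs emitted by the map phase are grouped by key and then distributed to the reduce tasks.
- The default distribution rule derives the target ReduceTask from the key:
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
Records with the same key therefore always land in the same partition.
- To distribute records by our own rule instead, we have to replace the distribution component, the Partitioner: define a CustomPartitioner that extends the abstract class Partitioner, then register it on the job object:
job.setPartitionerClass(CustomPartitioner.class);
job.setNumReduceTasks(6);
- Take care when setting the number of reduce tasks. Each ReduceTask writes one result file, so when there are more ReduceTasks than partitions some result files are empty:
- number of partitions = number of reduce tasks: runs normally
- number of partitions < number of reduce tasks: runs normally, but the surplus ReduceTasks produce empty result files
- number of partitions > number of reduce tasks: fails with an "Illegal partition" IOException as soon as getPartition returns a value outside 0 to numReduceTasks - 1. The one exception is a single reduce task: the custom partitioner is then bypassed and every record goes to partition 0.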
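The default rule quoted above can be tried out without a cluster. The following is a minimal sketch in plain Java, not Hadoop's own class: the name DefaultPartitionSketch and the sample phone numbers are made up for illustration. The mask with Integer.MAX_VALUE clears the sign bit, so a negative hashCode still yields a partition in the range 0 to numReduceTasks - 1.

```java
final class DefaultPartitionSketch {

    // Mirrors the rule from the text: (hashCode & Integer.MAX_VALUE) % numReduceTasks.
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 6;
        // Hypothetical keys; the first and last are identical on purpose.
        String[] keys = {"13500000001", "13600000002", "13500000001"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, numReduceTasks));
        }
    }
}
```

Running main prints the same partition number for the two identical keys, which is the property the grouping step relies on.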
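(3). Implementation
1), FlowBean
Its main job is serialization and deserialization: the remaining fields are wrapped in one entity class so that they can be transferred from the Map side to the Reduce side.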
package com.wangyq.class2.mypartition;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    // The fields use the wrapper type Integer. Each one must be set before write() is called,
    // because unboxing a null Integer throws a NullPointerException.
    private Integer upFlow;
    private Integer downFlow;
    private Integer upCountFlow;
    private Integer downCountFlow;

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(upCountFlow);
        out.writeInt(downCountFlow);
    }

    // Deserialization: the fields must be read in exactly the order write() wrote them
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readInt();
        this.downFlow = in.readInt();
        this.upCountFlow = in.readInt();
        this.downCountFlow = in.readInt();
    }

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", upCountFlow=" + upCountFlow +
                ", downCountFlow=" + downCountFlow +
                '}';
    }
}
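To see why the read order has to match the write order, the serialization round trip can be reproduced with plain java.io streams. This is only a sketch: FlowRecordSketch is a hypothetical stand-in with two of the four fields, and it imitates the write/readFields pair without depending on Hadoop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for FlowBean; not a Hadoop Writable.
final class FlowRecordSketch {
    int upFlow;
    int downFlow;

    void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
    }

    void readFields(DataInput in) throws IOException {
        // Read back in exactly the order used by write().
        upFlow = in.readInt();
        downFlow = in.readInt();
    }

    // Serializes the record to a byte array and rebuilds a fresh copy from it.
    static FlowRecordSketch roundTrip(FlowRecordSketch original) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));
        FlowRecordSketch copy = new FlowRecordSketch();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        return copy;
    }
}
```

If readFields read downFlow first, the copy would come back with the two values swapped and no error would be raised, which is why the ordering rule matters.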
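2), FlowMapper
Its main job is to read each line from the input file and split it, use the field we want to partition on as the key, and wrap the other fields in the entity class that is passed on to the Reduce phase.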
package com.wangyq.class2.mypartition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    // Reused across map() calls to avoid allocating new objects for every record
    private FlowBean flowBean;
    private Text text;

    /**
     * Initialization method of the Mapper; runs once before the first map() call.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        flowBean = new FlowBean();
        text = new Text();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Fields are tab-separated: index 1 is the phone number, indexes 6 to 9 are the flow counters
        String[] splits = value.toString().split("\t");
        String phoneNum = splits[1];
        String upFlow = splits[6];
        String downFlow = splits[7];
        String upCountFlow = splits[8];
        String downCountFlow = splits[9];

        text.set(phoneNum);
        flowBean.setUpFlow(Integer.parseInt(upFlow));
        flowBean.setDownFlow(Integer.parseInt(downFlow));
        flowBean.setUpCountFlow(Integer.parseInt(upCountFlow));
        flowBean.setDownCountFlow(Integer.parseInt(downCountFlow));

        // Emit the phone number as the key so that the partitioner can route on it
        context.write(text, flowBean);
    }
}
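The map method above assumes every line has at least ten tab-separated fields with numeric counters at indexes 6 to 9; a short or malformed line would throw and fail the task. One possible guard, sketched in plain Java under that same column layout, is shown here. The class FlowLineParserSketch and its method are hypothetical helpers, not part of the original job.

```java
import java.util.Optional;

final class FlowLineParserSketch {
    // map() reads index 1 and indexes 6..9, so at least 10 fields are needed.
    static final int MIN_FIELDS = 10;

    // Returns {upFlow, downFlow, upCountFlow, downCountFlow},
    // or an empty Optional when the line is too short or a counter is not an integer.
    static Optional<int[]> parseFlows(String line) {
        String[] fields = line.split("\t");
        if (fields.length < MIN_FIELDS) {
            return Optional.empty();
        }
        try {
            int[] flows = new int[4];
            for (int i = 0; i < flows.length; i++) {
                flows[i] = Integer.parseInt(fields[6 + i]);
            }
            return Optional.of(flows);
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }
}
```

A mapper using such a helper could skip the record when the Optional is empty instead of letting the exception propagate; whether skipping is acceptable depends on the data-quality requirements of the job.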
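3), FlowReducer
Its main job is to iterate over the values passed from the Map phase for each key, sum them, and write the result in the output format we want.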
package com.wangyq.class2.mypartition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        int upFlow = 0;
        int downFlow = 0;
        int upCountFlow = 0;
        int downCountFlow = 0;

        // Sum the four counters over all records that share this phone number
        for (FlowBean value : values) {
            upFlow += value.getUpFlow();
            downFlow += value.getDownFlow();
            upCountFlow += value.getUpCountFlow();
            downCountFlow += value.getDownCountFlow();
        }

        // Output one tab-separated line per phone number
        context.write(key, new Text(upFlow + "\t" + downFlow + "\t" + upCountFlow + "\t" + downCountFlow));
    }
}
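The summing step of the reducer can be exercised on its own. The sketch below is plain Java with a hypothetical class name, FlowSumSketch, and it deviates from the reducer above in one labeled way: it accumulates into long instead of int, an assumption made here so that large totals do not overflow.

```java
import java.util.List;

final class FlowSumSketch {

    // Each int[] holds {upFlow, downFlow, upCountFlow, downCountFlow} for one record.
    // Returns the four totals joined by tabs, the same layout the reducer writes.
    static String sumFlows(List<int[]> values) {
        long[] totals = new long[4]; // long accumulators: a deviation from the int sums above
        for (int[] value : values) {
            for (int i = 0; i < totals.length; i++) {
                totals[i] += value[i];
            }
        }
        return totals[0] + "\t" + totals[1] + "\t" + totals[2] + "\t" + totals[3];
    }
}
```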
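4), PartitionOwn
The custom partitioner class extends Partitioner<K2,V2> and overrides the getPartition method, which contains the partitioning logic. The logic can be written directly as an if-else chain, or with a HashMap that is filled in a static initializer block.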
package com.wangyq.class2.mypartition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitioning happens on the map side, after the map function has processed a record,
 * so the type parameters must be the Mapper's output types K2, V2.
 * The partitioner is registered in the main class as follows:
 * job.setPartitionerClass(PartitionOwn.class); // set the partitioner class
 * job.setNumReduceTasks(Integer.parseInt(args[2])); // set the number of reduce tasks
 * // number of partitions = number of reduce tasks: runs normally
 * // number of partitions < number of reduce tasks: runs normally, but empty result files are produced
 * // number of partitions > number of reduce tasks: error, Illegal partition
 */
public class PartitionOwn extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
        String phoneNum = text.toString();
        // Only apply the prefix rules when the phoneNum field is not empty
        if (null != phoneNum && !phoneNum.equals("")) {
            // Return a different partition number for each prefix
            if (phoneNum.startsWith("135")) {
                return 0;
            } else if (phoneNum.startsWith("136")) {
                return 1;
            } else if (phoneNum.startsWith("137")) {
                return 2;
            } else if (phoneNum.startsWith("138")) {
                return 3;
            } else if (phoneNum.startsWith("139")) {
                return 4;
            } else {
                // Every other prefix goes to the last partition
                return 5;
            }
        } else {
            // Empty keys also go to the last partition
            return 5;
        }
    }
}
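The HashMap and static-block variant mentioned in the description could look roughly like the following. It is a plain-Java sketch of the lookup logic only, with a hypothetical class name, PrefixPartitionSketch; inside a real partitioner the same lookup would be called from getPartition with text.toString() as the argument.

```java
import java.util.HashMap;
import java.util.Map;

final class PrefixPartitionSketch {
    private static final Map<String, Integer> PREFIX_TO_PARTITION = new HashMap<>();
    private static final int DEFAULT_PARTITION = 5;

    // Filled once when the class is loaded; same prefixes and numbers as the if-else chain.
    static {
        PREFIX_TO_PARTITION.put("135", 0);
        PREFIX_TO_PARTITION.put("136", 1);
        PREFIX_TO_PARTITION.put("137", 2);
        PREFIX_TO_PARTITION.put("138", 3);
        PREFIX_TO_PARTITION.put("139", 4);
    }

    // Looks up the first three characters; anything unknown, empty, or too short
    // falls into the default partition.
    static int partitionFor(String phoneNum) {
        if (phoneNum == null || phoneNum.length() < 3) {
            return DEFAULT_PARTITION;
        }
        return PREFIX_TO_PARTITION.getOrDefault(phoneNum.substring(0, 3), DEFAULT_PARTITION);
    }
}
```

The table form keeps the routing rules in one place, so adding a prefix is a single put call rather than another else-if branch. Note that adding a new partition number also means raising the reduce task count to match.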
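5), FlowMain
The main class is mostly boilerplate. Note, however, that the job uses our custom partitioner here, so the number of ReduceTasks has to be set to match the number of partitions.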
package com.wangyq.class2.mypartition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowMain extends Configured implements Tool {
    // args[0]: input path, args[1]: output path, args[2]: number of reduce tasks
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "flowCount");
        job.setJarByClass(FlowMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(FlowMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Use our custom partitioner for this job
        job.setPartitionerClass(PartitionOwn.class);
        // The number of ReduceTasks is passed as an argument and must match the number of
        // partitions; PartitionOwn returns 0 to 5, so pass 6
        job.setNumReduceTasks(Integer.parseInt(args[2]));

        job.setReducerClass(FlowReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean succeeded = job.waitForCompletion(true);
        return succeeded ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Run with the local job runner
        configuration.set("mapreduce.framework.name", "local");
        configuration.set("yarn.resourcemanager.hostname", "local");
        // Start the program through ToolRunner.run
        int run = ToolRunner.run(configuration, new FlowMain(), args);
        // Exit with the job's status code
        System.exit(run);
    }
}
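Because the reduce task count comes from the command line, a wrong value only surfaces once the job is running. A small check before the job is configured can catch it earlier. The following is a hedged sketch in plain Java: ReduceTaskArgSketch, its constant, and the usage message are hypothetical, and the rule it enforces (at least six reduce tasks) is deliberately stricter than what Hadoop itself requires.

```java
final class ReduceTaskArgSketch {
    // PartitionOwn returns partition numbers 0 to 5, so six partitions exist.
    static final int PARTITION_COUNT = 6;

    // Parses args[2] and rejects values that cannot hold every partition.
    static int parseNumReduceTasks(String[] args) {
        if (args.length < 3) {
            throw new IllegalArgumentException("usage: <input> <output> <numReduceTasks>");
        }
        int numReduceTasks = Integer.parseInt(args[2]);
        if (numReduceTasks < PARTITION_COUNT) {
            throw new IllegalArgumentException(
                    "numReduceTasks must be at least " + PARTITION_COUNT + " but was " + numReduceTasks);
        }
        return numReduceTasks;
    }
}
```

In run(), the call job.setNumReduceTasks(Integer.parseInt(args[2])) could then be fed from this helper; values above six still work and simply produce empty result files.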