离线计算组件篇-MapReduce-自定义分区

2022-11-29  本文已影响0人  CoderInsight

8,MapReduce的分区 -- Partitioner

(1).需求

将流量汇总统计结果按照手机归属地不同省份输出到不同文件中。

(2).分析

(3).实现代码

1),FlowBean

主要任务就是序列化和反序列化变量,将其他的数据封装在一个实体类中然后实现数据在Map和Reduce端的传输。

package com.wangyq.class2.mypartition;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    // 定义变量类型的时候,需要使用包装类实现
    private Integer upFlow;
    private Integer downFlow;
    private Integer upCountFlow;
    private Integer downCountFlow;

    // 序列化 write
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(upCountFlow);
        out.writeInt(downCountFlow);
    }

    // 反序列化,注意两者的顺序一定要保持一致  read
    @Override
    public void readFields(DataInput in) throws IOException {

        this.upFlow = in.readInt();
        this.downFlow = in.readInt();
        this.upCountFlow = in.readInt();
        this.downCountFlow = in.readInt();
    }

    public Integer getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Integer upFlow) {
        this.upFlow = upFlow;
    }

    public Integer getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Integer downFlow) {
        this.downFlow = downFlow;
    }

    public Integer getUpCountFlow() {
        return upCountFlow;
    }

    public void setUpCountFlow(Integer upCountFlow) {
        this.upCountFlow = upCountFlow;
    }

    public Integer getDownCountFlow() {
        return downCountFlow;
    }

    public void setDownCountFlow(Integer downCountFlow) {
        this.downCountFlow = downCountFlow;
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", upCountFlow=" + upCountFlow +
                ", downCountFlow=" + downCountFlow +
                '}';
    }
}

2),FlowMapper

主要任务是将数据从文件中读取出来,然后进行split 拆分,从而将我们要分区的字段作为key,然后其他字段封装到实体类中传递给Reduce阶段。

package com.wangyq.class2.mypartition;


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowMapper extends Mapper<LongWritable, Text, Text,FlowBean> {

    private FlowBean flowBean;
    private Text text;

    /**
     * Mapper过程的 初始化方法
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
       flowBean = new FlowBean();
       text = new Text();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String[] splits = value.toString().split("\t");

        String phoneNum = splits[1];
        String upFlow = splits[6];
        String downFlow = splits[7];
        String upCountFlow = splits[8];
        String downCountFlow = splits[9];

        text.set(phoneNum);

        flowBean.setUpFlow(Integer.parseInt(upFlow));
        flowBean.setDownFlow(Integer.parseInt(downFlow));
        flowBean.setUpCountFlow(Integer.parseInt(upCountFlow));
        flowBean.setDownCountFlow(Integer.parseInt(downCountFlow));

        context.write(text, flowBean);

    }
}

3),FlowReducer

主要任务是将Map过程传递过来的值,然后进行遍历,然后以我们想要保存的数据格式进行保存。

package com.wangyq.class2.mypartition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowReducer extends Reducer<Text, FlowBean, Text,Text> {


    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {

        int upFlow = 0;
        int downFlow = 0;
        int upCountFlow = 0;
        int downCountFlow = 0;

        for (FlowBean value :
                values) {

            upFlow += value.getUpFlow();
            downFlow += value.getDownFlow();
            upCountFlow += value.getUpCountFlow();
            downCountFlow += value.getDownCountFlow();
        }

        context.write(key, new Text(upFlow+"\t"+downFlow+"\t"+upCountFlow+"\t"+downCountFlow));

    }
}

4),PartitionOwn

自定义分区实现类,继承自Partitioner<K2,V2>,然后重写 getPartition 方法,在其中编写分区的实现逻辑,可以直接使用if-else的方式,也可以使用HashMap和Static静态代码块配合实现。

package com.wangyq.class2.mypartition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * 分区过程是在Map过程中,然后在map函数处理之后,所以泛型的类型应该是 Mapper的 K2, V2
 * 当前自定义分类函数调用是在Main函数做如下执行:
 * job.setPartitionerClass(PartitionOwn.class);  // 设置分区的类
 * job.setNumReduceTasks(Integer.parseInt(args[2])); // 设置执行Reduce Task 的任务数量
 *   // 分区个数 = reducetask个数   正常执行
 *   // 分区个数 < reducetask个数   正常执行   只不过会有空的结果文件产生
 *   // 分区个数 > reducetask个数   错误       Illegal partition
 */
public class PartitionOwn extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text text, FlowBean flowBean, int numPartitions) {

        String phoneNum = text.toString();
        // 判断当 phoneNum 字段不为空的时候执行以下分区操作
        if (null != phoneNum && !phoneNum.equals("")){
            // 根据不同的起始值,返回不同的分区值
            if (phoneNum.startsWith("135")){
                return 0;
            }else if (phoneNum.startsWith("136")){
                return 1;
            }else if (phoneNum.startsWith("137")){
                return 2;
            }else if (phoneNum.startsWith("138")){
                return 3;
            }else if (phoneNum.startsWith("139")){
                return 4;
            }else {
                return 5;
            }
        }else{
            return 5;
        }
    }

}

5),FlowMain

主函数是模版函数,但是注意,此时我们使用的分区是自定义的,并且要配合设置reduceTask的个数,与分区的个数保持一致。

package com.wangyq.class2.mypartition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {


        Job job = Job.getInstance(super.getConf(),"flowCount");
        job.setJarByClass(FlowMain.class);

        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        job.setMapperClass(FlowMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 设置当前job的分区器是我们自定义的分区器
        job.setPartitionerClass(PartitionOwn.class);
        // 设置ReduceTask的个数,采用参数传递的方式指定,需要保证与分区的个数保持一致
        job.setNumReduceTasks(Integer.parseInt(args[2]));


        job.setReducerClass(FlowReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();

        configuration.set("mapreduce.framework.name","local");
        configuration.set("yarn.resourcemanager.hostname","local");

        // 通过ToolRunner.run方法启动程序
        int run = ToolRunner.run(configuration, new FlowMain(), args);
        // 退出
        System.exit(run);
    }
}

上一篇下一篇

猜你喜欢

热点阅读