
Getting Started with Hadoop: MapReduce Examples (3)

2019-07-31  文贞武毅

This time we will try out the Partitioner.

Many kinds of ID numbers encode information, such as the issuing province, in their first few characters. Phone numbers share this trait: the prefix tells you whether the carrier is China Mobile, China Unicom, or China Telecom. Routing numbers into different output files according to their prefix is exactly what a partitioner is for. As in the previous article, we start by creating a pom.xml and a FlowBean class that holds the upstream and downstream traffic; a rough sketch of the bean follows for reference.
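The FlowBean class itself was covered in the previous article, so what follows is only a plausible reconstruction for readers landing here first; the field names, the no-arg constructor required by Hadoop's serialization, and the toString() format (which matches the sample output at the end of this post) are assumptions rather than the original code.

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {
    private long upFlow;   // upstream traffic
    private long dFlow;    // downstream traffic

    // Hadoop creates the bean reflectively during deserialization, so a no-arg constructor is required
    public FlowBean() {}

    public FlowBean(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
    }

    public long getUpFlow() { return upFlow; }

    public long getdFlow() { return dFlow; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // read the fields in the same order they were written
        upFlow = in.readLong();
        dFlow = in.readLong();
    }

    @Override
    public String toString() {
        // assumed format: (upstream  downstream  total)
        return "(" + upFlow + "\t" + dFlow + "\t" + (upFlow + dFlow) + ")";
    }
}

With the bean in place comes the part that is new this time: we define our own partitioner: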

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;

/**
 * Partitioner that routes each record to a reduce task based on the phone number prefix.
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    // prefix -> partition number; any prefix not listed here goes to the catch-all partition 4
    public static HashMap<String, Integer> proviceDict = new HashMap<String, Integer>();

    static{
        proviceDict.put("137", 0);
        proviceDict.put("133", 1);
        proviceDict.put("138", 2);
        proviceDict.put("135", 3);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions){
        // the map output key is the phone number; its first three digits decide the partition
        String prefix = key.toString().substring(0, 3);
        Integer province = proviceDict.get(prefix);
        return province == null ? 4 : province;
    }
}

A partitioner like this is usually just a HashMap lookup. Here the data is split into 5 partitions rather than 4, because numbers with any other prefix all end up in one extra catch-all partition, as the quick check below illustrates.
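To make the mapping concrete, a throwaway check like the following (not part of the job, and with made-up phone numbers) shows where individual keys would land:

import org.apache.hadoop.io.Text;

public class PartitionerCheck {
    public static void main(String[] args) {
        ProvincePartitioner partitioner = new ProvincePartitioner();
        // "137..." is in the dictionary, so it lands in partition 0
        System.out.println(partitioner.getPartition(new Text("13712345678"), new FlowBean(100, 200), 5));
        // "148..." is not listed, so it falls into the catch-all partition 4
        System.out.println(partitioner.getPartition(new Text("14838244322"), new FlowBean(100, 200), 5));
    }
}

Then we can write the MapReduce driver: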


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCount {
    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // each input line looks like: phone \t upFlow \t dFlow
            String line = value.toString();
            String[] fields = line.split("\t");
            String phone = fields[0];
            long upFlow = Long.parseLong(fields[1]);
            long dFlow = Long.parseLong(fields[2]);

            // the phone number becomes the map output key, the traffic goes into a FlowBean
            context.write(new Text(phone), new FlowBean(upFlow, dFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text,Text>{
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context ) throws IOException, InterruptedException {
            // sum the upstream and downstream traffic of all records for this phone number
            long sum_upFlow = 0;
            long sum_dFlow = 0;
            for (FlowBean bean : values){
                sum_upFlow += bean.getUpFlow();
                sum_dFlow += bean.getdFlow();
            }

            context.write(key, new Text((new FlowBean(sum_upFlow,sum_dFlow)).toString()));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(FlowCount.class);
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);
        // use our custom partitioner instead of the default HashPartitioner
        job.setPartitionerClass(ProvincePartitioner.class);
        // run one reduce task per partition, 5 in total
        job.setNumReduceTasks(5);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        // the reducer writes Text values, so the final output value class is Text rather than FlowBean
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);
    }
}

Note that the driver explicitly sets both the custom partitioner and the matching number of reduce tasks. The two have to agree: with fewer reduce tasks than partitions (but more than one), records routed to a missing partition make the job fail with an "Illegal partition" error, while extra reduce tasks only produce empty output files.
Now let's package the jar, upload it, and see how it works.
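Submitting the job looks roughly like this; the jar name and the input path are placeholders for whatever you used, while the output path matches the listing that follows:

ubuntu@hadoop1:~/text$ hadoop jar flowcount.jar FlowCount /input/flow /output/partitioner1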

ubuntu@hadoop1:~/text$ hdfs dfs -ls /output/partitioner1
Found 6 items
-rw-r--r--   3 ubuntu supergroup          0 2019-07-31 20:06 /output/partitioner1/_SUCCESS
-rw-r--r--   3 ubuntu supergroup         28 2019-07-31 20:06 /output/partitioner1/part-r-00000
-rw-r--r--   3 ubuntu supergroup         84 2019-07-31 20:06 /output/partitioner1/part-r-00001
-rw-r--r--   3 ubuntu supergroup         28 2019-07-31 20:06 /output/partitioner1/part-r-00002
-rw-r--r--   3 ubuntu supergroup         28 2019-07-31 20:06 /output/partitioner1/part-r-00003
-rw-r--r--   3 ubuntu supergroup         80 2019-07-31 20:06 /output/partitioner1/part-r-00004

The output went from a single file in the previous examples to five part files, so the partitioning clearly worked. Looking at one of them confirms that it only holds the numbers belonging to its partition, in this case the catch-all partition for unlisted prefixes:

ubuntu@hadoop1:~/text$ hdfs dfs -cat /output/partitioner1/part-r-00004
14838244322 (900    500 1400)
18273723427 (300    800 1100)
19283413241 (500    200 700)
