
MapReduce-API (2): Find the 2 Hottest Days of Each Month

2018-11-21  geekAppke
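Approach:
    Every year, every month
    Highest temperature
    2 days
    What if one day has several records?
Thinking further
    Group by year and month
    Sort by temperature, descending
    The key must carry both the date and the temperature!
MR primitive: records with the same key go to the same group
    The grouping rule is set through a GroupingComparator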
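
The plan above can be tried locally before any Hadoop code is written. The following is a minimal sketch in plain Java, assuming a hypothetical `Reading` class and `topTwoDays` helper that are not part of the original job. It groups records by year and month, sorts each group by temperature in descending order, and keeps at most two records from different days.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local sketch; Reading and topTwoDays are illustrative names.
class TopTwoSketch {

    static final class Reading {
        final int year, month, day, temp;
        Reading(int year, int month, int day, int temp) {
            this.year = year; this.month = month; this.day = day; this.temp = temp;
        }
    }

    static List<String> topTwoDays(List<Reading> input) {
        // Group by year-month; the TreeMap keeps groups in chronological order.
        Map<Integer, List<Reading>> groups = new TreeMap<>();
        for (Reading r : input) {
            groups.computeIfAbsent(r.year * 100 + r.month, k -> new ArrayList<>()).add(r);
        }
        List<String> out = new ArrayList<>();
        for (List<Reading> group : groups.values()) {
            // Hottest record first within each month.
            group.sort(Comparator.comparingInt((Reading r) -> r.temp).reversed());
            int firstDay = -1;
            int emitted = 0;
            for (Reading r : group) {
                // Emit the first record, then the first record from a different day.
                if (emitted == 0 || r.day != firstDay) {
                    out.add(r.year + "-" + r.month + "-" + r.day + "\t" + r.temp);
                    if (emitted == 0) firstDay = r.day;
                    if (++emitted == 2) break;
                }
            }
        }
        return out;
    }
}
```

This version buffers a whole month in memory before sorting. The MapReduce job in this article avoids that by putting the temperature into the key, so the shuffle delivers each group already sorted.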

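Considerations

1, MR
    * Preserve the primitive
    How to partition the data, and how to define a group
2, Design of the k:v mapping
    Consider the computational complexity of reduce
3, Can there be more than one reduce?
    Skew: sample the data
    Cluster resource availability
4, Custom data type

Custom type
Partitioner
Sort comparator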

Sample data
1949-10-01 14:21:02  34c
1949-10-01 19:21:02  38c
1949-10-02 14:01:02  36c
1950-01-01 11:21:02  32c
1950-10-01 12:21:02  37c
1951-12-01 12:21:02  23c
1950-10-02 12:21:02  41c
1950-10-03 12:21:02  27c
1951-07-01 12:21:02  45c
1951-07-02 12:21:02  46c
1951-07-03 12:21:03  47c
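
Parsing one of these records can be sketched in plain Java. This assumes the timestamp and the temperature are separated by a tab (the mapper later in the article splits on `'\t'`, even though the listing above shows spaces). `ParseLineSketch` and `parse` are hypothetical names used only for illustration.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper; assumes "<timestamp>\t<temperature>c" input lines.
class ParseLineSketch {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Returns {year, month, day, temperature} for one tab-separated record.
    static int[] parse(String line) {
        String[] parts = line.split("\t");
        LocalDateTime ts = LocalDateTime.parse(parts[0].trim(), FMT);
        String t = parts[1].trim();
        // Drop the trailing unit character, e.g. "34c" -> 34.
        int temp = Integer.parseInt(t.substring(0, t.length() - 1));
        return new int[] { ts.getYear(), ts.getMonthValue(), ts.getDayOfMonth(), temp };
    }
}
```

Unlike `Calendar.MONTH`, `getMonthValue()` is already 1-based, so no `+1` correction is needed here.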

top-K: find the 2 hottest days of each month

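import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyTQ {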
    public static void main(String[] args) throws Exception {
        
        //1,conf
        Configuration conf = new Configuration(true);
        
        //2,job
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyTQ.class);
        
        //3,input and output paths
        Path input = new Path("/data/tq/input");
        FileInputFormat.addInputPath(job, input);
        
        Path output = new Path("/data/tq/output");
        if(output.getFileSystem(conf).exists(output)){
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output );
        
        //4,map
        job.setMapperClass(TqMapper.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setPartitionerClass(TqPartitioner.class);
        job.setSortComparatorClass(TqSortComparator.class);
        // No combiner is set: TqReducer emits <Text, IntWritable>, which does not
        // match the <TQ, IntWritable> map output types a combiner must produce.

        //5,reduce
        // Grouping comparator
        job.setGroupingComparatorClass(TqGroupingComparator.class);
        job.setReducerClass(TqReducer.class);
        job.setNumReduceTasks(2);
        
        //6,submit
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

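Custom type

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// A comparable, serializable key type,
// obtained by implementing WritableComparable
public class TQ implements  WritableComparable<TQ>{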

    private int year;
    private int month;
    private int day;
    private int wd;
    
    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public int getWd() {
        return wd;
    }

    public void setWd(int wd) {
        this.wd = wd;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(wd);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year=in.readInt();
        this.month=in.readInt();
        this.day=in.readInt();
        this.wd=in.readInt();
    }

    @Override
    public int compareTo(TQ that) {
        // By convention: ascending date order
        int c1=Integer.compare(this.getYear(), that.getYear());
        if(c1==0) {
            int c2 = Integer.compare(this.getMonth(), that.getMonth());
            if(c2==0) {
                // The day is the last field compared
                return Integer.compare(this.getDay(), that.getDay());
            }
            return c2;
        }
        return c1;
    }
}
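
The `write`/`readFields` pair must read the fields back in exactly the order they were written. A minimal round-trip sketch in plain Java follows. `Key` is a hypothetical stand-in that mirrors the four `int` fields of `TQ` without depending on Hadoop, and `roundTrip` and `serializedSize` are illustrative helpers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for TQ, used only to exercise the serialization order.
class TqRoundTripSketch {

    static final class Key {
        int year, month, day, wd;
        Key() { }
        Key(int year, int month, int day, int wd) {
            this.year = year; this.month = month; this.day = day; this.wd = wd;
        }
        void write(DataOutput out) throws IOException {
            out.writeInt(year); out.writeInt(month); out.writeInt(day); out.writeInt(wd);
        }
        void readFields(DataInput in) throws IOException {
            year = in.readInt(); month = in.readInt(); day = in.readInt(); wd = in.readInt();
        }
    }

    static byte[] serialize(Key src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            src.write(new DataOutputStream(bytes));
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Serializes src and reads it back into a fresh Key.
    static Key roundTrip(Key src) {
        try {
            Key copy = new Key();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(serialize(src))));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static int serializedSize(Key src) {
        return serialize(src).length;   // four ints, 4 bytes each
    }
}
```

Swapping two lines in only one of the two methods would still run without an exception but would silently exchange field values, which is why a round-trip check like this is worth having.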

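Map phase

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// Assumption: Hadoop's StringUtils; commons-lang's split(String, char) would also fit
import org.apache.hadoop.util.StringUtils;

public class TqMapper extends Mapper<LongWritable, Text, TQ, IntWritable> {
    // Created once as fields, so they are not re-allocated for every record!
    TQ mkey = new TQ();
    IntWritable mval = new IntWritable();
    
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
           
        try {
           // value:  1949-10-01 14:21:02   34c  >>  TQ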
           String[] strs = StringUtils.split(value.toString(), '\t');

           SimpleDateFormat  sdf = new SimpleDateFormat("yyyy-MM-dd");
           Date date = sdf.parse(strs[0]);
            
           Calendar  cal = Calendar.getInstance();
           cal.setTime(date);
            
           mkey.setYear(cal.get(Calendar.YEAR));
           mkey.setMonth(cal.get(Calendar.MONTH)+1);
           mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
            
           int wd = Integer.parseInt(strs[1].substring(0, strs[1].length()-1));
           mkey.setWd(wd);
           mval.set(wd);
           context.write(mkey, mval);
        } catch (ParseException e) {
            // Malformed date: log the error and skip this record
            e.printStackTrace();
        }
    }
}

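Partitioning

Decides which partition (reduce task) each key goes to
getPartition is called once for every <key, value> record the map emits
so it should stay cheap, and it must not cause data skew! Sample the data first!

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class TqPartitioner  extends  Partitioner<TQ, IntWritable> {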
    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) { 
        // return key.hashCode() % numPartitions;   
        return key.getYear() % numPartitions;
    }
}
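
Whether `year % numPartitions` spreads the load evenly depends on the data, which is why sampling is recommended. A small plain-Java sketch can count how many sampled records land in each partition. `PartitionSketch` and its methods are hypothetical names, and the partition rule is copied from `TqPartitioner`.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for checking partition balance on a sample.
class PartitionSketch {

    // Same rule as TqPartitioner.getPartition (assumes non-negative years).
    static int partitionFor(int year, int numPartitions) {
        return year % numPartitions;
    }

    // Counts how many of the sampled years fall into each partition.
    static Map<Integer, Integer> countPerPartition(int[] years, int numPartitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            counts.put(p, 0);
        }
        for (int year : years) {
            counts.merge(partitionFor(year, numPartitions), 1, Integer::sum);
        }
        return counts;
    }
}
```

For the eleven sample records and 2 reduce tasks, the three 1949 records and four 1951 records share partition 1, while the four 1950 records go to partition 0.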

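Sorting

Sorting runs on the map side once the buffer reaches its threshold and a spill begins

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Extends WritableComparator
public class TqSortComparator  extends  WritableComparator {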
    public TqSortComparator() {
        super(TQ.class, true);
    }
    
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ t1 = (TQ)a;
        TQ t2 = (TQ)b;

        int c1=Integer.compare(t1.getYear(), t2.getYear());
        if(c1==0){
            int c2=Integer.compare(t1.getMonth(), t2.getMonth());
            if(c2==0){
                // Largest first: descending temperature
                return -Integer.compare(t1.getWd(), t2.getWd());
            }
            return c2;
        }   
        return c1;  
    }
}

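Grouping comparator

Keys with the same year and month form one group; same-day duplicates are filtered out later, in the reducer

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TqGroupingComparator  extends WritableComparator {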
    public TqGroupingComparator() {
        super(TQ.class,true);
    }

    public int compare(WritableComparable a, WritableComparable b) {
            TQ t1 = (TQ)a;
            TQ t2 = (TQ)b;
    
            int c1=Integer.compare(t1.getYear(), t2.getYear());
            if(c1==0){
                return Integer.compare(t1.getMonth(), t2.getMonth());
            } 
            return c1;
    }
}
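
The sort comparator and the grouping comparator differ only in how far down the key they look. A plain-Java sketch with `java.util.Comparator` makes the contrast visible. `Key`, `SORT` and `GROUP` are hypothetical names that mirror `TQ`, `TqSortComparator` and `TqGroupingComparator`, not Hadoop classes.

```java
import java.util.Comparator;

// Hypothetical mirror of the two comparators, without Hadoop dependencies.
class ComparatorSketch {

    static final class Key {
        final int year, month, day, wd;
        Key(int year, int month, int day, int wd) {
            this.year = year; this.month = month; this.day = day; this.wd = wd;
        }
    }

    // Year and month ascending, then temperature descending.
    static final Comparator<Key> SORT =
            Comparator.comparingInt((Key k) -> k.year)
                    .thenComparingInt(k -> k.month)
                    .thenComparing(Comparator.comparingInt((Key k) -> k.wd).reversed());

    // Year and month only: keys from the same month compare as equal.
    static final Comparator<Key> GROUP =
            Comparator.comparingInt((Key k) -> k.year)
                    .thenComparingInt(k -> k.month);
}
```

Two keys from the same month with different temperatures are ordered by `SORT` but compare as equal under `GROUP`, which is what lets a single `reduce` call see a whole month with the hottest record first.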

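Reduce phase

Only the first 2 records (from different days) are emitted!

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TqReducer extends Reducer<TQ, IntWritable, Text, IntWritable> {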
    Text rkey = new Text();
    IntWritable rval = new IntWritable();
    
    @Override
    protected void reduce(TQ key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {  
        int flg = 0;
        int day = 0;
        
        // 1970 01 20  34     34
        // 1970 01 12  28     28
        for (IntWritable v : values) {  // v itself is unused; key is updated in place on each iteration
            if (flg == 0) {
                // 1970-01-20:34
                rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
                rval.set(key.getWd());
                context.write(rkey,rval );

                day = key.getDay(); 
                flg++;
            }
            // Skip further records from the same day
            if(flg!=0 && day != key.getDay()){
                rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay());
                rval.set(key.getWd());
                context.write(rkey,rval);

                break;
            }       
        }   
    }
}
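
The reducer relies on the key object being refreshed as the value iterator advances. The following plain-Java sketch simulates that behavior. It is not Hadoop's implementation: `Key`, `values` and `reduce` are hypothetical names, and the iterator simply overwrites a shared key before returning each value. The loop body follows the same control flow as `TqReducer.reduce`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Simulation only: a shared key that changes while the values are iterated.
class KeyReuseSketch {

    static final class Key { int day; int wd; }

    // Each record is {day, temperature}, already sorted by temperature descending.
    static Iterable<Integer> values(final Key shared, final int[][] records) {
        return () -> new Iterator<Integer>() {
            int i = 0;
            @Override public boolean hasNext() { return i < records.length; }
            @Override public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                shared.day = records[i][0];   // the key moves along with the value
                shared.wd = records[i][1];
                return records[i++][1];
            }
        };
    }

    // Same control flow as TqReducer.reduce, collecting "day:temperature" lines.
    static List<String> reduce(Key key, Iterable<Integer> values) {
        List<String> out = new ArrayList<>();
        int flg = 0;
        int day = 0;
        for (Integer v : values) {            // v is unused, as in the reducer
            if (flg == 0) {
                out.add(key.day + ":" + key.wd);
                day = key.day;
                flg++;
            }
            if (flg != 0 && day != key.day) { // first record from a different day
                out.add(key.day + ":" + key.wd);
                break;
            }
        }
        return out;
    }
}
```

Because the key is mutable and shared, anything that must outlive the current iteration (here, `day`) has to be copied out as a primitive rather than kept as a reference to the key.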

Output

1949-10-1   38
1949-10-2   36
1950-1-1    32
1950-10-2   41
1950-10-1   37
1951-7-3    47
1951-7-2    46
1951-12-1   23

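Summary

Custom data type Weather (the TQ class above)
    Holds the date
    Holds the temperature
    Defines its own sort comparison
Custom grouping comparison
    Keys with the same year and month are treated as the same key
So while reduce iterates, records with the same year and month may fall on the same day
    reduce has to check whether two records are from the same day
    Watch out for OOM
When the data volume is large
    The full dataset can be split into units no smaller than one month for evaluation
    This scenario allows several reduce tasks
    achieved by implementing a partitioner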