Implementing Custom Object Serialization in Hadoop

2018-08-01

1. Requirement
Record each user's traffic usage. The input has one record per line: phone number, upstream traffic, downstream traffic.
```
13726230501	200	1100
13396230502	300	1200
13897230503	400	1300
13897230503	100	300
13597230534	500	1400
13597230534	300	1200
```
Records with the same phone number must be merged, and the output format is
<key, value> = <phoneNum, <upFlowSum, downFlowSum, sum>>
so we need a custom data type that holds <upFlowSum, downFlowSum, sum>.
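For the sample input above, the merged result for each phone number (upstream sum, downstream sum, total) works out to:
```
13726230501	200	1100	1300
13396230502	300	1200	1500
13897230503	500	1600	2100
13597230534	800	2600	3400
```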
2. Implementation
Define the new data type with serialization support, FlowBean.java:
```
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // A no-argument constructor is required so that Hadoop can create
    // the object by reflection during deserialization.
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    // Serialization: write the fields to the output stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: read the fields back in exactly the same order.
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
```
To support Hadoop serialization, the class must implement the Writable interface and override the write() and readFields() methods. Note that the types read back must match the types written, and in the same order.
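As a quick sanity check (a local sketch, not part of the job itself), you can serialize a FlowBean into a byte array and read it back; if the field order in write() and readFields() agrees, both beans print the same line. This assumes the FlowBean class above is on the classpath.
```
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(200, 1100);

        // Serialize: write() emits upFlow, downFlow, sumFlow in that order.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into an empty bean: readFields() must read in the same order.
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(original); // 200	1100	1300
        System.out.println(copy);     // prints the same line if the round trip worked
    }
}
```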

Implement the MapReduce classes, FlowCount.java:

```
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCount {

    // Input key:    LongWritable, the byte offset of the line in the input file
    // Input value:  Text, one line of input
    // Output key:   Text, the phone number
    // Output value: FlowBean, the traffic record parsed from that line
    static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();

            String[] fields = line.split("\t");

            String phoneNum = fields[0];

            long upFlow = Long.parseLong(fields[1]);
            long downFlow = Long.parseLong(fields[2]);

            context.write(new Text(phoneNum), new FlowBean(upFlow, downFlow));
        }
    }

    static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long sum_upFlow = 0;
            long sum_dFlow = 0;

            // All values for the same phone number arrive grouped together.
            for (FlowBean bean : values) {
                sum_upFlow += bean.getUpFlow();
                sum_dFlow += bean.getDownFlow();
            }

            FlowBean resultBean = new FlowBean(sum_upFlow, sum_dFlow);
            context.write(key, resultBean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "flow count");

        job.setJarByClass(FlowCount.class);

        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
```
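To run the job, package the classes into a jar and submit it with the hadoop command. The jar name and HDFS paths below are just placeholders:
```
hadoop jar flowcount.jar FlowCount /flow/input /flow/output
```
Note that the output directory must not already exist, otherwise FileOutputFormat will reject the job at submission time.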

One more note: the context parameter is the bridge between the map and reduce stages (and the framework connecting them), which is why both the map and reduce methods take it. The reduce method receives an Iterable of values because, after the map and shuffle phases, the input becomes <key, <value1, value2, value3, ...>>, i.e. all values sharing the same key are grouped together, so the reducer has to iterate over every value for that key.
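To make this concrete with the sample data, the reduce call for phone number 13897230503 sees both of its records grouped under one key:
```
key    = 13897230503
values = [ (upFlow=400, downFlow=1300), (upFlow=100, downFlow=300) ]
output = 13897230503	500	1600	2100
```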
