TopN案例

2019-03-09  本文已影响0人  bullion

需求

需求分析

实现

    1)根据序列化实操案例实现

    2)修改FlowBean类

    @Override

    public int compareTo(FlowBean bean) {

        int result;

        if (this.sumFlow > bean.getSumFlow()) {

            result = -1;

        }else if (this.sumFlow < bean.getSumFlow()) {

            result = 1;

        }else {

            result = 0;

        }

        return result;

    }

    3)修改TopNMapper类

    // 定义一个TreeMap作为存储数据的容器(天然按key排序)

    private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

@Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        ......

        // 4 向TreeMap中添加数据

        flowMap.put(kBean, v);

        // 5 限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据

        if (flowMap.size() > 10) {

            flowMap.remove(flowMap.lastKey());

        }

    }

@Override

    protected void cleanup(Context context) throws IOException, InterruptedException {

        // 6 遍历treeMap集合,输出数据

        Iterator<FlowBean> bean = flowMap.keySet().iterator();

        while (bean.hasNext()) {

            FlowBean k = bean.next();

            context.write(k, flowMap.get(k));

        }

    }

    4)修改TopNReducer类

    // 定义一个TreeMap作为存储数据的容器(天然按key排序)

    TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

@Override

    protected void reduce(FlowBean key, Iterable<Text> values, Context context)throws IOException, InterruptedException {

        for (Text value : values) {

            FlowBean bean = new FlowBean();

            bean.set(key.getDownFlow(), key.getUpFlow());

            // 1 向treeMap集合中添加数据

            flowMap.put(bean, new Text(value));

            // 2 限制TreeMap数据量,超过10条就删除掉流量最小的一条数据

            if (flowMap.size() > 10) {

                // flowMap.remove(flowMap.firstKey());

                flowMap.remove(flowMap.lastKey());

            }

        }

    }

@Override

    protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

        // 3 遍历集合,输出数据

        Iterator<FlowBean> it = flowMap.keySet().iterator();

        while (it.hasNext()) {

            FlowBean v = it.next();

            context.write(new Text(flowMap.get(v)), v);

        }

    }

实际代码

FlowBean

public class FlowBean implements WritableComparable<FlowBean>{

    private long upFlow;

    private long downFlow;

    private long sumFlow;

    public FlowBean() {

        super();

    }

    public FlowBean(long upFlow, long downFlow) {

        super();

        this.upFlow = upFlow;

        this.downFlow = downFlow;

    }

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeLong(upFlow);

        out.writeLong(downFlow);

        out.writeLong(sumFlow);

    }

    @Override

    public void readFields(DataInput in) throws IOException {

        upFlow = in.readLong();

        downFlow = in.readLong();

        sumFlow = in.readLong();

    }

    public long getUpFlow() {

        return upFlow;

    }

    public void setUpFlow(long upFlow) {

        this.upFlow = upFlow;

    }

    public long getDownFlow() {

        return downFlow;

    }

    public void setDownFlow(long downFlow) {

        this.downFlow = downFlow;

    }

    public long getSumFlow() {

        return sumFlow;

    }

    public void setSumFlow(long sumFlow) {

        this.sumFlow = sumFlow;

    }

    @Override

    public String toString() {

        return upFlow + "\t" + downFlow + "\t" + sumFlow;

    }

    public void set(long downFlow2, long upFlow2) {

        downFlow = downFlow2;

        upFlow = upFlow2;

        sumFlow = downFlow2 + upFlow2;

    }

    @Override

    public int compareTo(FlowBean bean) {

        int result;

        if (this.sumFlow > bean.getSumFlow()) {

            result = -1;

        }else if (this.sumFlow < bean.getSumFlow()) {

            result = 1;

        }else {

            result = 0;

        }

        return result;

    }

}

TopNMapper

public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    // 定义一个TreeMap作为存储数据的容器(天然按key排序)

    private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

    private FlowBean kBean;

    @Override

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        kBean = new FlowBean();

        Text v = new Text();

        // 1 获取一行

        String line = value.toString();

        // 2 切割

        String[] fields = line.split("\t");

        // 3 封装数据

        String phoneNum = fields[0];

        long upFlow = Long.parseLong(fields[1]);

        long downFlow = Long.parseLong(fields[2]);

        long sumFlow = Long.parseLong(fields[3]);

        kBean.setDownFlow(downFlow);

        kBean.setUpFlow(upFlow);

        kBean.setSumFlow(sumFlow);

        v.set(phoneNum);

        // 4 向TreeMap中添加数据

        flowMap.put(kBean, v);

        // 5 限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据

        if (flowMap.size() > 10) {

// flowMap.remove(flowMap.firstKey());

            flowMap.remove(flowMap.lastKey());

        }

    }

    @Override

    protected void cleanup(Context context) throws IOException, InterruptedException {

        // 6 遍历treeMap集合,输出数据

        Iterator<FlowBean> bean = flowMap.keySet().iterator();

        while (bean.hasNext()) {

            FlowBean k = bean.next();

            context.write(k, flowMap.get(k));

        }

    }

}

TopNReducer

public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    // 定义一个TreeMap作为存储数据的容器(天然按key排序)

    TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

    @Override

    protected void reduce(FlowBean key, Iterable<Text> values, Context context)throws IOException, InterruptedException {

        for (Text value : values) {

            FlowBean bean = new FlowBean();

            bean.set(key.getDownFlow(), key.getUpFlow());

            // 1 向treeMap集合中添加数据

            flowMap.put(bean, new Text(value));

            // 2 限制TreeMap数据量,超过10条就删除掉流量最小的一条数据

            if (flowMap.size() > 10) {

                // flowMap.remove(flowMap.firstKey());

                flowMap.remove(flowMap.lastKey());

            }

        }

    }

    @Override

    protected void cleanup(Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {

        // 3 遍历集合,输出数据

        Iterator<FlowBean> it = flowMap.keySet().iterator();

        while (it.hasNext()) {

            FlowBean v = it.next();

            context.write(new Text(flowMap.get(v)), v);

        }

    }

}

TopNDriver

public class TopNDriver {

    public static void main(String[] args) throws Exception {

        args  = new String[]{"e:/output1","e:/output3"};

        // 1 获取配置信息,或者job对象实例

        Configuration configuration = new Configuration();

        Job job = Job.getInstance(configuration);

        // 6 指定本程序的jar包所在的本地路径

        job.setJarByClass(TopNDriver.class);

        // 2 指定本业务job要使用的mapper/Reducer业务类

        job.setMapperClass(TopNMapper.class);

        job.setReducerClass(TopNReducer.class);

        // 3 指定mapper输出数据的kv类型

        job.setMapOutputKeyClass(FlowBean.class);

        job.setMapOutputValueClass(Text.class);

        // 4 指定最终输出的数据的kv类型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(FlowBean.class);

        // 5 指定job的输入原始文件所在目录

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行

        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);

    }

}

上一篇 下一篇

猜你喜欢

热点阅读