MapReduce中分组组件

2020-08-13  本文已影响0人  羋学僧

MapReduce中分组组件

一、GroupComparator

定义 分组:分组是mapreduce中shuffle组件当中reduce端的一个功能组件,主要的作用是决定哪些数据作 为一组。

我们可以自定义分组实现不同的key作为同一个组

二、分组 案例

需求

经典案例:求出每一个订单中成交金额最大的一笔交易

示例数据如下

    订单编号 商品编号 金额
    order_001 goods_001 100
    order_001 goods_002 200
    order_002 goods_003 300
    order_002 goods_004 400
    order_002 goods_005 500
    order_003 goods_001 100

预期结果:

order_001   goods_002   200
order_002   goods_005   500
order_003   goods_001   100

步骤:

1、定义实体类
2、定义Mapper
3、定义分区
4、定义分组
5、定义Reducer
6、定义主类

OrderBean.java

/**
 * 订单实体类
 */
public class OrderBean implements WritableComparable<OrderBean>{

    private String orderId; //订单编号
    private Double price; //订单中某个商品的价格

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "OrderBean{" +
                "orderId='" + orderId + '\'' +
                ", price=" + price +
                '}';
    }

    /**
     * @param o 实体参数
     * @return
     * 指定排序的规则
     */
    @Override
    public int compareTo(OrderBean o) {
        //1、先比较订单的id,如果id一样,则将订单的金额排序(降序)
        int i = this.orderId.compareTo(o.orderId);
        if (i == 0){
            //因为是降序,所以有-1
            i = this.price.compareTo(o.price) * -1;
        }
        return i;
    }

    /**
     * 实现对象的序列化
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    /**
     * 实现对象的反序列化
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }
}

OrderMapper.java

/**
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *     KEYIN:偏移量
 *     VALUEIN:一行文本
 *     KEYOUT:k2 OrderBean
 *     VALUEOUT:v2 文本
 */
public class OrderMapper extends Mapper<LongWritable,Text,OrderBean,Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1、拆分行文本数据,得到订单的id和订单的金额
        String[] split = value.toString().split("\t");

        //2、封装OrderBean实体类
        OrderBean orderBean = new OrderBean();
        orderBean.setOrderId(split[0]);
        orderBean.setPrice(Double.parseDouble(split[2]));

        //3、写入上下文
        context.write(orderBean,value);
    }
}

OrderPartition.java

/**
 * Partitioner<KEY, VALUE>
 *     KEY:k2
 *     VALUE:v2
 */
public class OrderPartition extends Partitioner<OrderBean,Text> {

    /**
     *
     * @param orderBean k2
     * @param text v2
     * @param numPartitions ReduceTask的个数
     * @return  返回的是分区的编号:比如说:ReduceTask的个数3个,返回的编号是 0 1 2
     */
    @Override
    public int getPartition(OrderBean orderBean, Text text, int numPartitions) {
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

OrderGroup.java

/**
 * 订单的分组类
 * 实现分组有固定的步骤:
 * 1、继承WritableComparator
 * 2、调用父类的构造器
 * 3、指定分组的规则,重写一个方法
 */
public class OrderGroup extends WritableComparator {

    //1、继承WritableComparator类
    //2、调用父类的构造器
    public OrderGroup(){
        //第一个参数就是分组使用的javabean,第二个参数就是布尔类型,表示是否可以创建这个类的实例
        super(OrderBean.class,true);
    }

    // 3、指定分组的规则,需要重写一个方法

    /**
     * @param a  WritableComparable是接口,Orderbean实现了这个接口
     * @param b WritableComparable是接口,Orderbean实现了这个接口
     * @return
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //1、对形参a b 做强制类型转换
        OrderBean first = (OrderBean) a;
        OrderBean second = (OrderBean) b;

        //2、指定分组的规则
        return first.getOrderId().compareTo(second.getOrderId());
    }
}

OrderReducer.java

/**
 * Reducer
 *  Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
 *      KEYIN:k2
 *      VALUEIN:v2
 *      KEYOUT :k3 一行文本
 *      VALUEOUT:v3 NullWritable
 *
 */
public class OrderReducer extends Reducer<OrderBean,Text,Text,NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int i = 0;
        //获取top N ,下面的代码就是取出来top1。
        for (Text value : values) {
            context.write(value,NullWritable.get());
            i++;
            if (i >= 1){
                break;
            }
        }
    }
}

JobMain.java

/**
 * 求订单最大值的主类
 */
public class JobMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //一、初始化一个job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "mygroup");

        //二、配置Job信息
        //1、设置输入信息
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("D://input/orders.txt"));

        //2、设置mapper
        job.setMapperClass(OrderMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(Text.class);

        //3 4 5 6  shuffle
        //分区设置
        job.setPartitionerClass(OrderPartition.class);

        //分组设置
        job.setGroupingComparatorClass(OrderGroup.class);

        //7、设置Reducer
        job.setReducerClass(OrderReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        //8、设置输出
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("D://mygroup_out"));

        //三、等待完成
        boolean b = job.waitForCompletion(true);
        System.out.println(b);
        System.exit(b ? 0 : 1);
    }
}

实际结果:

上一篇 下一篇

猜你喜欢

热点阅读