
MapReduce (2): MapReduce Serialization

2021-12-08  codeMover

Serialization Overview

What Is Serialization

Serialization converts in-memory objects into a sequence of bytes (or another data transfer format) so that they can be persisted to disk or sent over the network.

Deserialization is the reverse: it converts a byte sequence (or another data transfer format), or data persisted on disk, back into in-memory objects.
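
A minimal sketch of this round trip (not from the original article), using Hadoop's IntWritable and assuming hadoop-common is on the classpath; the class name WritableRoundTrip is just for illustration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        // Serialize: in-memory object -> byte sequence
        IntWritable out = new IntWritable(42);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        out.write(new DataOutputStream(buffer));

        // Deserialize: byte sequence -> in-memory object
        IntWritable in = new IntWritable();
        in.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(in.get()); // prints 42
    }
}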

Why Serialize

Generally speaking, "live" objects exist only in memory and are gone once the machine is shut down. A live object can also only be used by the local process; it cannot be sent to another computer on the network. Serialization makes it possible to store live objects and to send them to remote machines.

Why Not Use Java Serialization

Java serialization is a heavyweight framework (Serializable). A serialized object carries a lot of extra information (checksums, headers, the inheritance hierarchy, and so on), which makes it inefficient to transmit over the network.
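
As a rough illustration of that overhead (a sketch, not from the original article): Java serialization of a single Long carries a stream header and class metadata, while Hadoop's LongWritable writes only the raw 8 bytes of the value.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import org.apache.hadoop.io.LongWritable;

public class SerializedSizeDemo {
    public static void main(String[] args) throws IOException {
        // Java serialization: the value plus class metadata, stream header, etc.
        ByteArrayOutputStream javaBytes = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(javaBytes)) {
            oos.writeObject(Long.valueOf(42L));
        }

        // Hadoop serialization: just the 8 raw bytes of the long
        ByteArrayOutputStream hadoopBytes = new ByteArrayOutputStream();
        new LongWritable(42L).write(new DataOutputStream(hadoopBytes));

        System.out.println("Serializable: " + javaBytes.size() + " bytes"); // dozens of bytes
        System.out.println("Writable:     " + hadoopBytes.size() + " bytes"); // 8 bytes
    }
}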

Advantages of Hadoop Serialization

Hadoop's own serialization (Writable) is compact and fast: a serialized record contains only the raw bytes of its fields, with very little per-record overhead.

Custom Bean Objects Implementing the Serialization Interface (WritableComparable)

In real-world development the built-in basic types cannot satisfy every requirement, so custom bean objects are often needed.

Java type    Hadoop Writable type
Boolean      BooleanWritable
Byte         ByteWritable
Integer      IntWritable
Long         LongWritable
Float        FloatWritable
Double       DoubleWritable
String       Text
Map          MapWritable
Array        ArrayWritable
Null         NullWritable
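
These wrappers are ordinary Java classes. A minimal sketch (not from the original article) of how they are used; note that they are mutable, which is why the Mapper and Reducer below reuse a single output key/value object:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class WritableTypesDemo {
    public static void main(String[] args) {
        // Text wraps a String; IntWritable wraps an int
        Text name = new Text("hadoop");
        IntWritable count = new IntWritable(1);
        count.set(2);                            // reuse the same object instead of allocating a new one
        NullWritable none = NullWritable.get();  // singleton placeholder when no key/value is needed

        System.out.println(name + "\t" + count + "\t" + none);
    }
}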

Example

Given a file of traffic records, compute the upstream traffic, downstream traffic, and total traffic for each phone number.
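
The article does not include the input file, but based on the Mapper below, a hypothetical space-separated input line could look like this (phone number in the first column, upstream and downstream traffic in the third- and second-to-last columns):

13736230513 192.196.100.1 www.example.com 2481 24681 200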

FlowBean.java

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private long upFlow;    // upstream traffic
    private long downFlow;  // downstream traffic
    private long sumFlow;   // total traffic

    // A no-arg constructor is required so the framework can create the bean
    // by reflection during deserialization.
    public FlowBean() {
    }

    // Serialization: write each field to the output stream in a fixed order
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: read the fields back in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    // The total is derived from the other two fields rather than passed in
    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow +
            "\t" + downFlow +
            "\t" + sumFlow;
    }
}
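
Note that FlowBean implements Writable because it is only ever used as a value. The heading above mentions WritableComparable: a bean needs that interface only when it is used as a key, since keys must be sortable during the shuffle. A minimal, hypothetical sketch (FlowBeanKey is not part of the original example and assumes it sits in the same package as FlowBean):

import org.apache.hadoop.io.WritableComparable;

// Hypothetical key variant: reuses FlowBean's fields and write()/readFields(),
// and only adds the ordering that MapReduce requires of a key.
public class FlowBeanKey extends FlowBean implements WritableComparable<FlowBeanKey> {

    @Override
    public int compareTo(FlowBeanKey other) {
        // Assumed ordering for illustration: descending by total flow
        return Long.compare(other.getSumFlow(), this.getSumFlow());
    }
}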

FlowMapper.java

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * @Description Mapper: parses each input line and emits (phone number, FlowBean)
 * @Author magaowei
 * @Date 2021/11/21 11:38 PM
 * @Version 1.0
 */
public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
    private Text outK = new Text();
    private FlowBean outV = new FlowBean();
    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, FlowBean>.Context context)
        throws IOException, InterruptedException {
        // 1. Read one line
        String line = value.toString();
        // 2. Split the line into fields
        String[] split = line.split(" ");
        // 3. Extract the phone number and the upstream/downstream traffic
        String phone = split[0];
        String upFlow = split[split.length - 3];
        String downFlow = split[split.length - 2];
        // 4. Wrap the fields in the output key/value
        outK.set(phone);
        outV.setUpFlow(Long.parseLong(upFlow));
        outV.setDownFlow(Long.parseLong(downFlow));
        outV.setSumFlow();

        context.write(outK,outV);
    }
}

FlowReducer.java

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * @Description Reducer: sums the per-phone traffic and emits the totals
 * @Author magaowei
 * @Date 2021/11/21 11:47 PM
 * @Version 1.0
 */
public class FlowReducer extends Reducer<Text,FlowBean, Text,FlowBean> {
    private FlowBean outV = new FlowBean();
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values,
        Reducer<Text, FlowBean, Text, FlowBean>.Context context)
        throws IOException, InterruptedException {
        // 1. Iterate over the values and accumulate the upstream and downstream traffic
        long totalUp = 0;
        long totalDown = 0;
        for (FlowBean value : values) {
            totalUp += value.getUpFlow();
            totalDown += value.getDownFlow();
        }
        // 2. Wrap the totals in the output value
        outV.setUpFlow(totalUp);
        outV.setDownFlow(totalDown);
        outV.setSumFlow();
        // 3. Write out the result
        context.write(key,outV);

    }
}

FlowDriver.java

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @Description Driver: configures and submits the flow-statistics job
 * @Author magaowei
 * @Date 2021/11/21 11:51 PM
 * @Version 1.0
 */
public class FlowDriver {

    public static void main(String[] args)
        throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. Set the jar
        job.setJarByClass(FlowDriver.class);
        // 3. Wire up the mapper and reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 4. Set the mapper output key and value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 5. Set the final output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir") + "/input/writable"));
        FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir") + "/output/writable"));
        // 7. Submit the job and exit with its status
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
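
After a successful run, the output directory contains tab-separated lines in the format produced by FlowBean.toString(): phone number, upstream traffic, downstream traffic, and total traffic. For the hypothetical input line shown earlier, the output would look like:

13736230513	2481	24681	27162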

Summary

This section reviewed the Hadoop types from the previous section and compared them with Java's built-in types. Java serialization (Serializable) is a heavyweight framework and is inconvenient for network transfer, whereas Hadoop serialization (Writable) is compact and fast. Finally, a hands-on example computed traffic statistics from the input data.
