MapReduce(二):MapReduce序列化
2021-12-08 本文已影响0人
codeMover
序列化概述
什么是序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络。
反序列化就是将字节序列(或其他数据传输协议)或是从磁盘持久化数据,转换成内存的对象。
为什么要序列化
一般来说,活的对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储或的对象,可以将或的对象发送到远程计算机。
为什么不用Java序列化
Java序列化是一个重量级序列化框架(Serizlizable),一个对象被序列化后,会发你很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。
hadoop序列化优势
- 结构紧凑,存储空间少
- 传输快速
- 互操作性,支持多语言使用
自定义Bean对象实现序列化接口 WritableComparable
企业开发中的基本类型不能满足所有需求。
Java类型 | Hadoop Writable类型 |
---|---|
Boolean | BooleanWritable |
Byte | ByteWritable |
Integer | IntWritable |
Float | FloatWritable |
Double | DoubleWritable |
String | Text |
Map | MapWritable |
Array | MapWritable |
Null | NullWritable |
- 必须实现Writable接口
- 反序列化时,需要反射调用空参构造函数,所以必须有空参构造
- 重写序列化方法
- 重写反序列化方法
- 反序列化的顺序和序列化的顺序完全一致
- 要想把结果展示在文件中,需要重写toString(),可用"\t"分开,方便后续用
- 如果需要将自定义的bean放在key中传输,则还需要实现comparable接口,因为MapReduce框架中的shuffle过程要求对key必须能排序。
案例
统计一个文件中的上行流量、下行流量以及总流量。
FlowBean.java
public class FlowBean implements Writable {
private long upFlow;
private long downFlow;
private long sumFlow;
public FlowBean() {
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
@Override
public String toString() {
return upFlow +
"\t" + downFlow +
"\t" + sumFlow;
}
}
FlowMapper.java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
* @Description TODO
* @Author magaowei
* @Date 2021/11/21 11:38 下午
* @Version 1.0
*/
public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> {
private Text outK = new Text();
private FlowBean outV = new FlowBean();
@Override
protected void map(LongWritable key, Text value,
Mapper<LongWritable, Text, Text, FlowBean>.Context context)
throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 切割
String[] split = line.split(" ");
System.out.println(split.length);
// 3 抓取数据
String phone = split[0];
String upFlow = split[split.length-3];
String downFlow = split[split.length-2];
// 4 封装
outK.set(phone);
outV.setUpFlow(Long.parseLong(upFlow));
outV.setDownFlow(Long.parseLong(downFlow));
outV.setSumFlow();
context.write(outK,outV);
}
}
FlowReducer.java
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* @Description TODO
* @Author magaowei
* @Date 2021/11/21 11:47 下午
* @Version 1.0
*/
public class FlowReducer extends Reducer<Text,FlowBean, Text,FlowBean> {
private FlowBean outV = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values,
Reducer<Text, FlowBean, Text, FlowBean>.Context context)
throws IOException, InterruptedException {
// 1 遍历集合类价值
long totalUp = 0;
long totalDown = 0;
for (FlowBean value : values) {
totalUp += value.getUpFlow();
totalDown += value.getDownFlow();
}
// 3 封装
outV.setUpFlow(totalUp);
outV.setDownFlow(totalDown);
outV.setSumFlow();
// 4 写出
context.write(key,outV);
}
}
FlowDriver.java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @Description TODO
* @Author magaowei
* @Date 2021/11/21 11:51 下午
* @Version 1.0
*/
public class FlowDriver {
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
// 1 获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 设置jar
job.setJarByClass(FlowDriver.class);
// 3 关联mapper和reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
// 4 设置mapper输出的key和value类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// 5 设置最终输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// 6 设置数据的输入路径和输出路径
FileInputFormat.setInputPaths(job, new Path(System.getProperty("user.dir")+"/input/writable"));
FileOutputFormat.setOutputPath(job, new Path(System.getProperty("user.dir")+"/output/writable"));
// 7 提交job
Boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
小结
本节复习了前一节hadoop类型,并和Java中原有数据类型做对比。Java(Serializable)序列化时一个重量级序列化框架,不便在网络传输;而hadoop(Writable)具有紧凑、快速等优势。最后通过案例实战方式统计输入数据。