MapReduce实现join
需求分析
一个电商网站后台数据存在两个表(可以看为两个文件):
用户表信息:用户ID、用户名、电话
订单表信息:用户ID、订单ID、商品价格、订单日期
如果想把两张表关联成:用户ID、用户名、电话、订单ID,价格,日期,并且按照需求对其输出。
如何用mapreduce实现?
实现逻辑分析
-
在map端读取两个表,设置输出格式为 <用户ID,(用户名,电话)> 或者为 <用户ID,(订单ID,商品价格,订单日期)>。所以map输出的key值为用户ID,value值为用户信息或者订单表信息构成的字符串。
join1.png -
shuffle分组会合并相同key值的value。
join2.png -
reduce端输出,为了输出的方便,需要知道values序列里面哪一个来自于用户表,哪一个来自于订单表。这里可以自定义数据,在map输出时给每一个信息定一个标签,reduce端根据这个标签识别(这里需要自定义数据类型)。
join3.png
代码实现
- 测试数据
- customers.csv文件
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000
- orders.csv文件
3,A,12.95,02-Jun-2008
1,B,88.25,20-May-2008
2,C,32.00,30-Nov-2007
3,D,25.02,22-Jan-2009
- 自定义数据类型关键代码
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class JoinWritable implements Writable {
// 标签,用于区分data来自于用户表还是订单表
private String tag;
// 数据
private String data;
public JoinWritable() {
}
public JoinWritable(String tag, String data) {
this.set(tag, data);
}
public void set(String tag, String data) {
this.setTag(tag);
this.setData(data);
}
public void write(DataOutput arg0) throws IOException {
arg0.writeUTF(this.getTag());
arg0.writeUTF(this.getData());
}
public void readFields(DataInput arg0) throws IOException {
this.setTag(arg0.readUTF());
this.setData(arg0.readUTF());
}
public String getTag() {
return tag;
}
public void setTag(String tag) {
this.tag = tag;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}
- map端关键代码
private static class ModuelMapper extends Mapper<LongWritable, Text, LongWritable, JoinWritable> {
private LongWritable mapOutPutKey = new LongWritable();
private JoinWritable mapOutPutValue = new JoinWritable();
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String lineValue = value.toString();
//分割CSV文件
String[] values = lineValue.split(",");
//过滤脏数据
int length = values.length;
if(length != 3 && length != 4) {
return;
}
// 两个文件第一列都为ID,第二列为name
Long cid = Long.valueOf(values[0]);
String name = values[1];
// 读取用户表设置key和value
if(length == 3) {
String phoneNum = values[2];
mapOutPutKey.set(cid);
// "customer"用于标记用户表
mapOutPutValue.set("customer", name + "," + phoneNum);
}
// 读取订单表设置key和value
if(length == 4) {
String price = values[2];
String date = values[3];
mapOutPutKey.set(cid);
// "order"用于标记订单表
mapOutPutValue.set("order", name + "," + price + "," + date);
}
context.write(mapOutPutKey, mapOutPutValue);
}
}
- reduce端代码
private static class ModuelReducer extends Reducer<LongWritable, JoinWritable, Text, NullWritable> {
private Text reduceOutPutKey = new Text();
public void reduce(LongWritable key, Iterable<JoinWritable> values, Context context)
throws IOException, InterruptedException {
// 用户信息
String customerInfo = null;
// 订单信息,一个用户可能对应多个订单信息,所以需要用容器
List<String> list = new ArrayList<String>();
// 判断values来自于用户信息表还是订单表,然后取出值。
for(JoinWritable value : values) {
if("customer".equals(value.getTag())) {
customerInfo = value.getData();
} else if("order".equals(value.getTag())) {
list.add(value.getData());
}
}
// 以 "cid,用户信息,订单信息" 的形式 设置key
for(String value : list) {
reduceOutPutKey.set(key.get() + "," + customerInfo + "," + value);
context.write(reduceOutPutKey, NullWritable.get());
}
}
}
- 测试结果
1,Stephanie Leung,555-555-5555,B,88.25,20-May-2008
2,Edward Kim,123-456-7890,C,32.00,30-Nov-2007
3,Jose Madriz,281-330-8004,D,25.02,22-Jan-2009
3,Jose Madriz,281-330-8004,A,12.95,02-Jun-2008