8_大数据之MapReduce_3

2019-06-24  本文已影响0人  十丈_红尘

Join多种应用

1️⃣Reduce Join

2️⃣Reduce Join案例实操
1.需求
//order.txt
1001   01  1
1002   02  2
1003   03  3
1004   01  4
1005   02  5
1006   03  6
//pd.txt
01 小米
02 华为
03 格力
将商品信息表中数据根据商品pid合并到订单数据表中。 2.需求分析
 通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联,如下图所示 3.代码实现
1)创建商品和订合并后的Bean
package com.xxx.reducejoin;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
   private String id;
   private String pid;
   private int amount;
   private String pname;

   @Override
   public String toString() {
       return id + "\t" + pname + "\t" + amount;
   }

   public String getId() {
       return id;
   }

   public void setId(String id) {
       this.id = id;
   }

   public String getPid() {
       return pid;
   }

   public void setPid(String pid) {
       this.pid = pid;
   }

   public int getAmount() {
       return amount;
   }

   public void setAmount(int amount) {
       this.amount = amount;
   }

   public String getPname() {
       return pname;
   }

   public void setPname(String pname) {
       this.pname = pname;
   }

   //按照Pid分组,组内按照pname排序,有pname的在前
   @Override
   public int compareTo(OrderBean o) {
       int compare = this.pid.compareTo(o.pid);
       if (compare == 0) {
           return o.getPname().compareTo(this.getPname());
       } else {
           return compare;
       }
   }

   @Override
   public void write(DataOutput out) throws IOException {
       out.writeUTF(id);
       out.writeUTF(pid);
       out.writeInt(amount);
       out.writeUTF(pname);
   }

   @Override
   public void readFields(DataInput in) throws IOException {
       id = in.readUTF();
       pid = in.readUTF();
       amount = in.readInt();
       pname = in.readUTF();
   }
}

2)编写TableMapper

package com.xxx.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

   private String filename;

   private OrderBean order = new OrderBean();

   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
       
       //获取切片文件名
       FileSplit fs = (FileSplit) context.getInputSplit();
       filename = fs.getPath().getName();
   }

   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       String[] fields = value.toString().split("\t");
       
       //对不同数据来源分开处理
       if ("order.txt".equals(filename)) {
           order.setId(fields[0]);
           order.setPid(fields[1]);
           order.setAmount(Integer.parseInt(fields[2]));
           order.setPname("");
       } else {
           order.setPid(fields[0]);
           order.setPname(fields[1]);
           order.setAmount(0);
           order.setId("");
       }

       context.write(order, NullWritable.get());
   }
}

3)编写TableReducer

package com.xxx.reducejoin;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

   @Override
   protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
       
       //第一条数据来自pd,之后全部来自order
       Iterator<NullWritable> iterator = values.iterator();
       
       //通过第一条数据获取pname
       iterator.next();
       String pname = key.getPname();
       
       //遍历剩下的数据,替换并写出
       while (iterator.hasNext()) {
           iterator.next();
           key.setPname(pname);
           context.write(key,NullWritable.get());
       }
   }


}

4)编写TableDriver

package com.xxx.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OrderDriver {
   public static void main(String[] args) throws IOException, ClassNotFoundException, >InterruptedException {
       Job job = Job.getInstance(new Configuration());
       job.setJarByClass(OrderDriver.class);

       job.setMapperClass(OrderMapper.class);
       job.setReducerClass(OrderReducer.class);
       job.setGroupingComparatorClass(OrderComparator.class);

       job.setMapOutputKeyClass(OrderBean.class);
       job.setMapOutputValueClass(NullWritable.class);

       job.setOutputKeyClass(OrderBean.class);
       job.setOutputValueClass(NullWritable.class);

       FileInputFormat.setInputPaths(job, new Path("d:\\input"));
       FileOutputFormat.setOutputPath(job, new Path("d:\\output"));

       boolean b = job.waitForCompletion(true);

       System.exit(b ? 0 : 1);

   }
}

4.测试

1001    小米  1   
1001    小米  1   
1002    华为  2   
1002    华为  2   
1003    格力  3   
1003    格力  3

5.总结

3️⃣Map Join
  1. 使用场景 : Map Join适用于一张表十分小,一张表很大的场景;
  2. 优点 :
    思考:在Reduce端处理过多的表,非常容易产生数据倾斜。怎么办?
    Map端缓存多张表,提前处理业务逻辑,这样增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。

3.具体办法:采用DistributedCache
(1)在Mappersetup阶段,将文件读取到缓存集合中。
(2)在驱动函数中加载缓存。
  // 缓存普通文件到Task运行节点。
  job.addCacheFile(new URI("file://e:/cache/pd.txt"));
4️⃣Map Join案例实操

  1. 需求
// order.txt
1001   01  1
1002   02  2
1003   03  3
1004   01  4
1005   02  5
1006   03  6
// pd.txt
01 小米
02 华为
03 格力

将商品信息表中数据根据商品pid合并到订单数据表中。

// 最终数据形式
id     pname   amount
1001   小米        1
1004   小米        4
1002   华为        2
1005   华为        5
1003   格力        3
1006   格力        6

2.需求分析 : MapJoin适用于关联表中有小表的情形。

3.实现代码
(1)先在驱动模块中添加缓存文件
package test;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class DistributedCacheDriver {

  public static void main(String[] args) throws Exception {
      
// 0 根据自己电脑路径重新配置
args = new String[]{"e:/input/inputtable2", "e:/output1"};

// 1 获取job信息
      Configuration configuration = new Configuration();
      Job job = Job.getInstance(configuration);

      // 2 设置加载jar包路径
      job.setJarByClass(DistributedCacheDriver.class);

      // 3 关联map
      job.setMapperClass(DistributedCacheMapper.class);
      
// 4 设置最终输出数据类型
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);

      // 5 设置输入输出路径
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      // 6 加载缓存数据
      job.addCacheFile(new URI("file:///e:/input/inputcache/pd.txt"));
      
      // 7 Map端Join的逻辑不需要Reduce阶段,设置reduceTask数量为0
      job.setNumReduceTasks(0);

      // 8 提交
      boolean result = job.waitForCompletion(true);
      System.exit(result ? 0 : 1);
  }
}

(2)读取缓存的文件数据

package com.xxx.mapjoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

public class MjMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

   //pd表在内存中的缓存
   private Map<String, String> pMap = new HashMap<>();

   private Text line = new Text();

   //任务开始前将pd数据缓存进PMap
   @Override
   protected void setup(Context context) throws IOException, InterruptedException {
       
       //从缓存文件中找到pd.txt
       URI[] cacheFiles = context.getCacheFiles();
       Path path = new Path(cacheFiles[0]);

       //获取文件系统并开流
       FileSystem fileSystem = FileSystem.get(context.getConfiguration());
       FSDataInputStream fsDataInputStream = fileSystem.open(path);

       //通过包装流转换为reader
       BufferedReader bufferedReader = new BufferedReader(
               new InputStreamReader(fsDataInputStream, "utf-8"));

       //逐行读取,按行处理
       String line;
       while (StringUtils.isNotEmpty(line = bufferedReader.readLine())) {
           String[] fields = line.split("\t");
           pMap.put(fields[0], fields[1]);
       }

       //关流
       IOUtils.closeStream(bufferedReader);

   }

   @Override
   protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       String[] fields = value.toString().split("\t");

       String pname = pMap.get(fields[1]);

       line.set(fields[0] + "\t" + pname + "\t" + fields[2]);

       context.write(line, NullWritable.get());

   }
}

二 计数器应用


三 数据清洗(ETL)

  在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
① 数据清洗案例实操-简单解析版

  1. 需求 : 去除日志中字段长度小于等于11的日志;
    (1)输入数据
//web.log
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:42 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:50:08 +0000] "-" 400 0 "-" "-"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
66.249.66.84 - - [18/Sep/2013:06:50:28 +0000] "GET /page/6/ HTTP/1.1" 200 27777 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"

(2)期望输出数据 : 每行字段长度都大于11
2.需求分析 : 需要在Map阶段对输入的数据根据规则进行过滤清洗。
3.实现代码
(1)编写LogMapper

package com.xxx.mapreduce.weblog;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
  
  Text k = new Text();
  
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      
      // 1 获取1行数据
      String line = value.toString();
      
      // 2 解析日志
      boolean result = parseLog(line,context);
      
      // 3 日志不合法退出
      if (!result) {
          return;
      }
      
      // 4 设置key
      k.set(line);
      
      // 5 写出数据
      context.write(k, NullWritable.get());
  }

  // 2 解析日志
  private boolean parseLog(String line, Context context) {

      // 1 截取
      String[] fields = line.split(" ");
      
      // 2 日志长度大于11的为合法
      if (fields.length > 11) {

          // 系统计数器
          context.getCounter("map", "true").increment(1);
          return true;
      }else {
          context.getCounter("map", "false").increment(1);
          return false;
      }
  }
}

(2)编写LogDriver

package com.xxx.mapreduce.weblog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogDriver {

  public static void main(String[] args) throws Exception {

// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
       args = new String[] { "e:/input/inputlog", "e:/output1" };

      // 1 获取job信息
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf);

      // 2 加载jar包
      job.setJarByClass(LogDriver.class);

      // 3 关联map
      job.setMapperClass(LogMapper.class);

      // 4 设置最终输出类型
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);

      // 设置reducetask个数为0
      job.setNumReduceTasks(0);

      // 5 设置输入和输出路径
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      // 6 提交
      job.waitForCompletion(true);
  }
}

② 数据清洗案例实操-复杂解析版
1.需求 : 对Web访问日志中的各字段识别切分,去除日志中不合法的记录。根据清洗规则,输出过滤后的数据。
(1)输入数据

//web.log
194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:42 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
101.226.68.137 - - [18/Sep/2013:06:49:45 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
60.208.6.156 - - [18/Sep/2013:06:49:48 +0000] "GET /wp-content/uploads/2013/07/rcassandra.png HTTP/1.0" 200 185524 "http://cos.name/category/software/packages/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:49:57 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "http://www.angularjs.cn/A00n" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/29.0.1547.66 Safari/537.36"
222.68.172.190 - - [18/Sep/2013:06:50:08 +0000] "-" 400 0 "-" "-"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
183.195.232.138 - - [18/Sep/2013:06:50:16 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
66.249.66.84 - - [18/Sep/2013:06:50:28 +0000] "GET /page/6/ HTTP/1.1" 200 27777 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"

(2)期望输出数据 : 都是合法的数据
2.实现代码
(1)定义一个bean,用来记录日志数据中的各数据字段

package com.xxx.mapreduce.log;

public class LogBean {
  private String remote_addr;// 记录客户端的ip地址
  private String remote_user;// 记录客户端用户名称,忽略属性"-"
  private String time_local;// 记录访问时间与时区
  private String request;// 记录请求的url与http协议
  private String status;// 记录请求状态;成功是200
  private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
  private String http_referer;// 用来记录从那个页面链接访问过来的
  private String http_user_agent;// 记录客户浏览器的相关信息

  private boolean valid = true;// 判断数据是否合法

  public String getRemote_addr() {
      return remote_addr;
  }

  public void setRemote_addr(String remote_addr) {
      this.remote_addr = remote_addr;
  }

  public String getRemote_user() {
      return remote_user;
  }

  public void setRemote_user(String remote_user) {
      this.remote_user = remote_user;
  }

  public String getTime_local() {
      return time_local;
  }

  public void setTime_local(String time_local) {
      this.time_local = time_local;
  }

  public String getRequest() {
      return request;
  }

  public void setRequest(String request) {
      this.request = request;
  }

  public String getStatus() {
      return status;
  }

  public void setStatus(String status) {
      this.status = status;
  }

  public String getBody_bytes_sent() {
      return body_bytes_sent;
  }

  public void setBody_bytes_sent(String body_bytes_sent) {
      this.body_bytes_sent = body_bytes_sent;
  }

  public String getHttp_referer() {
      return http_referer;
  }

  public void setHttp_referer(String http_referer) {
      this.http_referer = http_referer;
  }

  public String getHttp_user_agent() {
      return http_user_agent;
  }

  public void setHttp_user_agent(String http_user_agent) {
      this.http_user_agent = http_user_agent;
  }

  public boolean isValid() {
      return valid;
  }

  public void setValid(boolean valid) {
      this.valid = valid;
  }

  @Override
  public String toString() {

      StringBuilder sb = new StringBuilder();
      sb.append(this.valid);
      sb.append("\001").append(this.remote_addr);
      sb.append("\001").append(this.remote_user);
      sb.append("\001").append(this.time_local);
      sb.append("\001").append(this.request);
      sb.append("\001").append(this.status);
      sb.append("\001").append(this.body_bytes_sent);
      sb.append("\001").append(this.http_referer);
      sb.append("\001").append(this.http_user_agent);
      
      return sb.toString();
  }
}

(2)编写LogMapper

package com.xxx.mapreduce.log;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
  Text k = new Text();
  
  @Override
  protected void map(LongWritable key, Text value, Context context)   throws IOException, InterruptedException {

      // 1 获取1行
      String line = value.toString();
      
      // 2 解析日志是否合法
      LogBean bean = parseLog(line);
      
      if (!bean.isValid()) {
          return;
      }
      
      k.set(bean.toString());
      
      // 3 输出
      context.write(k, NullWritable.get());
  }

  // 解析日志
  private LogBean parseLog(String line) {

      LogBean logBean = new LogBean();
      
      // 1 截取
      String[] fields = line.split(" ");
      
      if (fields.length > 11) {

          // 2封装数据
          logBean.setRemote_addr(fields[0]);
          logBean.setRemote_user(fields[1]);
          logBean.setTime_local(fields[3].substring(1));
          logBean.setRequest(fields[6]);
          logBean.setStatus(fields[8]);
          logBean.setBody_bytes_sent(fields[9]);
          logBean.setHttp_referer(fields[10]);
          
          if (fields.length > 12) {
              logBean.setHttp_user_agent(fields[11] + " "+ fields[12]);
          }else {
              logBean.setHttp_user_agent(fields[11]);
          }
          
          // 大于400,HTTP错误
          if (Integer.parseInt(logBean.getStatus()) >= 400) {
              logBean.setValid(false);
          }
      }else {
          logBean.setValid(false);
      }
      
      return logBean;
  }
}

(3)编写LogDriver

package com.xxx.mapreduce.log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogDriver {
  public static void main(String[] args) throws Exception {
      
// 1 获取job信息
      Configuration conf = new Configuration();
      Job job = Job.getInstance(conf);

      // 2 加载jar包
      job.setJarByClass(LogDriver.class);

      // 3 关联map
      job.setMapperClass(LogMapper.class);

      // 4 设置最终输出类型
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(NullWritable.class);

      // 5 设置输入和输出路径
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      // 6 提交
      job.waitForCompletion(true);
  }
}

MapReduce开发总结


Hadoop数据压缩

1️⃣ 概述

2️⃣MR支持的压缩编码 3️⃣压缩方式选择
Gzip压缩 Bzip2压缩 Lzo压缩 Snappy压缩 4️⃣压缩位置选择 : 压缩可以在MapReduce作用的任意阶段启用,如下图所示。 5️⃣压缩参数配置 6️⃣ 压缩实操案例
  1. 数据流的压缩和解压缩
package com.xxx.mapreduce.compress;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class TestCompress {

  public static void main(String[] args) throws Exception {
      compress("e:/hello.txt","org.apache.hadoop.io.compress.BZip2Codec");
//     decompress("e:/hello.txt.bz2");
  }

  // 1、压缩
  private static void compress(String filename, String method) throws Exception {
      
      // (1)获取输入流
      FileInputStream fis = new FileInputStream(new File(filename));
      
      Class codecClass = Class.forName(method);
      
      CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, new Configuration());
      
      // (2)获取输出流
      FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));
      CompressionOutputStream cos = codec.createOutputStream(fos);
      
      // (3)流的对拷
      IOUtils.copyBytes(fis, cos, 1024*1024*5, false);
      
// (4)关闭资源
      cos.close();
      fos.close();
fis.close();
  }

  // 2、解压缩
  private static void decompress(String filename) throws FileNotFoundException, IOException {
      
      // (0)校验是否能解压缩
      CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

      CompressionCodec codec = factory.getCodec(new Path(filename));
      
      if (codec == null) {
          System.out.println("cannot find codec for file " + filename);
          return;
      }
      
      // (1)获取输入流
      CompressionInputStream cis = codec.createInputStream(new FileInputStream(new File(filename)));
      
      // (2)获取输出流
      FileOutputStream fos = new FileOutputStream(new File(filename + ".decoded"));
      
      // (3)流的对拷
      IOUtils.copyBytes(cis, fos, 1024*1024*5, false);
      
      // (4)关闭资源
      cis.close();
      fos.close();
  }
}
  1. Map输出端采用压缩 : 即使你的MapReduce的输入输出文件都是未压缩的文件,你仍然可以对Map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到Reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可;
package com.xxx.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;   
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

      Configuration configuration = new Configuration();

      // 开启map端输出压缩
  configuration.setBoolean("mapreduce.map.output.compress", true);
      // 设置map端输出压缩方式
  configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

      Job job = Job.getInstance(configuration);

      job.setJarByClass(WordCountDriver.class);

      job.setMapperClass(WordCountMapper.class);
      job.setReducerClass(WordCountReducer.class);

      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(IntWritable.class);

      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);

      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));

      boolean result = job.waitForCompletion(true);

      System.exit(result ? 1 : 0);
  }
}

1.Mapper保持不变

package com.xxx.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{

Text k = new Text();
  IntWritable v = new IntWritable(1);

  @Override
  protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {

      // 1 获取一行
      String line = value.toString();

      // 2 切割
      String[] words = line.split(" ");

      // 3 循环写出
      for(String word:words){
k.set(word);
          context.write(k, v);
      }
  }
}
  1. Reducer保持不变
package com.xxx.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

  IntWritable v = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values,
          Context context) throws IOException, InterruptedException {
      
      int sum = 0;

      // 1 汇总
      for(IntWritable value:values){
          sum += value.get();
      }
      
       v.set(sum);

       // 2 输出
      context.write(key, v);
  }
}
  1. Reduce输出端采用压缩
    1.修改驱动
package com.xxx.mapreduce.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {

  public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
      
      Configuration configuration = new Configuration();
      
      Job job = Job.getInstance(configuration);
      
      job.setJarByClass(WordCountDriver.class);
      
      job.setMapperClass(WordCountMapper.class);
      job.setReducerClass(WordCountReducer.class);
      
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(IntWritable.class);
      
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(IntWritable.class);
      
      FileInputFormat.setInputPaths(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      
      // 设置reduce端输出压缩开启
      FileOutputFormat.setCompressOutput(job, true);
      
      // 设置压缩的方式
      FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); 
//     FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); 
//     FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); 
      
      boolean result = job.waitForCompletion(true);
      
      System.exit(result?1:0);
  }
}

2.MapperReducer保持不变(详见6.2

上一篇下一篇

猜你喜欢

热点阅读