大数据,机器学习,人工智能大数据程序员

Hadoop MapReduce 学习笔记

2018-07-02  本文已影响22人  LY丶Smile

前言

本文是个人之前纪录的MapReduce学习笔记,主要涉及到MapReduce基本概念、Hadoop 经典示例WordCount的使用解析、hdfs与hbase的简单了解使用。现在整理了一下分享出来,希望对别人有所帮助。

学习MapReduce一定要理解这种Map、Reduce的编程模型以及Mapper、Reducer数据处理的原理,否则只是一味的复制粘贴可能比较难上手。

同时学习大数据的知识,一定要将自己对分布式的理解研究透彻。

一、概念理解

二、编程模型

三、运行机制

  1. 输入分片(input split)

    map计算之前,MapReduce会根据输入文件计算输入分片(input -> spliting),每个input split针对一个map任务。split存储的并不是数据,而是一个分片长度和一个记录数据的位置的数组

  2. map阶段

    map阶段的操作一般都是在数据存储节点上操作,所以有时候为了能够减轻数据传输的网络压力,可以先combiner阶段处理一下数据,在进行reduce

  3. combiner阶段

    此阶段是可选的,不是必须经过的一个阶段,combiner其实也是一种reduce操作,可以说combiner是一种本地化的reduce操作,是map运算的后续操作,可以减轻网络传输的压力。但是combiner的使用需要注意不要影响到reduce的最终结果,比如计算平均值的时候如果使用combiner就会影响最终的结果,但是计算总数的话则对最终结果没影响

  4. shuffle阶段

    将map的输出作为reduce的输入,这个过程就是shuffle,是MapReduce优化的重要阶段。

  5. reduce阶段

    reducer阶段,输入是shuffle阶段的输出,对每个不同的键和该键对应的值的数据流进行独立、并行的处理。

四、WordCount--官方提供的example

代码

package com.smile.test;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    private static final String INPUT_PATH = "/user/cdh/yjq/input/words.txt";
    //hdfs输出路径
    private static final String OUTPUT_PATH = "/user/cdh/yjq/output/";
    
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        // Text 实现了BinaryComparable类可以作为key值
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            // 解析键值对
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            
            for (IntWritable val : values) {
                sum += val.get();
            }

            result.set(sum);
            context.write(key, result);
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {

        String[] paths = {INPUT_PATH,OUTPUT_PATH};
        //获得Configuration配置 Configuration: core-default.xml, core-site.xml 
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, paths).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        // 设置Mapper类
        job.setMapperClass(TokenizerMapper.class);
        // 设置Combiner类 
        job.setCombinerClass(IntSumReducer.class); 
        // 设置Reduce类
        job.setReducerClass(IntSumReducer.class); 
         // 设置输出key的类型,注意跟reduce的输出类型保持一致
        job.setOutputKeyClass(Text.class);
        // 设置输出value的类型,注意跟reduce的输出类型保持一致
        job.setOutputValueClass(IntWritable.class);
        // 设置输入路径
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
      // 设置输出路径
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

解析

    scan.setCacheBlocks(false); 

默认是true,分内存,缓存和磁盘,三种方式,一般数据的读取为内存->缓存->磁盘;

setCacheBlocks不适合MapReduce工作:
MR程序为非热点数据,不需要缓存,因为Blockcache is LRU,也就是最近最少访问算法(扔掉最少访问的),那么,前一个请求(比如map读取)读入Blockcache的所有记录在后一个请求(新的map读取)中都没有用,就必须全部被swap,那么RegionServer要不断的进行无意义的swapping data,也就是无意义的输入和输出BlockCache,增加了无必要的IO。而普通读取时局部查找,或者查找最热数据时,会有提升性能的帮助。

runner方法中可以写定义多个job,job会顺序执行。

五、常用hadoop fs命令 (类似Linux的文件操作命令,可类比学习使用)

-help
功能:输出这个命令参数手册

-ls
功能:显示目录信息
示例: hadoop fs -ls /yjq

-mkdir 
功能:在hdfs上创建目录
示例:hadoop fs -mkdir -p /yjq/test

-moveFromLocal
功能:从本地剪切粘贴到hdfs
示例:hadoop fs -moveFromLocal /home/cdh/a.txt /yjq/test

-moveToLocal
功能:从hdfs剪切粘贴到本地
示例:hadoop fs -moveToLocal /yjq/test/a.txt /home/cdh/ 

-copyFromLocal
功能:从本地文件系统中拷贝文件到hdfs路径去
示例:hadoop fs -copyFromLocal /home/cdh/a.txt /yjq/test

-copyToLocal
功能:从hdfs拷贝到本地
示例:hadoop fs -copyToLocal /yjq/test/a.txt /home/cdh/ 

-get
功能:等同于copyToLocal,从hdfs下载文件到本地路径(.表示当前路径)
示例:hadoop fs -get /yjq/test/a.txt .

-getmerge
功能:合并下载多个文件
示例:将目录下所有的TXT文件下载到本地,并合并成一个文件
hadoop fs -getmerge /yjq/test/*.txt /home/cdh/test.txt

-put
功能:等同于copyFromLocal
示例:hadoop fs -put /home/cdh/a.txt /yjq/test

-cp
功能:从hdfs的一个路径拷贝hdfs的另一个路径
示例: hadoop fs -cp /yjq/test1/a.txt /yjq/test2/

-mv
功能:在hdfs目录中移动文件
示例: hadoop fs -mv /yjq/test1/a.txt /yjq/test2/

-appendToFile
功能:追加一个文件到已经存在的文件末尾(本地文件追加到hdfs)
示例:Hadoop fs -appendToFile /home/cdh/a.txt /yjq/test1/a.txt

-cat
功能:显示文件内容
示例:hadoop fs -cat /yjq/test1/a.txt

-tail
功能:显示一个文件的末尾
示例:hadoop fs -tail /yjq/test1/a.txt

-text
功能:以字符形式打印一个文件的内容
示例:hadoop fs -text /yjq/test1/a.txt

-chgrp、-chmod、-chown
功能:修改文件所属权限 
示例:
hadoop fs -chmod 666 /yjq/test1/a.txt
# cdh为用户名,hadoop为用户组
hadoop fs -chown cdh:group /yjq/test1/a.txt

-rm
功能:删除文件或文件夹
示例:hadoop fs -rm -r /yjq/test/a.txt

-df
功能:统计文件系统的可用空间信息
示例:hadoop fs -df -h /

-du
功能:统计文件夹的大小信息
示例:
hadoop fs -du -s -h /yjq/*

-count
功能:统计一个指定目录下的文件节点数量
示例:hadoop fs -count /yjq/

六、HBase 相关操作

  1. 简介
    • HBase是一个分布式的、面向列的开源数据库
    • 表由行和列组成,列划分为多个列族/列簇(column family)
    • RowKey:是Byte array,是表中每条记录的“主键”,方便快速查找,Rowkey的设计非常重要。
    • Column Family:列族,拥有一个名称(string),包含一个或者多个相关列
    • Column:属于某一个columnfamily,familyName:columnName,每条记录可动态添加
    • Hbase--图片来源网络
  1. 编码

     Configuration conf = HBaseConfiguration.create();
    

    会自动读取hbase-site.xml配置文件

     Scan scan = new Scan();
     scan.setCaching(1000);
     scan.setStartRow(getBytes(startDate));
     scan.setStopRow(getBytes(endDate));
    
     TableMapReduceUtil.initTableMapperJob(HB_TABLE_NAME, scan, NewsStreamUrlMapper.class, Text.class, Text.class, job);
    

    参数:hbase table name,scan,mapper class,outputKeyClass,outputValueClass,job

七、hdfs操作

  1. 运算之前清除hdfs上的文件夹

     FileSystem fs = FileSystem.get(new Configuration());
     Path outputDir = new Path(OUTPUT_PATH);
     //运算之前如果文件夹存在则清除文件夹
     if(fs.exists(outputDir))
         fs.delete(outputDir, true);
    
  2. HDFS读流程

    • 客户端向NameNode发起读数据请求
    • NameNode找出距离最近的DataNode节点信息
    • 客户端从DataNode分块下载文件
  3. HDFS写流程

    • 客户端向NameNode发起写数据请求
    • 分块写入DataNode节点,DataNode自动完成副本备份
    • DataNode向NameNode汇报存储完成,NameNode通知客户端

八、多表操作

MultiTableInputFormat 支持多个mapper的输出混合到一个shuffle,一个reducer,其中每个mapper拥有不同的inputFormat和mapper处理类。
所有的mapper需要输出相同的数据类型,对于输出value,需要标记该value来源,以便reducer识别

List<Scan> scans = new ArrayList<Scan>();  

Scan scan1 = new Scan();  
scan1.setCaching(100);  
scan1.setCacheBlocks(false);  
scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inTable.getBytes());  
scans.add(scan1);  
  
Scan scan2 = new Scan();  
scan2.setCaching(100);  
scan2.setCacheBlocks(false);  
scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inPhoneImsiTable.getBytes());  
scans.add(scan2);   

TableMapReduceUtil.initTableMapperJob(scans, ReadHbaseMapper.class, Text.class,Result.class, job);

九、错误处理

  1. ScannerTimeoutException:org.apache.hadoop.hbase.client.ScannerTimeoutException

    这是当从服务器传输数据到客户端的时间,或者客户端处理数据的时间大于了scanner设置的超时时间,scanner超时报错,可在客户端代码中设置超时时间

     Configuration conf = HBaseConfiguration.create()              
     conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,120000) 
    

    如果Mapper阶段对每条数据的处理时间过长,可以将scan.setCaching(1000)的值设置小一点,如果值设置太大,则处理时间会很长就会出现超时错误。

写在最后

很久之前写的学习笔记了,资料来源网络及项目组内的讨论,参考文献就不一一标注了,侵删~

如果您觉得本文对您有帮助,点个赞吧~~

上一篇下一篇

猜你喜欢

热点阅读