Chapter 2: About MapReduce (Study Notes)


2.1 Analyzing Data with Hadoop

2.1.1 map and reduce

MapReduce works in two phases, map and reduce, and each phase has key-value pairs for input and output: in general, map takes (K1, V1) and emits a list of (K2, V2), and reduce takes (K2, list(V2)) and emits a list of (K3, V3).

The following example computes the highest temperature for each month.

1. Input

Raw temperature data in input.txt; each line has the form date,...,temperature:

2018-01-02,...,12
2018-01-03,...,8
2018-01-04,...,
2018-01-05,...,30
2018-02-01,...,8
2018-02-02,...,15
2018-02-03,...,12
2018-02-04,...,24
2. Map
package com.zyf.study;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input value is one line of the file, e.g. "2018-01-02,...,12".
        String line = value.toString();
        String[] values = line.split(",");
        // Lines with a missing trailing temperature (e.g. "2018-01-04,...,")
        // split into fewer than three fields and are skipped.
        if (values.length > 2) {
            int temperature = Integer.parseInt(values[2]);
            // Key: the "yyyy-MM" month prefix of the date; value: the temperature.
            context.write(new Text(values[0].substring(0, 7)), new IntWritable(temperature));
        }
    }
}
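
The mapper can be checked without a cluster. One option (an assumption of these notes, not something the original uses) is Apache MRUnit's MapDriver, which feeds a single record through the mapper and asserts the expected output:

package com.zyf.study;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

import java.io.IOException;

// Hypothetical unit test using Apache MRUnit (a retired but well-known
// Apache project); it is not part of the original notes.
public class MaxTemperatureMapperTest {

    @Test
    public void parsesValidRecord() throws IOException {
        new MapDriver<LongWritable, Text, Text, IntWritable>()
                .withMapper(new MaxTemperatureMapper())
                .withInput(new LongWritable(0), new Text("2018-01-02,...,12"))
                .withOutput(new Text("2018-01"), new IntWritable(12))
                .runTest();
    }
}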
3. Reduce
package com.zyf.study;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {


    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                   throws IOException, InterruptedException {
        // Track the largest temperature seen for this month key.
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
            maxValue = Math.max(maxValue, value.get());
        }

        context.write(key, new IntWritable(maxValue));
    }
}
4. Job driver class
package com.zyf.study;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MaxTemperature {
    public static void main( String[] args ) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = Job.getInstance();

        job.setJobName("MaxTemperature");
        job.setJarByClass(MaxTemperature.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        // The reducer doubles as the combiner: taking a maximum is
        // associative and commutative, so combining is safe.
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
5. Execution log

hadoop jar hadoop-first-1.0-SNAPSHOT.jar D:\temp\hadoop-testdata D:\temp\hadoop-testdata\output
An excerpt of the log:

19/04/12 20:07:58 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local281339793_0001
19/04/12 20:07:58 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
19/04/12 20:07:58 INFO mapreduce.Job: Running job: job_local281339793_0001
19/04/12 20:07:59 INFO mapred.LocalJobRunner: Waiting for map tasks
19/04/12 20:07:59 INFO mapred.LocalJobRunner: Starting task: attempt_local281339793_0001_m_000000_0
19/04/12 20:07:59 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
19/04/12 20:07:59 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
19/04/12 20:07:59 INFO mapred.Task:  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@6a58318a
19/04/12 20:07:59 INFO mapred.MapTask: Processing split: file:/D:/temp/hadoop-testdata/input.txt:0+146
.
.
19/04/12 20:07:59 INFO mapred.Task: Task:attempt_local281339793_0001_m_000000_0 is done. And is in the process of committing
19/04/12 20:07:59 INFO mapred.LocalJobRunner: map
19/04/12 20:07:59 INFO mapred.Task: Task 'attempt_local281339793_0001_m_000000_0' done.
19/04/12 20:07:59 INFO mapred.Task: Final Counters for attempt_local281339793_0001_m_000000_0: Counters: 18
        File System Counters
                FILE: Number of bytes read=6900
                FILE: Number of bytes written=305142
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
        Map-Reduce Framework
                Map input records=8
                Map output records=7
                Map output bytes=84
                Map output materialized bytes=34
                Input split bytes=104
                Combine input records=7
                Combine output records=2
                Spilled Records=2
                Failed Shuffles=0
                Merged Map outputs=0
                GC time elapsed (ms)=0
                Total committed heap usage (bytes)=257425408
        File Input Format Counters
                Bytes Read=146
19/04/12 20:07:59 INFO mapred.LocalJobRunner: Finishing task: attempt_local281339793_0001_m_000000_0
19/04/12 20:08:00 INFO mapred.Task: Task 'attempt_local281339793_0001_r_000000_0' done.
19/04/12 20:08:00 INFO mapred.Task: Final Counters for attempt_local281339793_0001_r_000000_0: Counters: 24
        File System Counters
                FILE: Number of bytes read=7000
                FILE: Number of bytes written=305210
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
        Map-Reduce Framework
                Combine input records=0
                Combine output records=0
                Reduce input groups=2
                Reduce shuffle bytes=34
                Reduce input records=2
                Reduce output records=2
                Spilled Records=2
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=0
                Total committed heap usage (bytes)=257425408
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Output Format Counters
                Bytes Written=34
19/04/12 20:08:00 INFO mapred.LocalJobRunner: Finishing task: attempt_local281339793_0001_r_000000_0

The log above also shows the combiner at work: the original 7 map output records were merged into 2 (Combine input records=7, Combine output records=2).

6. Results

The result files are saved in the output directory:

._SUCCESS.crc
.part-r-00000.crc
_SUCCESS
part-r-00000

The contents of part-r-00000:

2018-01 30
2018-02 24

2.2 Scaling Out

To scale out, the data is stored in a distributed filesystem, and YARN dispatches the MapReduce computation to the nodes where the data is stored.

2.2.1 Data Flow

A MapReduce job consists of the input data, the MapReduce program, and configuration information.
Hadoop divides the job into map and reduce tasks, which YARN schedules to run on nodes across the cluster; a failed task is automatically rescheduled on a different node.

  • The input data is divided into fixed-size splits, and one map task is created per split. Finer splits give better load balancing: faster or idle machines take on proportionally more splits, and a failed map task has less data to recompute. If splits are too fine, though, the overhead of managing them and creating tasks starts to dominate, so a reasonable split size tends toward the HDFS block size (128 MB by default); a configuration sketch follows the figure below. If a split spanned two blocks, a single HDFS node would be unlikely to store both, so part of the split would have to travel over the network.
  • Data locality optimization: a map task is scheduled, where possible, on a node holding its input data, avoiding network transfer and giving the best performance. If every node holding a replica of the data is busy, the task is scheduled on a free map slot on another node in the same rack; only rarely is it scheduled off-rack, which requires inter-rack network transfer.
  • Map output is written to local disk, not HDFS: it is only intermediate input to the reduce and is deleted once the job completes, so replicating it in HDFS would be wasted network overhead. If a map output is lost before the reduce consumes it, the map task is rerun on another node to recreate the intermediate result.
(Figure: data-local, rack-local, and off-rack map tasks)
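
As a concrete footnote to the split-size point in the first bullet above, FileInputFormat exposes minimum and maximum split sizes. This is a hypothetical addition (SplitSizeExample is not part of the original code):

package com.zyf.study;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// Hypothetical helper: pin splits to the 128 MB HDFS block size so each
// map task reads exactly one (ideally local) block.
public class SplitSizeExample {

    static void configureSplits(Job job) {
        long blockSize = 128L * 1024 * 1024;
        FileInputFormat.setMinInputSplitSize(job, blockSize);
        FileInputFormat.setMaxInputSplitSize(job, blockSize);
    }
}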
  • A single reduce task's input is the output of all the map tasks; the sorted map output is transferred over the network to the reduce node, so reduce tasks have no data-locality advantage.
  • The outputs of the multiple maps are merged on the reduce node before being processed by the reduce function.
  • Reduce output is stored in HDFS for reliability: the first replica is written on the local node, and the other replicas are written on other nodes, in the same rack and in other racks.
(Figure: MapReduce data flow with a single reduce task)
  • The number of reduce tasks is specified explicitly, not derived from the input data volume.
  • With multiple reduce tasks, the data flow is as shown below: each map task partitions its output, creating one partition per reduce task, and records are assigned to partitions by the partitioning function. The default partitioner hashes the key; a custom partitioner can be supplied instead (see the sketch after this list).
  • The data flow between map and reduce is called the shuffle; tuning shuffle parameters can have a large effect on total job execution time.
  • When the processing can be carried out fully in parallel (no shuffle needed), there may be no reduce tasks at all, and the map tasks write their results directly to HDFS.
(Figures: MapReduce data flow with multiple reduce tasks; MapReduce data flow with no reduce tasks)
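
A minimal sketch of a custom partitioner for the temperature job (MonthPartitioner is a hypothetical name, not part of the original code). The default HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numPartitions; this version routes records by month instead:

package com.zyf.study;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: all records for one month go to the same
// reduce task, spread across partitions by month number.
public class MonthPartitioner extends Partitioner<Text, IntWritable> {

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // The map output key looks like "2018-01"; extract the month.
        int month = Integer.parseInt(key.toString().substring(5, 7));
        return (month - 1) % numPartitions;
    }
}

In the driver this would be wired up with job.setNumReduceTasks(2) and job.setPartitionerClass(MonthPartitioner.class); calling job.setNumReduceTasks(0) instead gives the map-only data flow of the second figure.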
2.2.2 Combiner Functions
  • The bandwidth available on the cluster limits MapReduce jobs, so the data transferred between map and reduce tasks should be minimized.
  • A combiner can be specified to run on each map task's output. Hadoop makes no guarantee about how many times the combiner is called for a given output record (if at all); however many times it runs, the final reduce output must be unchanged.
  • This suits operations such as maximum and minimum, but not operations such as the mean, as the worked example below shows.
1. Suppose the first map produces the output
    (2018-01, 12)
    (2018-01, 8)
    (2018-01, 30)
   and the second map produces
    (2018-01, 16)
2. Without a combiner, the reduce node receives all 4 records, and the reduce function is called with the merged input:
    (2018-01, [12, 8, 30, 16])
3. With a combiner whose logic matches the reduce function, the reduce node receives only 2 records:
    (2018-01, 30)
    (2018-01, 16)
   and the reduce function is called with:
    (2018-01, [30, 16])
For a maximum this gives the same answer, but a mean computed this way would not match expectations:
    mean(12, 8, 30, 16) = 16.5
    mean(mean(12, 8, 30), 16) = 16.3
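
A mean can still benefit from a combiner if the intermediate values carry enough information to stay associative. Below is a minimal sketch under assumed conventions (the class name MeanCombiner and the "sum,count" Text encoding are inventions of this note): the mapper would emit each temperature as "temperature,1", the combiner adds up sums and counts, and only the final reducer divides sum by count:

package com.zyf.study;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

// Hypothetical combiner-safe building block for a mean: partial results
// are (sum, count) pairs encoded as "sum,count", never partial means,
// so applying it zero, one, or many times cannot change the final mean.
public class MeanCombiner extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (Text v : values) {
            String[] parts = v.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        context.write(key, new Text(sum + "," + count));
    }
}

A production version would normally use a custom Writable rather than parsing strings.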
2.2.3 Running a Distributed MapReduce Job

A MapReduce program can be developed and tested locally, then run on a cluster without modification, and the cluster can be scaled horizontally with the data volume.

The above are study notes on Hadoop: The Definitive Guide.
