大数据,机器学习,人工智能大数据大数据 爬虫Python AI Sql

MapReduce(五):Map阶段

2019-04-29  本文已影响43人  b91cbec6a902

概述

基于Hadoop 2.x

Map阶段启动的大致流程是怎样的?

1、在MRAppMaster进程中通过ContainerLauncher向NodeManager发送Container启动命令,启动YarnChild进程org.apache.hadoop.mapred.YarnChild#main,通过启动命令传入了以下几个参数:
①MRAppMaster进程中的TaskAttemptListener组件提供的TaskUmbilicalProtocol服务的host和port。
②当前任务的TaskAttemptID。
③当前任务的JVMId。

class YarnChild {

    public static void main(String[] args) throws Throwable {

        String host = args[0];

        int port = Integer.parseInt(args[1]);

        final TaskAttemptID firstTaskid = TaskAttemptID.forName(args[2]);

        long jvmIdLong = Long.parseLong(args[3]);

        .....
    }

}

2、通过RPC协议TaskUmbilicalProtocol与MRAppMaster建立通讯。

final InetSocketAddress address =
        NetUtils.createSocketAddrForHost(host, port);

final TaskUmbilicalProtocol umbilical =
  taskOwner.doAs(new PrivilegedExceptionAction<TaskUmbilicalProtocol>() {
  @Override
  public TaskUmbilicalProtocol run() throws Exception {
    return (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
        TaskUmbilicalProtocol.versionID, address, job);
  }
});

3、通过RPC协议TaskUmbilicalProtocol获取要处理的任务,Map阶段获取的任务实例为org.apache.hadoop.mapred.MapTask,然后开始执行任务。

myTask = umbilical.getTask(context);

taskFinal.run(job, umbilical); // run the task

4、在MapTask任务真正执行之前,先准备运行的环境。
①初始化TaskReporter,随时向MRAppMaster进程汇报Map任务的进度。

TaskReporter reporter = startReporter(umbilical);

②初始化Job上下文信息、Task上下文信息、Map阶段结果提交器等。

initialize(job, getJobID(), reporter, useNewApi);
// 初始化Job上下文信息
jobContext = new JobContextImpl(job, id, reporter);

// 初始化Task上下文信息
taskContext = new TaskAttemptContextImpl(job, taskId, reporter);

// 初始化Map阶段结果提交器
outputFormat =
    ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
committer = outputFormat.getOutputCommitter(taskContext);

5、初始化数据源、数据输入器、数据处理器、数据输出器。

runNewMapper(job, splitMetaInfo, umbilical, reporter);
// 初始化数据源,也就是数据分片
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
    splitIndex.getStartOffset());

// 初始化数据输入器,通过配置的InputFormat获取
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
  (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
    ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
  new NewTrackingRecordReader<INKEY,INVALUE>
    (split, inputFormat, reporter, taskContext);

// 初始化数据处理器,也就是我们自定义的Mapper
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
  (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
    ReflectionUtils.newInstance(taskContext.getMapperClass(), job);

// 初始化数据输出器,当Mapper处理完数据后将数据交给output输出
org.apache.hadoop.mapreduce.RecordWriter output = null;
if (job.getNumReduceTasks() == 0) {
  output = 
    new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
  output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}

6、开始数据处理。到这里就很熟悉了,这里就是我们自己编写的Mapper,循环处理所有数据。

mapper.run(mapperContext);
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
}

7、Map处理结果输出
在我们自定义的Mapper都会有这样一行代码context.write(key, value),进行处理结果的收集和输出操作。

数据输出器的具体实现类为:org.apache.hadoop.mapred.MapTask.NewOutputCollector

private class NewOutputCollector<K,V>
    extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
    private final MapOutputCollector<K,V> collector;
    private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
    private final int partitions;

    @SuppressWarnings("unchecked")
    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
      collector = createSortingCollector(job, reporter);
      // 数据分区数为Reduce的数量
      partitions = jobContext.getNumReduceTasks();
      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }
    }
  ...
}

从上面的NewOutputCollector的构造方法可以看出,数据分区数为Reduce的数量,假设有n个Reduce,那么每个Map产生的结果集数据要分成n份,每个Reduce对应一份。

@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}

从上面的write(K key, V value)方法可以看出,每条数据都会计算它归属的分区。然后写入MapOutputCollector<K,V> collector 中。
具体实现为:org.apache.hadoop.mapred.MapTask.MapOutputBuffer
MapOutputBuffer内部是一个数组,在逻辑上是一个环形数组,这个就是MapReduce的环形缓冲区。

每条数据分为两部分写入MapOutputBuffer:
①真实的数据(Key-Value值)

// 写入key值
keySerializer.serialize(key);
// 写入value值
valSerializer.serialize(value);

②数据的元信息:属于哪个分区、key值在buffer中的起始位置、value值在buffer中的起始位置、value值的长度。这部分元信息起到索引的作用。

// 写入分区信息
kvmeta.put(kvindex + PARTITION, partition);
// 写入key值在buffer中的起始位置
kvmeta.put(kvindex + KEYSTART, keystart);
// 写入value值在buffer中的起始位置
kvmeta.put(kvindex + VALSTART, valstart);
// 写入value值的长度
kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
数据的存储 逻辑上的数据存储

环形缓存区的大小是有限的,当环形缓存区使用量达到阈值(默认80%)时,会将环形缓存区中的数据溢写到磁盘里(不是HDFS)。

org.apache.hadoop.mapred.MapTask.MapOutputBuffer#sortAndSpill

每次溢写都会生成类似的文件:sipll0.out ,文件里的数据是按照partition划分好的,每个partition中的数据都是已排序的。


溢写文件

当Map阶段数据处理全部完成,因为还有数据保存在环形缓冲区中,所有在结束的时候必定会产生一次溢写,将数据写入磁盘。

output.close(mapperContext);

@Override
public void close(TaskAttemptContext context
                  ) throws IOException,InterruptedException {
  try {
    collector.flush();
  } catch (ClassNotFoundException cnf) {
    throw new IOException("can't find class ", cnf);
  }
  collector.close();
}

由于最终的溢写文件>=1个,所以最后还需要把所有的溢写文件合并成一个。

// org.apache.hadoop.mapred.MapTask.MapOutputBuffer#flush
mergeParts();
merge

最终Map阶段会生成一个按分区划分,分区内已排序的数据文件,然后将数据文件交给Shuffle HTTP Server。

8、完成以上操作后,Map进程向MRAppMaster进程汇报任务结束,然后退出进程。

Map阶段生成的数据文件交给了谁管理?

交给了Shuffle HTTP Server服务来管理。这个服务是由NodeManager启动的。使用的是Yarn的AuxServices机制,NodeManager允许用户通过配置附属服务的方式扩展自己的功能,这使得每个节点可以定制一些特定框架需要的服务。附属服务需要在NM启动前配置好,并由NM统一启动和关闭。典型的应用是MapReduce框架中用到的Shuffle HTTP Server,其通过封装成一个附属服务由各个NodeManager启动。后续Reduce阶段也是从Shuffle HTTP Server来拉取数据的。

上一篇下一篇

猜你喜欢

热点阅读