Hadoop学习之YARN及MapReduce

2018-01-24  本文已影响0人  发光如星_275d

YARN

YARN是Hadoop的一个资源调度框架,下面简单介绍下它的运行流程

yarn运行流程.jpg

以上是YARN的一个流程图,简单介绍

  1. Client提交任务执行请求
  2. ResourceManager接收到执行请求后,向Client返回一个作业ID,ResourceManager选择一个NodeManager分配一个Container,此NodeManager启动一个ApplicationMaster(AM)
  3. AM计算需要的资源及数据,向ResourceManager注册并申请计算所需要的资源
  4. ResourceManager分配合适的NodeManager给ApplicationMaster,ApplicationMaster与NodeManager协调分配container,启动相应的Task
  5. Application监控任务的执行情况并与Client通信,报告程序运行状态(作业运行失败,重新执行/在其他节点执行)
  6. 程序结束,相关Container及ApplicationMaster释放

MapReduce

MapReduce简介

MapReduce应用场景

MapReduce的短板

MapReduce编程模型

分为Map和Reduce阶段两部分组成

Map阶段

Map阶段由若干Map Task组成

Reduce阶段

由若干Reduce Task组成,输入为Map Task的输出

内部逻辑

以下为MapReduce简要的处理过程图


mapreduce内部逻辑.jpg
  1. 对输入数据进行分片(Split),一般以默认的block进行split,也可以自定义
  1. Mapper程序读取切分好的数据(通过InputFormat接口)
  2. Mapper程序对数据进行处理,按照key、value的形式进行整合
  3. Partitioner对Mapper输出的数据进行分区,决定由哪个Reduce进行处理
  1. 对数据进行Shuffle和Sort,分配到对应的Reducer进行处理
  2. 结果输出

编程接口

Java编程接口

WordCount

WordCount相当于普通编程语言的HelloWorld

需求:统计规律切分的大规模文本中单词出现次数

文本中单词例子:

hello   world   this    my
hh  dd  mm  dd  kk
思路:
  1. Map阶段每行数据按分隔符切分,单词作为key,1为value
  2. Reduce阶段:远程从Map阶段的输出结果copy数据,并进行整理排序,把相同的key合并,形成key对应一组value的数据集,即为reduce的输入,reduce针对每个key调用reduce方法,对value求和
编程:
  1. 编程采用Java进行,使用maven管理jar包,具体环境设置在以后有机会补充,目前请网上查看,资料也不少。需要注意的是由于众所周知的原因,请设置国内maven仓库镜像
    pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.nanri</groupId>
    <artifactId>bigdatalearn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <executions>
                    <execution>
                        <id>default-compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <encoding>UTF-8</encoding>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

代码:具体见代码注释

package com.nanri.mapr01;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountApp {
//map阶段,输入key、value输出key、value类型
    static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //读取数据,将数据转换成字符串
            String line = value.toString();
            //拆分数据
            String[] words = line.split("\t");
            for(String word : words) {
                //输出需要序列化写出
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // 输入:k:v1, v2, v3
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //定义一个计数器
            int count = 0;
            //遍历,将key出现的次数累加
            for(IntWritable value : values) {
                count += value.get();
            }
            context.write(key, new IntWritable(count));
        }
    }

//执行程序
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    //定义的job名称
        String jobName = args[0];
        //输入路径
        String inputPath = args[1];
        //输出路径,执行时此路径在hadoop上必须不存在
        String outputPath = args[2];
        //执行配置
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置作业名称
        job.setJobName(jobName);
        //设置具体执行类
        job.setJarByClass(WordCountApp.class);
        //mapper和reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        //设置mapper阶段的输出的key和value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置reducer阶段的key和value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //设置输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        // 作业完成退出
        System.exit(job.waitForCompletion(true)?0:1);
    }
}


代码编写完成后,进行打包,将编译好的工程jar包文件上传到服务器,运行hadoop jar bigdatalearn-1.0-SNAPSHOT.jar com.nanri.mapr01.WordCountApp wordcountapp /wordcount/input /wordcount/output,jar包后第一个参数为入口类所在的完整路径,之后参数为具体main方法中定义的参数,需确保输出路径不存在,稍等一会,即可看到给的跟踪地址及执行成功后的提示

上一篇 下一篇

猜你喜欢

热点阅读