大数据,机器学习,人工智能大数据大数据 爬虫Python AI Sql

大数据入门(Hadoop生态系统)

2019-04-13  本文已影响2人  董二弯

Hadoop生态系统为大数据领域提供了开源的分布式存储和分布式计算的平台,这一章我们进行Hadoop生态系统的入门学习,介绍其中分布式文件系统HDFS、分布式资源调度YARN、分布式计算框架MapReduce(包含Spark的入门以及和MapReduce的比较),最后通过Spring Boot集成Hadoop来访问文件系统。

大数据的应用

本人喜欢体育运动,以体育中来举列子。

足球点球大战

2006年世界杯中德国队和阿根廷队的点球大战中,德国队守门员教练科普克,给了门将莱曼一张纸条,莱曼看过纸条将计就计。阿根廷队的每个点球,几乎都被莱曼判断对了方向,并成功扑出坎比亚索和阿亚拉的点球,帮助德国队打进四强。而这张纸条就是根据数据分析记录了阿根廷队员点球的习惯方向。

金州勇士的崛起

如今越来越多的篮球队开始重视和应用篮球大数据,比如NBA中的金州勇士队。勇士队曾长期以来一直是NBA里最烂的球队之一,2009年它的成绩排名倒数第二。没有任何执教NBA经验的史蒂夫·科尔,因突出的投篮优势被委任为教练。科尔在执掌勇士队之后,坚持用数据说话而不是凭经验。他根据数据工程师对历年来NBA比赛的统计,发现最有效的进攻是眼花缭乱的传球和准确的投篮,而不是彰显个人能力的突破和扣篮。在这个思想的指导下,勇士队队员苦练神投技。这其中最亮眼的新打法是尽可能地从24英尺(大约7.3米)外的三分线投篮,这样可以得3分。于是开发了小球时代,在2015-2018摘下三枚总冠军,成功成为NBA的霸主。

大数据的基本概率

有人说大数据的特点就是数据量大,这个不是非常的正确,数据量大不是关键,通过数据分析在数据中提取出价值,最终带来商业上的利益,这才是大数据分析的最终目标。大数据有4个特点,一般我们称之为4V,分别为:

大数据涉及到的技术

分布式文件系统HDFS

HDFS概述及设计目标

HDFS源于Google的GFS论文,设计目标为

HDFS架构

HDFS是一种master/slave的架构。一个HDFS集群包含一个唯一的NameNode(NN),这个master server管理着整个文件系统的命名空间并且调节客户端对文件的访问。同时,还拥有一系列的DataNode(DN),每个都管理着他们运行的对应节点的数据存储。HDFS提供了一个文件系统的命名空间同时允许用户将数据存在这些文件上。通常,一个文件被拆分成一个或多个数据块,并且这些数据块被保存在一系列的DataNode上。NameNode执行文件系统的命名空间的相关操作比如打开、关闭、重命名目录或者文件。同时决定了数据块到DataNode的映射。DataNode为客户端的读取写入需求提供服务,同时处理NameNode发来的数据块的创建、删除、复制等需求。


image.png

HDFS副本机制

在前面说过HDFS使用相对廉价的计算机,那么宕机就是一种必然事件,我们需要让数据避免丢失,就只有采取冗余数据存储,而具体的实现就是副本机制。具体为把一个文件分为很多的块,一个块默认为128M,而这些块是以多副本的形式存储。比如存储三个副本:

HDFS环境搭建

Apache Hadoop 有版本管理混乱,部署过程繁琐、升级过程复杂,兼容性差等缺点,而CDH是Hadoop众多分支中的一种,由Cloudera维护,基于稳定版本的Apache Hadoop构建。使用CDH可以避免在使用过程中的依赖包冲突问题,对版本的升级也很方便,所以我们使用Hadoop-2.6.0-cdh5.7.0版本进行安装。本人是使用虚拟机进行伪分布式模式的搭建,即在一台机器上安装,集群模式其实和伪分布式模式差不太多。

//把压缩包jdk-8u181-linux-i586.tar.gz上传到虚拟机
rz 选择文件上传
//在根目录下建立解压文件夹
 mkdir apps
//解压jdk到apps目录
tar -zxvf jdk-8u181-linux-i586.tar.gz -C ~/apps/
//添加环境变量
vi ~/.bash_profile
//在编辑模式添加以下内容
export JAVA_HOME=/root/apps/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$PATH
//编辑完成后,保存退出
//使配置生效
source ~/.bash_profile
//验证
java -version
//若出现版本信息表示安装成功
openjdk version "1.8.0_181"
OpenJDK Runtime Environment (build 1.8.0_181-b13)
OpenJDK 64-Bit Server VM (build 25.181-b13, mixed mode)
//安装SSH
sudo yum install ssh
//生成密钥文件
ssh-keygen -t rsa
//配置免密钥登录:复制公钥到authorized_keys,因为NameNode(NN)需要连接访问DataNode(DN),配置后可直接访问不用登录,在集群环境下需要复制到其它节点。
cp ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys 
下载:直接去cdh网站下载 [http://archive.cloudera.com/cdh5/cdh/5/]
//解压
tar -zxvf hadoop-2.6.0-cdh5.7.0.tar.gz -C ~/app
//配置环境变量
export HADOOP_HOME=/root/apps/hadoop
export PATH=$HADOOP_HOME/bin:$PATH
//hadoop配置文件的修改(hadoop_home/etc/hadoop/)
hadoop-env.sh:
export JAVA_HOME=/root/apps/jdk1.8.0_181

core-site.xml:
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://192.168.30.130:8092</value>//配置NameNode(NN)地址
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/hadoop/app/tmp</value>//因为HDFS默认是存储在零时文件中,关机后会丢失,这个配置持久化文件
    </property>

hdfs-site.xml:
    <property>
        <name>dfs.replication</name>//设置副本系数,这里因为只有一个节点,所以设置为1
        <value>1</value>
    </property>

slaves:
    localhost
在里面配置节点,现配置为虚拟机本地。如果为集群模式,则需要在里面配置其它节点的地址。

启动hdfs:
//格式化文件系统(仅第一次执行即可,不要重复执行)
hdfs/hadoop namenode -format
//启动
hdfs: sbin/start-dfs.sh
//验证是否启动成功:
jps //该命令会有以下三个进程
    DataNode
    SecondaryNameNode
    NameNode
浏览器访问方式: http://虚拟机地址:50070 有hdfs主界面
//停止hdfs
sbin/stop-dfs.sh 

HDFS shell

HDFS shell的命令有很多,输入hadoop fs回车,会有有多命令的提示。

Usage: hadoop fs [generic options]
    [-appendToFile <localsrc> ... <dst>]
    [-cat [-ignoreCrc] <src> ...]
    [-checksum <src> ...]
    [-chgrp [-R] GROUP PATH...]
    [-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
    [-chown [-R] [OWNER][:[GROUP]] PATH...]
    [-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>]
    [-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
    [-count [-q] [-h] [-v] <path> ...]
    [-cp [-f] [-p | -p[topax]] <src> ... <dst>]
    [-createSnapshot <snapshotDir> [<snapshotName>]]
    [-deleteSnapshot <snapshotDir> <snapshotName>]
    [-df [-h] [<path> ...]]
    [-du [-s] [-h] <path> ...]
    [-expunge]
    [-find <path> ... <expression> ...]
    [-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
    [-getfacl [-R] <path>]
    [-getfattr [-R] {-n name | -d} [-e en] <path>]
    [-getmerge [-nl] <src> <localdst>]

这里介绍HDFS shell常用命令的使用(其实和Linux命令相似)

//查看文件目录ls
hadoop fs -ls /  查看根目录
//建立目录mkdir
hadoop fs -mkdir 文件目录
//上传文件 put
hadoop fs -put 本地文件路径  hdfs文件路径
//get 下载文件到本地
hadoop fs -get hdfs文件路径
//删除文件/文件夹rm
hadoop fs -rm 文件
hadoop fs -rm -R 文件夹  //级联删除

Java API操作HDFS

IDEA+Maven创建Java工程
添加HDFS相关依赖

<properties>
        <java.version>1.8</java.version>
        <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
        </dependency>
    </dependencies>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>//hadoop仓库
        </repository>
    </repositories>

开发Java Api操作HDFS文件使用Junit测试的方式,代码如下

public class JunitTest {
    FileSystem fileSystem = null;
    Configuration configuration = null;
    public static final String URL = "hdfs://192.168.30.130:8092";
    @Before
    public void setUp() throws IOException, InterruptedException {
        //初始化配置
        configuration = new Configuration();
        //初始化FileSystem,其中包含了文件操作的函数,其中root为虚拟机用户名
        fileSystem = FileSystem.get(URI.create(URL),configuration,"root");
    }

    @Test
    public void mkdir() throws IOException {
       //创建目录
        fileSystem.mkdirs(new Path("/test"));
    }

    @Test
    public void create() throws IOException {
        //创建文件,得到输出流对象
        FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/test/test.txt"));
        //写入文件内容
        fsDataOutputStream.write("hello dzy".getBytes());
        //关闭流
        fsDataOutputStream.close();
    }

    @After
    public void setDown() throws IOException {
        configuration.clear();
        fileSystem.close();
    }
}

在以上只是实验了几个API还有很多其它的API,感兴趣的可以继续学习。

HDFS文件读写流程

三个角色的交互 客户端/NameNode/DataNode

HDFS写流程
HDFS读流程

HDFS优缺点

优点
缺点

资源调度框架YARN

YARN概述

YARN是Hadoop 2.0中的资源管理系统,可以让不同计算框架(MapReduce\Spark等)可以共享同一个HDFS集群上的数据,享受整体的资源调度。

YARN架构

YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等几个组件构成。

YARN执行流程

YARN总体上仍然是master/slave结构,在整个资源管理框架中,resourcemanager为master,nodemanager是slave。Resourcemanager负责对各个nademanger上资源进行统一管理和调度。当用户提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的ApplicationMaster,它负责向ResourceManager申请资源,并要求NodeManger启动可以占用一定资源的任务。由于不同的ApplicationMaster被分布到不同的节点上,因此它们之间不会相互影响。

流程:

YARN环境搭建

使用的版本为hadoop-2.6.0-cdh5.7.0

//修改配置文件,配置如下(配置文件在/etc/hadoop下)
mapred-site.xml,在默认情况下是没有mapred-site.xml文件的,只有mapred-site.xml.template文件,所以在修改配置之前需要复制一份
cp mapred-site.xml.template mapred-site.xml
vi  mapred-site.xml
//添加配置
<property> 
    <name>mapreduce.framework.name</name> 
    <value>yarn</value>//配置计算框架运行在yarn之上,为固定配置
</property>

yarn-site.xml:
vi yarn-site.xml
//添加配置
<property>
     <name>yarn.nodemanager.aux-services</name>             
     <value>mapreduce_shuffle</value>//nodemanager的services,目前为固定配置,当学习Spark时,其中有动态资源调度,这里才需要修改
</property> 

//启动YARN相关的进程
sbin/start-yarn.sh

//验证 
jps 
    ResourceManager 
    NodeManager
可访问http://虚拟机IP:8088可以看到YARN的主界面。

//停止YARN相关的进程
 sbin/stop-yarn.sh

提交作业到YARN上执行

在 share目录为我们提供了MapReduce作业的案例,我们可以使用其中的作业在YARN上执行。

提交mr作业到YARN上运行,我的作业jar包路径为:
/root/apps/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar
//使用hadoop jar命令可提交作业到YARN,具体命令为
hadoop jar jar包路径  有效的程序名字 有效程序的参数

// 选用其中PI程序进行运行,在执行之前要记到启动hdfs和yarn
hadoop jar hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 2 3

//访问yarn主界面可看到作业的执行情况


image.png

分布式处理框架MapReduce/Spark

MapReduce概述

源于Google的MapReduce论文,Hadoop MapReduce是Google MapReduce的一个克隆版。MapReduce用于大规模数据集(通常大于1TB)的并行运算,实现了Map和Reduce两个功能。MapReduce的思想是“分而治之”。“分”是把复杂的任务分解为若干个“简单的任务”执行,由map负责。“简单的任务”指数据或计算规模相对于原任务要大大缩小;就近计算,即会被分配到存放了所需数据的节点进行计算;这些小任务可以并行计算,彼此间几乎没有依赖关系。Reducer负责对map阶段的结果进行汇总。
特点:

MapReduce编程模型

MapReduce编程模型给出了分布式编程方法的5个步骤:

MapReduce架构

和HDFS一样,MapReduce也是采用Master/Slave的架构
MapReduce包含四个组成部分,分别为Client、JobTracker、TaskTracker和Task

MapReduce编程

通过wordcount词频统计分析案例入门。
这个程序能够计算一个文本中相同单词出现的次数。
我们在之前演示用Java API操作HDFS的项目上编写代码。具体代码及解释如下

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * WordCount统计单词数量
 *
 * @author zhiying.dong 2019/04/09 20:50
 */
public class WordCountApp {
   /**
     * 第一步是对文件进行Map操作,继承Mapper类复写它的map方法即可
     * 参数是4个,前两个LongWritable表示文件偏移量,Text表示文件的数量
     * 后两个Text表示每一个单词,LongWritable为给每一个单词做一个赋值1的操作,而相同单词出现次数的累加在Reduce中
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        LongWritable one = new LongWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //把文件内容转换为字符串
            String line = value.toString();
            //通过空格分割文件文本(本人实验的文本单词之间是以空格分开)
            String[] words = line.split(" ");
            for (String word : words) {
                //为每个单词进行赋值操作,以map的形式输出
                context.write(new Text(word), one);
            }
        }
    }
    
     /**
     * 第二步是对文件进行Reduce操作,继承Reducer类复写它的reduce方法即可
     * 参数是4个,前两个Text表示输入map的key,在这里为单词字符串,LongWritable表示输入map的value,这里表示在上一步为每个单词的赋值
     * 后两个Text为输出map的key,这里表示每一个单词,LongWritable为输出map的value,这里表示每一个单词出现次数的累加
     */
    public static class MyReduce extends Reducer<Text, LongWritable, Text, LongWritable> {
        //注意这里reduce输入map的value为Iterable<LongWritable> 类型, 
         MapRedece为我们自动根据相同key进行了分类
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
           //相同key出现次数的叠加
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        //初始化配置
        Configuration configuration = new Configuration();
         //定义job
        Job job = Job.getInstance(configuration, "wordcount");
        //设置job的执行类
        job.setJarByClass(WordCountApp.class);
       //设置需要统计的文件路径,在执行时传入
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        //设置map操作时的相关参数
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //设置reduce操作时的相关参数
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //设置输出文件的路径,在执行时传入
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

开发完成后

编译:mvn clean package -DskipTests

上传到服务器然后运行(根据自己上传的路径进行命令参数的调整)

hadoop jar /root/hadoop/lib/hadoop-train-1.0.jar WordCountApp hdfs://192.168.30.130:9020/hello.txt  hdfs://192.168.30.130:9020/output/wc

运行完毕后可在输出文件中查看统计的结果(亲测可以成功)

注意
相同的代码和脚本再次执行,会报错

security.UserGroupInformation:
PriviledgedActionException as:hadoop (auth:SIMPLE) cause:
org.apache.hadoop.mapred.FileAlreadyExistsException: 
Output directory hdfs://92.168.30.130:9020/output/wc already exists
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: 
Output directory hdfs:/92.168.30.130:9020/output/wc already exists

在MR中,输出文件是不能事先存在的。有两种解决方式

1)先手工通过shell的方式将输出文件夹先删除
    hadoop fs -rm -r /output/wc
2) 在代码中完成自动删除功能: 推荐大家使用这种方式
    Path outputPath = new Path(args[1]);
    FileSystem fileSystem = FileSystem.get(configuration);
    if(fileSystem.exists(outputPath)){
        fileSystem.delete(outputPath, true);
        System.out.println("output file exists, but is has deleted");
    }

Spark简单入门

Spark概述

MapReduce框架局限性
Spark特点

环境搭建

//在安装之前先要安装maven和scala环境。由于在之前本人虚拟机上已经安装了maven所以跳过maven的安装。
// 解压scala安装包
tar -zxvf scala-2.11.8.tgz -C ../apps/
//配置环境变量
pwd  //查看路径
   /root/apps/scala-2.11.8
vi ~/.bash.profile
//添加配置
export SCALA_HOME=/root/apps/scala-2.11.8
export PATH=$SCALA_HOME/bin:$PATH
//使配置生效
source ~/.bash.profile
//检验是否安装成功
scala -version
//出现以下信息,说明安装成功
Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

//解压spark
tar -zxvf spark-2.1.0-bin-2.6.0-cdh5.7.0.tgz -C ../apps
//进入安装目录的bin目录,通过spark-shell可运行spark
//若不熟悉命令,可用以下方式
./spark-shell --help
--master MASTER_URL         spark://host:port, mesos://host:port, yarn, or local.
//表示在本地以两个线程启动
./spark-shell --master local[2]
local:默认一个线程
local[n]:n个线程
local[*]:全部线程

Spark编程

这里重新实现在上文实现的wordcount程序,代码如下

sc.textFile("file://root/data/hello.txt").flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_ + _).collect

没有看错,只是一行代码。因为Spark封装了很多方便的函数,相比于MapReduce开发更为方便。这也是它成为主流的原因。

Hadoop整合Spring-boot的使用

Spring Hadoop概述

Spring for hadoop提供了统一的配置模式以简化Apache Hadoop的开发,并也易于调用HDFS、Mapreduce、Pig和Hive的API。它还提供了与Spring生态圈的其他项目集成的能力,例如Spring Intergration 和Spring Batch,让你可以优雅地开发大数据的提取/导出和Hadoop工作流项目。

Spring Hadoop开发环境搭建及访问HDFS

第一步加入依赖

 <properties>
        <java.version>1.8</java.version>
        <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-hadoop-boot</artifactId>
            <version>2.5.0.RELEASE</version>
        </dependency>
    </dependencies>

    <!--配置下载的仓库-->
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>http://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

在配置文件中配置HDFS地址

spring:
  hadoop:
    fs-uri: hdfs://192.168.30.130:8092

在代码中访问HDFS

import org.apache.hadoop.fs.FileStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.data.hadoop.fs.FsShell;
import org.springframework.stereotype.Component;

/**
 * Spring Boot操作HDFS
 * @author zhiying.dong  2019/04/13 16:32
 */
@Component
public class SpringBootHadoopTest implements CommandLineRunner {
    //自动注入FsShell,其中封装了HDFS的访问函数
    @Autowired
    private FsShell fsShell;
    @Override
    public void run(String... strings) throws Exception {
        System.out.println("=========run start============");
        //获取根目录下的文件列表
        for (FileStatus fileStatus : fsShell.lsr("/")) {
            System.out.println(">" + fileStatus.getPath());
        }
        System.out.println("===========run end===========");
    }
}

执行代码后发现没有权限访问HDFS,本人解决的方式为在项目启动时模拟用户,网上有多种解决方式,可自行选择

VM options:
-DHADOOP_USER_NAME=虚拟机用户名

总结

大数据已经涉及到生活的方方面面,所以学习和了解大数据知识是很有必要的。Hadoop生态系统为大数据提供了开源的分布式存储和分布式计算的平台,而这一章只是对其中基础的知识进行了入门,后续还需要深入的学习。

上一篇下一篇

猜你喜欢

热点阅读