Hadoop之HDFS详解(三)

2019-02-28  本文已影响0人  RalapHao

简介

  1. 分布式文件存储,文件在物理上是分块存储(block),可以通过dfs.blocksizes设定,2.x默认是128M,老版本是64M;
  2. 为客户端提供统一的抽象目录树,客户端通过路径来访问;
  3. 目录结构及分块信息(元数据)由namenode节点承担,namenode是HDFS集群的主节点,维护整个目录结构block块信息以及datanode信息;
  4. 文件的各个block的存储管理由datanode负责,DataNode是HDFS的从节点,每一个block都可以在多个DataNode上存储多个副本;
  5. 一次写入,多次读取,不支持修改;

注意:HDFS适合做数据分析,并不适合做网盘应用,不便修改,延迟 大,网络开销大,成本太高。

常用Shell命令

1. 显示目录信息
   hadoop fs -sl /
2. 创建目录
  hadoop fs -mkdir -p /dir1/dir2
3. 本地截切粘贴
   hadoop fs -moveFromLocal  /root/1.txt  /dir1/dir2
4. 追加文件到已存在文件末尾
  hadoop fs -appendToFile  /root/2.txt /dir1/dir2/1.txt
5. 查看内容
  hadoop fs -cat  /dir1/dir2/1.txt
  hadoop fs -tail  /dir1/dir2/1.txt
  hadoop fs -text /dir1/dir2/1.txt
6. 拷贝到HDFS
  hadoop fs -copyFromLocal ./1.txt  /dir1/dir2
  hadoop fs -put ./1.txt  /dir1/dir2
7. HDFS内部拷贝
  hadoop fs -cp /dir1/dir2/1.txt /dir1/dir2/2.txt
8. HDFS内部移动
  hadoop fs -mv ./2.txt /
9. 从HDFS拷贝到本地
  hadoop fs -copyToLocal  /2.txt
  hadoop fs -get  /2.txt
10. 合并下载多个文件
  hadoop fs -getmerage /dir1/dir2/1.txt /2.txt
11. 删除文件或文件夹
  hadoop fs -rm -r  /dir1/dir2
  hadoop fs -rmdir /dir1/dir2(删除空文件夹)
12. 统计文件系统可用空间
  hadoop fs -df -h /
  hadoop fs -du  -h /dir1/dir2
13. 统计一个指定目录下的文件节点数据量
  hadoop fs -count  /dir1/
14. 设置HDFS中文件的副本数量
  hadoop fs -setrep 3 /dir1/dir2/1.txt

HDFS 工作机制

  1. 文件写入过程
    客户端向HDFS写数据,首先要与NameNode通信,以确认可以写文件并获取DataNode节点信息,然后客户端根据DateNode信息按顺序上传数据,DataNode将Block副本复制到其他DataNode;
    1. 与NameNode 通信,请求上传数据,NameNode检测是否存在、父目录是否存在;
    2. 返回确认信息;
    3. 客户端请求NameNode,block1上传到那些DataNode;
    4. NameNode返回DataNode信息
    5. 客户端根据返回的DateNode信息,请求其中一台,上传数据(本质是RPC调用,建立pipeline),A到B,B到C根据副本数量依次类推,完成后逐级返回客户端。
    6. 客户端上传block,以packet为单位,A到B,B到C,A每传一个packet都会放入应答队列,等待应答。
    7. 当一个block传输完成,客户端上传第二个block。
  2. 文件读取流程
    客户端将要获取的文件路径发送给NameNode,NameNode获取文件的元数据信息(Block存放位置)返回给客户端,客户端通过相应信息找到DataNode,逐个获取文件的block并在客户端本地进行数据追加合并,从而得到完整文件。
    1. 通过NameNode获取元数据信息,找到文件块所在DataNode。
    2. 挑选一台DataNode(就近原则,然后随机),建立Socket流。
    3. datanode开始发送数据(从磁盘读取数据放入流,以packet为单位来做校验);
    4. 客户端以packet为单位接收,先在本地缓存,然后写入目标文件

NameNode机制

  1. 元数据存储机制
    1. 内存数据(完整的元数据信息)
    2. 磁盘元数据镜像文件(“准完整”,在namenode的工作目录)
    3. 数据操作日志文件(可通过日志文件运算出元数据),用于衔接内存和持久化元数据镜像的操作日志(edits文件),
      注:当客户端新增,操作日志首先被记入edits文件,当客户端操作成功,相应的元数据会更新到内存中。
  2. 元数据手动查看
    可以通过hdfs的一个工具来查看edits中的信息
bin/hdfs oev -i edits -o edits.xml

bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml
  1. 元数据的checkpoint
    每隔一段时间,会由secondary namenode将namenode上积累的所有edits和一个最新的fsimage下载到本地,并加载到内存进行merage(这个过程称作checkpoint)
  2. checkpoint操作的触发条件配置参数
dfs.namenode.checkpoint.check.period=60  #检查触发条件是否满足的频率,60秒
dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
#以上两个参数做checkpoint操作时,secondary namenode的本地工作目录
dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}

dfs.namenode.checkpoint.max-retries=3  #最大重试次数
dfs.namenode.checkpoint.period=3600  #两次checkpoint之间的时间间隔3600秒
dfs.namenode.checkpoint.txns=1000000 #两次checkpoint之间最大的操作记录

  1. checkpoint的附带作用
    namenode和secondary namenode的工作目录存储结构完全相同,所以,当namenode故障退出需要重新恢复时,可以从secondary namenode的工作目录中将fsimage拷贝到namenode的工作目录,以恢复namenode的元数据

DataNode工作机制

  1. 工作职责
    存储用户的文件块数据
    定期向namenode汇报自身所持有的block信息,(通过心跳信息上报)
  2. Datanode掉线判断时限参数
    datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长。HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则超时时长的计算公式为:
    timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。
    而默认的heartbeat.recheck.interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。
    需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。所以,举个例子,如果heartbeat.recheck.interval设置为5000(毫秒),dfs.heartbeat.interval设置为3(秒,默认),则总的超时时间为40秒。
 <property>
       <name>heartbeat.recheck.interval</name>
       <value>2000</value>
</property>
<property>
       <name>dfs.heartbeat.interval</name>
       <value>1</value>
</property>

Java API

  1. HDFS客户端依赖
<dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-client</artifactId>
 <version>2.6.1</version>
</dependency>

注意:org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security .AccessControlException: Permission denied
没有权限访问:core-site.xml添加

```
<property>

  <name>hadoop.security.authorization</name>

  <value>false</value>

</property>
2. windows上开发注意
   1. 解压hadoop安装包;
   2. 将安装包下的lib和bin目录用对应windows版本平台编译的本地库替换
   3. 在window系统中配置HADOOP_HOME指向你解压的安装包
   4. 在windows系统的path变量中加入hadoop的bin目录
3. 代码
 ```
 //创建连接
 Configuration conf = new Configuration();
 //配置连接服务器
 conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");
 //副本数量 :参数优先级: 1、客户端代码中设置的值 2、classpath下的用户自定义配置文件 3、然后是服务器的默认配置
 conf.set("dfs.replication", "3");
 //获取Hdfs客户端
 FileSystem fs = FileSystem.get(conf);

// 如果这样去获取,那conf里面就可以不要配"fs.defaultFS"参数,而且,这个客户端的身份标识已经是hadoop用户
 FileSystem fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");

@Test
 public void testAddFileToHdfs() throws Exception {

     // 要上传的文件所在的本地路径
     Path src = new Path("g:/redis-recommend.zip");
     // 要上传到hdfs的目标路径
     Path dst = new Path("/aaa");
     fs.copyFromLocalFile(src, dst);
     fs.close();
 }

 /**
  * 从hdfs中复制文件到本地文件系统
  * 
  * @throws IOException
  * @throws IllegalArgumentException
  */
 @Test
 public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
     fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));
     fs.close();
 }

 @Test
 public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {

     // 创建目录
     fs.mkdirs(new Path("/a1/b1/c1"));

     // 删除文件夹 ,如果是非空文件夹,参数2必须给值true
     fs.delete(new Path("/aaa"), true);

     // 重命名文件或文件夹
     fs.rename(new Path("/a1"), new Path("/a2"));

 }

 /**
  * 查看目录信息,只显示文件
  * 
  * @throws IOException
  * @throws IllegalArgumentException
  * @throws FileNotFoundException
  */
 @Test
 public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {

     // 思考:为什么返回迭代器,而不是List之类的容器
     RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

     while (listFiles.hasNext()) {
         LocatedFileStatus fileStatus = listFiles.next();
         System.out.println(fileStatus.getPath().getName());
         System.out.println(fileStatus.getBlockSize());
         System.out.println(fileStatus.getPermission());
         System.out.println(fileStatus.getLen());
         BlockLocation[] blockLocations = fileStatus.getBlockLocations();
         for (BlockLocation bl : blockLocations) {
             System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
             String[] hosts = bl.getHosts();
             for (String host : hosts) {
                 System.out.println(host);
             }
         }
         System.out.println("--------------为angelababy打印的分割线--------------");
     }
 }

 /**
  * 查看文件及文件夹信息
  * 
  * @throws IOException
  * @throws IllegalArgumentException
  * @throws FileNotFoundException
  */
 @Test
 public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {

     FileStatus[] listStatus = fs.listStatus(new Path("/"));

     String flag = "d--             ";
     for (FileStatus fstatus : listStatus) {
         if (fstatus.isFile())  flag = "f--         ";
         System.out.println(flag + fstatus.getPath().getName());
     }

通过流的方式访问hdfs

@Test
 public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{
     
     //先获取一个文件的输入流----针对hdfs上的
     FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
     
     //再构造一个文件的输出流----针对本地的
     FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));
     
     //再将输入流中数据传输到输出流
     IOUtils.copyBytes(in, out, 4096);
     
     
 }
 
 
 /**
  * hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度
  * 用于上层分布式运算框架并发处理数据
  * @throws IllegalArgumentException
  * @throws IOException
  */
 @Test
 public void testRandomAccess() throws IllegalArgumentException, IOException{
     //先获取一个文件的输入流----针对hdfs上的
     FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
     
     
     //可以将流的起始偏移量进行自定义
     in.seek(22);
     
     //再构造一个文件的输出流----针对本地的
     FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));
     
     IOUtils.copyBytes(in,out,19L,true);
     
 }
 
 
 
 /**
  * 显示hdfs上文件的内容
  * @throws IOException 
  * @throws IllegalArgumentException 
  */
 @Test
 public void testCat() throws IllegalArgumentException, IOException{
     
     FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
     
     IOUtils.copyBytes(in, System.out, 1024);
 }


 ```
上一篇下一篇

猜你喜欢

热点阅读