HDFS文件

2017-11-12  本文已影响0人  Mervey

一、操作方式

Hadoop支持的文件系统由很多(见下图),HDFS只是其中一种实现。Java抽象类org.apache.hadoop.fs.FileSystem定义了Hadoop中一个文件系统的客户端接口,并且该抽象类有几个具体实现。Hadoop一般使用URI(下图)方案来选取合适的文件系统实例进行交互。

比如想要列出本地文件系统根目录下的文件,可以输入命令 hadoop fs -ls file:///

Hadoop支持的文件系统

特别的,HDFS文件系统的操作可以使用 FsSystem shell 、客户端(http rest api、Java api、C api等)。

FsSystem shell

FsSystem shell 的用法基本同本地shell类似,命令可参考 FsSystem shell

JAVA

Hadoop是用Java写的,通过Java Api(FileSystem类)可以调用大部分Hadoop文件系统的交互操作。更详细的介绍可参考 hadoop Filesystem

HTTP

非Java开发的应用可以使用由WebHDFS协议提供的HTTP REST API,但是HTTP比原生的Java客户端要慢,所以不到万不得已尽量不要使用HTTP传输特大数据。通过HTTP来访问HDFS有两种方法:

两种如图

Http访问

在第一种情况中,namenode和datanode内嵌的web服务作为WebHDFS的端节点运行(是否启用WebHDFS可通过dfs.webhdfs.enabled设置,默认为true)。文件元数据在namenode上,文件读写操作首先被发往namenode,有namenode发送一个HTTP重定向至某个客户端,指示以流的方式传输文件数据的目的或源datanode。

第二种方法依靠一个或多个独立代理服务器通过HTTP访问HDFS。所有集群的网络通信都需要通过代理,因此客户端从来不直接访问namenode或datanode。使用代理后可以使用更严格的防火墙策略和带宽策略。

HttpFs代理提供和WebHDFS相同的HTTP接口,这样客户端能够通过webhdfs URI访问接口。HttpFS代理启动独立于namenode和datanode的守护进程,使用httpfs.sh 脚本,默认在一个不同的端口上监听(14000)。

二、文件读取

数据流解析

下图描述了

读文件时客户端与 HDFS 中的 namenode, datanode 之间的数据流动。

客户端从HDFS读数据

对上图的解释如下:

在读取过程中, 如果 FSDataInputStream 在和一个 datanode 进行交流时出现了一个错误,他就去试一试下一个最接近的块,他当然也会记住刚才发生错误的 datanode 以至于之后不会再在这个 datanode 上进行没必要的尝试。 DFSInputStream 也会在 datanode 上传输出的数据上核查检查数(checknums).如果损坏的块被发现了,DFSInputStream 就试图从另一个拥有备份的 datanode 中去读取备份块中的数据。

在这个设计中一个重要的方面就是客户端直接从 datanode 上检索数据,并通过 namenode 指导来得到每一个块的最佳 datanode。这种设计允许 HDFS 扩展大量的并发客户端,因为数据传输只是集群上的所有 datanode 展开的。期间,namenode 仅仅只需要服务于获取块位置的请求(块位置信息是存放在内存中,所以效率很高)。如果不这样设计,随着客户端数据量的增长,数据服务就会很快成为一个瓶颈。

集群网络拓扑

我们知道,相对于客户端(之后就是 mapreduce task 了),块的位置有以下可能性:

我们认为他们对于客户端的带宽递减,距离递增(括号中表示距离)。示意图如下:

网络拓扑

如果集群中的机器都在同一个机架上,我们无需其他配置,若集群比较复杂,由于hadoop无法自动发现网络拓扑,所以需要额外配置网络拓扑。

读取举例(Java)

基本读取程序,将文件内容输出到console

FileSystemCat

// cc FileSystemCat Displays files from a Hadoop filesystem on standard output by using the FileSystem directly
import java.io.InputStream;
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
 
// vv FileSystemCat 
public class FileSystemCat {
 
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    InputStream in = null;
    try {
      in = fs.open(new Path(uri));
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}
// ^^ FileSystemCat

随机读取

// cc FileSystemDoubleCat Displays files from a Hadoop filesystem on standard output twice, by using seek
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
 
// vv FileSystemDoubleCat
public class FileSystemDoubleCat {
 
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    FSDataInputStream in = null;
    try {
      in = fs.open(new Path(uri));
      IOUtils.copyBytes(in, System.out, 4096, false);
      in.seek(0); // go back to the start of the file
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}
// ^^ FileSystemDoubleCat

展开原码

三、文件写入

数据流解析

下图描述了写文件时客户端与 HDFS 中的 namenode, datanode 之间的数据流动。

写文件

对上图的解释如下:

如果在任何一个 datanode 在写入数据的时候失败了,接下来所做的一切对客户端都是透明的:首先, pipeline 被关闭,在确认队列中的剩下的包会被添加进数据队列的起始位置上,以至于在失败的节点下游的任 何节点都不会丢失任何的包。然后与 namenode 联系后,当前在一个好的 datanode 会联系 namenode, 给失败节点上还未写完的块生成一个新的标识ID, 以至于如果这个失败的 datanode 不久后恢复了,这个不完整的块将会被删除。失败节点会从 pipeline 中移除,然后剩下两个好的 datanode 会组成一个的新的 pipeline ,剩下的 这些块的包(也就是刚才放在数据队列队首的包)会继续写进 pipeline中好的 datanode 中。最后,namenode 注意到块备份数小于规定的备份数,他就安排在另一个节点上创建完成备份,直接从已有的块中复制就可以。然后一直到满足了备份数(dfs.replication)。如果有多个节点的写入失败了,如果满足了最小备份数的设置(dfs.namenode.repliction.min),写入也将会成功,然后剩下的备份会被集群异步的执行备份,直到满足了备份数(dfs.replication)。

写入举例(Java)

// cc FileCopyWithProgress Copies a local file to a Hadoop filesystem, and shows progress
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
 
// vv FileCopyWithProgress
public class FileCopyWithProgress {
  public static void main(String[] args) throws Exception {
    String localSrc = args[0];
    String dst = args[1];
     
    InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
     
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    OutputStream out = fs.create(new Path(dst), new Progressable() {
      public void progress() {
        System.out.print(".");
      }
    });
     
    IOUtils.copyBytes(in, out, 4096, true);
  }
}
// ^^ FileCopyWithProgress

四、其他文件操作

创建目录

创建目录

public blooean mkdir(Path f) throws IOException

查看文件元数据

// cc ShowFileStatusTest Demonstrates file status information
import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
import java.io.*;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.*;
 
// vv ShowFileStatusTest
public class ShowFileStatusTest {
   
  private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing
  private FileSystem fs;
 
  @Before
  public void setUp() throws IOException {
    Configuration conf = new Configuration();
    if (System.getProperty("test.build.data") == null) {
      System.setProperty("test.build.data", "/tmp");
    }
    cluster = new MiniDFSCluster.Builder(conf).build();
    fs = cluster.getFileSystem();
    OutputStream out = fs.create(new Path("/dir/file"));
    out.write("content".getBytes("UTF-8"));
    out.close();
  }
   
  @After
  public void tearDown() throws IOException {
    if (fs != null) { fs.close(); }
    if (cluster != null) { cluster.shutdown(); }
  }
   
  @Test(expected = FileNotFoundException.class)
  public void throwsFileNotFoundForNonExistentFile() throws IOException {
    fs.getFileStatus(new Path("no-such-file"));
  }
   
  @Test
  public void fileStatusForFile() throws IOException {
    Path file = new Path("/dir/file");
    FileStatus stat = fs.getFileStatus(file);
    assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
    assertThat(stat.isDirectory(), is(false));
    assertThat(stat.getLen(), is(7L));
    assertThat(stat.getModificationTime(),
        is(lessThanOrEqualTo(System.currentTimeMillis())));
    assertThat(stat.getReplication(), is((short) 1));
    assertThat(stat.getBlockSize(), is(128 * 1024 * 1024L));
    assertThat(stat.getOwner(), is(System.getProperty("user.name")));
    assertThat(stat.getGroup(), is("supergroup"));
    assertThat(stat.getPermission().toString(), is("rw-r--r--"));
  }
   
  @Test
  public void fileStatusForDirectory() throws IOException {
    Path dir = new Path("/dir");
    FileStatus stat = fs.getFileStatus(dir);
    assertThat(stat.getPath().toUri().getPath(), is("/dir"));
    assertThat(stat.isDirectory(), is(true));
    assertThat(stat.getLen(), is(0L));
    assertThat(stat.getModificationTime(),
        is(lessThanOrEqualTo(System.currentTimeMillis())));
    assertThat(stat.getReplication(), is((short) 0));
    assertThat(stat.getBlockSize(), is(0L));
    assertThat(stat.getOwner(), is(System.getProperty("user.name")));
    assertThat(stat.getGroup(), is("supergroup"));
    assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
  }
   
}
// ^^ ShowFileStatusTest

查看文件状态

// cc ListStatus Shows the file statuses for a collection of paths in a Hadoop filesystem
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
 
// vv ListStatus
public class ListStatus {
 
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
     
    Path[] paths = new Path[args.length];
    for (int i = 0; i < paths.length; i++) {
      paths[i] = new Path(args[i]);
    }
     
    FileStatus[] status = fs.listStatus(paths);
    Path[] listedPaths = FileUtil.stat2Paths(status);
    for (Path p : listedPaths) {
      System.out.println(p);
    }
  }
}
// ^^ ListStatus

文件模式匹配

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
 
public class DateRangePathFilter implements PathFilter {
   
  private final Pattern PATTERN = Pattern.compile("^.*/(\\d\\d\\d\\d/\\d\\d/\\d\\d).*$");
   
  private final Date start, end;
 
  public DateRangePathFilter(Date start, Date end) {
    this.start = new Date(start.getTime());
    this.end = new Date(end.getTime());
  }
   
  public boolean accept(Path path) {
    Matcher matcher = PATTERN.matcher(path.toString());
    if (matcher.matches()) {
      DateFormat format = new SimpleDateFormat("yyyy/MM/dd");
      try {
        return inInterval(format.parse(matcher.group(1)));
      } catch (ParseException e) {
        return false;
      }
    }
    return false;
  }
 
  private boolean inInterval(Date date) {
    return !date.before(start) && !date.after(end);
  }
 
}

文件删除

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
 
public class FileSystemDeleteTest {
 
  private FileSystem fs;
   
  @Before
  public void setUp() throws Exception {
    fs = FileSystem.get(new Configuration());
    writeFile(fs, new Path("dir/file"));
  }
   
  private void writeFile(FileSystem fileSys, Path name) throws IOException {
    FSDataOutputStream stm = fileSys.create(name);
    stm.close();
  }
   
  @Test
  public void deleteFile() throws Exception {
    assertThat(fs.delete(new Path("dir/file"), false), is(true));
    assertThat(fs.exists(new Path("dir/file")), is(false));
    assertThat(fs.exists(new Path("dir")), is(true));
    assertThat(fs.delete(new Path("dir"), false), is(true));
    assertThat(fs.exists(new Path("dir")), is(false));
  }
 
  @Test
  public void deleteNonEmptyDirectoryNonRecursivelyFails() throws Exception {
    try {
      fs.delete(new Path("dir"), false);
      fail("Shouldn't delete non-empty directory");
    } catch (IOException e) {
      // expected
    }
  }
   
  @Test
  public void deleteDirectory() throws Exception {
    assertThat(fs.delete(new Path("dir"), true), is(true));
    assertThat(fs.exists(new Path("dir")), is(false));
  }
   
}

五、文件压缩

压缩格式

文件压缩有两大好处:

Hadoop 对于压缩格式的是自动识别。如果我们压缩的文件有相应压缩格式的扩展名(比如 lzo,gz,bzip2 等)。Hadoop 会根据压缩格式的扩展名自动选择相对应的解码器来解压数据,此过程完全是 Hadoop 自动处理,我们只需要确保输入的压缩文件有扩展名。

Hadoop中有多种压缩格式、算法和工具,下图列出了常用的压缩方法。

压缩格式

表中的“是否可切分”表示对应的压缩算法是否支持切分,也就是说是否可以搜索数据流的任意位置并进一步往下读取数据,可切分的压缩格式尤其适合MapReduce。

比较

所有的压缩算法都需要权衡空间/时间:压缩和解压缩速度更快,其代价通常是只能节省少量的空间。不同的压缩工具有不同的特性:

更详细的比较如下

1.压缩性能比较


性能

2.优缺点

优缺点

另外使用hadoop原生(native)类库比其他java实现有更快的压缩和解压缩速度。特征比较如下:

特征

使用容器文件格式结合压缩算法也能更好的提高效率。顺序文件、Arvo文件、ORCFiles、Parqurt文件同时支持压缩和切分。

压缩举例(Java)

压缩

// cc PooledStreamCompressor A program to compress data read from standard input and write it to standard output using a pooled compressor
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;
 
// vv PooledStreamCompressor
public class PooledStreamCompressor {
 
  public static void main(String[] args) throws Exception {
    String codecClassname = args[0];
    Class<?> codecClass = Class.forName(codecClassname);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec)
      ReflectionUtils.newInstance(codecClass, conf);
    /*[*/Compressor compressor = null;
    try {
      compressor = CodecPool.getCompressor(codec);/*]*/
      CompressionOutputStream out =
        codec.createOutputStream(System.out, /*[*/compressor/*]*/);
      IOUtils.copyBytes(System.in, out, 4096, false);
      out.finish();
    /*[*/} finally {
      CodecPool.returnCompressor(compressor);
    }/*]*/
  }
}
// ^^ PooledStreamCompressor

解压缩

// cc FileDecompressor A program to decompress a compressed file using a codec inferred from the file's extension
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
// vv FileDecompressor
public class FileDecompressor {
 
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
     
    Path inputPath = new Path(uri);
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(inputPath);
    if (codec == null) {
      System.err.println("No codec found for " + uri);
      System.exit(1);
    }
 
    String outputUri =
      CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
 
    InputStream in = null;
    OutputStream out = null;
    try {
      in = codec.createInputStream(fs.open(inputPath));
      out = fs.create(new Path(outputUri));
      IOUtils.copyBytes(in, out, conf);
    } finally {
      IOUtils.closeStream(in);
      IOUtils.closeStream(out);
    }
  }
}
// ^^ FileDecompressor

六、文件序列化

序列化是指将结构化数据转换为字节流以便在网络上传输或写到磁盘进行永久存储。反序列化狮子将字节流转换回结构化对象的逆过程。

序列化用于分布式数据处理的两大领域:进程间通信和永久存储。

对序列化的要求时是格式紧凑(高效使用存储空间)、快速(读写效率高)、可扩展(可以透明地读取老格式数据)且可以互操作(可以使用不同的语言读写数据)。

Hadoop使用的是自己的序列化格式Writable,它绝对紧凑、速度快,但不太容易用java以外的语言进行扩展或使用。

当然,用户也可以使用其他序列化框架或者自定义序列化方式,如Avro框架。

Hadoop内部还使用了Apache ThriftProtocal Buffers 来实现RPC和数据交换。

上一篇下一篇

猜你喜欢

热点阅读