
Accessing HDFS through the API

2018-08-08  须臾之北

Operating on HDFS through the API

Today's topics

1. Getting the HDFS file system

// Get the file system
@Test
public void initHDFS() throws Exception{
    // 1. Get the file system
    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(configuration);
    
    // 2. Print the file system to the console
    System.out.println(fileSystem.toString());      
}

2. Uploading a file to HDFS (testing configuration priority)

@Test
public void putFileToHdfs() throws Exception{
    Configuration conf = new Configuration();
    conf.set("dfs.replication", "2");       // values set in code have the highest priority
    conf.set("fs.defaultFS", "hdfs://10.9.190.111:9000");
    FileSystem fileSystem = FileSystem.get(conf);
        
    // Upload the file
    fileSystem.copyFromLocalFile(new Path("hdfs.txt"), new Path("/user/anna/hdfs/test.txt"));
    
    // Release resources
    fileSystem.close(); 
}

Configuration priority: (1) values set in client code > (2) user-defined configuration files on the classpath > (3) the server's default configuration
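The priority order above can be pictured as three layers merged in sequence, with later layers overwriting earlier ones. The sketch below is only an illustration of that ordering: `ConfigPrioritySketch` and `resolve` are hypothetical names, the maps stand in for the three sources, and the value "1" for the classpath file is an assumed example. It does not use Hadoop's `Configuration` class.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for layered configuration lookup; not Hadoop's Configuration class.
class ConfigPrioritySketch {

    // Merge the three layers so that later layers override earlier ones:
    // server defaults < classpath configuration file < values set in code.
    static String resolve(String key,
                          Map<String, String> serverDefaults,
                          Map<String, String> classpathFile,
                          Map<String, String> setInCode) {
        Map<String, String> merged = new LinkedHashMap<>(serverDefaults);
        merged.putAll(classpathFile);
        merged.putAll(setInCode);
        return merged.get(key);
    }

    public static void main(String[] args) {
        Map<String, String> defaults = Collections.singletonMap("dfs.replication", "3");
        Map<String, String> fromFile = Collections.singletonMap("dfs.replication", "1"); // assumed value
        Map<String, String> inCode = Collections.singletonMap("dfs.replication", "2");

        // The value set in code wins over both other layers.
        System.out.println(resolve("dfs.replication", defaults, fromFile, inCode)); // prints 2

        // Without a value in code, the classpath file wins over the defaults.
        System.out.println(resolve("dfs.replication", defaults, fromFile,
                Collections.<String, String>emptyMap())); // prints 1
    }
}
```

Uploading the same file with and without the `conf.set("dfs.replication", ...)` line and comparing the replication factor shown by HDFS is one way to observe the same ordering on a real cluster.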

3. Downloading a file from HDFS

public void copyToLocalFile(boolean delSrc,Path src,Path dst,boolean useRawLocalFileSystem)
                 throws IOException

delSrc - whether to delete the src
src - path
dst - path
useRawLocalFileSystem - whether to use RawLocalFileSystem as local file system or not.

@Test
public void testCopyToLocalFile() throws Exception{
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://10.9.190.111:9000");
    FileSystem fileSystem = FileSystem.get(conf);
        
    // Download the file
    fileSystem.copyToLocalFile(false,new Path("/user/anna/hdfs/test.txt"), new Path("test.txt"),true);
    
    // Release resources
    fileSystem.close(); 
}

4. Creating a directory in HDFS

@Test
public void testMakedir() throws Exception{
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://10.9.190.111:9000");
    FileSystem fileSystem = FileSystem.get(conf);
        
    // Create the directory
    fileSystem.mkdirs(new Path("/user/anna/test/hahaha"));
    
    // Release resources
    fileSystem.close();
}

5. Deleting a directory in HDFS

@Test
public void testDelete() throws Exception{
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://10.9.190.111:9000");
    FileSystem fileSystem = FileSystem.get(conf);
        
    // Delete the directory
    fileSystem.delete(new Path("/user/anna/test/hahaha"),true);         // true means delete recursively
    
    // Release resources
    fileSystem.close();
}

6. Renaming a file in HDFS

@Test
public void testRename() throws Exception{
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://10.9.190.111:9000");
    FileSystem fileSystem = FileSystem.get(conf);
        
    // Rename the file
    fileSystem.rename(new Path("/user/anna/test/copy.txt"), new Path("/user/anna/test/copyRename.txt"));
    
    // Release resources
    fileSystem.close();
}

7. Viewing file details in HDFS

Available methods

1. public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,IOException
    * Returns an array of FileStatus

2. public FileStatus[] listStatus(Path f,PathFilter filter) throws FileNotFoundException,IOException

3. public FileStatus[] listStatus(Path[] files,PathFilter filter) throws FileNotFoundException,IOException

    * Note that PathFilter is an interface with a single method, accept, which decides which paths are kept

    * Enumerate all files found in the list of directories passed in, calling listStatus(path, filter) on each one.

Note: HDFS typically returns these entries sorted by name, but the FileSystem API does not guarantee a sorted order

Code: FileStatus[] listStatus(Path f)

// Using FileStatus[] listStatus(Path f)
try {
    // Connect to HDFS
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS","hdfs://10.9.190.90:9000");

    // Get the FileSystem
    FileSystem fileSystem = FileSystem.get(conf);

    // Use listStatus to get the entries under /test
    FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/test"));

    // Print each entry in the directory
    for(FileStatus fileStatus :fileStatuses) {
        System.out.println(fileStatus.getPath() + "  " + new Date(fileStatus.getAccessTime()) + "  " + 
            fileStatus.getBlockSize() + "  " + fileStatus.getPermission());
    }
}catch(Exception e) {
    e.printStackTrace();
}
/*
Output under JDK 1.8:
----------------------------------------------------------------------------
hdfs://10.9.190.90:9000/test/hadoop-2.7.3.tar.gz  2012-07-26  134217728  rw-r--r--
hdfs://10.9.190.90:9000/test/hello.txt  2012-07-26  134217728  rw-r--r--
hdfs://10.9.190.90:9000/test/test2  1970-01-01  0  rwxr-xr-x
----------------------------------------------------------------------------
*/

Code: FileStatus[] listStatus(Path f,PathFilter filter)

try {
    // Connect to HDFS
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS","hdfs://10.9.190.90:9000");

    // Get the FileSystem
    FileSystem fileSystem = FileSystem.get(conf);

    // List the files in the directory whose names end with .md
    FileStatus[] statuses = fileSystem.listStatus(new Path("/test/test2"), new PathFilter() {

        @Override
        public boolean accept(Path path) {
            // Keep only paths ending with .md
            return path.toString().endsWith(".md");
        }
    });

    // Print the file information
    for(FileStatus status : statuses) {
        System.out.println("Path : " + status.getPath() + "  Permission : " + status.getPermission() + 
                "  Replication : " + status.getReplication());
    }
}catch(Exception e) {
    e.printStackTrace();
}
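Because PathFilter has a single method, the filtering idea can also be written as a lambda on Java 8. The sketch below shows the pattern without a cluster: `NameFilter` and `SuffixFilterSketch` are hypothetical stand-ins that mirror the one-method shape of PathFilter, using plain strings instead of `org.apache.hadoop.fs.Path`, and the listing is taken from the file names that appear in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SuffixFilterSketch {

    // Hypothetical stand-in with the same single-method shape as PathFilter.
    interface NameFilter {
        boolean accept(String path);
    }

    // Keep only the paths that the filter accepts, preserving their order.
    static List<String> filter(List<String> paths, NameFilter filter) {
        List<String> kept = new ArrayList<>();
        for (String p : paths) {
            if (filter.accept(p)) {
                kept.add(p);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList(
                "/test/test2/Map.md", "/test/test2/biji.md", "/test/test2/haha.txt");

        // A lambda replaces the anonymous class because the interface has one method.
        System.out.println(filter(listing, p -> p.endsWith(".md")));
        // prints [/test/test2/Map.md, /test/test2/biji.md]
    }
}
```

With the real API the same lambda shape should be usable wherever a PathFilter is expected, for example as the second argument of listStatus.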

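8. Reading a file from a specific position

9. Studying the FileSystem class

Studying FileSystem

Preparation

Introduction to org.apache.hadoop.fs.FileSystem

FileSystem methods: checks

  1. Methods

    • public boolean exists(Path f) throws IOException

      • Checks whether the path exists
    • public boolean isDirectory(Path f) throws IOException

      • Checks whether the path is a directory
    • public boolean isFile(Path f) throws IOException

      • Checks whether the path is a file
  2. Exercise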

     try {
         // Connect to the HDFS file system
         Configuration conf = new Configuration();
         conf.set("fs.defaultFS", "hdfs://10.9.190.90:9000");
         
         // Get the FileSystem object
         FileSystem fileSystem = FileSystem.get(conf);
         
         // Check whether the path exists
         System.out.println(fileSystem.exists(new Path("/test")));   //true
         
         // Check whether it is a directory
         System.out.println(fileSystem.isDirectory(new Path("/test")));  //true
         
         // Check whether it is a file
         System.out.println(fileSystem.isFile(new Path("/test")));           //false
     }catch(Exception e) {
         e.printStackTrace();
     }
    

FileSystem methods: getters: file information

  1. Methods

    • public abstract FileStatus getFileStatus(Path f) throws IOException

      • Return a file status object that represents the path.

      • The return type is FileStatus

    • public Path getHomeDirectory()

      • Return the current user's home directory in this FileSystem. The default implementation returns "/user/$USER/".

      • Returns the current user's home directory

  2. Exercise

     try {
         // Connect to the HDFS file system
         Configuration conf = new Configuration();
         conf.set("fs.defaultFS", "hdfs://10.9.190.90:9000");
         
         // Get the FileSystem object
         FileSystem fileSystem = FileSystem.get(conf);
         
         // Get the current user's home directory
         System.out.println("======== Current user's home directory ============");
         Path path = fileSystem.getHomeDirectory();
         System.out.println(path);
         
         // Get the file status object
         System.out.println("============ File information ===============");
         FileStatus status = fileSystem.getFileStatus(new Path("/eclipse"));
         System.out.println("Path : " + status.getPath());
         System.out.println("isFile ? " + status.isFile());          
         System.out.println("Block size : " + status.getBlockSize());
         System.out.println("Permissions : " + status.getPermission());
         System.out.println("Replication : " + status.getReplication());
         System.out.println("isSymlink : " + status.isSymlink());            
         
     }catch(Exception e) {
         e.printStackTrace();
     }
    
    
     /*
         Output under JDK 1.8:
      *  ------------------------------------------------
      *  ======== Current user's home directory ============
         hdfs://10.9.190.90:9000/user/anna
         ============ File information ===============
         Path : hdfs://10.9.190.90:9000/eclipse
         isFile ? true
         Block size : 134217728
         Permissions : rw-r--r--
         Replication : 3
         isSymlink : false
         ------------------------------------------------
     */
    
  3. Commonly used FileStatus methods

    • public Path getPath()

    • public boolean isFile()

    • public boolean isSymlink()

    • public long getBlockSize()

    • public short getReplication()

    • public FsPermission getPermission()

FileSystem methods: getters: directory listing 1

  1. Methods

    • public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,IOException

      • Returns an array of FileStatus
    • public FileStatus[] listStatus(Path f,PathFilter filter)
      throws FileNotFoundException,IOException

    • public FileStatus[] listStatus(Path[] files,PathFilter filter)
      throws FileNotFoundException,IOException

      • Note that PathFilter is an interface with a single method, accept, which decides which paths are kept

      • Enumerate all files found in the list of directories passed in, calling listStatus(path, filter) on each one.

    • Note: HDFS typically returns these entries sorted by name, but the FileSystem API does not guarantee a sorted order

  2. Exercise 1: using FileStatus[] listStatus(Path f)

     // Using FileStatus[] listStatus(Path f)
     try {
         // Connect to HDFS
         Configuration conf = new Configuration();
         conf.set("fs.defaultFS","hdfs://10.9.190.90:9000");
         
         // Get the FileSystem
         FileSystem fileSystem = FileSystem.get(conf);
         
         // Use listStatus to get the entries under /test
         FileStatus[] fileStatuses = fileSystem.listStatus(new Path("/test"));
         
         // Print each entry in the directory
         for(FileStatus fileStatus :fileStatuses) {
             System.out.println(fileStatus.getPath() + "  " + new Date(fileStatus.getAccessTime()) + "  " + 
                 fileStatus.getBlockSize() + "  " + fileStatus.getPermission());
         }
     }catch(Exception e) {
         e.printStackTrace();
     }
     /*
     Output under JDK 1.8:
     ----------------------------------------------------------------------------
     hdfs://10.9.190.90:9000/test/hadoop-2.7.3.tar.gz  2012-07-26  134217728  rw-r--r--
     hdfs://10.9.190.90:9000/test/hello.txt  2012-07-26  134217728  rw-r--r--
     hdfs://10.9.190.90:9000/test/test2  1970-01-01  0  rwxr-xr-x
     ----------------------------------------------------------------------------
     */
    
  3. Exercise 2: using FileStatus[] listStatus(Path f,PathFilter filter)

    • Goal: list information about the files ending with .md under /test/test2

    • Code:

        try {
            // Connect to HDFS
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS","hdfs://10.9.190.90:9000");
            
            // Get the FileSystem
            FileSystem fileSystem = FileSystem.get(conf);
            
            // List the files in the directory whose names end with .md
            FileStatus[] statuses = fileSystem.listStatus(new Path("/test/test2"), new PathFilter() {
                
                @Override
                public boolean accept(Path path) {
                    // Keep only paths ending with .md
                    return path.toString().endsWith(".md");
                }
            });
            
            // Print the file information
            for(FileStatus status : statuses) {
                System.out.println("Path : " + status.getPath() + "  Permission : " + status.getPermission() + 
                        "  Replication : " + status.getReplication());
            }
        }catch(Exception e) {
            e.printStackTrace();
        }
      
  4. Caveats

    • By the time the listStatus() operation returns to the caller, there is no guarantee that the information contained in the response is current. The details MAY be out of date, including the contents of any directory, the attributes of any files, and the existence of the path supplied. (In other words, the result is a snapshot that may already be stale when it is returned; this concerns the consistency of the listing, not thread safety.)

FileSystem methods: getters: directory listing 2

  1. Methods

    • public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
      throws FileNotFoundException, IOException

    • protected org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,PathFilter filter)
      throws FileNotFoundException, IOException

      • Note: this overload is protected, so it is accessible from the class itself, from any class in the same package (subclass or not), and from subclasses in other packages
    • Note: LocatedFileStatus is a subclass of FileStatus

  2. Usage

     try {
         // Connect to HDFS
         Configuration conf = new Configuration();
         conf.set("fs.defaultFS","hdfs://10.9.190.90:9000");
         
         // Get the FileSystem
         FileSystem fileSystem = FileSystem.get(conf);
         
         // List every entry under /test/test2 (no filter is applied here)
         RemoteIterator<LocatedFileStatus> iterator = fileSystem.listLocatedStatus(new Path("/test/test2"));
         
         while(iterator.hasNext()) {
             LocatedFileStatus status = iterator.next();
             System.out.println("Path : " + status.getPath() + "  Permission : " + status.getPermission() + 
                     "  Replication : " + status.getReplication());
         }           
     }catch(Exception e) {
         e.printStackTrace();
     }
     /*
      * Output under JDK 1.8:
      * ---------------------------------------------------------------------------------------------
      * Path : hdfs://10.9.190.90:9000/test/test2/Map.md  Permission : rw-r--r--  Replication : 3
        Path : hdfs://10.9.190.90:9000/test/test2/biji.md  Permission : rw-r--r--  Replication : 3
        Path : hdfs://10.9.190.90:9000/test/test2/haha.txt  Permission : rw-r--r--  Replication : 3
        ---------------------------------------------------------------------------------------------
      * */
    
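  3. Differences from listStatus(Path p)

    • listStatus returns a FileStatus[] array, which can be traversed with a for-each loop

    • listLocatedStatus(Path p) returns a RemoteIterator of LocatedFileStatus, which is traversed with hasNext() and next()

    • Note, however, that the default implementation of listLocatedStatus() in FileSystem is itself built on listStatus(Path p); subclasses such as DistributedFileSystem may override it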

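FileSystem methods: getters: directory listing 3

  1. Methods

    • public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listFiles(Path f,boolean recursive)
      throws FileNotFoundException,IOException
      • With recursive set to true, lists the files (directories themselves are not returned) in the directory and in all of its subdirectories
  2. Usage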

     try {
         // Connect to HDFS
         Configuration conf = new Configuration();
         conf.set("fs.defaultFS","hdfs://10.9.190.90:9000");
         
         // Get the FileSystem
         FileSystem fileSystem = FileSystem.get(conf);
         
         // Recursively list every file under /test
         RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path("/test"),true);
         
         while(iterator.hasNext()) {
             LocatedFileStatus status = iterator.next();
             System.out.println("Path : " + status.getPath() + "  Permission : " + status.getPermission() + 
                     "  Replication : " + status.getReplication());
         }           
     }catch(Exception e) {
         e.printStackTrace();
     }
    
     /*
      *  Output under JDK 1.8:
      *  ---------------------------------------------------------------------------------------------------
      *  Path : hdfs://10.9.190.90:9000/test/hadoop-2.7.3.tar.gz  Permission : rw-r--r--  Replication : 3
         Path : hdfs://10.9.190.90:9000/test/hello.txt  Permission : rw-r--r--  Replication : 3
         Path : hdfs://10.9.190.90:9000/test/test2/Map.md  Permission : rw-r--r--  Replication : 3
         Path : hdfs://10.9.190.90:9000/test/test2/biji.md  Permission : rw-r--r--  Replication : 3
         Path : hdfs://10.9.190.90:9000/test/test2/haha.txt  Permission : rw-r--r--  Replication : 3
         ---------------------------------------------------------------------------------------------------
      * */       
    

FileSystem methods: getters: locating a file's blocks

  1. Methods

    • public BlockLocation[] getFileBlockLocations(Path p,long start,long len) throws IOException

    • public BlockLocation[] getFileBlockLocations(FileStatus file,long start,long len) throws IOException

  2. Usage

     // Inspect where the blocks of /test/hadoop are stored
     try {
         // Connect to HDFS
         Configuration conf = new Configuration();
         conf.set("fs.defaultFS","hdfs://10.9.190.90:9000");
         
         // Get the FileSystem
         FileSystem fileSystem = FileSystem.get(conf);
         
         FileStatus status = fileSystem.getFileStatus(new Path("/test/hadoop"));
         
         BlockLocation[] locations = fileSystem.getFileBlockLocations(status, 0,status.getLen());
         
         for(BlockLocation location : locations) {
             // getHosts() and getNames() return String[], so concatenating them directly prints the
             // array identity (as in the output below); Arrays.toString(...) would print the contents
             System.out.println("host : " + location.getHosts() + " name : " + location.getNames() + " length : " + location.getLength());
         } 
     }catch(Exception e) {
         e.printStackTrace();
     }
     /*
     Output under JDK 1.8:
     ------------------------------------------------------------------------------
     host : [Ljava.lang.String;@18ece7f4 name : [Ljava.lang.String;@3cce57c7 length : 134217728
     host : [Ljava.lang.String;@1cf56a1c name : [Ljava.lang.String;@33f676f6 length : 79874467
     ------------------------------------------------------------------------------
     */
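The two block lengths in this kind of output follow directly from the file length and the block size: every block is full except possibly the last one. The sketch below reproduces that arithmetic with the numbers from the output (134217728 + 79874467 = 214092195 bytes in total) and also shows how `Arrays.toString` makes a `String[]` readable. `BlockSplitSketch` and `blockLengths` are hypothetical names, and the host names are made up for illustration; nothing here calls the Hadoop API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BlockSplitSketch {

    // Split a file of the given length into consecutive blocks of at most blockSize bytes.
    static List<Long> blockLengths(long fileLength, long blockSize) {
        List<Long> lengths = new ArrayList<>();
        for (long offset = 0; offset < fileLength; offset += blockSize) {
            lengths.add(Math.min(blockSize, fileLength - offset));
        }
        return lengths;
    }

    public static void main(String[] args) {
        long blockSize = 134217728L;   // 128 MB, the block size shown in the listings
        long fileLength = 214092195L;  // sum of the two block lengths in the output

        System.out.println(blockLengths(fileLength, blockSize));
        // prints [134217728, 79874467]

        // Hypothetical host names: an array must be formatted explicitly to be readable.
        String[] hosts = {"datanode1", "datanode2"};
        System.out.println("host : " + Arrays.toString(hosts));
        // prints host : [datanode1, datanode2]
    }
}
```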
    

FileSystem methods: getters: opening an output stream to a file

  1. Methods

    • public FSDataOutputStream create(Path f) throws IOException

    • public FSDataOutputStream create(Path f,boolean overwrite)
      throws IOException

      • overwrite - if a file with this name already exists, then if true, the file will be overwritten, and if false an exception will be thrown.
    • public FSDataOutputStream create(Path f,
      Progressable progress)
      throws IOException

      • Create an FSDataOutputStream at the indicated Path with write-progress reporting. Files are overwritten by default.
    • public FSDataOutputStream create(Path f,boolean overwrite,int bufferSize)
      throws IOException

    • public FSDataOutputStream create(Path f,boolean overwrite,int bufferSize, Progressable progress)throws IOException

    • public FSDataOutputStream append(Path p, int bufferSize, Progressable progress) throws IOException

  2. Usage: upload the local file E:/hzy.jpg to /1.jpg on HDFS

     public static void main(String[] args) {        
         BufferedInputStream in = null;
         FSDataOutputStream out = null;
     
         try {
             // Connect to HDFS
             Configuration conf = new Configuration();
             conf.set("fs.defaultFS","hdfs://10.9.190.90:9000");
         
             // Get the FileSystem
             FileSystem fileSystem = FileSystem.get(conf);
         
             // Open an input stream on the local file
             File file = new File("E:/hzy.jpg");
             in = new BufferedInputStream(new FileInputStream(file));
         
             // Open an output stream to /1.jpg on HDFS
             out = fileSystem.create(new Path("/1.jpg"),new Progressable() {
                 long callbackCount = 0;
             
                 @Override
                 public void progress() {
                     // HDFS invokes progress() periodically while data is written, not once per byte,
                     // so this reports the number of callbacks rather than a percentage
                     callbackCount++;
                     System.out.println("Progress callback #" + callbackCount);
                 }
             });
         
             // Copy byte by byte (IOUtils.copyBytes(in,out,conf) can be used instead)
             int b;
             while((b = in.read()) != -1) {
                 out.write(b);
             }
         }catch(Exception e) {
             e.printStackTrace();
         }finally {
             // Close each stream exactly once, whether or not the copy succeeded
             if(in != null) {
                 try {
                     in.close();
                 } catch (IOException e) {
                     e.printStackTrace();
                 }
             }
             if (out != null) {
                 try {
                     out.close();
                 } catch (IOException e) {
                     e.printStackTrace();
                 }
             }
         }
     }
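When a percentage is wanted instead of a raw counter, the arithmetic needs some care: with long operands, dividing before multiplying truncates to 0 for every value below the total. The sketch below shows the pitfall and one way around it. `ProgressPercentSketch` and `percent` are hypothetical names, and the sketch assumes the number of bytes written so far is tracked by the copy loop itself, since the progress() callback carries no byte count.

```java
class ProgressPercentSketch {

    // Multiply before dividing so that integer division does not truncate to 0.
    static int percent(long bytesWritten, long totalBytes) {
        if (totalBytes <= 0) {
            return 100; // treat an empty file as complete
        }
        return (int) (bytesWritten * 100 / totalBytes);
    }

    public static void main(String[] args) {
        long total = 1000;
        long written = 250;

        // Dividing first: 250 / 1000 is 0 in long arithmetic, so the result is 0.
        System.out.println((written / total) * 100); // prints 0

        // Multiplying first gives the expected value.
        System.out.println(percent(written, total)); // prints 25
    }
}
```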

FileSystem methods: getters: opening an input stream on a file: reading a file

  1. Methods

    • public FSDataInputStream open(Path f) throws IOException

    • public abstract FSDataInputStream open(Path f,int bufferSize)throws IOException

  2. Usage: copy /1.jpg from HDFS to the local file E:/hzyCopy.jpg

     try {
         // Connect to HDFS
         Configuration conf = new Configuration();
         conf.set("fs.defaultFS","hdfs://10.9.190.90:9000");
         
         // Get the FileSystem
         FileSystem fileSystem = FileSystem.get(conf);
         
         // Open the HDFS input stream and the local output stream;
         // try-with-resources closes both even if the copy fails
         try (FSDataInputStream in = fileSystem.open(new Path("/1.jpg"));
              BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(new File("E:/hzyCopy.jpg")))) {
             
             // Copy in 3 KB chunks
             int len;
             byte[] bArr = new byte[1024*3];
             while((len = in.read(bArr)) != -1) {
                 out.write(bArr,0,len);
             }
         }
         
     }catch(Exception e) {
         e.printStackTrace();
     }

FileSystem methods: creation

FileSystem methods: deletion

FileSystem methods: renaming

Other FileSystem methods

interface RemoteIterator

  1. Definition

     public interface RemoteIterator<E> {
       boolean hasNext() throws IOException;
       E next() throws IOException;
     }   
    
    • The primary use of RemoteIterator in the filesystem APIs is to list files on (possibly remote) filesystems.
  2. Usage

     //listLocatedStatus(Path f)
     public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
         throws FileNotFoundException,IOException
    
    
     //listLocatedStatus(Path f,PathFilter filter)
     protected org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f,PathFilter filter)
         throws FileNotFoundException,IOException
    
     //listStatusIterator(Path p)
     public org.apache.hadoop.fs.RemoteIterator<FileStatus> listStatusIterator(Path p)
         throws FileNotFoundException,IOException
    
     //listFiles(Path f,boolean recursive)
     public org.apache.hadoop.fs.RemoteIterator<LocatedFileStatus> listFiles(Path f,boolean recursive)
         throws FileNotFoundException,IOException
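
All of these methods are consumed the same way: loop on hasNext() and next(), both of which may throw IOException. The sketch below shows that loop without needing a cluster. It declares a local copy of the two-method interface so that it runs without Hadoop on the classpath; `RemoteIteratorSketch`, `fromList` and `drain` are hypothetical names, and the in-memory list merely imitates a remote listing.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class RemoteIteratorSketch {

    // Local copy of the interface shape, so the sketch runs without Hadoop.
    interface RemoteIterator<E> {
        boolean hasNext() throws IOException;
        E next() throws IOException;
    }

    // Hypothetical helper: wraps an in-memory list to imitate a remote listing.
    static <E> RemoteIterator<E> fromList(List<E> items) {
        final Iterator<E> it = items.iterator();
        return new RemoteIterator<E>() {
            @Override public boolean hasNext() { return it.hasNext(); }
            @Override public E next() { return it.next(); }
        };
    }

    // Collect every element into a list, converting IOException to an unchecked exception.
    static <E> List<E> drain(RemoteIterator<E> iterator) {
        List<E> out = new ArrayList<>();
        try {
            while (iterator.hasNext()) {
                out.add(iterator.next());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        RemoteIterator<String> listing = fromList(Arrays.asList(
                "/test/hello.txt", "/test/test2/Map.md"));
        System.out.println(drain(listing));
        // prints [/test/hello.txt, /test/test2/Map.md]
    }
}
```

Draining into a list is convenient for small directories; for large listings, processing each element inside the loop avoids holding everything in memory.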
    

interface StreamCapabilities

  1. Methods

     public interface StreamCapabilities {
       boolean hasCapability(String capability);
     }
    
  2. Usage

     This interface does not exist in Hadoop 2.7.3; it is available in 2.9.1
    