HDFS中API的使用

2019-05-23  本文已影响0人  lizb

在项目开发中,有时我们需要通过HDFS的api来对文件进行操作,比如将数据上传到HDFS或者从HDFS获取数据等。本篇来介绍一下HDFS中API的具体使用。直接上代码:

package com.lzb.hdfs.fs;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

import java.io.IOException;

public class HDFSHelper {

    private FileSystem fs;

    public HDFSHelper() {
        fs = getFileSystem();
    }

    /**
     * Configuration是配置对象,conf可以理解为包含了所有配置信息的一个集合,可以认为是Map,
     * 在初始化的时候底层会加载一堆配置文件 core-site.xml;hdfs-site.xml;mapred-site.xml;yarn-site.xml
     * 如果需要项目代码自动加载配置文件中的信息,那么就必须把配置文件改成-default.xml或者-site.xml的名称,
     * 而且必须放置在src下,如果不叫这个名,或者不在src下,也需要加载这些配置文件中的参数,必须使用conf对象提供的方法手动加载.
     * 依次加载的参数信息的顺序是:
     * 1.加载core/hdfs/mapred/yarn-default.xml
     * 2.加载通过conf.addResource()加载的配置文件
     * 3.加载conf.set(name,value)
     */
    private Configuration getConfiguration(){
        Configuration conf = new Configuration();
        //conf.addResource("xxx");
        //conf.set("xxx","xxx");
        //Configuration.addDefaultResource("core-site.xml");
        //Configuration.addDefaultResource("hdfs-site.xml");
        //conf.set("fs.default.name","hdfs://probd01:8020");

        //HA模式的配置
        conf.set("fs.defaultFS", "hdfs://probd");
        conf.set("dfs.nameservices", "probd");
        conf.set("dfs.ha.namenodes.probd", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.probd.nn1", "probd01:8020");
        conf.set("dfs.namenode.rpc-address.probd.nn2", "probd02:8020");
        conf.set("dfs.client.failover.proxy.provider.probd", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        //防止报错:no FileSystem for scheme: hdfs...
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        return conf;
    }

    /**
     * 获取文件系统
     * 本地文件系统为LocalFileSystem,URL形式:    file:///c:myProgram
     * HDFS文件系统为DistributedFileSystem,URL形式:    fs.defaultFS=hdfs://hadoop01:9000
     */
    public FileSystem getFileSystem(){
        Configuration conf = getConfiguration();
        FileSystem fs = null;
        try {
            fs = FileSystem.get(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println(fs);
        return fs;
    }

    /**
     * 上传本地文件到HDFS,底层就是采用流的方式
     * @param localPath 本地文件路径
     * @param remotePath HDFS文件路径
     * @return 是否上传成功
     */
    public boolean copyFromLocal(String localPath,String remotePath){
        if(fs == null) return false;
        try {
            fs.copyFromLocalFile(new Path(localPath),new Path(remotePath));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return true;
    }

    /**
     * 从HDFS下载文件,底层就是采用流的方式
     * @param remotePath HDFS文件路径
     * @param localPath 本地路径
     * @return 是否下载成功
     */
    public boolean copyToLocal(String remotePath,String localPath){
        if(fs == null) return false;
        try {
            fs.copyToLocalFile(new Path(remotePath),new Path(localPath));
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;
    }

    /**
     * 获取目录下的文件
     * @param remotePath HDFS文件路径
     * @param recursive 是否级联(该文件夹下面如果还有子文件 要不要看,注意没有 子文件夹!!)
     */
    public void listFiles(String remotePath,boolean recursive){
        if(fs == null) return;
        try {
            RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(remotePath), recursive);
            while (iterator.hasNext()){
                LocatedFileStatus fileStatus = iterator.next();

                //文件的存储路径,以hdfs://开头的全路径 ==> hdfs://hadoop01:9000/a/gg.txt
                System.out.println( "file path === " + fileStatus.getPath());

                //文件名
                System.out.println("file name === " + fileStatus.getPath().getName());

                //文件长度
                System.out.println("file size === "+fileStatus.getLen());

                //文件所有者
                System.out.println("file owner === "+fileStatus.getOwner());

                //分组信息
                System.out.println("file group === " + fileStatus.getGroup());

                //文件权限信息
                System.out.println("file permission === " + fileStatus.getPermission());

                //文件副本数
                System.out.println("file blocks === " + fileStatus.getReplication());

                //块大小
                System.out.println("file block size === " + fileStatus.getBlockSize());

                //块位置相关信息
                BlockLocation[] blockLocations = fileStatus.getBlockLocations();

                //块的数量
                System.out.println("file block nums === " + blockLocations.length);

                for (BlockLocation bl : blockLocations) {
                    String[] hosts = bl.getHosts();
                    for (String host: hosts) {
                        System.out.println("block host === " + host);
                    }
                    //块的一个逻辑路径
                    bl.getTopologyPaths();
                }

            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    /**
     * 获取目录下的文件
     * 此方法与listFiles不同,不支持传true或false,即不能级联,如果想实现级联就采用递归的方式
     * @param remotePath HDFS文件路径
     */
    public void listStatus(String remotePath){
        if(fs == null) return;
        try {
            FileStatus[] listStatus = fs.listStatus(new Path(remotePath));
            for (FileStatus fss : listStatus) {
                //判断是不是文件夹
                boolean directory = fss.isDirectory();

                //判断是不是文件
                boolean file = fss.isFile();

                String name = fss.getPath().getName();

                if(file) {
                    System.out.println(name+":文件");
                }else {
                    System.out.println(name+":文件夹");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    /**
     * 删除空文件夹或空文件
     * @param path
     */
    public void deleteEmptyDirAndFile(Path path){
        if(fs == null) return;
        try {

            FileStatus[] listStatus = fs.listStatus(path);
            if(listStatus.length == 0){
                //删除空文件夹
                fs.delete(path,true);
                return;
            }

            RemoteIterator<LocatedFileStatus> iterator = fs.listLocatedStatus(path);

            while (iterator.hasNext()) {
                LocatedFileStatus next = iterator.next();
                Path currentPath = next.getPath();
                Path parentPath = next.getPath().getParent();


                if (next.isDirectory()) {
                    // 如果是空文件夹
                    if (fs.listStatus(currentPath).length == 0) {
                        // 删除掉
                        fs.delete(currentPath, true);
                    } else {
                        // 不是空文件夹,那么则继续遍历
                        if (fs.exists(currentPath)) {
                            deleteEmptyDirAndFile(currentPath);
                        }
                    }
                } else {
                    // 获取文件的长度
                    long fileLength = next.getLen();
                    // 当文件是空文件时, 删除
                    if (fileLength == 0) {
                        fs.delete(currentPath, true);
                    }
                }

                // 当空文件夹或者空文件删除时,有可能导致父文件夹为空文件夹,
                // 所以每次删除一个空文件或者空文件的时候都需要判断一下,如果真是如此,那么就需要把该文件夹也删除掉
                int length = fs.listStatus(parentPath).length;
                if (length == 0) {
                    fs.delete(parentPath, true);
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    /**
     * 创建文件夹
     * @param remotePath HDFS文件路径
     * @return 是否创建成功
     */
    public boolean mkdir(String remotePath){
        if(fs == null) return false;
        boolean success = false;
        try {
            success = fs.mkdirs(new Path(remotePath));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return success;
    }

    /**
     * 写入文件
     * @param remotePath HDFS文件路径
     * @param content 内容
     * @return 是否写入成功
     */
    public boolean writeToFile(String remotePath,String content){
        if(fs == null) return false;
        try {
            FSDataOutputStream out = fs.create(new Path(remotePath));
            out.writeUTF(content);
            out.close();
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }
        return true;

    }

    /**
     * 读取文件数据
     * @param remotePath HDFS文件路径
     * @return 读取的结果数据
     */
    public String readFromFile(String remotePath){
        String result = null;
        if(fs == null) return null;
        try {

            FSDataInputStream in = fs.open(new Path(remotePath));
            result = in.readUTF();
            in.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return result;
    }


    /**
     * 重命名文件
     * @param oldPath 旧文件路径
     * @param newPath 新文件路径
     * @return 是否重命名成功
     */
    public boolean renameFile(String oldPath,String newPath){
        if(fs == null) return false;
        Path old=new Path(oldPath);
        Path now=new Path(newPath);
        boolean rename = false;
        try {
            rename = fs.rename(old, now);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return rename;
    }

    /**
     * 删除目录和文件
     * @param remotePath HDFS文件路径
     * @return 是否删除成功
     */
    public boolean deleteFile(String remotePath){
        if(fs == null) return false;
        boolean success = false;
        try {
            success = fs.delete(new Path(remotePath), true);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return success;
    }

    /**
     * 检查文件是否存在
     * @param remotePath HDFS文件路径
     * @return 是否存在
     */
    public boolean existFile(String remotePath){
        if(fs == null) return false;
        boolean exist = false;
        try {
            exist = fs.exists(new Path(remotePath));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return exist;
    }


    /**
     * 关闭FileSystem
     */
    public void closeFileSystem(){
        if(fs != null){
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

上面代码都有注释,这里就不过多解释了,下面来看主类的执行代码:

package com.lzb.hdfs;


import com.lzb.hdfs.fs.HDFSHelper;

public class Demo {
    public static void main(String[] args) {

        HDFSHelper hdfsHelper = new HDFSHelper();

        String dir = "/test";
        String filename = "hello.txt";
        String path = dir + "/" + filename;

        boolean exist = hdfsHelper.existFile(path);
        System.out.println(path + " exist file ==> " + exist);

        if(!exist){
            boolean mkdir = hdfsHelper.mkdir(dir);
            System.out.println(dir + " create success ==> " + mkdir);

            boolean copyFromLocal = hdfsHelper.copyFromLocal("/"+filename, dir);
            System.out.println("upload success ==> " + copyFromLocal);

            hdfsHelper.listFiles(dir,false);

            String content = "hello world new";
            boolean write = hdfsHelper.writeToFile(path, content);
            System.out.println("write success ==> " + write);

            String data = hdfsHelper.readFromFile(path);
            System.out.println("read the data ==> " + data);

            String newPath = dir + "/hello2.txt";
            boolean renameFile = hdfsHelper.renameFile(path, newPath);
            System.out.println("rename success ==> " + renameFile);

            boolean copyToLocal = hdfsHelper.copyToLocal(newPath, "/hello2.txt");
            System.out.println("download success ==> " + copyToLocal);

            //boolean deleteFile = hdfsHelper.deleteFile(newPath);
            //System.out.println("delete success ==> " + deleteFile);

        }

        hdfsHelper.closeFileSystem();

    }
}

执行结果如下:

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-1866182384_1, ugi=root (auth:SIMPLE)]]
/test/hello.txt exist file ==> false
/test create success ==> true
upload success ==> true
file path === hdfs://probd/test/hello.txt
file name === hello.txt
file size === 12
file owner === root
file group === supergroup
file permission === rw-r--r--
file blocks === 3
file block size === 134217728
file block nums === 1
block host === Probd01
block host === Probd03
block host === Probd02
write success ==> true
read the data ==> hello world new
rename success ==> true
download success ==> true
上一篇下一篇

猜你喜欢

热点阅读