java api操作HDFS

2018-12-28  本文已影响0人  小月半会飞

第一部分 项目创建

一、创建一个maven项目

image.png

选中红框所选 部分,点击下一步
在下一步之前,确认setting.xml的两个参数已经更改,
1)、<localRepository>
2)、<mirror>


image.png

继续下一步


image.png
完成

第二部分 代码实现

1、导入依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-client</artifactId>
    <version>2.8.3</version>
</dependency>

2、准备步骤

在进行一系列文件操作之前,要先拿到客户端对象

public class App 
{
    Configuration configuration;
    FileSystem fileSystem;
    @BeforeTest
    public void init() throws IOException {
//        告诉hadoop我的主机名是root,不写的话,虚拟机会认为是该window的主机名
        System.setProperty("HADOOP_USER_NAME", "root") ;
        configuration=new Configuration();
//        给configuration添加属性
        configuration.set("fs.defaultFS","hdfs://hadoop3:8020");
        fileSystem=FileSystem.get(configuration);
    }
}

3、上传文件

/**
     测试上传文件
    */
    @Test
    public void testUpload() throws IOException {
//        上传本地文件到hdfs
        fileSystem.copyFromLocalFile(new Path("d:/hello.txt"),new Path("/hello.txt"));
        fileSystem.close();
    }

4、下载文件

/**
     测试下载文件
    */
    @Test
    public void testDownload() throws IOException {
//        第一个true表示要删除源文件,false表示不删除源文件、
//        第二个true表示要拷贝到本地文件系统,false表示是在hdfs来回拷贝
        fileSystem.copyToLocalFile(true,new Path("/hello.txt"),new Path("d:/test/hello.txt"),true);
        fileSystem.close();
    }

5、获取配置参数

/**
     * 获取配置参数:获取的是jar包中配置的,服务端配置的是不起作用的
     * 但是可以使用配置文件或者用cong.set()来指定
     * 比如副本数量 dfs.replication,3
     * @throws Exception
     */
    @Test
    public void testConfiguration() throws Exception{
        Iterator<Map.Entry<String, String>> it = configuration.iterator();
        while (it.hasNext()){
            Map.Entry<String, String> entry = it.next();
            System.out.println(entry.getKey()+","+entry.getValue());
        }
        fileSystem.close();
    }

6、创建文件夹

/**
     * 测试创建文件夹 可以是多层
     * @throws Exception
     */
    @Test
    public void testMkdir() throws Exception{
        boolean b = fileSystem.mkdirs(new Path("/aaa/bbb"));
        System.out.println("文件夹是否创建成功:" + b);
        fileSystem.close();
    }

7、删除文件或者文件夹

 /**
     * 测试删除文件夹或文件,第二个参数用true则表示递归删除
     * @throws Exception
     */
    @Test
    public void testDelete() throws IOException {
        boolean b = fileSystem.delete(new Path("/aaa"), true);
        System.out.println("是否成功删除:"+b);
        fileSystem.close();
    }

8、查看文件列表

/**
     * 列出指定文件夹下的所有文件,第二个参数true表示递归列出,不写true表示不递归,相当于ls
     * 每次拿到的只有一个文件,如果不使用迭代器一次拿太多内存吃不消
     */
    @Test
    public void testListfiles() throws Exception{
        RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path("/output"), true);//true表示递归
        while(files.hasNext()){
            LocatedFileStatus fileStatus = files.next();
            System.out.println("blockSize"+fileStatus.getBlockSize());
            System.out.println("owner:"+fileStatus.getOwner());
//            备份数量
            System.out.println("replication:"+fileStatus.getReplication());
            //文件路径
            System.out.println("path:"+fileStatus.getPath());
            //文件名
            System.out.println("name:"+fileStatus.getPath().getName());
            //关于block块的一些信息
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            for (BlockLocation block : blockLocations) {
                System.out.println("块大小:"+block.getLength());
                System.out.println("偏移量:"+block.getOffset());
//                所在datanode
                String[] hosts = block.getHosts();
                for (String host : hosts) {
                    System.out.println("所在datanode:"+host);
                }
            }
            System.out.println("------------");
        }
        fileSystem.close();
    }

第三部分 使用流进行文件读写

1、上传文件

/**
 * 用流的方式来操作hdfs上的文件,可以实现读取指定偏移量范围的数据
 * @author 12706
 *
 */
public class HdfsStreamAccess {
        Configuration conf = null;
        FileSystem fs = null;

        @BeforeTest
        public void init() throws Exception {
            System.setProperty("HADOOP_USER_NAME", "root") ;
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://hadoop3:8020");
            fs = FileSystem.get(conf);
        }

    /**
     * 通过流,将本地文件D:/test/bbb.txt写到hdfs下的/bbb.txt
     *
     * @throws Exception
     */
    @Test
    public void testUpload() throws Exception {
        FSDataOutputStream outputStream = fs.create(new Path("/bbb.txt"), true);//有就覆盖
        FileInputStream inputStream = new FileInputStream("D:/test/bbb.txt");
        IOUtils.copy(inputStream, outputStream);
    }
}

2、下载文件

@Test
    public void testDownload() throws Exception {
        FSDataInputStream inputStream = fs.open(new Path("/bbb.txt"));
        FileOutputStream outputStream = new FileOutputStream("D:/test/bbb1.txt");
//        下载到本地
        IOUtils.copy(inputStream, outputStream);
    }

3、下载文件

/**
     * 指定位置开始写
     *
     * @throws Exception
     */
    @Test
    public void testRandomAccess() throws Exception {
        FSDataInputStream inputStream = fs.open(new Path("/bbb.txt"));
        //FileInoutStream是没有这个方法的,定位到第12个字节处
        inputStream.seek(12);
        FileOutputStream outputStream = new FileOutputStream("D:/test/bbb2.txt");
        //从第12个字节写到文件末
        IOUtils.copy(inputStream, outputStream);
    }

4、指定文件内容输出到控制

/**
     * 将hdfs指定文件内容输出到控制台
     *
     * @throws Exception
     */
    @Test
    public void testCat() throws Exception {
        FSDataInputStream inputStream = fs.open(new Path("/bbb.txt"));
//        也能从指定位置开始输出
//        inputStream.seek(20);
        IOUtils.copy(inputStream, System.out);
    }
上一篇下一篇

猜你喜欢

热点阅读