在 pyspark 中自定义 hdfs 的输出

2019-07-19  本文已影响0人  Rosyyyy

本文主要是参考在pyspark中操作hdfs文件, 并修改了一些代码中的bug

利用pyspark输出主要用的是 SaveAsTextFile ,但是这个函数为了实现输出的并行化会输出很多文件,如果想输出单个文件 / 自定义输出文件,应该怎么操作呢?

首先在pyspark中没有直接的接口函数对hdfs文件进行操作,这里使用py4j在python代码中运行java,实现hadoopConfiguration()的调用

from py4j.java_gateway import JavaGateway


def path(sc, filepath):
    """
    创建hadoop path对象
    :param sc sparkContext对象
    :param filename 文件绝对路径
    :return org.apache.hadoop.fs.Path对象
    """
    path_class = sc._gateway.jvm.org.apache.hadoop.fs.Path
    return path_class(filepath)


def get_file_system(sc):
    """
    创建FileSystem
    :param sc SparkContext
    :return FileSystem对象
    """
    filesystem_class = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    hadoop_configuration = sc._jsc.hadoopConfiguration()
    return filesystem_class.get(hadoop_configuration)


def write(sc, filepath, overwrite=True):
    """
    写内容到hdfs文件
    :param sc SparkContext
    :param filepath 绝对路径
    :param content 文件内容
    :param overwrite 是否覆盖
    """
    filesystem = get_file_system(sc)
    out = filesystem.create(path(sc, filepath), overwrite)
    return out

使用的方法就是

out = write(sc, output)  # 获得输出流
out.write(bytes(【str】, "utf-8"))
out.flush()
out.close()

参考

[1] 在pyspark中操作hdfs文件

上一篇下一篇

猜你喜欢

热点阅读