PySpark-装载数据

2019-07-05  本文已影响0人  NEO_X

更多信息https://blue-shadow.top/

主要函数

主要是这三个函数:
parallelize ; textFile ; wholetextFiles

其他的针对特定格式的文件:

装载数据的目的为了生成RDD。

做任何Spark开发前,都先要对spark核心类SparkContext有一个清晰的认识,以下链接来自spark官网的说明。
SparkContext相关说明

Spakcontext表示与Spark群集的连接,可用于在该群集上创建RDD和广播变量。所以在进行任何Spark操作前都是需要SparkContext对象

class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)

SparkContext是spark的主要切入点,由于RDD是主要的API,通过sparkcontext来创建和操作RDD ; 对于其他不同的处理场景使用不同的Context。如SqlContext,StreamingContext,hiveContext.这种多个Context的情况在Spark 1.0时代出现。在Spark 2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,SparkSession封装了SparkConf、SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来
SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。

parallelize(c,numSlices=None)

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]

除了开发原型和测试时,这种方式用得并不多,毕竟这种方式需要把整个数据集先放在一台机器的内存中,针对大数据量无法处理

textFile(name, minPartitions=None, use_unicode=True)

从外部读完数据,可以在不同的数据源上进行。

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
['Hello world!']

用于读取不同外部数据源的数据,针对不同的数据机构;textFile是主要的获取数据的方法

wholeTextFiles(path, minPartitions=None, use_unicode=True)

同时处理整个文件。如果文件足够小,那么可以使用SparkContext.wholeTextFiles()方法,该方法会返回一个pair RDD,其中键是输入文件的文件名

>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
...    _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
...    _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[('.../1.txt', '1'), ('.../2.txt', '2')]

用于读取文件内容,进行一次处理;针对小文件可以进行这样的处理,一次性读取

总结

主要介绍了数据读取的3种方法,读取的后生成RDD,就可以进行对RDD进行操作。在下一章种进行对RDD的基础操作进行说明。

脚本示例

from pyspark import SparkContext

def test_parallelize(sc):
    """
    对parallelize使用
    """
    print('{}{}'.format('parallelize:',sc.parallelize([0, 2, 3, 4, 6], 5).collect()))

def test_textFile(sc):
    """
    对textFile使用
    """
    lines=sc.textFile('E:\py_lab\README.md')
    print('{}{}'.format('textFile -> num of doc lines:',lines.count()))
    print('{}{}'.format('textFile -> doc first line:',lines.first()))

def test_wholeTextFiles(sc):
    """
    对wholeTextFiles使用
    """
    lines=sc.wholeTextFiles('E:\py_lab\README.md')
    print('{}{}'.format('wholeTextFiles -> num of doc lines:',lines.count()) )
    print('{}{}'.format('wholeTextFiles -> doc first line:',lines.first()) )
    
if __name__=='__main__': 
    sc = SparkContext()
    test_parallelize(sc)
    test_textFile(sc)
    test_wholeTextFiles(sc)
get_data.png

上一篇:pyspark-前言
下一篇:pyspark-rdd-操作

上一篇下一篇

猜你喜欢

热点阅读