python数据分析人工智能机器学习软件测试Python专家之路程序员

pySpark工具学习2-弹性分布式数据集(RDD Resili

2019-05-22  本文已影响25人  python测试开发

弹性分布式数据集(RDD Resilient Distributed Dataset)是不可变JVM对象的分布式集合,允许您非常快速地执行计算,并且它们是Apache Spark的核心。

顾名思义,数据集是分布式的;它根据一些密钥分成块并分发到执行程序节点。这样做可以非常快速地对这些数据集运行计算。RDD跟踪(日志)应用于每个块的所有转换,以加快计算速度,并在出现问题并且丢失部分数据时提供回退;在这种情况下,RDD可以重新计算数据。这是防止数据丢失的另一道防线,是数据复制的补充。

本章内容:

RDD的内部工作方式

RDD并行运行。这是在Spark中工作的最大优势:每个转换都是并行执行的,以便大幅提高速度。

对数据集的转换是惰性的。这意味着只有在调用数据集上的操作时才会执行任何转换。这有助于Spark优化执行。例如,考虑以下非常常见的步骤,分析师通常会这样做以熟悉数据集:

1.计算某列中不同值的出现次数。
2.选择以A开头的值。
3.将结果输出到屏幕上。

首先使用.map(lambda v: (v, 1))方法映射A的值,然后选择那些以'A'开头的记录(使用.filter(lambda val: val.startswith('A')))。如果我们调用.reduceByKey(operator.add)方法,它将减少数据集并添加(在此示例中,计算)每个键的出现次数。所有这些步骤都会转换数据集。

然后调用.collect()方法来执行这些步骤。此步骤是对我们的数据集的操作 - 它最终计算数据集的不同元素。实际上,该操作可能会颠倒转换的顺序并在映射之前首先过滤数据,从而将较小的数据集传递给reducer。

如果您还不了解这些命令,请不要担心 - 我们将在本章后面详细解释它们。

创建RDD

在PySpark中有两种创建RDD的方法:parallelize方法传入集合(列表或一些元素的数组):

data = sc.parallelize([('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12),('Amber', 9)])

或者您可以引用位于本地或外部位置的文件(或多个文件),下面我们使用Mortality数据集VS14MORT.txt文件(2016年7月31日访问),数据集说明参见Record_Layout_2014.pdf,数据集下载VS14MORT.txt.gz

data_from_file = sc.textFile('/home/andrew/code/meil/VS14MORT.txt.gz', 4)

最后一个参数指定数据集分成的区数。一般是2-4个分区。
Spark可以从多种文件系统中读取:本地文件系统,如NTFS,FAT或Mac OS Extended(HFS +),或分布式文件系统,如HDFS,S3,Cassandra等。注意路径不能包含特殊字符[]等。

支持多种数据格式:可以使用JDBC驱动程序读取文本,parquet,JSON,Hive表和关系数据库中的数据。Spark可以自动使用压缩数据集(如前面示例中的Gzipped)。

根据数据的读取方式,保存数据的对象的表示方式略有不同。当我们.paralellize(...)一个集合时,从文件读取的数据表示为MapPartitionsRDD而不是ParallelCollectionRDD。

RDD是无模式数据结构(与DataFrame不同,我们将在下一章讨论)。比如可以并行如下数据:

In [9]: data_heterogenous = sc.parallelize([('Ferrari', 'fast'),{'Porsche': 100000},['Spain','visited', 4504]]).collect()
In [10]: data_heterogenous[1]['Porsche']
Out[10]: 100000

collect()方法将RDD的所有元素返回给驱动程序,并将其序列化为列表。

从文本文件读取时,文件中的每一行都构成RDD的元素。

In [11]:  data_from_file.take(1)
Out[11]: ['                   1                                          2101  M1087 432311  4M4                2014U7CN                                    I64 238 070   24 0111I64                                                                                                                                                                           01 I64                                                                                                  01  11                                 100 601']

我们将从data_from_file中提取有用信息。

需要注意一点。定义纯Python方法会降低应用程序的速度,因为Spark需要在Python解释器和JVM之间不断地来回切换。要尽量使用内置的Spark函数。

In [12]: 
def extractInformation(row):
    import re
    import numpy as np

    selected_indices = [
         2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,
         19,21,22,23,24,25,27,28,29,30,32,33,34,
         36,37,38,39,40,41,42,43,44,45,46,47,48,
         49,50,51,52,53,54,55,56,58,60,61,62,63,
         64,65,66,67,68,69,70,71,72,73,74,75,76,
         77,78,79,81,82,83,84,85,87,89
    ]

    '''
        Input record schema
        schema: n-m (o) -- xxx
            n - position from
            m - position to
            o - number of characters
            xxx - description
        1. 1-19 (19) -- reserved positions
        2. 20 (1) -- resident status
        3. 21-60 (40) -- reserved positions
        4. 61-62 (2) -- education code (1989 revision)
        5. 63 (1) -- education code (2003 revision)
        6. 64 (1) -- education reporting flag
        7. 65-66 (2) -- month of death
        8. 67-68 (2) -- reserved positions
        9. 69 (1) -- sex
        10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated
        11. 71-73 (3) -- number of units (years, months etc)
        12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)
        13. 75-76 (2) -- age recoded into 52 categories
        14. 77-78 (2) -- age recoded into 27 categories
        15. 79-80 (2) -- age recoded into 12 categories
        16. 81-82 (2) -- infant age recoded into 22 categories
        17. 83 (1) -- place of death
        18. 84 (1) -- marital status
        19. 85 (1) -- day of the week of death
        20. 86-101 (16) -- reserved positions
        21. 102-105 (4) -- current year
        22. 106 (1) -- injury at work
        23. 107 (1) -- manner of death
        24. 108 (1) -- manner of disposition
        25. 109 (1) -- autopsy
        26. 110-143 (34) -- reserved positions
        27. 144 (1) -- activity code
        28. 145 (1) -- place of injury
        29. 146-149 (4) -- ICD code
        30. 150-152 (3) -- 358 cause recode
        31. 153 (1) -- reserved position
        32. 154-156 (3) -- 113 cause recode
        33. 157-159 (3) -- 130 infant cause recode
        34. 160-161 (2) -- 39 cause recode
        35. 162 (1) -- reserved position
        36. 163-164 (2) -- number of entity-axis conditions
        37-56. 165-304 (140) -- list of up to 20 conditions
        57. 305-340 (36) -- reserved positions
        58. 341-342 (2) -- number of record axis conditions
        59. 343 (1) -- reserved position
        60-79. 344-443 (100) -- record axis conditions
        80. 444 (1) -- reserve position
        81. 445-446 (2) -- race
        82. 447 (1) -- bridged race flag
        83. 448 (1) -- race imputation flag
        84. 449 (1) -- race recode (3 categories)
        85. 450 (1) -- race recode (5 categories)
        86. 461-483 (33) -- reserved positions
        87. 484-486 (3) -- Hispanic origin
        88. 487 (1) -- reserved
        89. 488 (1) -- Hispanic origin/race recode
     '''

    record_split = re\
        .compile(
            r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + 
            r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + 
            r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +
            r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +
            r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + 
            r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + 
            r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + 
            r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + 
            r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')
    try:
        rs = np.array(record_split.split(row))[selected_indices]
    except:
        rs = np.array(['-99'] * len(selected_indices))
    return rs
#     return record_split.split(row)
In [13]: data_from_file_conv = data_from_file.map(extractInformation)
    ...: data_from_file_conv.map(lambda row: row).take(1)
Out[13]:
[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

全局与本地范围

Spark固有并行性,可以以两种模式运行:本地和集群。当您在本地运行Spark时,您的代码可能与集群没有太大区别。群集模式下,当提交作业执行时,作业将发送到驱动程序(或主节点)。驱动程序节点创建DAG(参见第1章,
了解作业并确定哪个执行者(或工作者)节点将运行特定任务。

然后,驱动程序指示工作人员执行他们的任务,并在完成后将结果返回给驱动程序。然而,在此之前,驱动程序准备每个任务的闭包:驱动程序上存在一组变量和方法,供工作人员在RDD上执行其任务。

每个执行程序都从驱动程序中获取变量和方法的副本。如果,在运行任务时,执行程序会更改这些变量或覆盖方法,
它不会影响其他执行者的副本或驱动程序的变量和方法。这可能会导致一些意外行为和运行时错误,有时可能很难追查。

spark在单机的性能,比pandas,对中小数据集,起码差了1个数量级。

更多参考:http://spark.apache.org/docs/latest/programming-guide.html#local-vs-cluster-modes

转换

转换可以塑造数据集。其中包括对数据集中的值进行映射,过滤,连接和转码。在本节中,我们将展示RDD上可用的一些转换。

由于空间限制,我们仅包括最常用的转换和操作。对于一整套可用的方法,我们建议您查看PySpark关于RDD的文档http://spark.apache.org/docs/latest/api/python/pyspark

In [15]: data_2014.take(10)
Out[15]: [2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, 2014, -99]

In [16]: data_2014_2 = data_from_file_conv.map(lambda row: (row[16], int(row[16])))
    ...: data_2014_2.take(10)
Out[16]:
[('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('2014', 2014),
 ('-99', -99)]
In [19]: data_filtered = data_from_file_conv.filter(lambda row: row[5] == 'F' and row[21] == '0')

In [20]: data_filtered.count()
Out[20]: 6

In [21]: data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))

In [22]: data_2014_flat.take(10)
Out[22]: ['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]
In [21]: data_2014_flat = data_from_file_conv.flatMap(lambda row: (row[16], int(row[16]) + 1))

In [22]: data_2014_flat.take(10)
Out[22]: ['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015]
In [25]: distinct_gender = data_from_file_conv.map(lambda row: row[5]).distinct().collect()

In [26]: distinct_gender
Out[26]: ['M', 'F', '-99']

开销比较大,慎用

.sample(...)方法返回数据集中的随机样本。 第一个参数指定采样是否应该替换,第二个参数定义要返回的数据的分数,第三个参数是伪随机数生成器的种子:

在此示例中,我们从原始数据集中选择了10%的随机样本。

In [27]: fraction = 0.1

In [28]: data_sample = data_from_file_conv.sample(False, fraction, 666)

In [29]: data_sample.take(1)
Out[29]:
[array(['1', '  ', '5', '1', '01', 'F', '1', '082', ' ', '42', '22', '10',
        '  ', '4', 'W', '5', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I251',
        '215', '063', '   ', '21', '02', '11I350 ', '21I251 ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '02',
        'I251 ', 'I350 ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '28', ' ',
        ' ', '2', '4', '100', '8'], dtype='<U40')]

就像在SQL世界中一样,根据在两个数据集中找到的值连接两个RDD,并返回左RDD中的记录,其中右侧记录附加在两个RDD匹配的位置:

这是另一种昂贵的方法,应该谨慎使用,并且只在必要时使用,因为它会使数据混乱,从而导致性能下降。
你在这里看到的是来自RDD rdd1的所有元素及其来自RDD rdd2的相应值。如您所见,值'a'在rdd3中显示两次,而'a'在RDD rdd2中显示两次。来自rdd1的值b仅显示一次,并与来自rdd2的值“6”连接。缺少两件事:
来自rdd1的值'c'在rdd2中没有对应的键,因此返回的元组中的值显示为None,并且,因为我们执行了左外连接,
rdd2的值'd'按预期消失了。

如果我们使用.join(...)方法,我们只得到'a'和'b'的值,因为这两个值在这两个RDD之间相交。

另一个有用的方法是.intersection(...),它返回两个RDD中相同的记录。执行以下代码:

In [30]: rdd1 = sc.parallelize([('a', 1), ('b', 4), ('c',10)])

In [31]: rdd2 = sc.parallelize([('a', 4), ('a', 1), ('b', '6'), ('d', 15)])

In [32]: rdd3 = rdd1.leftOuterJoin(rdd2)

In [33]: rdd3.take(5)
Out[33]: [('b', (4, '6')), ('c', (10, None)), ('a', (1, 4)), ('a', (1, 1))]

In [34]: rdd4 = rdd1.join(rdd2)

In [35]: rdd4.collect()
Out[35]: [('b', (4, '6')), ('a', (1, 4)), ('a', (1, 1))]

In [36]: rdd5 = rdd1.intersection(rdd2)

In [37]: rdd5.collect()
Out[37]: [('a', 1)]

In [38]: rdd1 = rdd1.repartition(4)

In [39]: len(rdd1.glom().collect())
Out[39]: 4

重新分区数据集会更改数据集分区的分区数。 应该谨慎使用此功能,并且仅在真正需要时才会使用,因为它会对数据进行混洗,这实际上会导致性能方面的重大影响:

上面的代码打印出4作为新的分区数。
与.collect()相比,.glom()方法生成一个列表,其中每个元素是指定分区中存在的数据集的所有元素的另一个列表; 返回的主列表包含与分区数一样多的元素。

操作 Action

与转换相比,操作在数据集上执行计划任务;完成数据转换后,您可以执行转换。这可能不包含任何转换(例如,即使您没有对RDD进行任何转换,.take(n)也只会从RDD返回n条记录)
或者执行整个转换链。

常用方法。该方法优先于.collect(...),因为它只返回单个数据分区中的n个行。处理大型数据集时,这一点尤为重要:

In [40]: data_first = data_from_file_conv.take(1)

In [41]: data_first
Out[41]:
[array(['1', '  ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',
        '  ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ',
        '238', '070', '   ', '24', '01', '11I64  ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '01',
        'I64  ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

In [42]: data_take_sampled = data_from_file_conv.takeSample(False, 1, 667)

In [43]: data_take_sampled
Out[43]:
[array(['2', '17', ' ', '0', '08', 'M', '1', '069', ' ', '39', '19', '09',
        '  ', '1', 'M', '7', '2014', 'U', '7', 'U', 'N', ' ', ' ', 'I251',
        '215', '063', '   ', '21', '06', '11I500 ', '21I251 ', '61I499 ',
        '62I10  ', '63N189 ', '64K761 ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '       ',
        '       ', '       ', '       ', '       ', '       ', '05',
        'I251 ', 'I120 ', 'I499 ', 'I500 ', 'K761 ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '     ',
        '     ', '     ', '     ', '     ', '     ', '     ', '01', ' ',
        ' ', '1', '1', '100', '6'], dtype='<U40')]

此方法将RDD的所有元素返回给驱动程序。类似pandas的all()。

In [44]: rdd1.map(lambda row: row[1]).reduce(lambda x, y: x + y)
Out[44]: 15

In [45]: data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 1) # 1个分区计算比较准确

In [46]: works = data_reduce.reduce(lambda x, y: x / y)

In [47]: works
Out[47]: 10.0

In [48]: data_reduce = sc.parallelize([1, 2, .5, .1, 5, .2], 3)

In [49]: data_reduce.reduce(lambda x, y: x / y)
Out[49]: 0.004

In [50]: data_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)

In [51]: data_key.reduceByKey(lambda x, y: x + y).collect()
Out[51]: [('b', 4), ('c', 2), ('a', 12), ('d', 5)]


-count  统计元素个数

```python
In [52]: data_reduce.count()
Out[52]: 6

In [53]: len(data_reduce.collect())
Out[53]: 6

In [54]: data_key.countByKey().items()
Out[54]: dict_items([('a', 2), ('b', 2), ('c', 1), ('d', 2)])
In [55]: data_key.saveAsTextFile('data_key.txt')

In [56]: def parseInput(row):
    ...:     import re
    ...:
    ...:     pattern = re.compile(r'\(\'([a-z])\', ([0-9])\)')
    ...:     row_split = pattern.split(row)
    ...:
    ...:     return (row_split[1], int(row_split[2]))
    ...:

In [57]: data_key_reread = sc.textFile('data_key.txt').map(parseInput)
    ...: data_key_reread.collect()
Out[57]: [('a', 8), ('d', 2), ('a', 4), ('b', 3), ('c', 2), ('b', 1), ('d', 3)]

每个分区到一个单独的文件:
data_key.saveAsTextFile(
'/Users/drabast/Documents/PySpark_Data/data_key.txt')
要将其读回来,您需要将其解析回来,因为所有行都被视为字符串:

In [58]: def f(x):
    ...:     print(x)
    ...:

In [59]: data_key.foreach(f)
('a', 4)
('b', 1)
('d', 3)
('a', 8)
('d', 2)
('b', 3)
('c', 2)

注意每次的顺序可能不同

小结

总结RDD是Spark的支柱; 这些无模式数据结构是我们将在Spark中处理的最基本的数据结构。
在本章中,我们介绍了通过.parallelize(...)方法以及从文本文件中读取数据,从文本文件创建RDD的方法。 此外,还显示了处理非结构化数据的一些方法。
Spark中的转换是惰性的 - 它们仅在调用动作时应用。 在本章中,我们讨论并介绍了最常用的转换和操作; PySpark文档包含更多http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
Scala和Python RDD之间的一个主要区别是速度:Python RDD可能比它们的Scala对应物慢得多。
在下一章中,我们将引导您完成一个数据结构,使PySpark应用程序与Scala编写的数据结构(DataFrames)相同。

上一篇下一篇

猜你喜欢

热点阅读