pySpark弹性分布式数据集(RDD)入门
弹性分布式数据集(RDD)是一组不可变的JVM(java虚拟机)对象的分布集,基于某种关键字,该数据被划分成块(partition),同时分发到执行器节点,以使得这类数据集能够高速执行并行运算。
全局作用域与局部作用域
提交任务时,任务被分到主节点(驱动程序),驱动程序为任务创建DAG,并决定哪一个执行节点将运行特定的任务。执行者完成任务后将结果返回给驱动程序。而在执行器执行任务时,其实从驱动程序中获得了一份变量和方法的副本,这是驱动程序为方便执行器在RDD上执行任务准备的。执行器在执行任务时改变这些副本不会影响其他执行器和驱动程序的变量和方法。
RDD的操作类型分为两种:转换(transformation)和动作(action)。每一个转换操作都会从一个RDD生成一个新的RDD,但不改变原来的RDD,RDD的转换操作是惰性的,只有在执行动作操作时才会进行运算求值。
创建RDD的两种方法
有两种方法可以创建RDD:第一种是使用sc.parallelize()
从一个集合或者列表中构建RDD,第二种是通过读入本地或者外部的某个文件(或者多个文件),此时每行就是RDD中的一个元素。如下所示:
# 第一种方式
rdd1 = sc.parallelize([('panda',0),('pink',3),('pirate',3),('panda',1),('pink',4)])
rdd1.collect()
# [('panda', 0), ('pink', 3), ('pirate', 3), ('panda', 1), ('pink', 4)]
# 第二种方式
rdd2 = sc.textFile('test.py')
rdd2.collect()
# ['# coding:utf-8', '', 'print("hello world!")', '']
需要注意的是,collect()
方法会将所有执行节点的数据全部取到主节点(驱动程序)中,如果数据很大的话会导致内存溢出,因此需要慎用,更常见的事使用take()
方法,其从RDD中取出指定个数元素。
1、基本转换操作
1.1 针对各个元素的转化操作
-
map()
和filter()
map()
和filter()
是最基本的转换操作,与python中的map类似,map作用于RDD中的每个元素,并生成一个新的元素。而filter则是过滤掉符合特定条件的元素,如下:
rdd = sc.parallelize(range(10))
rdd.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
rdd.map(lambda x: x+1).collect()
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd.filter(lambda x: x%2==0).collect()
# [0, 2, 4, 6, 8]
flatMap()
flatMap首先对RDD中的各个元素进行操作,然后将RDD中的第一层内层嵌套展开,如下:
rdd = sc.parallelize(zip(range(3),range(3,6)))
rdd.flatMap(lambda x: (x[0]+1, x[1]**2)).collect()
# [1, 9, 2, 16, 3, 25]
rdd = sc.parallelize(['hello world', 'boya!'])
rdd.flatMap(lambda x:x.split()).collect()
# ['hello', 'world', 'boya!']
distinct()
distinct函数用于去重:
rdd = sc.parallelize([1,1,1,2,3,4])
rdd.distinct().collect()
- 抽样方法:
sample(withReplacement, fraction, seed=None)
,sampleByKey(withReplacement, fractions, seed=None)
和takeSample(withReplacement, num, seed=None)
这三个方法都是用于随机抽样的,sample是直接按照比例来抽样,takeSample是从样本中抽出指定数量的样本,而sampleByKey则是类似于一种分层采样的方法,withReplacement表示是否采用有放回抽样,例子如下:
rdd = sc.parallelize(range(0, 10))
rdd.sample(False, 0.5).collect()
# [1, 3, 4, 5, 8]
rdd.takeSample(False, 15)
# [2, 5, 6, 4, 7, 9, 3, 8, 1, 0]
fractions = {"a": 0.2, "b": 0.1}
rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))
sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())
print(rdd.count())
print(len(sample['a']))
print(len(sample['b']))
# 2000
# 197
# 103
- 伪集合操作:
union(rdds)
,intersection(other)
,subtract(other)
,cartesian(other)
在前面已经介绍过的distinct方法能够实现对数据集的去重,union 返回一个包含两个RDD中所有元素的RDD,intersection则是返回两个RDD的交集,subtract从一个RDD中减去另一个RDD中的元素。而cartesian则是返回两个RDD的笛卡尔积。
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([3,4,5])
rdd1.union(rdd2).collect()
# [1, 2, 3, 3, 4, 5]
rdd1.intersection(rdd2).collect()
# [3]
rdd1.subtract(rdd2).collect()
# [1, 2]
rdd1.cartesian(rdd2).collect()
[(1, 3), (1, 4), (1, 5), (2, 3), (2, 4), (2, 5), (3, 3), (3, 4), (3, 5)]
1.2 行动操作
RDD中常见的操作有reduce、take、first等方法,其用法见下表:

1.3 持久化操作
Spark RDD是惰性求值的,而有时我们希望能多次使用同一个RDD。为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。持久化方法有cache()
和persist()
。
出于不同的目的,我们可以为RDD选择不同的持久化级别。RDD持久化可保存在内存中也可选择保存在磁盘上。具体的持久化级别如下所示:

1.4 统计操作
RDD提供了简单的统计操作如max,min,count,stats,sum,mean,stddev,variance,sampleStdev (抽样无偏估计标准差),sampleVariance (抽样无偏估计方差),如下所示:
rdd = sc.parallelize([1,2,3])
print('sum:', rdd.sum())
print('count:', rdd.count())
print('max:', rdd.max())
print('min:', rdd.min())
print('mean:', rdd.mean())
print('stdev:', rdd.stdev())
print('variance:', rdd.variance())
print('sampleVariance:', rdd.sampleVariance())
print('sampleStdev:', rdd.sampleStdev())
print('stats:', rdd.stats())
# 输出:
sum: 6
count: 3
max: 3
min: 1
mean: 2.0
stdev: 0.816496580927726
variance: 0.6666666666666666
sampleVariance: 1.0
sampleStdev: 1.0
stats: (count: 3, mean: 2.0, stdev: 0.816496580927726, max: 3.0, min: 1.0)
2、键值对操作
键值对RDD是Spark中许多操作所需要的常见数据类型。键值对通常用来进行聚合计算。我们一般要先通过一些ETL(抽取、转化、装载)操作来将数据转化为键值对形式。
在spark中键值对类型的RDD被称为pair RDD。普通RDD有多种方式可转化为pair RDD,如:
lines = sc.parallelize(['python, hello!', 'scala, hello!', 'java, hello!'])
print(lines.map(lambda x:(x.split(',')[0], x)).collect())
# [('python', 'python, hello!'), ('scala', 'scala, hello!'), ('java', 'java, hello!')]
rdd1 = sc.parallelize(range(3))
rdd2 = sc.parallelize(range(1000,1003))
rdd1.zip(rdd2).collect()
# [(0, 1000), (1, 1001), (2, 1002)]
2.1 Pair RDD的转化操作
常见的Pair RDD操作有reduceByKey, groupByKey, sortByKey, keys, values等:
rdd = sc.parallelize([(0, 1000), (1, 1001), (2, 1002), (2,1003)])
print(rdd.reduceByKey(lambda x,y: x+y).collect())
# [(0, 1000), (1, 1001), (2, 2005)]
print(rdd.groupByKey().mapValues(list).collect())
# [(0, [1000]), (1, [1001]), (2, [1002, 1003])]
更多的Pair RDD转化操作如下:



2.2 Pair RDD 的行动操作
和转化操作一样,所有基础RDD支持的传统行动操作也都在pair RDD上可用。pair RDD提供了一些额外的行动操作,可以让我们充分利用数据的键值对特性。

除了上面的这些操作外,pyspark还为RDD提供了更多的操作,参见https://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD。
参考:
spark快速大数据分析.
pyspark官方文档.