pyspark
pyspark version
输出spark的版本
# Report the Spark version exposed by the active SparkContext (`sc`).
print("pyspark version{}".format(sc.version))
map
# sc is the SparkContext; parallelize() creates an RDD from the passed object
x = sc.parallelize([1, 2, 3])
# map() emits exactly one output element per input element
y = x.map(lambda el: (el, el ** 2))
# collect() copies the RDD elements to a list on the driver
print(x.collect())
print(y.collect())
[1, 2, 3]
[(1, 1), (2, 4), (3, 9)]
map 对 RDD 中的每个元素应用给定的函数并生成新的 RDD;collect 把 RDD 的全部元素取回到 driver 端并以列表形式返回;parallelize 则把传入的本地集合分发出去,创建一个 RDD。
但是rdd中的map只能生成一个指定的新的rdd.任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
这里指的是1->1,2->4,3->9
flatMap
x = sc.parallelize([1, 2, 3])
# flatMap() flattens the tuple returned for each element, so every input
# element contributes three output elements: itself, 100 times itself and
# its square.
y = x.flatMap(lambda el: (el, 100 * el, el ** 2))
print(x.collect())
print(y.collect())
[1, 2, 3]
[1, 100, 1, 2, 200, 4, 3, 300, 9]
原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。
这里指的是 1->1,100,1;2->2,200,4;3->3,300,9
mapPartitions
mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。
x = sc.parallelize([1,2,3], 2)# the first argument is the list to turn into an RDD; the second is the number of partitions to create
mapPartitions
x = sc.parallelize([1, 2, 3], 2)


def f(iterator):
    """Yield the sum of all elements of one partition."""
    yield sum(iterator)


# mapPartitions() calls f once per partition, passing an iterator over
# that partition's elements
y = x.mapPartitions(f)
# glom() gathers the elements of each partition into one list, which makes
# the partition boundaries visible in the printed output
print(x.glom().collect())
print(y.glom().collect())
[[1], [2, 3]]
[[1], [5]]
mapPartitionsWithIndex
这个函数同上一个函数是一致的,只是加上了index标签
x = sc.parallelize([1, 2, 3], 2)


def f(partitionIndex, iterator):
    """Yield (partition index, sum of that partition's elements)."""
    yield (partitionIndex, sum(iterator))


# same as mapPartitions(), but f also receives the index of the partition
y = x.mapPartitionsWithIndex(f)
# glom() gathers the elements of each partition into one list, which makes
# the partition boundaries visible in the printed output
print(x.glom().collect())
print(y.glom().collect())
[[1], [2, 3]]
[[(0, 1)], [(1, 5)]]
getNumPartitions
输出分区的个数
# Build a two-partition RDD and ask it how many partitions it has.
x = sc.parallelize([1, 2, 3], 2)
y = x.getNumPartitions()
print(x.glom().collect())  # one inner list per partition
print(y)
[[1], [2, 3]]
2
filter
filter
x = sc.parallelize([1, 2, 3])
# keep only the elements for which the predicate is true (the odd ones),
# i.e. filter out the even elements
y = x.filter(lambda el: el % 2 == 1)
print(x.collect())
print(y.collect())
过滤器
distinct
distinct
# distinct() removes duplicate elements from the RDD.
x = sc.parallelize(['A', 'A', 'B'])
y = x.distinct()
print(x.collect())
print(y.collect())
['A', 'A', 'B']
['A', 'B']
去重:结果中每个不同的值只保留一个。重复的元素彼此相等,因此不存在“保留前面还是后面”的区别;另外结果中元素的顺序也不保证与原 RDD 一致。
sample
sample
x = sc.parallelize(range(7))
# call 'sample' 5 times; every call returns a new, randomly sampled RDD
ylist = [x.sample(withReplacement=False, fraction=0.5) for _ in range(5)]
print('x = ' + str(x.collect()))
# enumerate() pairs each sampled RDD with its running index
for cnt, y in enumerate(ylist):
    print('sample:' + str(cnt) + ' y = ' + str(y.collect()))
x = [0, 1, 2, 3, 4, 5, 6]
sample:0 y = [0, 2, 5, 6]
sample:1 y = [2, 6]
sample:2 y = [0, 4, 5, 6]
sample:3 y = [0, 2, 6]
sample:4 y = [0, 3, 4]
抽样,每个数的取出存在一定的概率
takeSample
takeSample
x = sc.parallelize(range(7))
# call 'takeSample' 5 times; every call returns a plain list of 3 elements
ylist = [x.takeSample(withReplacement=False, num=3) for _ in range(5)]
print('x = ' + str(x.collect()))
# enumerate() pairs each sample with its running index
for cnt, y in enumerate(ylist):
    print('sample:' + str(cnt) + ' y = ' + str(y))  # no collect on y
x = [0, 1, 2, 3, 4, 5, 6]
sample:0 y = [0, 2, 6]
sample:1 y = [6, 4, 2]
sample:2 y = [2, 0, 4]
sample:3 y = [5, 4, 1]
sample:4 y = [3, 1, 4]
从样例中随机取出三个数字
union
union
# union() concatenates two RDDs; duplicates are kept.
x = sc.parallelize(['A', 'A', 'B'])
y = sc.parallelize(['D', 'C', 'A'])
z = x.union(y)
for rdd in (x, y, z):
    print(rdd.collect())
['A', 'A', 'B']
['D', 'C', 'A']
['A', 'A', 'B', 'D', 'C', 'A']
合并并不去重
intersection
intersection
# intersection() keeps only the elements present in both RDDs.
x = sc.parallelize(['A', 'A', 'B'])
y = sc.parallelize(['A', 'C', 'D'])
z = x.intersection(y)
for rdd in (x, y, z):
    print(rdd.collect())
['A', 'A', 'B']
['A', 'C', 'D']
['A']
交集
sortByKey
sortByKey
# sortByKey() orders the (key, value) pairs by key.
x = sc.parallelize([('B', 1), ('A', 2), ('C', 3)])
y = x.sortByKey()
print(x.collect())
print(y.collect())
[('B', 1), ('A', 2), ('C', 3)]
[('A', 2), ('B', 1), ('C', 3)]
排序
glom
glom
# glom() turns every partition into a single list of its elements.
x = sc.parallelize(['C', 'B', 'A'], 2)
y = x.glom()
print(x.collect())
print(y.collect())
['C', 'B', 'A']
[['C'], ['B', 'A']]
glom 把每个分区内的元素合并成一个列表,结果中每个分区对应一个列表
cartesian
cartesian
# cartesian() pairs every element of x with every element of y.
x = sc.parallelize(['A', 'B'])
y = sc.parallelize(['C', 'D'])
z = x.cartesian(y)
for rdd in (x, y, z):
    print(rdd.collect())
['A', 'B']
['C', 'D']
[('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')]
类似于笛卡尔积进行组合
groupBy
groupBy
x = sc.parallelize([1, 2, 3])
# the function returns the group key for each element: 'A' for odd, 'B' for even
y = x.groupBy(lambda el: 'A' if el % 2 == 1 else 'B')
print(x.collect())
# y is nested: each group is an iterable, so materialize it as a list to print it
print([(key, list(group)) for key, group in y.collect()])
[1, 2, 3]
[('A', [1, 3]), ('B', [2])]
分组
pipe
pipe
x = sc.parallelize(['A', 'Ba', 'C', 'AD'])
# pipe() sends the elements through an external shell command; this one
# calls out to grep, so it may fail under Windows
y = x.pipe('grep -i "A"')
print(x.collect())
print(y.collect())
['A', 'Ba', 'C', 'AD']
['A', 'Ba', 'AD']
管道可以输入命令来进行再次操作
foreach
from __future__ import print_function

x = sc.parallelize([1, 2, 3])


def f(el):
    '''side effect: append the current RDD element to a file'''
    # the context manager closes (and therefore flushes) the file after
    # every element instead of leaving the handle open
    with open("./foreachExample.txt", 'a+') as f1:
        print(el, file=f1)


# first clear the file contents
open('./foreachExample.txt', 'w').close()

y = x.foreach(f)  # writes into foreachExample.txt
print(x.collect())
print(y)  # foreach returns 'None'

# print the contents of foreachExample.txt
with open("./foreachExample.txt", "r") as foreachExample:
    print(foreachExample.read())
[1, 2, 3]
None
3
1
2
循环操作,但是操作的过程可能是并发的并不是按顺序
foreachPartition
foreachPartition
from __future__ import print_function

x = sc.parallelize([1, 2, 3], 5)


def f(partition):
    '''side effect: append the current RDD partition contents to a file'''
    # the context manager closes (and therefore flushes) the file after
    # every partition instead of leaving the handle open
    with open("./foreachPartitionExample.txt", 'a+') as f1:
        print(list(partition), file=f1)


# first clear the file contents
open('./foreachPartitionExample.txt', 'w').close()

y = x.foreachPartition(f)  # writes into foreachPartitionExample.txt
print(x.glom().collect())
print(y)  # foreachPartition returns 'None'

# print the contents of foreachPartitionExample.txt
with open("./foreachPartitionExample.txt", "r") as foreachExample:
    print(foreachExample.read())
[[], [1], [], [2], [3]]
None
[]
[]
[1]
[2]
[3]
按分区进行循环遍历
reduce
reduce
x = sc.parallelize([1, 2, 3])
# reduce() combines the elements pairwise with the given function;
# here that yields the sum of all elements
y = x.reduce(lambda left, right: left + right)
print(x.collect())
print(y)
[1, 2, 3]
6
合并
fold
fold
x = sc.parallelize([1, 2, 3])
# the zero value must be neutral for the operation:
# 0 for sum, 1 for multiplication
neutral_zero_value = 0
# fold() works like reduce() but starts from the zero value; computes the sum
y = x.fold(neutral_zero_value, lambda left, right: right + left)
print(x.collect())
print(y)
[1, 2, 3]
6
折叠
aggregate
aggregate
x = sc.parallelize([2, 3, 4])
# accumulator is a (sum, product) pair; sum: x+0 = x, product: 1*x = x
neutral_zero_value = (0, 1)


def seqOp(aggregated, el):
    # fold one element into the (sum, product) accumulator
    return (aggregated[0] + el, aggregated[1] * el)


def combOp(aggregated, el):
    # merge two (sum, product) accumulators
    return (aggregated[0] + el[0], aggregated[1] * el[1])


# computes (cumulative sum, cumulative product)
y = x.aggregate(neutral_zero_value, seqOp, combOp)
print(x.collect())
print(y)
[2, 3, 4]
(9, 24)
聚集
histogram
histogram (example #1)
x = sc.parallelize([1, 3, 1, 2, 3])
# an integer asks for that many buckets; the result is a
# (bucket boundaries, counts per bucket) pair
y = x.histogram(buckets=2)
print(x.collect())
print(y)
[1, 3, 1, 2, 3]
([1, 2, 3], [2, 3])
histogram (example #2)
x = sc.parallelize([1, 3, 1, 2, 3])
# a list supplies the bucket boundaries explicitly
bucket_edges = [0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5]
y = x.histogram(bucket_edges)
print(x.collect())
print(y)
[1, 3, 1, 2, 3]
([0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5], [0, 0, 2, 0, 1, 0, 2])
输出的第一参数是桶的范围,第二个参数为每一个桶中数据的频数
variance
variance
# Population variance of the RDD's elements.
x = sc.parallelize([1, 3, 2])
y = x.variance()  # divides by N
print(x.collect())
print(y)
[1, 3, 2]
0.666666666667
方差
stdev
stdev
# Population standard deviation of the RDD's elements.
x = sc.parallelize([1, 3, 2])
y = x.stdev()  # divides by N
print(x.collect())
print(y)
[1, 3, 2]
0.816496580928
标准差
sampleStdev
# Sample standard deviation of the RDD's elements.
x = sc.parallelize([1, 3, 2])
y = x.sampleStdev()  # divides by N-1
print(x.collect())
print(y)
[1, 3, 2]
1.0
抽样标准差除数为N-1
countByValue
countByValue
# countByValue() returns a dict-like mapping of each value to its count.
x = sc.parallelize([1, 3, 1, 2, 3])
y = x.countByValue()
print(x.collect())
print(y)
[1, 3, 1, 2, 3]
defaultdict(<type 'int'>, {1: 2, 2: 1, 3: 2})
top
top
# top() returns the `num` largest elements, largest first.
x = sc.parallelize([1, 3, 1, 2, 3])
y = x.top(num=3)
print(x.collect())
print(y)
[1, 3, 1, 2, 3]
[3, 3, 2]
排序取前几个,从大到小
takeOrdered
takeOrdered
# takeOrdered() returns the `num` smallest elements, smallest first.
x = sc.parallelize([1, 3, 1, 2, 3])
y = x.takeOrdered(num=3)
print(x.collect())
print(y)
[1, 3, 1, 2, 3]
[1, 1, 2]
从小到大排序取值
take
take
# take() returns the first `num` elements without sorting.
x = sc.parallelize([1, 3, 1, 2, 3])
y = x.take(num=3)
print(x.collect())
print(y)
[1, 3, 1, 2, 3]
[1, 3, 1]
不排序直接取
first
first
# first() returns the first element of the RDD.
x = sc.parallelize([1, 3, 1, 2, 3])
y = x.first()
print(x.collect())
print(y)
[1, 3, 1, 2, 3]
1
取第一个
collectAsMap
collectAsMap
# collectAsMap() returns the (key, value) pairs as a dict.
x = sc.parallelize([('C', 3), ('A', 1), ('B', 2)])
y = x.collectAsMap()
print(x.collect())
print(y)
[('C', 3), ('A', 1), ('B', 2)]
{'A': 1, 'C': 3, 'B': 2}
将列表转化为map
keys
keys
# keys() yields an RDD holding only the key of each (key, value) pair.
x = sc.parallelize([('C', 3), ('A', 1), ('B', 2)])
y = x.keys()
print(x.collect())
print(y.collect())
[('C', 3), ('A', 1), ('B', 2)]
['C', 'A', 'B']
只取出key值
values
只取出value
countByKey
countByKey
# countByKey() returns a dict-like mapping of each key to its number of pairs.
x = sc.parallelize([('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)])
y = x.countByKey()
print(x.collect())
print(y)
[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
defaultdict(<type 'int'>, {'A': 3, 'B': 2})
统计key的次数
join
join
# join() pairs up the values of keys present in both RDDs; keys found in
# only one of them ('C', 'D') are dropped.
x = sc.parallelize([('C', 4), ('B', 3), ('A', 2), ('A', 1)])
y = sc.parallelize([('A', 8), ('B', 7), ('A', 6), ('D', 5)])
z = x.join(y)
for rdd in (x, y, z):
    print(rdd.collect())
[('C', 4), ('B', 3), ('A', 2), ('A', 1)]
[('A', 8), ('B', 7), ('A', 6), ('D', 5)]
[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7))]
合并
leftOuterJoin
rightOuterJoin
partitionBy
partitionBy
x = sc.parallelize([(0, 1), (1, 2), (2, 3)], 2)
# only the key is passed to partitionFunc; its result selects the partition
y = x.partitionBy(numPartitions=3, partitionFunc=lambda key: key)
print(x.glom().collect())
print(y.glom().collect())
[[(0, 1)], [(1, 2), (2, 3)]]
[[(0, 1)], [(1, 2)], [(2, 3)]]
分区,每一个分区是一个列表
combineByKey
combineByKey
x = sc.parallelize([('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)])
# start a combiner for a key: a one-element list holding (value, value squared)
createCombiner = (lambda el: [(el, el ** 2)])
# append the next (value, value squared) pair to an existing combiner
mergeVal = (lambda aggregated, el: aggregated + [(el, el ** 2)])
# concatenate two combiners of the same key: append agg1 with agg2
mergeComb = (lambda agg1, agg2: agg1 + agg2)
y = x.combineByKey(createCombiner, mergeVal, mergeComb)
print(x.collect())
print(y.collect())
[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
通过key进行数据合并
aggregateByKey
aggregateByKey
x = sc.parallelize([('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)])
# empty list is 'zero value' for append operation
zeroValue = []


def mergeVal(aggregated, el):
    # append (value, value squared) to the per-key list
    return aggregated + [(el, el ** 2)]


def mergeComb(agg1, agg2):
    # concatenate two per-key lists
    return agg1 + agg2


y = x.aggregateByKey(zeroValue, mergeVal, mergeComb)
print(x.collect())
print(y.collect())
[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
foldByKey
通过key进行聚集
foldByKey
foldByKey
x = sc.parallelize([('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)])
# one is 'zero value' for multiplication
zeroValue = 1
# computes cumulative product within each key
y = x.foldByKey(zeroValue, lambda agg, el: agg * el)
print(x.collect())
print(y.collect())
[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]
[('A', 60), ('B', 2)]
按key值折叠
groupByKey
groupByKey
x = sc.parallelize([('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)])
y = x.groupByKey()
print(x.collect())
# each group is an iterable, so materialize it as a list before printing
print([(key, list(group)) for key, group in y.collect()])
[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
[('A', [3, 2, 1]), ('B', [5, 4])]
flatMapValues
flatMapValues
x = sc.parallelize([('A', (1, 2, 3)), ('B', (4, 5))])
# function is applied to entire value, then result is flattened into
# one (key, item) pair per produced item
y = x.flatMapValues(lambda values: [v ** 2 for v in values])
print(x.collect())
print(y.collect())
[('A', (1, 2, 3)), ('B', (4, 5))]
[('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]
对map的值进行操作,并拆分为单维map
mapValues
仅仅对map值操作,其他不改变
groupWith
groupWith
x = sc.parallelize([('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))])
y = sc.parallelize([('B', (7, 7)), ('A', 6), ('D', (5, 5))])
z = sc.parallelize([('D', 9), ('B', (8, 8))])
# for every key, groupWith() collects the values from x, y and z into
# three separate iterables
a = x.groupWith(y, z)
print(x.collect())
print(y.collect())
print(z.collect())
print("Result:")
# collect() already returns a list, so it can be iterated directly
for key, val in a.collect():
    # materialize each per-RDD iterable so its contents are printed
    print(key, [list(i) for i in val])
[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]
[('B', (7, 7)), ('A', 6), ('D', (5, 5))]
[('D', 9), ('B', (8, 8))]
Result:
D [[], [(5, 5)], [9]]
C [[4], [], []]
B [[(3, 3)], [(7, 7)], [(8, 8)]]
A [[2, (1, 1)], [6], []]