Spark Python API Docs(part one)
pyspark package
subpackages
- pyspark.sql module
- pyspark.streaming module
- pyspark.ml package
- pyspark.mllib package
contents
PySpark是Spark的Python API。
Public classes:
- SparkContext:
Spark功能的主要入口点。 - RDD:
弹性分布式数据集(RDD),Spark中的基本抽象。 - Broadcast:
一个广播变量,可以在任务间重用。 - Accumulator:
任务只能添加值的“add-only”共享变量。 - SparkConf:
用于配置Spark。 - SparkFiles:
访问作业附带的文件。 - StorageLevel:
更细粒度的缓存持久化级别。 - TaskContext:
有关当前正在运行的任务的信息,可在workers和实验室中获得。
class pyspark.SpackConf(loadDefaults=True,_jvm=None,_jconf=None)
Spark应用程序的配置。 用于设置spark变量,参数以键值对的形式传递。大多数情况下,您将使用SparkConf()创建一个SparkConf对象,该对象将从spark.* Java系统属性中加载值。 在这种情况下,您直接在SparkConf对象上设置的任何参数都优先于系统属性。
对于单元测试,您也可以调用SparkConf(false)来跳过加载外部设置并获取相同的配置,而不管系统属性是什么。这个类中的所有setter方法都支持链接。 例如,你可以写conf.setMaster("local").setAppName("Myapp")。
- 注意:一旦SparkConf对象被传递给Spark,它就被克隆,不能再被用户修改。
- contains(key)
这个配置是否包含给定的key? - get(key, defaultValue=None)
获取配置中某个key的value,否则返回默认值。 - getAll()
获取所有的值,以key-value pairs list的形式。 - set(key, value)
设置配置属性。 - setAll(pairs)
设置多个参数,以key-value pairs list的形式传递。 -
setAppName(value)
设置application的name。 - setExecutorEnv(key=None, value=None, pairs=None)
设置要传递给executors的环境变量。 - setIfMissing(key,value)
设置配置属性(如果尚未设置)。 -
setMaster(value)
设置要连接到的 master URL。 - setSparkHome(value)
设置在worker nodes安装spark的path - toDebugString()
以key=value pairs list的形式返回配置的可打印版本,每行一个。
class pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)
Spark功能的主要入口点。 SparkContext表示与Spark集群的连接,可用于在该集群上创建RDD和广播变量。
PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
- accumulator(value, accum_param=None)
用给定的初始值创建一个累加器(accumulator),如果提供的话,使用给定的AccumulatorParam 帮助对象来定义如何 add 某种数据类型的值。 默认AccumulatorParams用于整数和浮点数,如果你没有提供。 对于其他类型,可以使用自定义的AccumulatorParam。 - addFile(path, recursive=False)
在每个节点上添加一个文件,用这个Spark job下载。传递的路径可以是本地文件,HDFS中的文件(或其他Hadoop支持的文件系统),也可以是HTTP,HTTPS或FTP URI。要访问Spark jobs中的文件,请使用文件名L {SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>}来查找其下载位置。如果递归(recursive)选项设置为True,则可以给出一个目录。 当前目录仅支持Hadoop支持的文件系统。
#下面代码通过键入 bin/pysaprk 命令进入交互式模式运行,sc为sparkContext()的实例对象,spark已经为我们自动创建好了。
import os
from pyspark import SparkFiles
with open('/home/test.txt', 'w') as f: #在本地建一个文件并写入内容
f.write('100')
#加载文件,若是hdfs中的文件可以这样写:
# hdfs://hostname:port/file/path
# 例如 hdfs://master:9000/user/hadoop/test.txt
sc.addFile('/home/test.txt')
def func(iterator):
with open(SparkFiles.get('test.txt')) as testFile:
fileVal = int(testFile.readline())
return [x * fileVal for x in iterator]
sc.parallelize([1,2,3,4]).mapPartitions(func).collect() #创建一个RDD并把函数func作用到每个元素上,最后输出结果。
[100, 200, 300, 400] #output
- addPyFile(path)
为将来在此SparkContext上执行的所有tasks添加一个.py或.zip依赖项。 传递的路径(path)可以是本地文件,HDFS中的文件(或Hadoop支持的其他文件系统),也可以是HTTP,HTTPS或FTP URI。 - applicationId
Spark应用程序的唯一标识符。 其格式取决于调度程序的实现。
- 例如 local spark app something like 'local-1433865536131'
- in case of YARN something like ‘application_1433865536131_34483’
sc.applicationId
u'local-1513691270280' #output
- binaryFiles(path, minPartitions=None)
从HDFS,本地文件系统(所有节点上都可用)或Hadoop支持的任何文件系统URI读取二进制文件的目录作为字节数组。 每个被读取的文件作为单个记录,并返回一个key-value pair,其中key是每个文件的路径,value是每个文件的内容。
- Note:小文件是首选,大文件也是允许的,但可能会导致性能不佳。
- binaryRecords(path, recordLength)
- Note: 实验阶段(Experimental)
从一个扁平的二进制文件加载数据,假定每个记录都是一组具有指定数字格式的数字(请参阅ByteBuffer),并且每个记录的字节数是恒定的。
参数(Parmenters):
path - 输入数据文件的目录;
recordLength - 记录的分割长度
-
broadcast(value)
将一个只读变量广播到集群,返回一个L {Broadcast <pyspark.broadcast.Broadcast>}对象来在分布式函数中读取它。 变量只会发送到每个群集一次。 - cancelAllJobs()
取消所有已被调度的或正在运行的作业。 - cancelJobGroup(groupId)
取消指定组的活动作业。 有关更多信息,请参见SparkContext.setJobGroup。 - defaultMinPartitions
当用户不给定时,Hadoop RDD分区的默认最小数量。 - defaultParallelism
当用户没有给出时,默认的并行度级别(例如 reduce tasks) - dump_profiles(path)
将配置文件转储到目录路径 - emptyRDD()
创建一个没有分区或元素的RDD。 - getConf()
- getLocalProperty(key)
获取在此线程中设置的本地属性,如果缺失,则返回null。 请参阅setLocalProperty
classmethod getOrCreate(conf=None)
获取或实例化一个SparkContext并将其注册为一个单例对象。
parameters:
conf - SparkConf(optional) - hadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None,batchSize=0)
使用HDFS,本地文件系统(所有节点上都可用)或任何Hadoop支持的文件系统URI读取具有任意键和值类的“old”Hadoop InputFormat。 机制与sc.sequenceFile相同。Hadoop配置可以作为Python字典的形式传入。 这将被转换成Java中的配置。
parameters:
path - hadoop 文件的路径;
inputFormatClass - Hadoop InputFormat的完全限定类名(e.g. "org.apache.hadoop.mapred.TextInputFormat");
keyClass - key Writable类的完全限定类名(e.g. "org.apache.hadoop.io.Text");
valueClass - value Writable类的完全限定类名(e.g. "org.apache.hadoop.io.LongWritable");
keyConverter - (None by default);
valueConverter - (None by default);
conf - 配置Hadoop,以dict的形式传入(None by default);
batchSize - 表示为单个Java对象的Python对象的数量。(默认为0,batchSize自动选择) - hadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
从任意的Hadoop配置中读取具有任意键和值类的“old”Hadoop InputFormat,以Python dict 的形式传入。 这将被转换成Java中的配置。 机制与sc.sequenceFile相同。 - newAPIHadoopFile(path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
使用HDFS,本地文件系统(所有节点上都可用)或任何Hadoop支持的文件系统URI读取具有任意键和值类的“new API”Hadoop InputFormat。 机制与sc.sequenceFile相同。hadoop 配置可以以python中dict的形式被传入,这将被转换成Java中的配置。 - newAPIHadoopRDD(inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0)
从任意的Hadoop配置中读取具有任意键和值类的“new API”Hadoop InputFormat,以Python dict 的形式传入。 这将被转换成Java中的配置。 机制与sc.sequenceFile相同。 -
parallelize(c, numSlices=None)
分发本地Python集合转成RDD形式。 如果输入表示性能范围,则建议使用xrange。
sc.parallelize([0, 2, 3, 4, 6], 5) #分成5个partitions
[[0], [2], [3], [4], [6]] # output
sc.parallelize(xrange(0, 6, 2),5)
[[], [0], [], [2], [4]] # output
- pickleFile(name, minPartitions=None)
加载之前使用RDD.saveAsPickleFile方法保存的RDD。
from tempfile import NamedTemporaryFile
#创建一个有名字的临时文件,并自动删除
tmpFile = NamedTemporaryFile(delete=True)
tmpFile.close() #关闭这个临时文件
sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] #output
- range(start, end=None, step=1, numSlices=None)
创建一个新的包含从开始到结束(独占)的元素的int类型的RDD,通过step增加每个元素。 可以像python内置的range()函数一样调用。 如果使用单个参数调用,参数被解释为结束,并且start被设置为0。numSlices - 新RDD的分区数量,返回Int类型的RDD。
sc.range(5).collect()
[0, 1, 2, 3, 4]
sc.range(2, 4).collect()
[2, 3]
sc.range(1, 7, 2).collect()
[1, 3, 5]
- runJob(rdd, partitionFunc, partitions=None, allowLocal=False)
在指定的一组分区上执行给定的partitionFunc,并将结果作为元素数组的形式返回。如果未指定“partitions”,则将在所有分区上运行。
myRDD = sc.parallelize(range(6), 3)
sc.runJob(myRDD, lambda part: [x * x for x in part])
[0, 1, 4, 9, 16, 25]
myRDD = sc.parallelize(range(6), 3)
myRDD.glom().collect()
[[0, 1], [2, 3], [4, 5]]
#匿名函数只作用在0和2两个分区上
sc.runJob(myRDD, lambda part: [x * x for x in part],[0,2], True)
[0, 1, 16, 25]
- sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)
从HDFS,本地文件系统(所有节点都可用)或任何Hadoop支持的文件系统URI读取任意键和值 Writable class 的Hadoop SequenceFile。minSplits - 数据集中的最小分割(default min(2, sc.defaultParallelism)) -
setCheckpointDir(dirName)
设置将RDD作为检查点的目录。 如果在群集上运行,该目录必须是HDFS路径。 - setJobGroup(groupId, description, interruptOnCancel=False)
为此线程启动的所有作业分配一个组ID,直到组ID被设置为不同的值或清除。通常,应用程序中的执行单元由多个Spark actions或jobs组成。 应用程序员可以使用这种方法将所有这些作业分组在一起,并给出一个组描述。 一旦设置,Spark Web UI将把这些作业与这个组关联起来。应用程序可以使用SparkContext.cancelJobGroup取消该组中的所有正在运行的jobs。
import threading
from time import sleep
result = "Not Set"
lock = threading.Lock() #定义一个锁
def map_func(x):
sleep(100)
raise Exception("Task should have been cancelled")
def start_job(x):
global result
try:
sc.setJobGroup("job_to_cancel", "some description")
result = sc.parallelize(range(x)).map(map_func).collect()
except Exception as e:
result = "Cancelled"
lock.release() #打开锁,释放资源
def stop_job():
sleep(5)
sc.cancelJobGroup("job_to_cancel")
supress = lock.acquire() #锁定资源
supress = threading.Thread(target=start_job, args=(10, )).start()
supress = threading.Thread(target=stop_job).start()
supress = lock.acquire()
print(result)
Cancelled #output
如果作业组的interruptOnCancel设置为true,那么作业取消将导致在作业的执行程序线程上调用Thread.interrupt()。 这有助于确保任务实际上被及时停止,但由于HDFS-1208的原因,默认情况下会关闭,HDFS可能会通过将节点标记为死亡来响应Thread.interrupt()。
- setLocalProperty(key, value)
设置一个本地属性,将影响从此线程提交的作业,例如Spark Fair Scheduler pool。 - setLogLevel(logLevel)
控制我们的logLevel。 这将覆盖任何用户定义的日志设置。 有效的日志级别包括:ALL,DEBUG,ERROR,FATAL,INFO,OFF,TRACE,WARN
classmethod setSystemProperty(key, value)
设置一个Java系统属性,如spark.executor.memory。 这必须在实例化SparkContext之前调用。 - show_profiles()
将配置文件统计信息打印到标准输出 - sparkUser()
为用户获取SPARK_USER,运行SparkContext的用户。 - startTime
返回Spark Context启动时的纪元时间。 - statusTracker()
返回StatusTracker对象 - stop()
终止SparkContext -
textFile(name, minPartitions=None, use_unicode=True)
从HDFS、本地文件系统(所有节点都可用)或任何Hadoop支持的文件系统URI读取文本文件,并将其作为字符串的RDD返回。如果use_unicode为False,则字符串将保持为str(编码为utf-8),这比unicode更快更小。(在Spark 1.2中添加)
with open('/home/test.txt', 'w') as testFile:
testFile.write("Hello world!")
#读取本地文件。例子: file:/home/test.txt
testFile = sc.textFile('file:/home/test.txt')
testFile.collect()
[u'Hello world!'] #output
- uiWebUrl
返回由此SparkContext启动的SparkUI实例的URL -
union(rdds)
建立一个RDD列表的联合。这支持具有不同序列化格式的RDDs的unions(),尽管这迫使它们使用默认序列化器被重新序列化。
path = os.path.join('/home/', 'union-text.txt')
with open(path, 'w') as testFile:
testFile.write("hello")
textFile = sc.textFile(path)
textFile.collect()
[u'Hello'] #output
parallelize = sc.parallelize(["World!"])
#union类似于列表的append方法
sorted(sc.union([textFile, parallelize]).collect())
[u'Hello', 'World!'] #output
- version
运行此应用程序的Spark版本。 -
wholeTextFiles(path, minPartitions=None, use_unicode=True)
从HDFS,本地文件系统(所有节点都可用)或任何支持Hadoop的文件系统URI读取文本文件的目录。 每个文件被读取为单个记录,并返回一个键值对,其中键是每个文件的路径,值是每个文件的内容。如果use_unicode为False,则字符串将保持为str(编码为utf-8),这比unicode更快更小,(在Spark 1.2中添加)。小文件是首选,因为每个文件将被完全加载到内存中。
dirPath = os.path.join(/home/, 'files')
os.mkdir(dirPath)
with open(os.path.join(dirPath, '1.txt'), 'w') as file1:
file1.write('1')
with open(os.path.join(dirPath, '2.txt'), 'w') as file2:
file2.write('2')
textFiles = sc.wholeTextFiles(dirPath)
sorted(textFiles.collect())
[(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]
class pyspark.SparkFiles
解析通过L {SparkContext.addFile()<pyspark.context.SparkContext.addFile>}添加的文件的路径。parkFiles仅包含类方法; 用户不应该创建SparkFiles实例。
classmethod get(filename)
获取通过SparkContext.addFile()添加的文件的绝对路径。
classmethod getRootDirectory()
获取通过SparkContext.addFile()添加的文件的根目录。
class pyspark.RDD(jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer()))
弹性分布式数据集(RDD),Spark中的基本抽象。 表示可以并行操作的不可变分区元素集合。
-
aggregate(zeroValue, seqOp, combOp)
先聚合每个分区中的元素,然后使用给定的组合函数和一个中性“零值”对所有分区的结果进行聚合。
函数op(t1,t2)允许修改t1并将其返回为结果值,以避免对象分配; 但是,该函数不应该修改t2。
第一个函数(seqOp)可以返回与此RDD类型不同的结果类型U. 因此,我们需要一个合并T到U的操作和一个合并两个U的操作。
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
(10, 4) #output
sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
(0, 0) #output
解释如下:
假设[1,2,3,4]被分成两个分区,为 分区1([1,2]),分区2([3,4])
seqOp作用在每一个分区上
首先用seqOp对分区1进行操作:
x=(0,0) y=1 --> (1,1) #对分区进行第一次seqOp操作时,x为zero value
x=(1,1) y=2 --> (3,2) #对分区进行的第二次及以后的seqOp操作,x为前一次seqOp的执行结果
同样对分区2进行操作:
x=(0,0) y=3 --> (3,1)
x=(3,1) y=4 --> (7,2)
然后用combOp对两个分区seqOp作用后的结果进行操作:
分区1:
x=(0,0) y=(3,2) > (3,2) #对第一个分区进行combOp操作时,x为zero value
分区2:
x=(3,2) y=(7,2) > (10,4) #对第二个及以后分区进行combOp操作时,x为前一分区combOp处理后的结果
-
aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
使用给定的combine(组合)函数和一个中性的“零值”来聚合每个键的值。这个函数可以返回一个不同的结果类型U,比这个RDD中的值的类型V.因此,我们需要一个操作来合并一个V到一个U和一个合并两个U的操作,前一个操作用于 合并分区内的值,后者用于合并分区之间的值。 为了避免内存分配,这两个函数都允许修改并返回它们的第一个参数,而不是创建一个新的U. -
cache()
使用默认存储级别(MEMORY_ONLY)持久化RDD。 - cartesian(other)
返回一个RDD和另一个RDD的笛卡尔乘积,即所有元素对(a,b)的RDD,其中a在自身中,b在另一个中。
rdd = sc.parallelize([1, 2])
sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)] #output
-
checkpoint()
将此RDD标记为检查点。 它将被保存到使用SparkContext.setCheckpointDir()设置的检查点目录内的文件中,并且对其父RDD的所有引用都将被删除。 在此RDD上执行任何作业之前,必须调用此函数。 强烈建议将此RDD保存在内存中,否则将其保存在文件中将需要重新计算。 -
coalesce(numPartitions, shuffle=False)
返回一个减少到numPartitions分区的新RDD。
把RDD的分区数降低到通过参数numPartitions指定的值。在得到的更大一些数据集上执行操作,会更加高效。
sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
[[1], [2, 3], [4, 5]]
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
[[1, 2, 3, 4, 5]]
-
cogroup(other, numPartitions=None)
对于自己或其他中的每个关键字k,返回一个包含一个元组的结果RDD,以及该关键字在自身以及其他值中的值列表。
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
[(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
[('a', ([1], [2])), ('b', ([4], []))]
-
collect()
返回包含此RDD中所有元素的列表。
在驱动程序中,以数组的形式返回数据集的所有元素。通常用于filter或其它产生了大量小数据集的情况。
- Note:因为所有的数据都被加载到驱动程序的内存中,所以只能在结果数组很小的情况下使用此方法。
-
collectAsMap()
将此RDD中的键值对作为字典返回给master。
- Note: 因为所有的数据都被加载到驱动程序的内存中,所以只有在结果数据很小的情况下才能使用此方法。
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
print(m)
{1: 2, 3: 4}
m[1]
2
-
combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
通用函数使用一组自定义的聚合函数来组合每个键的元素。
将RDD [(K,V)]转换为类型RDD [(K,C)]的结果,以用于“combined type”C.
使用者提供三个函数:
- createCombiner,which turns a V into a C (e.g., creates a one-element list)
- mergeValue,to merge a V into a C (e.g., adds it to the end of a list)
- mergeCombiners,to combine two C’s into a single one (e.g., merges the lists)
为了避免内存分配,mergeValue和mergeCombiners都允许修改并返回它们的第一个参数,而不是创建一个新的C.
另外,用户可以控制输出RDD的分区。
x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
def to_list(a):
return [a]
def append(a, b):
a.append(b)
return a
def extend(a, b):
a.extend(b)
return a
sorted(x.combineByKey(to_list, append, extend).collect())
[('a', [1, 2]), ('b', [1])]
解释如下:
由于聚合操作会遍历分区中所有的元素,
因此每个元素(这里指的是键值对)的键只有两种情况:出现过和没出现过。
如果以前没出现过,则执行的是createCombiner函数,
在该例子中首先是('a',1)中key:a没有出现过,
我们执行createConmbiner函数作用在value:1上,变成了列表[1]。('b',1)同理。
如果key以前出现过就执行mergeValue函数作用在value上,
('a',2),由于key:a以前出现过所以就把value:2 append到列表[1]上,结果为[1,2]。
mergeCombiners函数是作用在每个分区上,因为只有一个分区,所以并没有用上这个函数。
- context
此RDD创建的SparkContext。 -
count()
返回此RDD中的元素数量。
sc.parallelize([2, 3, 4]).count()
3
- countApprox(timeout, confidence=0.95)
- Note: 实验阶段
count()的近似版本,即使并非所有任务都已完成,在超时内返回潜在的不完整结果。
- countApproxDistinct(relativeSD=0.05)
- Note: 实验阶段
返回RDD中不同元素的近似数量。
-
countByKey()
计算每个键的元素数量,并将结果作为字典返回给master。
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)]
-
countByValue()
将此RDD中每个唯一值的计数返回为(值,计数)对的字典。
sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
[(1, 2), (2, 3)] -
distinct(numPartitions=None)
返回包含此RDD中不同元素的新RDD。去重。
sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1,2,3]
-
filter(f)
返回仅包含满足f的元素的新RDD。
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.filter(lambda x: x % 2 == 0).collect()
[2,4]
-
first()
返回此RDD中的第一个元素。
sc.parallelize([2, 3, 4]).first()
2
-
flatMap(f, preservesPartitioning=False)
通过首先将一个函数应用于此RDD的所有元素,然后展平结果,返回一个新的RDD。
rdd = sc.parallelize([2, 3, 4])
rdd.flatMap(lambda x: range(1, x)).collect()
[1, 1, 2, 1, 2, 3]
sc.parallelize([(1,2),(3,4)]).flatMap(lambda x: x).collect()
[1, 2, 3, 4]
sc.parallelize([(1,2),(3,4)]).map(lambda x: x).collect()
[(1, 2), (3, 4)]
-
flatMapValues(f)
通过flatMap函数传递键值对RDD中的每个值而不更改键; 这也保留了原来的RDD的分区。
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
def f(x):
return x
x.flatMapValues(f).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
- fold(zeroValue, op)
使用给定的关联函数和中性“零值”来聚合每个分区的元素,然后聚合所有分区的结果。
函数op(t1,t2)允许修改t1并将其作为结果值返回,以避免对象分配; 但是,它不应该修改t2。
from operator import add
sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
- foldByKey(zeroValue, func, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
from operator import add
sorted(rdd.foldByKey(0, add).collect())
[('a', 2), ('b', 1)]
-
foreach(f)
将一个函数应用于此RDD的所有元素。
def f(x):
print(x)
sc.parallelize([1, 2, 3]).foreach(f)
3
2
1
- foreachPartition(f)
将此功能应用于此RDD的每个分区。 - fullOuterJoin(other, numPartitions=None)
执行self和other的右外连接。
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])
sorted(x.fullOuterJoin(y).collect())
[('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
- getCheckpointFile()
获取此RDD检查点所在的文件的名称
RDD在本地检查点不定义。 - getNumPartitions()
返回RDD中的分区数量 - getStorageLevel()
获取RDD的当前存储级别。 - glom()
将通过合并每个分区内的所有元素创建的RDD返回到列表中。
rdd = sc.parallelize([1, 2, 3, 4], 2)
sorted(rdd.glom().collect())
[[1, 2], [3, 4]]
- groupBy(f, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
通过函数f对RDD进行分组。
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2).collect()
sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]
- groupByKey(numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
- groupWith(other, *others)
cogroup的别名,但支持多个RDD。 - histogram(buckets)
使用提供的桶计算直方图。 除了最后一个关闭之外,桶都向右开放。
桶必须被排序,不包含任何重复,并且至少有两个元素。
rdd = sc.parallelize(range(51))
rdd.histogram(2) #两个桶
([0, 25, 50], [25, 26])
#第一个桶在区间[1,25),包含25个元素,第二个桶在区间[25,50],包含26个元素。
- id()
此RDD的唯一ID(在其SparkContext中)。 - intersection(other)
返回此RDD与另一个的交集。
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
rdd1.intersection(rdd2).collect()
[1, 2, 3]
- isCheckpointed()
返回这个RDD是否是检查点和物化的,可靠的或本地的。 - isEmpty()
当且仅当RDD完全不包含任何元素时才返回true。 - isLocallyCheckpointed()
返回此RDD是否标记为本地检查点。 - join(other, numPartitions=None)
返回一个RDD,其中包含所有自身和其他匹配键的元素对。
用于操作两个键值对格式的数据集,操作两个数据集(K,V)和(K,W)返回(K, (V, W))格式的数据集。通过leftOuterJoin、rightOuterJoin、fullOuterJoin完成外连接操作。
每一对元素将作为(k,(v1,v2))元组返回,其中(k,v1)是自身的,(k,v2)是另一个的。
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())
[('a', (1, 2)), ('a', (1, 3))]
- keyBy(f)
通过应用函数f来创建此RDD中元素的元组。
x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
x.collect()
[(0, 0), (1, 1), (4, 2)]
- keys()
每个元组的键返回一个RDD。
m = sc.parallelize([(1, 2), (3, 4)]).keys()
m.collect()
[1, 3]
- leftOuterJoin(other, numPartitions=None)
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("c", 8)])
x.fullOuterJoin(y).collect()
[('a', (1, 2)), ('c', (None, 8)), ('b', (4, None))]
x.leftOuterJoin(y).collect()
[('a', (1, 2)), ('b', (4, None))]
x.rightOuterJoin(y).collect()
[('a', (1, 2)), ('c', (None, 8))]
- localCheckpoint()
使用Spark的现有缓存层将此RDD标记为本地检查点。 - lookup(key)
返回RDD中键值的列表。 如果RDD具有已知的分区程序,则只需搜索键映射到的分区即可高效地执行此操作。
sc.parallelize([('a',1),('b',2)]).lookup('b')
[2]
-
map(f, preservesPartitioning=False)
通过对这个RDD的每个元素应用一个函数来返回一个新的RDD。
rdd = sc.parallelize(["b", "a", "c"])
sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
- mapPartitions(f, preservesPartitioning=False)
通过对该RDD的每个分区应用一个函数来返回一个新的RDD。
rdd = sc.parallelize([1, 2, 3, 4], 2)
rdd.glom().collect()
[[1, 2], [3, 4]]
def f(iterator):
yield sum(iterator)
rdd.mapPartitions(f).collect()
[3, 7]
- mapPartitionsWithIndex(f, preservesPartitioning=False)
通过对该RDD的每个分区应用函数f,同时跟踪原始分区的索引,返回新的RDD。 - mapPartitionsWithSplit(f, preservesPartitioning=False)
- 弃用,用mapPartitionsWithIndex(f, preservesPartitioning=False)代替
- mapValues(f)
通过映射函数传递键值对RDD中的每个值,而不更改键; 这也保留了原来的RDD的分区。
x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
x.mapValues(f).collect()
[('a', 3), ('b', 1)]
- max(key=None)
找到这个RDD中的最大项。 - mean()
计算这个RDD元素的平均值。 - meanApprox(timeout, confidence=0.95)
- 实验阶段
- min(key=None)
找到此RDD中的最小项。 - name()
返回此RDD的名称。 - partitionBy(numPartitions, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
使用指定的分区程序返回分区的RDD副本。 - persist(storageLevel=StorageLevel(False, True, False, False, 1))
设置此RDD的存储级别以便在第一次计算之后在操作之间保留其值。 如果RDD尚未设置存储级别,则只能用于分配新的存储级别。 如果未指定存储级别,则默认为(MEMORY_ONLY)。 - pipe(command, env=None, checkCode=False)
以管道(pipe)方式将 RDD的各个分区(partition)使用 shell命令处理(比如一个 Perl或 bash脚本)。 RDD的元素会被写入进程的标准输入(stdin),将进程返回的一个字符串型 RDD(RDD of strings),以一行文本的形式写入进程的标准输出(stdout)中。 - randomSplit(weights, seed=None)
根据提供的权重随机分配这个RDD。
rdd = sc.parallelize(range(500), 1)
rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
rdd1.count()
213
rdd2.count()
287
- reduce(f)
使用函数func聚集数据集中的元素,这个函数func输入为两个元素,返回为一个元素。这个函数应该符合结合律和交换律,这样才能保证数据集中各个元素计算的正确性。
from operator import add
sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
- reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>)
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
sorted(rdd.reduceByKey(add).collect())
[('a', 3), ('b', 1)]
- reduceByKeyLocally(func)
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
rdd.reduceByKeyLocally(add)
{'a': 3, 'b': 1}
- repartition(numPartitions)
- repartitionAndSortWithinPartitions(numPartitions=None, partitionFunc=<function portable_hash at 0x7f51f1ac0668>, ascending=True, keyfunc=<function <lambda> at 0x7f51f1ab3ed8>)
根据给定的分区器对RDD进行重新分区,在每个结果分区中,按照key值对记录排序。这在每个分区中比先调用repartition再排序效率更高,因为它可以将排序过程在shuffle操作的机器上进行。 - rightOuterJoin(other, numPartitions=None)
- sample(withReplacement, fraction, seed=None)
对数据采样。用户可以设定是否有放回(withReplacement)、采样的百分比(fraction)、随机种子(seed)。 - sampleByKey(withReplacement, fractions, seed=None)
- sampleStdev()
计算RDD元素的样本方差(通过除以N-1而不是N来估计方差中的偏差)。 - saveAsHadoopDataset(conf, keyConverter=None, valueConverter=None)
- saveAsHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None)
- saveAsNewAPIHadoopDataset(conf, keyConverter=None, valueConverter=None)
- saveAsNewAPIHadoopFile(path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None)
- saveAsPickleFile(path, batchSize=10)
- saveAsSequenceFile(path, compressionCodecClass=None)
-
saveAsTextFile(path, compressionCodecClass=None)
将此RDD保存为文本文件,使用元素的字符串表示形式。
sc.parallelize(range(10)).saveAsTextFile('file:/home/test')
- setName(name)
为此RDD指定一个名称。 - sortBy(keyfunc, ascending=True, numPartitions=None)
按给定的keyfunc对此RDD进行排序。 - sortByKey(ascending=True, numPartitions=None, keyfunc=<function <lambda> at 0x7f51f1ab5050>)
- stats()
返回一个StatCounter对象,在一次操作中捕获RDD元素的均值,方差和计数。
sc.parallelize([1,2,3,4,5]).stats()
(count: 5, mean: 3.0, stdev: 1.41421356237, max: 5, min: 1)
- stdev()
计算这个RDD元素的标准偏差。 - subtract(other, numPartitions=None)
返回一个值,这个值包含在self中,且不包含在other中。
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]
- subtractByKey(other, numPartitions=None)
- sum()
- sumApprox(timeout, confidence=0.95)
- 实验阶段
- take(num)
返回RDD中前num个元素。 - takeOrdered(num, key=None)
返回排序之后的前num个元素。
返回RDD按自然顺序或自定义顺序排序后的前n个元素。 - takeSample(withReplacement, num, seed=None)
随机取num个元素。withReplacement若为True表示有放回的取样。
对一个数据集随机抽样,返回一个包含num个随机抽样元素的数组,参数withReplacement指定是否有放回抽样,参数seed指定生成随机数的种子。 - toDebugString()
- toLocalIterator()
返回包含此RDD中所有元素的迭代器。 迭代器将消耗与RDD中最大分区一样多的内存。
rdd = sc.parallelize(range(10))
[x for x in rdd.toLocalIterator()]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
- top(num, key=None)
- treeAggregate(zeroValue, seqOp, combOp, depth=2)
以多级树形模式聚合此RDD的元素。 - treeReduce(f, depth=2)
- union(other)
rdd = sc.parallelize([1, 1, 2, 3])
rdd.union(rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]
- unpersist()
将RDD标记为非持久性,并从内存和磁盘中删除所有块。 - values()
m = sc.parallelize([(1, 2), (3, 4)]).values()
m.collect()
[2, 4]
- variance()
计算RDD元素的方差。 - zip(other)
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
x.zip(y).collect()
[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
- zipWithIndex()
- zipWithUniqueId()
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
用于控制RDD存储的标志。 每个StorageLevel记录是否使用内存,如果内存不足,是否将RDD丢弃到磁盘,是否以特定于JAVA的序列化格式将数据保存在内存中,以及是否在多个节点上复制RDD分区。 还包含一些常用存储级别MEMORY_ONLY的静态常量。 由于数据总是在Python端序列化,所有的常量使用序列化的格式。
DISK_ONLY = StorageLevel(True, False, False, False, 1)
DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
MEMORY_AND_DISK = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False, 1)
MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
MEMORY_ONLY = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
MEMORY_ONLY_SER = StorageLevel(False, True, False, False, 1)
MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
OFF_HEAP = StorageLevel(True, True, True, False, 1)
class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None)
使用SparkContext.broadcast()创建的广播变量。 通过value
方法获取值。
- destroy()
销毁与此广播变量相关的所有数据和元数据。 谨慎使用; 广播变量一旦被销毁,就不能再使用。 这个方法阻塞直到销毁完成。 - dump(value, f)
- load(path)
- unpersist(blocking=False)
在每个executor上删除这个广播的缓存副本。 如果在调用之后使用广播,则需要将其重新发送给每个executor。 - value
返回广播的值
class pyspark.Accumulator(aid, value, accum_param)
可以累积的共享变量,即具有可交换和关联的“add”操作。 Spark群集上的worker tasks可以使用+ =操作符将值添加到累加器,但只有驱动程序可以访问其值。 workers的更新会自动传播给驱动程序。
虽然SparkContext支持像int和float这样的原始数据类型的累加器,但是用户也可以通过提供一个自定义的AccumulatorParam对象来为自定义类型定义累加器。
- add(term)
- value
获取累加器的值; 只能在driver program中使用。
class pyspark.AccumulatorParam
帮助对象,定义如何累积给定类型的值。
- addInPlace(value1, value2)
添加累加器数据类型的两个值,返回一个新值; 为了效率,也可以更新value1并返回它。 - zero(value)
class pyspark.MarshalSerializer
使用Python的Marshal序列化器序列化对象:
http://docs.python.org/2/library/marshal.html
这个序列化器比PickleSerializer更快,但支持更少的数据类型。
- dumps(obj)
- loads(obj)
class pyspark.PickleSerializer
使用Python的pickle序列化器序列化对象:
http://docs.python.org/2/library/pickle.html
这个序列化程序支持几乎所有的Python对象,但是可能不像更专用的序列化程序那么快。
- dumps(obj)
- loads(obj, encoding=None)
class pyspark.StatusTracker(jtracker)
用于monitoring job and stage progress的Low-level报告API。
这些API有意提供非常弱的一致性语义; 这些API的消费者应该准备好处理空的/丢失的信息。 例如,作业的阶段id可能是已知的,但状态API可能没有关于这些阶段的细节的任何信息,所以getStageInfo可能会返回无效的有效阶段id。
为了限制内存使用量,这些API只提供最近作业/阶段的信息。 这些API将提供最后一个spark.ui.retainedStages阶段和spark.ui.retainedJobs作业的信息。
- getActiveJobsIds()
返回包含所有活动jobs的ids的数组。 - getActiveStageIds()
返回一个包含所有活动stages的id的数组。 - getJobIdsForGroup(jobGroup=None)
返回特定工作组中所有已知作业的列表。 如果jobGroup为None,则返回所有与作业组不相关的作业。
返回的列表可能包含正在运行的,失败的和已完成的作业,并且可能因该方法的调用而有所不同。 该方法不保证结果中元素的顺序。 - getJobInfo(jobId)
返回一个SparkJobInfo对象,如果作业信息找不到或被垃圾回收,则返回None。 - getStageInfo(stageId)
返回SparkStageInfo对象,如果找不到阶段信息或垃圾收集,则返回None。
class pyspark.SparkJobInfo
公开有关Spark作业的信息。
class pyspark.SparkStageInfo
公开有关Spark Stage的信息。
class pyspark.Profiler(ctx)
PySpark支持自定义分析器,这是为了允许使用不同的分析器,以及输出不同于BasicProfiler中提供的格式。
- dump(id, path)
将配置文件转储到路径中,id是RDD id。 - profile(func)
做函数func的分析 - show(id)
将配置文件统计信息打印到标准输出,id是RDD id。 - stats()
返回收集的性能分析统计信息(pstats.Stats)
class pyspark.BasicProfiler(ctx)
BasicProfiler是默认的profiler,它是基于cProfile和Accumulator实现的。
- profile(func)
运行并分析传入的方法to_profile。返回配置文件对象。 - stats()
class pyspark.TaskContext
- 实验阶段