spark programming

2017-08-11  本文已影响0人  xncode

driver进程用于运行用户的主程序,然后在集群的机子上分布执行并行操作。

概念

RDD

RDD resilient distributed dataset,是分布在集群节点中的各数据元素分片的集合,可被并行地操作。

RDD是通过读取hdfs中的文件或是通过已经存在的集合转换。

shared variables

在分布式执行时,传递的是变量的复制,如果需要在任务之间共享的:

broadcast variables

accumulators

连接

SparkContext是用于告知Spark如何连接到集群中

conf = SparkConf().setAppName(appName)
# 但是首先得创建一个SparkConf
# 可以在此处直接调用setmaster设置运行方式 但是一般会在运行时通过参数设置
sc = SparkContext(conf=conf)

如果使用的是shell,则已经有了创建好的SparkContext sc来使用,不能再次创建。可在运行是加--py-file、--packages、--repositories来添加python依赖。

RDD

创建的两种方式

parallelized collections

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
# 会根据集群的配置情况自动分片
# 然后复制到各节点来形成分布式的数据集 可以并行地操作

值得注意的是parallelize可接受第二个参数来设置分片的数量

parallelize(data, 10)

external dataset

distFile = sc.textFile("data.txt")
# 读取text文件 可以使用hdfs s3n的uri
# 如果使用的是本地文件路径 需要所有worker的对应路径上都有
# 支持文件路径 文件名称通配符 压缩 
textFile("/my/directory")
textFile("/my/directory/*.txt")
textFile("/my/directory/*.gz")

textfile也可以接受第二个参数声明文件分片大小 默认是128MB

除了textfile外还可以使用

wholeTextFiles可以读取路径下的所有文件作为键值对返回(一般是处理目录下包含多个小文件的情况)

saveAsPickleFile pickleFile 可以按python的Pickle方式存取 默认的batch大小是10

rdd.saveAsSequenceFile()
sc.sequenceFile()
sequenceFile和HDFS

operation

有两种操作类型:

变换:从已存在的dataset中创建出来
动作:通过一定的操作计算后的返回值

basic

lines = sc.textFile("")
lineLength = lines.map(lambda s: len(s))
lineLength.persist()
totalLength = lineLength.reduce(lambda a, b: a+b)

传递

lambda
本地函数(作用域内定义的函数)
全局函数

虽然说可以传递类的方法,但是这样会传递整个对象。如果用到了类,最好是把使用到的类中的东西接出来到局部变量中然后传递。

作用域

如果定义了一个函数,然后通过rdd的foreach传递运行该函数,如果在函数中引用的是driver的全局变量,则可能会有问题。

在调用分布式函数之前,spark会计算该任务的作用域,即必须对执行器可见的变量和方法,然后把该作用域序列化并传递给各个执行器。

传递给执行器的是一份复制的变量,每个执行器操作的是他自己的变量,所以driver中的全局变量不变。

但是如果是在本地运行的同时是在一个jvm中,那么全局变量可能是会被修改的。但是应该是accumulater来实现这一功能。

对于变量的打印,如果在集群模式下运行,打印的输出是各个节点。正确的方式是先调用collect方法来收集到本地。如果只想看一些元素,可以调用take

变换

map(func) 把func作用于rdd中的每个元素 返回
filter(func) 返回func为true的元素
flatmap(func) func的返回值是 seq,把func作用到rdd中的每个元素
mapPartitions(func)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed) 抽样
union(rdd) 合集
intersection(rdd) 交集
distinct([numTasks])
groupByKey reduceByKey aggregateByKey SortByKey
join
cogroup
cartesian
pipe
coalesce
repartition
repartitionAndSortWithinPartitions

动作

reduce(func) func接受两个参数然后返回一个值
collect()
count()
first()
take(n)
takeSample(withReplacement, num, seed)
takeOrdered(n, [ordering])
saveAsTextFile(path)
saveAsSequenceFile(path)
saveAsObjectFile(path)
countByKey()
foreach(func)

shuffle operation

是spark重新分布数据的机制 通常会触发执行器和机器的数据复制,是一个耗时、复杂的动作,包含:repartition coalesce groupByKey reduceByKey cogroup join

RDD持久化

MEOMORY_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
DISK_ONLY
MEMORY_ONLY_2 MEMORY_AND_DISK_2...

spark会在一些shuffle操作时自动持久化,例如reducebykey

可以显式调用unpersist

Shared Variable

Broadcast Variables

broadcast = sc.broadcast([1, 2, 3])
broadcast.value

Accumulators

只有driver可以读取accumulator的数据,其他执行器只能加
上一篇下一篇

猜你喜欢

热点阅读