CS190 Scalable Machine Learning
Tags (space-separated): Spark ML
RDDs
• Two types of operations: transformations and actions
• Transformations are lazy (not computed immediately)
• A transformed RDD is computed only when an action runs on it
• Persist (cache) RDDs in memory or on disk
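To make the laziness concrete, here is a minimal plain-Python sketch; it is not Spark itself. `LazyDataset` is a hypothetical stand-in for an RDD: its `map` and `filter` only record steps, and `collect` plays the role of an action that finally runs them.

```python
# Plain-Python model of lazy transformations. LazyDataset is a hypothetical
# stand-in for an RDD and is not part of the Spark API.
class LazyDataset:
    def __init__(self, source, steps=()):
        self.source = source      # the underlying data
        self.steps = steps        # recorded transformations, not yet applied

    def map(self, func):
        # Transformation: only record the step and return a new dataset.
        return LazyDataset(self.source, self.steps + (("map", func),))

    def filter(self, func):
        # Transformation: also just recorded.
        return LazyDataset(self.source, self.steps + (("filter", func),))

    def collect(self):
        # Action: replay every recorded step over the source data now.
        data = list(self.source)
        for kind, func in self.steps:
            if kind == "map":
                data = [func(x) for x in data]
            else:
                data = [x for x in data if func(x)]
        return data


calls = []

def double(x):
    calls.append(x)               # side effect lets us observe when work happens
    return x * 2

doubled = LazyDataset([1, 2, 3, 4]).map(double)
calls_before_action = list(calls)   # still empty: nothing has run yet
result = doubled.filter(lambda x: x > 4).collect()
print(calls_before_action, result)
```

In this sketch `double` is not called until `collect()` runs, which is the behaviour the bullets above describe for transformations and actions.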
Working with RDDs
• Create an RDD from a data source: parallelize, textFile
• Apply transformations to an RDD: map, filter
• Apply actions to an RDD: collect, count
Some Transformations
Transformation | Description |
---|---|
map(func) | return a new distributed dataset formed by passing each element of the source through a function func |
filter(func) | return a new dataset formed by selecting those elements of the source on which func returns true |
distinct([numTasks]) | return a new dataset that contains the distinct elements of the source dataset |
flatMap(func) | similar to map, but each input item can be mapped to 0 or more output items (so func should return a sequence, i.e. any iterable in PySpark, rather than a single item) |
Examples:
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd.map(lambda x: x * 2)
RDD: [1, 2, 3, 4] → [2, 4, 6, 8]
>>> rdd.filter(lambda x: x % 2 == 0)
RDD: [1, 2, 3, 4] → [2, 4]
>>> rdd2 = sc.parallelize([1, 4, 2, 2, 3])
>>> rdd2.distinct()
RDD: [1, 4, 2, 2, 3] → [1, 4, 2, 3]
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.map(lambda x: [x, x+5])
RDD: [1, 2, 3] → [[1, 6], [2, 7], [3, 8]]
>>> rdd.flatMap(lambda x: [x, x+5])
RDD: [1, 2, 3] → [1, 6, 2, 7, 3, 8]
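The difference between map and flatMap can be reproduced locally without Spark. The following is a plain-Python analogy (using `itertools.chain`, not the Spark API) that treats flatMap as "map, then flatten one level"; the `odd_only` function is made up for illustration.

```python
from itertools import chain

data = [1, 2, 3]

def pair(x):
    return [x, x + 5]

# map: exactly one output element (here a list) per input element
mapped = [pair(x) for x in data]

# flatMap: the per-element lists are concatenated into one flat sequence
flat_mapped = list(chain.from_iterable(pair(x) for x in data))

# Returning an empty list drops the input item entirely ("0 or more outputs")
def odd_only(x):
    return [x] if x % 2 == 1 else []

odds = list(chain.from_iterable(odd_only(x) for x in data))

print(mapped)       # [[1, 6], [2, 7], [3, 8]]
print(flat_mapped)  # [1, 6, 2, 7, 3, 8]
print(odds)         # [1, 3]
```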
Spark Actions
• Cause Spark to execute the recipe of transformations on the source data
• Mechanism for getting results out of Spark
Action | Description |
---|---|
reduce(func) | aggregate the dataset's elements using function func. func takes two arguments and returns one, and must be commutative and associative so that it can be computed correctly in parallel |
take(n) | return an array with the first n elements |
collect() | return all the elements as an array. WARNING: make sure the result will fit in the driver program's memory |
takeOrdered(n, key=func) | return n elements ordered in ascending order or as specified by the optional key function |
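Why reduce needs a commutative and associative function can be seen with a small local simulation. This is a sketch in plain Python, assuming the data is split into partitions that are reduced independently and then combined; `parallel_reduce` is a hypothetical helper, not a Spark function.

```python
from functools import reduce

def parallel_reduce(partitions, func):
    # Reduce each partition independently, then combine the partial results.
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

def add(a, b):
    return a + b

def sub(a, b):
    return a - b

# The same elements [1, 2, 3, 4], partitioned in two different ways.
split_a = [[1, 2], [3, 4]]
split_b = [[1], [2, 3, 4]]

add_results = (parallel_reduce(split_a, add), parallel_reduce(split_b, add))
sub_results = (parallel_reduce(split_a, sub), parallel_reduce(split_b, sub))

print(add_results)  # (10, 10): addition gives the same answer for any split
print(sub_results)  # (0, 6): subtraction depends on how the data was split
```

Since the partitioning is generally not under the caller's control, a function like subtraction would give results that vary from run to run.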
Examples:
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.reduce(lambda a, b: a * b)
Value: 6
#(1 * 2 * 3)
>>> rdd.take(2)
Value: [1,2] # as list
>>> rdd.collect()
Value: [1,2,3] # as list
>>> rdd = sc.parallelize([5,3,1,2])
>>> rdd.takeOrdered(3, lambda s: -1 * s)
Value: [5,3,2] # as list
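The effect of the key function in takeOrdered can be checked locally. The snippet below is an analogy using Python's standard `heapq.nsmallest` on an ordinary list; it only mirrors the ordering semantics and says nothing about how the work is distributed.

```python
import heapq

data = [5, 3, 1, 2]

# Without a key: the n smallest elements in ascending order
smallest_three = heapq.nsmallest(3, data)

# With key = -s: elements are ranked by their negated value,
# so the "smallest" keys belong to the largest elements
largest_three = heapq.nsmallest(3, data, key=lambda s: -1 * s)

print(smallest_three)  # [1, 2, 3]
print(largest_three)   # [5, 3, 2]
```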
count() and cache()
lines = sc.textFile("...", 4)
lines.cache()  # save, don't recompute!
comments = lines.filter(isComment)  # isComment: a user-defined predicate
print(lines.count(), comments.count())
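What caching saves in the snippet above can be sketched without Spark. In this plain-Python model, `CountingSource` is a hypothetical stand-in for an expensive input, and `is_comment` is an assumed predicate (lines starting with `#`); the original does not define `isComment`.

```python
class CountingSource:
    """Hypothetical stand-in for an expensive input such as a text file."""
    def __init__(self, lines):
        self.lines = lines
        self.reads = 0            # how many times the input was re-read

    def read(self):
        self.reads += 1
        return list(self.lines)


def is_comment(line):
    # Assumption for illustration: a comment line starts with '#'
    return line.startswith("#")


def run_two_actions(source, use_cache):
    stored = None

    def lines():
        nonlocal stored
        if not use_cache:
            return source.read()      # recompute from the source each time
        if stored is None:
            stored = source.read()    # first action materializes the data
        return stored

    total = len(lines())                                        # like lines.count()
    comments = len([l for l in lines() if is_comment(l)])       # like comments.count()
    return total, comments


text = ["# header", "a = 1", "# note", "b = 2"]
plain_src, cached_src = CountingSource(text), CountingSource(text)
plain_result = run_two_actions(plain_src, use_cache=False)
cached_result = run_two_actions(cached_src, use_cache=True)
print(plain_result, plain_src.reads)    # input read once per action
print(cached_result, cached_src.reads)  # input read only once in total
```

Both runs return the same counts; only the number of reads of the underlying input differs.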
Spark Program Lifecycle
- Create RDDs from external data or parallelize a collection in your driver program
- Lazily transform them into new RDDs
- cache() some RDDs for reuse
- Perform actions to execute parallel computation and produce results
Key-Value RDDs
Key-Value Transformation | Description |
---|---|
reduceByKey(func) | return a new distributed dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V |
sortByKey() | return a new dataset of (K, V) pairs sorted by key in ascending order |
groupByKey() | return a new dataset of (K, Iterable<V>) pairs |
! Be careful when using groupByKey(): it can move a large amount of data across the network, and the resulting lists can be very large, exhausting worker memory
>>> rdd = sc.parallelize([(1,2), (3,4), (3,6)])
>>> rdd.reduceByKey(lambda a, b: a + b)
RDD: [(1,2), (3,4), (3,6)] → [(1,2), (3,10)]
>>> rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')])
>>> rdd2.sortByKey()
RDD: [(1,'a'), (2,'c'), (1,'b')] → [(1,'a'), (1,'b'), (2,'c')]
>>> rdd2 = sc.parallelize([(1,'a'), (2,'c'), (1,'b')])
>>> rdd2.groupByKey()
RDD: [(1,'a'), (2,'c'), (1,'b')] → [(1,['a','b']), (2,['c'])]
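The groupByKey() warning above can be illustrated by counting how many records would cross the network in each approach. This is a simplified local model in plain Python, assuming two partitions and that reduceByKey combines values inside each partition before shuffling; the function names and data are made up for illustration.

```python
from collections import defaultdict

# Two hypothetical partitions of (key, value) pairs.
partitions = [
    [("a", 1), ("b", 1), ("a", 1), ("a", 1)],
    [("a", 1), ("b", 1), ("b", 1)],
]

def group_by_key(partitions):
    # Every record is shuffled, and each key ends up with a full list of values.
    shuffled = [pair for part in partitions for pair in part]
    groups = defaultdict(list)
    for key, value in shuffled:
        groups[key].append(value)
    return dict(groups), len(shuffled)

def reduce_by_key(partitions, func):
    shuffled = []
    for part in partitions:
        local = {}
        for key, value in part:            # combine within the partition first
            local[key] = func(local[key], value) if key in local else value
        shuffled.extend(local.items())     # at most one record per key per partition
    merged = {}
    for key, value in shuffled:            # combine the partial results
        merged[key] = func(merged[key], value) if key in merged else value
    return merged, len(shuffled)

groups, moved_by_group = group_by_key(partitions)
sums, moved_by_reduce = reduce_by_key(partitions, lambda a, b: a + b)

print(groups, moved_by_group)   # 7 records shuffled
print(sums, moved_by_reduce)    # 4 records shuffled
```

When the goal is an aggregate such as a sum, the reduceByKey-style approach moves less data and never builds the per-key lists.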
PySpark Shared Variables
Broadcast Variables
» Efficiently send large, read-only value to all workers
» Saved at workers for use in one or more Spark operations
» Like sending a large, read-only lookup table to all the nodes
# Country code lookup for HAM radio call signs.
# Look up the locations of the call signs in the RDD contactCounts.
# We load a table mapping call sign prefixes to country codes
# to support this lookup.
# loadCallSignTable and lookupCountry are helpers defined elsewhere.
signPrefixes = sc.broadcast(loadCallSignTable())

def processSignCount(sign_count):
    # map() passes a single element, so the broadcast variable is
    # captured from the enclosing scope and read through .value
    country = lookupCountry(sign_count[0], signPrefixes.value)
    count = sign_count[1]
    return (country, count)

countryContactCounts = (contactCounts
                        .map(processSignCount)
                        .reduceByKey(lambda x, y: x + y))
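The notes do not show `loadCallSignTable` or `lookupCountry`. As a rough idea of what the lookup might do, here is a hypothetical local version that assumes the table is a dict from prefix to country and uses longest-prefix matching. The table contents and call signs are illustrative sample data, and a plain dict stands in for the broadcast value.

```python
def lookup_country(sign, prefixes):
    """Return the country for the longest prefix of `sign` found in `prefixes`."""
    for end in range(len(sign), 0, -1):    # try the longest prefix first
        country = prefixes.get(sign[:end])
        if country is not None:
            return country
    return "Unknown"

# Illustrative sample table; in Spark this would be signPrefixes.value
prefix_table = {"W": "United States", "VE": "Canada", "JA": "Japan"}

# Illustrative (call sign, contact count) pairs, like the elements of contactCounts
contact_counts = [("W1AW", 3), ("VE3XYZ", 2), ("W6ABC", 1), ("ZZ9ZZ", 4)]

# Local equivalent of map(processSignCount) followed by reduceByKey(add)
country_counts = {}
for sign, count in contact_counts:
    country = lookup_country(sign, prefix_table)
    country_counts[country] = country_counts.get(country, 0) + count

print(country_counts)
```

The lookup table is read-only and needed by every task, which is the situation broadcast variables are meant for.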
Accumulators
» Aggregate values from workers back to driver
» Only driver can access value of accumulator
» For tasks, accumulators are write-only
» Use to count errors seen in RDD across workers
# Counting empty lines
file = sc.textFile(inputFile)
# Create an Accumulator[Int] initialized to 0
blankLines = sc.accumulator(0)

def extractCallSigns(line):
    global blankLines  # Make the global variable accessible
    if line == "":
        blankLines += 1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
# flatMap is lazy: run an action first so the accumulator is actually updated
callSigns.count()
print("Blank lines: %d" % blankLines.value)
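The write-only behaviour of accumulators can be modelled locally. In this plain-Python sketch (not the Spark API), each loop iteration stands in for a task on a worker: a task only adds to its own local counter, and the driver merges the contributions afterwards. The partition contents are made-up sample lines.

```python
def run_task(lines):
    # Each task adds to its own local counter; it never reads the global total.
    local_blank = 0
    words = []
    for line in lines:
        if line == "":
            local_blank += 1
        words.extend(line.split(" "))
    return words, local_blank

# Made-up sample input, split into two partitions
partitions = [["W1AW K2ABC", "", "VE3XYZ"], ["", "", "JA1ABC"]]

blank_lines = 0            # driver-side accumulator value
call_signs = []
for part in partitions:    # stands in for tasks running on workers
    words, local_blank = run_task(part)
    call_signs.extend(words)
    blank_lines += local_blank   # driver merges each task's contribution

print("Blank lines: %d" % blank_lines)
```

The total is only meaningful on the driver side, and only after the tasks have actually run.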
--
Further reading:
Introduction to Big Data with Apache Spark
PySpark documentation
pyspark-pictures