ch1
WordCount in Scala
package com.oreilly.learningsparkexamples.mini.scala

import org.apache.spark._
import org.apache.spark.SparkContext._

object WordCount {
  def main(args: Array[String]) {
    val inputFile = args(0)
    val outputFile = args(1)
    val conf = new SparkConf().setAppName("wordCount")
    // Create a Scala Spark Context.
    val sc = new SparkContext(conf)
    // Load our input data.
    val input = sc.textFile(inputFile)
    // Split up into words.
    val words = input.flatMap(line => line.split(" "))
    // Transform into (word, count) pairs and sum the counts.
    val counts = words.map(word => (word, 1)).reduceByKey { case (x, y) => x + y }
    // Save the word count back out to a text file, causing evaluation.
    counts.saveAsTextFile(outputFile)
  }
}
We can build this with sbt.
mkdir ~/src
cd ~/src
vi WordCount.scala
mkdir project
vi project/build.properties
sbt.version=0.13.11
vi build.sbt
Example build.sbt file
name := "learning-spark-mini-example"
version := "0.0.1"
scalaVersion := "2.10.4"
// Additional libraries; spark-core is "provided" because spark-submit supplies Spark itself at runtime.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
)
sbt package
spark-submit --master local --class com.oreilly.learningsparkexamples.mini.scala.WordCount ./target/scala-2.10/*.jar input output
RDD
An RDD is an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster.
RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.
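As a small illustration (a sketch, assuming a SparkContext named sc and a README.md in the working directory), textFile() accepts a minimum number of partitions, and the resulting RDD exposes how it is split:
val lines = sc.textFile("README.md", 4) // ask for at least 4 partitions
println(lines.partitions.length)        // each partition can be computed on a different node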
Users create RDDs in two ways: by loading an external dataset, or by distributing a collection of objects (e.g., a list or set) in their driver program.
sc.textFile("README.md")
Once created, RDDs offer two types of operations: transformations and actions.
Transformations construct a new RDD from a previous one. For example, one common transformation is filtering data that matches a predicate. In our text file example, we can use this to create a new RDD holding just the strings that contain the word Python:
pythonLines = lines.filter(lambda line: "Python" in line)
Actions, on the other hand, compute a result based on an RDD, and either return it to the driver program or save it to an external storage system (e.g., HDFS). One example of an action is first(), which returns the first element in an RDD, as demonstrated below:
pythonLines.first()
Although you can define new RDDs any time, Spark computes them only in a lazy fashion, that is, the first time they are used in an action. This approach might seem unusual at first, but it makes a lot of sense when you are working with big data. If Spark were to load and store all lines in the file as soon as we wrote lines = sc.textFile(...), it would waste a lot of storage space, given that we then immediately filter out many lines. Instead, once Spark sees the whole chain of transformations, it can compute just the data needed for its result. In fact, for the first() action, Spark scans the file only until it finds the first matching line; it doesn't even read the whole file.
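The same laziness can be seen in Scala (a minimal sketch, assuming a SparkContext named sc and a local README.md):
// Nothing is read from disk yet: both calls below only build up the chain of transformations.
val lines = sc.textFile("README.md")
val pythonLines = lines.filter(line => line.contains("Python"))
// Only the action triggers computation, and Spark stops as soon as it finds the first match.
println(pythonLines.first())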
Spark's RDDs are by default recomputed each time you run an action on them. If you would like to reuse an RDD in multiple actions, you can ask Spark to persist it using RDD.persist(). We can ask Spark to persist data in a number of different places. After computing it the first time, Spark will store the RDD contents in memory (partitioned across the machines in your cluster) and reuse them in future actions. Persisting RDDs on disk instead of memory is also possible. The behaviour of not persisting by default may again seem unusual, but it makes a lot of sense for big datasets: if you will not reuse the RDD, there's no reason to waste storage space when Spark could instead stream through the data once and just compute the result.
In practice, you will often use persist() to load a subset of your data into memory and query it repeatedly. For example, if we knew we wanted to compute multiple results about the README lines that contain Python, we could write the script shown below:
pythonLines.persist()
pythonLines.count()
pythonLines.first()
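In Scala, persist() also takes an optional storage level, which is how you choose memory versus disk (a minimal sketch, assuming a Scala RDD named pythonLines like the one built above):
import org.apache.spark.storage.StorageLevel
// Cache in memory once computed; StorageLevel.DISK_ONLY would keep it on disk instead.
pythonLines.persist(StorageLevel.MEMORY_ONLY)
pythonLines.count()  // first action: computes the RDD and caches it
pythonLines.first()  // later actions reuse the cached partitions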
To summarize, every Spark program and shell session will work as follows (the sketch after this list ties the steps together):
- Create some input RDDs from external data.
- Transform them to define new RDDs using transformations like filter().
- Ask Spark to persist() any intermediate RDDs that will need to be reused.
- Launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.
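A minimal end-to-end sketch of that flow in Scala (assuming a SparkContext named sc and a local README.md):
val lines = sc.textFile("README.md")                            // 1. create an input RDD from external data
val pythonLines = lines.filter(line => line.contains("Python")) // 2. transformation defining a new RDD
pythonLines.persist()                                           // 3. persist the intermediate RDD we will reuse
println(pythonLines.count())                                    // 4. actions kick off the parallel computation
println(pythonLines.first())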
Creating RDDs
Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program.
The simplest way to create RDDs is to take an existing collection in your program and pass it to SparkContext's parallelize() method.
parallelize() method in Scala
val lines = sc.parallelize(List("pandas","i like pandas"))
parallelize() method in Python
lines = sc.parallelize(["pandas", "i like pandas"])
parallelize() method in Java
JavaRDD<String> lines = sc.parallelize(Arrays.asList("pandas", "i like pandas"));
A more common way to create RDDs is to load data from external storage.
textFile() method in Python
lines = sc.textFile("README.md")
in Scala
val lines = sc.textFile("README.md")
in Java
JavaRDD<String> lines = sc.textFile("README.md");
RDD Operations
Transformations are operations on RDDs that return a new RDD, such as map() and filter(). Actions are operations that return a result to the driver program or write it to storage, and kick off a computation, such as count() and first(). Spark treats transformations and actions very differently, so understanding which type of operation you are performing is very important. If you are ever confused about whether a given function is a transformation or an action, you can look at its return type: transformations return RDDs, whereas actions return some other data type.
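A quick way to see this in Scala (a sketch using the lines RDD from the Scala textFile() example above; the explicit type annotations are only there to make the return types visible):
import org.apache.spark.rdd.RDD
val pythonLines: RDD[String] = lines.filter(line => line.contains("Python")) // transformation: returns an RDD
val numLines: Long = pythonLines.count()                                     // action: returns a plain Long
val firstLine: String = pythonLines.first()                                  // action: returns a String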