401.01 - Introduction to Spark

From data-parallel to distributed-data-parallel

Hosted infrastructure: Databricks.

RDDs - Resilient Distributed Datasets. A distributed counterpart of the Scala parallel collections.

Example:

val wiki: RDD[WikiArticle] = ...
...
wiki.map {
  article => article.text.toLowerCase
}

In this example, we use ordinary collection APIs to interact with the distributed data.

Latency

Distribution introduces important concerns:

  • Partial failure - crash failures of a subset of the machines involved in a large distributed computation;
  • Latency - certain operations (like network communication) have a much higher latency than other operations.

Latency cannot be masked completely - it is an important aspect that impacts the programming model.

Important latency numbers here.

RDDs - Spark's distributed data collections

RDDs seem a lot like immutable sequential or parallel Scala collections.

 abstract class RDD[T] {
   def map[U] (f: T => U): RDD[U] = ...
   def flatMap[U] (f: T => TraversableOnce[U]): RDD[U] = ...
   def filter(f: T => Boolean): RDD[T] = ...
   def reduce(f: (T, T) => T): T = ...
 }

Most operations on RDDs, like those on Scala's immutable List or Scala's parallel collections, are higher-order functions.

RDDs expose the same combinators as Scala's immutable sequential or parallel collections: map, flatMap, filter, reduce, fold, aggregate. The signatures of these combinators differ slightly, but the semantics are the same:

map[B](f: A => B): List[B]    // Scala List
map[B](f: A => B): RDD[B]     // Spark RDD

flatMap[B](f: A => TraversableOnce[B]): List[B]      // Scala List
flatMap[B](f: A => TraversableOnce[B]): RDD[B]       // Spark RDD

filter(pred: A => Boolean): List[A]       // Scala List
filter(pred: A => Boolean): RDD[A]        // Spark RDD

reduce(op: (A, A) => A): A          // Scala List
reduce(op: (A, A) => A): A          // Spark RDD

fold(z: A)(op: (A, A) => A): A    // Scala List
fold(z: A)(op: (A, A) => A): A    // Spark RDD

aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B   // Scala List
aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B      // Spark RDD

Using RDDs in Spark feels a lot like normal sequential/parallel collections, with the added knowledge that your data is distributed across several machines.
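
As a sketch of how aggregate combines per-partition results, the following computes a mean by carrying a (sum, count) pair. It assumes spark-core is on the classpath and uses a local master for illustration; the object name AggregateSketch and the helper mean are hypothetical and not part of Spark.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AggregateSketch {
  // Mean of xs: seqop folds one element into a partition-local (sum, count),
  // combop merges the pairs produced by different partitions.
  def mean(sc: SparkContext, xs: Seq[Int]): Double = {
    val (sum, count) = sc.parallelize(xs).aggregate((0L, 0L))(
      (acc, x) => (acc._1 + x, acc._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )
    sum.toDouble / count
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, for illustration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("aggregate-sketch"))
    try println(mean(sc, Seq(1, 2, 3, 4))) finally sc.stop()
  }
}
```

The zero value is used once per partition, which is why it has to be neutral for both functions.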

Example. Given:

val encyclopedia: RDD[String]

say we want to search the whole encyclopedia for mentions of EPFL and count the number of pages that mention EPFL.

val result = encyclopedia.filter(page => page.contains("EPFL")).count()

This is identical to regular Scala collections code.

Another example (the "Hello World!" of programming with large-scale data). Assuming rdd is of type RDD[String] and each element is a line of text, to count the occurrences of each word:

val rdd = spark.textFile("hdfs://...")   // spark is a SparkContext here

val count = rdd.flatMap(line => line.split(" "))   // separate each line into words
               .map(word => (word, 1))             // pair each word with a count of 1
               .reduceByKey(_ + _)                 // sum the counts for each word
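
A runnable variant of the word count, sketched on a small in-memory collection instead of an HDFS file. It assumes spark-core is on the classpath and a local master; WordCountSketch and wordCounts are hypothetical names, and the extra filter for empty tokens is an addition for the sample data.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCountSketch {
  // Counts how often each word occurs across all lines.
  def wordCounts(sc: SparkContext, lines: Seq[String]): Map[String, Int] =
    sc.parallelize(lines)
      .flatMap(line => line.split(" "))  // separate each line into words
      .filter(_.nonEmpty)                // drop empty tokens from repeated spaces
      .map(word => (word, 1))            // pair each word with a count of 1
      .reduceByKey(_ + _)                // sum the counts for each word
      .collectAsMap()                    // action: bring the result to the driver
      .toMap

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, for illustration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("word-count-sketch"))
    try println(wordCounts(sc, Seq("to be or", "not to be"))) finally sc.stop()
  }
}
```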

How to create an RDD

a) by transforming an existing RDD (just as with applying higher-order functions to Scala collections);

b) from a SparkContext or SparkSession object. These are handles to the Spark cluster. SparkContext (available from a SparkSession as sparkContext) defines methods to create and populate RDDs:

  • parallelize converts a local Scala collection into an RDD
  • textFile reads a text file from HDFS or a local file system and returns an RDD[String]
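
Both creation methods can be sketched as follows, assuming spark-core is on the classpath and a local master. The temporary file stands in for real input and is purely illustrative; CreateRddSketch and its helpers are hypothetical names.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object CreateRddSketch {
  // parallelize: distribute a local Scala collection.
  def fromCollection(sc: SparkContext): Long =
    sc.parallelize(Seq("a", "b", "c")).count()

  // textFile: one RDD element per line of the file.
  def fromTextFile(sc: SparkContext): Long = {
    val path = Files.createTempFile("rdd-sketch", ".txt") // hypothetical sample file
    Files.write(path, "first line\nsecond line\n".getBytes(StandardCharsets.UTF_8))
    try sc.textFile(path.toUri.toString).count()
    finally Files.delete(path)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, for illustration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("create-rdd-sketch"))
    try {
      println(fromCollection(sc))
      println(fromTextFile(sc))
    } finally sc.stop()
  }
}
```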

RDD transformations and actions

Recall transformers and accessors from Scala sequential and parallel collections:

Scala transformers

Return new collections (not values) as a result. Examples: map, filter, groupBy.

map[B](f: A => B): Traversable[B]

Scala accessors

Return a single value (not collections) as a result. Examples: reduce, fold, aggregate.

reduce(op: (A, A) => A): A

Similarly, Spark uses transformations and actions.

Spark transformations

Return new RDDs as a result. Transformations are LAZY.

Spark actions

Compute a result based on an RDD and either return it or save it to an external storage system (like HDFS). Actions are EAGER.

Laziness / eagerness is how we can limit network communication using the programming model.

Example:

val largeList: List[String] = ...
val wordsRdd = sc.parallelize(largeList)     // RDD[String]
val lengthsRdd = wordsRdd.map(_.length)      // RDD[Int]

At this point, nothing has happened in the cluster yet, because we have only used transformations so far, and transformations are lazy. To kick off the computation, we need to add an action:

val totalChars = lengthsRdd.reduce(_ + _)
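
One way to observe the laziness is to count how often the function passed to map actually runs. The sketch below uses a Spark accumulator for that, assuming spark-core on the classpath, a local master, and a run without task retries (retries can apply accumulator updates more than once). LazinessSketch and demo are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazinessSketch {
  // Returns (map calls before the action, total length, map calls after the action).
  def demo(sc: SparkContext, words: Seq[String]): (Long, Int, Long) = {
    val calls = sc.longAccumulator("map calls")
    val lengthsRdd = sc.parallelize(words).map { w => calls.add(1); w.length }
    val before = calls.sum                   // transformation only: nothing has run
    val totalChars = lengthsRdd.reduce(_ + _) // action: triggers the computation
    val after = calls.sum
    (before, totalChars, after)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, for illustration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("laziness-sketch"))
    try println(demo(sc, Seq("a", "bb", "ccc"))) finally sc.stop()
  }
}
```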

Common transformations in the wild

map[B](f: A => B): RDD[B]

Apply function f to each element of the input RDD and return a new RDD of the result.

flatMap[B](f: A => TraversableOnce[B]): RDD[B]

Apply a function to each element of the RDD and return an RDD of the contents of the iterators returned.

filter(pred: A => Boolean): RDD[A]

Apply the predicate to each element of the RDD and return an RDD of the elements that satisfy it.

distinct(): RDD[A]

Return an RDD with all duplicates removed.

Common actions in the wild

collect(): Array[T]

Return all elements from an RDD.

count(): Long

Returns the number of elements in the RDD.

take(num: Int): Array[T]

Returns the first num elements in an RDD.

reduce(op: (A, A) => A): A

Combines the elements in the RDD together using the op function and returns the result.

foreach(f: T => Unit): Unit

Apply the function f to each element in the RDD.
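
The following sketch chains several of the transformations above and finishes with a single action. It assumes spark-core on the classpath and a local master; TransformActionSketch and distinctWords are hypothetical names, and the sample input is made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TransformActionSketch {
  // Distinct lower-cased words longer than two characters, sorted on the driver.
  def distinctWords(sc: SparkContext, lines: Seq[String]): Seq[String] = {
    val words = sc.parallelize(lines)
      .flatMap(_.split(" "))   // transformation
      .map(_.toLowerCase)      // transformation
      .filter(_.length > 2)    // transformation
      .distinct()              // transformation: still nothing has run
    // collect() is the action; distinct() shuffles, so sort for a stable order.
    words.collect().sorted.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, for illustration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("transform-action-sketch"))
    try println(distinctWords(sc, Seq("Spark is fast", "spark is lazy"))) finally sc.stop()
  }
}
```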

Another example: let's assume we have an RDD[String] which contains gigabytes of logs collected during the previous year. Each element in the RDD represents one line of logging. Assume that dates are formatted as YYYY-MM-DD:HH:MM:SS and that errors are logged with a prefix that includes the word "error". How do we determine the number of errors that were logged in December 2016?

val lastYearsLogs: RDD[String] = ...
val numDecErrorLogs = lastYearsLogs.filter(lg => lg.contains("2016-12") && lg.contains("error")).count()

The filter doesn't execute until count is invoked, because filter is lazy and count is eager.

Transformations on two RDDs

RDDs support set-like operations, like union and intersection. Two-RDD transformations combine two RDDs into a single one.

union(other: RDD[T]): RDD[T]

Returns an RDD that contains elements from both RDDs.

intersection(other: RDD[T]): RDD[T]

Returns an RDD containing only elements found in both RDDs.

subtract(other: RDD[T]): RDD[T]

Return an RDD with the contents of the other RDD removed.

cartesian[U](other: RDD[U]): RDD[(T,U)]

Cartesian product of the two RDDs.
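
A small sketch of the four two-RDD transformations on made-up data, assuming spark-core on the classpath and a local master. SetOpsSketch and setOps are hypothetical names. Results are sorted on the driver because element order after a shuffle is not guaranteed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SetOpsSketch {
  // Returns (union, intersection, subtract, size of the cartesian product).
  def setOps(sc: SparkContext): (Seq[Int], Seq[Int], Seq[Int], Long) = {
    val a = sc.parallelize(Seq(1, 2, 3))
    val b = sc.parallelize(Seq(3, 4))
    (
      a.union(b).collect().sorted.toSeq,        // union keeps duplicates
      a.intersection(b).collect().sorted.toSeq, // elements present in both
      a.subtract(b).collect().sorted.toSeq,     // elements of a not in b
      a.cartesian(b).count()                    // |a| * |b| pairs
    )
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, for illustration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("set-ops-sketch"))
    try println(setOps(sc)) finally sc.stop()
  }
}
```

Note that union, unlike a mathematical set union, does not remove duplicates; apply distinct() afterwards if that is needed.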

Other useful RDD actions

Important actions unrelated to regular Scala collections.

takeSample(withRepl: Boolean, num: Int): Array[T]

Return an array with a random sample of num elements of the dataset, with or without replacement.

takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

Return the first num elements of the RDD using either their natural order or a custom comparator.

saveAsTextFile(path: String): Unit

Write the elements of the dataset as a text file in the local filesystem or HDFS.

saveAsSequenceFile(path: String): Unit

Write the elements of the dataset as a Hadoop SequenceFile in the local filesystem or HDFS.
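
A brief sketch of takeOrdered and takeSample on made-up data, assuming spark-core on the classpath and a local master. OrderedSampleSketch and its helpers are hypothetical names. The sampled elements are random, so only the sample size is shown.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object OrderedSampleSketch {
  // The n smallest elements (natural order) and the n largest (reversed order).
  def smallestAndLargest(sc: SparkContext, xs: Seq[Int], n: Int): (Seq[Int], Seq[Int]) = {
    val rdd = sc.parallelize(xs)
    val smallest = rdd.takeOrdered(n)                       // implicit natural ordering
    val largest = rdd.takeOrdered(n)(Ordering[Int].reverse) // custom ordering
    (smallest.toSeq, largest.toSeq)
  }

  // Size of a random sample of n elements drawn without replacement.
  def sampleSize(sc: SparkContext, xs: Seq[Int], n: Int): Int =
    sc.parallelize(xs).takeSample(withReplacement = false, num = n).length

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, for illustration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("ordered-sample-sketch"))
    try {
      println(smallestAndLargest(sc, Seq(5, 1, 4, 2, 3), 2))
      println(sampleSize(sc, Seq(5, 1, 4, 2, 3), 3))
    } finally sc.stop()
  }
}
```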

Evaluation in Spark

Iterations in Hadoop are like a pipeline which:

  • read some data from a file (HDFS)
  • perform some iterations (transformations, map-reduce)
  • write intermediary data into a file
  • repeat

Lots of time (about 90%) is spent in I/O.

In Spark, intermediary data is kept in memory, so most of this I/O overhead is avoided.

Example: Logistic Regression algorithm, used for classification. Classification algorithms are iterative - the classifier's weights are updated based on a training dataset.

To tell Spark to cache a computed RDD in memory, so that it is not recomputed every time an action uses it, we can call the persist() or cache() functions.

Example:

val lastYearsLogs: RDD[String] = ...
val logsWithErrors = lastYearsLogs.filter(_.contains("ERROR")).persist()
val firstLogsWithErrors = logsWithErrors.take(10)
val numErrors = logsWithErrors.count()

Because we called persist(), the logsWithErrors RDD won't be evaluated again when we use the second action count() on it.

It is possible to persist an RDD in many different ways:

  • as regular in-memory Java objects;
  • on disk, as regular Java objects;
  • in memory, as serialized Java objects (more compact);
  • on disk, as serialized Java objects;
  • both in memory and on disk (spill over to disk to avoid recomputation);

cache()

Shorthand for using the default storage level (in memory, as Java objects).

persist()

Persistence can be customized using this method. The storage level can be passed as a parameter.
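
To illustrate passing a storage level, the sketch below persists a filtered RDD with StorageLevel.MEMORY_AND_DISK and counts, via an accumulator, how often the filter predicate runs across two actions. It assumes spark-core on the classpath, a local master, data small enough to stay cached, and no task retries. PersistSketch and countTwice are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  // Returns (number of error lines, number of times the filter predicate ran).
  def countTwice(sc: SparkContext, logs: Seq[String]): (Long, Long) = {
    val evaluations = sc.longAccumulator("predicate evaluations")
    val errors = sc.parallelize(logs)
      .filter { line => evaluations.add(1); line.contains("ERROR") }
      .persist(StorageLevel.MEMORY_AND_DISK) // spill to disk instead of recomputing
    errors.count()                // first action: computes and stores the partitions
    val numErrors = errors.count() // second action: reads the stored partitions
    val ran = evaluations.sum
    errors.unpersist()            // release the storage once it is no longer needed
    (numErrors, ran)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, for illustration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("persist-sketch"))
    try println(countTwice(sc, Seq("ERROR a", "INFO b", "ERROR c"))) finally sc.stop()
  }
}
```

Without the persist call, the predicate would run once per element for each of the two actions.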

Cluster topology matters

Example: assume we have an RDD populated with Person objects:

case class Person(name: String, age: Int)

val people: RDD[Person] = ...
people.foreach(println)

How does Spark actually work?

1) A driver program runs the Spark application, which creates a SparkContext upon startup;

2) The SparkContext connects to a cluster manager (Mesos / YARN) which allocates the resources;

3) Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application;

4) The driver program sends your application code to the executors;

5) The SparkContext sends tasks for the executors to run;

For this reason, in the above example, the foreach action is executed on the worker nodes, so nothing is printed by the driver program; the output goes to the standard output of the executors on the worker nodes.
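
If the goal is to see the elements in the driver, one option is to bring them there with an action first and print locally. The sketch below does this with collect(), which is only reasonable for RDDs small enough to fit in driver memory (take(n) is an alternative for a bounded preview). It assumes spark-core on the classpath and a local master; DriverPrintSketch and describeOnDriver are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

case class Person(name: String, age: Int)

object DriverPrintSketch {
  // Formats each Person on the executors, then ships the strings to the driver.
  def describeOnDriver(sc: SparkContext, people: Seq[Person]): Seq[String] =
    sc.parallelize(people)
      .map(p => s"${p.name} (${p.age})")
      .collect() // action: the result is a local array in the driver
      .toSeq

  def main(args: Array[String]): Unit = {
    // Assumption: local mode, for illustration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("driver-print-sketch"))
    try {
      // This println runs in the driver, on an ordinary local collection.
      describeOnDriver(sc, Seq(Person("Ada", 36), Person("Alan", 41))).foreach(println)
    } finally sc.stop()
  }
}
```

In local mode the executors run inside the driver's JVM, so people.foreach(println) would appear to work there; the difference only shows up on a real cluster.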