How are common reduce-like actions distributed in Spark?
In Scala sequential collections, operations such as fold, reduce and aggregate (and their variants such as foldLeft and reduceRight) have something in common regarding the way they are computed. These operations walk through a collection and combine neighbouring elements of the collection together to produce a single combined result (rather than another collection).
Example:
```scala
case class Taco(kind: String, price: Double)

val tacoOrder = List(
  Taco("Carnitas", 2.25),
  Taco("Corn", 1.75),
  Taco("Barbacoa", 2.50),
  Taco("Chicken", 2.00))

// walk the order from left to right, adding each price to the running sum
val cost = tacoOrder.foldLeft(0.0)((sum, taco) => sum + taco.price)
// cost: Double = 8.5
```
Comparing foldLeft and fold, remember that foldLeft is not parallelizable:
```scala
def foldLeft[B](z: B)(f: (B, A) => B): B
```
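Intuitively, a foldLeft operation walks the collection from left to right, and each step depends on the result of the previous one. In a distributed collection, elements are spread across the nodes of the cluster, so this strictly sequential walk cannot be parallelized, and foldLeft cannot be implemented efficiently.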
Concretely, given:
```scala
val xs = List(1, 2, 3, 4)
val res = xs.foldLeft("")((str: String, i: Int) => str + i)
```
The result should be "1234". What happens if we split the collection in two and process the halves in parallel? With two nodes, we would get "12" on one node and "34" on the other. Both partial results are Strings, but they cannot be combined with the function passed to foldLeft, because that function expects a String and an Int, not two Strings.
In contrast, the fold operation is parallelizable. The fold operation is similar to the foldLeft operation, with the difference that both types involved have to be the same:
```scala
def fold(z: A)(f: (A, A) => A): A
```
This allows us to build parallelizable reduce trees.
Note: remember the Lego pieces analogy from the parallel Scala programming course.
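The reduce-tree idea can be sketched with plain Scala collections, without Spark. Two hypothetical "partitions" are simulated with `grouped`. Each one is folded on its own, and the partial results are then folded again with the same function. The object name `FoldTreeSketch` is illustrative only. The sketch also shows why the zero value should be neutral for the function: the zero is used once per partition and once more during the merge.

```scala
// Plain-Scala simulation of a distributed fold (no Spark involved).
object FoldTreeSketch {
  val xs: List[Int] = (1 to 10).toList

  // Pretend the data lives on two nodes: two "partitions" of five elements.
  val partitions: List[List[Int]] = xs.grouped(5).toList

  // Step 1: each partition is folded independently, starting from the zero 0.
  val partials: List[Int] = partitions.map(p => p.fold(0)(_ + _))

  // Step 2: the partial results have the same type A = Int, so the same
  // function can merge them. This is the reduce tree foldLeft cannot build.
  val total: Int = partials.fold(0)(_ + _)

  // Sequential reference result.
  val sequential: Int = xs.fold(0)(_ + _)

  // With a non-neutral zero (1), the zero is added once per partition and
  // once more during the merge, so the answer depends on the partitioning.
  val sequentialWithBadZero: Int = xs.fold(1)(_ + _)
  val partitionedWithBadZero: Int = partitions.map(_.fold(1)(_ + _)).fold(1)(_ + _)
}
```

Here `total` equals `sequential` (55). With the zero value 1, the sequential fold gives 56 but the two-partition version gives 58.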
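In Scala collections, aggregate has the following signature:

```scala
def aggregate[B](z: => B)(seqop: (B, A) => B, combop: (B, B) => B): B
```

aggregate is said to be general because it gives the best of both worlds: it is parallelizable and it allows you to change the return type. seqop folds elements of type A into an accumulator of type B within one chunk of the collection, and combop merges the accumulators of different chunks.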
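The following plain-Scala sketch shows how aggregate gets around the foldLeft problem from the "1234" example by evaluating in two phases. `simulatedAggregate` is a hypothetical helper, not a library method. It runs `seqop` sequentially inside each simulated partition and then merges the partial results with `combop`.

```scala
// Plain-Scala sketch of aggregate's two phases (no Spark involved).
object AggregateSketch {
  // Hypothetical helper: seqop runs sequentially inside each partition,
  // combop merges the per-partition accumulators.
  def simulatedAggregate[A, B](partitions: List[List[A]], zero: B)(
      seqop: (B, A) => B,
      combop: (B, B) => B): B =
    partitions
      .map(p => p.foldLeft(zero)(seqop)) // phase 1: fold elements of type A into a B
      .fold(zero)(combop)                 // phase 2: merge accumulators of type B

  val xs: List[Int] = List(1, 2, 3, 4)

  // The example foldLeft could not parallelize: Int elements, String result,
  // split into the two halves List(1, 2) and List(3, 4).
  val res: String = simulatedAggregate(xs.grouped(2).toList, "")(
    (str, i) => str + i, // seqop: (String, Int) => String
    (s1, s2) => s1 + s2  // combop: (String, String) => String
  )
}
```

In this sketch, the partial results are merged in partition order, so `res` is "1234". Spark does not guarantee the order in which partition results are merged. On a real cluster, a non-commutative combop such as string concatenation may therefore assemble the pieces in a different order.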
| Scala collections | Spark |
|---|---|
| fold | fold |
| foldLeft / foldRight | (not available) |
| reduce | reduce |
| aggregate | aggregate |
Spark RDDs define no foldLeft or foldRight operations, so if you need to change the return type of your reduction, you have to use aggregate instead.
Serial operations are not practical on a cluster. Enforcing an order on operations across many nodes requires a lot of synchronization, which defeats the purpose of distributing the work.
In fact, aggregate is often the more desirable choice in Spark, because when working with big data we usually want to project complex data types down to simpler ones. As an example, consider a record for a Wikipedia page like this:
```scala
case class WikipediaPage(
  title: String,
  redirectTitle: String,
  timestamp: String,
  lastContributorUsername: String,
  text: String
)
```
When reducing this dataset, we might only be interested in the title and the timestamp. We don't want to carry the full text of every article around in memory (in our accumulator).
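Here is a sketch of that idea, simulated with plain Scala collections rather than Spark. The accumulator of an aggregate-style reduction keeps only an optional `(title, timestamp)` pair for the most recently edited page, so the `text` field never enters the accumulator. The sample pages are made up. The sketch assumes ISO-8601 timestamps, which compare correctly as plain strings.

```scala
// Plain-Scala simulation (no Spark) of projecting records down inside a reduction.
object ProjectionSketch {
  case class WikipediaPage(
      title: String,
      redirectTitle: String,
      timestamp: String,
      lastContributorUsername: String,
      text: String)

  // Made-up sample pages; timestamps are assumed to be ISO-8601 strings.
  val pages: List[WikipediaPage] = List(
    WikipediaPage("Apache Spark", "", "2015-03-01T10:00:00Z", "alice", "long article text ..."),
    WikipediaPage("Scala (programming language)", "", "2016-07-12T08:30:00Z", "bob", "long article text ..."),
    WikipediaPage("MapReduce", "", "2014-11-20T17:45:00Z", "carol", "long article text ...")
  )

  // The accumulator only ever holds (title, timestamp), never the text.
  type Acc = Option[(String, String)]

  // Keep whichever of two candidates has the later timestamp.
  def later(a: Acc, b: Acc): Acc = (a, b) match {
    case (Some((_, ta)), Some((_, tb))) => if (tb > ta) b else a
    case (None, _)                      => b
    case (_, None)                      => a
  }

  // seqop projects a full page down to (title, timestamp) as it is folded in.
  val seqop: (Acc, WikipediaPage) => Acc =
    (acc, page) => later(acc, Some((page.title, page.timestamp)))
  val combop: (Acc, Acc) => Acc = later

  // Two simulated partitions, reduced locally and then merged.
  val latest: Acc =
    pages.grouped(2).toList
      .map(_.foldLeft(None: Acc)(seqop))
      .fold(None: Acc)(combop)
}
```

Keeping the later of two timestamps gives the same answer however the pages are split into partitions. Only ties between identical timestamps could resolve differently if partition results were merged in another order.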
Large datasets are often made up of huge numbers of complex, nested data records. To be able to work with such datasets, it's often desirable to project down these complex data types into key-value pairs.
Example:
```json
{
  "definitions": {
    "firstname": "string",
    "lastname": "string",
    "address": {
      "type": "object",
      "properties": {
        "street_address": {
          "type": "string"
        },
        "city": {
          "type": "string"
        },
        "state": {
          "type": "string"
        }
      },
      "required": [
        "street_address",
        "city",
        "state"
      ]
    }
  }
}
```
In this example, we might only be interested in city data and because of that, it may be desirable to create an RDD of properties of type:
```scala
RDD[(String, Property)]
```
where 'String' is a key representing a city, and 'Property' is its corresponding value:
```scala
case class Property(street: String, city: String, state: String)
```

Instances of Property can then be grouped by their respective cities and represented in an RDD of key-value pairs.
Pair RDDs are very useful because they allow you to act on keys in parallel or regroup the data based on key across the network.
Pair RDDs have additional specialized methods for working with data associated with keys. RDDs that are parameterized by a pair are Pair RDDs and Spark treats them specially:
```scala
RDD[(K, V)]
```
Some important extra methods for Pair RDDs are:
```scala
def groupByKey(): RDD[(K, Iterable[V])]
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
```
Pair RDDs are most often created from already-existing non-pair RDDs, for example using the map operation on RDDs:
```scala
val rdd: RDD[WikipediaPage] = ...
val pairRdd = rdd.map(page => (page.title, page.text))
```
Once created, we can use transformations specific to Pair RDDs, such as: groupByKey, reduceByKey and join.
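The Pair RDD operations below are either transformations (groupByKey, reduceByKey, mapValues, keys and the joins) or actions (countByKey).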
Here is how groupBy works in regular Scala collections:
```scala
def groupBy[K](f: A => K): Map[K, Traversable[A]]
```
Partitions this traversable collection into a map of traversable collections according to some discriminator function.
Example: grouping the list below into "child", "adult" and "senior" categories.
```scala
val ages = List(2, 52, 44, 23, 17, 14, 12, 82, 51, 64)
val grouped = ages.groupBy { age =>
  if (age >= 18 && age < 65) "adult"
  else if (age < 18) "child"
  else "senior"
}
// grouped: scala.collection.immutable.Map[String, List[Int]]
//   = Map(senior -> List(82), adult -> List(52, 44, 23, 51, 64),
//         child -> List(2, 17, 14, 12))
```
Similarly, in Spark, groupByKey can be thought of as a groupBy on Pair RDDs that is specialized in grouping all values that have the same key. Since the key is already part of each pair, there is no need for a discriminator function to "calculate" it:
```scala
def groupByKey(): RDD[(K, Iterable[V])]
```
Example:
```scala
case class Event(organizer: String, name: String, budget: Int)

val eventsRdd = sc.parallelize(...)
  .map(event => (event.organizer, event.budget))

val groupedRdd = eventsRdd.groupByKey()
groupedRdd.collect().foreach(println)
// (Prime Sound, CompactBuffer(42000))
// (Sportorg, CompactBuffer(23000, 12000, 1400))
// ...
```
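A local, Spark-free analogue of groupByKey on the same (organizer, budget) pairs can be written with `groupMap`, which requires Scala 2.13 or later. The pairs are taken from the example output above, and the object name is illustrative.

```scala
// Local, Spark-free analogue of groupByKey (requires Scala 2.13+ for groupMap).
object GroupByKeySketch {
  // (organizer, budget) pairs, matching the example output above.
  val eventBudgets: List[(String, Int)] = List(
    ("Prime Sound", 42000),
    ("Sportorg", 23000),
    ("Sportorg", 12000),
    ("Sportorg", 1400)
  )

  // The key is already the first element of each pair, so no discriminator
  // function is needed: group by _._1 and keep only the values _._2.
  val grouped: Map[String, List[Int]] = eventBudgets.groupMap(_._1)(_._2)
}
```

By contrast, `groupBy(_._1)` would keep the whole pair inside each group. `groupMap` strips the key from the values, which mirrors the `Iterable[V]` returned by groupByKey.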
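Conceptually, reduceByKey can be thought of as a groupByKey followed by reduce-ing all the values per key. It is more efficient than running groupByKey and reduce separately, because Spark first reduces the values for each key locally within every partition. Only the much smaller partial results are then shuffled across the network.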
```scala
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
```
Let's use the same example as before but this time calculate the total budget per organizer:
```scala
case class Event(organizer: String, name: String, budget: Int)

val eventsRdd = sc.parallelize(...)
  .map(event => (event.organizer, event.budget))

val budgetRdd = eventsRdd.reduceByKey(_ + _)
budgetRdd.collect().foreach(println)
// (Prime Sound, 42000)
// (Sportorg, 36400)
// ...
```
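The efficiency argument for reduceByKey can be illustrated with a plain-Scala simulation. The partition layout is an assumption made for illustration. The "shuffled" counts simply count how many pairs would leave the simulated partitions. With groupByKey every pair is sent. Reducing locally first sends only one partial sum per key per partition.

```scala
// Plain-Scala simulation (no Spark) of why reduceByKey shuffles less than groupByKey.
object ReduceByKeySketch {
  // Hypothetical layout of the (organizer, budget) pairs over two partitions.
  val partitions: List[List[(String, Int)]] = List(
    List(("Sportorg", 23000), ("Prime Sound", 42000), ("Sportorg", 12000)),
    List(("Sportorg", 1400))
  )

  // groupByKey-style: every single pair is sent across the network.
  val shuffledByGroup: Int = partitions.map(_.size).sum

  // reduceByKey-style: first sum the values per key inside each partition ...
  val locallyReduced: List[Map[String, Int]] =
    partitions.map(p => p.groupMapReduce(_._1)(_._2)(_ + _))

  // ... so only one partial sum per key per partition has to be sent.
  val shuffledByReduce: Int = locallyReduced.map(_.size).sum

  // Final step: merge the partial sums key by key.
  val totals: Map[String, Int] =
    locallyReduced.flatten.groupMapReduce(_._1)(_._2)(_ + _)
}
```

Here 4 pairs would be shuffled by the groupByKey approach and 3 by the reduceByKey approach. The gap grows with the number of values per key in each partition.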
```scala
def mapValues[U](f: V => U): RDD[(K, U)]
```
mapValues can be thought of as a shorthand for:

```scala
rdd.map { case (x, y) => (x, f(y)) }
```

That is, it applies a function only to the values in a Pair RDD and leaves the keys untouched.
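The same equivalence can be sketched in plain Scala on a local list. The function `f`, which converts a budget to thousands, is hypothetical.

```scala
// Plain-Scala sketch of the mapValues shorthand on a local list (no Spark).
object MapValuesSketch {
  val budgets: List[(String, Int)] = List(("Prime Sound", 42000), ("Sportorg", 23000))

  // Hypothetical value function: express a budget in thousands.
  val f: Int => Double = b => b / 1000.0

  // Longhand: rebuild every pair, changing only the value.
  val viaMap: List[(String, Double)] = budgets.map { case (k, v) => (k, f(v)) }

  // Shorthand on a local Map (Scala 2.13+ collections API).
  val viaMapValues: Map[String, Double] = budgets.toMap.view.mapValues(f).toMap
}
```

In Spark, preferring mapValues over the longhand map has a practical benefit. Because the keys cannot change, the resulting RDD keeps its parent's partitioner, whereas map discards it.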
```scala
def countByKey(): Map[K, Long]
```
countByKey is an action that counts the number of elements per key in a Pair RDD, returning a normal Scala Map mapping from keys to counts.
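Locally, counting the elements per key amounts to a `groupMapReduce` that maps every pair to `1L` and sums the ones. This sketch uses plain Scala 2.13+, not Spark, and reuses the budgets from the events example.

```scala
// Local, Spark-free analogue of countByKey (Scala 2.13+ for groupMapReduce).
object CountByKeySketch {
  val eventBudgets: List[(String, Int)] = List(
    ("Prime Sound", 42000), ("Sportorg", 23000), ("Sportorg", 12000), ("Sportorg", 1400))

  // Map every pair to 1L and sum per key, giving a plain Map[K, Long].
  val counts: Map[String, Long] = eventBudgets.groupMapReduce(_._1)(_ => 1L)(_ + _)
}
```

Because countByKey is an action, its entire result Map is sent to the driver. It is therefore only appropriate when the number of distinct keys is small.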
Example: let's use the events example from before to calculate the average budget per organizer:

```scala
// for each key's value, build a pair (budget, #events),
// then add up budgets and event counts per key
val intermediate =
  eventsRdd.mapValues(b => (b, 1))
    .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
// intermediate: RDD[(String, (Int, Int))]

val avgBudgets = intermediate.mapValues {
  case (budget, numberOfEvents) => budget / numberOfEvents
}
avgBudgets.collect().foreach(println)
// (Prime Sound, 42000)
// (Sportorg, 12133)
// ...
```
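Carrying `(budget, #events)` pairs is necessary because averages cannot be combined directly. The average of two partial averages is generally not the overall average, whereas sums and counts can be added in any grouping. The steps above can be replayed on a local list in plain Scala (2.13+) as a sketch, using the same budgets and the same integer division.

```scala
// Plain-Scala replay (no Spark) of the average-budget pipeline above.
object AverageBudgetSketch {
  val eventBudgets: List[(String, Int)] = List(
    ("Prime Sound", 42000), ("Sportorg", 23000), ("Sportorg", 12000), ("Sportorg", 1400))

  // mapValues(b => (b, 1)): pair each budget with a count of one event.
  val withCounts: List[(String, (Int, Int))] =
    eventBudgets.map { case (k, b) => (k, (b, 1)) }

  // reduceByKey: add budgets and event counts component-wise, per key.
  val intermediate: Map[String, (Int, Int)] =
    withCounts.groupMapReduce(_._1)(_._2) { (v1, v2) => (v1._1 + v2._1, v1._2 + v2._2) }

  // mapValues: total budget divided by number of events (integer division).
  val avgBudgets: Map[String, Int] =
    intermediate.map { case (k, (budget, numberOfEvents)) => (k, budget / numberOfEvents) }
}
```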
```scala
def keys: RDD[K]
```

Returns an RDD with the keys of each tuple. Note: this method is a transformation and returns an RDD because the number of keys in a Pair RDD might be unbounded. Every value could have a unique key, in which case it may not be possible to collect all keys on a single node.
Example: count the number of unique visitors to a website using the keys transformation:
```scala
case class Visitor(ip: String, timestamp: String, duration: String)

val visitors: RDD[Visitor] = ... // e.g. parsed from sc.textFile(...)
val visits = visitors.map(v => (v.ip, v.duration)) // RDD[(String, String)]
val numUniqueVisits = visits.keys.distinct().count()
```
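Here is the same computation on a local list, as a plain-Scala sketch. The three visits are made up, and their IP addresses come from the 192.0.2.0/24 documentation range.

```scala
// Plain-Scala sketch (no Spark) of counting unique visitors via the keys.
object UniqueVisitorsSketch {
  case class Visitor(ip: String, timestamp: String, duration: String)

  // Made-up visits; 192.0.2.1 visits twice.
  val visitors: List[Visitor] = List(
    Visitor("192.0.2.1", "2015-01-01T10:00:00Z", "30"),
    Visitor("192.0.2.2", "2015-01-01T10:05:00Z", "12"),
    Visitor("192.0.2.1", "2015-01-01T11:00:00Z", "45")
  )

  // Key each visit by IP, as in the Pair RDD example.
  val visits: List[(String, String)] = visitors.map(v => (v.ip, v.duration))

  // keys, then distinct, then count.
  val numUniqueVisits: Long = visits.map(_._1).distinct.size.toLong
}
```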
Joins are another kind of transformation specific to Pair RDDs. They are used to combine multiple datasets and are among the most commonly used operations on Pair RDDs.
There are two kinds of joins: inner joins (join) and outer joins (leftOuterJoin, rightOuterJoin).
The key difference between the two kinds is what happens to keys that are not present in both RDDs being joined. For example, if I were to join two RDDs keyed by customer ID, the difference between an inner and an outer join is what happens to the records of customers whose IDs don't appear in both RDDs.
Example: let's pretend the Swiss Rail Company (CFF) has two datasets: one RDD representing customers and their subscriptions (abos) and another representing customers and cities they frequently travel to (locations). Let's assume the following concrete data:
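```scala
// Hypothetical subscription types used in this example
sealed trait Subscription
case object AG extends Subscription
case object DemiTarif extends Subscription
case object DemiTarifVisa extends Subscription
val as: List[(Int, (String, Subscription))] = List((101, ("Ruetli", AG)), (102, ("Brelaz", DemiTarif)),
  (103, ("Gress", DemiTarifVisa)), (104, ("Schatten", DemiTarif)))
val abos = sc.parallelize(as)
val ls = List((101, "Bern"), (101, "Thun"), (102, "Lausanne"),
  (102, "Geneve"), (102, "Nyon"), (103, "Zurich"), (103, "Chur"))
val locations = sc.parallelize(ls)
```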
Both datasets are Pair RDDs. A customer may appear in both RDDs (like the customer with ID 101), but does not have to: customer 104 has a subscription but no location data.
Inner joins return a new RDD containing combined pairs whose keys are present in both input RDDs.
```scala
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
```
Example: how do we combine only customers that have a subscription and where there is location information?
```scala
val abos = ... // RDD[(Int, (String, Subscription))]
val locations = ... // RDD[(Int, String)]
val trackedCustomers = abos.join(locations)
// trackedCustomers: RDD[(Int, ((String, Subscription), String))]
```
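To make the inner-join semantics concrete, here is a plain-Scala sketch over the CFF data above, with no Spark involved. The nested for-comprehension pairs every abo with every location that shares its key, which is the set of results `join` produces. The `Subscription` case objects are hypothetical definitions.

```scala
// Plain-Scala sketch (no Spark) of inner-join semantics on the CFF data.
object InnerJoinSketch {
  // Hypothetical subscription types.
  sealed trait Subscription
  case object AG extends Subscription
  case object DemiTarif extends Subscription
  case object DemiTarifVisa extends Subscription

  val abos: List[(Int, (String, Subscription))] = List(
    (101, ("Ruetli", AG)), (102, ("Brelaz", DemiTarif)),
    (103, ("Gress", DemiTarifVisa)), (104, ("Schatten", DemiTarif)))

  val locations: List[(Int, String)] = List(
    (101, "Bern"), (101, "Thun"), (102, "Lausanne"),
    (102, "Geneve"), (102, "Nyon"), (103, "Zurich"), (103, "Chur"))

  // Inner join: one result per (abo, location) combination sharing a key.
  val trackedCustomers: List[(Int, ((String, Subscription), String))] =
    for {
      (k, abo)   <- abos
      (k2, city) <- locations
      if k == k2
    } yield (k, (abo, city))
}
```

Customer 104 disappears because it has no location entry, which leaves 7 combined pairs. The nested loop is only for illustration. Spark does not compare every pair of records; it shuffles both RDDs so that records with the same key end up in the same partition.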
Outer joins return a new RDD containing combined pairs whose keys don't have to be present in both RDDs.
Outer joins are particularly useful for customizing how the resulting joined RDD deals with missing keys. With outer joins, we can decide which RDD's keys are essential to keep: those of the left or of the right RDD in the join expression.
```scala
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
```
Example: let's assume the CFF wants to know for which subscribers the CFF has managed to collect location information. It is possible that someone has a demi-tarif but doesn't use the CFF app and only pays cash.
```scala
val abosWithOptionalLocations = abos.leftOuterJoin(locations)
// RDD[(Int, ((String, Subscription), Option[String]))]
abosWithOptionalLocations.collect().foreach(println)

val customersWithLocationDataAndOptionalAbos =
  abos.rightOuterJoin(locations)
// RDD[(Int, (Option[(String, Subscription)], String))]
customersWithLocationDataAndOptionalAbos.collect().foreach(println)
```
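Here is a plain-Scala sketch of what the two outer joins return for the CFF data. It again uses hypothetical `Subscription` case objects and no Spark. A key with no match on the other side is kept once, paired with `None`.

```scala
// Plain-Scala sketch (no Spark) of left and right outer joins on the CFF data.
object OuterJoinSketch {
  // Hypothetical subscription types.
  sealed trait Subscription
  case object AG extends Subscription
  case object DemiTarif extends Subscription
  case object DemiTarifVisa extends Subscription

  val abos: List[(Int, (String, Subscription))] = List(
    (101, ("Ruetli", AG)), (102, ("Brelaz", DemiTarif)),
    (103, ("Gress", DemiTarifVisa)), (104, ("Schatten", DemiTarif)))

  val locations: List[(Int, String)] = List(
    (101, "Bern"), (101, "Thun"), (102, "Lausanne"),
    (102, "Geneve"), (102, "Nyon"), (103, "Zurich"), (103, "Chur"))

  // leftOuterJoin: every abo is kept; a missing location becomes None.
  val abosWithOptionalLocations: List[(Int, ((String, Subscription), Option[String]))] =
    abos.flatMap { case (k, abo) =>
      val cities: List[String] = locations.filter(_._1 == k).map(_._2)
      val optCities: List[Option[String]] =
        if (cities.isEmpty) List(None) else cities.map(Some(_))
      optCities.map(c => (k, (abo, c)))
    }

  // rightOuterJoin: every location is kept; a missing abo becomes None.
  val customersWithLocationDataAndOptionalAbos
      : List[(Int, (Option[(String, Subscription)], String))] =
    locations.flatMap { case (k, city) =>
      val matching: List[(String, Subscription)] = abos.filter(_._1 == k).map(_._2)
      val optAbos: List[Option[(String, Subscription)]] =
        if (matching.isEmpty) List(None) else matching.map(Some(_))
      optAbos.map(a => (k, (a, city)))
    }
}
```

Only customer 104 gets `None` in the left outer join. In the right outer join, every location key (101 to 103) also has a subscription, so all 7 results contain `Some(...)`.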