What happens when you call a groupBy or a groupByKey operation, when the data is distributed?
Example:
val pairs = sc.parallelize(List((1, "one"), (2, "two"), (3, "three")))
pairs.groupByKey()
// res2: org.apache.spark.rdd.RDD[(Int, Iterable[String])]
// = ShuffledRDD[16] at groupByKey at <console>:37
To perform this operation, data has to be moved over the network so that all values for the same key end up together. This movement of data is called shuffling.
Shuffles can incur enormous performance hits, because Spark has to move data around the network, which is slow.
Another example. Given:
case class CFFPurchase(customerId: Int, destination: String, price: Double)
Assume we have an RDD of the purchases that customers made in the past month:
val purchasesRdd: RDD[CFFPurchase] = sc.textFile(...)
We want to calculate how many trips each customer made, and how much money each customer spent, over the course of the month.
val purchasesPerMonth = purchasesRdd.map(p => (p.customerId, p.price)) // Pair RDD
  .groupByKey() // RDD[(K, Iterable[V])]
  .map(p => (p._1, (p._2.size, p._2.sum))) // (customerId, (number of trips, total spent))
  .collect()
What happens with the data in the following scenario:
val purchases = List(
CFFPurchase(100, "Geneva", 22.25),
CFFPurchase(300, "Zurich", 42.50),
CFFPurchase(100, "Fribourg", 12.40),
CFFPurchase(200, "St. Gallen", 8.20),
CFFPurchase(100, "Lucerne", 31.60),
CFFPurchase(300, "Basel", 16.20))
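With the data distributed across 3 computation nodes, the map step runs locally on each node, whereas groupByKey requires shuffling: pairs have to move between nodes so that all values for a key end up on the same node.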
We don't want to send all the data over the network if it's not absolutely required. Too much network communication kills performance.
To minimize the amount of data that gets sent over the network during shuffling, we can use reduceByKey instead of groupByKey. Conceptually, reduceByKey can be thought of as first doing a groupByKey and then reducing all the values grouped by key, but it is more efficient than doing the two separately, because values are combined on each node before they are shuffled.
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
Here is what the code looks like using reduceByKey:
val purchasesPerMonth = purchasesRdd.map(p => (p.customerId, (1, p.price)))
.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
.collect()
How does this look on the cluster? On the same 3 computation nodes, the map step again runs locally. reduceByKey still requires some shuffling, but it executes in two stages: values are first reduced on the mapper side, within each node, and only the partial results are then reduced between the nodes.
The amount of data that gets moved over the network is minimized.
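To make the difference concrete, here is a minimal plain-Scala sketch (no Spark involved) that counts how many records would cross the network for the six purchases above. The initial placement of the records on the 3 nodes and the rule customerId % 3 for choosing the target node are assumptions made purely for illustration, and ShuffleSketch and its members are hypothetical names.

```scala
case class CFFPurchase(customerId: Int, destination: String, price: Double)

object ShuffleSketch {
  val numNodes = 3

  // Assumed initial placement of the (customerId, price) pairs on the 3 nodes.
  val nodes: Vector[List[(Int, Double)]] = Vector(
    List(CFFPurchase(100, "Geneva", 22.25), CFFPurchase(100, "Fribourg", 12.40)),
    List(CFFPurchase(300, "Zurich", 42.50), CFFPurchase(300, "Basel", 16.20)),
    List(CFFPurchase(200, "St. Gallen", 8.20), CFFPurchase(100, "Lucerne", 31.60))
  ).map(_.map(p => (p.customerId, p.price)))

  // Assumed rule for the node that collects a key (keys are positive here).
  def targetNode(customerId: Int): Int = customerId % numNodes

  // Counts the records that are not already on their target node.
  def recordsMoved[V](placement: Vector[List[(Int, V)]]): Int =
    placement.zipWithIndex.map { case (records, node) =>
      records.count { case (key, _) => targetNode(key) != node }
    }.sum

  // groupByKey-style: every raw pair is shuffled as-is.
  val movedByGroupByKey: Int = recordsMoved(nodes)

  // reduceByKey-style: combine per key on each node first, then shuffle the partial results.
  val locallyReduced: Vector[List[(Int, (Int, Double))]] = nodes.map { records =>
    records.groupBy(_._1).toList.map { case (key, rs) => (key, (rs.size, rs.map(_._2).sum)) }
  }
  val movedByReduceByKey: Int = recordsMoved(locallyReduced)

  // Final per-customer trip counts after merging the partial results.
  val tripsPerCustomer: Map[Int, Int] =
    locallyReduced.flatten.groupBy(_._1).map { case (key, rs) => (key, rs.map(_._2._1).sum) }
}
```

Under these assumptions, 5 raw pairs would move with the groupByKey approach, but only 3 partial results with the reduceByKey approach. The saving grows with the number of records per key that start out on the same node.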
We saw that operations like groupByKey require shuffling. Grouping all values of key-value pairs with the same key requires collecting all key-value pairs with the same key on the same node. But how does Spark know which key to put on which node?
Data within an RDD is split into several partitions. Partitions have the following properties:
Two kinds of partitioning are available in Spark: hash partitioning and range partitioning.
Customizing a partitioning is only possible on Pair RDDs!
Back to our example. Given a Pair RDD that should be grouped:
val purchasesPerMonth = purchasesRdd.map(p =>(p.customerId, p.price)) // Pair RDD
.groupByKey() // RDD[K, Iterable[V]]
groupByKey will first compute per tuple (k, v) its partition p:
p = k.hashCode() % numPartitions
Then, all tuples in the same partition p are sent to the machine hosting p.
Hash partitioning attempts to spread the data evenly across partitions based on the key.
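The formula above is a simplification: in Scala and Java, % can return a negative number when the hash code is negative, so Spark's HashPartitioner uses a non-negative modulus instead. A minimal plain-Scala sketch of that idea follows. HashPartitionSketch, nonNegativeMod (as defined here) and partitionFor are illustrative names, not Spark's API.

```scala
object HashPartitionSketch {
  // Like %, but the result is always in [0, mod).
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Partition index for a key, following p = k.hashCode() % numPartitions.
  def partitionFor(key: Any, numPartitions: Int): Int =
    nonNegativeMod(key.hashCode(), numPartitions)
}
```

For example, with 3 partitions the customer IDs 100, 200 and 300 land in partitions 1, 2 and 0, and with 4 partitions a key whose hash code is -7 lands in partition 1 rather than at index -3.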
Pair RDDs may contain keys that have an ordering defined (e.g. Int, Char, String). For such RDDs, range partitioning may be more efficient. Using a range partitioner, keys are partitioned according to an ordering on the keys and a set of sorted ranges of keys.
Tuples with keys in the same range appear on the same machine.
Example: consider that we want to use hash partitioning for an RDD with the following keys:
[8, 96, 240, 400, 401, 800]
and a desired number of partitions of 4. Let's assume that the hash function is the identity:
(n.hashCode() == n)
We end up with the following partitions (p = k % 4):
- partition 0: [8, 96, 240, 400, 800]
- partition 1: [401]
- partition 2: []
- partition 3: []
The result is a very unbalanced distribution that hurts performance.
Using range partitioning can improve the distribution significantly. The set of ranges for our RDD will be:
- [1, 200]
- [201, 400]
- [401, 600]
- [601, 800]
and the data will be partitioned as follows:
- [8, 96]
- [240, 400]
- [401]
- [800]
The resulting partitioning is more balanced.
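The two distributions in this example can be reproduced with a few lines of plain Scala (no Spark involved). This is only a sketch of the assignment rules described in the text; PartitioningSketch and its members are hypothetical names.

```scala
object PartitioningSketch {
  val keys = List(8, 96, 240, 400, 401, 800)
  val numPartitions = 4

  // Hash partitioning with the identity hash function assumed in the text.
  def hashPartition(key: Int): Int = key % numPartitions

  // Range partitioning: upper bounds of the ranges [1, 200], [201, 400], [401, 600], [601, 800].
  val upperBounds = Vector(200, 400, 600, 800)
  def rangePartition(key: Int): Int = upperBounds.indexWhere(key <= _)

  // Lists the keys assigned to each partition index.
  def distribute(assign: Int => Int): Vector[List[Int]] =
    Vector.tabulate(numPartitions)(p => keys.filter(k => assign(k) == p))

  val byHash: Vector[List[Int]] = distribute(hashPartition)
  val byRange: Vector[List[Int]] = distribute(rangePartition)
}
```

Here byHash puts five of the six keys into partition 0, while byRange places at most two keys in any partition.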
There are two ways to set the partitioning of an RDD: a) by calling partitionBy on the RDD, providing a specific Partitioner; b) by using transformations that return RDDs with specific partitioners.
Invoking partitionBy creates an RDD with a specified partitioner. Example:
val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs) // 8 is the number of partitions
val partitioned = pairs.partitionBy(tunedPartitioner).persist()
The call to persist() prevents Spark from partitioning the data over and over again each time the partitioned RDD is used.
Creating a RangePartitioner requires:
1) Specifying the desired number of partitions;
2) Providing a Pair RDD with ordered keys. This RDD is sampled by Spark to create a suitable set of sorted ranges.
Important: the result of partitionBy should be persisted. Otherwise the data is partitioned again (which involves shuffling) each time the partitioned RDD is used.
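How a set of sorted ranges can be derived from a sample of keys can be sketched in plain Scala. The text does not describe Spark's actual sampling or bound-selection logic, so this sketch swaps in a simple quantile rule; RangeBoundsSketch, bounds and partitionFor are hypothetical names.

```scala
object RangeBoundsSketch {
  // Picks numPartitions - 1 upper bounds at evenly spaced positions of the sorted sample.
  // Assumes the sample holds at least numPartitions keys.
  def bounds(sample: Seq[Int], numPartitions: Int): Vector[Int] = {
    val sorted = sample.sorted.toVector
    (1 until numPartitions).map(i => sorted(i * sorted.size / numPartitions - 1)).toVector
  }

  // A key goes to the first range whose upper bound is >= the key,
  // or to the last partition if it is larger than every bound.
  def partitionFor(key: Int, upperBounds: Vector[Int]): Int = {
    val idx = upperBounds.indexWhere(key <= _)
    if (idx >= 0) idx else upperBounds.size
  }
}
```

Because the bounds are derived from the keys themselves, the resulting ranges follow the data rather than being evenly spaced over the key space, which is the reason a range partitioner needs to see the RDD when it is created.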
A Pair RDD that is the result of a transformation on a partitioned Pair RDD is typically configured to use the hash partitioner that was used to construct it.
Some operations on RDDs automatically result in an RDD with a known partitioner (when it makes sense). For example, by default, when using sortByKey, a RangePartitioner is used. Also, the default partitioner when using groupByKey is a HashPartitioner.
Operations that hold on to and propagate a partitioner:
All other operations (including map and flatMap) produce a result without a partitioner. This is because these operations can change the keys themselves, so Spark can no longer assume that the old partitioning is valid. This isn't the case for, say, mapValues, where the key is preserved.
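Whether an RDD carries a partitioner can be checked through its partitioner field, which is an Option[Partitioner]. The following sketch assumes Spark is on the classpath and runs in local mode; the sample data, the local[2] master, the application name and the PartitionerPropagation object are illustrative choices.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object PartitionerPropagation {
  // Returns whether a partitioner is defined after partitionBy, mapValues and map.
  def check(sc: SparkContext): (Boolean, Boolean, Boolean) = {
    val pairs = sc.parallelize(List((100, 22.25), (300, 42.50), (100, 12.40)))
    val partitioned = pairs.partitionBy(new HashPartitioner(4))

    val afterMapValues = partitioned.mapValues(price => (1, price)) // keys untouched
    val afterMap = partitioned.map(p => (p._1, (1, p._2)))          // keys could have changed

    (partitioned.partitioner.isDefined,
     afterMapValues.partitioner.isDefined,
     afterMap.partitioner.isDefined)
  }

  def main(args: Array[String]): Unit = {
    // local[2] and the application name are arbitrary choices for a local run.
    val conf = new SparkConf().setMaster("local[2]").setAppName("partitioner-propagation")
    val sc = new SparkContext(conf)
    try println(check(sc)) finally sc.stop()
  }
}
```

The result should be (true, true, false): mapValues keeps the HashPartitioner, while map drops it even though the function here leaves the keys unchanged.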
Partitioning can bring enormous performance gains, especially in the face of shuffles. For instance, by range-partitioning the data of our previous example up front, the subsequent reduceByKey can be computed with no further shuffling at all.
val pairs = purchasesRdd.map(p => (p.customerId, p.price))
val tunedPartitioner = new RangePartitioner(8, pairs)
val partitioned = pairs.partitionBy(tunedPartitioner).persist()
// mapValues (unlike map) keeps the partitioner, so reduceByKey needs no further shuffle
val purchasesPerCustomer = partitioned.mapValues(v => (1, v))
val purchasesPerMonth = purchasesPerCustomer
  .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
  .collect()
Because all tuples for the same customer are already located on the same node, the computation can be on the order of 10x faster.
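Let's look at another example. Consider an application that keeps a large table of user information in memory: userData, a BIG Pair RDD of (UserID, UserInfo) pairs, where UserInfo contains the topics the user is subscribed to.
The application periodically combines this BIG table with a smaller file representing events that happened in the past 5 minutes: events, a small Pair RDD of (UserID, LinkInfo) pairs.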
For example, we might wish to count how many users visited a link that was not one of their subscribed topics. We can perform this combination using a join operation, which can be used to group the UserInfo and LinkInfo pairs for each UserID by key.
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
.persist()
def processNewLogs(logFileName: String): Unit = {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events) // RDD[(UserID, (UserInfo, LinkInfo))]
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => // expand the tuple
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
This implementation is very inefficient. The join operation doesn't know anything about how the keys are partitioned in the datasets. By default, it hashes all of the keys in both datasets, sends elements with the same key hash across the network to the same machine, and then joins together the elements with the same key on that machine. This happens on every call to processNewLogs, even though the user data in the BIG RDD doesn't change.
Fixing this is easy: call partitionBy on the BIG userData RDD at the start of the program.
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
.partitionBy(new HashPartitioner(100))
.persist()
Since we called partitionBy, Spark now knows that userData is hash-partitioned and calls to join will take advantage of this information. When calling the join operation, Spark will shuffle only the events RDD, sending events with each particular UserID to the machine that contains the corresponding hash partition of the userData.
Recall the example using groupByKey:
val purchasesPerMonth = purchasesRdd.map(p =>(p.customerId, p.price)) // Pair RDD
.groupByKey() // RDD[K, Iterable[V]]
Grouping all values of key-value pairs with the same key requires collecting all key-value pairs with the same key on the same machine. By default, grouping is done using a hash partitioner with default parameters. The resulting RDD, purchasesPerMonth, is configured to use the same hash partitioner that was used to construct it.
Rule of thumb: a shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD. You can also figure out whether a shuffle has been planned and executed via:
1) the return type of certain transformations, e.g. ShuffledRDD[366];
2) using the function toDebugString to see the execution plan:
partitioned.reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2 + v2._2))
.toDebugString
// res9: String =
// (8) MapPartitionsRDD[622] at reduceByKey
// | ShuffledRDD[615] at partitionBy
// | CachedPartitions: 8; MemorySize: 1754.8 MB; DiskSize: 0.0 B
There are a few ways to use operations that might cause a shuffle and still avoid much or all of the network shuffling. For example, reduceByKey on a pre-partitioned RDD can combine values locally, and a join against an RDD that was pre-partitioned and persisted only needs to shuffle the other RDD, as in the examples above.
Some transformations are significantly more expensive in terms of latency than others (for instance, those requiring lots of data to be transferred over the network unnecessarily).
Computations on RDDs are represented as a lineage graph: a directed acyclic graph (DAG) representing the computations done on the RDD.
Example:
val rdd = sc.textFile(...)
val filtered = rdd.map(...).filter(...).persist()
val count = filtered.count()
val reduced = filtered.reduce(...)

Spark will analyze the DAG in order to optimize.
RDDs are represented as:


Previously we arrived at the following rule of thumb: a shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD.
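In fact, dependencies encode when data must move across the network.
Transformations can cause shuffles. A transformation can have one of two kinds of dependencies: narrow dependencies, where each partition of the parent RDD is used by at most one partition of the child RDD, and wide dependencies, where each partition of the parent RDD may be depended on by multiple child partitions.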


Example: let's assume we have the following DAG:


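Transformations with narrow dependencies include, for example, map, mapValues, flatMap and filter.
Transformations with wide dependencies (which might cause a shuffle) include, for example, groupByKey, reduceByKey and join.
We can find out more about the dependencies of an RDD by invoking its dependencies method: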
Example:
val wordsRdd = sc.parallelize(largeList)
val pairs = wordsRdd.map(c => (c, 1))
.groupByKey()
.dependencies
// pairs: Seq[org.apache.spark.Dependency[_]] =
// List(org.apache.spark.ShuffleDependency@12345)
The toDebugString method returns a textual visualization of the RDD's lineage, along with other information related to scheduling. Indentations in the output separate groups of narrow transformations that may be pipelined together from wide transformations that require shuffles. These groupings are called stages.
val wordsRdd = sc.parallelize(largeList)
val pairs = wordsRdd.map(c => (c, 1))
.groupByKey()
.toDebugString
// pairs: String =
// (8) ShuffledRDD[219] at groupByKey
// +- (8) MapPartitionsRDD[218] at map
// | ParallelCollection[217] at parallelize
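Lineage graphs are the key to fault tolerance in Spark. Ideas from functional programming enable this fault tolerance: RDDs are immutable, they are transformed with higher-order functions such as map, flatMap and filter, and the function for computing an RDD from its parent RDDs is part of the RDD's representation.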
This allows recomputing at any given time any subset of transformations in the entire lineage graph. In this way Spark can recover from failure by recomputing lost partitions from the lineage graph.
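The recovery idea can be illustrated with a toy, single-machine model in plain Scala. This is not Spark's implementation: Dataset, Source, Mapped, Filtered and LineageSketch are hypothetical names, and only narrow dependencies are modelled, so each partition can be rebuilt from the matching partition of its parent.

```scala
// A toy model of lineage: every dataset knows its parent and the function applied to it.
sealed trait Dataset[A] {
  def numPartitions: Int
  // Computes one partition from scratch by following the lineage.
  def compute(partition: Int): List[A]
  def map[B](f: A => B): Dataset[B] = Mapped(this, f)
  def filter(p: A => Boolean): Dataset[A] = Filtered(this, p)
}

final case class Source[A](partitions: Vector[List[A]]) extends Dataset[A] {
  def numPartitions: Int = partitions.size
  def compute(partition: Int): List[A] = partitions(partition)
}

final case class Mapped[A, B](parent: Dataset[A], f: A => B) extends Dataset[B] {
  def numPartitions: Int = parent.numPartitions
  def compute(partition: Int): List[B] = parent.compute(partition).map(f)
}

final case class Filtered[A](parent: Dataset[A], p: A => Boolean) extends Dataset[A] {
  def numPartitions: Int = parent.numPartitions
  def compute(partition: Int): List[A] = parent.compute(partition).filter(p)
}

object LineageSketch {
  val source: Dataset[Int] = Source(Vector(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9)))
  val derived: Dataset[Int] = source.map(_ * 10).filter(_ % 20 == 0)

  // Materialize all partitions, standing in for persisted data.
  val cache = scala.collection.mutable.Map[Int, List[Int]]()
  for (i <- 0 until derived.numPartitions) cache(i) = derived.compute(i)

  // Simulate a failure that loses partition 1, then rebuild only that partition.
  cache.remove(1)
  val recovered: List[Int] = cache.getOrElseUpdate(1, derived.compute(1))
}
```

Because the source data is immutable and the transformations are pure functions, recomputing partition 1 gives the same result as before the simulated failure, and the other partitions are left untouched.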