Until now we have talked about task-parallel programming: the distribution of computations across parallel nodes.
Data-parallel programming distributes data across parallel nodes.
The simplest form of data parallelism is the parallel for loop. For example, initializing the values of an array:
def initializeArray(xs: Array[Int])(v: Int): Unit = {
  for (i <- (0 until xs.length).par) {
    xs(i) = v
  }
}
Adding .par to the Range converts it to a parallel range (ParRange). This parallel for loop doesn't return a value; it acts only through side effects such as writing to an array. Because of these side effects, the loop is correct only if each iteration writes to a different location of the array, which is the case here. If the body of the loop instead did something like this:
xs(0) = i
every iteration would write a different value to the same location, and the result of the computation would be non-deterministic.
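Another example of a parallel for loop is rendering the Mandelbrot set. This is the set of complex numbers $c$ in the plane for which the sequence $$z_{n+1} = z_n^2 + c$$ starting from $z_0 = 0$, does not approach infinity. Each pixel can be computed independently of all the others, which makes the rendering loop a natural fit for data parallelism.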
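The per-pixel computation can be sketched as follows. This is a minimal sketch, not code from the original notes: the names escapeTime and render, the viewport bounds and the escape radius of 2 are illustrative assumptions. The range is left sequential so that the snippet needs only the standard library; with parallel collections available, adding .par to the range (as in initializeArray) would parallelize it, because every iteration writes to a different index.

```scala
object Mandelbrot {
  // Counts how many iterations of z -> z*z + c (starting at z = 0) run
  // before |z| exceeds 2, capped at maxIterations.
  def escapeTime(cRe: Double, cIm: Double, maxIterations: Int): Int = {
    var re = 0.0
    var im = 0.0
    var i = 0
    while (i < maxIterations && re * re + im * im <= 4.0) {
      val nextRe = re * re - im * im + cRe
      im = 2 * re * im + cIm
      re = nextRe
      i += 1
    }
    i
  }

  // Fills a width * height image; each index is written by exactly one iteration.
  // Assumed viewport: real part in [-2, 1), imaginary part in [-1.5, 1.5).
  def render(width: Int, height: Int, maxIterations: Int): Array[Int] = {
    val image = new Array[Int](width * height)
    for (idx <- 0 until image.length) { // candidate for (0 until image.length).par
      val x = idx % width
      val y = idx / width
      val cRe = -2.0 + 3.0 * x / width
      val cIm = -1.5 + 3.0 * y / height
      image(idx) = escapeTime(cRe, cIm, maxIterations)
    }
    image
  }
}
```

Points whose escape time reaches maxIterations are treated as members of the set; the others can be colored by how quickly they escape.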
In Scala, most collection operations can be made data-parallel:
(1 until 100).par
  .filter(n => n % 3 == 0)
  .count(n => n.toString == n.toString.reverse)
The filter and count operations above are parallelized. However, some collection operations cannot be parallelized. For instance, the implementation of sum using foldLeft:
def sum(xs: Array[Int]): Int = {
  xs.par.foldLeft(0)(_ + _)
}
Despite the fact that the array has been made data-parallel, the execution of the foldLeft is still sequential. The foldLeft has the following signature:
def foldLeft[B](z: B)(f: (B, A) => B): B
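With collection elements of type A and an accumulator of type B, the collection is folded as a chain in which each step combines the previous accumulator with the next element:
     A    A           A
     |    |           |
B -> B -> B -> ... -> B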
In order to calculate the value at step x we must have the value at step x-1. For this reason, foldLeft cannot be easily parallelized.
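The dependency chain can be made visible with scanLeft, which returns every intermediate accumulator that foldLeft passes through. This is a small illustration using the standard library only; the object and method names are made up for the example.

```scala
object FoldLeftChain {
  // Each entry is the accumulator after one more element has been folded in,
  // so entry k cannot be computed before entry k - 1.
  def steps(xs: Seq[Int]): Seq[Int] = xs.scanLeft(0)(_ + _)
}
```

For Seq(1, 2, 3, 4) the accumulators are 0, 1, 3, 6 and 10; the last one is the value foldLeft(0)(_ + _) returns.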
A variation of the operation, called fold, can be implemented in parallel:
def fold(z: A)(f: (A, A) => A): A
Because both arguments of the folding function and its result have the same type, partial results computed on different parts of the collection can be combined with the same function, so the operation can be parallelized:
A   A   A   A
 \ /     \ /
  A       A
    \   /
      A
This is very similar to the reduction tree that we studied before.
Many other operations can be made data-parallel based on fold. For instance:
def sum(xs: Array[Int]): Int = {
  xs.par.fold(0)(_ + _)
}
Another example is the max function:
def max(xs: Array[Int]): Int = {
  xs.par.fold(Int.MinValue)(math.max)
}
What sum and max have in common is that:
The neutral element z and the folding function f form a monoid: f is associative, and f(z, a) == f(a, z) == a for every a.
Commutativity of f is not required for the fold operation.
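Why the monoid laws matter can be checked without any parallelism. The sketch below imitates what a parallel fold is free to do: fold each chunk separately starting from z, then fold the partial results. The helper chunkedFold is hypothetical (it is not a library method) and runs sequentially; it only shows which results a regrouping of the elements can produce.

```scala
object FoldLaws {
  // Folds every chunk on its own starting from z, then folds the partial results.
  // For a monoid (z, f) this equals xs.foldLeft(z)(f) regardless of chunkSize.
  def chunkedFold[A](xs: Seq[A], chunkSize: Int)(z: A)(f: (A, A) => A): A =
    xs.grouped(chunkSize)
      .map(chunk => chunk.foldLeft(z)(f))
      .foldLeft(z)(f)
}
```

With addition and 0, or math.max and Int.MinValue, chunkedFold agrees with foldLeft for any chunk size. With subtraction, which is not associative, FoldLaws.chunkedFold(1 to 10, 3)(0)(_ - _) differs from (1 to 10).foldLeft(0)(_ - _), which is the kind of discrepancy a parallel fold would expose.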
The fold operation has a big limitation: the accumulator must have the same type as the collection elements. For instance, something like this wouldn't even compile:
val a = Array('A', 'X', 'M', 'P')
val vowelCount =
  a.par.fold(0)((count, ch) => if (isVowel(ch)) count + 1 else count)
because the accumulator is an Int while the collection elements are Chars.
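We can, however, define another function, aggregate, with the following signature:
def aggregate[B](z: B)(f: (B, A) => B, g: (B, B) => B): B
Each part of the collection is folded sequentially with f (the horizontal chains), and the partial results are then combined with g:
     A    A           A
     |    |           |
B -> B -> B -> ... -> B
                        \
                         B
                        /
B -> B -> B -> ... -> B
     |    |           |
     A    A           A
This is a combination of foldLeft (f) and fold (g). We can now do: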
// isVowel(ch: Char): Boolean is assumed to be defined elsewhere
val a = Array('A', 'X', 'M', 'P')
val vowelCount = a.par.aggregate(0)(
  (count, ch) => if (isVowel(ch)) count + 1 else count,
  _ + _
)
Again, the parallel reduction operator + and the neutral value 0 must form a monoid.
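The division of labor between f and g can be sketched with the standard library alone. The helper chunkedAggregate and the isVowel definition below are illustrative assumptions, not library code: each chunk is folded sequentially with f, and the per-chunk results are merged with g. The chunks are processed one after another here; a parallel collection would process them concurrently.

```scala
object AggregateSketch {
  // Hypothetical helper: treats the ten ASCII vowel letters as vowels.
  def isVowel(ch: Char): Boolean = "AEIOUaeiou".indexOf(ch) >= 0

  // f folds the elements of one chunk into an accumulator of type B;
  // g merges the accumulators of different chunks.
  def chunkedAggregate[A, B](xs: Seq[A], chunkSize: Int)(z: B)(f: (B, A) => B, g: (B, B) => B): B =
    xs.grouped(chunkSize)
      .map(chunk => chunk.foldLeft(z)(f))
      .foldLeft(z)(g)

  def vowelCount(xs: Seq[Char], chunkSize: Int): Int =
    chunkedAggregate(xs, chunkSize)(0)((count, ch) => if (isVowel(ch)) count + 1 else count, _ + _)
}
```

Because + with 0 is a monoid, vowelCount returns the same value for every chunk size.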
Operations such as fold, sum and aggregate are accessor combinators: they read the collection and return a single value. Other operations, such as map, filter, flatMap and groupBy, are transformer combinators: they return a new collection.

The main traits of the Scala collections hierarchy are:
1) Traversable[T] - collection of elements of type T, with operations implemented using foreach;
2) Iterable[T] - collection of elements of type T, with operations implemented using iterator;
3) Seq[T] - ordered sequence of elements of type T;
4) Set[T] - set of elements of type T (no duplicates);
5) Map[K, V] - map of keys of type K to values of type V (no duplicate keys).
ParIterable[T], ParSeq[T], ParSet[T], ParMap[K, V] are the parallel counterparts of the collections above.
For code agnostic to parallelism there are generic counterparts of the above collections: GenIterable[T], GenSeq[T], GenSet[T], GenMap[K, V]:
def largestPalindrome(xs: GenSeq[Int]): Int = {
  xs.aggregate(Int.MinValue)(
    (largest, n) =>
      if ((n > largest) && (n.toString == n.toString.reverse)) n else largest,
    math.max
  )
}
val array = (0 until 1000000).toArray
// sequential
largestPalindrome(array)
// parallel
largestPalindrome(array.par)
Other parallel collections include ParArray[T], ParVector[T] and ParRange, as well as ParHashSet[T], ParHashMap[K, V] and ParTrieMap[K, V].
The conversion from a regular collection to a parallelizable collection (achieved by invoking .par) takes some processing time. Parallel collections should be used only if there's an overall benefit.
Another example, the intersection of two generic sets:
import scala.collection.mutable

def intersection(a: GenSet[Int], b: GenSet[Int]): mutable.Set[Int] = {
  val result = mutable.Set[Int]()
  for (x <- a) if (b contains x) result += x // unsynchronized write
  result
}
intersection((0 until 10000).toSet, (0 until 10000 by 4).toSet)
intersection((0 until 10000).par.toSet, (0 until 10000 by 4).par.toSet)
This program is NOT correct, because result is mutable and will be accessed simultaneously by multiple threads.
A possible solution would be to use a thread-safe implementation of the result Set:
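import java.util.concurrent.ConcurrentSkipListSet
import scala.collection.JavaConverters._

def intersection(a: GenSet[Int], b: GenSet[Int]): Set[Int] = {
  val result = new ConcurrentSkipListSet[Int]()
  for (x <- a) if (b contains x) result.add(x) // add is thread-safe
  result.asScala.toSet // copy into an immutable Scala Set
}
intersection((0 until 10000).toSet, (0 until 10000 by 4).toSet)
intersection((0 until 10000).par.toSet, (0 until 10000 by 4).par.toSet)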
An even better solution is to avoid mutation altogether, by using the right combinators (in this case the filter combinator):
// filter on a GenSet returns a GenSet, so the result type is GenSet[Int]
def intersection(a: GenSet[Int], b: GenSet[Int]): GenSet[Int] = {
  if (a.size < b.size) a.filter(b(_)) else b.filter(a(_))
}
intersection((0 until 10000).toSet, (0 until 10000 by 4).toSet)
intersection((0 until 10000).par.toSet, (0 until 10000 by 4).par.toSet)
Two rules for working with parallel collections:
Never modify shared memory locations without proper synchronization.
Never modify a parallel collection on which a data-parallel operation is in progress.
An exception to these rules is the TrieMap[K, V] collection, which takes a snapshot of its data either on demand or when it is iterated. Reads then operate on the snapshot, while writes go to the original collection.
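The on-demand snapshot can be tried with scala.collection.concurrent.TrieMap from the standard library. This is a small single-threaded sketch; the object name, the keys and the values are made up for the example.

```scala
import scala.collection.concurrent.TrieMap

object TrieMapSnapshot {
  // Returns the keys seen by the snapshot and the keys of the live map
  // after the live map has been modified.
  def demo(): (Set[Int], Set[Int]) = {
    val live = TrieMap.empty[Int, String]
    for (i <- 0 until 3) live.put(i, s"v$i")

    val frozen = live.snapshot() // state of the map at this point

    live.put(3, "v3") // later writes go to the live map only
    live.remove(0)

    (frozen.keySet.toSet, live.keySet.toSet)
  }
}
```

The snapshot still contains the keys 0, 1 and 2, while the live map contains 1, 2 and 3.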
Each Iterable collection can create its own Iterator object:
trait Iterator[A] {
  def next(): A
  def hasNext: Boolean
}
def iterator: Iterator[A] // on each collection
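The Iterator contract:
next can be called only if hasNext returns true.
After hasNext returns false, it always returns false.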
Splitters are the counterparts of Iterators in parallel programming:
trait Splitter[A] extends Iterator[A] {
  def split: Seq[Splitter[A]]
  def remaining: Int // estimate of the number of remaining elements
}
def splitter: Splitter[A] // on each *parallel* collection
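The Splitter contract:
After calling split, the original splitter is left in an undefined state.
The resulting splitters traverse disjoint subsets of the original splitter's elements.
remaining is an estimate of the number of remaining elements.
split should be efficient: O(log n) or better.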
Here is how you can enhance a Splitter with the fold operation we saw before:
def fold(z: A)(f: (A, A) => A): A = {
  if (remaining < threshold) foldLeft(z)(f)
  else {
    val children = for (child <- split) yield task { child.fold(z)(f) }
    children.map(_.join()).foldLeft(z)(f) // foldLeft merges the partial results
  }
}
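A runnable version of this idea can be sketched under a few assumptions: ArraySplitter below is a simplified stand-in specialised to Array[Int] (it does not extend the library's splitter traits), scala.concurrent.Future replaces the task construct, and the names parFold and sum as well as the threshold of 128 are illustrative.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object SplitterSketch {
  // Iterates over xs(lo) until xs(hi - 1) and can split itself into two halves.
  final class ArraySplitter(xs: Array[Int], lo: Int, hi: Int) extends Iterator[Int] {
    private var i = lo

    def hasNext: Boolean = i < hi
    def next(): Int = { val x = xs(i); i += 1; x }

    // Exact here, although only an estimate is required.
    def remaining: Int = hi - i

    // The halves cover disjoint index ranges; this splitter must not be used afterwards.
    def split: Seq[ArraySplitter] = {
      val mid = i + remaining / 2
      Seq(new ArraySplitter(xs, i, mid), new ArraySplitter(xs, mid, hi))
    }

    def parFold(z: Int, threshold: Int)(f: (Int, Int) => Int): Int = {
      require(threshold >= 2, "a threshold below 2 would keep splitting single elements")
      if (remaining < threshold) foldLeft(z)(f) // small enough: fold sequentially
      else {
        val children = split.map(child => Future(child.parFold(z, threshold)(f)))
        // Wait for every child, then merge the partial results with the same function.
        children.map(c => Await.result(c, Duration.Inf)).foldLeft(z)(f)
      }
    }
  }

  def sum(xs: Array[Int]): Int =
    new ArraySplitter(xs, 0, xs.length).parFold(0, threshold = 128)(_ + _)
}
```

As with fold on parallel collections, the result is only well defined when z and f form a monoid, because z is used once per leaf and the grouping of elements depends on where the splits fall.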
Builders are abstractions used for creating new collections:
// A is the type of elements that we can add to the Builder and Repr is the
// type of the collection that the Builder creates
trait Builder[A, Repr] {
  def +=(elem: A): Builder[A, Repr]
  def result: Repr
}
def newBuilder: Builder[A, Repr] // on each collection
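The Builder contract:
Calling result returns a collection of type Repr containing the elements previously added with +=.
Calling result leaves the Builder in an undefined state.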
Here is an example of how the filter method can be implemented on collections:
def filter(p: T => Boolean): Traversable[T] = {
  val b = newBuilder
  for (x <- this) { if (p(x)) b += x }
  b.result
}
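The same pattern can be tried with a concrete builder from the standard library. The sketch below uses Vector.newBuilder; the function name filterToVector is made up for the example, and the function is written as a free-standing method rather than as a member of a collection class.

```scala
object BuilderSketch {
  // Adds every element that satisfies p to a builder, then asks the builder
  // for the finished collection.
  def filterToVector[T](xs: Iterable[T])(p: T => Boolean): Vector[T] = {
    val b = Vector.newBuilder[T]
    for (x <- xs) if (p(x)) b += x
    b.result()
  }
}
```

For example, BuilderSketch.filterToVector(1 to 10)(_ % 2 == 0) keeps the even numbers in their original order.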
A Combiner is a parallel version of a Builder.
trait Combiner[A, Repr] extends Builder[A, Repr] {
  def combine(that: Combiner[A, Repr]): Combiner[A, Repr]
}
def newCombiner: Combiner[T, Repr] // on each *parallel* collection
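The Combiner contract:
Calling combine returns a combiner that contains the elements of both input combiners.
Calling combine leaves both original Combiners in an undefined state.
combine should be efficient: O(log n) or better.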
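One way to keep combine cheap is to store the elements in a list of chunks, so that combining only links chunks together and the elements are copied once, in result. The class below is a simplified sketch of that idea under stated assumptions: ChunkCombiner is a hypothetical name, it does not extend the library's Combiner trait, and it always produces a Vector.

```scala
import scala.collection.mutable.ArrayBuffer

object CombinerSketch {
  final class ChunkCombiner[A] {
    // Elements live in chunks; += always appends to the last chunk.
    private val chunks = ArrayBuffer(ArrayBuffer.empty[A])

    def +=(elem: A): this.type = { chunks.last += elem; this }

    // Appends the other combiner's chunks by reference: the cost depends on the
    // number of chunks, not on the number of elements. Neither the receiver
    // nor the argument should be used on its own afterwards.
    def combine(that: ChunkCombiner[A]): ChunkCombiner[A] = {
      chunks ++= that.chunks
      this
    }

    // Copies all elements into the final collection, chunk by chunk.
    def result: Vector[A] = chunks.flatten.toVector
  }
}
```

In a parallel transformer such as filter, each task would fill its own combiner, and the combiners would be merged pairwise on the way back up the task tree, in the same order as the elements of the original collection.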