Merge sort algorithm:
The parMergeSort method has the following signature:
def parMergeSort(xs: Array[Int], maxDepth: Int): Unit
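maxDepth controls the level of parallelism: the recursion splits the array until depth maxDepth, so up to 2^maxDepth segments are sorted in parallel. The method sorts the array in place, as a side effect.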
We start by allocating an intermediate array of the same length:
val ys = new Array[Int](xs.length)
Our algorithm alternates between the two arrays: each merge level reads from one array and writes into the other.
def sort(from: Int, until: Int, depth: Int): Unit = {
  if (depth == maxDepth) {
    // base case: sort this segment of xs sequentially
    quickSort(xs, from, until - from)
  } else {
    val mid = (from + until) / 2
    // sort both halves in parallel
    parallel(sort(from, mid, depth + 1),
             sort(mid, until, depth + 1))
    // merge the sorted halves; which array is the source alternates with depth
    val flip = (maxDepth - depth) % 2 == 0
    val src = if (flip) ys else xs
    val target = if (flip) xs else ys
    merge(src, target, from, mid, until)
  }
}
sort(0, xs.length, 0)
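Depending on the parity of maxDepth, the last merge may leave the sorted result in ys instead of xs, in which case it has to be copied back. Copying can also be done in parallel: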
def copy(src: Array[Int], target: Array[Int],
         from: Int, until: Int, depth: Int): Unit = {
  if (depth == maxDepth) {
    Array.copy(src, from, target, from, until - from)
  } else {
    val mid = (from + until) / 2
    // copy both halves in parallel
    parallel(copy(src, target, from, mid, depth + 1),
             copy(src, target, mid, until, depth + 1))
  }
}
// the merge at depth 0 writes into ys exactly when maxDepth is odd
if (maxDepth % 2 == 1) copy(ys, xs, 0, xs.length, 0)
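To run the algorithm end to end, the helpers it relies on must be filled in. The following is a minimal sketch under stated assumptions: parallel is a stand-in built on scala.concurrent.Future (the course's own parallel is not shown here), java.util.Arrays.sort replaces quickSort, and merge is a hypothetical helper that merges src[from, mid) and src[mid, until) into the same range of dst. The copy back is done sequentially with Array.copy for brevity.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-in for the course's `parallel`: tb runs in a Future while ta runs
// on the calling thread; the call returns once both have finished.
def parallel[A, B](ta: => A, tb: => B): (A, B) = {
  val fb = Future(tb)
  val a = ta
  (a, Await.result(fb, Duration.Inf))
}

// Hypothetical helper: merges the sorted runs src[from, mid) and
// src[mid, until) into dst[from, until).
def merge(src: Array[Int], dst: Array[Int], from: Int, mid: Int, until: Int): Unit = {
  var i = from
  var j = mid
  var k = from
  while (k < until) {
    if (j >= until || (i < mid && src(i) <= src(j))) { dst(k) = src(i); i += 1 }
    else { dst(k) = src(j); j += 1 }
    k += 1
  }
}

def parMergeSort(xs: Array[Int], maxDepth: Int): Unit = {
  val ys = new Array[Int](xs.length)
  def sort(from: Int, until: Int, depth: Int): Unit =
    if (depth == maxDepth) java.util.Arrays.sort(xs, from, until) // stands in for quickSort
    else {
      val mid = (from + until) / 2
      parallel(sort(from, mid, depth + 1), sort(mid, until, depth + 1))
      // merges at an odd distance from maxDepth write into ys, the others into xs
      val flip = (maxDepth - depth) % 2 == 0
      val src = if (flip) ys else xs
      val dst = if (flip) xs else ys
      merge(src, dst, from, mid, until)
    }
  sort(0, xs.length, 0)
  // an odd maxDepth leaves the result in ys: copy it back (sequentially here)
  if (maxDepth % 2 == 1) Array.copy(ys, 0, xs, 0, xs.length)
}
```

With maxDepth = 2 the last merge writes into xs and no copy is needed; with an odd maxDepth the result lands in ys and is copied back.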
Operations on collections are key to functional programming even in the sequential case:
List(1, 3, 8).map(x => x*x) == List(1, 9, 64)
List(1, 3, 8).fold(100)((s, x) => s+x) == 112
// combined fold of all list prefixes List(), List(1), List(1,3), List(1,3,8)
List(1, 3, 8).scan(100)((s, x) => s+x) == List(100, 101, 104, 112)
Lists are not well suited to parallelization: splitting a list in half requires walking to its middle, and concatenating two lists takes time linear in the length of the first one. Arrays and trees, which can be split and combined efficiently, are better choices.
map applies f to every element:
List(a1, a2, ..., an).map(f) == List(f(a1), f(a2), ..., f(an))
Properties of map:
list.map(x => x) == list // identity function
list.map(f.compose(g)) == list.map(g).map(f)
where (f.compose(g))(x) == f(g(x))
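Both laws can be spot-checked on a concrete list. In this sketch f and g are arbitrary sample functions chosen for illustration, and passing on one input is a sanity check, not a proof:

```scala
// Arbitrary sample functions, for illustration only.
val f = (x: Int) => x + 1
val g = (x: Int) => x * 2
val list = List(1, 3, 8)

// Identity law: mapping the identity function leaves the list unchanged.
val identityHolds = list.map(x => x) == list

// Composition law: one pass with f.compose(g) equals a pass with g followed by a pass with f.
val compositionHolds = list.map(f.compose(g)) == list.map(g).map(f)
```

The composition law is what allows two consecutive map passes to be fused into one.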
A sequential, recursive definition of map on lists is straightforward:
def mapSeq[A, B](lst: List[A], f: A => B): List[B] = lst match {
  case Nil => Nil
  case h :: t => f(h) :: mapSeq(t, f)
}
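To get a version that parallelizes, we switch to arrays. We start with a sequential function that maps the segment [left, right) of the array inp into the array out: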
def mapASegSeq[A, B](inp: Array[A], left: Int, right: Int,
                     f: A => B, out: Array[B]): Unit = {
  var i = left
  while (i < right) {
    out(i) = f(inp(i))
    i = i + 1
  }
}
val in = Array(2, 3, 4, 5, 6)
val out = Array(0, 0, 0, 0, 0)
val f = (x: Int) => x * x
mapASegSeq(in, 1, 3, f, out)
out // == Array(0, 9, 16, 0, 0): only indices 1 and 2 are mapped
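The parallel version splits the segment in half until it is small enough to be mapped sequentially (threshold is assumed to be a constant chosen for performance):
def mapASegPar[A, B](inp: Array[A], left: Int, right: Int,
                     f: A => B, out: Array[B]): Unit = {
  // small segments: fall back to the sequential version
  if (right - left < threshold) {
    mapASegSeq(inp, left, right, f, out)
  } else {
    val mid = left + (right - left) / 2
    parallel(mapASegPar(inp, left, mid, f, out),
             mapASegPar(inp, mid, right, f, out))
  }
}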
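A self-contained sketch of the parallel segment map, assuming a small threshold of 2 (a value picked only so that the tiny example actually splits) and a stand-in for parallel built on scala.concurrent.Future, since the course's own parallel is not shown here:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-in for the course's `parallel`: tb runs in a Future, ta on the calling thread.
def parallel[A, B](ta: => A, tb: => B): (A, B) = {
  val fb = Future(tb)
  val a = ta
  (a, Await.result(fb, Duration.Inf))
}

// Hypothetical cutoff: segments shorter than this are mapped sequentially.
val threshold = 2

def mapASegSeq[A, B](inp: Array[A], left: Int, right: Int,
                     f: A => B, out: Array[B]): Unit = {
  var i = left
  while (i < right) {
    out(i) = f(inp(i))
    i = i + 1
  }
}

def mapASegPar[A, B](inp: Array[A], left: Int, right: Int,
                     f: A => B, out: Array[B]): Unit = {
  if (right - left < threshold) mapASegSeq(inp, left, right, f, out)
  else {
    // split at the midpoint of [left, right), not at (right - left) / 2
    val mid = left + (right - left) / 2
    parallel(mapASegPar(inp, left, mid, f, out),
             mapASegPar(inp, mid, right, f, out))
  }
}

val in = Array(2, 3, 4, 5, 6)
val out = new Array[Int](in.length)
mapASegPar(in, 0, in.length, (x: Int) => x * x, out)
```

The two halves write to disjoint index ranges of out, so they can run in parallel without synchronization.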
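Note how the grouping of the operations changes the result when the operation (here, subtraction) is not associative: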
List(1, 3, 8).fold(100)((s, x) => s + x) == 112
List(1, 3, 8).foldLeft(100)((s, x) => s - x) == ((100-1)-3)-8 == 88
List(1, 3, 8).foldRight(100)((x, s) => x - s) == 1-(3-(8-100)) == -94
List(1, 3, 8).reduceLeft((s, x) => s - x) == (1-3)-8 == -10
List(1, 3, 8).reduceRight((x, s) => x - s) == 1-(3-8) == 6
To enable parallel processing, we look at associative operations. A computation that combines a sequence of values with an operation can be represented as a tree, with the values at the leaves and the operation at the internal nodes:
sealed abstract class Tree[A]
case class Leaf[A](value: A) extends Tree[A]
case class Node[A](left: Tree[A], right: Tree[A]) extends Tree[A]
Evaluating the expression then amounts to reducing the tree.
A sequential definition of the reduce would be:
def reduce[A](t: Tree[A], f: (A, A) => A): A = t match {
  case Leaf(v) => v
  case Node(l, r) => f(reduce(l, f), reduce(r, f))
}
We can think of reduce as replacing the constructor of Node with a given f.
Making the reduce function parallel is easy:
def reduce[A](t: Tree[A], f: (A, A) => A): A = t match {
  case Leaf(v) => v
  case Node(l, r) => {
    val (lV, rV) = parallel(reduce(l, f), reduce(r, f))
    f(lV, rV)
  }
}
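The depth of this parallel computation (its running time given unlimited parallelism, assuming f takes constant time) is proportional to the height h of the tree; for a balanced tree with n leaves, that is O(log n).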
Assuming that f is associative, we have:
reduce(Node(Node(Leaf(x), Leaf(y)), Leaf(z)), f) ==
reduce(Node(Leaf(x), Node(Leaf(y), Leaf(z))), f)
To describe the order of the elements in a tree, we can convert the tree into a list:
def toList[A](t: Tree[A]): List[A] = t match {
  case Leaf(v) => List(v)
  case Node(l, r) => toList(l) ++ toList(r)
}
Suppose we also have a map on trees, which replaces every element by applying a function to it:
def map[A, B](t: Tree[A], f: A => B): Tree[B] = t match {
  case Leaf(v) => Leaf(f(v))
  case Node(l, r) => Node(map(l, f), map(r, f))
}
toList can be expressed in terms of map and reduce as follows (the explicit type arguments help Scala's type inference):
def toList[A](t: Tree[A]): List[A] = reduce[List[A]](map[A, List[A]](t, List(_)), _ ++ _)
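If f is associative and two trees t1 and t2 contain the same elements in the same order:
toList(t1) == toList(t2)
then:
reduce(t1, f) == reduce(t2, f)
In other words, for an associative f, the result of reduce depends only on the order of the elements (toList), not on the shape of the tree.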
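A small self-contained check of this rule: two differently shaped trees with the same toList, reduced with the parallel reduce from above. String concatenation (associative) gives the same result on both, while subtraction (not associative) does not. The parallel helper is a stand-in built on scala.concurrent.Future, an assumption since the course's own version is not shown:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-in for the course's `parallel`.
def parallel[A, B](ta: => A, tb: => B): (A, B) = {
  val fb = Future(tb)
  val a = ta
  (a, Await.result(fb, Duration.Inf))
}

sealed abstract class Tree[A]
case class Leaf[A](value: A) extends Tree[A]
case class Node[A](left: Tree[A], right: Tree[A]) extends Tree[A]

// Parallel reduce: the two subtrees are reduced independently.
def reduce[A](t: Tree[A], f: (A, A) => A): A = t match {
  case Leaf(v) => v
  case Node(l, r) =>
    val (lV, rV) = parallel(reduce(l, f), reduce(r, f))
    f(lV, rV)
}

def map[A, B](t: Tree[A], f: A => B): Tree[B] = t match {
  case Leaf(v) => Leaf(f(v))
  case Node(l, r) => Node(map(l, f), map(r, f))
}

// toList via map and reduce: wrap each leaf in a list, then concatenate.
def toList[A](t: Tree[A]): List[A] =
  reduce[List[A]](map[A, List[A]](t, List(_)), _ ++ _)

// Same leaves in the same order, different shapes.
val t1: Tree[String] = Node(Node(Leaf("a"), Leaf("b")), Leaf("c"))
val t2: Tree[String] = Node(Leaf("a"), Node(Leaf("b"), Leaf("c")))
```

On the analogous Int trees Node(Node(Leaf(10), Leaf(4)), Leaf(3)) and Node(Leaf(10), Node(Leaf(4), Leaf(3))), reducing with subtraction gives (10-4)-3 == 3 and 10-(4-3) == 9 respectively.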
This rule extends to structures other than trees: for instance, an array can be treated as a balanced tree and reduced in the same way.
Here is how we can reduce an array in parallel:
def reduceSeg[A](inp: Array[A], left: Int, right: Int,
                 f: (A, A) => A): A = {
  if (right - left < threshold) {
    // small (non-empty) segment: reduce sequentially
    var res = inp(left)
    var i = left + 1
    while (i < right) {
      res = f(res, inp(i))
      i = i + 1
    }
    res
  } else {
    val mid = left + (right - left) / 2
    val (a1, a2) = parallel(reduceSeg(inp, left, mid, f),
                            reduceSeg(inp, mid, right, f))
    f(a1, a2)
  }
}
def reduce[A](inp: Array[A], f: (A, A) => A): A =
  reduceSeg(inp, 0, inp.length, f)
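A runnable sketch of the array reduction, assuming a threshold of 2 (chosen only so the small examples split) and a Future-based stand-in for parallel. Concatenating strings, which is associative but not commutative, shows that the left-to-right order of the elements is preserved:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-in for the course's `parallel`.
def parallel[A, B](ta: => A, tb: => B): (A, B) = {
  val fb = Future(tb)
  val a = ta
  (a, Await.result(fb, Duration.Inf))
}

// Hypothetical cutoff; it must be at least 2 so that no empty segment is reduced.
val threshold = 2

def reduceSeg[A](inp: Array[A], left: Int, right: Int, f: (A, A) => A): A = {
  if (right - left < threshold) {
    var res = inp(left)
    var i = left + 1
    while (i < right) {
      res = f(res, inp(i))
      i = i + 1
    }
    res
  } else {
    val mid = left + (right - left) / 2
    val (a1, a2) = parallel(reduceSeg(inp, left, mid, f),
                            reduceSeg(inp, mid, right, f))
    f(a1, a2)
  }
}

// Assumes inp is non-empty.
def reduce[A](inp: Array[A], f: (A, A) => A): A = reduceSeg(inp, 0, inp.length, f)
```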
An operation
f: (A, A) => A
is associative if for any x, y, z:
f(x, f(y, z)) == f(f(x, y), z)
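Consequence: for an associative f, any two expressions with the same operands in the same order evaluate to the same result, however they are parenthesized; this is why reduce gives the same result on every tree with the same toList.
Associativity is independent of commutativity (f(x, y) == f(y, x) for all x, y): neither property implies the other.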
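Two standard counterexamples illustrate the independence: string concatenation is associative but not commutative, and averaging two numbers is commutative but not associative. The values below were chosen so that every intermediate result is exactly representable as a Double:

```scala
// Associative but not commutative.
val concat = (x: String, y: String) => x + y

// Commutative but not associative.
val avg = (x: Double, y: Double) => (x + y) / 2

val leftGrouped = avg(avg(1.0, 2.0), 3.0)  // avg(1.5, 3.0) == 2.25
val rightGrouped = avg(1.0, avg(2.0, 3.0)) // avg(1.0, 2.5) == 1.75
```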
Assuming two associative functions:
f1: (A1, A1) => A1
f2: (A2, A2) => A2
we can construct an associative function on tuples as follows:
f: ((A1, A2), (A1, A2)) => (A1, A2)
f((x1, x2), (y1, y2)) = (f1(x1, y1), f2(x2, y2))
Example:
times((x1, y1), (x2, y2)) = (x1*x2, y1*y2)
Because multiplication is associative, times on tuples is also associative.
Another example is the average of the elements in a collection, computed here with two reductions:
val sum = reduce(collection, _ + _)
val length = reduce(map(collection, (x: Int) => 1), _ + _)
sum / length
A solution that uses a single reduce instead of two combines (sum, length) pairs component-wise:
val f = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
val (sum, length) = reduce(map(collection, (x: Int) => (x, 1)), f)
sum / length
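The tuple construction can be written once as a helper that combines two operations component-wise. In this sketch pairOp is a hypothetical name, and the standard List.reduce stands in for the reduce defined earlier:

```scala
// Builds an operation on pairs from two operations, applied component-wise.
// If f1 and f2 are associative, so is the result.
def pairOp[A1, A2](f1: (A1, A1) => A1,
                   f2: (A2, A2) => A2): ((A1, A2), (A1, A2)) => (A1, A2) =
  (x, y) => (f1(x._1, y._1), f2(x._2, y._2))

val collection = List(3, 5, 10)

// A single reduce computes the sum and the number of elements together.
val (sum, length) = collection.map(x => (x, 1)).reduce(pairOp[Int, Int](_ + _, _ + _))
val average = sum / length // integer division
```

The same helper gives the times example: pairOp[Int, Int](_ * _, _ * _) multiplies pairs component-wise.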
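Functions that are commutative aren't necessarily associative. However, if f is commutative and also satisfies the rotation property f(f(x, y), z) == f(f(y, z), x) for all x, y, z, then f is associative, since f(x, f(y, z)) == f(f(y, z), x) == f(f(x, y), z).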
Consider the scanLeft operation, which has aspects of both map and fold:
List(1, 3, 8).scanLeft(100)((s, x) => s + x) == List(100, 101, 104, 112)
Symbolically, we can say that:
List(a1, a2, a3).scanLeft(a0)(f) = List(b0, b1, b2, b3)
b0 = a0
b1 = f(b0, a1)
b2 = f(b1, a2)
b3 = f(b2, a3)
We assume that f is associative. Even so, scanRight gives a different result from scanLeft:
List(1, 3, 8).scanRight(100)((x, s) => x + s) == List(112, 111, 108, 100)
scanRight and scanLeft are dual operations, so everything we say about scanLeft here also applies to scanRight.
Here is a sequential implementation of scanLeft:
def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A,
                out: Array[A]): Unit = {
  // out has one more element than inp: out(0) holds a0
  out(0) = a0
  var a = a0
  var i = 0
  while (i < inp.length) {
    a = f(a, inp(i))
    i = i + 1
    out(i) = a
  }
}
Parallelizing this algorithm isn't straightforward because of the traversal order: each output depends on the previous one. We would also like an algorithm that runs in O(log n) given enough parallel resources, which at first seems impossible: the last element depends on all the other elements, and computing the outputs one after another takes O(n) time no matter how many processors are available. A parallel scan is nonetheless possible, at the cost of doing more work (more applications of f) than the sequential version and of storing intermediate results.
Let's rewrite scanLeft so that it is easier to parallelize, using map and reduce as before. Assume we have the following functions:
def reduceSeg1[A](inp: Array[A], left: Int, right: Int,
                  a0: A, f: (A, A) => A): A
def mapSeg[A, B](inp: Array[A], left: Int, right: Int,
                 fi: (Int, A) => B, out: Array[B]): Unit
Notice that the mapping function fi takes two arguments instead of one: the index of the element in the array, and the element itself.
Having these functions, we can implement scanLeft as follows:
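def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]): Unit = {
  // out(i) is a0 combined with the first i elements of inp
  val fi = { (i: Int, v: A) => reduceSeg1(inp, 0, i, a0, f) }
  mapSeg(inp, 0, inp.length, fi, out)
  // the last output has no matching input index, so compute it separately
  val last = inp.length - 1
  out(last + 1) = f(out(last), inp(last))
}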
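A runnable sketch of this map-plus-reduce formulation. The bodies of reduceSeg1 and mapSeg are assumptions: sequential loops with the signatures given above, used only so the example runs (parallel versions would split the range as mapASegPar does). The result matches the sequential scanLeft:

```scala
// Sequential fold of inp[left, right), starting from a0.
def reduceSeg1[A](inp: Array[A], left: Int, right: Int, a0: A, f: (A, A) => A): A = {
  var a = a0
  var i = left
  while (i < right) { a = f(a, inp(i)); i = i + 1 }
  a
}

// Index-aware map over inp[left, right), writing into out.
def mapSeg[A, B](inp: Array[A], left: Int, right: Int,
                 fi: (Int, A) => B, out: Array[B]): Unit = {
  var i = left
  while (i < right) { out(i) = fi(i, inp(i)); i = i + 1 }
}

// Requires a non-empty inp and an out of length inp.length + 1.
def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]): Unit = {
  val fi = (i: Int, v: A) => reduceSeg1(inp, 0, i, a0, f) // recomputes each prefix
  mapSeg(inp, 0, inp.length, fi, out)
  val last = inp.length - 1
  out(last + 1) = f(out(last), inp(last))
}

val out = new Array[Int](4)
scanLeft[Int](Array(1, 3, 8), 100, _ + _, out)
```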
This version reuses no computation: every out(i) is recomputed from scratch by reduceSeg1, so the total work is O(n^2). Since reduce applies the operation along a tree, we can instead save the intermediate results of that tree-shaped computation and reuse them. To describe this, we assume that the input collection is itself a tree that only has values in its leaves:
sealed abstract class Tree[A]
case class Leaf[A](a: A) extends Tree[A]
case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A]
Trees that store intermediate values also keep a res value in every leaf and node:
sealed abstract class TreeRes[A] { val res: A }
case class LeafRes[A](override val res: A) extends TreeRes[A]
case class NodeRes[A](l: TreeRes[A],
                      override val res: A,
                      r: TreeRes[A]) extends TreeRes[A]
The reduceRes function, which reduces a Tree[A] into a TreeRes[A] while recording every intermediate result, can be implemented as follows:
def reduceRes[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
  case Leaf(v) => LeafRes(v)
  case Node(l, r) => {
    val (tL, tR) = (reduceRes(l, f), reduceRes(r, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
  }
}
The parallel version, upsweep, is almost identical, except that the two subtrees are reduced in parallel:
def upsweep[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
  case Leaf(v) => LeafRes(v)
  case Node(l, r) => {
    val (tL, tR) = parallel(upsweep(l, f), upsweep(r, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
  }
}
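The second pass, downsweep, uses the result tree to build the final collection: each subtree receives in a0 the initial value combined with all the elements to its left.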
def downsweep[A](t: TreeRes[A], a0: A, f: (A, A) => A): Tree[A] = t match {
  case LeafRes(a) => Leaf(f(a0, a))
  case NodeRes(l, _, r) => {
    // the right subtree must also account for everything in the left subtree
    val (tL, tR) = parallel(downsweep[A](l, a0, f),
                            downsweep[A](r, f(a0, l.res), f))
    Node(tL, tR)
  }
}
The tree produced by downsweep does not contain the initial value a0, which has to be prepended:
def prepend[A](x: A, t: Tree[A]): Tree[A] = t match {
  case Leaf(v) => Node(Leaf(x), Leaf(v))
  // x goes in front of the leftmost leaf
  case Node(l, r) => Node(prepend(x, l), r)
}
The parallel implementation of scanLeft on trees is then:
def scanLeft[A](t: Tree[A], a0: A, f: (A, A) => A): Tree[A] = {
  val tRes = upsweep(t, f)
  val scan1 = downsweep(tRes, a0, f)
  // adding the initial value
  prepend(a0, scan1)
}
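Putting the tree version together, a self-contained sketch: the definitions follow the text above, with parallel replaced by a stand-in built on scala.concurrent.Future (an assumption, since the course's own parallel is not shown) and toList added only to inspect the result:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-in for the course's `parallel`.
def parallel[A, B](ta: => A, tb: => B): (A, B) = {
  val fb = Future(tb)
  val a = ta
  (a, Await.result(fb, Duration.Inf))
}

sealed abstract class Tree[A]
case class Leaf[A](a: A) extends Tree[A]
case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A]

sealed abstract class TreeRes[A] { val res: A }
case class LeafRes[A](override val res: A) extends TreeRes[A]
case class NodeRes[A](l: TreeRes[A], override val res: A, r: TreeRes[A]) extends TreeRes[A]

// Pass 1: record the reduction of every subtree.
def upsweep[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
  case Leaf(v) => LeafRes(v)
  case Node(l, r) =>
    val (tL, tR) = parallel(upsweep(l, f), upsweep(r, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
}

// Pass 2: push down a0 combined with everything to the left of each subtree.
def downsweep[A](t: TreeRes[A], a0: A, f: (A, A) => A): Tree[A] = t match {
  case LeafRes(a) => Leaf(f(a0, a))
  case NodeRes(l, _, r) =>
    val (tL, tR) = parallel(downsweep[A](l, a0, f), downsweep[A](r, f(a0, l.res), f))
    Node(tL, tR)
}

def prepend[A](x: A, t: Tree[A]): Tree[A] = t match {
  case Leaf(v) => Node(Leaf(x), Leaf(v))
  case Node(l, r) => Node(prepend(x, l), r)
}

def scanLeft[A](t: Tree[A], a0: A, f: (A, A) => A): Tree[A] =
  prepend(a0, downsweep(upsweep(t, f), a0, f))

// Helper to inspect the resulting tree in left-to-right order.
def toList[A](t: Tree[A]): List[A] = t match {
  case Leaf(v) => List(v)
  case Node(l, r) => toList(l) ++ toList(r)
}

val t: Tree[Int] = Node(Node(Leaf(1), Leaf(3)), Node(Leaf(8), Leaf(50)))
```

For this tree, upsweep stores 4 and 58 in the two inner nodes, so downsweep hands the right subtree f(100, 4) == 104 as its starting value.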
To use scanLeft on arrays, we use trees whose leaves represent array segments instead of individual elements:
sealed abstract class TreeResA[A] {val res: A }
case class LeafRes[A](from: Int, to: Int, override val res: A) extends TreeResA[A]
case class NodeRes[A](l: TreeResA[A],
override val res: A,
r: TreeResA[A]) extends TreeResA[A]
The difference from before is that each leaf now stores the from and to indices of its array segment (rather than a copy of the segment), together with the reduced value res of that segment. The upsweep function for arrays becomes:
def upsweep[A](inp: Array[A], from: Int, to: Int, f: (A, A) => A): TreeResA[A] = {
  if (to - from < threshold) {
    // small segment: reduce it sequentially, starting from its first element
    LeafRes(from, to, reduceSeg1(inp, from + 1, to, inp(from), f))
  } else {
    val mid = from + (to - from) / 2
    val (tL, tR) = parallel(upsweep(inp, from, mid, f),
                            upsweep(inp, mid, to, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
  }
}
// sequential algorithm for small segments
def reduceSeg1[A](inp: Array[A], from: Int, to: Int, a0: A, f: (A, A) => A): A = {
  var a = a0
  var i = from
  while (i < to) {
    a = f(a, inp(i))
    i = i + 1
  }
  a
}
The downsweep function becomes:
def downsweep[A](inp: Array[A], a0: A, f: (A, A) => A,
                 t: TreeResA[A], out: Array[A]): Unit = t match {
  // leaf: scan its segment sequentially, starting from a0
  case LeafRes(from, to, res) => scanLeftSeg(inp, from, to, a0, f, out)
  case NodeRes(l, _, r) => {
    val (_, _) = parallel(
      downsweep(inp, a0, f, l, out),
      downsweep(inp, f(a0, l.res), f, r, out)
    )
  }
}
The entire scanLeft on arrays will be:
def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]): Unit = {
  // out must have length inp.length + 1
  val t = upsweep(inp, 0, inp.length, f)
  downsweep(inp, a0, f, t, out)
  out(0) = a0
}
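A self-contained sketch of the whole array scan. Several pieces are assumptions: scanLeftSeg is used above but not defined, so here it is a hypothetical sequential scan of inp[from, to) that writes the running value after element i into out(i + 1); threshold is set to 2 only so that small inputs split; and parallel is a Future-based stand-in:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Stand-in for the course's `parallel`.
def parallel[A, B](ta: => A, tb: => B): (A, B) = {
  val fb = Future(tb)
  val a = ta
  (a, Await.result(fb, Duration.Inf))
}

val threshold = 2 // hypothetical segment-size cutoff

sealed abstract class TreeResA[A] { val res: A }
case class LeafRes[A](from: Int, to: Int, override val res: A) extends TreeResA[A]
case class NodeRes[A](l: TreeResA[A], override val res: A, r: TreeResA[A]) extends TreeResA[A]

def reduceSeg1[A](inp: Array[A], from: Int, to: Int, a0: A, f: (A, A) => A): A = {
  var a = a0
  var i = from
  while (i < to) { a = f(a, inp(i)); i = i + 1 }
  a
}

// Hypothetical helper: sequential scan of inp[from, to) starting from a0.
def scanLeftSeg[A](inp: Array[A], from: Int, to: Int, a0: A,
                   f: (A, A) => A, out: Array[A]): Unit = {
  var a = a0
  var i = from
  while (i < to) { a = f(a, inp(i)); i = i + 1; out(i) = a }
}

def upsweep[A](inp: Array[A], from: Int, to: Int, f: (A, A) => A): TreeResA[A] =
  if (to - from < threshold) LeafRes(from, to, reduceSeg1(inp, from + 1, to, inp(from), f))
  else {
    val mid = from + (to - from) / 2
    val (tL, tR) = parallel(upsweep(inp, from, mid, f), upsweep(inp, mid, to, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
  }

def downsweep[A](inp: Array[A], a0: A, f: (A, A) => A,
                 t: TreeResA[A], out: Array[A]): Unit = t match {
  case LeafRes(from, to, _) => scanLeftSeg(inp, from, to, a0, f, out)
  case NodeRes(l, _, r) =>
    parallel(downsweep(inp, a0, f, l, out), downsweep(inp, f(a0, l.res), f, r, out))
    ()
}

// Requires a non-empty inp and an out of length inp.length + 1.
def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]): Unit = {
  val t = upsweep(inp, 0, inp.length, f)
  downsweep(inp, a0, f, t, out)
  out(0) = a0
}

val inp = Array(1, 3, 8, 50, 2)
val out = new Array[Int](inp.length + 1)
scanLeft[Int](inp, 100, _ + _, out)
```

Leaves write to disjoint ranges of out, so the parallel downsweep needs no locking; out(0) is the only slot no leaf touches.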