301.02 - Basic task parallel algorithms

Parallel sorting

Merge sort algorithm:

  • recursively sort the two halves of the array (in parallel);
  • sequentially merge the two halves by copying them into a temporary array;
  • copy the temporary array back into the original array;

The parMergeSort method has the following signature:

def parMergeSort(xs: Array[Int], maxDepth: Int): Unit

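maxDepth controls the degree of parallelism: the recursion spawns parallel tasks down to depth maxDepth (up to 2^maxDepth segments) and sorts each remaining segment sequentially. The method sorts the array in place, as a side effect.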

We start by allocating an intermediary array:

val ys = new Array[Int](xs.length)

Our algorithm will alternate between the two arrays.

def sort(from: Int, until: Int, depth: Int): Unit = {
  if (depth == maxDepth) {
    quickSort(xs, from, until-from)
  }
  else {
    val mid = (from + until) / 2
    parallel(sort(from, mid, depth+1), 
             sort(mid, until, depth+1))

    // merge back the sorted array
    val flip = ((maxDepth - depth) % 2 == 0)
    val src = if (flip) ys else xs
    val target = if (flip) xs else ys
    merge(src, target, from, mid, until)
  }
}

sort(0, xs.length, 0)

Copying the array can also be done in parallel:

def copy(src: Array[Int], target: Array[Int], 
         from: Int, until: Int, depth: Int): Unit = {

  if (depth == maxDepth) {
    Array.copy(src, from, target, from, until-from)
  }
  else {
    val mid = (from + until) / 2
    parallel(copy(src, target, from, mid, depth+1),
             copy(src, target, mid, until, depth+1))
  }
}

// an odd maxDepth leaves the final merged result in ys, so it must be copied back
if (maxDepth % 2 == 1) copy(ys, xs, 0, xs.length, 0)
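
The pieces above can be assembled into a runnable sketch. The notes do not define parallel, merge or quickSort, so the versions below are assumptions: parallel is built on scala.concurrent.Future, merge is a plain two-pointer merge, java.util.Arrays.sort stands in for quickSort, and the final copy is done sequentially with System.arraycopy to keep the example short.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Assumed helper: runs `a` on the global thread pool while `b` runs on the calling thread.
def parallel[A, B](a: => A, b: => B): (A, B) = {
  val left = Future(a)
  val right = b
  (Await.result(left, Duration.Inf), right)
}

// Assumed helper: merges the sorted runs src[from, mid) and src[mid, until) into dst[from, until).
def merge(src: Array[Int], dst: Array[Int], from: Int, mid: Int, until: Int): Unit = {
  var left = from
  var right = mid
  var i = from
  while (i < until) {
    // Take from the left run while it is non-empty and its head is not larger.
    if (left < mid && (right >= until || src(left) <= src(right))) {
      dst(i) = src(left)
      left += 1
    } else {
      dst(i) = src(right)
      right += 1
    }
    i += 1
  }
}

def parMergeSort(xs: Array[Int], maxDepth: Int): Unit = {
  val ys = new Array[Int](xs.length)

  def sort(from: Int, until: Int, depth: Int): Unit = {
    if (depth == maxDepth) {
      java.util.Arrays.sort(xs, from, until) // stands in for quickSort
    } else {
      val mid = (from + until) / 2
      parallel(sort(from, mid, depth + 1), sort(mid, until, depth + 1))
      val flip = (maxDepth - depth) % 2 == 0
      val src = if (flip) ys else xs
      val target = if (flip) xs else ys
      merge(src, target, from, mid, until)
    }
  }

  sort(0, xs.length, 0)
  // With an odd maxDepth the last merge wrote into ys, so copy the result back.
  if (maxDepth % 2 == 1) System.arraycopy(ys, 0, xs, 0, xs.length)
}

val sorted: List[Int] = {
  val data = Array(5, 2, 9, 1, 7, 3, 8, 6, 4, 0)
  parMergeSort(data, 3)
  data.toList // List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
}
```

Blocking inside a Future is acceptable for a small recursion depth like this one; a production version would more likely use a fork/join task pool.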

Data operations and parallel mapping

Operations on collections are key to functional programming even in the sequential case:

List(1, 3, 8).map(x => x*x) == List(1, 9, 64)

List(1, 3, 8).fold(100)((s, x) => s+x) == 112

// combined fold of all list prefixes List(), List(1), List(1,3), List(1,3,8)
List(1, 3, 8).scan(100)((s, x) => s+x) == List(100, 101, 104, 112)

Lists are poorly suited to parallelization, because splitting a list in half and combining two lists both take linear time. Arrays and trees are better choices, and both are easily supported in the functional model.

Map

List(a1, a2, ..., an).map(f) == List(f(a1), f(a2), ..., f(an))

Properties of map:

list.map(x => x) == list  // identity function
list.map(f.compose(g)) == list.map(g).map(f)

(f.compose(g))(x) = f(g(x))
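
Both properties can be spot-checked on a concrete list. The functions f and g below are arbitrary choices made for this illustration.

```scala
val xs = List(1, 3, 8)
val f = (x: Int) => x + 1
val g = (x: Int) => x * 2

// Identity law: mapping the identity function returns an equal list.
val identityLaw = xs.map(x => x) == xs

// Composition law: one pass with f.compose(g) equals two passes, g first and then f.
val fused = xs.map(f.compose(g))   // List(3, 7, 17)
val twoPasses = xs.map(g).map(f)   // List(3, 7, 17)
```

The composition law is what allows two consecutive maps to be fused into a single traversal.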

A sequential definition of a recursive map is straightforward:

def mapSeq[A, B](lst: List[A], f: A => B): List[B] = lst match {
  case Nil => Nil
  case h :: t => f(h) :: mapSeq(t, f)
}

We want a version that parallelizes:

  • the calculation of f(h)
  • access to the elements themselves (a list is not a good choice, because reaching an element requires traversing the list)

Sequential map of an array producing an array

def mapASegSeq[A, B](inp: Array[A], left: Int, right: Int, 
                     f: A => B, out: Array[B]): Unit = {
  // writes f(inp(i)) to out(i) for every i in [left, right)
  var i = left
  while (i < right) {
    out(i) = f(inp(i))
    i = i + 1
  }
}

val in = Array(2, 3, 4, 5, 6)
val out = Array(0, 0, 0, 0, 0)
val f = (x: Int) => x*x
mapASegSeq(in, 1, 3, f, out)
out  // Array(0, 9, 16, 0, 0)

Parallel map of an array producing an array

def mapASegPar[A, B](inp: Array[A], left: Int, right: Int, 
                     f: A => B, out: Array[B]): Unit = {

  if (right - left < threshold) {
    // small segment: not worth splitting further
    mapASegSeq(inp, left, right, f, out)
  }
  else {
    val mid = left + (right - left) / 2
    parallel(mapASegPar(inp, left, mid, f, out),
             mapASegPar(inp, mid, right, f, out))
  }
}

Note:

  • the parallel tasks must write to disjoint parts of out
  • threshold needs to be large enough, otherwise the overhead of creating parallel tasks outweighs the gain
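
A self-contained sketch of the parallel array map follows. The parallel helper (built on scala.concurrent.Future) and the threshold value are assumptions made for this example; the notes leave both unspecified.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Assumed helper: runs `a` on the global thread pool while `b` runs on the calling thread.
def parallel[A, B](a: => A, b: => B): (A, B) = {
  val left = Future(a)
  val right = b
  (Await.result(left, Duration.Inf), right)
}

// Hypothetical cutoff, kept small to force a few splits; it must be at least 2
// so that every split produces strictly smaller segments.
val threshold = 4

def mapASegSeq[A, B](inp: Array[A], left: Int, right: Int, f: A => B, out: Array[B]): Unit = {
  var i = left
  while (i < right) {
    out(i) = f(inp(i))
    i += 1
  }
}

def mapASegPar[A, B](inp: Array[A], left: Int, right: Int, f: A => B, out: Array[B]): Unit = {
  if (right - left < threshold) {
    mapASegSeq(inp, left, right, f, out)
  } else {
    val mid = left + (right - left) / 2
    // The two halves write to disjoint index ranges of out, so no synchronization is needed.
    parallel(mapASegPar(inp, left, mid, f, out), mapASegPar(inp, mid, right, f, out))
  }
}

val squares: List[Int] = {
  val input = (1 to 10).toArray
  val out = new Array[Int](input.length)
  mapASegPar(input, 0, input.length, (x: Int) => x * x, out)
  out.toList // List(1, 4, 9, 16, 25, 36, 49, 64, 81, 100)
}
```

In practice the threshold would be chosen by measurement, typically in the thousands of elements for a cheap f.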

Parallel fold (reduce) operations

List(1, 3, 8).fold(100)((s, x) => s + x) == 112

List(1, 3, 8).foldLeft(100)((s, x) => s - x) == ((100-1)-3)-8 == 88
List(1, 3, 8).foldRight(100)((s, x) => s - x) == 1-(3-(8-100)) == -94
List(1, 3, 8).reduceLeft((s, x) => s - x) == (1-3)-8 == -10
List(1, 3, 8).reduceRight((s, x) => s - x) == 1-(3-8) == 6

To enable parallel processing, we look for associative operations:

  • additions, string concatenations;
  • NOT subtraction;

Expressions can be represented as trees, with the values as the leaves and the operators as the inner nodes.

sealed abstract class Tree[A]
case class Leaf[A](value: A) extends Tree[A]
case class Node[A](left: Tree[A], right: Tree[A]) extends Tree[A]

The result of the evaluation of the expression is given by a reduce of the tree.

A sequential definition of the reduce would be:

def reduce[A](t: Tree[A], f : (A, A) => A): A = t match {
  case Leaf(v) => v
  case Node(l, r) => f(reduce(l, f), reduce(r, f))
}

We can think of reduce as replacing the constructor of Node with a given f.

Making the reduce function parallel is easy:

def reduce[A](t: Tree[A], f : (A, A) => A): A = t match {
  case Leaf(v) => v
  case Node(l, r) => {
      val (lV, rV) = parallel(reduce(l, f), reduce(r, f))
      f(lV, rV)
  }
}

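Given enough parallel resources, the running time of this parallel reduce is linear in the height of the tree: O(log n) for a balanced tree with n leaves, but O(n) for a degenerate, list-like tree.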

Assuming that f is associative, we have:

reduce(Node(Node(Leaf(x), Leaf(y)), Leaf(z)), f) ==
reduce(Node(Leaf(x), Node(Leaf(y), Leaf(z))), f)  

To describe the order of the elements in a tree, we can convert the tree into a list:

def toList[A](t: Tree[A]): List[A] = t match {
  case Leaf(v) => List(v)
  case Node(l, r) => toList(l) ++ toList(r)
}

Suppose we also have a tree map, that replaces all elements in the tree by mapping them through a function:

def map[A, B](t: Tree[A], f: A => B): Tree[B] = t match {
  case Leaf(v) => Leaf(f(v))
  case Node(l, r) => Node(map(l, f), map(r, f))
}

toList can be expressed in terms of map and reduce as follows:

def toList[A](t: Tree[A]): List[A] = reduce[List[A]](map(t, (v: A) => List(v)), _ ++ _)

If:

  • f : (A, A) => A is associative;
  • t1: Tree[A] and t2: Tree[A];
  • toList(t1) == toList(t2)

then:

  • reduce(t1, f) == reduce(t2, f)

So, if f is associative, reduce gives the same result for any two trees that hold the same elements in the same order (that is, whose toList results are equal).
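
A small check of this rule on two differently shaped trees with the same element order. The trees t1 and t2 are made up for the illustration; addition is associative, subtraction is not.

```scala
sealed abstract class Tree[A]
case class Leaf[A](value: A) extends Tree[A]
case class Node[A](left: Tree[A], right: Tree[A]) extends Tree[A]

def reduce[A](t: Tree[A], f: (A, A) => A): A = t match {
  case Leaf(v) => v
  case Node(l, r) => f(reduce(l, f), reduce(r, f))
}

def toList[A](t: Tree[A]): List[A] = t match {
  case Leaf(v) => List(v)
  case Node(l, r) => toList(l) ++ toList(r)
}

// Same elements in the same order, but different shapes.
val t1: Tree[Int] = Node(Node(Leaf(1), Leaf(3)), Leaf(8))
val t2: Tree[Int] = Node(Leaf(1), Node(Leaf(3), Leaf(8)))

val sameOrder = toList(t1) == toList(t2) // true: both are List(1, 3, 8)
val sum1 = reduce[Int](t1, _ + _)        // (1 + 3) + 8 == 12
val sum2 = reduce[Int](t2, _ + _)        // 1 + (3 + 8) == 12
val diff1 = reduce[Int](t1, _ - _)       // (1 - 3) - 8 == -10
val diff2 = reduce[Int](t2, _ - _)       // 1 - (3 - 8) == 6
```

With the associative operation the shape of the tree does not matter; with subtraction it does.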

This rule extends to structures other than trees. For instance, we can convert an array to a balanced tree and apply reduce to it.

Here is how we can reduce an array in parallel:

def reduceSeg[A](inp: Array[A], left: Int, right: Int, 
                 f: (A, A) => A): A = {
  if (right - left < threshold) {
    // sequential reduce of the non-empty segment [left, right)
    var res = inp(left)
    var i = left + 1
    while (i < right) {
      res = f(res, inp(i))
      i = i + 1
    }
    res
  }
  else {
    val mid = left + (right - left) / 2
    val (a1, a2) = parallel(reduceSeg(inp, left, mid, f),
                            reduceSeg(inp, mid, right, f))
    f(a1, a2)
  }
}

def reduce[A](inp: Array[A], f: (A, A) => A): A =
   reduceSeg(inp, 0, inp.length, f)
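
As a usage sketch, the array reduce can be run on an associative but non-commutative operation such as string concatenation, which shows that element order is preserved. The parallel helper and the threshold value are assumptions made for this example, and the input array must be non-empty.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Assumed helper: runs `a` on the global thread pool while `b` runs on the calling thread.
def parallel[A, B](a: => A, b: => B): (A, B) = {
  val left = Future(a)
  val right = b
  (Await.result(left, Duration.Inf), right)
}

val threshold = 3 // hypothetical cutoff, small to force splits; must be at least 2

def reduceSeg[A](inp: Array[A], left: Int, right: Int, f: (A, A) => A): A = {
  if (right - left < threshold) {
    var res = inp(left)
    var i = left + 1
    while (i < right) {
      res = f(res, inp(i))
      i += 1
    }
    res
  } else {
    val mid = left + (right - left) / 2
    val (a1, a2) = parallel(reduceSeg(inp, left, mid, f), reduceSeg(inp, mid, right, f))
    f(a1, a2)
  }
}

def reduce[A](inp: Array[A], f: (A, A) => A): A = reduceSeg(inp, 0, inp.length, f)

val total = reduce[Int]((1 to 100).toArray, _ + _)                      // 5050
val word = reduce[String](Array("p", "a", "r", "a", "l", "l", "e", "l"), _ + _) // "parallel"
```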

Associativity

Operation:

f: (A, A) => A

is associative if for any x, y, z:

f(x, f(y, z)) == f(f(x, y), z)

Consequences:

  • two expressions with the same list of operands evaluate to the same result, regardless of how the parentheses are placed;
  • reduce on any tree with this list of operands gives the same result;

Associativity and commutativity are independent properties: an operation can have either one without the other.
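
Two examples, chosen for this illustration, show that neither property implies the other. Each false result is a genuine counterexample; each true result is only a spot check on one set of arguments.

```scala
// Associative but not commutative: string concatenation.
val concat = (x: String, y: String) => x + y
val concatAssoc = concat(concat("a", "b"), "c") == concat("a", concat("b", "c")) // true: both "abc"
val concatComm = concat("a", "b") == concat("b", "a")                            // false: "ab" vs "ba"

// Commutative but not associative: sum of squares.
val sumSq = (x: Int, y: Int) => x * x + y * y
val sumSqComm = sumSq(1, 2) == sumSq(2, 1)                     // true: both 5
val sumSqAssoc = sumSq(sumSq(1, 2), 3) == sumSq(1, sumSq(2, 3)) // false: 34 vs 170
```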

Associative operations on tuples

Assuming two associative functions:

f1: (A1, A1) => A1
f2: (A2, A2) => A2

we can construct an associative function on tuples as follows:

f: ((A1, A2), (A1, A2)) => (A1, A2)
f((x1, x2), (y1, y2)) = (f1(x1, y1), f2(x2, y2))

Example:

times((x1, y1), (x2, y2)) = (x1*x2, y1*y2)

Because multiplication is associative, the times function on tuples is also associative.

Another example - the average of elements in a collection:

val sum = reduce(collection, _ + _)
val length = reduce(map(collection, (x: Int) => 1), _ + _)
sum / length

A solution that uses a single reduce instead of two would be:

def f(a: (Int, Int), b: (Int, Int)): (Int, Int) = (a, b) match {
  case ((sum1, len1), (sum2, len2)) => (sum1 + sum2, len1 + len2)
}

val (sum, length) = reduce(map(collection, (x: Int) => (x, 1)), f)
sum / length
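
The single-reduce version can be tried with the standard library's own map and reduce, which stand in here for the reduce and map functions of these notes. The sample collection is made up for the example.

```scala
// Combines two (sum, length) pairs componentwise; associative because + is associative.
val f = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)

val collection = Array(1, 3, 8)
val sumAndLength = collection.map(x => (x, 1)).reduce(f) // (12, 3)
val average = sumAndLength._1 / sumAndLength._2          // 4 (integer division)
```

Because f is associative, the same pairs could be combined by any of the parallel reduce functions above without changing the result.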

A commutative function is not necessarily associative. However, a function that is commutative and whose arguments can also be rotated, f(f(x, y), z) == f(f(y, z), x), is associative.
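
Reading "rotating arguments" as the rotation property f(f(x, y), z) = f(f(y, z), x) (an assumption about the intended meaning), associativity follows in two steps:

```latex
\begin{aligned}
f(f(x, y), z) &= f(f(y, z), x) && \text{(rotation)} \\
              &= f(x, f(y, z)) && \text{(commutativity)}
\end{aligned}
```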

Parallel scan left

Consider the scanLeft operation, which combines aspects of both map and fold:

List(1, 3, 8).scanLeft(100)((s, x) => s + x) == List(100, 101, 104, 112)

Symbolically, we can say that:

List(a1, a2, a3).scanLeft(a0)(f) = List(b0, b1, b2, b3)

b0 = a0
b1 = f(b0, a1)
b2 = f(b1, a2)
b3 = f(b2, a3)

We assume that f is an associative function. Even so, scanRight is different from scanLeft:

List(1, 3, 8).scanRight(100)((s, x) => s + x) == List(112, 111, 108, 100)

scanRight and scanLeft are dual operations, so everything we say about scanLeft here also applies to scanRight.

Here is a sequential implementation of scanLeft:

def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, 
                out: Array[A]): Unit = {

  out(0) = a0
  var a = a0
  var i = 0
  while (i < inp.length) {
    a = f(a, inp(i))
    i = i + 1
    out(i) = a
  }
}

Parallelizing this algorithm isn't straightforward, because every output element depends on the previous one. We would also like the parallel version to run in O(log n) time given enough parallel resources, which makes the task even harder. At first sight it looks impossible: the last element depends on all the other elements, and the sequential definition needs O(n) steps no matter how many parallel resources we have. A parallel version is still possible when f is associative, with the following trade-offs:

  • we give up reusing all intermediate results;
  • we do more work (more applications of f);
  • the gain from parallelism more than compensates for the extra work;

Let's rewrite it so that it can be parallelized more easily. This can be done by using map and reduce, as before. Assume we have the following functions:

def reduceSeg1[A](inp: Array[A], left: Int, right: Int, 
                  a0: A, f: (A, A) => A): A

def mapSeg[A, B](inp: Array[A], left: Int, right: Int, 
                 fi: (Int, A) => B, out: Array[B]): Unit

Notice that the function fi passed to mapSeg takes two arguments, not just one as before. The first argument is the index of the element in the array.

Having these functions, we can implement scanLeft as follows:

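def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]): Unit = {
  // out(i) is the reduction of the first i elements of inp, starting from a0
  val fi = (i: Int, v: A) => reduceSeg1(inp, 0, i, a0, f)
  mapSeg(inp, 0, inp.length, fi, out)
  // the last element of out is not covered by the map
  val last = inp.length - 1
  out(last + 1) = f(out(last), inp(last))
}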
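
To try this formulation, the sketch below supplies simple sequential bodies for reduceSeg1 and mapSeg. These bodies are assumptions made for the example; parallel versions would follow the same pattern as the parallel map and reduce shown earlier. The input array must be non-empty.

```scala
// Sequential fold of inp[left, right), starting from a0.
def reduceSeg1[A](inp: Array[A], left: Int, right: Int, a0: A, f: (A, A) => A): A = {
  var a = a0
  var i = left
  while (i < right) {
    a = f(a, inp(i))
    i += 1
  }
  a
}

// Sequential indexed map: out(i) = fi(i, inp(i)) for every i in [left, right).
def mapSeg[A, B](inp: Array[A], left: Int, right: Int, fi: (Int, A) => B, out: Array[B]): Unit = {
  var i = left
  while (i < right) {
    out(i) = fi(i, inp(i))
    i += 1
  }
}

def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]): Unit = {
  // Each out(i) folds the first i elements from scratch, so all indices are independent.
  val fi = (i: Int, v: A) => reduceSeg1(inp, 0, i, a0, f)
  mapSeg(inp, 0, inp.length, fi, out)
  val last = inp.length - 1
  out(last + 1) = f(out(last), inp(last))
}

val scanned: List[Int] = {
  val out = new Array[Int](4)
  scanLeft[Int](Array(1, 3, 8), 100, _ + _, out)
  out.toList // List(100, 101, 104, 112)
}
```

Every output element is independent of the others, which is what makes the map parallelizable, but the total work grows quadratically with the length of the input.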

In this example we don't reuse any computation. Because reduce applies the operations in a tree, we could try to save the intermediate results of this parallel computation. We can furthermore assume that the input collection is also a tree, which only has values in the leaves.

sealed abstract class Tree[A]
case class Leaf[A](a: A) extends Tree[A]
case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A]

Trees storing the intermediate values also have (res) values in the nodes:

sealed abstract class TreeRes[A] {val res: A }
case class LeafRes[A](override val res: A) extends TreeRes[A]
case class NodeRes[A](l: TreeRes[A], 
                      override val res: A, 
                      r: TreeRes[A]) extends TreeRes[A]

The reduceRes function required to reduce a Tree[A] into a TreeRes[A] can be implemented as follows:

def reduceRes[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
  case Leaf(v) => LeafRes(v)
  case Node(l, r) => {
    val (tL, tR) = (reduceRes(l, f), reduceRes(r, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
  }
}

The parallel version of this function is almost identical, except that the two node reductions are executed in parallel:

def upsweep[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
  case Leaf(v) => LeafRes(v)
  case Node(l, r) => {
    val (tL, tR) = parallel(upsweep(l, f), upsweep(r, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
  }
}

We can now use the result tree to build the final collection:

def downsweep[A](t: TreeRes[A], a0: A, f: (A, A) => A): Tree[A] = t match {
  case LeafRes(a) => Leaf(f(a0, a))
  case NodeRes(l, _, r) => {
    // the right subtree starts from a0 combined with everything in the left subtree
    val (tL, tR) = parallel(downsweep[A](l, a0, f),
                            downsweep[A](r, f(a0, l.res), f))
    Node(tL, tR)
  }
}

The tree produced by downsweep doesn't contain the initial value, which has to be prepended:

def prepend[A](x: A, t: Tree[A]): Tree[A] = t match {
  case Leaf(v) => Node(Leaf(x), Leaf(v))
  case Node(l, r) => Node(prepend(x, l), r)
}

The parallel implementation of scanLeft on trees is then:

def scanLeft[A](t: Tree[A], a0: A, f: (A, A) => A): Tree[A] = {
  val tRes = upsweep(t, f)
  val scan1 = downsweep(tRes, a0, f)
  // adding the initial value
  prepend(a0, scan1)
}
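
Putting the tree functions together gives a runnable sketch. The parallel helper (built on scala.concurrent.Future), the toList function used to inspect the result, and the sample tree are assumptions made for this example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Assumed helper: runs `a` on the global thread pool while `b` runs on the calling thread.
def parallel[A, B](a: => A, b: => B): (A, B) = {
  val left = Future(a)
  val right = b
  (Await.result(left, Duration.Inf), right)
}

sealed abstract class Tree[A]
case class Leaf[A](a: A) extends Tree[A]
case class Node[A](l: Tree[A], r: Tree[A]) extends Tree[A]

sealed abstract class TreeRes[A] { val res: A }
case class LeafRes[A](override val res: A) extends TreeRes[A]
case class NodeRes[A](l: TreeRes[A], override val res: A, r: TreeRes[A]) extends TreeRes[A]

// Bottom-up pass: stores the reduction of every subtree in its node.
def upsweep[A](t: Tree[A], f: (A, A) => A): TreeRes[A] = t match {
  case Leaf(v) => LeafRes(v)
  case Node(l, r) =>
    val (tL, tR) = parallel(upsweep(l, f), upsweep(r, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
}

// Top-down pass: a0 is the reduction of everything to the left of the current subtree.
def downsweep[A](t: TreeRes[A], a0: A, f: (A, A) => A): Tree[A] = t match {
  case LeafRes(a) => Leaf(f(a0, a))
  case NodeRes(l, _, r) =>
    val (tL, tR) = parallel(downsweep(l, a0, f), downsweep(r, f(a0, l.res), f))
    Node(tL, tR)
}

def prepend[A](x: A, t: Tree[A]): Tree[A] = t match {
  case Leaf(v) => Node(Leaf(x), Leaf(v))
  case Node(l, r) => Node(prepend(x, l), r)
}

def scanLeft[A](t: Tree[A], a0: A, f: (A, A) => A): Tree[A] =
  prepend(a0, downsweep(upsweep(t, f), a0, f))

def toList[A](t: Tree[A]): List[A] = t match {
  case Leaf(v) => List(v)
  case Node(l, r) => toList(l) ++ toList(r)
}

val t: Tree[Int] = Node(Node(Leaf(1), Leaf(3)), Node(Leaf(8), Leaf(50)))
val scanned = toList(scanLeft[Int](t, 100, _ + _)) // List(100, 101, 104, 112, 162)
```

Here upsweep computes 4 and 58 for the two subtrees and 62 for the root; downsweep then passes 100 to the left subtree and 100 + 4 = 104 to the right one.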

If we want to use scanLeft with arrays, we use trees that have segments of the array in the leaves, instead of individual elements:

sealed abstract class TreeResA[A] {val res: A }
case class LeafRes[A](from: Int, to: Int, override val res: A) extends TreeResA[A]
case class NodeRes[A](l: TreeResA[A], 
                      override val res: A, 
                      r: TreeResA[A]) extends TreeResA[A]

The only difference from before is that now we hold both the from and to indices of the array (instead of copying the array segment into each leaf). The upsweep function for arrays becomes:

def upsweep[A](inp: Array[A], from: Int, to: Int, f: (A, A) => A): TreeResA[A] = {
  if (to - from < threshold) {
    LeafRes(from, to, reduceSeg1(inp, from + 1, to, inp(from), f))
  }
  else {
    val mid = from + (to - from) / 2
    val (tL, tR) = parallel(upsweep(inp, from, mid, f),
                            upsweep(inp, mid, to, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
  }
}

// sequential algorithm for small segments
def reduceSeg1[A](inp: Array[A], from: Int, to: Int, a0: A, f: (A, A) => A): A = {
  var a = a0
  var i = from
  while (i < to) {
    a = f(a, inp(i))
    i = i + 1
  }
  a
} 

The downsweep function becomes:

def downsweep[A](inp: Array[A], a0: A, f: (A, A) => A, 
                 t: TreeResA[A], out: Array[A]): Unit = t match {
  // scanLeftSeg (not shown) is assumed to scan inp[from, to) sequentially,
  // starting from a0 and writing the results to out(from + 1) .. out(to)
  case LeafRes(from, to, res) => scanLeftSeg(inp, from, to, a0, f, out)
  case NodeRes(l, _, r) => {
    val (_, _) = parallel(
      downsweep(inp, a0, f, l, out),
      downsweep(inp, f(a0, l.res), f, r, out)
    )
  } 
}

The entire scanLeft on arrays will be:

def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]) = {
  val t = upsweep(inp, 0, inp.length, f)
  downsweep(inp, a0, f, t, out)
  out(0) = a0
}
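
The array version can be assembled into a runnable sketch as well. The notes do not define scanLeftSeg, parallel or threshold, so the definitions below are assumptions: scanLeftSeg is taken to be a sequential scan of one segment, parallel is built on scala.concurrent.Future, and threshold is set very low to force splits on a tiny input. The input array must be non-empty.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Assumed helper: runs `a` on the global thread pool while `b` runs on the calling thread.
def parallel[A, B](a: => A, b: => B): (A, B) = {
  val left = Future(a)
  val right = b
  (Await.result(left, Duration.Inf), right)
}

val threshold = 3 // hypothetical cutoff; must be at least 2

sealed abstract class TreeResA[A] { val res: A }
case class LeafRes[A](from: Int, to: Int, override val res: A) extends TreeResA[A]
case class NodeRes[A](l: TreeResA[A], override val res: A, r: TreeResA[A]) extends TreeResA[A]

def reduceSeg1[A](inp: Array[A], from: Int, to: Int, a0: A, f: (A, A) => A): A = {
  var a = a0
  var i = from
  while (i < to) { a = f(a, inp(i)); i += 1 }
  a
}

// Assumed shape of scanLeftSeg: scans inp[from, to) from a0, writing out(from + 1) .. out(to).
def scanLeftSeg[A](inp: Array[A], from: Int, to: Int, a0: A, f: (A, A) => A, out: Array[A]): Unit = {
  var a = a0
  var i = from
  while (i < to) {
    a = f(a, inp(i))
    i += 1
    out(i) = a
  }
}

def upsweep[A](inp: Array[A], from: Int, to: Int, f: (A, A) => A): TreeResA[A] = {
  if (to - from < threshold) {
    LeafRes(from, to, reduceSeg1(inp, from + 1, to, inp(from), f))
  } else {
    val mid = from + (to - from) / 2
    val (tL, tR) = parallel(upsweep(inp, from, mid, f), upsweep(inp, mid, to, f))
    NodeRes(tL, f(tL.res, tR.res), tR)
  }
}

def downsweep[A](inp: Array[A], a0: A, f: (A, A) => A, t: TreeResA[A], out: Array[A]): Unit = t match {
  case LeafRes(from, to, _) => scanLeftSeg(inp, from, to, a0, f, out)
  case NodeRes(l, _, r) =>
    // The two subtrees cover disjoint index ranges of out.
    parallel(downsweep(inp, a0, f, l, out), downsweep(inp, f(a0, l.res), f, r, out))
    ()
}

def scanLeft[A](inp: Array[A], a0: A, f: (A, A) => A, out: Array[A]): Unit = {
  val t = upsweep(inp, 0, inp.length, f)
  downsweep(inp, a0, f, t, out)
  out(0) = a0
}

val scanned: List[Int] = {
  val inp = Array(1, 3, 8, 50, 2, 4)
  val out = new Array[Int](inp.length + 1)
  scanLeft[Int](inp, 100, _ + _, out)
  out.toList // List(100, 101, 104, 112, 162, 164, 168)
}
```

Note that out needs one more slot than inp, because the result includes the initial value a0.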