Historically, parallel hardware dates back to the early days of mechanical computers (see Charles Babbage's Analytical Engine). Multi-core CPUs appeared in the early 21st century, after single-core designs hit the power wall, and this accelerated the adoption of parallel computing.
Speedup is the main reason for dealing with this extra complexity.
Parallelism and concurrency are related.
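Parallelism uses parallel hardware to execute code faster. Its main concerns are dividing a problem into subproblems and using the parallel hardware efficiently.
With concurrency, multiple executions may or may not run at the same time. Its main concerns are when an execution can start, how executions exchange information, and how access to shared resources is managed.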
Parallelism relies on an algorithmic approach to problems rather than code organization. Parallelism manifests on various granularity levels:
bit-level parallelism increased the CPU word size from 4 bits to 8, 16, 32, and more recently 64 bits;
instruction-level parallelism lets the CPU execute multiple independent instructions at the same time. A program such as:
val b = a1 + a2;
val c = a3 + a4;
val d = c + b;
can execute the first two instructions at the same time, because they are independent. However, the third instruction can only be executed after the first two instructions have completed.
task-level parallelism executes separate instruction streams at the same time, on the same or on different data. This is often achieved through software support and it is the main focus of this course.
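There are many classes of parallel hardware, including multi-core processors, symmetric multiprocessors, general-purpose GPUs, field-programmable gate arrays (FPGAs), and computer clusters.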
Assumptions:
An operating system is software that manages hardware and software resources and schedules program execution.
A process is an instance of a program executing inside an operating system. Processes are the most coarse-grained units of concurrency in a shared-memory system. The operating system multiplexes processes onto the CPU using time slices; this is called multitasking. Each process has its own isolated memory space.
Each process can contain multiple units of concurrency called threads. Threads within a process share its memory space, but each thread has its own program counter and program stack.
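Each JVM program starts with a main thread. To start a new thread on the JVM, we must define a subclass of Thread that overrides its run method, create an instance of that subclass, and call start on it. Calling join on the instance then blocks until the thread has finished: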
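class HelloThread extends Thread {
  // run() contains the code executed by the new thread
  override def run(): Unit = {
    println("Hello world!")
  }
}
val t = new HelloThread
t.start() // begins executing run() in a new thread
t.join()  // blocks until t has finished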
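The memory model is a set of rules that governs how threads interact when accessing shared memory. For instance, when thread T1 writes a value into shared memory, what value does thread T2 see, and when does it see it? Two core rules of the memory model are: two threads writing to separate locations in memory do not need synchronization; and a thread X that calls join on another thread Y is guaranteed to observe all the writes made by Y once join returns.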
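Both rules show up in a small sketch that uses plain threads. Each thread below writes only to its own slot of a shared array, so the writers need no synchronization. The main thread reads the slots only after join has returned. The names MemoryModelRules, RangeSumThread, and sumTo are hypothetical and exist only for illustration.

```scala
// Sketch of the two memory-model rules using plain JVM threads.
// MemoryModelRules, RangeSumThread and sumTo are hypothetical names.
object MemoryModelRules {
  // Sums from..to and writes the result only into out(slot).
  // Rule 1: threads writing to separate locations need no synchronization.
  class RangeSumThread(from: Int, to: Int, out: Array[Long], slot: Int) extends Thread {
    override def run(): Unit = {
      var s = 0L
      var i = from
      while (i <= to) { s += i; i += 1 }
      out(slot) = s
    }
  }

  // Sums 1..n by splitting the range between two threads.
  def sumTo(n: Int): Long = {
    val out = new Array[Long](2)
    val mid = n / 2
    val t1 = new RangeSumThread(1, mid, out, 0)
    val t2 = new RangeSumThread(mid + 1, n, out, 1)
    t1.start()
    t2.start()
    // Rule 2: once join returns, every write made by that thread is visible here.
    t1.join()
    t2.join()
    out(0) + out(1)
  }

  def main(args: Array[String]): Unit =
    println(sumTo(1000)) // 500500
}
```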
The simplest way to express parallelism is the following construct:
parallel(e1, e2)
It evaluates the two expressions e1 and e2 in parallel and returns the pair of their results.
Take, for instance, the computation of a p-norm. The 2-norm of a two-dimensional vector (a_1, a_2) is its length:
$$ (a_1^2 + a_2^2)^{1/2}$$
In an n-dimensional space, the p-norm of a vector
$$(a_1, a_2, \ldots, a_n)$$
is:
$$ \left(\sum_{i=1}^{n} |a_i|^p\right)^{1/p}$$
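As a quick check of the definition, take the two-dimensional vector (3, -4). For p = 2 and p = 1 the formula gives:

$$ \left(|3|^2 + |-4|^2\right)^{1/2} = (9 + 16)^{1/2} = 25^{1/2} = 5 $$
$$ \left(|3|^1 + |-4|^1\right)^{1/1} = 3 + 4 = 7 $$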
Subproblem: computing the sum of powers of an array segment. Given an integer array a, a positive Double p, and two valid indices s ≤ t into a, we must compute:
$$ \sum_{i=s}^{t-1} |a_i|^p $$
with each term
$$|a_i|^p$$
rounded down to an integer.
Writing this sequentially:
def sumSegment(a: Array[Int], p: Double, s: Int, t: Int): Int = {
  var i = s
  var sum: Int = 0
  while (i < t) {
    sum = sum + power(a(i), p) // Scala indexes arrays with a(i), not a[i]
    i = i + 1
  }
  sum
}
// |x|^p computed as e^(p * ln|x|), truncated to an Int
def power(x: Int, p: Double): Int = math.exp(p * math.log(math.abs(x))).toInt
math.exp(y) returns Euler's number e raised to the power y. The power function therefore computes e^(p·ln|x|) = |x|^p.
Once we have sumSegment we can calculate the p-norm as:
def pNorm(a: Array[Int], p: Double): Int =
  power(sumSegment(a, p, 0, a.length), 1/p)
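The following is a self-contained sketch of the sequential pipeline that you can experiment with. It makes one assumption that differs from the code above: power uses math.pow instead of exp and log. Both compute |x|^p. However, Java's Math.pow is specified to return the exact result when both arguments are integers and that result is representable as a double. This keeps the truncation to Int predictable for small inputs. The object name SequentialPNorm is hypothetical.

```scala
// Sketch of the sequential p-norm pipeline (sumSegment + pNorm).
// Assumption: power uses math.pow instead of exp/log (hypothetical variant).
object SequentialPNorm {
  // |x|^p truncated to an Int
  def power(x: Int, p: Double): Int = math.pow(math.abs(x).toDouble, p).toInt

  // Sum of |a(i)|^p, each term truncated to Int, for s <= i < t
  def sumSegment(a: Array[Int], p: Double, s: Int, t: Int): Int = {
    var i = s
    var sum = 0
    while (i < t) {
      sum += power(a(i), p)
      i += 1
    }
    sum
  }

  def pNorm(a: Array[Int], p: Double): Int =
    power(sumSegment(a, p, 0, a.length), 1 / p)

  def main(args: Array[String]): Unit = {
    val v = Array(3, -4, 1, 2)
    println(sumSegment(v, 2.0, 0, v.length)) // 9 + 16 + 1 + 4 = 30
    println(pNorm(v, 1.0))                   // 3 + 4 + 1 + 2 = 10
  }
}
```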
Notice that, for any split index m with 0 ≤ m ≤ n:
$$ \sum_{i=0}^{n-1} |a_i|^p = \sum_{i=0}^{m-1} |a_i|^p + \sum_{i=m}^{n-1} |a_i|^p $$
For efficiency, we choose m near the middle of the vector, so that both halves do roughly the same amount of work:
def pNormTwoPart(a: Array[Int], p: Double): Int = {
val m = a.length / 2
val (sum1, sum2) = parallel(sumSegment(a, p, 0, m),
sumSegment(a, p, m, a.length))
power(sum1+sum2, 1/p)
}
The parallel combinator runs the two sumSegment calls in parallel.
A generic recursive parallel algorithm:
def pNormRec(a: Array[Int], p: Double): Int =
  power(segmentRec(a, p, 0, a.length), 1/p)
// like sumSegment, but parallel
def segmentRec(a: Array[Int], p: Double, s: Int, t: Int): Int = {
if (t-s < threshold) {
sumSegment(a, p, s, t) // small segment, do it sequentially
}
else {
val m = s + (t-s)/2
val (sum1, sum2) = parallel(segmentRec(a, p, s, m),
segmentRec(a, p, m, t))
sum1 + sum2
}
}
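Here is a runnable sketch of the recursive version. It assumes a hypothetical threshold of 1000 elements. It also replaces the course's own parallel with a stand-in built on Scala's Future and Await. Read it as an illustration of the recursion, not of the course library.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Sketch of pNormRec/segmentRec with a stand-in `parallel`.
// Assumptions (hypothetical): threshold = 1000, and `parallel` uses Future/Await.
object RecursivePNorm {
  val threshold = 1000 // hypothetical cutoff below which we sum sequentially

  // Evaluates taskB asynchronously and taskA on the current thread.
  def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
    val fb = Future(taskB)
    val a = taskA
    (a, Await.result(fb, Duration.Inf))
  }

  // |x|^p truncated to Int (math.pow used instead of exp/log)
  def power(x: Int, p: Double): Int = math.pow(math.abs(x).toDouble, p).toInt

  def sumSegment(a: Array[Int], p: Double, s: Int, t: Int): Int = {
    var i = s
    var sum = 0
    while (i < t) {
      sum += power(a(i), p)
      i += 1
    }
    sum
  }

  // Halves the segment until it is shorter than threshold.
  def segmentRec(a: Array[Int], p: Double, s: Int, t: Int): Int =
    if (t - s < threshold) sumSegment(a, p, s, t)
    else {
      val m = s + (t - s) / 2
      val (sum1, sum2) = parallel(segmentRec(a, p, s, m), segmentRec(a, p, m, t))
      sum1 + sum2
    }

  def pNormRec(a: Array[Int], p: Double): Int =
    power(segmentRec(a, p, 0, a.length), 1 / p)

  def main(args: Array[String]): Unit = {
    val data = Array.tabulate(10000)(i => i % 10 - 5)
    // Integer addition is associative, so both sums agree: prints true
    println(segmentRec(data, 2.0, 0, data.length) == sumSegment(data, 2.0, 0, data.length))
  }
}
```

Await.result marks its wait as blocking. The global execution context can therefore typically add threads while nested calls wait, so this shallow recursion does not starve the pool.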
The signature of parallel is:
def parallel[A, B](taskA: => A, taskB: => B): (A, B) = { ..... }
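One way to implement this signature is with plain JVM threads, reusing the Thread and join mechanism introduced earlier. This is a minimal sketch, not the course library's implementation. It creates one thread per call, whereas real implementations typically reuse a pool. If taskB throws, the exception is not propagated, and result.get fails instead. The names ThreadParallel and Worker are hypothetical.

```scala
// Sketch: `parallel` on top of plain JVM threads (one new thread per call).
// ThreadParallel and Worker are hypothetical names.
object ThreadParallel {
  // Evaluates `body` in its own thread and stores the result.
  private class Worker[B](body: => B) extends Thread {
    var result: Option[B] = None
    override def run(): Unit = result = Some(body)
  }

  def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
    val worker = new Worker(taskB)
    worker.start() // taskB starts running in the new thread
    val a = taskA  // taskA runs in the calling thread meanwhile
    worker.join()  // after join, the worker's write to `result` is visible here
    (a, worker.result.get)
  }

  def main(args: Array[String]): Unit = {
    val (sum, product) = parallel((1 to 100).sum, (1 to 10).product)
    println(s"$sum $product") // 5050 3628800
  }
}
```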
The following Monte Carlo algorithm estimates the value of Pi. Take a circle of radius 1 and the square that encloses it. Equivalently, we can use just a quarter of the circle, inside the unit square.

The ratio of the area of the quarter circle to the area of the unit square is:
$$ \lambda = \frac{\pi \cdot 1^2 / 4}{1^2} = \frac{\pi}{4}$$
To estimate this ratio, we sample points uniformly at random within the square and count the fraction that falls inside the circle. Multiplying this estimated ratio by 4 gives an estimate of Pi.
import scala.util.Random
def mcCount(iter: Int): Int = {
val randomX = new Random
val randomY = new Random
var hits = 0
for (i <- 0 until iter) {
val x = randomX.nextDouble // in [0, 1)
val y = randomY.nextDouble // in [0, 1)
if ((x*x + y*y) < 1) {
hits = hits + 1
}
}
hits
}
def monteCarloPiSeq(iter: Int): Double = 4.0 * mcCount(iter) / iter
def monteCarloPiPar(iter: Int): Double = {
val ((count1, count2), (count3, count4)) = parallel(
parallel(mcCount(iter/4), mcCount(iter/4)),
parallel(mcCount(iter/4), mcCount(iter-3*(iter/4))))
4.0 * (count1 + count2 + count3 + count4) / iter
}
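The snippet above needs a working parallel before it can run. The sketch below bundles a hypothetical thread-based parallel with mcCount and monteCarloPiPar. It differs from the original in two ways. First, each call uses a single Random for both coordinates. Second, that Random is seeded (the seeds 1 to 4 are arbitrary), so repeated runs produce the same estimate.

```scala
import scala.util.Random

// Sketch of the four-way Monte Carlo estimate of Pi.
// Assumptions: `parallel` is a hypothetical thread-based stand-in,
// and each mcCount call gets its own seeded Random.
object MonteCarloPi {
  private class Worker[B](body: => B) extends Thread {
    var result: Option[B] = None
    override def run(): Unit = result = Some(body)
  }

  def parallel[A, B](taskA: => A, taskB: => B): (A, B) = {
    val w = new Worker(taskB)
    w.start()
    val a = taskA
    w.join()
    (a, w.result.get)
  }

  // Counts how many of `iter` random points in [0, 1) x [0, 1) land inside the quarter circle.
  def mcCount(iter: Int, seed: Long): Int = {
    val random = new Random(seed)
    var hits = 0
    for (_ <- 0 until iter) {
      val x = random.nextDouble()
      val y = random.nextDouble()
      if (x * x + y * y < 1) hits += 1
    }
    hits
  }

  def monteCarloPiPar(iter: Int): Double = {
    val q = iter / 4
    val ((c1, c2), (c3, c4)) = parallel(
      parallel(mcCount(q, 1L), mcCount(q, 2L)),
      parallel(mcCount(q, 3L), mcCount(iter - 3 * q, 4L)))
    4.0 * (c1 + c2 + c3 + c4) / iter
  }

  def main(args: Array[String]): Unit =
    println(monteCarloPiPar(1000000)) // close to math.Pi; exact digits depend on the seeds
}
```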
The task construct is a more flexible way to describe parallel computations. Instead of writing:
val (v1, v2) = parallel(e1, e2)
we can use the task construct as follows:
val t1 = task(e1)
val t2 = task(e2)
val v1 = t1.join
val v2 = t2.join
t = task(e) starts computing e in the background. t.join then blocks until the result is available and returns it.
A minimal interface for task looks like this:
def task[A](c: => A): Task[A]
trait Task[A] {
def join: A
}
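A minimal way to satisfy this interface is to start a thread for the computation and have join wait for that thread. The sketch below relies only on java.lang.Thread, and the object name SimpleTask is hypothetical. A production implementation would usually schedule tasks on a thread pool, such as a ForkJoinPool, rather than creating one thread per task.

```scala
// Sketch: a minimal `task` built on plain JVM threads (one thread per task).
// SimpleTask is a hypothetical name.
object SimpleTask {
  trait Task[A] {
    def join: A
  }

  def task[A](body: => A): Task[A] = {
    var result: Option[A] = None
    // Start evaluating `body` in the background right away.
    val thread = new Thread {
      override def run(): Unit = result = Some(body)
    }
    thread.start()
    new Task[A] {
      // Wait for the thread; join also makes the write to `result` visible here.
      def join: A = {
        thread.join()
        result.get
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val t1 = task { (1 to 100).sum }
    val t2 = task { (1 to 10).product }
    println(t1.join + t2.join) // 5050 + 3628800 = 3633850
  }
}
```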
task and join map between computations and tasks. In terms of the computed value, the following equation holds: task(e).join == e. We can omit writing .join if we also define an implicit conversion:
implicit def getJoin[T](x: Task[T]): T = x.join
We can now rewrite the four-way parallel p-norm:
val ((part1, part2), (part3, part4)) = parallel(
parallel(sumSegment(a, p, 0, mid1),
sumSegment(a, p, mid1, mid2)),
parallel(sumSegment(a, p, mid2, mid3),
sumSegment(a, p, mid3, a.length))
)
power (part1 + part2 + part3 + part4, 1/p)
as follows:
val t1 = task {sumSegment(a, p, 0, mid1)}
val t2 = task {sumSegment(a, p, mid1, mid2)}
val t3 = task {sumSegment(a, p, mid2, mid3)}
val t4 = task {sumSegment(a, p, mid3, a.length)}
power (t1 + t2 + t3 + t4, 1/p)
Because of the implicit conversion getJoin, using t1 where an Int is expected effectively invokes t1.join.
Assuming we have a definition of task, how can we implement the parallel construct using it?
def parallel[A, B](cA: => A, cB: => B): (A, B) = {
  val tb: Task[B] = task { cB } // cB starts executing in parallel
  val ta: A = cA                // cA is evaluated in the current ("main") thread
  (ta, tb.join)                 // tb.join blocks until cB's computation has finished
}
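The performance of parallel programs can be estimated using asymptotic analysis or empirical measurement.
Asymptotic analysis is important for understanding how algorithms scale as inputs get larger and as more parallel hardware becomes available.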
We examine worst-case (not average) bounds.
For instance, the time bound for sequential execution of sumSegment is O(t-s), where s and t are the segment bounds. The time bound for parallel execution of segmentRec is O(log(t-s)). The reason is that the segment is halved at each level of recursion, and both halves run in parallel.
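The conclusions above assume an unbounded number of parallel resources. Asymptotic analysis of parallel algorithms therefore uses two measures. Work W(e) is the number of steps e would take with no parallelism, that is, its sequential running time. Depth D(e) is the number of steps e would take with unbounded parallelism.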
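As a sketch of how work and depth apply to segmentRec, write W(n) for the work and D(n) for the depth on a segment of length n = t - s. Assume that threshold is a constant and that each call to parallel adds only a constant overhead c. Under the usual rules W(parallel(e1, e2)) = W(e1) + W(e2) + c and D(parallel(e1, e2)) = max(D(e1), D(e2)) + c, the recurrences are:

$$ W(n) = 2\,W(n/2) + c \quad\Rightarrow\quad W(n) = O(n) $$
$$ D(n) = D(n/2) + c \quad\Rightarrow\quad D(n) = O(\log n) $$

The work matches the sequential O(t-s) bound. The depth matches the O(log(t-s)) parallel bound, which is reachable only with enough parallel resources.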
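Measured performance usually varies from run to run. To analyse the results, we must repeat the measurements many times and compute statistics such as the mean and variance. We must also eliminate outliers and make sure the program has reached a steady state (warm-up).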
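These steps can be sketched by hand before turning to a dedicated tool. The helper below is hypothetical. It performs a fixed number of untimed warm-up runs, then times the remaining runs with System.nanoTime. Finally, it trims the most extreme samples and reports the mean and variance. The run counts are arbitrary, and unlike a real benchmarking tool, the helper does nothing about garbage-collection pauses.

```scala
// Sketch of a hand-rolled measurement loop: warm up, repeat, trim, summarize.
// MeasureStats and its methods are hypothetical; run counts are arbitrary.
object MeasureStats {
  // Population mean and variance of a non-empty sample.
  def meanAndVariance(xs: Seq[Double]): (Double, Double) = {
    val mean = xs.sum / xs.size
    val variance = xs.map(x => (x - mean) * (x - mean)).sum / xs.size
    (mean, variance)
  }

  // Drops the k smallest and k largest samples as a crude outlier filter.
  def trim(xs: Seq[Double], k: Int): Seq[Double] =
    xs.sorted.drop(k).dropRight(k)

  // Runs `body` `warmup` times untimed, then `runs` times timed; returns times in ms.
  def timings(warmup: Int, runs: Int)(body: => Unit): Seq[Double] = {
    for (_ <- 0 until warmup) body
    for (_ <- 0 until runs) yield {
      val start = System.nanoTime()
      body
      (System.nanoTime() - start) / 1e6
    }
  }

  def main(args: Array[String]): Unit = {
    val samples = timings(warmup = 20, runs = 50) { (0 until 1000000).toArray; () }
    val (mean, variance) = meanAndVariance(trim(samples, 5))
    println(f"mean = $mean%.3f ms, std dev = ${math.sqrt(variance)}%.3f ms")
  }
}
```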
ScalaMeter is a benchmarking and performance regression testing tool for the JVM. To use it, add its library to the sbt project:
libraryDependencies += "com.storm-enroute" %% "scalameter-core" % "0.6"
Then, in the testing code we would use the framework as follows:
import org.scalameter._
val time = measure {
(0 until 1000000).toArray
}
println(s"Array initialization time: $time ms")
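If we run this program multiple times, we see a wide range of outputs. The variation comes from JVM warm-up. The program is first interpreted. Frequently executed parts are then compiled to machine code, and the JVM may later apply further dynamic optimizations. Eventually, the program reaches a steady state.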
What we want to measure is steady-state performance. ScalaMeter supports this with a warmer:
import org.scalameter._
val time = withWarmer(new Warmer.Default) measure {
(0 until 1000000).toArray
}
println(s"Array initialization time: $time ms")
A configuration clause can be used to tweak the parameters of the Warmer:
val time = config(
Key.exec.minWarmupRuns -> 20,
Key.exec.maxWarmupRuns -> 60,
Key.verbose -> true
) withWarmer(new Warmer.Default) measure {
(0 until 1000000).toArray
}
ScalaMeter can measure more than just running time:
withMeasurer(new Measurer.MemoryFootprint) measure {
(0 until 1000000).toArray
}