3.6 Fork-Join - Reference Documentation

Authors: The Whole GPars Gang

Version: 1.1.0

3.6 Fork-Join

Fork/Join or Divide and Conquer is a very powerful abstraction to solve hierarchical problems.

The abstraction

When talking about hierarchical problems, think about quick sort, merge sort, file system or general tree navigation and such.

  • Fork / Join algorithms essentially split the problem at hand into several smaller sub-problems and recursively apply the same algorithm to each of the sub-problems.
  • Once the sub-problem is small enough, it is solved directly.
  • The solutions of all sub-problems are combined to solve their parent problem, which in turn helps solve its own parent problem.
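The three steps above can be sketched in plain Java. This is a minimal illustration, assuming the JDK's java.util.concurrent Fork/Join classes (the standardized form of jsr166y) rather than the GPars API; the RangeSum class and its THRESHOLD value are hypothetical names chosen for the example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical illustration: sums a slice of an array by divide and conquer.
class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 4; // "small enough" cut-off, chosen arbitrarily
    private final int[] data;
    private final int from; // inclusive
    private final int to;   // exclusive

    RangeSum(int[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        // Step 2: a small enough sub-problem is solved directly.
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        // Step 1: split the problem and apply the same algorithm to each half.
        int mid = (from + to) >>> 1;
        RangeSum left = new RangeSum(data, from, mid);
        RangeSum right = new RangeSum(data, mid, to);
        left.fork();
        right.fork();
        // Step 3: combine the children's solutions into the parent's solution.
        return right.join() + left.join();
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(data, 0, data.length));
    }
}
```

Calling RangeSum.sum(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) returns 55.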

Check out the fancy interactive Fork/Join visualization demo, which will show you how threads cooperate to solve a common divide-and-conquer algorithm.

The mighty JSR-166y library solves Fork / Join orchestration pretty nicely for us, but leaves a couple of rough edges, which can hurt you if you don't pay enough attention. You still have to deal with threads, pools and synchronization barriers.

The GPars abstraction convenience layer

GPars can hide the complexities of dealing with threads, pools and recursive tasks from you, yet let you leverage the powerful Fork/Join implementation in jsr166y.

import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool

withPool() {
    println """Number of files: ${
        runForkJoin(new File("./src")) {file ->
            long count = 0
            file.eachFile {
                if (it.isDirectory()) {
                    println "Forking a child task for $it"
                    forkOffChild(it)           //fork a child task
                } else {
                    count++
                }
            }
            //use results of children tasks to calculate and store own result
            return count + (childrenResults.sum(0))
        }
    }"""
}

The runForkJoin() factory method will use the supplied recursive code together with the provided values and build a hierarchical Fork/Join calculation. The number of values passed to the runForkJoin() method must match the number of expected parameters of the closure as well as the number of arguments passed into the forkOffChild() or runChildDirectly() methods.

def quicksort(numbers) {
    withPool {
        runForkJoin(0, numbers) {index, list ->
            def groups = list.groupBy {it <=> list[list.size().intdiv(2)]}
            if ((list.size() < 2) || (groups.size() == 1)) {
                return [index: index, list: list.clone()]
            }
            (-1..1).each {forkOffChild(it, groups[it] ?: [])}
            return [index: index, list: childrenResults.sort {it.index}.sum {it.list}]
        }.list
    }
}

The important piece of the puzzle that needs to be mentioned here is that forkOffChild() doesn't wait for the child to run. It merely schedules it for execution some time in the future. If a child fails by throwing an exception, you should not expect the exception to be thrown from the forkOffChild() method itself. The exception is likely to happen long after the parent has returned from the call to the forkOffChild() method.

It is the getChildrenResults() method that will re-throw exceptions that happened in the child sub-tasks back to the parent task.
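The same timing can be observed with the plain JDK Fork/Join classes. The sketch below is an analogy, not GPars code: it assumes that fork() and join() in java.util.concurrent play the roles of forkOffChild() and getChildrenResults(), and all class names are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo: reports where a child's exception reaches the parent.
class FailingChildDemo {
    // A child task that always fails.
    static class Failing extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("child failed");
        }
    }

    static class Parent extends RecursiveTask<String> {
        @Override
        protected String compute() {
            Failing child = new Failing();
            try {
                child.fork(); // only schedules the child, so nothing is thrown here
            } catch (RuntimeException e) {
                return "thrown at fork";
            }
            try {
                child.join(); // collecting the result re-throws the child's exception
                return "no exception";
            } catch (IllegalStateException e) {
                return "thrown at join";
            }
        }
    }

    static String run() {
        return ForkJoinPool.commonPool().invoke(new Parent());
    }
}
```

FailingChildDemo.run() returns "thrown at join", mirroring the behaviour described above: the failure becomes visible only when the parent asks for the child's result.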

Alternative approach

Alternatively, the underlying mechanism of nested Fork/Join worker tasks can be used directly. Custom-tailored workers can eliminate the performance overhead associated with parameter spreading imposed when using the generic workers. Also, custom workers can be implemented in Java and so further increase the performance of the algorithm.

public final class FileCounter extends AbstractForkJoinWorker<Long> {
    private final File file;

    def FileCounter(final File file) {
        this.file = file
    }

    @Override
    protected Long computeTask() {
        long count = 0;
        file.eachFile {
            if (it.isDirectory()) {
                println "Forking a thread for $it"
                forkOffChild(new FileCounter(it))           //fork a child task
            } else {
                count++
            }
        }
        //use results of children tasks to calculate and store own result
        return count + ((childrenResults)?.sum() ?: 0)
    }
}

withPool(1) {pool ->  //feel free to experiment with the number of fork/join threads in the pool
    println "Number of files: ${runForkJoin(new FileCounter(new File("..")))}"
}

The AbstractForkJoinWorker subclasses may be written in either Java or Groovy, giving you the option to easily optimize for execution speed if raw performance of the worker becomes a bottleneck.

Fork / Join saves your resources

Fork/Join operations can be safely run with a small number of threads thanks to internally using the TaskBarrier class to synchronize the threads. While a thread is blocked inside an algorithm waiting for its sub-problems to be calculated, the thread is silently returned to the pool to take on any of the available sub-problems from the task queue and process them. Although the file-counting algorithm creates as many tasks as there are sub-directories, and tasks wait for the sub-directory tasks to complete, as few as one thread is enough to keep the computation going and eventually calculate a valid result.
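To see that a single thread is enough, here is a minimal sketch using the plain JDK ForkJoinPool with a parallelism of 1. It is an assumption-laden stand-in, not GPars code: the Dir class is a hypothetical in-memory replacement for a directory tree, so the example does not touch the file system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class SingleThreadTreeCount {
    // Hypothetical stand-in for a directory: a file count plus sub-directories.
    static class Dir {
        final int files;
        final List<Dir> subDirs = new ArrayList<>();
        Dir(int files) { this.files = files; }
        Dir add(Dir child) { subDirs.add(child); return this; }
    }

    static class Counter extends RecursiveTask<Long> {
        private final Dir dir;
        Counter(Dir dir) { this.dir = dir; }

        @Override
        protected Long compute() {
            // Fork one child task per sub-directory.
            List<Counter> children = new ArrayList<>();
            for (Dir sub : dir.subDirs) {
                Counter child = new Counter(sub);
                child.fork();
                children.add(child);
            }
            long count = dir.files;
            // Join in reverse order of forking; while joining, the single
            // worker executes the pending child tasks itself rather than idling.
            for (int i = children.size() - 1; i >= 0; i--) {
                count += children.get(i).join();
            }
            return count;
        }
    }

    static long count(Dir root) {
        ForkJoinPool pool = new ForkJoinPool(1); // one worker thread only
        try {
            return pool.invoke(new Counter(root));
        } finally {
            pool.shutdown();
        }
    }
}
```

Even though every parent task waits for its children, the computation completes on the one-thread pool because waiting tasks do not hold the worker hostage.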

Mergesort example

import static groovyx.gpars.GParsPool.runForkJoin
import static groovyx.gpars.GParsPool.withPool

/**
 * Splits a list of numbers in half
 */
def split(List<Integer> list) {
    int listSize = list.size()
    int middleIndex = listSize / 2
    def list1 = list[0..<middleIndex]
    def list2 = list[middleIndex..listSize - 1]
    return [list1, list2]
}

/**
 * Merges two sorted lists into one
 */
List<Integer> merge(List<Integer> a, List<Integer> b) {
    int i = 0, j = 0
    final int newSize = a.size() + b.size()
    List<Integer> result = new ArrayList<Integer>(newSize)

    while ((i < a.size()) && (j < b.size())) {
        if (a[i] <= b[j]) result << a[i++]
        else result << b[j++]
    }

    if (i < a.size()) result.addAll(a[i..-1])
    else result.addAll(b[j..-1])
    return result
}

final def numbers = [1, 5, 2, 4, 3, 8, 6, 7, 3, 4, 5, 2, 2, 9, 8, 7, 6, 7, 8, 1, 4, 1, 7, 5, 8, 2, 3, 9, 5, 7, 4, 3]

withPool(3) {  //feel free to experiment with the number of fork/join threads in the pool
    println """Sorted numbers: ${
        runForkJoin(numbers) {nums ->
            println "Thread ${Thread.currentThread().name[-1]}: Sorting $nums"
            switch (nums.size()) {
                case 0..1:
                    return nums                                   //store own result
                case 2:
                    if (nums[0] <= nums[1]) return nums           //store own result
                    else return nums[-1..0]                       //store own result
                default:
                    def splitList = split(nums)
                    [splitList[0], splitList[1]].each {forkOffChild it}  //fork a child task
                    //use results of children tasks to calculate and store own result
                    return merge(*childrenResults)
            }
        }
    }"""
}

Mergesort example using a custom-tailored worker class

public final class SortWorker extends AbstractForkJoinWorker<List<Integer>> {
    private final List numbers

    def SortWorker(final List<Integer> numbers) {
        this.numbers = numbers.asImmutable()
    }

    /**
     * Splits a list of numbers in half
     */
    def split(List<Integer> list) {
        int listSize = list.size()
        int middleIndex = listSize / 2
        def list1 = list[0..<middleIndex]
        def list2 = list[middleIndex..listSize - 1]
        return [list1, list2]
    }

    /**
     * Merges two sorted lists into one
     */
    List<Integer> merge(List<Integer> a, List<Integer> b) {
        int i = 0, j = 0
        final int newSize = a.size() + b.size()
        List<Integer> result = new ArrayList<Integer>(newSize)

        while ((i < a.size()) && (j < b.size())) {
            if (a[i] <= b[j]) result << a[i++]
            else result << b[j++]
        }

        if (i < a.size()) result.addAll(a[i..-1])
        else result.addAll(b[j..-1])
        return result
    }

    /**
     * Sorts a small list or delegates to two children, if the list contains more than two elements.
     */
    @Override
    protected List<Integer> computeTask() {
        println "Thread ${Thread.currentThread().name[-1]}: Sorting $numbers"
        switch (numbers.size()) {
            case 0..1:
                return numbers                                   //store own result
            case 2:
                if (numbers[0] <= numbers[1]) return numbers     //store own result
                else return numbers[-1..0]                       //store own result
            default:
                def splitList = split(numbers)
                [new SortWorker(splitList[0]), new SortWorker(splitList[1])].each {forkOffChild it}  //fork a child task
                //use results of children tasks to calculate and store own result
                return merge(*childrenResults)
        }
    }
}

final def numbers = [1, 5, 2, 4, 3, 8, 6, 7, 3, 4, 5, 2, 2, 9, 8, 7, 6, 7, 8, 1, 4, 1, 7, 5, 8, 2, 3, 9, 5, 7, 4, 3]

withPool(1) {  //feel free to experiment with the number of fork/join threads in the pool
    println "Sorted numbers: ${runForkJoin(new SortWorker(numbers))}"
}

Running child tasks directly

The forkOffChild() method has a sibling, the runChildDirectly() method, which will run the child task directly and immediately within the current thread instead of scheduling the child task for asynchronous processing on the thread pool. Typically you'll call forkOffChild() on all sub-tasks but the last, which you invoke directly without the scheduling overhead.

Closure fib = {number ->
    if (number <= 2) {
        return 1
    }
    forkOffChild(number - 1)                            //  This task will run asynchronously, probably in a different thread
    final def result = runChildDirectly(number - 2)     //  This task is run directly within the current thread
    return (Integer) getChildrenResults().sum() + result
}

withPool {
    assert 55 == runForkJoin(10, fib)
}
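For comparison, the same "fork all but the last, run the last directly" pattern can be sketched with the plain JDK Fork/Join classes. This is an analogy under stated assumptions, not the GPars implementation: fork() stands in for forkOffChild(), a direct call to compute() stands in for runChildDirectly(), and the Fib class name is hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical illustration of forking one child and running the other directly.
class Fib extends RecursiveTask<Integer> {
    private final int n;

    Fib(int n) { this.n = n; }

    @Override
    protected Integer compute() {
        if (n <= 2) return 1;
        Fib forked = new Fib(n - 1);
        forked.fork();                          // scheduled asynchronously, possibly on another thread
        int direct = new Fib(n - 2).compute();  // runs immediately within the current thread
        return forked.join() + direct;          // combine both results
    }

    static int of(int n) {
        return ForkJoinPool.commonPool().invoke(new Fib(n));
    }
}
```

Fib.of(10) returns 55, matching the assertion in the Groovy example above. Running the last child directly saves one round trip through the task queue per level of recursion.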

Availability

This feature is only available when using the Fork/Join-based GParsPool, not GParsExecutorsPool.