(Quick Reference)

3.1.1 GParsPool - Reference Documentation

Authors: The Whole GPars Gang

Version: 1.0-SNAPSHOT

3.1.1 GParsPool

Use of GParsPool - the JSR-166y based concurrent collection processor

Usage of GParsPool

The GParsPool class enables a ParallelArray-based (from JSR-166y) concurrency DSL for collections and objects.

Examples of use:

//summarize numbers concurrently
 GParsPool.withPool {
     final AtomicInteger result = new AtomicInteger(0)
     [1, 2, 3, 4, 5].eachParallel {result.addAndGet(it)}
     assert 15 == result
 }

//multiply numbers asynchronously GParsPool.withPool { final List result = [1, 2, 3, 4, 5].collectParallel {it * 2} assert ([2, 4, 6, 8, 10].equals(result)) }

The passed-in closure takes an instance of a ForkJoinPool as a parameter, which can be then used freely inside the closure.
//check whether all elements within a collection meet certain criteria
 GParsPool.withPool(5) {ForkJoinPool pool ->
     assert [1, 2, 3, 4, 5].everyParallel {it > 0}
     assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
 }
The GParsPool.withPool() method takes optional parameters for number of threads in the created pool and an unhandled exception handler.
withPool(10) {...}
withPool(20, exceptionHandler) {...}

The GParsPool.withExistingPool() takes an already existing ForkJoinPool instance to reuse. The DSL is valid only within the associated block of code and only for the thread that has called the withPool() or withExistingPool() methods. The withPool() method returns only after all the worker threads have finished their tasks and the pool has been destroyed, returning back the return value of the associated block of code. The withExistingPool() method doesn't wait for the pool threads to finish.

Alternatively, the GParsPool class can be statically imported import static groovyx.gpars.GParsPool.`*` , which will allow omitting the GParsPool class name.

withPool {
     assert [1, 2, 3, 4, 5].everyParallel {it > 0}
     assert ![1, 2, 3, 4, 5].everyParallel {it > 1}
 }

The following methods are currently supported on all objects in Groovy:

  • eachParallel()
  • eachWithIndexParallel()
  • collectParallel()
  • collectManyParallel()
  • findAllParallel()
  • findAnyParallel
  • findParallel()
  • everyParallel()
  • anyParallel()
  • grepParallel()
  • groupByParallel()
  • foldParallel()
  • minParallel()
  • maxParallel()
  • sumParallel()
  • splitParallel()
  • countParallel()
  • foldParallel()

Meta-class enhancer

As an alternative you can use the ParallelEnhancer class to enhance meta-classes of any classes or individual instances with the parallel methods.

import groovyx.gpars.ParallelEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9] ParallelEnhancer.enhanceInstance(list) println list.collectParallel {it * 2 }

def animals = ['dog', 'ant', 'cat', 'whale'] ParallelEnhancer.enhanceInstance animals println (animals.anyParallel {it ==~ /ant/} ? 'Found an ant' : 'No ants found') println (animals.everyParallel {it.contains('a')} ? 'All animals contain a' : 'Some animals can live without an a')

When using the ParallelEnhancer class, you're not restricted to a withPool() block with the use of the GParsPool DSLs. The enhanced classed or instances remain enhanced till they get garbage collected.

Exception handling

If an exception is thrown while processing any of the passed-in closures, the first exception gets re-thrown from the xxxParallel methods and the algorithm stops as soon as possible.

The exception handling mechanism of GParsPool builds on the one built into the Fork/Join framework. Since Fork/Join algorithms are by nature hierarchical, once any part of the algorithm fails, there's usually little benefit from continuing the computation, since some branches of the algorithm will never return a result.

Bear in mind that the GParsPool implementation doesn't give any guarantees about its behavior after a first unhandled exception occurs, beyond stopping the algorithm and re-throwing the first detected exception to the caller. This behavior, after all, is consistent with what the traditional sequential iteration methods do.

Transparently parallel collections

On top of adding new xxxParallel() methods, GPars can also let you change the semantics of the original iteration methods. For example, you may be passing a collection into a library method, which will process your collection in a sequential way, let say using the collect() method. By changing the semantics of the collect() method on your collection you can effectively parallelize the library sequential code.

GParsPool.withPool {

//The selectImportantNames() will process the name collections concurrently assert ['ALICE', 'JASON'] == selectImportantNames(['Joe', 'Alice', 'Dave', 'Jason'].makeConcurrent()) }

/** * A function implemented using standard sequential collect() and findAll() methods. */ def selectImportantNames(names) { names.collect {it.toUpperCase()}.findAll{it.size() > 4} }

The makeSequential() method will reset the collection back to the original sequential semantics.

import static groovyx.gpars.GParsPool.withPool

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

println 'Sequential: ' list.each { print it + ',' } println()

withPool {

println 'Sequential: ' list.each { print it + ',' } println()

list.makeConcurrent()

println 'Concurrent: ' list.each { print it + ',' } println()

list.makeSequential()

println 'Sequential: ' list.each { print it + ',' } println() }

println 'Sequential: ' list.each { print it + ',' } println()

The asConcurrent() convenience method will allow you to specify code blocks, in which the collection maintains concurrent semantics.

import static groovyx.gpars.GParsPool.withPool

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

println 'Sequential: ' list.each { print it + ',' } println()

withPool {

println 'Sequential: ' list.each { print it + ',' } println()

list.asConcurrent { println 'Concurrent: ' list.each { print it + ',' } println() }

println 'Sequential: ' list.each { print it + ',' } println() }

println 'Sequential: ' list.each { print it + ',' } println()

Transparent parallelizm, including the makeConcurrent() , makeSequential() and asConcurrent() methods, is also available in combination with ParallelEnhancer .

/**
 * A function implemented using standard sequential collect() and findAll() methods.
 */
def selectImportantNames(names) {
    names.collect {it.toUpperCase()}.findAll{it.size() > 4}
}

def names = ['Joe', 'Alice', 'Dave', 'Jason'] ParallelEnhancer.enhanceInstance(names) //The selectImportantNames() will process the name collections concurrently assert ['ALICE', 'JASON'] == selectImportantNames(names.makeConcurrent())

import groovyx.gpars.ParallelEnhancer

def list = [1, 2, 3, 4, 5, 6, 7, 8, 9]

println 'Sequential: ' list.each { print it + ',' } println()

ParallelEnhancer.enhanceInstance(list)

println 'Sequential: ' list.each { print it + ',' } println()

list.asConcurrent { println 'Concurrent: ' list.each { print it + ',' } println()

} list.makeSequential()

println 'Sequential: ' list.each { print it + ',' } println()

Avoid side-effects in functions

We have to warn you. Since the closures that are provided to the parallel methods like eachParallel() or collectParallel() may be run in parallel, you have to make sure that each of the closures is written in a thread-safe manner. The closures must hold no internal state, share data nor have side-effects beyond the boundaries the single element that they've been invoked on. Violations of these rules will open the door for race conditions and deadlocks, the most severe enemies of a modern multi-core programmer.

Don't do this:

def thumbnails = []
images.eachParallel {thumbnails << it.thumbnail}  //Concurrently accessing a not-thread-safe collection of thumbnails, don't do this!
At least, you've been warned.