7.1 Tasks - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.1.0
7.1 Tasks
The Dataflow tasks give you an easy-to-grasp abstraction of mutually-independent logical tasks or threads, which can run concurrently and exchange data solely through Dataflow Variables, Queues, Broadcasts and Streams. Dataflow tasks, with their easy-to-express mutual dependencies and inherently sequential bodies, can also serve as a practical implementation of UML Activity Diagrams. Check out the examples.
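Before diving into the examples, here's a minimal sketch of the core idea, two tasks exchanging a single value through a DataflowVariable (the variable name is illustrative):

import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

final def greeting = new DataflowVariable()

task {
    greeting << 'Hello from the producer task'  //bind the variable exactly once
}

task {
    println "Consumer received: ${greeting.val}"  //val blocks until the producer binds the variable
}.join()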
A simple mashup example
In the example we're downloading the front pages of three popular web sites, each in its own task, while in a separate task we're filtering out sites talking about Groovy today and forming the output. The output task synchronizes automatically with the three download tasks on the three Dataflow variables, through which the content of each website is passed to the output task.

import static groovyx.gpars.GParsPool.withPool
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * A simple mashup sample, downloads content of three websites
 * and checks how many of them refer to Groovy.
 */

def dzone = new DataflowVariable()
def jroller = new DataflowVariable()
def theserverside = new DataflowVariable()

task {
    println 'Started downloading from DZone'
    dzone << 'http://www.dzone.com'.toURL().text
    println 'Done downloading from DZone'
}

task {
    println 'Started downloading from JRoller'
    jroller << 'http://www.jroller.com'.toURL().text
    println 'Done downloading from JRoller'
}

task {
    println 'Started downloading from TheServerSide'
    theserverside << 'http://www.theserverside.com'.toURL().text
    println 'Done downloading from TheServerSide'
}

task {
    withPool {
        println "Number of Groovy sites today: " +
                ([dzone, jroller, theserverside].findAllParallel {
                    it.val.toUpperCase().contains 'GROOVY'
                }).size()
    }
}.join()
Grouping tasks
Dataflow tasks can be organized into groups to allow for performance fine-tuning. Groups provide a handy task() factory method to create tasks attached to the group. Using groups allows you to organize tasks or operators around different thread pools (wrapped inside the group). While the Dataflow.task() command schedules the task on a default thread pool (java.util.concurrent.Executor, fixed size = #cpu + 1, daemon threads), you may prefer being able to define your own thread pool(s) to run your tasks.

import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup()

group.with {
    task {
        …
    }

    task {
        …
    }
}
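For completeness, a runnable sketch of the same pattern, assuming a custom pool of four threads (the size is arbitrary):

import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup(4)  //a group wrapping a daemon thread pool of size 4

def sum = group.task {
    (1..100).sum()  //any work scheduled on the group's pool
}

println "Sum: ${sum.val}"  //blocks until the task completes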
The default thread pool for dataflow tasks contains daemon threads, which means your application will exit as soon as the main thread finishes and won't wait for all tasks to complete. When grouping tasks, make sure that your custom thread pools either use daemon threads too (achieved either by using DefaultPGroup or by providing your own thread factory to the thread pool constructor), or, if your thread pools use non-daemon threads (such as with the NonDaemonPGroup group class), that you shut the group or the thread pool down explicitly by calling its shutdown() method. Otherwise your application will not exit.
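As an illustration, a sketch with a non-daemon group that must be shut down explicitly (the pool size of four is arbitrary):

import groovyx.gpars.group.NonDaemonPGroup

def group = new NonDaemonPGroup(4)  //non-daemon threads keep the JVM alive

def result = group.task {
    2 + 2
}

println result.val

group.shutdown()  //without this call the application would never exit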
You may selectively override the default group used for tasks, operators, callbacks and other dataflow elements inside a code block using the Dataflow.usingGroup() method:

Dataflow.usingGroup(group) {
    task {
        'http://gpars.codehaus.org'.toURL().text  //should throw MalformedURLException
    }
    .then {page -> page.toUpperCase()}
    .then {page -> page.contains('GROOVY')}
    .then({mentionsGroovy -> println "Groovy found: $mentionsGroovy"},
          {error -> println "Error: $error"}).join()
}
You can always override the group set by Dataflow.usingGroup() by specifying a group explicitly:

Dataflow.usingGroup(group) {
    anotherGroup.task {
        'http://gpars.codehaus.org'.toURL().text  //should throw MalformedURLException
    }
    .then(anotherGroup) {page -> page.toUpperCase()}
    .then(anotherGroup) {page -> page.contains('GROOVY')}
    .then(anotherGroup) {println Dataflow.retrieveCurrentDFPGroup(); it}
    .then(anotherGroup, {mentionsGroovy -> println "Groovy found: $mentionsGroovy"},
          {error -> println "Error: $error"}).join()
}
A mashup variant with methods
To avoid giving you a wrong impression about structuring Dataflow code, here's a rewrite of the mashup example with a downloadPage() method that performs the actual download in a separate task and returns a DataflowVariable instance, so that the main application thread can eventually get hold of the downloaded content. Dataflow variables can obviously be passed around as parameters or return values.

package groovyx.gpars.samples.dataflow

import static groovyx.gpars.GParsExecutorsPool.withPool
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * A simple mashup sample, downloads content of three websites and checks how many of them refer to Groovy.
 */

final List urls = ['http://www.dzone.com', 'http://www.jroller.com', 'http://www.theserverside.com']

task {
    def pages = urls.collect { downloadPage(it) }
    withPool {
        println "Number of Groovy sites today: " +
                (pages.findAllParallel {
                    it.val.toUpperCase().contains 'GROOVY'
                }).size()
    }
}.join()

def downloadPage(def url) {
    def page = new DataflowVariable()
    task {
        println "Started downloading from $url"
        page << url.toURL().text
        println "Done downloading from $url"
    }
    return page
}
A physical calculation example
Dataflow programs naturally scale with the number of processors. Up to a certain level, the more processors you have, the faster the program runs. Check out, for example, the following script, which calculates parameters of a simple physical experiment and prints out the results. Each task performs its part of the calculation; it may depend on values calculated by other tasks, and its result might be needed by some of the other tasks. With Dataflow Concurrency you can split the work between tasks or reorder the tasks themselves as you like, and the dataflow mechanics will ensure the calculation is carried out correctly.

import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

final def mass = new DataflowVariable()
final def radius = new DataflowVariable()
final def volume = new DataflowVariable()
final def density = new DataflowVariable()
final def acceleration = new DataflowVariable()
final def time = new DataflowVariable()
final def velocity = new DataflowVariable()
final def decelerationForce = new DataflowVariable()
final def deceleration = new DataflowVariable()
final def distance = new DataflowVariable()
final def author = new DataflowVariable()

def t = task {
    println """
Calculating distance required to stop a moving ball.
====================================================
The ball has a radius of ${radius.val} meters and is made of a material with ${density.val} kg/m3 density,
which means that the ball has a volume of ${volume.val} m3 and a mass of ${mass.val} kg.
The ball has been accelerating with ${acceleration.val} m/s2 from 0 for ${time.val} seconds and so reached
a velocity of ${velocity.val} m/s.

Given our ability to push the ball backwards with a force of ${decelerationForce.val} N (Newton),
we can cause a deceleration of ${deceleration.val} m/s2 and so stop the ball at a distance of ${distance.val} m.

=======================================================================================================================
This example has been calculated asynchronously in multiple tasks using GPars Dataflow concurrency in Groovy.
Author: ${author.val}
"""
    System.exit 0
}

task {
    mass << volume.val * density.val
}

task {
    volume << (4 / 3) * Math.PI * (radius.val ** 3)  //volume of a sphere
}

task {
    radius << 2.5
    density << 998.2071  //water
    acceleration << 9.80665  //free fall
    decelerationForce << 900
}

task {
    println 'Enter your name:'
    def name = new InputStreamReader(System.in).readLine()
    author << (name?.trim()?.size() > 0 ? name : 'anonymous')
}

task {
    time << 10
    velocity << acceleration.val * time.val
}

task {
    deceleration << decelerationForce.val / mass.val
}

task {
    distance << deceleration.val * ((velocity.val / deceleration.val) ** 2) * 0.5
}

t.join()
Deterministic deadlocks
If you happen to introduce a deadlock in your dependencies, the deadlock will occur each time you run the code. No randomness allowed. That's one of the benefits of Dataflow concurrency: irrespective of the actual thread scheduling scheme, if you don't get a deadlock in tests, you won't get one in production.

import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

final def a = new DataflowVariable()
final def b = new DataflowVariable()

task {
    println a.val
    b << 'Hi there'
}

task {
    println b.val
    a << 'Hello man'
}
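For contrast, here's a hedged sketch of the same two tasks with the dependency cycle broken (one task binds its variable before reading the other's), which runs to completion every time:

import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.task

final def a = new DataflowVariable()
final def b = new DataflowVariable()

def t1 = task {
    b << 'Hi there'  //bind b first, then wait for a
    println a.val
}

def t2 = task {
    println b.val    //b is guaranteed to arrive
    a << 'Hello man'
}

[t1, t2]*.join()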
Dataflows map
As a handy shortcut, the Dataflows class can help you reduce the amount of code you have to write to leverage Dataflow variables.

import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.Dataflows

def df = new Dataflows()
df.x = 'value1'
assert df.x == 'value1'

Dataflow.task { df.y = 'value2' }
assert df.y == 'value2'
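Reading an as-yet-unbound property of a Dataflows instance blocks until some other task binds it, so the map doubles as a synchronization point. A minimal sketch (the property names are arbitrary):

import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.Dataflows

final df = new Dataflows()

Dataflow.task { df.result = df.a + df.b }  //blocks until both a and b are bound
Dataflow.task { df.a = 10 }
Dataflow.task { df.b = 20 }

assert df.result == 30  //the main thread blocks here until result is bound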
Mixing Dataflows and Groovy with blocks
When inside a with block of a Dataflows instance, the dataflow variables stored inside the Dataflows instance can be accessed directly, without the need to prefix them with the Dataflows instance identifier.

new Dataflows().with {
    x = 'value1'
    assert x == 'value1'

    Dataflow.task { y = 'value2' }
    assert y == 'value2'
}
Returning a value from a task
Typically dataflow tasks communicate through dataflow variables. On top of that, tasks can also return values, again through a dataflow variable. When you invoke the task() factory method, you get back an instance of Promise (implemented as DataflowVariable), through which you can listen for the task's return value, just like when using any other Promise or DataflowVariable.

final Promise t1 = task {
    return 10
}
final Promise t2 = task {
    return 20
}

def results = [t1, t2]*.val
println 'Both sub-tasks finished and returned values: ' + results
Instead of blocking on val, you can register a callback for the return value using the >> operator:

def task = task {
    println 'The task is running and calculating the return value'
    30
}

task >> {value -> println "The task finished and returned $value"}
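The >> operator is shorthand for registering a whenBound() listener, so the same callback can be attached explicitly. A sketch (the return value is arbitrary):

import static groovyx.gpars.dataflow.Dataflow.task

def promise = task {
    40
}
promise.whenBound { value -> println "whenBound callback got $value" }
promise.join()  //keep the script alive until the value is bound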
To block until a task finishes, call join() on the Promise returned from the task() factory method:

task {
    final Promise t1 = task {
        println 'First sub-task running.'
    }
    final Promise t2 = task {
        println 'Second sub-task running'
    }
    [t1, t2]*.join()
    println 'Both sub-tasks finished'
}.join()