3.5 Composable Asynchronous Functions - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.0-SNAPSHOT
3.5 Composable Asynchronous Functions
Functions are to be composed. In fact, composing side-effect-free functions is very easy. Much easier and reliable than composing objects, for example. Given the same input, functions always return the same result, they never change their behavior unexpectedly nor they break when multiple threads call them at the same time.Functions in Groovy
We can treat Groovy closures as functions. They take arguments, do their calculation and return a value. Provided you don't let your closures touch anything outside their scope, your closures are well-behaved pure functions. Functions that you can combine for a better good.def sum = (0..100000).inject(0, {a, b -> a + b})
def max = myNumbers.inject(0, {a, b -> a>b?a:b})
Are we concurrent yet?
This all works just fine until you realize you're not utilizing the full power of your expensive hardware. The functions are plain sequential. No parallelism in here. All but one processor core do nothing, they're idle, totally wasted.Those paying attention would suggest to use the Parallel Collection techniques described earlier and they would certainly be correct. For our scenario described here, where we process a collection, using those parallel methods would be the best choice. However, we're now looking for a generic way to create and combine asynchronous functions , which would help us not only for collection processing but mostly in other more generic cases, like the one right below.To make things more obvious, here's an example of combining four functions, which are supposed to check whether a particular web page matches the contents of a local file. We need to download the page, load the file, calculate hashes of both and finally compare the resulting numbers.
Closure download = {String url -> url.toURL().text }Closure loadFile = {String fileName -> … //load the file here }Closure hash = {s -> s.hashCode()}.asyncFun()Closure compare = {int first, int second -> first == second }def result = compare(hash(download('http://www.gpars.org')), hash(loadFile('/coolStuff/gpars/website/index.html'))) println "The result of comparison: " + result
Making it all asynchronous
The downside of our code is that we don't leverage the independence of the download() and the loadFile() functions. Neither we allow the two hashes to be run concurrently. They could well run in parallel, but our way to combine functions restricts any parallelism.Obviously not all of the functions can run concurrently. Some functions depend on results of others. They cannot start before the other function finishes. We need to block them till their parameters are available. The hash() functions needs a string to work on. The compare() function needs two numbers to compare.So we can only parallelize some functions, while blocking parallelism of others. Seems like a challenging task.Things are bright in the functional world
Luckily, the dependencies between functions are already expressed implicitly in the code. There's no need for us to duplicate the dependency information. If one functions takes parameters and the parameters need first to be calculated by another function, we implicitly have a dependency here. The hash() function depends on the loadFile() as well as on the download() functions in our example. The inject function in our earlier example depends on the results of the addition functions invoked gradually on all the elements of the collection.However difficult it may seem at first, our task is in fact very simple. We only need to teach our functions to return promises of their future results. And we need to teach the other functions to accept those promises as parameters so that they wait for the real values before they start their work. And if we convince the functions to release the threads they hold while waiting for the values, we get directly to where the magic can happen.In the good tradition of GPars we've made it very straightforward for you to convince any function to believe in other functions' promises. Call the asyncFun() function on a closure and you're asynchronous.
withPool {
def maxPromise = numbers.inject(0, {a, b -> a>b?a:b}.asyncFun())
println "Look Ma, I can talk to the user while the math is being done for me!"
println maxPromise.get()
}
The promise is a good old DataflowVariable , so you may query its status, register notification hooks or make it an input to a Dataflow algorithm.
withPool {
def sumPromise = (0..100000).inject(0, {a, b -> a + b}.asyncFun())
println "Are we done yet? " + sumPromise.bound
sumPromise.whenBound {sum -> println sum}
}
The get() method has also a variant with a timeout parameter, if you want to avoid the risk of waiting indefinitely.
Can things go wrong?
Sure. But you'll get an exception thrown from the result promise get() method.try { sumPromise.get() } catch (MyCalculationException e) { println "Guess, things are not ideal today." }
This is all fine, but what functions can be really combined?
There are no limits. Take any sequential functions you need to combine and you should be able to combine their asynchronous variants as well.Back to our initial example comparing content of a file with a web page, we simply make all the functions asynchronous by calling the asyncFun() method on them and we are ready to set off.Closure download = {String url -> url.toURL().text }.asyncFun() Closure loadFile = {String fileName -> … //load the file here }.asyncFun() Closure hash = {s -> s.hashCode()}.asyncFun() Closure compare = {int first, int second -> first == second }.asyncFun() def result = compare(hash(download('http://www.gpars.org')), hash(loadFile('/coolStuff/gpars/website/index.html'))) println 'Allowed to do something else now' println "The result of comparison: " + result.get()
Calling asynchronous functions from within asynchronous functions
Another very valuable characteristics of asynchronous functions is that their result promises can also be composed.import static groovyx.gpars.GParsPool.withPool withPool { Closure plus = {Integer a, Integer b -> sleep 3000 println 'Adding numbers' a + b }.asyncFun() Closure multiply = {Integer a, Integer b -> sleep 2000 a * b }.asyncFun() Closure measureTime = {-> sleep 3000 4 }.asyncFun() Closure distance = {Integer initialDistance, Integer velocity, Integer time -> plus(initialDistance, multiply(velocity, time)) }.asyncFun() Closure chattyDistance = {Integer initialDistance, Integer velocity, Integer time -> println 'All parameters are now ready - starting' println 'About to call another asynchronous function' def innerResultPromise = plus(initialDistance, multiply(velocity, time)) println 'Returning the promise for the inner calculation as my own result' return innerResultPromise }.asyncFun() println "Distance = " + distance(100, 20, measureTime()).get() + ' m' println "ChattyDistance = " + chattyDistance(100, 20, measureTime()).get() + ' m' }
Methods as asynchronous functions
Methods can be referred to as closures using the .& operator. These closures can then be transformed using asyncFun into composable asynchronous functions just like ordinary closures.class DownloadHelper { String download(String url) { url.toURL().text } int scanFor(String word, String text) { text.findAll(word).size() } String lower(s) { s.toLowerCase() } } //now we'll make the methods asynchronous withPool { final DownloadHelper d = new DownloadHelper() Closure download = d.&download.asyncFun() Closure scanFor = d.&scanFor.asyncFun() Closure lower = d.&lower.asyncFun() //asynchronous processing def result = scanFor('groovy', lower(download('http://www.infoq.com'))) println 'Allowed to do something else now' println result.get() }
Using annotation to create asynchronous functions
Instead of calling the asyncFun() function, the @AsyncFun annotation can be used to annotate Closure-typed fields. The fields have to be initialized in-place and the containing class needs to be instantiated withing a withPool block.import static groovyx.gpars.GParsPool.withPool import groovyx.gpars.AsyncFunclass DownloadingSearch { @AsyncFun Closure download = {String url -> url.toURL().text } @AsyncFun Closure scanFor = {String word, String text -> text.findAll(word).size() } @AsyncFun Closure lower = {s -> s.toLowerCase()} void scan() { def result = scanFor('groovy', lower(download('http://www.infoq.com'))) //synchronous processing println 'Allowed to do something else now' println result.get() } }withPool { new DownloadingSearch().scan() }
Alternative pools
The AsyncFun annotation by default uses an instance of GParsPool from the wrapping withPool block. You may, however, specify the type of pool explicitly:@AsyncFun(GParsExecutorsPoolUtil) def sum6 = {a, b -> a + b }
Blocking functions through annotations
The AsyncFun also allows the user to specify, whether the resulting function should have blocking (true) or non-blocking (false - default) semantics.@AsyncFun(blocking = true)
def sum = {a, b -> a + b }