7.2 Selects - Reference Documentation

Authors: The Whole GPars Gang

Version: 1.2.1

7.2 Selects

Frequently a value needs to be obtained from one of several dataflow channels (variables, queues, broadcasts or streams). The Select class is suitable for such scenarios. A Select scans multiple dataflow channels and picks one of the input channels that currently has a value available for reading. The value from that channel is read and returned to the caller together with the index of the originating channel. The channel is picked either randomly or by priority, in which case channels with a lower position index in the Select constructor have higher priority.

Selecting a value from multiple channels

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.select
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * Shows a basic use of Select, which monitors a set of input channels for values and makes these values
 * available on its output irrespective of their original input channel.
 * Note that dataflow variables and queues can be combined for Select.
 *
 * You might also consider checking out the prioritySelect method, which prioritizes values by the index of their input channel
 */
def a = new DataflowVariable()
def b = new DataflowVariable()
def c = new DataflowQueue()

task {
    sleep 3000
    a << 10
}

task {
    sleep 1000
    b << 20
}

task {
    sleep 5000
    c << 30
}

def select = select([a, b, c])
println "The fastest result is ${select().value}"

Note that the return type from select() is SelectResult, holding the value as well as the originating channel index.
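As a minimal sketch of reading both parts of a SelectResult, the snippet below writes to only one of two channels and then inspects the value and the index. The channel names first and second are illustrative, and GPars is assumed to be on the classpath.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.select

def first = new DataflowVariable()      // index 0, never bound in this sketch
def second = new DataflowQueue()        // index 1

second << 'from the queue'              // only the second channel has a value to read

def sel = select([first, second])
def result = sel.select()               // blocks until some channel can be read

// The index refers to the channel's position in the list passed to select()
println "value = ${result.value}, index = ${result.index}"
```

The index makes it possible to tell which channel produced the value when several channels carry values of the same type.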

There are multiple ways to read values from a Select:

def sel = select(a, b, c, d)
def result = sel.select()                                       //Random selection
def result = sel()                                              //Random selection (a short-hand variant)
def result = sel.select([true, true, false, true])              //Random selection with guards specified
def result = sel([true, true, false, true])                     //Random selection with guards specified (a short-hand variant)
def result = sel.prioritySelect()                               //Priority selection
def result = sel.prioritySelect([true, true, false, true])      //Priority selection with guards specified

By default the Select blocks the caller until a value to read is available. The alternative selectToPromise() and prioritySelectToPromise() methods give you a way to obtain a promise for the value that will be selected some time in the future. Through the returned Promise you may register a callback to get invoked asynchronously whenever the next value is selected.

def sel = select(a, b, c, d)
Promise result = sel.selectToPromise()                                       //Random selection
Promise result = sel.selectToPromise([true, true, false, true])              //Random selection with guards specified
Promise result = sel.prioritySelectToPromise()                               //Priority selection
Promise result = sel.prioritySelectToPromise([true, true, false, true])      //Priority selection with guards specified
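A small sketch of the asynchronous style follows, assuming GPars is on the classpath. It chains a handler onto the promise with then(), which is one way to register a callback on a Promise; the channel names fast and slow and the variable handled are illustrative only.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.Promise
import static groovyx.gpars.dataflow.Dataflow.select

def fast = new DataflowQueue()
def slow = new DataflowQueue()
def sel = select([fast, slow])

// Returns immediately; the promise is bound once a channel delivers a value
Promise next = sel.selectToPromise()

// The handler runs asynchronously and receives the SelectResult
Promise handled = next.then { "Selected ${it.value} from channel ${it.index}" }

fast << 'hello'

// Blocking here only so that the script does not exit before the handler has run
println handled.get()
```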

Alternatively, Select can send the value to a provided MessageStream (e.g. an actor) without blocking the caller.

def handler = actor {...}
def sel = select(a, b, c, d)

sel.select(handler)                                         //Random selection
sel(handler)                                                //Random selection (a short-hand variant)
sel.select(handler, [true, true, false, true])              //Random selection with guards specified
sel(handler, [true, true, false, true])                     //Random selection with guards specified (a short-hand variant)
sel.prioritySelect(handler)                                 //Priority selection
sel.prioritySelect(handler, [true, true, false, true])      //Priority selection with guards specified

Guards

Guards allow the caller to omit some input channels from the selection. Guards are specified as a List of boolean flags passed to the select() or prioritySelect() methods.

def sel = select(leaders, seniors, experts, juniors)
def teamLead = sel([true, true, false, false]).value        //Only 'leaders' and 'seniors' qualify for becoming a teamLead here

A typical use for guards is to let Selects adapt to changes in the user state.

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.select
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * Demonstrates the ability to enable/disable channels during a value selection on a select by providing boolean guards.
 */
final DataflowQueue operations = new DataflowQueue()
final DataflowQueue numbers = new DataflowQueue()

def t = task {
    final def select = select(operations, numbers)
    3.times {
        def instruction = select([true, false]).value
        def num1 = select([false, true]).value
        def num2 = select([false, true]).value
        final def formula = "$num1 $instruction $num2"
        println "$formula = ${new GroovyShell().evaluate(formula)}"
    }
}

task {
    operations << '+'
    operations << '+'
    operations << '*'
}

task {
    numbers << 10
    numbers << 20
    numbers << 30
    numbers << 40
    numbers << 50
    numbers << 60
}

t.join()

Priority Select

When certain channels should have precedence over others when selecting, the prioritySelect methods should be used instead.

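import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import static groovyx.gpars.dataflow.Dataflow.select
import static groovyx.gpars.dataflow.Dataflow.task

/**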
 * Shows a basic use of Priority Select, which monitors a set of input channels for values and makes these values
 * available on its output irrespective of their original input channel.
 * Note that dataflow variables, queues and broadcasts can be combined for Select.
 * Unlike plain select method call, the prioritySelect call gives precedence to input channels with lower index.
 * Available messages from high priority channels will be served before messages from lower-priority channels.
 * Messages received through a single input channel will have their mutual order preserved.
 *
 */
def critical = new DataflowVariable()
def ordinary = new DataflowQueue()
def whoCares = new DataflowQueue()

task {
    ordinary << 'All working fine'
    whoCares << 'I feel a bit tired'
    ordinary << 'We are on target'
}

task {
    ordinary << 'I have just started my work. Busy. Will come back later...'
    sleep 5000
    ordinary << 'I am done for now'
}

task {
    whoCares << 'Huh, what is that noise'
    ordinary << 'Here I am to do some clean-up work'
    whoCares << 'I wonder whether unplugging this cable will eliminate that nasty sound.'
    critical << 'The server room goes on UPS!'
    whoCares << 'The sound has disappeared'
}

def select = select([critical, ordinary, whoCares])
println 'Starting to monitor our IT department'
sleep 3000
10.times { println "Received: ${select.prioritySelect().value}" }

Collecting results of asynchronous computations

Asynchronous activities, whether dataflow tasks, active objects' methods or asynchronous functions, return Promises. Promises implement the SelectableChannel interface and so can be passed to selects for selection together with other Promises as well as read channels. Similarly to Java's CompletionService, GPars Select enables you to obtain the results of asynchronous activities as soon as each of them becomes available. You may also employ Select to get the first (fastest) result of several computations running in parallel.

import groovyx.gpars.dataflow.Promise
import groovyx.gpars.dataflow.Select
import groovyx.gpars.group.DefaultPGroup
/**
 * Demonstrates the use of dataflow tasks and selects to pick the fastest result of concurrently run calculations.
 */

final group = new DefaultPGroup()
group.with {
    Promise p1 = task {
        sleep(1000)
        10 * 10 + 1
    }
    Promise p2 = task {
        sleep(1000)
        5 * 20 + 2
    }
    Promise p3 = task {
        sleep(1000)
        1 * 100 + 3
    }

    final alt = new Select(group, p1, p2, p3)
    def result = alt.select()
    println "Result: " + result
}

Timeouts

The Select.createTimeout() method will create a DataflowVariable that gets bound to a value after a given time period. This can be leveraged in Selects so that they unblock after a desired delay, if none of the other channels delivers a value before that moment. Just pass the timeout channel as another input channel to the Select.

import groovyx.gpars.dataflow.Promise
import groovyx.gpars.dataflow.Select
import groovyx.gpars.group.DefaultPGroup
/**
 * Demonstrates the use of dataflow tasks and selects to pick the fastest result of concurrently run calculations,
 * with a timeout channel that unblocks the select if no result arrives in time.
 */

final group = new DefaultPGroup()
group.with {
    Promise p1 = task {
        sleep(1000)
        10 * 10 + 1
    }
    Promise p2 = task {
        sleep(1000)
        5 * 20 + 2
    }
    Promise p3 = task {
        sleep(1000)
        1 * 100 + 3
    }

    final timeoutChannel = Select.createTimeout(500)

    final alt = new Select(group, p1, p2, p3, timeoutChannel)
    def result = alt.select()
    println "Result: " + result
}
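One way to tell a timeout apart from a real result is to compare the index in the SelectResult with the position of the timeout channel. The sketch below assumes GPars is on the classpath; the names replies, timeoutIndex and timedOut are illustrative, and the queue is deliberately never written to so that the timeout wins.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.Select
import static groovyx.gpars.dataflow.Dataflow.select

def replies = new DataflowQueue()            // index 0, stays empty in this sketch
def timeout = Select.createTimeout(200)      // index 1, bound after roughly 200 ms
final int timeoutIndex = 1

def result = select([replies, timeout]).select()

// The index identifies the winning channel regardless of the value it carries
def timedOut = (result.index == timeoutIndex)
println(timedOut ? 'No reply arrived in time' : "Got ${result.value}")
```

Checking the index avoids depending on the particular value that the timeout channel gets bound to.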

Cancellation

If you need to cancel the other tasks once a value has been calculated or a timeout has expired, the best way is to set a flag that the tasks periodically monitor. There is intentionally no cancellation machinery built into DataflowVariables or Tasks.

import groovyx.gpars.dataflow.Promise
import groovyx.gpars.dataflow.Select
import groovyx.gpars.group.DefaultPGroup

import java.util.concurrent.atomic.AtomicBoolean

/**
 * Demonstrates the use of dataflow tasks and selects to pick the fastest result of concurrently run calculations.
 * It shows a way to cancel the slower tasks once a result is known
 */

final group = new DefaultPGroup()
final done = new AtomicBoolean()

group.with {
    Promise p1 = task {
        sleep(1000)
        if (done.get()) return
        10 * 10 + 1
    }
    Promise p2 = task {
        sleep(1000)
        if (done.get()) return
        5 * 20 + 2
    }
    Promise p3 = task {
        sleep(1000)
        if (done.get()) return
        1 * 100 + 3
    }

    final alt = new Select(group, p1, p2, p3, Select.createTimeout(500))
    def result = alt.select()
    done.set(true)
    println "Result: " + result
}