7.2 Selects - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.2.1
Frequently a value needs to be obtained from one of several dataflow channels (variables, queues, broadcasts or streams). The Select class is suitable for such scenarios. A Select scans multiple dataflow channels and picks one of the input channels that currently has a value available to read. The value from that channel is read and returned to the caller together with the index of the originating channel. The channel is picked either at random or by channel priority, in which case channels with a lower position index in the Select constructor have higher priority.
Selecting a value from multiple channels
    import groovyx.gpars.dataflow.DataflowQueue
    import groovyx.gpars.dataflow.DataflowVariable
    import static groovyx.gpars.dataflow.Dataflow.select
    import static groovyx.gpars.dataflow.Dataflow.task

    /**
     * Shows a basic use of Select, which monitors a set of input channels for values and makes these values
     * available on its output irrespective of their original input channel.
     * Note that dataflow variables and queues can be combined for Select.
     *
     * You might also consider checking out the prioritySelect method, which prioritizes values by the index of their input channel
     */
    def a = new DataflowVariable()
    def b = new DataflowVariable()
    def c = new DataflowQueue()

    task {
        sleep 3000
        a << 10
    }

    task {
        sleep 1000
        b << 20
    }

    task {
        sleep 5000
        c << 30
    }

    def select = select([a, b, c])
    println "The fastest result is ${select().value}"
Note that the return type of select() is SelectResult, which holds the value as well as the index of the originating channel.
There are multiple ways to read values from a Select:
    //Blocking reads: the caller waits until a value is available
    def sel = select(a, b, c, d)
    def result = sel.select()                                   //Random selection
    result = sel()                                              //Random selection (a short-hand variant)
    result = sel.select([true, true, false, true])              //Random selection with guards specified
    result = sel([true, true, false, true])                     //Random selection with guards specified (a short-hand variant)
    result = sel.prioritySelect()                               //Priority selection
    result = sel.prioritySelect([true, true, false, true])      //Priority selection with guards specified
    //Promise-based reads: the selected value is delivered through the returned Promise
    def sel = select(a, b, c, d)
    Promise result = sel.selectToPromise()                              //Random selection
    result = sel.selectToPromise([true, true, false, true])             //Random selection with guards specified
    result = sel.prioritySelectToPromise()                              //Priority selection
    result = sel.prioritySelectToPromise([true, true, false, true])     //Priority selection with guards specified
    //Handler-based reads: the selected value is passed to the supplied actor
    def handler = actor {...}
    def sel = select(a, b, c, d)

    sel.select(handler)                                         //Random selection
    sel(handler)                                                //Random selection (a short-hand variant)
    sel.select(handler, [true, true, false, true])              //Random selection with guards specified
    sel(handler, [true, true, false, true])                     //Random selection with guards specified (a short-hand variant)
    sel.prioritySelect(handler)                                 //Priority selection
    sel.prioritySelect(handler, [true, true, false, true])      //Priority selection with guards specified
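The three reading styles listed above can be compared side by side. The following is a minimal sketch rather than an excerpt from the GPars sources: the channels `a` and `b`, the string values and the `received` variable are illustrative assumptions, and the `@Grab` line assumes that GPars 1.2.1 can be resolved from Maven Central (drop it if GPars is already on the classpath). The sketch also assumes that the handler actor receives the SelectResult as its message.

```groovy
// @Grab can be removed when GPars is already on the classpath.
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.Promise
import static groovyx.gpars.actor.Actors.actor
import static groovyx.gpars.dataflow.Dataflow.select

// Hypothetical channels for illustration; only `b` ever receives values.
def a = new DataflowQueue()
def b = new DataflowQueue()
def sel = select([a, b])

// 1. Blocking read: the caller waits for a SelectResult.
b << 'first'
def blocking = sel.select()
println "blocking: value=${blocking.value}, index=${blocking.index}"

// 2. Promise-based read: the call returns a Promise that is bound
//    to a SelectResult once a value has been selected.
Promise promise = sel.selectToPromise()
b << 'second'
def viaPromise = promise.get()
println "promise:  value=${viaPromise.value}, index=${viaPromise.index}"

// 3. Handler-based read: the SelectResult is sent to the actor as a message.
//    `received` is a helper variable used only to hand the message back to the script.
def received = new DataflowVariable()
def handler = actor {
    react { msg -> received << msg }
}
sel.select(handler)
b << 'third'
def viaActor = received.val
println "actor:    value=${viaActor.value}, index=${viaActor.index}"
```

Since only the second channel ever holds a value, each style should report index 1. The blocking variant is the simplest to read, while the promise and handler variants keep the calling thread free while the selection is pending.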
Guards
Guards allow the caller to omit some input channels from the selection. Guards are specified as a List of boolean flags passed to the select() or prioritySelect() methods. A channel takes part in the selection only if the flag at its index is true.

    def sel = select(leaders, seniors, experts, juniors)
    def teamLead = sel([true, true, false, false]).value    //Only 'leaders' and 'seniors' qualify for becoming a teamLead here
    import groovyx.gpars.dataflow.DataflowQueue
    import static groovyx.gpars.dataflow.Dataflow.select
    import static groovyx.gpars.dataflow.Dataflow.task

    /**
     * Demonstrates the ability to enable/disable channels during a value selection on a select by providing boolean guards.
     */
    final DataflowQueue operations = new DataflowQueue()
    final DataflowQueue numbers = new DataflowQueue()

    def t = task {
        final def select = select(operations, numbers)
        3.times {
            def instruction = select([true, false]).value
            def num1 = select([false, true]).value
            def num2 = select([false, true]).value
            final def formula = "$num1 $instruction $num2"
            println "$formula = ${new GroovyShell().evaluate(formula)}"
        }
    }

    task {
        operations << '+'
        operations << '+'
        operations << '*'
    }

    task {
        numbers << 10
        numbers << 20
        numbers << 30
        numbers << 40
        numbers << 50
        numbers << 60
    }

    t.join()
Priority Select
When certain channels should take precedence over others during selection, use the prioritySelect methods instead.

    import groovyx.gpars.dataflow.DataflowQueue
    import groovyx.gpars.dataflow.DataflowVariable
    import static groovyx.gpars.dataflow.Dataflow.select
    import static groovyx.gpars.dataflow.Dataflow.task

    /**
     * Shows a basic use of Priority Select, which monitors a set of input channels for values and makes these values
     * available on its output irrespective of their original input channel.
     * Note that dataflow variables, queues and broadcasts can be combined for Select.
     * Unlike plain select method call, the prioritySelect call gives precedence to input channels with lower index.
     * Available messages from high priority channels will be served before messages from lower-priority channels.
     * Messages received through a single input channel will have their mutual order preserved.
     */
    def critical = new DataflowVariable()
    def ordinary = new DataflowQueue()
    def whoCares = new DataflowQueue()

    task {
        ordinary << 'All working fine'
        whoCares << 'I feel a bit tired'
        ordinary << 'We are on target'
    }

    task {
        ordinary << 'I have just started my work. Busy. Will come back later...'
        sleep 5000
        ordinary << 'I am done for now'
    }

    task {
        whoCares << 'Huh, what is that noise'
        ordinary << 'Here I am to do some clean-up work'
        whoCares << 'I wonder whether unplugging this cable will eliminate that nasty sound.'
        critical << 'The server room goes on UPS!'
        whoCares << 'The sound has disappeared'
    }

    def select = select([critical, ordinary, whoCares])
    println 'Starting to monitor our IT department'
    sleep 3000
    10.times { println "Received: ${select.prioritySelect().value}" }
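The ordering rules of prioritySelect are easier to observe when timing plays no role. The following minimal sketch fills two queues before any selection takes place, so every value is already available when prioritySelect() is called. The channel names `high` and `low` and the message strings are illustrative assumptions, and the `@Grab` line assumes that GPars 1.2.1 can be resolved from Maven Central.

```groovy
// @Grab can be removed when GPars is already on the classpath.
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.select

// Hypothetical channels: `high` comes first in the list, so it has the higher priority.
def high = new DataflowQueue()
def low = new DataflowQueue()

// Interleave the writes; all four values are available before the first selection.
low << 'low-1'
high << 'high-1'
low << 'low-2'
high << 'high-2'

def sel = select([high, low])

// Drain all four values using priority selection.
def order = (1..4).collect { sel.prioritySelect().value }
println order
```

Given the priority rules described above, the values from `high` should be served first and the order within each channel should be preserved, giving high-1, high-2, low-1, low-2. Replacing prioritySelect() with select() makes the choice between the two channels random, while the order within each queue is still preserved.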
Collecting results of asynchronous computations
Asynchronous activities, whether dataflow tasks, active objects' methods or asynchronous functions, return Promises. Promises implement the SelectableChannel interface and so can be passed to selects for selection together with other Promises as well as read channels. Similarly to Java's CompletionService, GPars Select enables you to obtain the results of asynchronous activities as soon as each of them becomes available. You may also employ Select to obtain the first (fastest) result of several computations running in parallel.

    import groovyx.gpars.dataflow.Promise
    import groovyx.gpars.dataflow.Select
    import groovyx.gpars.group.DefaultPGroup

    /**
     * Demonstrates the use of dataflow tasks and selects to pick the fastest result of concurrently run calculations.
     */
    final group = new DefaultPGroup()
    group.with {
        Promise p1 = task {
            sleep(1000)
            10 * 10 + 1
        }
        Promise p2 = task {
            sleep(1000)
            5 * 20 + 2
        }
        Promise p3 = task {
            sleep(1000)
            1 * 100 + 3
        }

        final alt = new Select(group, p1, p2, p3)
        def result = alt.select()
        println "Result: " + result
    }
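The CompletionService-like use mentioned above, where every result is collected as soon as it becomes available, can be sketched by calling select() once per promise. This is a minimal sketch under stated assumptions: the task bodies, delays and labels are made up for illustration, the `@Grab` line assumes that GPars 1.2.1 can be resolved from Maven Central, and the loop relies on each promise being reported by the Select only once.

```groovy
// @Grab can be removed when GPars is already on the classpath.
@Grab('org.codehaus.gpars:gpars:1.2.1')
import static groovyx.gpars.dataflow.Dataflow.select
import static groovyx.gpars.dataflow.Dataflow.task

// Three hypothetical computations with different running times.
def promises = [
    task { sleep 300; 'slow' },
    task { sleep 100; 'fast' },
    task { sleep 200; 'medium' }
]

def sel = select(promises)

// One select() call per promise; each call blocks until the next result is ready.
def completed = (1..promises.size()).collect { sel.select().value }
println "In completion order: $completed"
```

With the delays chosen here the results would typically arrive as fast, medium, slow, although the exact order depends on thread scheduling. Reading `index` from each SelectResult instead of `value` tells you which of the submitted computations has just finished.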
Timeouts
The Select.createTimeout() method creates a DataflowVariable that gets bound to a value after a given time period. This can be leveraged in Selects so that they unblock after a desired delay if none of the other channels delivers a value before that moment. Just pass the timeout channel as another input channel to the Select.

    import groovyx.gpars.dataflow.Promise
    import groovyx.gpars.dataflow.Select
    import groovyx.gpars.group.DefaultPGroup

    /**
     * Demonstrates the use of dataflow tasks and selects to pick the fastest result of concurrently run calculations.
     * A timeout channel unblocks the select if no task delivers a value in time.
     */
    final group = new DefaultPGroup()
    group.with {
        Promise p1 = task {
            sleep(1000)
            10 * 10 + 1
        }
        Promise p2 = task {
            sleep(1000)
            5 * 20 + 2
        }
        Promise p3 = task {
            sleep(1000)
            1 * 100 + 3
        }

        final timeoutChannel = Select.createTimeout(500)    //Gets bound after 500 ms, before any of the tasks finishes
        final alt = new Select(group, p1, p2, p3, timeoutChannel)
        def result = alt.select()
        println "Result: " + result
    }
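Because a SelectResult carries the index of the originating channel, one way to tell a timeout apart from a real result is to compare that index with the position of the timeout channel. The following is a minimal sketch of that idea, not the only possible approach: the task body, the delays and the names `slow`, `channels` and `timeoutIndex` are illustrative assumptions, and the `@Grab` line assumes that GPars 1.2.1 can be resolved from Maven Central.

```groovy
// @Grab can be removed when GPars is already on the classpath.
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.dataflow.Select
import static groovyx.gpars.dataflow.Dataflow.select
import static groovyx.gpars.dataflow.Dataflow.task

// A hypothetical computation that takes much longer than we are willing to wait.
def slow = task {
    sleep 2000
    'too late'
}

// The timeout channel is added last, so its index is the last position in the list.
def channels = [slow, Select.createTimeout(200)]
def timeoutIndex = channels.size() - 1

def result = select(channels).select()
def timedOut = (result.index == timeoutIndex)
println(timedOut ? 'Timed out' : "Result: ${result.value}")
```

Checking the index avoids depending on the particular value that the timeout channel gets bound to.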
Cancellation
If you need to cancel the other tasks once a value has been calculated or a timeout has expired, the best way is to set a flag that the tasks periodically monitor. There is intentionally no cancellation machinery built into DataflowVariables or Tasks.

    import groovyx.gpars.dataflow.Promise
    import groovyx.gpars.dataflow.Select
    import groovyx.gpars.group.DefaultPGroup

    import java.util.concurrent.atomic.AtomicBoolean

    /**
     * Demonstrates the use of dataflow tasks and selects to pick the fastest result of concurrently run calculations.
     * It shows a way to cancel the slower tasks once a result is known
     */
    final group = new DefaultPGroup()
    final done = new AtomicBoolean()

    group.with {
        Promise p1 = task {
            sleep(1000)
            if (done.get()) return      //Stop early once a result or a timeout is known
            10 * 10 + 1
        }
        Promise p2 = task {
            sleep(1000)
            if (done.get()) return
            5 * 20 + 2
        }
        Promise p3 = task {
            sleep(1000)
            if (done.get()) return
            1 * 100 + 3
        }

        final alt = new Select(group, p1, p2, p3, Select.createTimeout(500))
        def result = alt.select()
        done.set(true)
        println "Result: " + result
    }