7.3 Operators - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.2.1
7.3 Operators
Dataflow Operators and Selectors provide a full Dataflow implementation with all the usual ceremony.
Concepts
Full dataflow concurrency builds on the concept of channels connecting operators and selectors, which consume values coming through input channels, transform them into new values and output the new values into their output channels. While Operators wait for all input channels to have a value available for reading before they start processing them, Selectors are triggered by a value available on any of the input channels.
operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
    …
    bindOutput 0, x + y + z
}
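The fragment above can be expanded into a small runnable script. This is only a sketch: it assumes GPars is on the classpath, and the channel names and the values sent are illustrative.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

// Three input channels and one output channel (illustrative names)
final a = new DataflowQueue()
final b = new DataflowQueue()
final c = new DataflowQueue()
final d = new DataflowQueue()

// The body runs once a value is available on each of a, b and c
def op = operator(inputs: [a, b, c], outputs: [d]) { x, y, z ->
    bindOutput 0, x + y + z
}

a << 1
b << 2
c << 3
def first = d.val      // blocks until the operator has bound a result

a << 10
b << 20
c << 30
def second = d.val

op.terminate()         // stop the operator once it is no longer needed
```

Reading `d.val` blocks the calling thread, so the script does not need any extra synchronization to wait for the operator.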
/**
 * CACHE
 *
 * Caches sites' contents. Accepts requests for url content, outputs the content. Outputs requests for download
 * if the site is not in cache yet.
 */
operator(inputs: [urlRequests], outputs: [downloadRequests, sites]) {request ->
    if (!request.content) {
        println "[Cache] Retrieving ${request.site}"
        def content = cache[request.site]
        if (content) {
            println "[Cache] Found in cache"
            bindOutput 1, [site: request.site, word:request.word, content: content]
        } else {
            def downloads = pendingDownloads[request.site]
            if (downloads != null) {
                println "[Cache] Awaiting download"
                downloads << request
            } else {
                pendingDownloads[request.site] = []
                println "[Cache] Asking for download"
                bindOutput 0, request
            }
        }
    } else {
        println "[Cache] Caching ${request.site}"
        cache[request.site] = request.content
        bindOutput 1, request
        def downloads = pendingDownloads[request.site]
        if (downloads != null) {
            for (downloadRequest in downloads) {
                println "[Cache] Waking up"
                bindOutput 1, [site: downloadRequest.site, word:downloadRequest.word, content: request.content]
            }
            pendingDownloads.remove(request.site)
        }
    }
}
The standard error handling prints an error message to the standard error output and terminates the operator when an uncaught exception is thrown from within the operator's body. To alter this behavior, you can register your own event listener:
def listener = new DataflowEventAdapter() {
    @Override
    boolean onException(final DataflowProcessor processor, final Throwable e) {
        logChannel << e
        return false   //Indicate whether to terminate the operator or not
    }
}
op = group.operator(inputs: [a, b], outputs: [c], listeners: [listener]) {x, y ->
    …
}
See the Operator lifecycle section for more details.
Types of operators
There are specialized versions of operators serving specific purposes:
- operator - the basic general-purpose operator
- selector - operator that is triggered by a value being available in any of its input channels
- prioritySelector - a selector that prefers delivering messages from lower-indexed input channels over higher-indexed ones
- splitter - a single-input operator copying its input values to all of its output channels
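As an illustration of the last item in the list, the following sketch wires a splitter between one input queue and two output queues. It assumes GPars is on the classpath; the channel names are hypothetical.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.splitter

final source = new DataflowQueue()
final left = new DataflowQueue()
final right = new DataflowQueue()

// Every value read from source is copied to both left and right
def sp = splitter(source, [left, right])

source << 'a'
source << 'b'

// Each output channel receives its own copy of every value
def leftValues = [left.val, left.val]
def rightValues = [right.val, right.val]

sp.terminate()
```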
Wiring operators together
Operators are typically combined into networks, in which some operators consume the output of other operators.
operator(inputs:[a, b], outputs:[c, d]) {...}
splitter(c, [e, f])
selector(inputs:[e, d], outputs:[]) {...}
Alternatively, you can refer to the output channels through the operators themselves:
def op1 = operator(inputs:[a, b], outputs:[c, d]) {...}
def sp1 = splitter(op1.outputs[0], [e, f])                            //takes the first output of op1
selector(inputs:[sp1.outputs[0], op1.outputs[1]], outputs:[]) {...}   //takes the first output of sp1 and the second output of op1
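A small runnable pipeline may make the wiring more concrete. This is a sketch under the assumption that GPars is on the classpath; the two operators and their channel names are made up for the example.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

final numbers = new DataflowQueue()
final doubled = new DataflowQueue()
final results = new DataflowQueue()

// First stage: doubles every incoming number
def doubler = operator(inputs: [numbers], outputs: [doubled]) { x ->
    bindOutput 0, 2 * x
}

// Second stage: reads the first output of the first stage and adds one
def incrementer = operator(inputs: [doubler.outputs[0]], outputs: [results]) { x ->
    bindOutput 0, x + 1
}

numbers << 1
numbers << 2
numbers << 3

def collected = (1..3).collect { results.val }

doubler.terminate()
incrementer.terminate()
```

Both operators use the default of a single thread per operator body, so the values travel through the pipeline in the order in which they were sent.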
Grouping operators
Dataflow operators can be organized into groups to allow for performance fine-tuning. Groups provide a handy operator() factory method to create operators attached to the group.
import groovyx.gpars.group.DefaultPGroup
def group = new DefaultPGroup()
group.with {
    operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
        …
        bindOutput 0, x + y + z
    }
}
The default thread pool for dataflow operators contains daemon threads, which means your application will exit as soon as the main thread finishes and won't wait for all tasks to complete. When grouping operators, make sure that your custom thread pools use daemon threads as well, which can be achieved by using DefaultPGroup or by providing your own thread factory to a thread pool constructor. If your thread pools use non-daemon threads, such as when using the NonDaemonPGroup group class, shut down the group or the thread pool explicitly by calling its shutdown() method, otherwise your application will not exit.
You may selectively override the default group used for tasks, operators, callbacks and other dataflow elements inside a code block using the Dataflow.usingGroup() method:
Dataflow.usingGroup(group) {
    operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
        …
        bindOutput 0, x + y + z
    }
}
Inside such a block you can still name a different group explicitly for an individual operator:
Dataflow.usingGroup(group) {
    anotherGroup.operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
        …
        bindOutput 0, x + y + z
    }
}
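The note on non-daemon threads above can be sketched as follows. The example assumes GPars is on the classpath; the pool size of 2 and the channel names are arbitrary choices for illustration.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.NonDaemonPGroup

// A group backed by non-daemon threads (pool size chosen arbitrarily)
final group = new NonDaemonPGroup(2)
final input = new DataflowQueue()
final output = new DataflowQueue()

def op = group.operator(inputs: [input], outputs: [output]) { text ->
    bindOutput 0, text.toUpperCase()
}

def result
try {
    input << 'hello'
    result = output.val
} finally {
    op.terminate()     // ask the operator to stop
    op.join()          // wait until it has actually stopped
    group.shutdown()   // without this the non-daemon threads keep the JVM alive
}
```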
Constructing operators
The construction properties of an operator, such as inputs, outputs, stateObject or maxForks, cannot be modified once the operator has been built. You may find the groovyx.gpars.dataflow.ProcessingNode class helpful when gradually collecting channels and values into lists before you finally build an operator.
import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.ProcessingNode.node
/**
 * Shows how to build operators using the ProcessingNode class
 */
final DataflowQueue aValues = new DataflowQueue()
final DataflowQueue bValues = new DataflowQueue()
final DataflowQueue results = new DataflowQueue()
//Create a config and gradually set the required properties - channels, code, etc.
def adderConfig = node {valueA, valueB ->
    bindOutput valueA + valueB
}
adderConfig.inputs << aValues
adderConfig.inputs << bValues
adderConfig.outputs << results
//Build the operator
final adder = adderConfig.operator(Dataflow.DATA_FLOW_GROUP)
//Now the operator is running and processing the data
aValues << 10
aValues << 20
bValues << 1
bValues << 2
assert [11, 22] == (1..2).collect { results.val }
State in operators
Although operators can frequently do without keeping state between subsequent invocations, GPars allows operators to maintain state if the developer needs it. One obvious way is to leverage Groovy closures, which close over their context:
int counter = 0
operator(inputs: [a], outputs: [b]) {value ->
    counter += 1
}
Alternatively, a state object can be passed to the operator through the stateObject parameter at construction time and accessed from within the body:
operator(inputs: [a], outputs: [b], stateObject: [counter: 0]) {value ->
    stateObject.counter += 1
}
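A runnable variant of the stateObject approach might look like the sketch below. It assumes GPars is on the classpath; the channel names and the numbering of words are invented for the example.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

final words = new DataflowQueue()
final numbered = new DataflowQueue()

// The map passed as stateObject lives as long as the operator does
def op = operator(inputs: [words], outputs: [numbered], stateObject: [counter: 0]) { word ->
    stateObject.counter += 1
    bindOutput 0, "${stateObject.counter}: ${word}".toString()
}

words << 'alpha'
words << 'beta'

def lines = [numbered.val, numbered.val]
op.terminate()
```

Because the operator runs its body on a single thread at a time by default, the counter needs no extra protection here.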
Parallelize operators
By default an operator's body is processed by a single thread at a time. While this is a safe setting allowing the operator's body to be written in a non-thread-safe manner, once an operator becomes "hot" and data start to accumulate in the operator's input queues, you might consider allowing multiple threads to run the operator's body concurrently. Bear in mind that in such a case you need to avoid or protect shared resources from multi-threaded access. To enable multiple threads to run the operator's body concurrently, pass an extra maxForks parameter when creating an operator:
def op = operator(inputs: [a, b, c], outputs: [d, e], maxForks: 2) {x, y, z ->
    bindOutput 0, x + y + z
    bindOutput 1, x * y * z
}
Please always make sure the group serving the operator holds enough threads to support all requested forks. Using groups allows you to organize tasks or operators around different thread pools (wrapped inside the group). While the Dataflow.task() command schedules the task on a default thread pool (java.util.concurrent.Executor, fixed size=#cpu+1, daemon threads), you may prefer being able to define your own thread pool(s) to run your tasks.
The default group uses a resizeable thread pool and so will never run out of threads.
def group = new DefaultPGroup(10)
group.operator(inputs: [a, b, c], outputs: [d, e], maxForks: 5) {x, y, z -> ...}
Synchronizing the output
When enabling internal parallelization of an operator by setting maxForks to a value greater than 1, remember that race conditions may occur without explicit or implicit synchronization in the operator's body. In particular, values written to multiple output channels are not guaranteed to be written atomically in the same order to all the channels:
operator(inputs:[inputChannel], outputs:[a, b], maxForks:5) {msg ->
    bindOutput 0, msg
    bindOutput 1, msg
}
inputChannel << 1
inputChannel << 2
inputChannel << 3
inputChannel << 4
inputChannel << 5
This may leave the output channels holding the values in different orders, for example:
a -> 1, 3, 2, 4, 5
b -> 2, 1, 3, 5, 4
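Explicit synchronization is one way to bind all output channels consistently and to protect any operator state that is not thread-local:
def lock = new Object()
operator(inputs:[inputChannel], outputs:[a, b], maxForks:5) {msg ->
    doStuffThatIsThreadSafe()
    synchronized(lock) {
        doSomethingThatMustNotBeAccessedByMultipleThreadsAtTheSameTime()
        bindOutput 0, msg
        bindOutput 1, 2*msg
    }
}
To set the values of all the operator's output channels in one atomic step, you may instead call the bindAllOutputValuesAtomically method:
operator(inputs:[inputChannel], outputs:[a, b], maxForks:5) {msg ->
    doStuffThatIsThreadSafe()
    bindAllOutputValuesAtomically msg, 2*msg
}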
Using the bindAllOutputs or the bindAllOutputValues methods will not guarantee atomicity of writes across all the output channels when using internal parallelism. If preserving the order of messages in multiple output channels is not an issue, bindAllOutputs as well as bindAllOutputValues will provide better performance than the atomic variants.
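The ordering guarantee of the atomic variant can be checked with a short script. This is a sketch only: it assumes GPars is on the classpath, and the pool size of 10 and the channel names are illustrative choices.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.DefaultPGroup

// A group with enough threads to serve five forks
final group = new DefaultPGroup(10)
final input = new DataflowQueue()
final a = new DataflowQueue()
final b = new DataflowQueue()

def op = group.operator(inputs: [input], outputs: [a, b], maxForks: 5) { msg ->
    // Writes msg to a and 2 * msg to b in a single atomic step
    bindAllOutputValuesAtomically msg, 2 * msg
}

(1..5).each { input << it }

def fromA = (1..5).collect { a.val }
def fromB = (1..5).collect { b.val }

op.terminate()
```

The order in which the five values arrive in `a` may still vary between runs, since the forks run concurrently; what the atomic binding is meant to preserve is that `b` sees the corresponding values in the same order as `a`.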
Operator lifecycle
Dataflow operators and selectors fire several events during their lifecycle, which allows interested parties to obtain notifications and potentially alter the operator's behavior. The DataflowEventListener interface offers a couple of callback methods:
public interface DataflowEventListener {
    /**
     * Invoked immediately after the operator starts by a pooled thread before the first message is obtained
     *
     * @param processor The reporting dataflow operator/selector
     */
    void afterStart(DataflowProcessor processor);
    /**
     * Invoked immediately after the operator terminates
     *
     * @param processor The reporting dataflow operator/selector
     */
    void afterStop(DataflowProcessor processor);
    /**
     * Invoked if an exception occurs.
     * If any of the listeners returns true, the operator will terminate.
     * Exceptions outside of the operator's body or listeners' messageSentOut() handlers will terminate the operator irrespective of the listeners' votes.
     *
     * @param processor The reporting dataflow operator/selector
     * @param e The thrown exception
     * @return True, if the operator should terminate in response to the exception, false otherwise.
     */
    boolean onException(DataflowProcessor processor, Throwable e);
    /**
     * Invoked when a message becomes available in an input channel.
     *
     * @param processor The reporting dataflow operator/selector
     * @param channel The input channel holding the message
     * @param index The index of the input channel within the operator
     * @param message The incoming message
     * @return The original message or a message that should be used instead
     */
    Object messageArrived(DataflowProcessor processor, DataflowReadChannel<Object> channel, int index, Object message);
    /**
     * Invoked when a control message (instances of ControlMessage) becomes available in an input channel.
     *
     * @param processor The reporting dataflow operator/selector
     * @param channel The input channel holding the message
     * @param index The index of the input channel within the operator
     * @param message The incoming message
     * @return The original message or a message that should be used instead
     */
    Object controlMessageArrived(DataflowProcessor processor, DataflowReadChannel<Object> channel, int index, Object message);
    /**
     * Invoked when a message is being bound to an output channel.
     *
     * @param processor The reporting dataflow operator/selector
     * @param channel The output channel to send the message to
     * @param index The index of the output channel within the operator
     * @param message The message to send
     * @return The original message or a message that should be used instead
     */
    Object messageSentOut(DataflowProcessor processor, DataflowWriteChannel<Object> channel, int index, Object message);
    /**
     * Invoked when all messages required to trigger the operator become available in the input channels.
     *
     * @param processor The reporting dataflow operator/selector
     * @param messages The incoming messages
     * @return The original list of messages or a modified/new list of messages that should be used instead
     */
    List<Object> beforeRun(DataflowProcessor processor, List<Object> messages);
    /**
     * Invoked when the operator completes a single run
     *
     * @param processor The reporting dataflow operator/selector
     * @param messages The incoming messages that have been processed
     */
    void afterRun(DataflowProcessor processor, List<Object> messages);
    /**
     * Invoked when the fireCustomEvent() method is triggered manually on a dataflow operator/selector
     *
     * @param processor The reporting dataflow operator/selector
     * @param data The custom piece of data provided as part of the event
     * @return A value to return from the fireCustomEvent() method to the caller (event initiator)
     */
    Object customEvent(DataflowProcessor processor, Object data);
}
The following example registers a DataflowEventAdapter listener that handles a custom event fired from the operator's body through fireCustomEvent():
final listener = new DataflowEventAdapter() {
    @Override
    Object customEvent(DataflowProcessor processor, Object data) {
        println "Log: Getting quite high on the scale $data"
        return 100  //The value to use instead
    }
}
op = group.operator(inputs: [a, b], outputs: [c], listeners: [listener]) {x, y ->
    final sum = x + y
    if (sum > 100) bindOutput(fireCustomEvent(sum))  //Reporting that the sum is too high, binding the lowered value that comes back
    else bindOutput sum
}
Selectors
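A selector's body should be a closure consuming either one or two arguments.
selector (inputs : [a, b, c], outputs : [d, e]) {value -> .... }
With two arguments, the closure receives the value together with the index of the input channel the value came from:
selector (inputs : [a, b, c], outputs : [d, e]) {value, index -> .... }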
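The two-argument form can be tried out with the sketch below, which assumes GPars is on the classpath and uses hypothetical channel names. Each value is read back before the next one is sent, so the result does not depend on which channel the selector happens to serve first.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.selector

final a = new DataflowQueue()
final b = new DataflowQueue()
final out = new DataflowQueue()

// The body fires for a value on either input; index tells which channel it came from
def sel = selector(inputs: [a, b], outputs: [out]) { value, index ->
    bindOutput 0, "channel ${index}: ${value}".toString()
}

a << 'x'
def first = out.val

b << 'y'
def second = out.val

sel.terminate()
```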
Priority Selector
When priorities need to be preserved among input channels, a DataflowPrioritySelector should be used.
prioritySelector(inputs : [a, b, c], outputs : [d, e]) {value, index -> … }
Join selector
A selector without a body closure specified will copy all incoming values to all of its output channels.
def join = selector (inputs : [programmers, analysis, managers], outputs : [employees, colleagues])
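A reduced, runnable version of the join might look as follows. It is a sketch that assumes GPars is on the classpath; it uses two input channels and one output channel with made-up names and values.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.selector

final programmers = new DataflowQueue()
final analysts = new DataflowQueue()
final employees = new DataflowQueue()

// No body closure: incoming values are simply copied to the output channel
def join = selector(inputs: [programmers, analysts], outputs: [employees])

programmers << 'Joe'
def first = employees.val

analysts << 'Alice'
def second = employees.val

join.terminate()
```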
Internal parallelism
The maxForks attribute, which enables internal parallelism, is also available for selectors.
selector (inputs : [a, b, c], outputs : [d, e], maxForks : 5) {value -> .... }
Guards
Just like Selects, Selectors also allow the users to temporarily include/exclude individual input channels from selection. The guards input property can be used to set the initial mask on all input channels and the setGuards and setGuard methods are then available in the selector's body.
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.selector
import static groovyx.gpars.dataflow.Dataflow.task
/**
 * Demonstrates the ability to enable/disable channels during a value selection on a select by providing boolean guards.
 */
final DataflowQueue operations = new DataflowQueue()
final DataflowQueue numbers = new DataflowQueue()
def instruction
def nums = []
selector(inputs: [operations, numbers], outputs: [], guards: [true, false]) {value, index ->   //initial guards is set here
    if (index == 0) {
        instruction = value
        setGuard(0, false)  //setGuard() used here
        setGuard(1, true)
    }
    else nums << value
    if (nums.size() == 2) {
        setGuards([true, false])  //setGuards() used here
        final def formula = "${nums[0]} $instruction ${nums[1]}"
        println "$formula = ${new GroovyShell().evaluate(formula)}"
        nums.clear()
    }
}
task {
    operations << '+'
    operations << '+'
    operations << '*'
}
task {
    numbers << 10
    numbers << 20
    numbers << 30
    numbers << 40
    numbers << 50
    numbers << 60
}
Avoid combining guards with maxForks greater than 1. Although the Selector is thread-safe and won't be damaged in any way, the guards are likely not to be set the way you expect. Multiple threads running the selector's body concurrently will tend to overwrite each other's settings of the guards property.