7.3 Operators - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.0-SNAPSHOT
7.3 Operators
Dataflow Operators and Selectors provide a full Dataflow implementation with all the usual ceremony.
Concepts
Full dataflow concurrency builds on the concept of channels connecting operators and selectors, which consume values coming through input channels, transform them into new values and output the new values into their output channels. While Operators wait for all input channels to have a value available for reading before they start processing them, Selectors are triggered by a value available on any of the input channels.
operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
    …
    bindOutput 0, x + y + z
}
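For a self-contained picture, here is a minimal runnable sketch of the same operator; the channel names a, b, c and d are illustrative:
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

final DataflowQueue a = new DataflowQueue()
final DataflowQueue b = new DataflowQueue()
final DataflowQueue c = new DataflowQueue()
final DataflowQueue d = new DataflowQueue()

//The operator fires once all three inputs have a value available
def op = operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
    bindOutput 0, x + y + z
}

a << 1
b << 2
c << 3
assert 6 == d.val

op.terminate()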
/**
 * CACHE
 *
 * Caches sites' contents. Accepts requests for url content, outputs the content. Outputs requests for download
 * if the site is not in cache yet.
 */
operator(inputs: [urlRequests], outputs: [downloadRequests, sites]) {request ->
    if (!request.content) {
        println "[Cache] Retrieving ${request.site}"
        def content = cache[request.site]
        if (content) {
            println "[Cache] Found in cache"
            bindOutput 1, [site: request.site, word: request.word, content: content]
        } else {
            def downloads = pendingDownloads[request.site]
            if (downloads != null) {
                println "[Cache] Awaiting download"
                downloads << request
            } else {
                pendingDownloads[request.site] = []
                println "[Cache] Asking for download"
                bindOutput 0, request
            }
        }
    } else {
        println "[Cache] Caching ${request.site}"
        cache[request.site] = request.content
        bindOutput 1, request
        def downloads = pendingDownloads[request.site]
        if (downloads != null) {
            for (downloadRequest in downloads) {
                println "[Cache] Waking up"
                bindOutput 1, [site: downloadRequest.site, word: downloadRequest.word, content: request.content]
            }
            pendingDownloads.remove(request.site)
        }
    }
}
To react to exceptions thrown from an operator's body, attach an error handler to the operator:
op.addErrorHandler {Throwable e ->
//handle the exception
terminate() //You can also terminate the operator
}
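A minimal sketch of the handler in context; the channel names and the failing-on-zero body are assumptions made for illustration:
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

final DataflowQueue input = new DataflowQueue()
final DataflowQueue output = new DataflowQueue()

def op = operator(inputs: [input], outputs: [output]) {x ->
    if (x == 0) throw new IllegalArgumentException('Zero is not a valid input')
    bindOutput 0, 100 / x
}

//The handler is invoked with the exception thrown from the operator's body
op.addErrorHandler {Throwable e ->
    println "The operator failed: $e.message"
    terminate()  //stop the operator once the error has been reported
}

input << 5
assert 20 == output.val
input << 0  //triggers the error handler, which terminates the operator
op.join()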
Types of operators
There are specialized versions of operators serving specific purposes:
- operator - the basic general-purpose operator
- selector - an operator that is triggered by a value being available in any of its input channels
- prioritySelector - a selector that prefers delivering messages from lower-indexed input channels over higher-indexed ones
- splitter - a single-input operator copying its input values to all of its output channels (see the sketch after this list)
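For instance, a splitter that fans a single channel out to two consumers could look like this minimal sketch; the channel names are illustrative:
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.splitter

final DataflowQueue source = new DataflowQueue()
final DataflowQueue copy1 = new DataflowQueue()
final DataflowQueue copy2 = new DataflowQueue()

//Every value read from source is copied to both output channels
splitter(source, [copy1, copy2])

source << 'message'
assert 'message' == copy1.val
assert 'message' == copy2.val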
Wiring operators together
Operators are typically combined into networks, where some operators consume the output of other operators.
operator(inputs: [a, b], outputs: [c, d]) {...}
splitter(c, [e, f])
selector(inputs: [e, d], outputs: []) {...}
Alternatively, you may refer to output channels through the operators themselves:
def op1 = operator(inputs: [a, b], outputs: [c, d]) {...}
def sp1 = splitter(op1.outputs[0], [e, f])  //takes the first output of op1
selector(inputs: [sp1.outputs[0], op1.outputs[1]], outputs: []) {...}  //takes the first output of sp1 and the second output of op1
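Putting the pieces together, a small self-contained network might be wired up as in the following sketch; all channel and operator names are illustrative:
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator
import static groovyx.gpars.dataflow.Dataflow.splitter

final DataflowQueue numbers = new DataflowQueue()
final DataflowQueue doubled = new DataflowQueue()
final DataflowQueue branchA = new DataflowQueue()
final DataflowQueue branchB = new DataflowQueue()
final DataflowQueue sums = new DataflowQueue()

//First stage doubles each number
operator(inputs: [numbers], outputs: [doubled]) {x ->
    bindOutput 0, 2 * x
}
//Second stage fans the doubled values out to two branches
splitter(doubled, [branchA, branchB])
//Third stage consumes both branches and outputs their sum
operator(inputs: [branchA, branchB], outputs: [sums]) {x, y ->
    bindOutput 0, x + y
}

numbers << 1
numbers << 2
assert [4, 8] == (1..2).collect { sums.val }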
Grouping operators
Dataflow operators can be organized into groups to allow for performance fine-tuning. Groups provide a handy operator() factory method to create operators attached to the groups.
import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup()

group.with {
    operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
        …
        bindOutput 0, x + y + z
    }
}
The default thread pool for dataflow operators contains daemon threads, which means your application will exit as soon as the main thread finishes and won't wait for all operators to complete. When grouping operators, make sure that your custom thread pools also use daemon threads. This can be achieved either by using DefaultPGroup or by providing your own thread factory to the thread pool constructor. If your thread pools use non-daemon threads, such as when using the NonDaemonPGroup group class, make sure you shut down the group or the thread pool explicitly by calling its shutdown() method, otherwise your application will not exit.
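A minimal sketch of the non-daemon case, assuming a NonDaemonPGroup with an illustrative pool size; note the final shutdown() call:
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.NonDaemonPGroup

final DataflowQueue a = new DataflowQueue()
final DataflowQueue b = new DataflowQueue()

//Non-daemon threads keep the JVM alive, so the group must be shut down explicitly
def group = new NonDaemonPGroup(4)
def op = group.operator(inputs: [a], outputs: [b]) {x ->
    bindOutput 0, x * x
}

a << 3
assert 9 == b.val

op.terminate()
group.shutdown()  //without this call the application would not exit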
Constructing operators
The construction properties of an operator, such as inputs, outputs or maxForks, cannot be modified once the operator has been built. You may find the groovyx.gpars.dataflow.ProcessingNode class helpful when gradually collecting channels and values into lists before you finally build an operator.
import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.ProcessingNode.node

/**
 * Shows how to build operators using the ProcessingNode class
 */
final DataflowQueue aValues = new DataflowQueue()
final DataflowQueue bValues = new DataflowQueue()
final DataflowQueue results = new DataflowQueue()

//Create a config and gradually set the required properties - channels, code, etc.
def adderConfig = node {valueA, valueB ->
    bindOutput valueA + valueB
}
adderConfig.inputs << aValues
adderConfig.inputs << bValues
adderConfig.outputs << results

//Build the operator
final adder = adderConfig.operator(Dataflow.DATA_FLOW_GROUP)

//Now the operator is running and processing the data
aValues << 10
aValues << 20
bValues << 1
bValues << 2

assert [11, 22] == (1..2).collect { results.val }
Parallelize operators
By default an operator's body is processed by a single thread at a time. While this is a safe setting allowing the operator's body to be written in a non-thread-safe manner, once an operator becomes "hot" and data start to accumulate in the operator's input queues, you might consider allowing multiple threads to run the operator's body concurrently. Bear in mind that in such a case you need to avoid or protect shared resources from multi-threaded access. To enable multiple threads to run the operator's body concurrently, pass an extra maxForks parameter when creating an operator:
def op = operator(inputs: [a, b, c], outputs: [d, e], maxForks: 2) {x, y, z ->
    bindOutput 0, x + y + z
    bindOutput 1, x * y * z
}
Please always make sure the group serving the operator holds enough threads to support all requested forks. Using groups allows you to organize tasks or operators around different thread pools (wrapped inside the group). While the Dataflow.task() command schedules the task on a default thread pool (java.util.concurrent.Executor, fixed size=#cpu+1, daemon threads), you may prefer being able to define your own thread pool(s) to run your tasks. The default group uses a resizable thread pool and so will never run out of threads.
def group = new DefaultPGroup(10)
group.operator(inputs: [a, b, c], outputs: [d, e], maxForks: 5) {x, y, z -> ...}
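A self-contained variant of the snippet above could look like this sketch; the pool size of 10 and the squaring body are illustrative, and the outputs are sorted because with maxForks the arrival order is not guaranteed:
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.DefaultPGroup

final DataflowQueue input = new DataflowQueue()
final DataflowQueue output = new DataflowQueue()

//A pool of 10 threads comfortably covers the 5 requested forks
def group = new DefaultPGroup(10)
def op = group.operator(inputs: [input], outputs: [output], maxForks: 5) {x ->
    bindOutput 0, x * x  //stateless body, safe to run concurrently
}

(1..5).each { input << it }
assert [1, 4, 9, 16, 25] == (1..5).collect { output.val }.sort()

op.terminate()
group.shutdown()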
Synchronizing the output
When enabling internal parallelization of an operator by setting the value for maxForks to a value greater than 1, it is important to remember that without explicit or implicit synchronization in the operators' body race conditions may occur. Especially bear in mind that values written to multiple output channels are not guaranteed to be written atomically in the same order to all the channels:
operator(inputs: [inputChannel], outputs: [a, b], maxForks: 5) {msg ->
    bindOutput 0, msg
    bindOutput 1, msg
}
inputChannel << 1
inputChannel << 2
inputChannel << 3
inputChannel << 4
inputChannel << 5
The output channels may then receive the values in different orders, for example:
a -> 1, 3, 2, 4, 5
b -> 2, 1, 3, 5, 4
When the order of writes matters, the writes may be guarded explicitly, for example with a shared lock:
def lock = new Object()
operator(inputs: [inputChannel], outputs: [a, b], maxForks: 5) {msg ->
    doStuffThatIsThreadSafe()
    synchronized(lock) {
        doSomethingThatMustNotBeAccessedByMultipleThreadsAtTheSameTime()
        bindOutput 0, msg
        bindOutput 1, 2 * msg
    }
}
Alternatively, the atomic binding methods perform the writes to all output channels atomically:
operator(inputs: [inputChannel], outputs: [a, b], maxForks: 5) {msg ->
    doStuffThatIsThreadSafe()
    bindAllOutputValuesAtomically msg, 2 * msg
}
Using the bindAllOutputs or the bindAllOutputValues methods will not guarantee atomicity of writes across all the output channels when using internal parallelism. If preserving the order of messages in multiple output channels is not an issue, bindAllOutputs as well as bindAllOutputValues will provide better performance over the atomic variants.
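For illustration, a sketch of bindAllOutputValues in a single-threaded operator, where no atomicity is needed; the channel names are illustrative:
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

final DataflowQueue input = new DataflowQueue()
final DataflowQueue a = new DataflowQueue()
final DataflowQueue b = new DataflowQueue()

//With the default maxForks of 1 there is no interleaving, so the cheaper
//non-atomic variant is perfectly safe
operator(inputs: [input], outputs: [a, b]) {msg ->
    bindAllOutputValues msg, 2 * msg  //writes msg to a and 2*msg to b
}

input << 10
assert 10 == a.val
assert 20 == b.val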
Terminating operators
Dataflow operators and selectors can be terminated in two ways:
- by calling the terminate() method on all operators that need to be terminated
- by sending a poison message
def op1 = operator(inputs: [a, b, c], outputs: [d, e]) {x, y, z -> }

def op2 = selector(inputs: [d], outputs: [f, out]) { }

def op3 = prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

[op1, op2, op3]*.terminate()  //Terminate all operators by calling the terminate() method on them
op1.join()
op2.join()
op3.join()
def op1 = operator(inputs: [a, b, c], outputs: [d, e]) {x, y, z -> }

def op2 = selector(inputs: [d], outputs: [f, out]) { }

def op3 = prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

a << PoisonPill.instance  //Send the poison message
op1.join()
op2.join()
op3.join()
Given the potential variety of operator networks and their asynchronous nature, a good termination strategy is that operators and selectors should only ever terminate themselves. All ways of terminating them from outside (either by calling the terminate() method or by sending a poison message down the stream) may result in messages being lost somewhere in the pipes, when the reading operators terminate before they fully handle the messages waiting in their input channels.
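One way to follow this advice is to let the operator watch for an application-level sentinel and call terminate() on itself; the STOP sentinel below is an assumed convention, not part of the GPars API:
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

final DataflowQueue input = new DataflowQueue()
final DataflowQueue output = new DataflowQueue()
final String STOP = 'STOP'  //assumed application-level sentinel

//The operator decides on its own when to stop
def op = operator(inputs: [input], outputs: [output]) {msg ->
    if (msg == STOP) terminate()
    else bindOutput 0, msg.toUpperCase()
}

input << 'hello'
input << STOP
assert 'HELLO' == output.val
op.join()  //returns once the operator has terminated itself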
Stopping operators gently
Operators handle incoming messages repeatedly. The only safe moment for stopping an operator without the risk of losing any messages is right after the operator has finished processing a set of messages and is just about to look for more messages in its incoming pipes. This is exactly what the terminateAfterNextRun() method does. It will schedule the operator for shutdown after the next set of messages gets handled. This may be particularly handy when you use a group of operators/selectors to load-balance messages coming from a channel. Once the work-load decreases, the terminateAfterNextRun() method may be used to safely reduce the pool of load-balancing operators.
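A sketch of the load-balancing scenario, assuming two illustrative worker operators sharing one input channel:
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

final DataflowQueue requests = new DataflowQueue()
final DataflowQueue responses = new DataflowQueue()

//Two operators load-balance messages arriving on the shared requests channel
def workers = (1..2).collect {
    operator(inputs: [requests], outputs: [responses]) {msg ->
        bindOutput 0, msg * msg
    }
}

(1..4).each { requests << it }
assert [1, 4, 9, 16] == (1..4).collect { responses.val }.sort()

//Work-load dropped - retire one worker; it will handle its next set of
//messages before shutting down, so no messages are lost
workers[0].terminateAfterNextRun()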
Selectors
A selector's body should be a closure consuming either one or two arguments.
selector(inputs: [a, b, c], outputs: [d, e]) {value ->
    ....
}
When the closure takes two arguments, the second argument receives the index of the input channel that delivered the value:
selector(inputs: [a, b, c], outputs: [d, e]) {value, index ->
    ....
}
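A minimal sketch showing the index in action; the channel names are illustrative:
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.selector

final DataflowQueue words = new DataflowQueue()
final DataflowQueue numbers = new DataflowQueue()
final DataflowQueue log = new DataflowQueue()

//The second closure argument receives the index of the channel that fired
def sel = selector(inputs: [words, numbers], outputs: [log]) {value, index ->
    bindOutput 0, "channel $index delivered $value".toString()
}

words << 'hi'
assert 'channel 0 delivered hi' == log.val
numbers << 42
assert 'channel 1 delivered 42' == log.val

sel.terminate()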
Priority Selector
When priorities need to be preserved among input channels, a DataflowPrioritySelector should be used.
prioritySelector(inputs: [a, b, c], outputs: [d, e]) {value, index ->
    …
}
Join selector
A selector without a body closure specified will copy all incoming values to all of its output channels.
def join = selector(inputs: [programmers, analysts, managers], outputs: [employees, colleagues])
Internal parallelism
The maxForks attribute allowing for internal selector parallelism is also available.
selector(inputs: [a, b, c], outputs: [d, e], maxForks: 5) {value ->
    ....
}
Guards
Just like Selects, Selectors also allow the user to temporarily include/exclude individual input channels from selection. The guards input property can be used to set the initial mask on all input channels, and the setGuards and setGuard methods are then available in the selector's body.
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.selector
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * Demonstrates the ability to enable/disable channels during a value selection on a select by providing boolean guards.
 */
final DataflowQueue operations = new DataflowQueue()
final DataflowQueue numbers = new DataflowQueue()

def instruction
def nums = []

selector(inputs: [operations, numbers], outputs: [], guards: [true, false]) {value, index ->  //initial guards is set here
    if (index == 0) {
        instruction = value
        setGuard(0, false)  //setGuard() used here
        setGuard(1, true)
    }
    else nums << value
    if (nums.size() == 2) {
        setGuards([true, false])  //setGuards() used here
        final def formula = "${nums[0]} $instruction ${nums[1]}"
        println "$formula = ${new GroovyShell().evaluate(formula)}"
        nums.clear()
    }
}

task {
    operations << '+'
    operations << '+'
    operations << '*'
}

task {
    numbers << 10
    numbers << 20
    numbers << 30
    numbers << 40
    numbers << 50
    numbers << 60
}
Avoid combining guards and a maxForks greater than 1. Although the Selector is thread-safe and won't be damaged in any way, the guards are likely not to be set the way you expect. The multiple threads running the selector's body concurrently will tend to overwrite each other's settings of the guards property.