7.3 Operators - Reference Documentation

Authors: The Whole GPars Gang

Version: 1.2.0

7.3 Operators

Dataflow Operators and Selectors provide a full Dataflow implementation with all the usual ceremony.

Concepts

Full dataflow concurrency builds on the concept of channels connecting operators and selectors, which consume values coming through input channels, transform them into new values and output the new values into their output channels. While Operators wait for all input channels to have a value available for reading before they start processing them, Selectors are triggered by a value available on any of the input channels.

operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
    …
    bindOutput 0, x + y + z
}
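To make the fragment above concrete, the following is a minimal runnable sketch. It assumes GPars 1.2.0 is on the classpath; the channel and variable names are illustrative only.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

// Illustrative channels (names are assumptions, not part of the GPars API)
final a = new DataflowQueue()
final b = new DataflowQueue()
final c = new DataflowQueue()
final d = new DataflowQueue()

// The body runs only once a value is available on each of a, b and c
def op = operator(inputs: [a, b, c], outputs: [d]) { x, y, z ->
    bindOutput 0, x + y + z
}

a << 1
b << 2
c << 3

def sum = d.val     // blocks until the operator has bound a result
assert sum == 6

op.terminate()      // stop the operator once it is no longer needed
```

Nothing is written to d until all three inputs have received a value, which is the behavior that distinguishes an operator from a selector.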

/**
 * CACHE
 *
 * Caches sites' contents. Accepts requests for url content, outputs the content. Outputs requests for download
 * if the site is not in cache yet.
 */
operator(inputs: [urlRequests], outputs: [downloadRequests, sites]) {request ->

    if (!request.content) {
        println "[Cache] Retrieving ${request.site}"
        def content = cache[request.site]
        if (content) {
            println "[Cache] Found in cache"
            bindOutput 1, [site: request.site, word:request.word, content: content]
        } else {
            def downloads = pendingDownloads[request.site]
            if (downloads != null) {
                println "[Cache] Awaiting download"
                downloads << request
            } else {
                pendingDownloads[request.site] = []
                println "[Cache] Asking for download"
                bindOutput 0, request
            }
        }
    } else {
        println "[Cache] Caching ${request.site}"
        cache[request.site] = request.content
        bindOutput 1, request
        def downloads = pendingDownloads[request.site]
        if (downloads != null) {
            for (downloadRequest in downloads) {
                println "[Cache] Waking up"
                bindOutput 1, [site: downloadRequest.site, word:downloadRequest.word, content: request.content]
            }
            pendingDownloads.remove(request.site)
        }
    }
}

The standard error handling will print out an error message to the standard error output and terminate the operator in case an uncaught exception is thrown from within the operator's body. To alter the behavior, you can register your own event listener:

def listener = new DataflowEventAdapter() {
    @Override
    boolean onException(final DataflowProcessor processor, final Throwable e) {
        logChannel << e
        return false   //Indicate whether to terminate the operator or not
    }
}

op = group.operator(inputs: [a, b], outputs: [c], listeners: [listener]) {x, y ->
    …
}

See the Operator lifecycle section for more details.
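As a rough end-to-end illustration of the listener approach, the sketch below reports exceptions to a separate channel and keeps the operator alive. It assumes GPars 1.2.0 on the classpath; the single-input operator, the negative-number check and all channel names are made up for the example.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.operator.DataflowEventAdapter
import groovyx.gpars.dataflow.operator.DataflowProcessor
import groovyx.gpars.group.DefaultPGroup

final group = new DefaultPGroup()
final a = new DataflowQueue()
final c = new DataflowQueue()
final logChannel = new DataflowQueue()   // hypothetical channel collecting errors

final listener = new DataflowEventAdapter() {
    @Override
    boolean onException(final DataflowProcessor processor, final Throwable e) {
        logChannel << e
        return false   // false: keep the operator running
    }
}

def op = group.operator(inputs: [a], outputs: [c], listeners: [listener]) { x ->
    if (x < 0) throw new IllegalArgumentException("negative input: $x")
    bindOutput 0, x
}

a << -1   // triggers the exception, which the listener reports
a << 5    // still processed, because the operator was not terminated

def reported = logChannel.val
def result = c.val
assert reported instanceof Throwable
assert result == 5

op.terminate()
group.shutdown()
```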

Types of operators

There are specialized versions of operators serving specific purposes:

  • operator - the basic general-purpose operator
  • selector - operator that is triggered by a value being available in any of its input channels
  • prioritySelector - a selector that prefers delivering messages from lower-indexed input channels over higher-indexed ones
  • splitter - a single-input operator copying its input values to all of its output channels

Wiring operators together

Operators are typically combined into networks, in which some operators consume the output of other operators.

operator(inputs:[a, b], outputs:[c, d]) {...}
splitter(c, [e, f])
selector(inputs:[e, d], outputs:[]) {...}

You may alternatively refer to output channels through operators themselves:

def op1 = operator(inputs:[a, b], outputs:[c, d]) {...}
def sp1 = splitter(op1.outputs[0], [e, f])                            //takes the first output of op1
selector(inputs:[sp1.outputs[0], op1.outputs[1]], outputs:[]) {...}   //takes the first output of sp1 and the second output of op1
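A small runnable network along these lines might look as follows. This is only a sketch assuming GPars 1.2.0 on the classpath; the doubling operator and the channel names are invented for the example.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator
import static groovyx.gpars.dataflow.Dataflow.splitter

final numbers = new DataflowQueue()
final doubled = new DataflowQueue()
final left = new DataflowQueue()
final right = new DataflowQueue()

// First stage: doubles each incoming value
def doubler = operator(inputs: [numbers], outputs: [doubled]) { x ->
    bindOutput 0, 2 * x
}

// Second stage: copies every doubled value to both output channels
def fanOut = splitter(doubled, [left, right])

numbers << 21
def fromLeft = left.val
def fromRight = right.val
assert fromLeft == 42
assert fromRight == 42

[doubler, fanOut]*.terminate()
```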

Grouping operators

Dataflow operators can be organized into groups to allow for performance fine-tuning. Groups provide a handy operator() factory method to create operators attached to the groups.

import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup()

group.with {
    operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
        …
        bindOutput 0, x + y + z
    }
}

The default thread pool for dataflow operators contains daemon threads, which means your application will exit as soon as the main thread finishes and won't wait for all tasks to complete. When grouping operators, make sure that your custom thread pools use daemon threads, too, which can be achieved by using DefaultPGroup or by providing your own thread factory to a thread pool constructor. If your thread pools use non-daemon threads instead, such as when using the NonDaemonPGroup group class, make sure you shut down the group or the thread pool explicitly by calling its shutdown() method; otherwise your application will not exit.
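The non-daemon case can be sketched as below, assuming GPars 1.2.0 on the classpath. The pool size of 2 and the channel names are arbitrary choices for the example.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.NonDaemonPGroup

final group = new NonDaemonPGroup(2)   // non-daemon threads keep the JVM alive
final input = new DataflowQueue()
final output = new DataflowQueue()

def op = group.operator(inputs: [input], outputs: [output]) { x ->
    bindOutput 0, x + 1
}

input << 41
def result = output.val
assert result == 42

op.terminate()
group.shutdown()   // without this call the application would not exit
```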

You may selectively override the default group used for tasks, operators, callbacks and other dataflow elements inside a code block using the Dataflow.usingGroup() method:

Dataflow.usingGroup(group) {
    operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
        …
        bindOutput 0, x + y + z
    }
}

You can always override the default group by being specific:

Dataflow.usingGroup(group) {
    anotherGroup.operator(inputs: [a, b, c], outputs: [d]) {x, y, z ->
        …
        bindOutput 0, x + y + z
    }
}

Constructing operators

The construction properties of an operator, such as inputs, outputs, stateObject or maxForks, cannot be modified once the operator has been built. You may find the groovyx.gpars.dataflow.ProcessingNode class helpful when gradually collecting channels and values into lists before you finally build an operator.

import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.ProcessingNode.node

/**
 * Shows how to build operators using the ProcessingNode class
 */

final DataflowQueue aValues = new DataflowQueue()
final DataflowQueue bValues = new DataflowQueue()
final DataflowQueue results = new DataflowQueue()

//Create a config and gradually set the required properties - channels, code, etc.
def adderConfig = node {valueA, valueB ->
    bindOutput valueA + valueB
}
adderConfig.inputs << aValues
adderConfig.inputs << bValues
adderConfig.outputs << results

//Build the operator
final adder = adderConfig.operator(Dataflow.DATA_FLOW_GROUP)

//Now the operator is running and processing the data
aValues << 10
aValues << 20
bValues << 1
bValues << 2

assert [11, 22] == (1..2).collect { results.val }

State in operators

Although operators can frequently do without keeping state between subsequent invocations, GPars allows operators to maintain state, if desired by the developer. One obvious way is to leverage the Groovy closure capabilities to close-over their context:

int counter = 0
operator(inputs: [a], outputs: [b]) {value ->
    counter += 1
}

Another way, which allows you to avoid declaring the state object outside of the operator definition, is to pass the state object into the operator as a stateObject parameter at construction time:

operator(inputs: [a], outputs: [b], stateObject: [counter: 0]) {value ->
    stateObject.counter += 1
}
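A runnable variant of the stateObject approach could look like this. It is a sketch assuming GPars 1.2.0 on the classpath; binding the running count to the output channel is an addition made here so that the state becomes observable.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

final a = new DataflowQueue()
final b = new DataflowQueue()

// The map passed as stateObject lives inside the operator across invocations
def op = operator(inputs: [a], outputs: [b], stateObject: [counter: 0]) { value ->
    stateObject.counter += 1
    bindOutput 0, stateObject.counter   // publish how many values have been seen so far
}

a << 'x'
a << 'y'
a << 'z'

// With the default maxForks of 1 the body runs sequentially, so the counts arrive in order
def counts = (1..3).collect { b.val }
assert counts == [1, 2, 3]

op.terminate()
```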

Parallelize operators

By default an operator's body is processed by a single thread at a time. While this is a safe setting allowing the operator's body to be written in a non-thread-safe manner, once an operator becomes "hot" and data start to accumulate in the operator's input queues, you might consider allowing multiple threads to run the operator's body concurrently. Bear in mind that in such a case you need to avoid or protect shared resources from multi-threaded access. To enable multiple threads to run the operator's body concurrently, pass an extra maxForks parameter when creating an operator:

def op = operator(inputs: [a, b, c], outputs: [d, e], maxForks: 2) {x, y, z ->
    bindOutput 0, x + y + z
    bindOutput 1, x * y * z
}

The value of the maxForks parameter indicates the maximum number of threads running the operator concurrently. Only positive numbers are allowed, with 1 being the default.

Please always make sure the group serving the operator holds enough threads to support all requested forks. Using groups allows you to organize tasks or operators around different thread pools (wrapped inside the group). While the Dataflow.task() command schedules the task on a default thread pool (java.util.concurrent.Executor, fixed size=#cpu+1, daemon threads), you may prefer being able to define your own thread pool(s) to run your tasks.

def group = new DefaultPGroup(10)
group.operator(inputs: [a, b, c], outputs: [d, e], maxForks: 5) {x, y, z -> ...}

The default group uses a resizeable thread pool and so will never run out of threads.
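The sketch below runs a forked operator on a dedicated group, assuming GPars 1.2.0 on the classpath. The pool size, the squaring body and the channel names are illustrative. Because several threads may run the body at once, the example compares the sorted results and makes no claim about arrival order.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.DefaultPGroup

final group = new DefaultPGroup(4)   // enough threads to serve three forks
final input = new DataflowQueue()
final output = new DataflowQueue()

// Up to three threads may execute the body concurrently
def op = group.operator(inputs: [input], outputs: [output], maxForks: 3) { x ->
    bindOutput 0, x * x
}

(1..5).each { input << it }

// Sort before comparing, since forks may finish in any order
def squares = (1..5).collect { output.val }.sort()
assert squares == [1, 4, 9, 16, 25]

op.terminate()
group.shutdown()
```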

Synchronizing the output

When enabling internal parallelization of an operator by setting maxForks to a value greater than 1, it is important to remember that without explicit or implicit synchronization in the operator's body race conditions may occur. In particular, bear in mind that values written to multiple output channels are not guaranteed to be written atomically in the same order to all the channels:

operator(inputs:[inputChannel], outputs:[a, b], maxForks:5) {msg ->
    bindOutput 0, msg
    bindOutput 1, msg
}
inputChannel << 1
inputChannel << 2
inputChannel << 3
inputChannel << 4
inputChannel << 5

This may result in the output channels having their values mixed up, something like:

a -> 1, 3, 2, 4, 5
b -> 2, 1, 3, 5, 4

Explicit synchronization is one way to ensure that all output channels are bound consistently and to protect operator state that is not thread-local:

def lock = new Object()
operator(inputs:[inputChannel], outputs:[a, b], maxForks:5) {msg ->
    doStuffThatIsThreadSafe()

    synchronized(lock) {
        doSomethingThatMustNotBeAccessedByMultipleThreadsAtTheSameTime()
        bindOutput 0, msg
        bindOutput 1, 2*msg
    }
}

Obviously you need to weigh the pros and cons here, since synchronization may defeat the purpose of setting maxForks to a value greater than 1.

To set values of all the operator's output channels in one atomic step, you may also consider calling either the bindAllOutputsAtomically method, passing in a single value to write to all output channels, or the bindAllOutputValuesAtomically method, which takes multiple values, each of which will be written to the output channel with the same position index.

operator(inputs:[inputChannel], outputs:[a, b], maxForks:5) {msg ->
    doStuffThatIsThreadSafe()
    bindAllOutputValuesAtomically msg, 2*msg
}

Using the bindAllOutputs or the bindAllOutputValues methods will not guarantee atomicity of writes across all the output channels when using internal parallelism. If preserving the order of messages in multiple output channels is not an issue, bindAllOutputs as well as bindAllOutputValues will provide better performance than the atomic variants.
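The effect of the atomic variant can be checked with a sketch like the one below, assuming GPars 1.2.0 on the classpath. The pool size, message count and channel names are arbitrary. The overall order of messages is still unspecified, but both channels should see the same order.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.DefaultPGroup

final group = new DefaultPGroup(6)
final inputChannel = new DataflowQueue()
final a = new DataflowQueue()
final b = new DataflowQueue()

def op = group.operator(inputs: [inputChannel], outputs: [a, b], maxForks: 5) { msg ->
    // Writes msg to a and 2*msg to b as one atomic step
    bindAllOutputValuesAtomically msg, 2 * msg
}

(1..20).each { inputChannel << it }

def fromA = (1..20).collect { a.val }
def fromB = (1..20).collect { b.val }

// Whatever order the forks produced, the two channels agree position by position
assert fromB == fromA.collect { 2 * it }

op.terminate()
group.shutdown()
```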

Operator lifecycle

Dataflow operators and selectors fire several events during their lifecycle, which allows interested parties to obtain notifications and potentially alter the operator's behavior. The DataflowEventListener interface offers a couple of callback methods:

public interface DataflowEventListener {
    /**
     * Invoked immediately after the operator starts by a pooled thread before the first message is obtained
     *
     * @param processor The reporting dataflow operator/selector
     */
    void afterStart(DataflowProcessor processor);

    /**
     * Invoked immediately after the operator terminates
     *
     * @param processor The reporting dataflow operator/selector
     */
    void afterStop(DataflowProcessor processor);

    /**
     * Invoked if an exception occurs.
     * If any of the listeners returns true, the operator will terminate.
     * Exceptions outside of the operator's body or listeners' messageSentOut() handlers will terminate the operator irrespective of the listeners' votes.
     *
     * @param processor The reporting dataflow operator/selector
     * @param e The thrown exception
     * @return True, if the operator should terminate in response to the exception, false otherwise.
     */
    boolean onException(DataflowProcessor processor, Throwable e);

    /**
     * Invoked when a message becomes available in an input channel.
     *
     * @param processor The reporting dataflow operator/selector
     * @param channel The input channel holding the message
     * @param index The index of the input channel within the operator
     * @param message The incoming message
     * @return The original message or a message that should be used instead
     */
    Object messageArrived(DataflowProcessor processor, DataflowReadChannel<Object> channel, int index, Object message);

    /**
     * Invoked when a control message (instances of ControlMessage) becomes available in an input channel.
     *
     * @param processor The reporting dataflow operator/selector
     * @param channel The input channel holding the message
     * @param index The index of the input channel within the operator
     * @param message The incoming message
     * @return The original message or a message that should be used instead
     */
    Object controlMessageArrived(DataflowProcessor processor, DataflowReadChannel<Object> channel, int index, Object message);

    /**
     * Invoked when a message is being bound to an output channel.
     *
     * @param processor The reporting dataflow operator/selector
     * @param channel The output channel to send the message to
     * @param index The index of the output channel within the operator
     * @param message The message to send
     * @return The original message or a message that should be used instead
     */
    Object messageSentOut(DataflowProcessor processor, DataflowWriteChannel<Object> channel, int index, Object message);

    /**
     * Invoked when all messages required to trigger the operator become available in the input channels.
     *
     * @param processor The reporting dataflow operator/selector
     * @param messages The incoming messages
     * @return The original list of messages or a modified/new list of messages that should be used instead
     */
    List<Object> beforeRun(DataflowProcessor processor, List<Object> messages);

    /**
     * Invoked when the operator completes a single run
     *
     * @param processor The reporting dataflow operator/selector
     * @param messages The incoming messages that have been processed
     */
    void afterRun(DataflowProcessor processor, List<Object> messages);

    /**
     * Invoked when the fireCustomEvent() method is triggered manually on a dataflow operator/selector
     *
     * @param processor The reporting dataflow operator/selector
     * @param data The custom piece of data provided as part of the event
     * @return A value to return from the fireCustomEvent() method to the caller (event initiator)
     */
    Object customEvent(DataflowProcessor processor, Object data);
}

A default implementation is provided through the DataflowEventAdapter class.
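Extending the adapter lets a listener override only the callbacks it cares about. The sketch below, which assumes GPars 1.2.0 on the classpath, overrides messageArrived() to replace each incoming message with a trimmed copy before the operator's body sees it. The trimming listener, the operator body and the channel names are hypothetical.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.operator.DataflowEventAdapter
import groovyx.gpars.dataflow.operator.DataflowProcessor
import static groovyx.gpars.dataflow.Dataflow.operator

final words = new DataflowQueue()
final lengths = new DataflowQueue()

// Only messageArrived() is overridden; all other callbacks keep the adapter's defaults
final trimmer = new DataflowEventAdapter() {
    @Override
    Object messageArrived(DataflowProcessor processor, DataflowReadChannel<Object> channel, int index, Object message) {
        return message.toString().trim()   // the returned value is used instead of the original
    }
}

def op = operator(inputs: [words], outputs: [lengths], listeners: [trimmer]) { word ->
    bindOutput 0, word.size()
}

words << '  dataflow  '
def length = lengths.val
assert length == 8   // length of 'dataflow' after trimming

op.terminate()
```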

Listeners provide a way to handle exceptions when they occur inside operators. A listener may typically log such exceptions, notify a supervising entity, generate an alternative output or perform any steps required to recover from the situation. If there's no listener registered or if any of the listeners returns true, the operator will terminate, preserving the contract of afterStop(). Exceptions that occur outside the actual operator's body, i.e. at the parameter preparation phase before the body is triggered or at the clean-up and channel subscription phase after the body finishes, always lead to operator termination.

The fireCustomEvent() method available on operators and selectors may be used to communicate back and forth between operator's body and the interested listeners:

final listener = new DataflowEventAdapter() {
    @Override
    Object customEvent(DataflowProcessor processor, Object data) {
        println "Log: Getting quite high on the scale $data"
        return 100  //The value to use instead
    }
}

op = group.operator(inputs: [a, b], outputs: [c], listeners: [listener]) {x, y ->
    final sum = x + y
    if (sum > 100) bindOutput(fireCustomEvent(sum))  //Reporting that the sum is too high, binding the lowered value that comes back
    else bindOutput sum
}

Selectors

A selector's body should be a closure consuming either one or two arguments.

selector (inputs : [a, b, c], outputs : [d, e]) {value ->
    ....
}

The two-argument closure will get a value plus an index of the input channel, the value of which is currently being processed. This allows the selector to distinguish between values coming through different input channels.

selector (inputs : [a, b, c], outputs : [d, e]) {value, index ->
    ....
}
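The two-argument form can be tried out with the following sketch, assuming GPars 1.2.0 on the classpath. The channel names and the message format are illustrative. Each value is sent and read back before the next one is sent, so the results are deterministic.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.selector

final a = new DataflowQueue()
final b = new DataflowQueue()
final out = new DataflowQueue()

// Fires for every single value; index tells which input channel delivered it
def sel = selector(inputs: [a, b], outputs: [out]) { value, index ->
    bindOutput 0, "channel $index: $value".toString()
}

a << 'x'
def first = out.val
b << 'y'
def second = out.val

assert first == 'channel 0: x'
assert second == 'channel 1: y'

sel.terminate()
```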

Priority Selector

When priorities need to be preserved among input channels, a DataflowPrioritySelector should be used.

prioritySelector(inputs : [a, b, c], outputs : [d, e]) {value, index ->
    …
}

The priority selector will always prefer values from channels with lower position index over values coming through the channels with higher position index.

Join selector

A selector without a body closure specified will copy all incoming values to all of its output channels.

def join = selector (inputs : [programmers, analysis, managers], outputs : [employees, colleagues])
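A self-contained version of the join selector might look like this, assuming GPars 1.2.0 on the classpath. Only two input channels are used and the sample value is made up.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.selector

final programmers = new DataflowQueue()
final managers = new DataflowQueue()
final employees = new DataflowQueue()
final colleagues = new DataflowQueue()

// No body closure: every incoming value is copied to all output channels
def join = selector(inputs: [programmers, managers], outputs: [employees, colleagues])

programmers << 'Joe'
def employee = employees.val
def colleague = colleagues.val

assert employee == 'Joe'
assert colleague == 'Joe'

join.terminate()
```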

Internal parallelism

The maxForks attribute, which enables internal parallelism, is also available for selectors.

selector (inputs : [a, b, c], outputs : [d, e], maxForks : 5) {value ->
    ....
}

Guards

Just like Selects, Selectors also allow users to temporarily include/exclude individual input channels from selection. The guards input property can be used to set the initial mask on all input channels, and the setGuards and setGuard methods are then available in the selector's body.

import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.selector
import static groovyx.gpars.dataflow.Dataflow.task

/**
 * Demonstrates the ability to enable/disable channels during a value selection on a select by providing boolean guards.
 */
final DataflowQueue operations = new DataflowQueue()
final DataflowQueue numbers = new DataflowQueue()

def instruction
def nums = []

selector(inputs: [operations, numbers], outputs: [], guards: [true, false]) {value, index ->   //initial guards is set here
    if (index == 0) {
        instruction = value
        setGuard(0, false)  //setGuard() used here
        setGuard(1, true)
    }
    else nums << value
    if (nums.size() == 2) {
        setGuards([true, false])  //setGuards() used here
        final def formula = "${nums[0]} $instruction ${nums[1]}"
        println "$formula = ${new GroovyShell().evaluate(formula)}"
        nums.clear()
    }
}

task {
    operations << '+'
    operations << '+'
    operations << '*'
}

task {
    numbers << 10
    numbers << 20
    numbers << 30
    numbers << 40
    numbers << 50
    numbers << 60
}

Avoid combining guards with maxForks greater than 1. Although the Selector is thread-safe and won't be damaged in any way, the guards are unlikely to be set the way you expect. The multiple threads running the selector's body concurrently will tend to overwrite each other's settings of the guards property.