Dataflow selectors and operators (processors) form the basic units in dataflow networks. They are typically combined into oriented graphs that transform data. They accept a set of input and output dataflow channels and following specific strategies they transform input values from the input channels into new values written to the output channels. The output channels at the same time are suitable to be used as input channels by some other dataflow processors. The channels allow processors to communicate.
Dataflow selectors and operators enable creation of highly concurrent applications yet the abstraction hides the low-level concurrency primitives and exposes much friendlier API. Since selectors and operators internally leverage the actor implementation, they reuse a pool of threads and so the actual number of threads used by the calculation can be kept much lower than the actual number of processors used in the network.
Modifiers | Name | Description |
---|---|---|
static java.lang.String |
INPUTS |
|
private static java.lang.String |
LISTENERS |
|
static java.lang.String |
MAX_FORKS |
|
static java.lang.String |
OUTPUTS |
|
static java.lang.String |
STATE_OBJECT |
|
protected DataflowProcessorActor |
actor |
The internal actor performing on behalf of the processor |
protected java.util.Collection<DataflowEventListener> |
listeners |
|
protected java.lang.Object |
stateObject |
May hold custom state provided at construction time and read within the body |
Constructor and description |
---|
protected DataflowProcessor
(java.util.Map<java.lang.String, java.lang.Object> channels, groovy.lang.Closure<?> code) Creates a processor After creation the processor needs to be started using the start() method. |
Type | Name and description |
---|---|
void |
addDataflowEventListener(DataflowEventListener listener) |
void |
bindAllOutputValues(java.lang.Object... values) Used by the processor's body to send a value to all output channels. |
void |
bindAllOutputValuesAtomically(java.lang.Object... values) Used by the processor's body to send a value to all output channels, while guaranteeing atomicity of the operation and preventing other calls to bindAllOutputsAtomically() from interfering with one another. |
void |
bindAllOutputs(java.lang.Object value) Used by the processor's body to send a value to all output channels. |
void |
bindAllOutputsAtomically(java.lang.Object value) Used by the processor's body to send a value to all output channels, while guaranteeing atomicity of the operation and preventing other calls to bindAllOutputsAtomically() from interfering with one another. |
void |
bindOutput(int idx, java.lang.Object value) Used by the processor's body to send a value to the given output channel |
void |
bindOutput(java.lang.Object value) Used by the processor's body to send a value to the first / only output channel |
protected static void |
checkMaxForks(java.util.Map<?, ?> channels) |
static java.util.List<DataflowReadChannel<?>> |
extractInputs(java.util.Map<java.lang.String, java.lang.Object> channels) |
private static java.util.Collection<DataflowEventListener> |
extractListeners(java.util.Map<java.lang.String, java.lang.Object> channels) |
static java.util.List<DataflowWriteChannel<?>> |
extractOutputs(java.util.Map<java.lang.String, java.lang.Object> channels) |
private static java.lang.Object |
extractState(java.util.Map<java.lang.String, java.lang.Object> channels) |
protected void |
fireAfterRun(java.util.List<java.lang.Object> messages) |
protected void |
fireAfterStart() |
protected void |
fireAfterStop() |
protected java.util.List<java.lang.Object> |
fireBeforeRun(java.util.List<java.lang.Object> messages) |
protected java.lang.Object |
fireControlMessageArrived(DataflowReadChannel channel, int index, java.lang.Object message) |
java.lang.Object |
fireCustomEvent(java.lang.Object data) |
protected java.lang.Object |
fireMessageArrived(DataflowReadChannel channel, int index, java.lang.Object message) |
protected java.lang.Object |
fireMessageSentOut(DataflowWriteChannel channel, int index, java.lang.Object message) |
protected boolean |
fireOnException(java.lang.Throwable e) |
DataflowWriteChannel<?> |
getOutput() The processor's first / only output channel |
DataflowWriteChannel<?> |
getOutputs(int idx) The processor's output channel of the given index |
java.util.List<DataflowWriteChannel<?>> |
getOutputs() The processor's all output channels |
java.lang.Object |
getStateObject() Retrieves the custom state object |
void |
join() Joins the processor waiting for it to finish |
void |
registerChannelListenersToAllInputs(DataflowChannelListener<java.lang.Object> handler) Registers the provided handler to all input channels |
void |
removeDataflowEventListener(DataflowEventListener listener) |
void |
reportError(java.lang.Throwable e) Is invoked in case the actor throws an exception. |
static boolean |
shouldBeMultiThreaded(java.util.Map<java.lang.String, java.lang.Object> channels) |
DataflowProcessor |
start(PGroup group) Starts a processor using the specified parallel group |
DataflowProcessor |
start() Starts a processor using the specified parallel group |
void |
terminate() Stops the processor immediately, potentially loosing unhandled messages |
void |
terminateAfterNextRun() Gently stops the processor after the next set of messages is handled. |
Methods inherited from class | Name |
---|---|
class java.lang.Object |
java.lang.Object#wait(), java.lang.Object#wait(long, int), java.lang.Object#wait(long), java.lang.Object#equals(java.lang.Object), java.lang.Object#toString(), java.lang.Object#hashCode(), java.lang.Object#getClass(), java.lang.Object#notify(), java.lang.Object#notifyAll() |
The internal actor performing on behalf of the processor
May hold custom state provided at construction time and read within the body
Creates a processor After creation the processor needs to be started using the start() method.
channels
- A map specifying "inputs" and "outputs" - dataflow channels (instances of the DataflowQueue or DataflowVariable classes) to use for inputs and outputscode
- The processor's body to run each time all inputs have a value to readUsed by the processor's body to send a value to all output channels. The values passed as arguments will each be sent to an output channel with identical position index.
If the maxForks value is set to a value greater than 1, calls to bindAllOutputs may result in values written to different channels to be in different order. If this is a problem for the application logic, the bindAllOutputsAtomically method should be considered instead.
values
- Values to send to output channels of the same position indexUsed by the processor's body to send a value to all output channels, while guaranteeing atomicity of the operation and preventing other calls to bindAllOutputsAtomically() from interfering with one another. The values passed as arguments will each be sent to an output channel with identical position index.
values
- Values to send to output channels of the same position indexUsed by the processor's body to send a value to all output channels. If the maxForks value is set to a value greater than 1, calls to bindAllOutputs may result in values written to different channels to be in different order. If this is a problem for the application logic, the bindAllOutputsAtomically method should be considered instead.
value
- The value to bindUsed by the processor's body to send a value to all output channels, while guaranteeing atomicity of the operation and preventing other calls to bindAllOutputsAtomically() from interfering with one another.
value
- The value to bindUsed by the processor's body to send a value to the given output channel
idx
- The index of the channel to bindvalue
- The value to bindUsed by the processor's body to send a value to the first / only output channel
value
- The value to bindThe processor's first / only output channel
The processor's output channel of the given index
idx
- The index of the channel to retrieveThe processor's all output channels
Retrieves the custom state object
Joins the processor waiting for it to finish
Registers the provided handler to all input channels
handler
- The closure to invoke whenever a value gets bound to any of the input channelsIs invoked in case the actor throws an exception.
e
- The reported exceptionStarts a processor using the specified parallel group
group
- The parallel group to use with the processorStarts a processor using the specified parallel group
Stops the processor immediately, potentially loosing unhandled messages
Gently stops the processor after the next set of messages is handled. Unlike with terminate(), no messages will get lost. If the operator never gets triggered after calling the terminateAfterNextRun() method, the operator never really stops.
Copyright © 2008–2014 Václav Pech. All Rights Reserved.