Class DataflowProcessor

java.lang.Object
  extended by groovyx.gpars.dataflow.operator.DataflowProcessor
Direct Known Subclasses:
DataflowOperator, DataflowSelector

public abstract class DataflowProcessor
extends java.lang.Object

Dataflow selectors and operators (processors) form the basic units of dataflow networks. They are typically combined into directed graphs that transform data. Each processor accepts a set of input and output dataflow channels and, following a specific strategy, transforms values read from the input channels into new values written to the output channels. An output channel of one processor can in turn serve as an input channel of another processor, so the channels are what allow processors to communicate.

Dataflow selectors and operators enable the creation of highly concurrent applications, yet the abstraction hides the low-level concurrency primitives and exposes a much friendlier API. Since selectors and operators internally leverage the actor implementation, they reuse a pool of threads, so the number of threads used by the calculation can be kept much lower than the number of processors in the network.
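The thread-reuse claim above can be illustrated with a minimal plain-Java sketch. It does not use GPars or its actor implementation: the class SharedPoolSketch and its method are hypothetical, and a fixed java.util.concurrent thread pool stands in for the pool that the actors share. It only shows that many processor-like tasks can be served by far fewer threads.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration only; not part of GPars.
final class SharedPoolSketch {

    // Runs `processors` short tasks on a pool of `poolSize` threads and
    // returns how many distinct threads actually executed them.
    static int distinctThreadsUsed(int processors, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(processors);
        try {
            for (int i = 0; i < processors; i++) {
                pool.execute(() -> {
                    // Each task stands in for one run of one processor.
                    threadNames.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        // Eight processor-like tasks, at most two threads.
        System.out.println(distinctThreadsUsed(8, 2));
    }
}
```

The returned count never exceeds the pool size, however many tasks are submitted.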

Author: Vaclav Pech
Date: Sep 9, 2009

Field Summary
protected  DataflowProcessorActor actor
          The internal actor performing on behalf of the processor
static java.lang.String INPUTS
(package private)  java.util.Collection<DataflowEventListener> listeners
private static java.lang.String LISTENERS
static java.lang.String MAX_FORKS
static java.lang.String OUTPUTS
static java.lang.String STATE_OBJECT
protected  java.lang.Object stateObject
          May hold custom state provided at construction time and read within the body
Constructor Summary
protected DataflowProcessor(java.util.Map<java.lang.String,java.lang.Object> channels, groovy.lang.Closure code)
          Creates a processor. After creation, the processor needs to be started using the start() method.
Method Summary
 void addDataflowEventListener(DataflowEventListener listener)
 void bindAllOutputs(java.lang.Object value)
          Used by the processor's body to send a value to all output channels.
 void bindAllOutputsAtomically(java.lang.Object value)
          Used by the processor's body to send a value to all output channels, while guaranteeing atomicity of the operation and preventing other calls to bindAllOutputsAtomically() from interfering with one another.
 void bindAllOutputValues(java.lang.Object... values)
          Used by the processor's body to send values to all output channels, one value per channel.
 void bindAllOutputValuesAtomically(java.lang.Object... values)
          Used by the processor's body to send values to all output channels, one value per channel, while guaranteeing atomicity of the operation and preventing other calls to bindAllOutputsAtomically() from interfering with one another.
 void bindOutput(int idx, java.lang.Object value)
          Used by the processor's body to send a value to the given output channel
 void bindOutput(java.lang.Object value)
          Used by the processor's body to send a value to the first / only output channel
(package private) static java.util.List<DataflowReadChannel> extractInputs(java.util.Map<java.lang.String,java.lang.Object> channels)
private static java.util.Collection<DataflowEventListener> extractListeners(java.util.Map<java.lang.String,java.lang.Object> channels)
(package private) static java.util.List<DataflowWriteChannel> extractOutputs(java.util.Map<java.lang.String,java.lang.Object> channels)
private static java.lang.Object extractState(java.util.Map<java.lang.String,java.lang.Object> channels)
protected  void fireAfterRun(java.util.List<java.lang.Object> messages)
protected  void fireAfterStart()
protected  void fireAfterStop()
protected  java.util.List<java.lang.Object> fireBeforeRun(java.util.List<java.lang.Object> messages)
protected  java.lang.Object fireControlMessageArrived(DataflowReadChannel channel, int index, java.lang.Object message)
 java.lang.Object fireCustomEvent(java.lang.Object data)
protected  java.lang.Object fireMessageArrived(DataflowReadChannel channel, int index, java.lang.Object message)
protected  java.lang.Object fireMessageSentOut(DataflowWriteChannel channel, int index, java.lang.Object message)
protected  boolean fireOnException(java.lang.Throwable e)
 DataflowWriteChannel getOutput()
          The processor's first / only output channel
 java.util.List<DataflowWriteChannel> getOutputs()
          All of the processor's output channels
 DataflowWriteChannel getOutputs(int idx)
          The processor's output channel of the given index
 java.lang.Object getStateObject()
          Retrieves the custom state object
protected static void checkMaxForks(java.util.Map channels)
 void join()
          Joins the processor waiting for it to finish
 void registerChannelListenersToAllInputs(DataflowChannelListener<java.lang.Object> handler)
          Registers the provided handler to all input channels
 void removeDataflowEventListener(DataflowEventListener listener)
(package private)  void reportError(java.lang.Throwable e)
          Is invoked in case the actor throws an exception.
(package private) static boolean shouldBeMultiThreaded(java.util.Map<java.lang.String,java.lang.Object> channels)
 DataflowProcessor start()
          Starts a processor. No parallel group is passed to this variant; use start(PGroup) to specify one.
 DataflowProcessor start(PGroup group)
          Starts a processor using the specified parallel group
 void terminate()
          Stops the processor immediately, potentially losing unhandled messages
 void terminateAfterNextRun()
          Gently stops the processor after the next set of messages is handled.
Field Detail


public static final java.lang.String INPUTS
public static final java.lang.String OUTPUTS
public static final java.lang.String MAX_FORKS
public static final java.lang.String STATE_OBJECT
private static final java.lang.String LISTENERS
protected DataflowProcessorActor actor
The internal actor performing on behalf of the processor


protected final java.lang.Object stateObject
May hold custom state provided at construction time and read within the body


final java.util.Collection<DataflowEventListener> listeners
Constructor Detail


protected DataflowProcessor(java.util.Map<java.lang.String,java.lang.Object> channels,
                            groovy.lang.Closure code)
Creates a processor. After creation, the processor needs to be started using the start() method.

channels - A map specifying "inputs" and "outputs" - dataflow channels (instances of the DataflowQueue or DataflowVariable classes) to use for inputs and outputs
code - The processor's body to run each time all inputs have a value to read
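The constructor contract described above (a map with "inputs" and "outputs" entries, and a body that runs each time all inputs have a value to read) can be modelled with a minimal plain-Java sketch. It does not use GPars: ProcessorSketch is a hypothetical class, java.util.concurrent blocking queues stand in for dataflow channels, and a java.util.function.Function stands in for the Groovy closure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Hypothetical stand-in for a dataflow processor; not part of GPars.
final class ProcessorSketch {
    static final String INPUTS = "inputs";
    static final String OUTPUTS = "outputs";

    private final List<BlockingQueue<Object>> inputs;
    private final List<BlockingQueue<Object>> outputs;
    private final Function<List<Object>, Object> body;

    ProcessorSketch(Map<String, List<BlockingQueue<Object>>> channels,
                    Function<List<Object>, Object> body) {
        this.inputs = channels.get(INPUTS);
        this.outputs = channels.get(OUTPUTS);
        this.body = body;
    }

    // One run: wait until every input has a value, call the body once,
    // then write the result to every output.
    void runOnce() {
        try {
            List<Object> args = new ArrayList<>();
            for (BlockingQueue<Object> in : inputs) {
                args.add(in.take()); // blocks until this input has a value
            }
            Object result = body.apply(args);
            for (BlockingQueue<Object> out : outputs) {
                out.put(result);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Wires two inputs and one output, feeds 10 and 20, and returns the sum read from the output.
    static Object demo() {
        BlockingQueue<Object> a = new LinkedBlockingQueue<>();
        BlockingQueue<Object> b = new LinkedBlockingQueue<>();
        BlockingQueue<Object> sum = new LinkedBlockingQueue<>();
        Map<String, List<BlockingQueue<Object>>> channels =
                Map.of(INPUTS, List.of(a, b), OUTPUTS, List.of(sum));
        ProcessorSketch adder = new ProcessorSketch(
                channels, args -> ((Integer) args.get(0)) + ((Integer) args.get(1)));
        a.add(10);
        b.add(20);
        adder.runOnce();
        return sum.poll();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 30
    }
}
```

Because the output of one such processor is itself a queue, it could be listed under "inputs" for another processor, which is how the networks described in the class overview are formed.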
Method Detail


static boolean shouldBeMultiThreaded(java.util.Map<java.lang.String,java.lang.Object> channels)


static java.util.List<DataflowReadChannel> extractInputs(java.util.Map<java.lang.String,java.lang.Object> channels)


static java.util.List<DataflowWriteChannel> extractOutputs(java.util.Map<java.lang.String,java.lang.Object> channels)


private static java.lang.Object extractState(java.util.Map<java.lang.String,java.lang.Object> channels)


private static java.util.Collection<DataflowEventListener> extractListeners(java.util.Map<java.lang.String,java.lang.Object> channels)


protected static void checkMaxForks(java.util.Map channels)


public final DataflowProcessor start(PGroup group)
Starts a processor using the specified parallel group

group - The parallel group to use with the processor
Returns: This operator instance


public final DataflowProcessor start()
Starts a processor. No parallel group is passed to this variant; use start(PGroup) to specify one.

Returns: This operator instance


public final void terminate()
Stops the processor immediately, potentially losing unhandled messages


public final void terminateAfterNextRun()
Gently stops the processor after the next set of messages is handled. Unlike with terminate(), no messages will get lost. If the operator never gets triggered after calling the terminateAfterNextRun() method, the operator never really stops.
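The consequence noted above, that a processor which is never triggered again never really stops, follows from checking the stop request only after a run. The plain-Java sketch below models that behaviour under stated assumptions. It does not use GPars: GentleStopSketch is a hypothetical class, and the real implementation may work differently.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of "stop after the next run"; not part of GPars.
final class GentleStopSketch {
    private final BlockingQueue<Integer> input = new LinkedBlockingQueue<>();
    private final AtomicInteger handled = new AtomicInteger();
    private volatile boolean stopAfterNextRun;
    private final Thread worker = new Thread(this::loop);

    void start() {
        worker.setDaemon(true); // do not keep the JVM alive if never triggered
        worker.start();
    }

    private void loop() {
        try {
            while (true) {
                input.take();               // blocks until a message arrives
                handled.incrementAndGet();  // the "run"
                if (stopAfterNextRun) {
                    return;                 // the flag is only examined after a run
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void send(int value) { input.add(value); }

    void terminateAfterNextRun() { stopAfterNextRun = true; }

    int handled() { return handled.get(); }

    // Waits up to `millis` milliseconds and reports whether the worker has stopped.
    boolean awaitStop(long millis) {
        try {
            worker.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        GentleStopSketch p = new GentleStopSketch();
        p.start();
        p.terminateAfterNextRun();
        System.out.println("stopped without a run: " + p.awaitStop(200));
        p.send(1);
        System.out.println("stopped after a run: " + p.awaitStop(5000));
    }
}
```

In this model the worker stays blocked on its input until one more message arrives, handles that message, and only then exits.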


public final void join()
                throws java.lang.InterruptedException
Joins the processor waiting for it to finish

Throws: java.lang.InterruptedException - If the thread gets interrupted


public final void bindOutput(int idx,
                             java.lang.Object value)
Used by the processor's body to send a value to the given output channel

idx - The index of the channel to bind
value - The value to bind


public final void bindOutput(java.lang.Object value)
Used by the processor's body to send a value to the first / only output channel

value - The value to bind


public final void bindAllOutputs(java.lang.Object value)
Used by the processor's body to send a value to all output channels. If maxForks is set to a value greater than 1, concurrent calls to bindAllOutputs may write values to different channels in different orders. If this is a problem for the application logic, consider the bindAllOutputsAtomically method instead.

value - The value to bind


public final void bindAllOutputValues(java.lang.Object... values)
Used by the processor's body to send values to all output channels. Each value passed as an argument is sent to the output channel with the same position index.

If maxForks is set to a value greater than 1, concurrent calls to bindAllOutputs may write values to different channels in different orders. If this is a problem for the application logic, consider the bindAllOutputsAtomically method instead.

values - Values to send to output channels of the same position index
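The difference between sending one value to every output and sending one value per output by position can be shown with a small plain-Java sketch. It does not use GPars: PositionalBindSketch is a hypothetical class and plain lists stand in for the output channels. Rejecting a mismatched number of values is a choice made by this sketch only; the text above does not say how the real method treats that case.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of positional binding; not part of GPars.
final class PositionalBindSketch {
    private final List<List<Object>> outputs = new ArrayList<>();

    PositionalBindSketch(int outputCount) {
        for (int i = 0; i < outputCount; i++) {
            outputs.add(new ArrayList<>());
        }
    }

    // The same value goes to every output.
    void bindAllOutputs(Object value) {
        for (List<Object> out : outputs) {
            out.add(value);
        }
    }

    // values[i] goes to the output at index i.
    void bindAllOutputValues(Object... values) {
        if (values.length != outputs.size()) {
            // Assumption of this sketch, not documented behaviour.
            throw new IllegalArgumentException("expected " + outputs.size() + " values");
        }
        for (int i = 0; i < values.length; i++) {
            outputs.get(i).add(values[i]);
        }
    }

    List<Object> output(int idx) {
        return outputs.get(idx);
    }

    public static void main(String[] args) {
        PositionalBindSketch p = new PositionalBindSketch(2);
        p.bindAllOutputValues("a", "b"); // "a" to output 0, "b" to output 1
        p.bindAllOutputs("x");           // "x" to both outputs
        System.out.println(p.output(0) + " " + p.output(1));
    }
}
```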


public final void bindAllOutputsAtomically(java.lang.Object value)
Used by the processor's body to send a value to all output channels, while guaranteeing atomicity of the operation and preventing other calls to bindAllOutputsAtomically() from interfering with one another.

value - The value to bind
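Why atomic binding matters when several forks write concurrently can be sketched in plain Java. This is a model under stated assumptions, not the GPars implementation: AtomicBindSketch is a hypothetical class, concurrent queues stand in for output channels, and a single lock is one possible way to obtain the atomicity described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of atomic binding to all outputs; not part of GPars.
final class AtomicBindSketch {
    private final List<ConcurrentLinkedQueue<Object>> outputs = new ArrayList<>();
    private final Object lock = new Object();

    AtomicBindSketch(int outputCount) {
        for (int i = 0; i < outputCount; i++) {
            outputs.add(new ConcurrentLinkedQueue<>());
        }
    }

    // Not atomic: two concurrent callers may interleave between channels,
    // so different channels can end up with the values in different orders.
    void bindAllOutputs(Object value) {
        for (ConcurrentLinkedQueue<Object> out : outputs) {
            out.add(value);
        }
    }

    // Atomic with respect to other atomic calls: the whole loop runs under one lock.
    void bindAllOutputsAtomically(Object value) {
        synchronized (lock) {
            bindAllOutputs(value);
        }
    }

    List<Object> snapshot(int idx) {
        return new ArrayList<>(outputs.get(idx));
    }

    // Starts `forks` threads that each bind `perFork` values atomically and
    // reports whether every output saw all values in the same order.
    static boolean sameOrderOnAllOutputs(int forks, int perFork) {
        AtomicBindSketch sketch = new AtomicBindSketch(3);
        List<Thread> threads = new ArrayList<>();
        for (int f = 0; f < forks; f++) {
            final int fork = f;
            Thread t = new Thread(() -> {
                for (int i = 0; i < perFork; i++) {
                    sketch.bindAllOutputsAtomically(fork + ":" + i);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        List<Object> first = sketch.snapshot(0);
        for (int i = 1; i < 3; i++) {
            if (!first.equals(sketch.snapshot(i))) {
                return false;
            }
        }
        return first.size() == forks * perFork;
    }

    public static void main(String[] args) {
        System.out.println(sameOrderOnAllOutputs(4, 500));
    }
}
```

With the lock in place all outputs observe one common order; replacing the atomic call with the plain one inside the threads removes that guarantee.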


public final void bindAllOutputValuesAtomically(java.lang.Object... values)
Used by the processor's body to send values to all output channels, while guaranteeing atomicity of the operation and preventing other calls to bindAllOutputsAtomically() from interfering with one another. Each value passed as an argument is sent to the output channel with the same position index.

values - Values to send to output channels of the same position index


public final DataflowWriteChannel getOutputs(int idx)
The processor's output channel of the given index

idx - The index of the channel to retrieve
Returns: The particular DataflowWriteChannel instance


public final java.util.List<DataflowWriteChannel> getOutputs()
All of the processor's output channels

Returns: A List holding all output channels


public final DataflowWriteChannel getOutput()
The processor's first / only output channel

Returns: The particular DataflowWriteChannel instance


public final java.lang.Object getStateObject()
Retrieves the custom state object

Returns: The state object associated with the operator


final void reportError(java.lang.Throwable e)
Is invoked in case the actor throws an exception.

e - The reported exception


public final void addDataflowEventListener(DataflowEventListener listener)


public final void removeDataflowEventListener(DataflowEventListener listener)


public final void registerChannelListenersToAllInputs(DataflowChannelListener<java.lang.Object> handler)
Registers the provided handler to all input channels

handler - The closure to invoke whenever a value gets bound to any of the input channels


protected final void fireAfterStart()


protected final void fireAfterStop()


protected final boolean fireOnException(java.lang.Throwable e)


public final java.lang.Object fireCustomEvent(java.lang.Object data)


protected final java.lang.Object fireMessageArrived(DataflowReadChannel channel,
                                                    int index,
                                                    java.lang.Object message)


protected final java.lang.Object fireControlMessageArrived(DataflowReadChannel channel,
                                                           int index,
                                                           java.lang.Object message)


protected final java.lang.Object fireMessageSentOut(DataflowWriteChannel channel,
                                                    int index,
                                                    java.lang.Object message)


protected final java.util.List<java.lang.Object> fireBeforeRun(java.util.List<java.lang.Object> messages)


protected final void fireAfterRun(java.util.List<java.lang.Object> messages)

