public abstract class DataflowProcessor
extends java.lang.Object
Dataflow selectors and operators enable creation of highly concurrent applications yet the abstraction hides the low-level concurrency primitives and exposes much friendlier API. Since selectors and operators internally leverage the actor implementation, they reuse a pool of threads and so the actual number of threads used by the calculation can be kept much lower than the actual number of processors used in the network.
Modifier and Type | Field and Description |
---|---|
protected DataflowProcessorActor |
actor
The internal actor performing on behalf of the processor
|
static java.lang.String |
INPUTS |
protected java.util.Collection<DataflowEventListener> |
listeners |
private static java.lang.String |
LISTENERS |
static java.lang.String |
MAX_FORKS |
static java.lang.String |
OUTPUTS |
static java.lang.String |
STATE_OBJECT |
protected java.lang.Object |
stateObject
May hold custom state provided at construction time and read within the body
|
Modifier | Constructor and Description |
---|---|
protected |
DataflowProcessor(java.util.Map<java.lang.String,java.lang.Object> channels,
groovy.lang.Closure<?> code)
Creates a processor
After creation the processor needs to be started using the start() method.
|
Modifier and Type | Method and Description |
---|---|
void |
addDataflowEventListener(DataflowEventListener listener) |
void |
bindAllOutputs(java.lang.Object value)
Used by the processor's body to send a value to all output channels.
|
void |
bindAllOutputsAtomically(java.lang.Object value)
Used by the processor's body to send a value to all output channels, while guaranteeing atomicity of the operation
and preventing other calls to bindAllOutputsAtomically() from interfering with one another.
|
void |
bindAllOutputValues(java.lang.Object... values)
Used by the processor's body to send a value to all output channels.
|
void |
bindAllOutputValuesAtomically(java.lang.Object... values)
Used by the processor's body to send a value to all output channels, while guaranteeing atomicity of the operation
and preventing other calls to bindAllOutputsAtomically() from interfering with one another.
|
void |
bindOutput(int idx,
java.lang.Object value)
Used by the processor's body to send a value to the given output channel
|
void |
bindOutput(java.lang.Object value)
Used by the processor's body to send a value to the first / only output channel
|
protected static void |
checkMaxForks(java.util.Map<?,?> channels) |
(package private) static java.util.List<DataflowReadChannel<?>> |
extractInputs(java.util.Map<java.lang.String,java.lang.Object> channels) |
private static java.util.Collection<DataflowEventListener> |
extractListeners(java.util.Map<java.lang.String,java.lang.Object> channels) |
(package private) static java.util.List<DataflowWriteChannel<?>> |
extractOutputs(java.util.Map<java.lang.String,java.lang.Object> channels) |
private static java.lang.Object |
extractState(java.util.Map<java.lang.String,java.lang.Object> channels) |
protected void |
fireAfterRun(java.util.List<java.lang.Object> messages) |
protected void |
fireAfterStart() |
protected void |
fireAfterStop() |
protected java.util.List<java.lang.Object> |
fireBeforeRun(java.util.List<java.lang.Object> messages) |
protected java.lang.Object |
fireControlMessageArrived(DataflowReadChannel channel,
int index,
java.lang.Object message) |
java.lang.Object |
fireCustomEvent(java.lang.Object data) |
protected java.lang.Object |
fireMessageArrived(DataflowReadChannel channel,
int index,
java.lang.Object message) |
protected java.lang.Object |
fireMessageSentOut(DataflowWriteChannel channel,
int index,
java.lang.Object message) |
protected boolean |
fireOnException(java.lang.Throwable e) |
DataflowWriteChannel<?> |
getOutput()
The processor's first / only output channel
|
java.util.List<DataflowWriteChannel<?>> |
getOutputs()
The processor's all output channels
|
DataflowWriteChannel<?> |
getOutputs(int idx)
The processor's output channel of the given index
|
java.lang.Object |
getStateObject()
Retrieves the custom state object
|
void |
join()
Joins the processor waiting for it to finish
|
void |
registerChannelListenersToAllInputs(DataflowChannelListener<java.lang.Object> handler)
Registers the provided handler to all input channels
|
void |
removeDataflowEventListener(DataflowEventListener listener) |
(package private) void |
reportError(java.lang.Throwable e)
Is invoked in case the actor throws an exception.
|
(package private) static boolean |
shouldBeMultiThreaded(java.util.Map<java.lang.String,java.lang.Object> channels) |
DataflowProcessor |
start()
Starts a processor using the specified parallel group
|
DataflowProcessor |
start(PGroup group)
Starts a processor using the specified parallel group
|
void |
terminate()
Stops the processor immediately, potentially loosing unhandled messages
|
void |
terminateAfterNextRun()
Gently stops the processor after the next set of messages is handled.
|
public static final java.lang.String INPUTS
public static final java.lang.String OUTPUTS
public static final java.lang.String MAX_FORKS
public static final java.lang.String STATE_OBJECT
private static final java.lang.String LISTENERS
protected DataflowProcessorActor actor
protected final java.lang.Object stateObject
protected final java.util.Collection<DataflowEventListener> listeners
protected DataflowProcessor(java.util.Map<java.lang.String,java.lang.Object> channels, groovy.lang.Closure<?> code)
channels
- A map specifying "inputs" and "outputs" - dataflow channels (instances of the DataflowQueue or DataflowVariable classes) to use for inputs and outputscode
- The processor's body to run each time all inputs have a value to readstatic boolean shouldBeMultiThreaded(java.util.Map<java.lang.String,java.lang.Object> channels)
static java.util.List<DataflowReadChannel<?>> extractInputs(java.util.Map<java.lang.String,java.lang.Object> channels)
static java.util.List<DataflowWriteChannel<?>> extractOutputs(java.util.Map<java.lang.String,java.lang.Object> channels)
private static java.lang.Object extractState(java.util.Map<java.lang.String,java.lang.Object> channels)
private static java.util.Collection<DataflowEventListener> extractListeners(java.util.Map<java.lang.String,java.lang.Object> channels)
protected static void checkMaxForks(java.util.Map<?,?> channels)
public final DataflowProcessor start(PGroup group)
group
- The parallel group to use with the processorpublic final DataflowProcessor start()
public final void terminate()
public final void terminateAfterNextRun()
public final void join() throws java.lang.InterruptedException
java.lang.InterruptedException
- If the thread gets interruptedpublic final void bindOutput(int idx, java.lang.Object value)
idx
- The index of the channel to bindvalue
- The value to bindpublic final void bindOutput(java.lang.Object value)
value
- The value to bindpublic final void bindAllOutputs(java.lang.Object value)
value
- The value to bindpublic final void bindAllOutputValues(java.lang.Object... values)
If the maxForks value is set to a value greater than 1, calls to bindAllOutputs may result in values written to different channels to be in different order. If this is a problem for the application logic, the bindAllOutputsAtomically method should be considered instead.
values
- Values to send to output channels of the same position indexpublic final void bindAllOutputsAtomically(java.lang.Object value)
value
- The value to bindpublic final void bindAllOutputValuesAtomically(java.lang.Object... values)
values
- Values to send to output channels of the same position indexpublic final DataflowWriteChannel<?> getOutputs(int idx)
idx
- The index of the channel to retrievepublic final java.util.List<DataflowWriteChannel<?>> getOutputs()
public final DataflowWriteChannel<?> getOutput()
public final java.lang.Object getStateObject()
final void reportError(java.lang.Throwable e)
e
- The reported exceptionpublic final void addDataflowEventListener(DataflowEventListener listener)
public final void removeDataflowEventListener(DataflowEventListener listener)
public final void registerChannelListenersToAllInputs(DataflowChannelListener<java.lang.Object> handler)
handler
- The closure to invoke whenever a value gets bound to any of the input channelsprotected final void fireAfterStart()
protected final void fireAfterStop()
protected final boolean fireOnException(java.lang.Throwable e)
public final java.lang.Object fireCustomEvent(java.lang.Object data)
protected final java.lang.Object fireMessageArrived(DataflowReadChannel channel, int index, java.lang.Object message)
protected final java.lang.Object fireControlMessageArrived(DataflowReadChannel channel, int index, java.lang.Object message)
protected final java.lang.Object fireMessageSentOut(DataflowWriteChannel channel, int index, java.lang.Object message)
protected final java.util.List<java.lang.Object> fireBeforeRun(java.util.List<java.lang.Object> messages)
protected final void fireAfterRun(java.util.List<java.lang.Object> messages)