@java.lang.SuppressWarnings({"ClassWithTooManyMethods", "unchecked"}) public class DataflowQueue extends WithSerialId
Represents a thread-safe data flow stream. Values or DataflowVariables are added using the '<<' operator and safely read once available using the 'val' property. The iterative methods like each(), collect(), iterator(), any(), all() or the for loops work with snapshots of the stream at the time of calling the particular method. For actors and Dataflow Operators the asynchronous non-blocking variants of the getValAsync() methods can be used. They register the request to read a value and will send a message to the actor or operator once the value is available.
Modifiers | Name | Description |
---|---|---|
private DataflowChannelEventOrchestrator<T> |
eventManager |
|
private java.util.concurrent.LinkedBlockingQueue<DataflowVariable<T>> |
queue |
Stores the received DataflowVariables in the buffer. |
private java.lang.Object |
queueLock |
Internal lock |
private java.util.Queue<DataflowVariable<T>> |
requests |
Stores unsatisfied requests for values |
private java.util.Collection<MessageStream> |
wheneverBoundListeners |
A collection of listeners who need to be informed each time the stream is bound to a value |
Fields inherited from class | Fields |
---|---|
class WithSerialId |
serialHandle, serialVersionUID |
Constructor and description |
---|
DataflowQueue
() |
Type | Name and description |
---|---|
void |
binaryChoice(DataflowWriteChannel<T> trueBranch, DataflowWriteChannel<T> falseBranch, groovy.lang.Closure<java.lang.Boolean> code) |
void |
binaryChoice(Pool pool, DataflowWriteChannel<T> trueBranch, DataflowWriteChannel<T> falseBranch, groovy.lang.Closure<java.lang.Boolean> code) |
void |
binaryChoice(PGroup group, DataflowWriteChannel<T> trueBranch, DataflowWriteChannel<T> falseBranch, groovy.lang.Closure<java.lang.Boolean> code) |
void |
binaryChoice(java.util.Map<java.lang.String, java.lang.Object> params, DataflowWriteChannel<T> trueBranch, DataflowWriteChannel<T> falseBranch, groovy.lang.Closure<java.lang.Boolean> code) |
void |
binaryChoice(Pool pool, java.util.Map<java.lang.String, java.lang.Object> params, DataflowWriteChannel<T> trueBranch, DataflowWriteChannel<T> falseBranch, groovy.lang.Closure<java.lang.Boolean> code) |
void |
binaryChoice(PGroup group, java.util.Map<java.lang.String, java.lang.Object> params, DataflowWriteChannel<T> trueBranch, DataflowWriteChannel<T> falseBranch, groovy.lang.Closure<java.lang.Boolean> code) |
void |
bind(T value) Adds a DataflowVariable representing the passed in value to the buffer. |
DataflowReadChannel<V> |
chainWith(groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
chainWith(Pool pool, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
chainWith(PGroup group, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
chainWith(java.util.Map<java.lang.String, java.lang.Object> params, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
chainWith(Pool pool, java.util.Map<java.lang.String, java.lang.Object> params, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
chainWith(PGroup group, java.util.Map<java.lang.String, java.lang.Object> params, groovy.lang.Closure<V> closure) |
void |
choice(java.util.List<DataflowWriteChannel<T>> outputs, groovy.lang.Closure<java.lang.Integer> code) |
void |
choice(Pool pool, java.util.List<DataflowWriteChannel<T>> outputs, groovy.lang.Closure<java.lang.Integer> code) |
void |
choice(PGroup group, java.util.List<DataflowWriteChannel<T>> outputs, groovy.lang.Closure<java.lang.Integer> code) |
void |
choice(java.util.Map<java.lang.String, java.lang.Object> params, java.util.List<DataflowWriteChannel<T>> outputs, groovy.lang.Closure<java.lang.Integer> code) |
void |
choice(Pool pool, java.util.Map<java.lang.String, java.lang.Object> params, java.util.List<DataflowWriteChannel<T>> outputs, groovy.lang.Closure<java.lang.Integer> code) |
void |
choice(PGroup group, java.util.Map<java.lang.String, java.lang.Object> params, java.util.List<DataflowWriteChannel<T>> outputs, groovy.lang.Closure<java.lang.Integer> code) |
private DataflowVariable<T> |
copyDFV(java.util.Queue<DataflowVariable<T>> from, java.util.Queue<DataflowVariable<T>> to) |
protected DataflowVariable<T> |
createVariable() Creates a new variable to perform the next data exchange |
DataflowReadChannel<T> |
filter(groovy.lang.Closure<java.lang.Boolean> closure) |
DataflowReadChannel<T> |
filter(Pool pool, groovy.lang.Closure<java.lang.Boolean> closure) |
DataflowReadChannel<T> |
filter(PGroup group, groovy.lang.Closure<java.lang.Boolean> closure) |
DataflowReadChannel<T> |
filter(java.util.Map<java.lang.String, java.lang.Object> params, groovy.lang.Closure<java.lang.Boolean> closure) |
DataflowReadChannel<T> |
filter(Pool pool, java.util.Map<java.lang.String, java.lang.Object> params, groovy.lang.Closure<java.lang.Boolean> closure) |
DataflowReadChannel<T> |
filter(PGroup group, java.util.Map<java.lang.String, java.lang.Object> params, groovy.lang.Closure<java.lang.Boolean> closure) |
private void |
fireOnMessage(T value) |
DataflowChannelEventListenerManager<T> |
getEventManager() |
java.util.concurrent.LinkedBlockingQueue<DataflowVariable<T>> |
getQueue() |
java.lang.Class<groovyx.gpars.dataflow.remote.RemoteDataflowQueue> |
getRemoteClass() |
T |
getVal() Retrieves the value at the head of the buffer. |
T |
getVal(long timeout, java.util.concurrent.TimeUnit units) Retrieves the value at the head of the buffer. |
void |
getValAsync(MessageStream callback) Asynchronously retrieves the value at the head of the buffer. |
void |
getValAsync(java.lang.Object attachment, MessageStream callback) Asynchronously retrieves the value at the head of the buffer. |
private DataflowExpression<T> |
hookWheneverBoundListeners(DataflowExpression<T> expr) Hooks the registered when bound handlers to the supplied dataflow expression |
void |
into(DataflowWriteChannel<T> target) |
void |
into(Pool pool, DataflowWriteChannel<T> target) |
void |
into(PGroup group, DataflowWriteChannel<T> target) |
void |
into(java.util.Map<java.lang.String, java.lang.Object> params, DataflowWriteChannel<T> target) |
void |
into(Pool pool, java.util.Map<java.lang.String, java.lang.Object> params, DataflowWriteChannel<T> target) |
void |
into(PGroup group, java.util.Map<java.lang.String, java.lang.Object> params, DataflowWriteChannel<T> target) |
boolean |
isBound() Check if value has been set already for this expression |
java.util.Iterator<T> |
iterator() Returns an iterator over a current snapshot of the buffer's content. |
DataflowWriteChannel<T> |
leftShift(DataflowReadChannel<T> ref) Adds a DataflowVariable to the buffer. |
DataflowWriteChannel<T> |
leftShift(T value) Adds a DataflowVariable representing the passed in value to the buffer. |
int |
length() Returns the current size of the buffer |
DataflowReadChannel<V> |
merge(DataflowReadChannel<java.lang.Object> other, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
merge(Pool pool, DataflowReadChannel<java.lang.Object> other, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
merge(PGroup group, DataflowReadChannel<java.lang.Object> other, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
merge(java.util.List<DataflowReadChannel<java.lang.Object>> others, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
merge(Pool pool, java.util.List<DataflowReadChannel<java.lang.Object>> others, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
merge(PGroup group, java.util.List<DataflowReadChannel<java.lang.Object>> others, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
merge(java.util.Map<java.lang.String, java.lang.Object> params, DataflowReadChannel<java.lang.Object> other, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
merge(Pool pool, java.util.Map<java.lang.String, java.lang.Object> params, DataflowReadChannel<java.lang.Object> other, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
merge(PGroup group, java.util.Map<java.lang.String, java.lang.Object> params, DataflowReadChannel<java.lang.Object> other, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
merge(java.util.Map<java.lang.String, java.lang.Object> params, java.util.List<DataflowReadChannel<java.lang.Object>> others, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
merge(Pool pool, java.util.Map<java.lang.String, java.lang.Object> params, java.util.List<DataflowReadChannel<java.lang.Object>> others, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
merge(PGroup group, java.util.Map<java.lang.String, java.lang.Object> params, java.util.List<DataflowReadChannel<java.lang.Object>> others, groovy.lang.Closure<V> closure) |
DataflowReadChannel<V> |
or(groovy.lang.Closure<V> closure) |
void |
or(DataflowWriteChannel<T> target) |
DataflowExpression<T> |
poll() Retrieves the value at the head of the buffer. |
private DataflowVariable<T> |
retrieveForBind() Takes the first unsatisfied value request and binds a value on it. |
private DataflowVariable<T> |
retrieveOrCreateVariable() Checks whether there's a DFV waiting in the queue and retrieves it. |
Promise<V> |
rightShift(groovy.lang.Closure<V> closure) Schedule closure to be executed by pooled actor after data became available. |
void |
separate(java.util.List<DataflowWriteChannel<?>> outputs, groovy.lang.Closure<java.util.List<java.lang.Object>> code) |
void |
separate(Pool pool, java.util.List<DataflowWriteChannel<?>> outputs, groovy.lang.Closure<java.util.List<java.lang.Object>> code) |
void |
separate(PGroup group, java.util.List<DataflowWriteChannel<?>> outputs, groovy.lang.Closure<java.util.List<java.lang.Object>> code) |
void |
separate(java.util.Map<java.lang.String, java.lang.Object> params, java.util.List<DataflowWriteChannel<?>> outputs, groovy.lang.Closure<java.util.List<java.lang.Object>> code) |
void |
separate(Pool pool, java.util.Map<java.lang.String, java.lang.Object> params, java.util.List<DataflowWriteChannel<?>> outputs, groovy.lang.Closure<java.util.List<java.lang.Object>> code) |
void |
separate(PGroup group, java.util.Map<java.lang.String, java.lang.Object> params, java.util.List<DataflowWriteChannel<?>> outputs, groovy.lang.Closure<java.util.List<java.lang.Object>> code) |
void |
split(DataflowWriteChannel<T> target1, DataflowWriteChannel<T> target2) |
void |
split(Pool pool, DataflowWriteChannel<T> target1, DataflowWriteChannel<T> target2) |
void |
split(PGroup group, DataflowWriteChannel<T> target1, DataflowWriteChannel<T> target2) |
void |
split(java.util.List<DataflowWriteChannel<T>> targets) |
void |
split(Pool pool, java.util.List<DataflowWriteChannel<T>> targets) |
void |
split(PGroup group, java.util.List<DataflowWriteChannel<T>> targets) |
void |
split(java.util.Map<java.lang.String, java.lang.Object> params, DataflowWriteChannel<T> target1, DataflowWriteChannel<T> target2) |
void |
split(Pool pool, java.util.Map<java.lang.String, java.lang.Object> params, DataflowWriteChannel<T> target1, DataflowWriteChannel<T> target2) |
void |
split(PGroup group, java.util.Map<java.lang.String, java.lang.Object> params, DataflowWriteChannel<T> target1, DataflowWriteChannel<T> target2) |
void |
split(java.util.Map<java.lang.String, java.lang.Object> params, java.util.List<DataflowWriteChannel<T>> targets) |
void |
split(Pool pool, java.util.Map<java.lang.String, java.lang.Object> params, java.util.List<DataflowWriteChannel<T>> targets) |
void |
split(PGroup group, java.util.Map<java.lang.String, java.lang.Object> params, java.util.List<DataflowWriteChannel<T>> targets) |
DataflowReadChannel<T> |
tap(DataflowWriteChannel<T> target) |
DataflowReadChannel<T> |
tap(Pool pool, DataflowWriteChannel<T> target) |
DataflowReadChannel<T> |
tap(PGroup group, DataflowWriteChannel<T> target) |
DataflowReadChannel<T> |
tap(java.util.Map<java.lang.String, java.lang.Object> params, DataflowWriteChannel<T> target) |
DataflowReadChannel<T> |
tap(Pool pool, java.util.Map<java.lang.String, java.lang.Object> params, DataflowWriteChannel<T> target) |
DataflowReadChannel<T> |
tap(PGroup group, java.util.Map<java.lang.String, java.lang.Object> params, DataflowWriteChannel<T> target) |
Promise<V> |
then(groovy.lang.Closure<V> closure) Schedule closure to be executed after data became available. |
Promise<V> |
then(Pool pool, groovy.lang.Closure<V> closure) Schedule closure to be executed after data becomes available. |
Promise<V> |
then(PGroup group, groovy.lang.Closure<V> closure) Schedule closure to be executed after data becomes available. |
java.lang.String |
toString() |
void |
whenBound(groovy.lang.Closure<V> closure) Schedule closure to be executed by pooled actor after the next data becomes available. |
void |
whenBound(Pool pool, groovy.lang.Closure<V> closure) Schedule closure to be executed by pooled actor after data becomes available. |
void |
whenBound(PGroup group, groovy.lang.Closure<V> closure) |
void |
whenBound(MessageStream stream) Send the next bound piece of data to the provided stream when it becomes available. |
void |
wheneverBound(groovy.lang.Closure<V> closure) Send all pieces of data bound in the future to the provided stream when it becomes available |
void |
wheneverBound(MessageStream stream) Send all pieces of data bound in the future to the provided stream when it becomes available. |
Methods inherited from class | Name |
---|---|
class WithSerialId |
createRemoteHandle, getOrCreateSerialHandle, getRemoteClass, writeReplace |
class java.lang.Object |
java.lang.Object#wait(), java.lang.Object#wait(long, int), java.lang.Object#wait(long), java.lang.Object#equals(java.lang.Object), java.lang.Object#toString(), java.lang.Object#hashCode(), java.lang.Object#getClass(), java.lang.Object#notify(), java.lang.Object#notifyAll() |
Stores the received DataflowVariables in the buffer.
Internal lock
Stores unsatisfied requests for values
A collection of listeners who need to be informed each time the stream is bound to a value
Adds a DataflowVariable representing the passed in value to the buffer.
value
- The value to bind to the head of the streamCreates a new variable to perform the next data exchange
Retrieves the value at the head of the buffer. Blocks until a value is available.
Retrieves the value at the head of the buffer. Blocks until a value is available.
timeout
- The timeout valueunits
- Units for the timeoutAsynchronously retrieves the value at the head of the buffer. Sends the actual value of the variable as a message back the the supplied actor once the value has been bound. The actor can perform other activities or release a thread back to the pool by calling react() waiting for the message with the value of the Dataflow Variable.
callback
- The actor to notify when a value is boundAsynchronously retrieves the value at the head of the buffer. Sends a message back the the supplied actor / operator with a map holding the supplied index under the 'index' key and the actual value of the variable under the 'result' key once the value has been bound. The actor/operator can perform other activities or release a thread back to the pool by calling react() waiting for the message with the value of the Dataflow Variable.
attachment
- An arbitrary value to identify operator channels and so match requests and repliescallback
- The actor / operator to notify when a value is boundHooks the registered when bound handlers to the supplied dataflow expression
expr
- The expression to hook all the when bound listeners toCheck if value has been set already for this expression
Returns an iterator over a current snapshot of the buffer's content. The next() method returns actual values not the DataflowVariables.
Adds a DataflowVariable to the buffer. Implementation detail - in fact another DFV is added to the buffer and an asynchronous 'whenBound' handler is registered with the supplied DFV to update the one stored in the buffer.
ref
- The DFV to add to the streamAdds a DataflowVariable representing the passed in value to the buffer.
value
- The value to bind to the head of the streamReturns the current size of the buffer
Retrieves the value at the head of the buffer. Returns null, if no value is available.
Takes the first unsatisfied value request and binds a value on it. If there are no unsatisfied value requests, a new DFV is stored in the queue.
Checks whether there's a DFV waiting in the queue and retrieves it. If not, a new unmatched value request, represented by a new DFV, is added to the requests queue.
Schedule closure to be executed by pooled actor after data became available. It is important to notice that even if the expression is already bound the execution of closure will not happen immediately but will be scheduled
closure
- closure to execute when data becomes available. The closure should take at most one argument.Schedule closure to be executed after data became available. It is important to notice that even if the expression is already bound the execution of closure will not happen immediately but will be scheduled
closure
- closure to execute when data becomes available. The closure should take at most one argument.Schedule closure to be executed after data becomes available. It is important to notice that even if the expression is already bound the execution of closure will not happen immediately but will be scheduled.
pool
- The thread pool to use for task scheduling for asynchronous message deliveryclosure
- closure to execute when data becomes available. The closure should take at most one argument.Schedule closure to be executed after data becomes available. It is important to notice that even if the expression is already bound the execution of closure will not happen immediately but will be scheduled.
group
- The PGroup to use for task scheduling for asynchronous message deliveryclosure
- closure to execute when data becomes available. The closure should take at most one argument.Schedule closure to be executed by pooled actor after the next data becomes available. It is important to notice that even if the expression is already bound the execution of closure will not happen immediately but will be scheduled.
closure
- closure to execute when data becomes available. The closure should take at most one argument.Schedule closure to be executed by pooled actor after data becomes available. It is important to notice that even if the expression is already bound the execution of closure will not happen immediately but will be scheduled.
pool
- The thread pool to use for task scheduling for asynchronous message deliveryclosure
- closure to execute when data becomes available. The closure should take at most one argument.Send the next bound piece of data to the provided stream when it becomes available.
stream
- stream where to send resultSend all pieces of data bound in the future to the provided stream when it becomes available. *
closure
- closure to execute when data becomes available. The closure should take at most one argument.Send all pieces of data bound in the future to the provided stream when it becomes available.
stream
- stream where to send resultCopyright © 2008–2014 Václav Pech. All Rights Reserved.