groovyx.gpars.dataflow
Class DataflowQueue<T>

java.lang.Object
  extended by groovyx.gpars.dataflow.DataflowQueue<T>
All Implemented Interfaces:
DataflowChannel<T>, DataflowReadChannel<T>, DataflowWriteChannel<T>

public final class DataflowQueue<T>
extends java.lang.Object
implements DataflowChannel<T>

Represents a thread-safe data flow stream. Values or DataflowVariables are added using the '<<' operator and safely read, once available, using the 'val' property. Iterative methods such as each(), collect(), iterator(), any() and all(), as well as for loops, work with a snapshot of the stream taken at the time the particular method is called. Actors and Dataflow Operators can use the asynchronous, non-blocking getValAsync() methods, which register a request to read a value and send a message to the actor or operator once the value is available.

Author:
Vaclav Pech
Date: Jun 5, 2009

Field Summary
private  java.util.concurrent.LinkedBlockingQueue<DataflowVariable<T>> queue
          Stores the received DataflowVariables in the buffer.
private  java.lang.Object queueLock
          Internal lock
private  java.util.Queue<DataflowVariable<T>> requests
          Stores unsatisfied requests for values
private  java.util.Collection<MessageStream> wheneverBoundListeners
          A collection of listeners who need to be informed each time the stream is bound to a value
 
Constructor Summary
DataflowQueue()
           
 
Method Summary
 void bind(T value)
          Adds a DataflowVariable representing the passed in value to the buffer.
private  DataflowVariable<T> copyDFV(java.util.Queue<DataflowVariable<T>> from, java.util.Queue<DataflowVariable<T>> to)
           
 T getVal()
          Retrieves the value at the head of the buffer.
 T getVal(long timeout, java.util.concurrent.TimeUnit units)
          Retrieves the value at the head of the buffer.
 void getValAsync(MessageStream callback)
          Asynchronously retrieves the value at the head of the buffer.
 void getValAsync(java.lang.Object attachment, MessageStream callback)
          Asynchronously retrieves the value at the head of the buffer.
private  DataflowExpression<T> hookWheneverBoundListeners(DataflowExpression<T> expr)
          Hooks the registered when bound handlers to the supplied dataflow expression
 boolean isBound()
          Check if value has been set already for this expression
 java.util.Iterator<T> iterator()
          Returns an iterator over a current snapshot of the buffer's content.
 DataflowWriteChannel<T> leftShift(DataflowReadChannel<T> ref)
          Adds a DataflowVariable to the buffer.
 DataflowWriteChannel<T> leftShift(T value)
          Adds a DataflowVariable representing the passed in value to the buffer.
 int length()
          Returns the current size of the buffer
 DataflowExpression<T> poll()
          Retrieves the value at the head of the buffer.
private  DataflowVariable<T> retrieveForBind()
          Takes the first unsatisfied value request and binds a value on it.
private  DataflowVariable<T> retrieveOrCreateVariable()
          Checks whether there's a DFV waiting in the queue and retrieves it.
 void rightShift(groovy.lang.Closure closure)
          Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.
 java.lang.String toString()
           
 void whenBound(groovy.lang.Closure closure)
          Schedules the closure to be executed by a pooled actor once the next piece of data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.
 void whenBound(MessageStream stream)
          Send the next bound piece of data to the provided stream when it becomes available
 void whenBound(groovyx.gpars.group.PGroup group, groovy.lang.Closure closure)
          Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.
 void whenBound(Pool pool, groovy.lang.Closure closure)
          Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.
 void wheneverBound(groovy.lang.Closure closure)
          Sends all pieces of data bound in the future to the provided closure as they become available
 void wheneverBound(MessageStream stream)
          Send all pieces of data bound in the future to the provided stream when it becomes available
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

queueLock

private final java.lang.Object queueLock
Internal lock


queue

private final java.util.concurrent.LinkedBlockingQueue<DataflowVariable<T>> queue
Stores the received DataflowVariables in the buffer.


requests

private final java.util.Queue<DataflowVariable<T>> requests
Stores unsatisfied requests for values


wheneverBoundListeners

private final java.util.Collection<MessageStream> wheneverBoundListeners
A collection of listeners who need to be informed each time the stream is bound to a value

Constructor Detail

DataflowQueue

public DataflowQueue()
Method Detail

leftShift

public DataflowWriteChannel<T> leftShift(DataflowReadChannel<T> ref)
Adds a DataflowVariable to the buffer. Implementation detail: another DFV is actually added to the buffer, and an asynchronous 'whenBound' handler registered with the supplied DFV updates the one stored in the buffer.

Specified by:
leftShift in interface DataflowWriteChannel<T>
Parameters:
ref - The DFV to add to the stream
Returns:
The current channel instance

leftShift

public DataflowWriteChannel<T> leftShift(T value)
Adds a DataflowVariable representing the passed in value to the buffer.

Specified by:
leftShift in interface DataflowWriteChannel<T>
Parameters:
value - The value to bind to the head of the stream
Returns:
The current channel instance

bind

public void bind(T value)
Adds a DataflowVariable representing the passed in value to the buffer.

Specified by:
bind in interface DataflowWriteChannel<T>
Parameters:
value - The value to bind to the head of the stream

hookWheneverBoundListeners

private DataflowExpression<T> hookWheneverBoundListeners(DataflowExpression<T> expr)
Hooks the registered when bound handlers to the supplied dataflow expression

Parameters:
expr - The expression to hook all the when bound listeners to
Returns:
The supplied expression handler to allow method chaining

retrieveForBind

private DataflowVariable<T> retrieveForBind()
Takes the first unsatisfied value request and binds a value on it. If there are no unsatisfied value requests, a new DFV is stored in the queue.

Returns:
The DFV to bind the value on

copyDFV

private DataflowVariable<T> copyDFV(java.util.Queue<DataflowVariable<T>> from,
                                    java.util.Queue<DataflowVariable<T>> to)

getVal

public T getVal()
         throws java.lang.InterruptedException
Retrieves the value at the head of the buffer. Blocks until a value is available.

Specified by:
getVal in interface DataflowReadChannel<T>
Returns:
The value bound to the DFV at the head of the stream
Throws:
java.lang.InterruptedException - If the current thread is interrupted

getVal

public T getVal(long timeout,
                java.util.concurrent.TimeUnit units)
         throws java.lang.InterruptedException
Retrieves the value at the head of the buffer. Blocks until a value is available or the given timeout expires.

Specified by:
getVal in interface DataflowReadChannel<T>
Parameters:
timeout - The timeout value
units - Units for the timeout
Returns:
The value bound to the DFV at the head of the stream
Throws:
java.lang.InterruptedException - If the current thread is interrupted

poll

public DataflowExpression<T> poll()
Retrieves the value at the head of the buffer without blocking. Returns null if no value is available.

Specified by:
poll in interface DataflowReadChannel<T>
Returns:
The value bound to the DFV at the head of the stream or null
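The three read styles above (blocking getVal(), timed getVal() and non-blocking poll()) resemble the read methods of the JDK's BlockingQueue. The following is only an analogy built on a plain LinkedBlockingQueue, not GPars code: the class name ReadStylesSketch and its demo() helper are made up for illustration, and this page does not state what the timed getVal() returns when the timeout expires, so the null shown here is the JDK queue's behavior only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration using JDK classes only; not part of GPars.
final class ReadStylesSketch {

    static List<String> demo() {
        LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<>();
        List<String> results = new ArrayList<>();
        try {
            // Non-blocking read on an empty buffer: returns null immediately.
            results.add(buffer.poll());

            // Timed read on an empty buffer: gives up after 50 ms and returns null.
            results.add(buffer.poll(50, TimeUnit.MILLISECONDS));

            buffer.put("x");
            // Blocking read: returns at once because a value is already buffered.
            results.add(buffer.take());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[null, null, x]"
    }
}
```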

getValAsync

public void getValAsync(MessageStream callback)
Asynchronously retrieves the value at the head of the buffer. Sends the actual value of the variable as a message back to the supplied actor once the value has been bound. The actor can perform other activities or release a thread back to the pool by calling react() while waiting for the message with the value of the Dataflow Variable.

Specified by:
getValAsync in interface DataflowReadChannel<T>
Parameters:
callback - The actor to notify when a value is bound

getValAsync

public void getValAsync(java.lang.Object attachment,
                        MessageStream callback)
Asynchronously retrieves the value at the head of the buffer. Sends a message back to the supplied actor / operator with a map holding the supplied index under the 'index' key and the actual value of the variable under the 'result' key once the value has been bound. The actor / operator can perform other activities or release a thread back to the pool by calling react() while waiting for the message with the value of the Dataflow Variable.

Specified by:
getValAsync in interface DataflowReadChannel<T>
Parameters:
attachment - An arbitrary value to identify operator channels and so match requests and replies
callback - The actor / operator to notify when a value is bound
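The reply shape described above (a map with the attachment under 'index' and the value under 'result') can be sketched with JDK classes alone. This is an assumption-laden illustration, not the GPars implementation: CompletableFuture stands in for a DataflowVariable, a Consumer stands in for the MessageStream callback, and AsyncReadSketch is a made-up name. The real class delivers the reply as a message to an actor or operator rather than by calling a function directly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical illustration using JDK classes only; not part of GPars.
final class AsyncReadSketch {

    // Registers a callback that receives a reply map once the variable is bound.
    static <T> void getValAsync(CompletableFuture<T> variable, Object attachment,
                                Consumer<Map<String, Object>> callback) {
        variable.thenAccept(value -> {
            Map<String, Object> reply = new HashMap<>();
            reply.put("index", attachment); // key names taken from the description above
            reply.put("result", value);
            callback.accept(reply);
        });
    }

    static Map<String, Object> demo() {
        CompletableFuture<Integer> variable = new CompletableFuture<>();
        Map<String, Object> received = new HashMap<>();

        // The request is registered before any value exists; nothing is delivered yet.
        getValAsync(variable, "channel-0", received::putAll);

        // Binding the value triggers the callback with the reply map.
        variable.complete(42);
        return received;
    }

    public static void main(String[] args) {
        Map<String, Object> reply = demo();
        System.out.println(reply.get("index"));  // prints "channel-0"
        System.out.println(reply.get("result")); // prints "42"
    }
}
```

The attachment lets one receiver tell apart replies from several channels, which is the matching of requests and replies mentioned in the parameter description.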

rightShift

public void rightShift(groovy.lang.Closure closure)
Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.

Specified by:
rightShift in interface DataflowReadChannel<T>
Parameters:
closure - closure to execute when data available

whenBound

public void whenBound(groovy.lang.Closure closure)
Schedules the closure to be executed by a pooled actor once the next piece of data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.

Specified by:
whenBound in interface DataflowReadChannel<T>
Parameters:
closure - closure to execute when data available

whenBound

public void whenBound(Pool pool,
                      groovy.lang.Closure closure)
Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.

Specified by:
whenBound in interface DataflowReadChannel<T>
Parameters:
pool - The thread pool to use for task scheduling for asynchronous message delivery
closure - closure to execute when data available

whenBound

public void whenBound(groovyx.gpars.group.PGroup group,
                      groovy.lang.Closure closure)
Description copied from interface: DataflowReadChannel
Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.

Specified by:
whenBound in interface DataflowReadChannel<T>
Parameters:
group - The PGroup to use for task scheduling for asynchronous message delivery
closure - closure to execute when data available

whenBound

public void whenBound(MessageStream stream)
Send the next bound piece of data to the provided stream when it becomes available

Specified by:
whenBound in interface DataflowReadChannel<T>
Parameters:
stream - stream where to send result

wheneverBound

public void wheneverBound(groovy.lang.Closure closure)
Sends all pieces of data bound in the future to the provided closure as they become available

Specified by:
wheneverBound in interface DataflowReadChannel<T>
Parameters:
closure - closure to execute when data available

wheneverBound

public void wheneverBound(MessageStream stream)
Send all pieces of data bound in the future to the provided stream when it becomes available

Specified by:
wheneverBound in interface DataflowReadChannel<T>
Parameters:
stream - stream where to send result
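The "bound in the future" wording of wheneverBound, together with the scheduled (never inline) delivery described for whenBound, can be modeled roughly as follows. This sketch uses only JDK classes and is not the GPars source: WheneverBoundSketch, its Consumer-based listeners and the single-threaded executor are all assumptions chosen to keep the example deterministic.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration using JDK classes only; not part of GPars.
final class WheneverBoundSketch<T> {
    // A single worker thread keeps delivery order deterministic for the demo.
    private final ExecutorService pool = Executors.newSingleThreadExecutor();
    private final List<Consumer<T>> wheneverBoundListeners = new CopyOnWriteArrayList<>();

    void wheneverBound(Consumer<T> listener) {
        wheneverBoundListeners.add(listener);
    }

    // Each listener is scheduled on the pool instead of being called inline.
    void bind(T value) {
        for (Consumer<T> listener : wheneverBoundListeners) {
            pool.execute(() -> listener.accept(value));
        }
    }

    boolean shutdownAndWait() {
        pool.shutdown();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static List<String> demo() {
        WheneverBoundSketch<String> channel = new WheneverBoundSketch<>();
        List<String> received = new CopyOnWriteArrayList<>();

        channel.bind("early");                // bound before registration: not delivered
        channel.wheneverBound(received::add); // from now on, every bound value is delivered
        channel.bind("a");
        channel.bind("b");

        channel.shutdownAndWait();            // wait for the scheduled deliveries to finish
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[a, b]"
    }
}
```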

isBound

public boolean isBound()
Check if value has been set already for this expression

Specified by:
isBound in interface DataflowReadChannel<T>
Returns:
true if bound already

retrieveOrCreateVariable

private DataflowVariable<T> retrieveOrCreateVariable()
Checks whether there's a DFV waiting in the queue and retrieves it. If not, a new unmatched value request, represented by a new DFV, is added to the requests queue.

Returns:
The DFV to wait for value on
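The interplay of retrieveForBind and retrieveOrCreateVariable, as described on this page, amounts to matching writers and readers through two queues guarded by one lock. A minimal sketch of that idea follows, assuming CompletableFuture as a stand-in for DataflowVariable; MatchingQueueSketch and its method names are hypothetical, and the actual GPars code may differ in detail.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration using JDK classes only; not part of GPars.
final class MatchingQueueSketch<T> {
    private final Object queueLock = new Object();
    // Placeholders for values that no reader has claimed yet.
    private final Queue<CompletableFuture<T>> queue = new ArrayDeque<>();
    // Placeholders handed to readers that arrived before any value.
    private final Queue<CompletableFuture<T>> requests = new ArrayDeque<>();

    // Writer side: satisfy the oldest waiting request, or buffer a new placeholder.
    void bind(T value) {
        CompletableFuture<T> target;
        synchronized (queueLock) {
            target = requests.poll();
            if (target == null) {
                target = new CompletableFuture<>();
                queue.add(target);
            }
        }
        target.complete(value);
    }

    // Reader side: take the oldest buffered placeholder, or register a new request.
    CompletableFuture<T> retrieveOrCreate() {
        synchronized (queueLock) {
            CompletableFuture<T> head = queue.poll();
            if (head == null) {
                head = new CompletableFuture<>();
                requests.add(head);
            }
            return head;
        }
    }

    // Number of buffered, unclaimed placeholders.
    int length() {
        synchronized (queueLock) {
            return queue.size();
        }
    }

    public static void main(String[] args) {
        MatchingQueueSketch<String> q = new MatchingQueueSketch<>();
        CompletableFuture<String> early = q.retrieveOrCreate(); // reader arrives first
        q.bind("a");                                    // satisfies the waiting request
        q.bind("b");                                    // no reader waiting: buffered
        System.out.println(early.join());               // prints "a"
        System.out.println(q.length());                 // prints "1"
        System.out.println(q.retrieveOrCreate().join()); // prints "b"
    }
}
```

Because a reader always receives a placeholder, it never matters whether the reader or the writer arrives first; the two sides meet on the same object either way.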

length

public int length()
Returns the current size of the buffer

Returns:
Number of DFVs in the queue

iterator

public java.util.Iterator<T> iterator()
Returns an iterator over a current snapshot of the buffer's content. The next() method returns actual values not the DataflowVariables.

Returns:
An iterator over the values of all DFVs in the queue
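Iterating over a snapshot means the iterator neither consumes the buffer nor sees values added after it was created. The sketch below shows that behavior with JDK collections only; copying into an ArrayList is an assumed way of taking the snapshot, and SnapshotIterationSketch is a made-up name, not a GPars class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration using JDK classes only; not part of GPars.
final class SnapshotIterationSketch {

    static List<String> demo() {
        LinkedBlockingQueue<String> buffer =
                new LinkedBlockingQueue<>(Arrays.asList("a", "b"));

        // Take the snapshot: a copy of the buffer's content at this moment.
        Iterator<String> snapshot = new ArrayList<>(buffer).iterator();

        // A value added afterwards is not visible through the snapshot.
        buffer.add("c");

        List<String> seen = new ArrayList<>();
        snapshot.forEachRemaining(seen::add);

        // Iterating did not remove anything from the buffer itself.
        seen.add("size=" + buffer.size());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "[a, b, size=3]"
    }
}
```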

toString

public java.lang.String toString()
Overrides:
toString in class java.lang.Object

Copyright © 2008–2010 Václav Pech. All Rights Reserved.