org.codehaus.gpars

groovyx.gpars.dataflow
[Java] Class DataflowQueue

java.lang.Object
  groovyx.gpars.dataflow.DataflowQueue
All Implemented Interfaces:
DataflowChannel

@SuppressWarnings({"ClassWithTooManyMethods"})
public final class DataflowQueue
extends java.lang.Object

Represents a thread-safe data flow stream. Values or DataflowVariables are added using the '<<' operator and safely read once available using the 'val' property. The iterative methods like each(), collect(), iterator(), any(), all() or the for loops work with snapshots of the stream at the time of calling the particular method. For actors and Dataflow Operators the asynchronous non-blocking variants of the getValAsync() methods can be used. They register the request to read a value and will send a message to the actor or operator once the value is available.

Authors:
Vaclav Pech Date: Jun 5, 2009


Field Summary
private java.util.concurrent.LinkedBlockingQueue queue

Stores the received DataflowVariables in the buffer.

private java.lang.Object queueLock

Internal lock

private java.util.Queue requests

Stores unsatisfied requests for values

private java.util.Collection wheneverBoundListeners

A collection of listeners who need to be informed each time the stream is bound to a value

 
Constructor Summary
DataflowQueue()

 
Method Summary
void bind(T value)

Adds a DataflowVariable representing the passed in value to the buffer.

private DataflowVariable copyDFV(java.util.Queue from, java.util.Queue to)

T getVal()

Retrieves the value at the head of the buffer.

T getVal(long timeout, java.util.concurrent.TimeUnit units)

Retrieves the value at the head of the buffer.

void getValAsync(MessageStream callback)

Asynchronously retrieves the value at the head of the buffer.

void getValAsync(java.lang.Object attachment, MessageStream callback)

Asynchronously retrieves the value at the head of the buffer.

private DataflowExpression hookWheneverBoundListeners(DataflowExpression expr)

Hooks the registered when bound handlers to the supplied dataflow expression

boolean isBound()

Check if value has been set already for this expression

java.util.Iterator iterator()

Returns an iterator over a current snapshot of the buffer's content.

DataflowWriteChannel leftShift(DataflowReadChannel ref)

Adds a DataflowVariable to the buffer.

DataflowWriteChannel leftShift(T value)

Adds a DataflowVariable representing the passed in value to the buffer.

int length()

Returns the current size of the buffer

DataflowExpression poll()

Retrieves the value at the head of the buffer.

private DataflowVariable retrieveForBind()

Takes the first unsatisfied value request and binds a value on it.

private DataflowVariable retrieveOrCreateVariable()

Checks whether there's a DFV waiting in the queue and retrieves it.

void rightShift(groovy.lang.Closure closure)

Schedule closure to be executed by pooled actor after data became available It is important to notice that even if data already available the execution of closure will not happen immediately but will be scheduled

java.lang.String toString()

void whenBound(groovy.lang.Closure closure)

Schedule closure to be executed by pooled actor after the next data becomes available It is important to notice that even if data already available the execution of closure will not happen immediately but will be scheduled.

void whenBound(Pool pool, groovy.lang.Closure closure)

Schedule closure to be executed by pooled actor after data becomes available It is important to notice that even if data already available the execution of closure will not happen immediately but will be scheduled.

void whenBound(PGroup group, groovy.lang.Closure closure)

void whenBound(MessageStream stream)

Send the next bound piece of data to the provided stream when it becomes available

void wheneverBound(groovy.lang.Closure closure)

Send all pieces of data bound in the future to the provided stream when it becomes available *

void wheneverBound(MessageStream stream)

Send all pieces of data bound in the future to the provided stream when it becomes available

 
Methods inherited from class java.lang.Object
java.lang.Object#wait(long), java.lang.Object#wait(), java.lang.Object#wait(long, int), java.lang.Object#equals(java.lang.Object), java.lang.Object#toString(), java.lang.Object#hashCode(), java.lang.Object#getClass(), java.lang.Object#notify(), java.lang.Object#notifyAll()
 

Field Detail

queue

private final java.util.concurrent.LinkedBlockingQueue queue
Stores the received DataflowVariables in the buffer.


queueLock

private final java.lang.Object queueLock
Internal lock


requests

private final java.util.Queue requests
Stores unsatisfied requests for values


wheneverBoundListeners

private final java.util.Collection wheneverBoundListeners
A collection of listeners who need to be informed each time the stream is bound to a value


 
Constructor Detail

DataflowQueue

DataflowQueue()


 
Method Detail

bind

@Override
public void bind(T value)
Adds a DataflowVariable representing the passed in value to the buffer.
Parameters:
value - The value to bind to the head of the stream


copyDFV

private DataflowVariable copyDFV(java.util.Queue from, java.util.Queue to)


getVal

@Override
public T getVal()
Retrieves the value at the head of the buffer. Blocks until a value is available.
throws:
InterruptedException If the current thread is interrupted
Returns:
The value bound to the DFV at the head of the stream


getVal

@Override
public T getVal(long timeout, java.util.concurrent.TimeUnit units)
Retrieves the value at the head of the buffer. Blocks until a value is available.
throws:
InterruptedException If the current thread is interrupted
Parameters:
timeout - The timeout value
units - Units for the timeout
Returns:
The value bound to the DFV at the head of the stream


getValAsync

@Override
public void getValAsync(MessageStream callback)
Asynchronously retrieves the value at the head of the buffer. Sends the actual value of the variable as a message back the the supplied actor once the value has been bound. The actor can perform other activities or release a thread back to the pool by calling react() waiting for the message with the value of the Dataflow Variable.
Parameters:
callback - The actor to notify when a value is bound


getValAsync

@Override
public void getValAsync(java.lang.Object attachment, MessageStream callback)
Asynchronously retrieves the value at the head of the buffer. Sends a message back the the supplied actor / operator with a map holding the supplied index under the 'index' key and the actual value of the variable under the 'result' key once the value has been bound. The actor/operator can perform other activities or release a thread back to the pool by calling react() waiting for the message with the value of the Dataflow Variable.
Parameters:
attachment - An arbitrary value to identify operator channels and so match requests and replies
callback - The actor / operator to notify when a value is bound


hookWheneverBoundListeners

private DataflowExpression hookWheneverBoundListeners(DataflowExpression expr)
Hooks the registered when bound handlers to the supplied dataflow expression
Parameters:
expr - The expression to hook all the when bound listeners to
Returns:
The supplied expression handler to allow method chaining


isBound

@Override
public boolean isBound()
Check if value has been set already for this expression
Returns:
true if bound already


iterator

public java.util.Iterator iterator()
Returns an iterator over a current snapshot of the buffer's content. The next() method returns actual values not the DataflowVariables.
Returns:
AN iterator over all DFVs in the queue


leftShift

@Override
@SuppressWarnings("unchecked")
public DataflowWriteChannel leftShift(DataflowReadChannel ref)
Adds a DataflowVariable to the buffer. Implementation detail - in fact another DFV is added to the buffer and an asynchronous 'whenBound' handler is registered with the supplied DFV to update the one stored in the buffer.
Parameters:
ref - The DFV to add to the stream


leftShift

@Override
public DataflowWriteChannel leftShift(T value)
Adds a DataflowVariable representing the passed in value to the buffer.
Parameters:
value - The value to bind to the head of the stream


length

public int length()
Returns the current size of the buffer
Returns:
Number of DFVs in the queue


poll

@Override
public DataflowExpression poll()
Retrieves the value at the head of the buffer. Returns null, if no value is available.
Returns:
The value bound to the DFV at the head of the stream or null


retrieveForBind

private DataflowVariable retrieveForBind()
Takes the first unsatisfied value request and binds a value on it. If there are no unsatisfied value requests, a new DFV is stored in the queue.
Returns:
The DFV to bind the value on


retrieveOrCreateVariable

private DataflowVariable retrieveOrCreateVariable()
Checks whether there's a DFV waiting in the queue and retrieves it. If not, a new unmatched value request, represented by a new DFV, is added to the requests queue.
Returns:
The DFV to wait for value on


rightShift

@Override
public void rightShift(groovy.lang.Closure closure)
Schedule closure to be executed by pooled actor after data became available It is important to notice that even if data already available the execution of closure will not happen immediately but will be scheduled
Parameters:
closure - closure to execute when data available


toString

@Override
public java.lang.String toString()


whenBound

@Override
public void whenBound(groovy.lang.Closure closure)
Schedule closure to be executed by pooled actor after the next data becomes available It is important to notice that even if data already available the execution of closure will not happen immediately but will be scheduled.
Parameters:
closure - closure to execute when data available


whenBound

@Override
public void whenBound(Pool pool, groovy.lang.Closure closure)
Schedule closure to be executed by pooled actor after data becomes available It is important to notice that even if data already available the execution of closure will not happen immediately but will be scheduled.
Parameters:
pool - The thread pool to use for task scheduling for asynchronous message delivery
closure - closure to execute when data available


whenBound

@Override
public void whenBound(PGroup group, groovy.lang.Closure closure)


whenBound

@Override
public void whenBound(MessageStream stream)
Send the next bound piece of data to the provided stream when it becomes available
Parameters:
stream - stream where to send result


wheneverBound

@Override
public void wheneverBound(groovy.lang.Closure closure)
Send all pieces of data bound in the future to the provided stream when it becomes available *
Parameters:
closure - closure to execute when data available


wheneverBound

@Override
public void wheneverBound(MessageStream stream)
Send all pieces of data bound in the future to the provided stream when it becomes available
Parameters:
stream - stream where to send result


 

Copyright © 2008–2010 Václav Pech. All Rights Reserved.