groovyx.gpars.dataflow.stream
Class DataflowStreamReadAdapter<T>

java.lang.Object
  extended by groovyx.gpars.dataflow.stream.DataflowStreamReadAdapter<T>
Type Parameters:
T - The type of messages to pass through the stream
All Implemented Interfaces:
DataflowReadChannel<T>

public final class DataflowStreamReadAdapter<T>
extends java.lang.Object
implements DataflowReadChannel<T>

Adapts a DataflowStream to the DataflowReadChannel interface. To minimize overhead and stay in line with the DataflowStream semantics, the DataflowStreamReadAdapter class is not thread-safe and should only be used from within a single thread. If multiple threads need to read from a DataflowStream, each of them should create its own wrapping DataflowStreamReadAdapter.
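
The one-adapter-per-thread rule can be pictured with a small model. The sketch below does not use GPars: Cell and ReadCursor are hypothetical stand-ins, built on java.util.concurrent.CompletableFuture, for a stream and for an adapter that keeps a private, unsynchronized head reference. It only illustrates why two readers each need their own cursor over the same stream.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for one stream cell: a value bound at most once,
// plus a link to the next cell. This is NOT the GPars DataflowStream class.
final class Cell<T> {
    final CompletableFuture<T> value = new CompletableFuture<>();
    private Cell<T> next;

    // Lazily creates the following cell so writers and readers share one chain.
    synchronized Cell<T> rest() {
        if (next == null) {
            next = new Cell<>();
        }
        return next;
    }
}

// Hypothetical read cursor: the head field is private and unsynchronized,
// so an instance must be confined to a single reading thread.
final class ReadCursor<T> {
    private Cell<T> head;

    ReadCursor(Cell<T> stream) {
        this.head = stream;
    }

    // Blocks until the current cell is bound, then advances this cursor only.
    T getVal() {
        T v = head.value.join();
        head = head.rest();
        return v;
    }
}

final class CursorDemo {
    static String run() {
        Cell<Integer> stream = new Cell<>();
        // Bind the values 1, 2, 3 to consecutive cells.
        Cell<Integer> c = stream;
        for (int i = 1; i <= 3; i++) {
            c.value.complete(i);
            c = c.rest();
        }
        // Two independent cursors over the same stream.
        ReadCursor<Integer> a = new ReadCursor<>(stream);
        ReadCursor<Integer> b = new ReadCursor<>(stream);
        // Advancing a does not move b: b still starts at the first cell.
        return a.getVal() + "," + a.getVal() + "," + b.getVal();
    }
}
```

In this model CursorDemo.run() yields "1,2,1": reading through one cursor leaves the other cursor's position untouched, which is the behavior that separate adapters per thread are meant to give.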

Author:
Vaclav Pech

Field Summary
private  DataflowStream<T> asyncHead
           
private  DataflowStream<T> head
           
 
Constructor Summary
DataflowStreamReadAdapter(DataflowStream<T> stream)
          Creates a new adapter
 
Method Summary
 T getVal()
          Reads the current value of the channel.
 T getVal(long timeout, java.util.concurrent.TimeUnit units)
          Reads the current value of the channel.
 void getValAsync(MessageStream callback)
          Asynchronously retrieves the value from the channel.
 void getValAsync(java.lang.Object attachment, MessageStream callback)
          Asynchronously retrieves the value from the channel.
 boolean isBound()
          Check if value has been set already for this expression
 java.util.Iterator<T> iterator()
           
private  void moveAsyncHead()
           
private  void moveHead()
           
 DataflowExpression<T> poll()
          Retrieves the value at the head of the buffer.
 void rightShift(groovy.lang.Closure closure)
          Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.
 java.lang.String toString()
           
 void whenBound(groovy.lang.Closure closure)
          Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.
 void whenBound(MessageStream stream)
          Send the bound data to provided stream when it becomes available
 void whenBound(groovyx.gpars.group.PGroup group, groovy.lang.Closure closure)
          Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.
 void whenBound(Pool pool, groovy.lang.Closure closure)
          Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.
 void wheneverBound(groovy.lang.Closure closure)
          Schedules the closure to be executed for each piece of data bound in the future, whenever it becomes available.
 void wheneverBound(MessageStream stream)
          Send all pieces of data bound in the future to the provided stream when it becomes available
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

head

private DataflowStream<T> head

asyncHead

private DataflowStream<T> asyncHead
Constructor Detail

DataflowStreamReadAdapter

public DataflowStreamReadAdapter(DataflowStream<T> stream)
Creates a new adapter

Parameters:
stream - The stream to wrap
Method Detail

iterator

public java.util.Iterator<T> iterator()

toString

public java.lang.String toString()
Overrides:
toString in class java.lang.Object

getVal

public T getVal()
         throws java.lang.InterruptedException
Description copied from interface: DataflowReadChannel
Reads the current value of the channel. Blocks, if the value has not been assigned yet.

Specified by:
getVal in interface DataflowReadChannel<T>
Returns:
The actual value
Throws:
java.lang.InterruptedException - If the current thread gets interrupted while waiting for the channel to be bound

getVal

public T getVal(long timeout,
                java.util.concurrent.TimeUnit units)
         throws java.lang.InterruptedException
Description copied from interface: DataflowReadChannel
Reads the current value of the channel. Blocks up to given timeout, if the value has not been assigned yet.

Specified by:
getVal in interface DataflowReadChannel<T>
Parameters:
timeout - The timeout value
units - Units for the timeout
Returns:
The actual value
Throws:
java.lang.InterruptedException - If the current thread gets interrupted while waiting for the channel to be bound

getValAsync

public void getValAsync(MessageStream callback)
Description copied from interface: DataflowReadChannel
Asynchronously retrieves the value from the channel. Sends the actual value of the channel as a message back to the supplied actor once the value has been bound. The actor can perform other activities or release its thread back to the pool by calling react() while waiting for the message with the value of the dataflow channel.

Specified by:
getValAsync in interface DataflowReadChannel<T>
Parameters:
callback - An actor to send the bound value to.

getValAsync

public void getValAsync(java.lang.Object attachment,
                        MessageStream callback)
Description copied from interface: DataflowReadChannel
Asynchronously retrieves the value from the channel. Once the value has been bound, sends a message back to the supplied MessageStream with a map holding the supplied attachment under the 'attachment' key and the actual value of the channel under the 'result' key. The attachment is an arbitrary value that helps the actor/operator match its request with the reply. The actor/operator can perform other activities or release its thread back to the pool by calling react() while waiting for the message with the value of the dataflow channel.

Specified by:
getValAsync in interface DataflowReadChannel<T>
Parameters:
attachment - arbitrary non-null attachment if reader needs better identification of result
callback - An actor to send the bound value plus the supplied attachment to.
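
The shape of the reply described above can be sketched without GPars. In the sketch below, a CompletableFuture stands in for the channel and a java.util.function.Consumer stands in for the MessageStream callback. The helper name getValAsync is reused only for readability, and its signature is an assumption made for this illustration, not the library's API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class AttachmentReplyDemo {
    // Hypothetical helper: once 'value' is bound, delivers a map reply to
    // 'callback' with the keys 'attachment' and 'result'.
    static <T> void getValAsync(Object attachment,
                                CompletableFuture<T> value,
                                Consumer<Map<String, Object>> callback) {
        value.thenAccept(v -> {
            Map<String, Object> reply = new HashMap<>();
            reply.put("attachment", attachment); // lets the receiver match request and reply
            reply.put("result", v);              // the value bound to the channel
            callback.accept(reply);
        });
    }

    static Map<String, Object> run() {
        CompletableFuture<Integer> value = new CompletableFuture<>();
        Map<String, Object> received = new HashMap<>();
        // Register interest before the value exists; nothing is delivered yet.
        getValAsync("request-7", value, received::putAll);
        // Binding the value triggers the reply on the completing thread.
        value.complete(42);
        return received;
    }
}
```

After run() returns, the map holds "request-7" under 'attachment' and 42 under 'result', so a receiver with several outstanding requests can tell which reply belongs to which request.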

rightShift

public void rightShift(groovy.lang.Closure closure)
Description copied from interface: DataflowReadChannel
Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.

Specified by:
rightShift in interface DataflowReadChannel<T>
Parameters:
closure - closure to execute when data available

whenBound

public void whenBound(groovy.lang.Closure closure)
Description copied from interface: DataflowReadChannel
Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.

Specified by:
whenBound in interface DataflowReadChannel<T>
Parameters:
closure - closure to execute when data available

whenBound

public void whenBound(Pool pool,
                      groovy.lang.Closure closure)
Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.

Specified by:
whenBound in interface DataflowReadChannel<T>
Parameters:
pool - The thread pool to use for task scheduling for asynchronous message delivery
closure - closure to execute when data available
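
The "scheduled, not immediate" behavior can be illustrated with plain java.util.concurrent types. The sketch below is an analogy rather than GPars code: a completed CompletableFuture plays the role of an already bound channel, and an ExecutorService plays the role of the pool. The class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ScheduledCallbackDemo {
    // Returns true only if the callback ran on the registering thread.
    static boolean callbackRanOnCallerThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The value is already available when the callback is registered.
            CompletableFuture<String> value = CompletableFuture.completedFuture("ready");
            Thread caller = Thread.currentThread();
            // The *Async variant with an executor always hands the callback to
            // the pool, even though the value is already there.
            CompletableFuture<Boolean> sameThread =
                    value.thenApplyAsync(v -> Thread.currentThread() == caller, pool);
            return sameThread.join();
        } finally {
            pool.shutdown();
        }
    }
}
```

Here callbackRanOnCallerThread() returns false: the callback runs on the pool's thread, so the registering thread never executes it inline, which mirrors the scheduling guarantee stated for whenBound.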

whenBound

public void whenBound(groovyx.gpars.group.PGroup group,
                      groovy.lang.Closure closure)
Description copied from interface: DataflowReadChannel
Schedules the closure to be executed by a pooled actor after data becomes available. Note that even if data is already available, the closure is not executed immediately but is scheduled.

Specified by:
whenBound in interface DataflowReadChannel<T>
Parameters:
group - The PGroup to use for task scheduling for asynchronous message delivery
closure - closure to execute when data available

whenBound

public void whenBound(MessageStream stream)
Description copied from interface: DataflowReadChannel
Send the bound data to provided stream when it becomes available

Specified by:
whenBound in interface DataflowReadChannel<T>
Parameters:
stream - stream where to send result

wheneverBound

public void wheneverBound(groovy.lang.Closure closure)
Description copied from interface: DataflowReadChannel
Schedules the closure to be executed for each piece of data bound in the future, whenever it becomes available.

Specified by:
wheneverBound in interface DataflowReadChannel<T>
Parameters:
closure - closure to execute when data available

wheneverBound

public void wheneverBound(MessageStream stream)
Description copied from interface: DataflowReadChannel
Send all pieces of data bound in the future to the provided stream when it becomes available

Specified by:
wheneverBound in interface DataflowReadChannel<T>
Parameters:
stream - stream where to send result

isBound

public boolean isBound()
Description copied from interface: DataflowReadChannel
Check if value has been set already for this expression

Specified by:
isBound in interface DataflowReadChannel<T>
Returns:
true if bound already

poll

public DataflowExpression<T> poll()
                           throws java.lang.InterruptedException
Description copied from interface: DataflowReadChannel
Retrieves the value at the head of the buffer. Returns null, if no value is available.

Specified by:
poll in interface DataflowReadChannel<T>
Returns:
The value bound to the DFV at the head of the stream or null
Throws:
java.lang.InterruptedException - If the current thread is interrupted
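
The contrast between a non-blocking poll and a blocking getVal can be sketched with a CompletableFuture standing in for the head of the stream. This is a simplified model, not GPars code: the hypothetical poll helper below returns the raw value, whereas the method documented above returns a DataflowExpression<T>.

```java
import java.util.concurrent.CompletableFuture;

final class PollDemo {
    // Hypothetical non-blocking read: returns null while the head is unbound.
    static <T> T poll(CompletableFuture<T> head) {
        return head.getNow(null);
    }

    static String run() {
        CompletableFuture<String> head = new CompletableFuture<>();
        String before = poll(head); // nothing bound yet, so null
        head.complete("x");
        String after = poll(head);  // now returns the bound value
        return before + "/" + after;
    }
}
```

In this model run() yields "null/x". A caller that polls has to handle the null case explicitly, for example by retrying later or by falling back to a blocking read.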

moveHead

private void moveHead()

moveAsyncHead

private void moveAsyncHead()

Copyright © 2008–2010 Václav Pech. All Rights Reserved.