groovyx.gpars.dataflow
Class SyncDataflowStreamReadAdapter<T>

java.lang.Object
  extended by groovyx.gpars.dataflow.stream.DataflowStreamReadAdapter<T>
      extended by groovyx.gpars.dataflow.SyncDataflowStreamReadAdapter<T>
Type Parameters:
T - The type of messages to pass through the stream
All Implemented Interfaces:
DataflowReadChannel<T>

final class SyncDataflowStreamReadAdapter<T>
extends DataflowStreamReadAdapter<T>

Provides a special implementation of DataflowStreamReadAdapter, which cooperates with SyncDataflowBroadcast subscription and un-subscription mechanism.

Author:
Vaclav Pech

Field Summary
private  boolean closed
           
private  boolean wheneverBoundSet
           
 
Constructor Summary
SyncDataflowStreamReadAdapter(StreamCore<T> stream)
          Creates a new adapter
 
Method Summary
(package private)  void close()
          Closes the channel so that it cannot be used any longer
 T getVal()
          Reads the current value of the channel.
 T getVal(long timeout, java.util.concurrent.TimeUnit units)
          Reads the current value of the channel.
 void getValAsync(MessageStream callback)
          Asynchronously retrieves the value from the channel.
 void getValAsync(java.lang.Object attachment, MessageStream callback)
          Asynchronously retrieves the value from the channel.
<V> DataflowReadChannel<V>
chainWith(PGroup group, groovy.lang.Closure<V> closure)
          Creates and attaches a new operator processing values from the channel
private  void checkClosed()
           
 boolean isBound()
          Check if value has been set already for this expression
 java.util.Iterator<T> iterator()
           
<V> DataflowReadChannel<V>
merge(PGroup group, java.util.List<DataflowReadChannel<java.lang.Object>> others, groovy.lang.Closure<V> closure)
          Merges channels together as inputs for a single dataflow operator.
 DataflowExpression<T> poll()
          Retrieves the value at the head of the buffer.
<V> Promise<V>
rightShift(groovy.lang.Closure<V> closure)
          Schedule closure to be executed after data became available.
 DataflowReadChannel<T> tap(PGroup group, DataflowWriteChannel<T> target)
          Taps into the pipeline.
<V> void
whenBound(groovy.lang.Closure<V> closure)
          Schedule closure to be executed after data becomes available.
 void whenBound(MessageStream stream)
          Send the bound data to provided stream when it becomes available
<V> void
whenBound(PGroup group, groovy.lang.Closure<V> closure)
          Schedule closure to be executed after data becomes available.
<V> void
whenBound(Pool pool, groovy.lang.Closure<V> closure)
          Schedule closure to be executed by pooled actor after data becomes available.
<V> void
wheneverBound(groovy.lang.Closure<V> closure)
          Send all pieces of data bound in the future to the provided stream when it becomes available
 void wheneverBound(MessageStream stream)
          Send all pieces of data bound in the future to the provided stream when it becomes available.
 
Methods inherited from class groovyx.gpars.dataflow.stream.DataflowStreamReadAdapter
allUnprocessedDFVs, binaryChoice, binaryChoice, binaryChoice, binaryChoice, binaryChoice, binaryChoice, filter, filter, filter, filter, filter, filter, getEventManager, chainWith, chainWith, chainWith, chainWith, chainWith, choice, choice, choice, choice, choice, choice, into, into, into, into, into, into, length, merge, merge, merge, merge, merge, merge, merge, merge, merge, merge, merge, or, or, separate, separate, separate, separate, separate, separate, split, split, split, split, split, split, split, split, split, split, split, split, tap, tap, tap, tap, tap, then, then, then, toString
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

closed

private boolean closed

wheneverBoundSet

private boolean wheneverBoundSet
Constructor Detail

SyncDataflowStreamReadAdapter

SyncDataflowStreamReadAdapter(StreamCore<T> stream)
Creates a new adapter

Parameters:
stream - The stream to wrap
Method Detail

iterator

public java.util.Iterator<T> iterator()
Overrides:
iterator in class DataflowStreamReadAdapter<T>

getVal

public T getVal()
         throws java.lang.InterruptedException
Description copied from interface: DataflowReadChannel
Reads the current value of the channel. Blocks, if the value has not been assigned yet.

Specified by:
getVal in interface DataflowReadChannel<T>
Overrides:
getVal in class DataflowStreamReadAdapter<T>
Returns:
The actual value
Throws:
java.lang.InterruptedException - If the current thread gets interrupted while waiting for the channel to be bound

getVal

public T getVal(long timeout,
                java.util.concurrent.TimeUnit units)
         throws java.lang.InterruptedException
Description copied from interface: DataflowReadChannel
Reads the current value of the channel. Blocks up to given timeout, if the value has not been assigned yet.

Specified by:
getVal in interface DataflowReadChannel<T>
Overrides:
getVal in class DataflowStreamReadAdapter<T>
Parameters:
timeout - The timeout value
units - Units for the timeout
Returns:
The actual value
Throws:
java.lang.InterruptedException - If the current thread gets interrupted while waiting for the channel to be bound

getValAsync

public void getValAsync(MessageStream callback)
Description copied from interface: DataflowReadChannel
Asynchronously retrieves the value from the channel. Sends the actual value of the channel as a message back the the supplied actor once the value has been bound. The actor can perform other activities or release a thread back to the pool by calling react() waiting for the message with the value of the Dataflow channel.

Specified by:
getValAsync in interface DataflowReadChannel<T>
Overrides:
getValAsync in class DataflowStreamReadAdapter<T>
Parameters:
callback - An actor to send the bound value to.

getValAsync

public void getValAsync(java.lang.Object attachment,
                        MessageStream callback)
Description copied from interface: DataflowReadChannel
Asynchronously retrieves the value from the channel. Sends a message back the the supplied MessageStream with a map holding the supplied attachment under the 'attachment' key and the actual value of the channel under the 'result' key once the value has been bound. Attachment is an arbitrary value helping the actor.operator match its request with the reply. The actor/operator can perform other activities or release a thread back to the pool by calling react() waiting for the message with the value of the Dataflow channel.

Specified by:
getValAsync in interface DataflowReadChannel<T>
Overrides:
getValAsync in class DataflowStreamReadAdapter<T>
Parameters:
attachment - arbitrary non-null attachment if reader needs better identification of result
callback - An actor to send the bound value plus the supplied index to.

rightShift

public <V> Promise<V> rightShift(groovy.lang.Closure<V> closure)
Description copied from interface: DataflowReadChannel
Schedule closure to be executed after data became available. It is important to notice that even if the expression is already bound the execution of closure will not happen immediately but will be scheduled

rightShift() redefines the >> operator so you can write df >> {println it} instead of df.whenBound{println it}

Specified by:
rightShift in interface DataflowReadChannel<T>
Overrides:
rightShift in class DataflowStreamReadAdapter<T>
Parameters:
closure - closure to execute when data becomes available. The closure should take at most one argument.
Returns:
A promise for the results of the supplied closure. This allows for chaining of then() method calls.

whenBound

public <V> void whenBound(groovy.lang.Closure<V> closure)
Description copied from interface: DataflowReadChannel
Schedule closure to be executed after data becomes available. It is important to notice that even if the expression is already bound the execution of closure will not happen immediately but will be scheduled.

Specified by:
whenBound in interface DataflowReadChannel<T>
Overrides:
whenBound in class DataflowStreamReadAdapter<T>
Parameters:
closure - closure to execute when data becomes available. The closure should take at most one argument.

whenBound

public <V> void whenBound(Pool pool,
                          groovy.lang.Closure<V> closure)
Schedule closure to be executed by pooled actor after data becomes available. It is important to notice that even if the expression is already bound the execution of closure will not happen immediately but will be scheduled.

Specified by:
whenBound in interface DataflowReadChannel<T>
Overrides:
whenBound in class DataflowStreamReadAdapter<T>
Parameters:
pool - The thread pool to use for task scheduling for asynchronous message delivery
closure - closure to execute when data becomes available. The closure should take at most one argument.

whenBound

public <V> void whenBound(PGroup group,
                          groovy.lang.Closure<V> closure)
Description copied from interface: DataflowReadChannel
Schedule closure to be executed after data becomes available. It is important to notice that even if the expression is already bound the execution of closure will not happen immediately but will be scheduled.

Specified by:
whenBound in interface DataflowReadChannel<T>
Overrides:
whenBound in class DataflowStreamReadAdapter<T>
Parameters:
group - The PGroup to use for task scheduling for asynchronous message delivery
closure - closure to execute when data becomes available. The closure should take at most one argument.

whenBound

public void whenBound(MessageStream stream)
Description copied from interface: DataflowReadChannel
Send the bound data to provided stream when it becomes available

Specified by:
whenBound in interface DataflowReadChannel<T>
Overrides:
whenBound in class DataflowStreamReadAdapter<T>
Parameters:
stream - stream where to send result

wheneverBound

public <V> void wheneverBound(groovy.lang.Closure<V> closure)
Description copied from interface: DataflowReadChannel
Send all pieces of data bound in the future to the provided stream when it becomes available. *

Specified by:
wheneverBound in interface DataflowReadChannel<T>
Overrides:
wheneverBound in class DataflowStreamReadAdapter<T>
Parameters:
closure - closure to execute when data becomes available. The closure should take at most one argument.

wheneverBound

public void wheneverBound(MessageStream stream)
Description copied from interface: DataflowReadChannel
Send all pieces of data bound in the future to the provided stream when it becomes available.

Specified by:
wheneverBound in interface DataflowReadChannel<T>
Overrides:
wheneverBound in class DataflowStreamReadAdapter<T>
Parameters:
stream - stream where to send result

chainWith

public <V> DataflowReadChannel<V> chainWith(PGroup group,
                                            groovy.lang.Closure<V> closure)
Description copied from interface: DataflowReadChannel
Creates and attaches a new operator processing values from the channel

Specified by:
chainWith in interface DataflowReadChannel<T>
Overrides:
chainWith in class DataflowStreamReadAdapter<T>
Type Parameters:
V - The type of values returned from the supplied closure
Parameters:
group - The PGroup to use
closure - The function to invoke on all incoming values as part of the new operator's body
Returns:
A channel of the same type as this channel, which the new operator will output into.

tap

public DataflowReadChannel<T> tap(PGroup group,
                                  DataflowWriteChannel<T> target)
Description copied from interface: DataflowReadChannel
Taps into the pipeline. The supplied channel will receive a copy of all messages passed through.

Specified by:
tap in interface DataflowReadChannel<T>
Overrides:
tap in class DataflowStreamReadAdapter<T>
Parameters:
group - The PGroup to use
target - The channel to tap data into
Returns:
A channel of the same type as this channel, which the new operator will output into.

merge

public <V> DataflowReadChannel<V> merge(PGroup group,
                                        java.util.List<DataflowReadChannel<java.lang.Object>> others,
                                        groovy.lang.Closure<V> closure)
Description copied from interface: DataflowReadChannel
Merges channels together as inputs for a single dataflow operator.

Specified by:
merge in interface DataflowReadChannel<T>
Overrides:
merge in class DataflowStreamReadAdapter<T>
Type Parameters:
V - The type of values passed between the channels
Parameters:
group - The PGroup to use
others - The channels to merge with
closure - The function to invoke on all incoming values as part of the new operator's body. The number of arguments to the closure must match the number of input channels.
Returns:
A channel of the same type as this channel, which the new operator will output into.

isBound

public boolean isBound()
Description copied from interface: DataflowReadChannel
Check if value has been set already for this expression

Specified by:
isBound in interface DataflowReadChannel<T>
Overrides:
isBound in class DataflowStreamReadAdapter<T>
Returns:
true if bound already

poll

public DataflowExpression<T> poll()
                           throws java.lang.InterruptedException
Description copied from interface: DataflowReadChannel
Retrieves the value at the head of the buffer. Returns null, if no value is available.

Specified by:
poll in interface DataflowReadChannel<T>
Overrides:
poll in class DataflowStreamReadAdapter<T>
Returns:
The value bound to the DFV at the head of the stream or null
Throws:
java.lang.InterruptedException - If the current thread is interrupted

checkClosed

private void checkClosed()

close

void close()
     throws java.lang.InterruptedException
Closes the channel so that it cannot be used any longer

Throws:
java.lang.InterruptedException - When the thread gets interrupted

Copyright © 2008–2012 Václav Pech. All Rights Reserved.