Class SyncDataflowVariable<T>

  extended by groovyx.gpars.serial.WithSerialId
      extended by groovyx.gpars.dataflow.expression.DataflowExpression<T>
          extended by groovyx.gpars.dataflow.DataflowVariable<T>
              extended by groovyx.gpars.dataflow.SyncDataflowVariable<T>
All Implemented Interfaces:
groovy.lang.GroovyObject, DataflowChannel<T>, DataflowReadChannel<T>, DataflowWriteChannel<T>, Promise<T>,

public final class SyncDataflowVariable<T>
extends DataflowVariable<T>

A synchronous variant of DataflowVariable that blocks the writer as well as the readers. The variable ensures that the specified number of readers have asked for the value before the writer or any of the readers can continue.
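The rendezvous described above can be modeled with plain java.util.concurrent primitives. The following is a minimal sketch only: the class RendezvousSketch, its methods and the demo helper are hypothetical names introduced for illustration, and the code does not reproduce how GPars implements SyncDataflowVariable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of the rendezvous; not the GPars implementation.
final class RendezvousSketch<T> {
    private final CountDownLatch bound = new CountDownLatch(1); // opens once a value is written
    private final CountDownLatch parties;                       // readers still expected
    private volatile T value;

    RendezvousSketch(int parties) {
        this.parties = new CountDownLatch(parties);
    }

    // Writer: publishes the value, then waits until every expected reader has asked for it.
    void bind(T newValue) throws InterruptedException {
        value = newValue;
        bound.countDown();
        parties.await();
    }

    // Reader: announces its arrival, then waits for the value and for the remaining readers.
    T getVal() throws InterruptedException {
        parties.countDown();
        bound.await();
        parties.await();
        return value;
    }

    // Runs one writer and the given number of readers; returns what each reader saw.
    static List<Integer> demo(int readers, int payload) {
        RendezvousSketch<Integer> cell = new RendezvousSketch<>(readers);
        ExecutorService pool = Executors.newFixedThreadPool(readers + 1);
        try {
            Callable<Integer> reader = cell::getVal;
            Callable<Void> writer = () -> { cell.bind(payload); return null; };
            List<Future<Integer>> pending = new ArrayList<>();
            for (int i = 0; i < readers; i++) {
                pending.add(pool.submit(reader));
            }
            Future<Void> written = pool.submit(writer);
            List<Integer> seen = new ArrayList<>();
            for (Future<Integer> f : pending) {
                seen.add(f.get());
            }
            written.get();
            return seen;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In this sketch neither bind nor getVal returns until the value is bound and all expected readers have arrived, which mirrors the behavior the class description states. The thread pool is sized to readers + 1 so that every party can block at the same time without starving the others.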

Author:
Vaclav Pech
Field Summary
private static java.lang.String ERROR_READING_A_SYNCHRONOUS_CHANNEL
private  ResizeableCountDownLatch parties
Constructor Summary
SyncDataflowVariable()
          Creates a new variable, which will never block writers.
SyncDataflowVariable(int parties)
          Creates a new variable blocking the specified number of readers.
Method Summary
 boolean awaitingParties()
          Reports whether the variable is still waiting for parties to arrive for the rendezvous.
private  void awaitParties()
 void decrementParties()
          Decreases the number of parties required to perform data exchange by one.
protected  void doBindImpl(T value)
 T getVal()
          Reads the value of the variable.
 T getVal(long timeout, java.util.concurrent.TimeUnit units)
          Reads the value of the variable.
<V> DataflowReadChannel<V>
chainWith(PGroup group, groovy.lang.Closure<V> closure)
          Creates and attaches a new operator processing values from the channel.
 void incrementParties()
          Increases the number of parties required to perform data exchange by one.
<V> DataflowReadChannel<V>
merge(PGroup group, java.util.List<DataflowReadChannel<java.lang.Object>> others, groovy.lang.Closure<V> closure)
          Merges channels together as inputs for a single dataflow operator.
private  void readerIsReady()
private  boolean readerIsReady(long timeout)
(package private)  boolean shouldThrowTimeout()
protected  void scheduleCallback(java.lang.Object attachment, MessageStream callback)
          Sends the result back to the actor, which is waiting asynchronously for the value to be bound.
 DataflowReadChannel<T> tap(PGroup group, DataflowWriteChannel<T> target)
          Taps into the pipeline.
Field Detail


private static final java.lang.String ERROR_READING_A_SYNCHRONOUS_CHANNEL
private final ResizeableCountDownLatch parties
Constructor Detail


public SyncDataflowVariable()
Creates a new variable, which will never block writers.


public SyncDataflowVariable(int parties)
Creates a new variable blocking the specified number of readers.

Parameters:
parties - Number of readers that have to match a writer before the message gets transferred
Method Detail


protected void doBindImpl(T value)
Overrides:
doBindImpl in class DataflowExpression<T>


public T getVal()
         throws java.lang.InterruptedException
Reads the value of the variable. Blocks, if the value has not been assigned yet.

Specified by:
getVal in interface DataflowReadChannel<T>
Overrides:
getVal in class DataflowExpression<T>
Returns:
The actual value
Throws:
java.lang.InterruptedException - If the current thread gets interrupted while waiting for the variable to be bound


public T getVal(long timeout,
                java.util.concurrent.TimeUnit units)
         throws java.lang.InterruptedException
Reads the value of the variable. Blocks up to given timeout, if the value has not been assigned yet.

Specified by:
getVal in interface DataflowReadChannel<T>
Overrides:
getVal in class DataflowExpression<T>
Parameters:
timeout - The timeout value
units - Units for the timeout
Returns:
The actual value
Throws:
java.lang.InterruptedException - If the current thread gets interrupted while waiting for the variable to be bound
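A bounded read of this kind can be sketched with CountDownLatch.await(timeout, unit). The class TimedReadSketch and its helper readOrNull are hypothetical names used for illustration. This page does not say what getVal returns when the timeout elapses, so returning null in that case is purely an assumption of the sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a bounded read; not the GPars implementation.
final class TimedReadSketch<T> {
    private final CountDownLatch bound = new CountDownLatch(1); // opens once a value is written
    private volatile T value;

    void bind(T newValue) {
        value = newValue;
        bound.countDown();
    }

    // Waits at most the given time. Returning null on timeout is an assumption of this sketch.
    T getVal(long timeout, TimeUnit units) throws InterruptedException {
        return bound.await(timeout, units) ? value : null;
    }

    // Convenience wrapper for callers that cannot propagate InterruptedException.
    static <V> V readOrNull(TimedReadSketch<V> cell, long timeout, TimeUnit units) {
        try {
            return cell.getVal(timeout, units);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
            return null;
        }
    }
}
```

The wrapper restores the interrupt flag instead of swallowing the exception, so callers can still observe that the thread was interrupted while waiting.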


boolean shouldThrowTimeout()
Overrides:
shouldThrowTimeout in class DataflowVariable<T>


public boolean awaitingParties()
Reports whether the variable is still waiting for parties to arrive for the rendezvous.

Returns:
True if not all parties have arrived yet


public <V> DataflowReadChannel<V> chainWith(PGroup group,
                                            groovy.lang.Closure<V> closure)
Description copied from interface: DataflowReadChannel
Creates and attaches a new operator processing values from the channel.

Specified by:
chainWith in interface DataflowReadChannel<T>
Overrides:
chainWith in class DataflowExpression<T>
Type Parameters:
V - The type of values returned from the supplied closure
Parameters:
group - The PGroup to use
closure - The function to invoke on all incoming values as part of the new operator's body
Returns:
A channel of the same type as this channel, which the new operator will output into.


public DataflowReadChannel<T> tap(PGroup group,
                                  DataflowWriteChannel<T> target)
Description copied from interface: DataflowReadChannel
Taps into the pipeline. The supplied channel will receive a copy of all messages passed through.

Specified by:
tap in interface DataflowReadChannel<T>
Overrides:
tap in class DataflowExpression<T>
Parameters:
group - The PGroup to use
target - The channel to tap data into
Returns:
A channel of the same type as this channel, which the new operator will output into.


public <V> DataflowReadChannel<V> merge(PGroup group,
                                        java.util.List<DataflowReadChannel<java.lang.Object>> others,
                                        groovy.lang.Closure<V> closure)
Description copied from interface: DataflowReadChannel
Merges channels together as inputs for a single dataflow operator.

Specified by:
merge in interface DataflowReadChannel<T>
Overrides:
merge in class DataflowExpression<T>
Type Parameters:
V - The type of values passed between the channels
Parameters:
group - The PGroup to use
others - The channels to merge with
closure - The function to invoke on all incoming values as part of the new operator's body. The number of arguments to the closure must match the number of input channels.
Returns:
A channel of the same type as this channel, which the new operator will output into.


protected void scheduleCallback(java.lang.Object attachment,
                                MessageStream callback)
Description copied from class: DataflowExpression
Sends the result back to the actor, which is waiting asynchronously for the value to be bound. The message will either be a map holding the attachment under the 'attachment' key and the actual bound value under the 'result' key, or it will be the result itself if the callback doesn't care about the index.

Overrides:
scheduleCallback in class DataflowExpression<T>
Parameters:
attachment - An arbitrary object identifying the request
callback - The actor to send the message to
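The two message shapes described for scheduleCallback can be illustrated as follows. CallbackMessageSketch and buildMessage are hypothetical names, and treating a null attachment as the signal that the callback does not care about the index is an assumption of this sketch, not something this page states.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the message shapes; not the GPars implementation.
final class CallbackMessageSketch {
    // Builds the message that would be sent to the waiting actor.
    static Object buildMessage(Object attachment, Object result) {
        if (attachment == null) {
            return result; // assumption: no attachment means the bare result is sent
        }
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("attachment", attachment); // identifies the request
        message.put("result", result);         // the actual bound value
        return message;
    }
}
```

A receiver written against this shape would check whether the message is a Map before reading the 'attachment' and 'result' keys.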


private void readerIsReady()


private void awaitParties()


private boolean readerIsReady(long timeout)


public void incrementParties()
Increases the number of parties required to perform data exchange by one.


public void decrementParties()
Decreases the number of parties required to perform data exchange by one.
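Adjusting the party count at run time implies a latch whose count can grow as well as shrink. The following is a minimal sketch of such a latch, assuming a simple monitor-based design. ResizableLatchSketch and its methods are hypothetical names and do not describe the ResizeableCountDownLatch class held in the parties field.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a resizable latch; not the GPars implementation.
final class ResizableLatchSketch {
    private int count; // parties still expected; guarded by this object's monitor

    ResizableLatchSketch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must be non-negative");
        }
        this.count = count;
    }

    // Plays the role of incrementParties(): one more party is required.
    synchronized void increment() {
        count++;
    }

    // Plays the role of a party arriving or of decrementParties(): one fewer party is required.
    synchronized void countDown() {
        if (count > 0 && --count == 0) {
            notifyAll(); // wake every thread blocked in await
        }
    }

    // True once no more parties are expected.
    synchronized boolean isReleased() {
        return count == 0;
    }

    // Blocks until the count reaches zero or the timeout elapses; returns false on timeout.
    synchronized boolean await(long timeout, TimeUnit units) throws InterruptedException {
        long deadline = System.nanoTime() + units.toNanos(timeout);
        while (count > 0) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }
        return true;
    }
}
```

The wait loop re-checks the count after every wake-up, so spurious wake-ups and increments made while a thread is waiting are both handled.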

