groovyx.gpars.dataflow
Class SyncDataflowVariable<T>

java.lang.Object
  extended by groovyx.gpars.serial.WithSerialId
      extended by groovyx.gpars.dataflow.expression.DataflowExpression<T>
          extended by groovyx.gpars.dataflow.DataflowVariable<T>
              extended by groovyx.gpars.dataflow.SyncDataflowVariable<T>
All Implemented Interfaces:
groovy.lang.GroovyObject, DataflowChannel<T>, DataflowReadChannel<T>, DataflowWriteChannel<T>, Promise<T>, java.io.Serializable

public final class SyncDataflowVariable<T>
extends DataflowVariable<T>

A synchronous variant of DataflowVariable that blocks the writer as well as the readers. Neither the writer nor the readers can continue until the specified number of readers have asked for the value.
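
The rendezvous described above can be illustrated with a small model built only on java.util.concurrent. This is a sketch under stated assumptions, not the GPars implementation: the class SyncVariableModel, its two latches, and the helper methods reader and writerWaitsForAllReaders are hypothetical names introduced for illustration. It assumes the writer publishes the value first and then waits, and that each reader counts as one party when it asks for the value.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Plain-JDK model of the documented rendezvous. This is NOT the GPars implementation.
final class SyncVariableModel<T> {
    private final CountDownLatch bound = new CountDownLatch(1); // opens once a value is bound
    private final CountDownLatch parties;                       // opens once enough readers have asked
    private volatile T value;

    SyncVariableModel(int parties) {
        this.parties = new CountDownLatch(parties);
    }

    // Writer side: publish the value, then block until the required readers have arrived.
    void bind(T newValue) throws InterruptedException {
        value = newValue;
        bound.countDown();
        parties.await();
    }

    // Reader side: wait for the value, register the arrival, then wait for the other readers.
    T getVal() throws InterruptedException {
        bound.await();
        parties.countDown();
        parties.await();
        return value;
    }

    // Hypothetical helper: creates a reader thread that asks for the value once.
    private static Thread reader(SyncVariableModel<String> variable) {
        return new Thread(() -> {
            try {
                variable.getVal();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Returns true if the writer was still blocked while only one of two readers was present
    // and completed once the second reader arrived.
    static boolean writerWaitsForAllReaders() {
        SyncVariableModel<String> variable = new SyncVariableModel<>(2);
        AtomicBoolean writerDone = new AtomicBoolean(false);
        Thread writer = new Thread(() -> {
            try {
                variable.bind("hello");
                writerDone.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread first = reader(variable);
        Thread second = reader(variable);
        try {
            writer.start();
            first.start();
            Thread.sleep(200); // the writer gets time to finish, but the open latch must hold it back
            boolean blockedWithOneReader = !writerDone.get();
            second.start();
            writer.join();
            first.join();
            second.join();
            return blockedWithOneReader && writerDone.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("writer waited for all readers: " + writerWaitsForAllReaders());
    }
}
```

In this model the writer stays inside bind until the second reader calls getVal, which is the property the class description states for the writer and the readers alike.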

Author:
Vaclav Pech
See Also:
Serialized Form

Nested Class Summary
 
Nested classes/interfaces inherited from class groovyx.gpars.dataflow.DataflowVariable
DataflowVariable.RemoteDataflowVariable<T>
 
Nested classes/interfaces inherited from class groovyx.gpars.dataflow.expression.DataflowExpression
DataflowExpression.BindDataflow<T>
 
Field Summary
private static java.lang.String ERROR_READING_A_SYNCHRONOUS_CHANNEL
           
private  ResizeableCountDownLatch parties
           
 
Fields inherited from class groovyx.gpars.dataflow.expression.DataflowExpression
error, S_INITIALIZED, S_INITIALIZING, S_NOT_INITIALIZED, state, value
 
Fields inherited from class groovyx.gpars.serial.WithSerialId
serialHandle
 
Constructor Summary
SyncDataflowVariable()
          Creates a new variable, which will never block writers.
SyncDataflowVariable(int parties)
          Creates a new variable that requires the specified number of readers to arrive before the value is exchanged.
 
Method Summary
 boolean awaitingParties()
          Reports whether the variable is still waiting for parties to arrive for the rendezvous.
private  void awaitParties()
           
 void decrementParties()
          Decreases the number of parties required to perform data exchange by one.
protected  void doBindImpl(T value)
           
 T getVal()
          Reads the value of the variable.
 T getVal(long timeout, java.util.concurrent.TimeUnit units)
          Reads the value of the variable.
<V> DataflowReadChannel<V>
chainWith(PGroup group, groovy.lang.Closure<V> closure)
          Creates and attaches a new operator processing values from the channel.
 void incrementParties()
          Increases the number of parties required to perform data exchange by one.
<V> DataflowReadChannel<V>
merge(PGroup group, java.util.List<DataflowReadChannel<java.lang.Object>> others, groovy.lang.Closure<V> closure)
          Merges channels together as inputs for a single dataflow operator.
private  void readerIsReady()
           
private  boolean readerIsReady(long timeout)
           
(package private)  boolean shouldThrowTimeout()
           
protected  void scheduleCallback(java.lang.Object attachment, MessageStream callback)
          Sends the result back to the actor, which is waiting asynchronously for the value to be bound.
 DataflowReadChannel<T> tap(PGroup group, DataflowWriteChannel<T> target)
          Taps into the pipeline.
 
Methods inherited from class groovyx.gpars.dataflow.DataflowVariable
get, get, getError, getRemoteClass, isError, leftShift, leftShift, then, then, then
 
Methods inherited from class groovyx.gpars.dataflow.expression.DataflowExpression
binaryChoice, binaryChoice, binaryChoice, binaryChoice, binaryChoice, binaryChoice, bind, bindError, bindSafely, bindUnique, doBindRemote, evaluate, filter, filter, filter, filter, filter, filter, getEventManager, getMetaClass, getProperty, getValAsync, getValAsync, chainWith, chainWith, chainWith, chainWith, chainWith, choice, choice, choice, choice, choice, choice, into, into, into, into, into, into, invokeMethod, isBound, join, join, length, merge, merge, merge, merge, merge, merge, merge, merge, merge, merge, merge, or, or, poll, rightShift, separate, separate, separate, separate, separate, separate, setMetaClass, setProperty, split, split, split, split, split, split, split, split, split, split, split, split, subscribe, subscribe, tap, tap, tap, tap, tap, then, then, then, toString, transform, whenBound, whenBound, whenBound, whenBound, wheneverBound, wheneverBound
 
Methods inherited from class groovyx.gpars.serial.WithSerialId
createRemoteHandle, getOrCreateSerialHandle, writeReplace
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface groovyx.gpars.dataflow.DataflowReadChannel
binaryChoice, binaryChoice, binaryChoice, binaryChoice, binaryChoice, binaryChoice, filter, filter, filter, filter, filter, filter, getEventManager, getValAsync, getValAsync, chainWith, chainWith, chainWith, chainWith, chainWith, choice, choice, choice, choice, choice, choice, into, into, into, into, into, into, isBound, length, merge, merge, merge, merge, merge, merge, merge, merge, merge, merge, merge, or, or, poll, rightShift, separate, separate, separate, separate, separate, separate, split, split, split, split, split, split, split, split, split, split, split, split, tap, tap, tap, tap, tap, then, then, then, whenBound, whenBound, whenBound, whenBound, wheneverBound, wheneverBound
 
Methods inherited from interface groovyx.gpars.dataflow.DataflowWriteChannel
bind
 
Methods inherited from interface groovyx.gpars.dataflow.Promise
isBound, join, join, rightShift, then, then, then, whenBound, whenBound, whenBound, whenBound
 

Field Detail

ERROR_READING_A_SYNCHRONOUS_CHANNEL

private static final java.lang.String ERROR_READING_A_SYNCHRONOUS_CHANNEL
See Also:
Constant Field Values

parties

private final ResizeableCountDownLatch parties

Constructor Detail

SyncDataflowVariable

public SyncDataflowVariable()
Creates a new variable, which will never block writers.


SyncDataflowVariable

public SyncDataflowVariable(int parties)
Creates a new variable that requires the specified number of readers to arrive before the value is exchanged.

Parameters:
parties - Number of readers that have to match a writer before the message gets transferred

Method Detail

doBindImpl

protected void doBindImpl(T value)
Overrides:
doBindImpl in class DataflowExpression<T>

getVal

public T getVal()
         throws java.lang.InterruptedException
Reads the value of the variable. Blocks if the value has not been assigned yet.

Specified by:
getVal in interface DataflowReadChannel<T>
Overrides:
getVal in class DataflowExpression<T>
Returns:
The actual value
Throws:
java.lang.InterruptedException - If the current thread gets interrupted while waiting for the variable to be bound

getVal

public T getVal(long timeout,
                java.util.concurrent.TimeUnit units)
         throws java.lang.InterruptedException
Reads the value of the variable. Blocks for up to the given timeout if the value has not been assigned yet.

Specified by:
getVal in interface DataflowReadChannel<T>
Overrides:
getVal in class DataflowExpression<T>
Parameters:
timeout - The timeout value
units - Units for the timeout
Returns:
The actual value
Throws:
java.lang.InterruptedException - If the current thread gets interrupted while waiting for the variable to be bound

shouldThrowTimeout

boolean shouldThrowTimeout()
Overrides:
shouldThrowTimeout in class DataflowVariable<T>

awaitingParties

public boolean awaitingParties()
Reports whether the variable is still waiting for parties to arrive for the rendezvous.

Returns:
true if not all parties have arrived yet

chainWith

public <V> DataflowReadChannel<V> chainWith(PGroup group,
                                            groovy.lang.Closure<V> closure)
Description copied from interface: DataflowReadChannel
Creates and attaches a new operator processing values from the channel.

Specified by:
chainWith in interface DataflowReadChannel<T>
Overrides:
chainWith in class DataflowExpression<T>
Type Parameters:
V - The type of values returned from the supplied closure
Parameters:
group - The PGroup to use
closure - The function to invoke on all incoming values as part of the new operator's body
Returns:
A channel of the same type as this channel, which the new operator will output into.

tap

public DataflowReadChannel<T> tap(PGroup group,
                                  DataflowWriteChannel<T> target)
Description copied from interface: DataflowReadChannel
Taps into the pipeline. The supplied channel will receive a copy of all messages passed through.

Specified by:
tap in interface DataflowReadChannel<T>
Overrides:
tap in class DataflowExpression<T>
Parameters:
group - The PGroup to use
target - The channel to tap data into
Returns:
A channel of the same type as this channel, which the new operator will output into.

merge

public <V> DataflowReadChannel<V> merge(PGroup group,
                                        java.util.List<DataflowReadChannel<java.lang.Object>> others,
                                        groovy.lang.Closure<V> closure)
Description copied from interface: DataflowReadChannel
Merges channels together as inputs for a single dataflow operator.

Specified by:
merge in interface DataflowReadChannel<T>
Overrides:
merge in class DataflowExpression<T>
Type Parameters:
V - The type of values passed between the channels
Parameters:
group - The PGroup to use
others - The channels to merge with
closure - The function to invoke on all incoming values as part of the new operator's body. The number of arguments to the closure must match the number of input channels.
Returns:
A channel of the same type as this channel, which the new operator will output into.

scheduleCallback

protected void scheduleCallback(java.lang.Object attachment,
                                MessageStream callback)
Description copied from class: DataflowExpression
Sends the result back to the actor, which is waiting asynchronously for the value to be bound. The message will either be a map holding the attachment under the 'attachment' key and the actual bound value under the 'result' key, or it will be the result itself if the callback doesn't care about the index.

Overrides:
scheduleCallback in class DataflowExpression<T>
Parameters:
attachment - An arbitrary object identifying the request
callback - The actor to send the message to

readerIsReady

private void readerIsReady()

awaitParties

private void awaitParties()

readerIsReady

private boolean readerIsReady(long timeout)

incrementParties

public void incrementParties()
Increases the number of parties required to perform data exchange by one.


decrementParties

public void decrementParties()
Decreases the number of parties required to perform data exchange by one.
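
How adjusting the number of parties affects the rendezvous can be sketched with a small resizable latch. This is a plain-JDK model for illustration only: ResizableLatchModel and its methods are hypothetical names, not the ResizeableCountDownLatch used by this class, and the model makes no claim about how GPars treats a count that is raised after the latch has already been released.

```java
// Plain-JDK model of a latch whose party count can change.
// NOT the GPars ResizeableCountDownLatch; all names here are hypothetical.
final class ResizableLatchModel {
    private int count;

    ResizableLatchModel(int count) {
        this.count = count;
    }

    // Models incrementParties(): one more party is required.
    synchronized void increment() {
        count++;
    }

    // Models decrementParties() or an arriving reader: one fewer party is required.
    synchronized void decrement() {
        if (count > 0 && --count == 0) {
            notifyAll(); // release every thread blocked in await()
        }
    }

    // Models awaitingParties(): true while parties are still missing.
    synchronized boolean awaiting() {
        return count > 0;
    }

    // Blocks the caller until the count reaches zero.
    synchronized void await() throws InterruptedException {
        while (count > 0) {
            wait();
        }
    }

    public static void main(String[] args) {
        ResizableLatchModel latch = new ResizableLatchModel(1);
        latch.increment();                    // two parties are now required
        latch.decrement();                    // one party is still missing
        System.out.println(latch.awaiting()); // prints "true"
        latch.decrement();                    // no party is missing any more
        System.out.println(latch.awaiting()); // prints "false"
    }
}
```

Raising the count makes the rendezvous wait for one more reader, and lowering it can release the waiting writer and readers without another reader arriving.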


Copyright © 2008–2012 Václav Pech. All Rights Reserved.