groovyx.gpars.dataflow.stream
Class SyncDataflowStream<T>

java.lang.Object
  extended by groovyx.gpars.dataflow.stream.StreamCore<T>
      extended by groovyx.gpars.dataflow.stream.SyncDataflowStream<T>
Type Parameters:
T - Type for values to pass through the stream
All Implemented Interfaces:
FList<T>, java.lang.Iterable<T>

public final class SyncDataflowStream<T>
extends StreamCore<T>

Represents a synchronous deterministic dataflow channel. Unlike a SyncDataflowQueue, syncDataflowStream allows multiple readers each to read all the messages. Essentially, you may think of SyncDataflowStream as a 1 to many communication channel, since when a reader consumes a messages, other readers will still be able to read the message. Also, all messages arrive to all readers in the same order. SyncDataflowStream is implemented as a functional queue, which impacts the API in that users have to traverse the values in the stream themselves. On the other hand in offers handy methods for value filtering or transformation together with interesting performance characteristics. For convenience and for the ability to use SyncDataflowStream with other dataflow constructs, like e.g. operators, you can wrap SyncDataflowStreams with DataflowReadAdapter for read access or DataflowWriteAdapter for write access.

The SyncDataflowStream class is designed for single-threaded producers and consumers. If multiple threads are supposed to read or write values to the stream, their access to the stream must be serialized externally or the adapters should be used.

SyncDataflowStream uses SyncDataflowVariables to preform the actual data exchange. Unlike DataflowStream, which exchanges data in asynchronous manner, SyncDataflowStream is synchronous. The writer as well as the readers are blocked until all the required parties become ready for the data exchange. Writers can thus never get too far ahead of readers and also all the readers themselves are always processing the same message in parallel and wait for one-another before getting the next one.

Author:
Vaclav Pech

Field Summary
private  int parties
           
 
Fields inherited from class groovyx.gpars.dataflow.stream.StreamCore
first, rest, wheneverBoundListeners
 
Constructor Summary
  SyncDataflowStream(int parties)
          Creates an empty stream
  SyncDataflowStream(int parties, groovy.lang.Closure toBeApplied)
          Creates an empty stream while applying the supplied initialization closure to it
private SyncDataflowStream(int parties, java.util.Collection<MessageStream> wheneverBoundListeners, java.util.Collection<DataflowChannelListener<T>> updateListeners)
          Creates an empty stream with the specified listeners set
 
Method Summary
 java.lang.String appendingString()
           
protected  StreamCore<T> createNewStream()
          A factory method to create new instances of the correct class when needed
 void decrementParties()
          Decreases the number of parties required to perform the data exchange
 FList<T> getRest()
          Retrieves a DataflowStream representing the rest of this Stream after removing the first element
 void incrementParties()
          Increases the number of parties required to perform the data exchange
private  boolean isEmptyWithRespectToSync()
           
 java.lang.String toString()
           
 
Methods inherited from class groovyx.gpars.dataflow.stream.StreamCore
addUpdateListener, apply, eos, equals, filter, generate, getFirst, getFirstDFV, hashCode, isEmpty, iterator, leftShift, leftShift, map, reduce, reduce, wheneverBound, wheneverBound
 
Methods inherited from class java.lang.Object
clone, finalize, getClass, notify, notifyAll, wait, wait, wait
 

Field Detail

parties

private int parties
Constructor Detail

SyncDataflowStream

public SyncDataflowStream(int parties)
Creates an empty stream

Parameters:
parties - The number of readers to ask for a value before the message gets exchanged.

SyncDataflowStream

public SyncDataflowStream(int parties,
                          groovy.lang.Closure toBeApplied)
Creates an empty stream while applying the supplied initialization closure to it

Parameters:
parties - The number of readers to ask for a value before the message gets exchanged.
toBeApplied - The closure to use for initialization

SyncDataflowStream

private SyncDataflowStream(int parties,
                           java.util.Collection<MessageStream> wheneverBoundListeners,
                           java.util.Collection<DataflowChannelListener<T>> updateListeners)
Creates an empty stream with the specified listeners set

Parameters:
parties - The number of readers to ask for a value before the message gets exchanged.
wheneverBoundListeners - The collection of listeners to bind to the stream
Method Detail

getRest

public FList<T> getRest()
Retrieves a DataflowStream representing the rest of this Stream after removing the first element

Specified by:
getRest in interface FList<T>
Specified by:
getRest in class StreamCore<T>
Returns:
The remaining stream elements

createNewStream

protected StreamCore<T> createNewStream()
A factory method to create new instances of the correct class when needed

Specified by:
createNewStream in class StreamCore<T>
Returns:
An instance of the appropriate sub-class

appendingString

public java.lang.String appendingString()
Specified by:
appendingString in interface FList<T>
Overrides:
appendingString in class StreamCore<T>

toString

public java.lang.String toString()
Overrides:
toString in class java.lang.Object

isEmptyWithRespectToSync

private boolean isEmptyWithRespectToSync()

incrementParties

public void incrementParties()
Increases the number of parties required to perform the data exchange

Overrides:
incrementParties in class StreamCore<T>

decrementParties

public void decrementParties()
Decreases the number of parties required to perform the data exchange

Overrides:
decrementParties in class StreamCore<T>

Copyright © 2008–2012 Václav Pech. All Rights Reserved.