org.codehaus.gpars
java.lang.Object
  groovyx.gpars.dataflow.stream.StreamCore
    groovyx.gpars.dataflow.stream.SyncDataflowStream
@java.lang.SuppressWarnings
public final class SyncDataflowStream extends StreamCore
Represents a synchronous deterministic dataflow channel. Unlike a SyncDataflowQueue, SyncDataflowStream allows multiple readers, each of which reads all the messages. Essentially, you may think of SyncDataflowStream as a one-to-many communication channel: when a reader consumes a message, other readers are still able to read that message. All messages also arrive at all readers in the same order. SyncDataflowStream is implemented as a functional queue, which affects the API in that users have to traverse the values in the stream themselves. On the other hand, it offers handy methods for value filtering and transformation, together with interesting performance characteristics. For convenience, and to make SyncDataflowStream usable with other dataflow constructs such as operators, you can wrap a SyncDataflowStream with DataflowReadAdapter for read access or DataflowWriteAdapter for write access.
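The "functional queue" shape described above can be illustrated with a minimal plain-Java sketch. It does not use GPars: the names `Cell`, `write` and `readAllBound` are hypothetical, and `CompletableFuture` merely stands in for a write-once dataflow variable. It only shows why every reader that walks the first/rest chain itself sees all messages in the same order; it does not model the synchronous blocking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative only: a hand-rolled first/rest stream, NOT the GPars implementation.
final class FunctionalStreamSketch {

    /** One cell of the stream: a write-once head plus a lazily created tail. */
    static final class Cell<T> {
        private final CompletableFuture<T> first = new CompletableFuture<>();
        private Cell<T> rest;

        /** Binds the head value and returns the tail, where the next write goes. */
        synchronized Cell<T> write(T value) {
            if (!first.complete(value)) {
                throw new IllegalStateException("cell is already bound");
            }
            return getRest();
        }

        boolean isBound() { return first.isDone(); }

        T getFirst() { return first.join(); }

        synchronized Cell<T> getRest() {
            if (rest == null) {
                rest = new Cell<>();
            }
            return rest;
        }
    }

    /** A reader traverses the cells itself, starting from its own reference to the head. */
    static <T> List<T> readAllBound(Cell<T> head) {
        List<T> seen = new ArrayList<>();
        for (Cell<T> c = head; c.isBound(); c = c.getRest()) {
            seen.add(c.getFirst());
        }
        return seen;
    }

    /** Writes three messages, then lets two independent readers traverse the stream. */
    static List<List<String>> demo() {
        Cell<String> head = new Cell<>();
        Cell<String> tail = head;
        for (String message : List.of("a", "b", "c")) {
            tail = tail.write(message);
        }
        return List.of(readAllBound(head), readAllBound(head));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[a, b, c], [a, b, c]]
    }
}
```

Because reading never removes a cell, a second reader starting from the same head observes exactly the same sequence as the first.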
The SyncDataflowStream class is designed for single-threaded producers and consumers. If multiple threads are supposed to read values from or write values to the stream, their access to the stream must be serialized externally, or the adapters should be used.
SyncDataflowStream uses SyncDataflowVariables to perform the actual data exchange. Unlike DataflowStream, which exchanges data asynchronously, SyncDataflowStream is synchronous. The writer as well as the readers are blocked until all the required parties are ready for the data exchange. Writers can thus never get too far ahead of readers, and all the readers always process the same message in parallel and wait for one another before getting the next one.
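The lock-step behaviour described above can be approximated with standard JDK primitives. The following is a sketch under stated assumptions, not the GPars mechanism: it uses a `java.util.concurrent.CyclicBarrier` sized for one writer plus the readers instead of SyncDataflowVariables, and the names `SyncExchangeSketch` and `run` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative only: a barrier-based analogy of a synchronous one-to-many exchange.
final class SyncExchangeSketch {

    /** Runs one writer and the given number of readers in lock step; returns what each reader saw. */
    static List<List<Integer>> run(int readers, List<Integer> messages) {
        // The writer and every reader must arrive before anyone proceeds.
        CyclicBarrier barrier = new CyclicBarrier(readers + 1);
        int[] slot = new int[1]; // the single message currently being exchanged
        ExecutorService pool = Executors.newFixedThreadPool(readers + 1);
        try {
            List<Future<List<Integer>>> results = new ArrayList<>();
            for (int r = 0; r < readers; r++) {
                results.add(pool.submit(() -> {
                    List<Integer> seen = new ArrayList<>();
                    for (int i = 0; i < messages.size(); i++) {
                        barrier.await();   // all parties ready: the value is published
                        seen.add(slot[0]);
                        barrier.await();   // all readers done: the writer may continue
                    }
                    return seen;
                }));
            }
            Future<?> writer = pool.submit(() -> {
                for (int message : messages) {
                    slot[0] = message;
                    barrier.await();       // blocks until every reader asks for the value
                    barrier.await();       // blocks until every reader has taken it
                }
                return null;
            });
            writer.get();
            List<List<Integer>> out = new ArrayList<>();
            for (Future<List<Integer>> f : results) {
                out.add(f.get());
            }
            return out;
        } catch (Exception e) {
            throw new IllegalStateException("exchange failed", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(2, List.of(1, 2, 3))); // prints [[1, 2, 3], [1, 2, 3]]
    }
}
```

The two barrier waits per message are what keep the writer from running ahead: it cannot overwrite the slot until every reader has consumed the current value.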
- Type for values to pass through the stream
Field Summary | |
---|---|
private int | parties |
Fields inherited from class StreamCore | |
---|---|
first, rest, wheneverBoundListeners |
Constructor Summary | |
---|---|
SyncDataflowStream(int parties) | |
SyncDataflowStream(int parties, groovy.lang.Closure toBeApplied) | |
private SyncDataflowStream(int parties, java.util.Collection wheneverBoundListeners, java.util.Collection updateListeners) | Creates an empty stream with the specified listeners set |
Method Summary | |
---|---|
java.lang.String | appendingString() |
protected StreamCore | createNewStream() - A factory method to create new instances of the correct class when needed |
void | decrementParties() - Decreases the number of parties required to perform the data exchange |
FList | getRest() - Retrieves a DataflowStream representing the rest of this Stream after removing the first element |
void | incrementParties() - Increases the number of parties required to perform the data exchange |
private boolean | isEmptyWithRespectToSync() |
java.lang.String | toString() |
Methods inherited from class StreamCore | |
---|---|
addUpdateListener, addUpdateListeners, appendingString, apply, bind, createNewStream, decrementParties, eos, equals, eval, filter, filter, generate, generateNext, getFirst, getFirstDFV, getRest, hashCode, hookWheneverBoundListeners, incrementParties, isEmpty, iterator, leftShift, leftShift, map, map, reduce, reduce, reduce, wheneverBound, wheneverBound |
Field Detail |
---|
private int parties
Constructor Detail |
---|
public SyncDataflowStream(int parties)
public SyncDataflowStream(int parties, groovy.lang.Closure toBeApplied)
private SyncDataflowStream(int parties, java.util.Collection wheneverBoundListeners, java.util.Collection updateListeners)
parties - The number of readers to ask for a value before the message gets exchanged.
wheneverBoundListeners - The collection of listeners to bind to the stream
Method Detail |
---|
@java.lang.Override public java.lang.String appendingString()
@java.lang.Override protected StreamCore createNewStream()
@java.lang.Override public void decrementParties()
@java.lang.Override public FList getRest()
@java.lang.Override public void incrementParties()
private boolean isEmptyWithRespectToSync()
@java.lang.Override public java.lang.String toString()
Returns "SyncDataflowStream[" + getFirst() + getRest().appendingString() + ']'.
Copyright © 2008–2013 Václav Pech. All Rights Reserved.