groovyx.gpars.dataflow.stream
Class DataflowStream<T>

java.lang.Object
  extended by groovyx.gpars.dataflow.stream.DataflowStream<T>
Type Parameters:
T - Type for values to pass through the stream
All Implemented Interfaces:
FList<T>, java.lang.Iterable<T>

public final class DataflowStream<T>
extends java.lang.Object
implements FList<T>

Represents a deterministic dataflow channel. Unlike a DataflowQueue, a DataflowStream allows multiple readers, each of which reads all the messages. You may think of DataflowStream as a one-to-many communication channel: when one reader consumes a message, other readers are still able to read it. All messages also arrive at all readers in the same order. DataflowStream is implemented as a functional queue, which affects the API in that users have to traverse the values in the stream themselves. On the other hand, it offers handy methods for value filtering and transformation, together with interesting performance characteristics. For convenience, and to allow DataflowStream to be used with other dataflow constructs such as operators, you can wrap a DataflowStream with DataflowReadAdapter for read access or DataflowWriteAdapter for write access.
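
The "functional queue" idea described above can be illustrated with a minimal, self-contained Java sketch. It is not the GPars implementation: the class StreamCellSketch, its methods add, close and readAll, and the use of an empty Optional as the end-of-stream marker are all assumptions made for illustration. Each cell holds one write-once value plus a link to the rest, so any reader that keeps a reference to the head can traverse every message in the same order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a first/rest stream cell; NOT the GPars DataflowStream.
final class StreamCellSketch<T> {
    // Write-once slot; an empty Optional marks end of stream (assumption of this sketch).
    private final CompletableFuture<Optional<T>> first = new CompletableFuture<>();
    private StreamCellSketch<T> rest; // created lazily on first access

    // Binds this cell to a value and returns the next cell, so writes can be chained.
    StreamCellSketch<T> add(T value) {
        bind(Optional.of(value));
        return getRest();
    }

    // Binds this cell to the end-of-stream marker.
    void close() {
        bind(Optional.empty());
    }

    private void bind(Optional<T> value) {
        if (!first.complete(value)) {
            throw new IllegalStateException("Cell is already bound");
        }
    }

    // Blocks until the cell is bound, then reports whether it holds the end marker.
    boolean isEmpty() {
        return !first.join().isPresent();
    }

    // Blocks until the cell is bound; throws NoSuchElementException on the end marker.
    T getFirst() {
        return first.join().get();
    }

    synchronized StreamCellSketch<T> getRest() {
        if (rest == null) {
            rest = new StreamCellSketch<>();
        }
        return rest;
    }

    // A reader traverses the cells itself; reading does not consume anything.
    static <T> List<T> readAll(StreamCellSketch<T> head) {
        List<T> seen = new ArrayList<>();
        for (StreamCellSketch<T> cell = head; !cell.isEmpty(); cell = cell.getRest()) {
            seen.add(cell.getFirst());
        }
        return seen;
    }

    public static void main(String[] args) {
        StreamCellSketch<Integer> head = new StreamCellSketch<>();
        head.add(1).add(2).add(3).close();
        System.out.println(readAll(head)); // first reader:  [1, 2, 3]
        System.out.println(readAll(head)); // second reader: [1, 2, 3]
    }
}
```

Because a read never removes a cell, the second traversal sees exactly what the first one saw, which is the property that distinguishes this structure from a queue.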

The DataflowStream class is designed for single-threaded producers and consumers. If multiple threads need to read from or write to the stream, their access must be serialized externally, or the adapters should be used.
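
One way to picture "serialized externally" is a single shared lock that every thread takes before touching the non-thread-safe target. The Java sketch below is only an illustration under that assumption: SerializedWriterSketch is a hypothetical class, and a plain ArrayList stands in for the stream, so no GPars API is shown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: several writers share one lock around a non-thread-safe target.
final class SerializedWriterSketch {
    private final List<Integer> sink = new ArrayList<>(); // stand-in for the stream
    private final Object lock = new Object();

    // Every write goes through the same lock, so writes never interleave.
    void write(int value) {
        synchronized (lock) {
            sink.add(value);
        }
    }

    int size() {
        synchronized (lock) {
            return sink.size();
        }
    }

    // Starts the given number of writer threads and returns how many values were stored.
    static int run(int writers, int perWriter) {
        SerializedWriterSketch target = new SerializedWriterSketch();
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < writers; w++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < perWriter; i++) {
                    target.write(i);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return target.size();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 1000)); // 4000: no write is lost
    }
}
```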

Author:
Johannes Link, Vaclav Pech

Field Summary
private  DataflowVariable<T> first
           
private  java.util.concurrent.atomic.AtomicReference<DataflowStream<T>> rest
           
private  java.util.Collection<MessageStream> wheneverBoundListeners
          A collection of listeners who need to be informed each time the stream is bound to a value
 
Constructor Summary
  DataflowStream()
          Creates an empty stream
  DataflowStream(groovy.lang.Closure toBeApplied)
          Creates a stream while applying the supplied initialization closure to it
private DataflowStream(java.util.Collection<MessageStream> wheneverBoundListeners)
           
 
Method Summary
 java.lang.String appendingString()
           
 DataflowStream<T> apply(groovy.lang.Closure closure)
          Calls the supplied closure with the stream as a parameter
private  void bind(T value)
           
static <T> T eos()
           
 boolean equals(java.lang.Object obj)
           
private static <T> T eval(java.lang.Object valueOrDataflowVariable)
           
 FList<T> filter(groovy.lang.Closure filterClosure)
          Builds a filtered stream using the supplied filter closure
private  void filter(DataflowStream<T> rest, groovy.lang.Closure filterClosure, DataflowStream<T> result)
           
 DataflowStream<T> generate(T seed, groovy.lang.Closure generator, groovy.lang.Closure condition)
          Populates the stream with generated values
private  void generateNext(T value, DataflowStream<T> stream, groovy.lang.Closure generator, groovy.lang.Closure condition)
           
 T getFirst()
          Retrieves the first element in the stream, blocking until a value is available
(package private)  DataflowVariable<T> getFirstDFV()
           
 FList<T> getRest()
          Retrieves a DataflowStream representing the rest of this Stream after removing the first element
 int hashCode()
           
private  DataflowExpression<T> hookWheneverBoundListeners(DataflowExpression<T> expr)
          Hooks the registered wheneverBound listeners to the supplied dataflow expression
 boolean isEmpty()
          Indicates whether the first element in the stream is the end-of-stream (eos) marker
 java.util.Iterator<T> iterator()
          Builds an iterator to iterate over the stream
 DataflowStream<T> leftShift(DataflowReadChannel<T> ref)
          Adds the value of a dataflow variable to the stream once that value is available
 DataflowStream<T> leftShift(T value)
          Adds a value to the stream
 FList<java.lang.Object> map(groovy.lang.Closure mapClosure)
          Builds a modified stream using the supplied map closure
private  void map(FList<T> rest, groovy.lang.Closure mapClosure, DataflowStream result)
           
 T reduce(groovy.lang.Closure reduceClosure)
          Reduces all elements in the stream using the supplied closure
 T reduce(T seed, groovy.lang.Closure reduceClosure)
          Reduces all elements in the stream using the supplied closure
private  T reduce(T current, FList<T> rest, groovy.lang.Closure reduceClosure)
           
 java.lang.String toString()
           
 void wheneverBound(groovy.lang.Closure closure)
           
 void wheneverBound(MessageStream stream)
           
 
Methods inherited from class java.lang.Object
clone, finalize, getClass, notify, notifyAll, wait, wait, wait
 

Field Detail

first

private final DataflowVariable<T> first

rest

private final java.util.concurrent.atomic.AtomicReference<DataflowStream<T>> rest

wheneverBoundListeners

private final java.util.Collection<MessageStream> wheneverBoundListeners
A collection of listeners who need to be informed each time the stream is bound to a value

Constructor Detail

DataflowStream

public DataflowStream()
Creates an empty stream


DataflowStream

public DataflowStream(groovy.lang.Closure toBeApplied)
Creates a stream while applying the supplied initialization closure to it

Parameters:
toBeApplied - The closure to use for initialization

DataflowStream

private DataflowStream(java.util.Collection<MessageStream> wheneverBoundListeners)

Method Detail

eos

public static <T> T eos()

eval

private static <T> T eval(java.lang.Object valueOrDataflowVariable)

generate

public DataflowStream<T> generate(T seed,
                                  groovy.lang.Closure generator,
                                  groovy.lang.Closure condition)
Populates the stream with generated values

Parameters:
seed - The initial element to evaluate and add as the first value of the stream
generator - A closure generating stream elements from the previous values
condition - A closure indicating whether the generation should continue based on the last generated value
Returns:
This stream
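
The interplay of the three parameters can be sketched in plain Java. This is a hypothetical stand-in, not GPars code: GenerateSketch collects into a List instead of a stream, uses java.util.function types in place of closures, and assumes that the condition is tested on each candidate value (the seed included) before that value is added.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for generate(seed, generator, condition); NOT GPars code.
final class GenerateSketch {
    static <T> List<T> generate(T seed, UnaryOperator<T> generator, Predicate<T> condition) {
        List<T> values = new ArrayList<>();
        // Assumption: a value is added only if the condition accepts it;
        // the first rejected value ends the generation.
        for (T value = seed; condition.test(value); value = generator.apply(value)) {
            values.add(value);
        }
        return values;
    }

    public static void main(String[] args) {
        // Doubles the previous value while it stays below 100.
        System.out.println(generate(1, x -> x * 2, x -> x < 100)); // [1, 2, 4, 8, 16, 32, 64]
    }
}
```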

generateNext

private void generateNext(T value,
                          DataflowStream<T> stream,
                          groovy.lang.Closure generator,
                          groovy.lang.Closure condition)

apply

public final DataflowStream<T> apply(groovy.lang.Closure closure)
Calls the supplied closure with the stream as a parameter

Parameters:
closure - The closure to call
Returns:
This instance of DataflowStream

leftShift

public DataflowStream<T> leftShift(DataflowReadChannel<T> ref)
Adds the value of a dataflow variable to the stream once that value is available

Parameters:
ref - The DataflowVariable whose value is to be added
Returns:
The rest of the stream

leftShift

public DataflowStream<T> leftShift(T value)
Adds a value to the stream

Parameters:
value - The value to add
Returns:
The rest of the stream

bind

private void bind(T value)

getFirstDFV

DataflowVariable<T> getFirstDFV()

getFirst

public T getFirst()
Retrieves the first element in the stream, blocking until a value is available

Specified by:
getFirst in interface FList<T>
Returns:
The first item in the stream

getRest

public FList<T> getRest()
Retrieves a DataflowStream representing the rest of this Stream after removing the first element

Specified by:
getRest in interface FList<T>
Returns:
The remaining stream elements

isEmpty

public boolean isEmpty()
Indicates whether the first element in the stream is the end-of-stream (eos) marker

Specified by:
isEmpty in interface FList<T>

filter

public FList<T> filter(groovy.lang.Closure filterClosure)
Builds a filtered stream using the supplied filter closure

Specified by:
filter in interface FList<T>
Parameters:
filterClosure - The closure to decide on inclusion of elements
Returns:
The head of the filtered stream

filter

private void filter(DataflowStream<T> rest,
                    groovy.lang.Closure filterClosure,
                    DataflowStream<T> result)

map

public FList<java.lang.Object> map(groovy.lang.Closure mapClosure)
Builds a modified stream using the supplied map closure

Specified by:
map in interface FList<T>
Parameters:
mapClosure - The closure to transform elements
Returns:
The head of the transformed stream

map

private void map(FList<T> rest,
                 groovy.lang.Closure mapClosure,
                 DataflowStream result)

reduce

public T reduce(groovy.lang.Closure reduceClosure)
Reduces all elements in the stream using the supplied closure

Specified by:
reduce in interface FList<T>
Parameters:
reduceClosure - The closure to reduce elements of the stream gradually into an accumulator. The accumulator is seeded with the first stream element.
Returns:
The result of reduction of the whole stream

reduce

public T reduce(T seed,
                groovy.lang.Closure reduceClosure)
Reduces all elements in the stream using the supplied closure

Specified by:
reduce in interface FList<T>
Parameters:
seed - The value to initialize the accumulator with.
reduceClosure - The closure to reduce elements of the stream gradually into an accumulator.
Returns:
The result of reduction of the whole stream
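
The difference between the two public reduce variants comes down to where the accumulator starts. The Java sketch below illustrates that difference only; ReduceSketch is a hypothetical class that works on any Iterable with a BinaryOperator in place of a closure, and returning null for an empty input in the unseeded variant is an assumption of the sketch, not documented behavior.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical illustration of seeded vs. unseeded reduction; NOT GPars code.
final class ReduceSketch {
    // Seeded variant: the accumulator starts from the supplied seed.
    static <T> T reduce(T seed, Iterable<T> values, BinaryOperator<T> reducer) {
        T accumulator = seed;
        for (T value : values) {
            accumulator = reducer.apply(accumulator, value);
        }
        return accumulator;
    }

    // Unseeded variant: the accumulator starts from the first element.
    static <T> T reduce(Iterable<T> values, BinaryOperator<T> reducer) {
        Iterator<T> it = values.iterator();
        if (!it.hasNext()) {
            return null; // assumption of this sketch for an empty input
        }
        T accumulator = it.next();
        while (it.hasNext()) {
            accumulator = reducer.apply(accumulator, it.next());
        }
        return accumulator;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4);
        System.out.println(reduce(values, Integer::sum));      // 10
        System.out.println(reduce(100, values, Integer::sum)); // 110
    }
}
```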

reduce

private T reduce(T current,
                 FList<T> rest,
                 groovy.lang.Closure reduceClosure)

iterator

public java.util.Iterator<T> iterator()
Builds an iterator to iterate over the stream

Specified by:
iterator in interface java.lang.Iterable<T>
Returns:
A new FListIterator instance

toString

public java.lang.String toString()
Overrides:
toString in class java.lang.Object

appendingString

public java.lang.String appendingString()
Specified by:
appendingString in interface FList<T>

equals

public boolean equals(java.lang.Object obj)
Overrides:
equals in class java.lang.Object

hashCode

public int hashCode()
Overrides:
hashCode in class java.lang.Object

wheneverBound

public void wheneverBound(groovy.lang.Closure closure)

wheneverBound

public void wheneverBound(MessageStream stream)

hookWheneverBoundListeners

private DataflowExpression<T> hookWheneverBoundListeners(DataflowExpression<T> expr)
Hooks the registered wheneverBound listeners to the supplied dataflow expression

Parameters:
expr - The expression to hook all the when bound listeners to
Returns:
The supplied expression handler to allow method chaining

Copyright © 2008–2010 Václav Pech. All Rights Reserved.