groovyx.gpars.dataflow.stream
Class StreamCore<T>

java.lang.Object
  extended by groovyx.gpars.dataflow.stream.StreamCore<T>
Type Parameters:
T - Type for values to pass through the channels
All Implemented Interfaces:
FList<T>, java.lang.Iterable<T>
Direct Known Subclasses:
DataflowStream, SyncDataflowStream

public abstract class StreamCore<T>
extends java.lang.Object
implements FList<T>

Represents a common base for publish-subscribe deterministic channels.

Author:
Johannes Link, Vaclav Pech

Field Summary
protected  DataflowVariable<T> first
           
protected  java.util.concurrent.atomic.AtomicReference<StreamCore<T>> rest
           
protected  java.util.Collection<MessageStream> wheneverBoundListeners
          A collection of listeners who need to be informed each time the stream is bound to a value
 
Constructor Summary
protected StreamCore(DataflowVariable<T> first)
          Creates an empty stream
protected StreamCore(DataflowVariable<T> first, groovy.lang.Closure toBeApplied)
          Creates a stream while applying the supplied initialization closure to it
protected StreamCore(DataflowVariable<T> first, java.util.Collection<MessageStream> wheneverBoundListeners, java.util.Collection<DataflowChannelListener<T>> updateListeners)
           
 
Method Summary
(package private)  void addUpdateListener(DataflowChannelListener<T> updateListener)
           
private  void addUpdateListeners(java.util.Collection<DataflowChannelListener<T>> updateListeners)
           
 java.lang.String appendingString()
           
 StreamCore<T> apply(groovy.lang.Closure closure)
          Calls the supplied closure with the stream as a parameter
private  void bind(T value)
           
protected abstract  StreamCore<T> createNewStream()
          A factory method to create new instances of the correct class when needed
 void decrementParties()
          Decreases the number of parties required to perform the data exchange
static
<T> T
eos()
           
 boolean equals(java.lang.Object obj)
           
private static
<T> T
eval(java.lang.Object valueOrDataflowVariable)
           
 FList<T> filter(groovy.lang.Closure filterClosure)
          Builds a filtered stream using the supplied filter closure
private  void filter(StreamCore<T> rest, groovy.lang.Closure filterClosure, StreamCore<T> result)
           
 StreamCore<T> generate(T seed, groovy.lang.Closure generator, groovy.lang.Closure condition)
          Populates the stream with generated values
private  void generateNext(T value, StreamCore<T> stream, groovy.lang.Closure generator, groovy.lang.Closure condition)
           
 T getFirst()
          Retrieves the first element in the stream, blocking until a value is available
(package private)  DataflowVariable<T> getFirstDFV()
           
abstract  FList<T> getRest()
          Retrieves a DataflowStream representing the rest of this Stream after removing the first element
 int hashCode()
           
private  DataflowExpression<T> hookWheneverBoundListeners(DataflowExpression<T> expr)
          Hooks the registered when-bound handlers to the supplied dataflow expression
 void incrementParties()
          Increases the number of parties required to perform the data exchange
 boolean isEmpty()
          Indicates whether the first element in the stream is an eos
 java.util.Iterator<T> iterator()
          Builds an iterator to iterate over the stream
 StreamCore<T> leftShift(DataflowReadChannel<T> ref)
          Adds a dataflow variable value to the stream, once the value is available
 StreamCore<T> leftShift(T value)
          Adds a value to the stream
 FList<java.lang.Object> map(groovy.lang.Closure mapClosure)
          Builds a modified stream using the supplied map closure
private  void map(FList<T> rest, groovy.lang.Closure mapClosure, StreamCore<java.lang.Object> result)
           
 T reduce(groovy.lang.Closure reduceClosure)
          Reduces all elements in the stream using the supplied closure
 T reduce(T seed, groovy.lang.Closure reduceClosure)
          Reduces all elements in the stream using the supplied closure
private  T reduce(T current, FList<T> rest, groovy.lang.Closure reduceClosure)
           
 void wheneverBound(groovy.lang.Closure closure)
           
 void wheneverBound(MessageStream stream)
           
 
Methods inherited from class java.lang.Object
clone, finalize, getClass, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

first

protected final DataflowVariable<T> first

rest

protected final java.util.concurrent.atomic.AtomicReference<StreamCore<T>> rest

wheneverBoundListeners

protected final java.util.Collection<MessageStream> wheneverBoundListeners
A collection of listeners who need to be informed each time the stream is bound to a value

Constructor Detail

StreamCore

protected StreamCore(DataflowVariable<T> first)
Creates an empty stream

Parameters:
first - The variable to store as the head of the stream

StreamCore

protected StreamCore(DataflowVariable<T> first,
                     groovy.lang.Closure toBeApplied)
Creates a stream while applying the supplied initialization closure to it

Parameters:
first - The variable to store as the head of the stream
toBeApplied - The closure to use for initialization

StreamCore

protected StreamCore(DataflowVariable<T> first,
                     java.util.Collection<MessageStream> wheneverBoundListeners,
                     java.util.Collection<DataflowChannelListener<T>> updateListeners)
Method Detail

addUpdateListeners

private void addUpdateListeners(java.util.Collection<DataflowChannelListener<T>> updateListeners)

addUpdateListener

final void addUpdateListener(DataflowChannelListener<T> updateListener)

eos

public static <T> T eos()

eval

private static <T> T eval(java.lang.Object valueOrDataflowVariable)

generate

public final StreamCore<T> generate(T seed,
                                    groovy.lang.Closure generator,
                                    groovy.lang.Closure condition)
Populates the stream with generated values

Parameters:
seed - The initial element to evaluate and add as the first value of the stream
generator - A closure generating stream elements from the previous values
condition - A closure indicating whether the generation should continue based on the last generated value
Returns:
This stream

generateNext

private void generateNext(T value,
                          StreamCore<T> stream,
                          groovy.lang.Closure generator,
                          groovy.lang.Closure condition)

apply

public final StreamCore<T> apply(groovy.lang.Closure closure)
Calls the supplied closure with the stream as a parameter

Parameters:
closure - The closure to call
Returns:
This instance of DataflowStream

leftShift

public final StreamCore<T> leftShift(DataflowReadChannel<T> ref)
Adds a dataflow variable value to the stream, once the value is available

Parameters:
ref - The DataflowVariable to check for a value
Returns:
The rest of the stream

leftShift

public final StreamCore<T> leftShift(T value)
Adds a value to the stream

Parameters:
value - The value to add
Returns:
The rest of the stream

bind

private void bind(T value)

getFirstDFV

final DataflowVariable<T> getFirstDFV()

getFirst

public final T getFirst()
Retrieves the first element in the stream, blocking until a value is available

Specified by:
getFirst in interface FList<T>
Returns:
The first item in the stream

getRest

public abstract FList<T> getRest()
Retrieves a DataflowStream representing the rest of this Stream after removing the first element

Specified by:
getRest in interface FList<T>
Returns:
The remaining stream elements

isEmpty

public final boolean isEmpty()
Indicates whether the first element in the stream is an eos

Specified by:
isEmpty in interface FList<T>

filter

public final FList<T> filter(groovy.lang.Closure filterClosure)
Builds a filtered stream using the supplied filter closure

Specified by:
filter in interface FList<T>
Parameters:
filterClosure - The closure to decide on inclusion of elements
Returns:
The first item of the filtered stream

filter

private void filter(StreamCore<T> rest,
                    groovy.lang.Closure filterClosure,
                    StreamCore<T> result)

map

public final FList<java.lang.Object> map(groovy.lang.Closure mapClosure)
Builds a modified stream using the supplied map closure

Specified by:
map in interface FList<T>
Parameters:
mapClosure - The closure to transform elements
Returns:
The first item of the transformed stream

map

private void map(FList<T> rest,
                 groovy.lang.Closure mapClosure,
                 StreamCore<java.lang.Object> result)

reduce

public final T reduce(groovy.lang.Closure reduceClosure)
Reduces all elements in the stream using the supplied closure

Specified by:
reduce in interface FList<T>
Parameters:
reduceClosure - The closure to reduce elements of the stream gradually into an accumulator. The accumulator is seeded with the first stream element.
Returns:
The result of reduction of the whole stream

reduce

public final T reduce(T seed,
                      groovy.lang.Closure reduceClosure)
Reduces all elements in the stream using the supplied closure

Specified by:
reduce in interface FList<T>
Parameters:
reduceClosure - The closure to reduce elements of the stream gradually into an accumulator.
seed - The value to initialize the accumulator with.
Returns:
The result of reduction of the whole stream

reduce

private T reduce(T current,
                 FList<T> rest,
                 groovy.lang.Closure reduceClosure)

iterator

public final java.util.Iterator<T> iterator()
Builds an iterator to iterate over the stream

Specified by:
iterator in interface java.lang.Iterable<T>
Returns:
A new FListIterator instance

appendingString

public java.lang.String appendingString()
Specified by:
appendingString in interface FList<T>

equals

public boolean equals(java.lang.Object obj)
Overrides:
equals in class java.lang.Object

hashCode

public int hashCode()
Overrides:
hashCode in class java.lang.Object

createNewStream

protected abstract StreamCore<T> createNewStream()
A factory method to create new instances of the correct class when needed

Returns:
An instance of the appropriate sub-class

wheneverBound

public final void wheneverBound(groovy.lang.Closure closure)

wheneverBound

public final void wheneverBound(MessageStream stream)

hookWheneverBoundListeners

private DataflowExpression<T> hookWheneverBoundListeners(DataflowExpression<T> expr)
Hooks the registered when-bound handlers to the supplied dataflow expression

Parameters:
expr - The expression to hook all the when bound listeners to
Returns:
The supplied expression handler to allow method chaining

incrementParties

public void incrementParties()
Increases the number of parties required to perform the data exchange


decrementParties

public void decrementParties()
Decreases the number of parties required to perform the data exchange


Copyright © 2008–2012 Václav Pech. All Rights Reserved.