org.codehaus.gpars

groovyx.gpars.actor.impl
Class SequentialProcessingActor

java.lang.Object
  groovyx.gpars.serial.WithSerialId
      groovyx.gpars.actor.impl.MessageStream
          groovyx.gpars.actor.Actor
              groovyx.gpars.actor.impl.ReplyingMessageStream
                  groovyx.gpars.actor.impl.SequentialProcessingActor
All Implemented Interfaces:
java.lang.Runnable

@Deprecated
@SuppressWarnings({"UnqualifiedStaticUsage"})
class SequentialProcessingActor
extends ReplyingMessageStream

Authors:
Alex Tkachman, Vaclav Pech


Nested Class Summary
class SequentialProcessingActor.MultiMessageReaction

Enables multiple argument closures to be passed to react().

class SequentialProcessingActor.Node

Represents an element in the message queue.

class SequentialProcessingActor.Reaction

Buffers messages for the next continuation of an event-driven actor, handles timeouts and no-param continuations.

 
Field Summary
private static java.lang.String ERROR_EVALUATING_LOOP_CONDITION

private static java.lang.String SHOULD_NOT_REACH_HERE

protected static int S_ACTIVE_MASK

protected static int S_FINISHED_MASK

protected static int S_FINISHING_MASK

protected static int S_NOT_STARTED

protected static int S_RUNNING

protected static int S_STOPPED

protected static int S_STOPPING

protected static int S_STOP_TERMINATE_MASK

protected static int S_TERMINATED

protected static int S_TERMINATING

protected groovy.lang.Closure afterLoopCode

private int count

Counter of messages in the queues

private static java.util.concurrent.atomic.AtomicIntegerFieldUpdater countUpdater

private SequentialProcessingActor.Node inputQueue

Stored incoming messages.

private static java.util.concurrent.atomic.AtomicReferenceFieldUpdater inputQueueUpdater

protected java.lang.Runnable loopCode

Code for the loop, if any

protected java.util.concurrent.Callable loopCondition

private static ActorMessage loopMessage

private java.util.concurrent.atomic.AtomicBoolean ongoingThreadTermination

private SequentialProcessingActor.Node outputQueue

Stores messages ready for processing by the actor.

private SequentialProcessingActor.Reaction reaction

Code for the next action

private static long serialVersionUID

private static ActorMessage startMessage

protected int stopFlag

Indicates whether the actor should terminate

protected static java.util.concurrent.atomic.AtomicIntegerFieldUpdater stopFlagUpdater

private java.lang.Thread waitingThread

 
Fields inherited from class ReplyingMessageStream
obj2Sender
 
Fields inherited from class Actor
ACTOR_HAS_ALREADY_BEEN_STARTED, START_MESSAGE, STOP_MESSAGE, TERMINATE_MESSAGE, TIMEOUT_MESSAGE, currentThread, parallelGroup, timer
 
Constructor Summary
protected SequentialProcessingActor()

Creates a new instance, sets the default actor group.

 
Method Summary
private ActorMessage awaitNextMessage(long endTime)

Holds common functionality for takeMessage() methods.

protected void checkStopTerminate()

private void doLoopCall()

private ActorMessage getMessage()

Retrieves the next message from the queue

protected void handleTermination()

protected boolean hasBeenStopped()

boolean isActive()

Checks the current status of the Actor.

protected void loop(java.lang.Runnable code)

Ensures that the supplied closure will be invoked repeatedly in a loop.

protected void loop(int numberOfLoops, java.lang.Runnable code)

Ensures that the supplied closure will be invoked repeatedly in a loop.

protected void loop(int numberOfLoops, groovy.lang.Closure afterLoopCode, java.lang.Runnable code)

Ensures that the supplied closure will be invoked repeatedly in a loop.

protected void loop(groovy.lang.Closure condition, java.lang.Runnable code)

Ensures that the supplied closure will be invoked repeatedly in a loop.

protected void loop(groovy.lang.Closure condition, groovy.lang.Closure afterLoopCode, java.lang.Runnable code)

Ensures that the supplied closure will be invoked repeatedly in a loop.

private void loop(java.util.concurrent.Callable condition, groovy.lang.Closure afterLoopCode, java.lang.Runnable code)

Ensures that the supplied closure will be invoked repeatedly in a loop.

protected ActorMessage pollMessage()

Polls a message from the queues

protected void react(groovy.time.Duration duration, groovy.lang.Closure code)

Schedules an ActorAction to take the next message off the message queue and to pass it on to the supplied closure.

protected void react(groovy.lang.Closure code)

Schedules an ActorAction to take the next message off the message queue and to pass it on to the supplied closure.

protected void react(long timeout, java.util.concurrent.TimeUnit timeUnit, groovy.lang.Closure code)

Schedules an ActorAction to take the next message off the message queue and to pass it on to the supplied closure.

protected void react(long timeout, groovy.lang.Closure code)

Schedules an ActorAction to take the next message off the message queue and to pass it on to the supplied closure.

protected java.lang.Object receive()

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

protected java.lang.Object receive(long timeout, java.util.concurrent.TimeUnit units)

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

protected java.lang.Object receive(groovy.time.BaseDuration duration)

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

protected java.lang.Object receiveImpl()

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

protected java.lang.Object receiveImpl(long timeout, java.util.concurrent.TimeUnit units)

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

private ActorMessage retrieveNextMessage()

Takes the next message from the outputQueue, decrements the counter and possibly throws control exceptions

void run()

void runReaction(ActorMessage message, groovy.lang.Closure code)

private void schedule()

Schedules the current actor for processing on the actor group's thread pool.

protected void scheduleLoop()

MessageStream send(java.lang.Object message)

void setParallelGroup(PGroup group)

Sets the actor's group.

Actor silentStart()

SequentialProcessingActor start()

Starts the Actor.

Actor stop()

Sends a stop message to the actor.

protected ActorMessage sweepNextMessage()

Removes the head of the message queue

protected ActorMessage takeMessage()

Takes a message from the queues.

protected ActorMessage takeMessage(long timeout, java.util.concurrent.TimeUnit timeUnit)

Takes a message from the queues.

Actor terminate()

Terminate the Actor.

private void throwIfNeeded(ActorMessage toProcess)

Checks the supplied message and throws either STOP or TERMINATE, if the message is a Stop or Terminate message respectively.

private void transferQueues()

Transfers messages from the input queue into the output queue, reversing the order of the elements.

private static java.lang.Object unwrapMessage(java.lang.Object msg)

private static boolean verifyLoopCondition(java.util.concurrent.Callable condition)

 
Methods inherited from class ReplyingMessageStream
getSender, getSenders, reply, replyIfExists, runEnhancedWithRepliesOnMessages
 
Methods inherited from class Actor
createActorMessage, createRemoteHandle, deregisterCurrentActorWithThread, getJoinLatch, getParallelGroup, handleException, handleInterrupt, handleStart, handleTermination, handleTimeout, hasBeenStopped, isActive, isActorThread, join, join, join, join, onStop, registerCurrentActorWithThread, sendAndContinue, setParallelGroup, silentStart, start, stop, sweepNextMessage, sweepQueue, terminate, threadBoundActor
 
Methods inherited from class MessageStream
call, getRemoteClass, leftShift, reInterrupt, send, send, send, sendAndWait, sendAndWait, sendAndWait
 
Methods inherited from class WithSerialId
createRemoteHandle, getOrCreateSerialHandle, getRemoteClass, writeReplace
 

Field Detail

ERROR_EVALUATING_LOOP_CONDITION

private static final java.lang.String ERROR_EVALUATING_LOOP_CONDITION


SHOULD_NOT_REACH_HERE

private static final java.lang.String SHOULD_NOT_REACH_HERE


S_ACTIVE_MASK

protected static final int S_ACTIVE_MASK


S_FINISHED_MASK

protected static final int S_FINISHED_MASK


S_FINISHING_MASK

protected static final int S_FINISHING_MASK


S_NOT_STARTED

protected static final int S_NOT_STARTED


S_RUNNING

protected static final int S_RUNNING


S_STOPPED

protected static final int S_STOPPED


S_STOPPING

protected static final int S_STOPPING


S_STOP_TERMINATE_MASK

protected static final int S_STOP_TERMINATE_MASK


S_TERMINATED

protected static final int S_TERMINATED


S_TERMINATING

protected static final int S_TERMINATING


afterLoopCode

protected groovy.lang.Closure afterLoopCode


count

@SuppressWarnings({"UnusedDeclaration"})
    //modified through countUpdater
private int count
Counter of messages in the queues


countUpdater

private static final java.util.concurrent.atomic.AtomicIntegerFieldUpdater countUpdater


inputQueue

@SuppressWarnings({"UnusedDeclaration"})
    //modified through inputQueueUpdater
private SequentialProcessingActor.Node inputQueue
Stored incoming messages. The most recently received message is in the head of the list.


inputQueueUpdater

private static final java.util.concurrent.atomic.AtomicReferenceFieldUpdater inputQueueUpdater


loopCode

protected java.lang.Runnable loopCode
Code for the loop, if any


loopCondition

protected java.util.concurrent.Callable loopCondition


loopMessage

private static final ActorMessage loopMessage


ongoingThreadTermination

private final java.util.concurrent.atomic.AtomicBoolean ongoingThreadTermination


outputQueue

private SequentialProcessingActor.Node outputQueue
Stores messages ready for processing by the actor. The oldest message is in the head of the list. Messages are transferred from the inputQueue into the output queue in the transferQueues() method.


reaction

private SequentialProcessingActor.Reaction reaction
Code for the next action


serialVersionUID

private static final long serialVersionUID


startMessage

private static final ActorMessage startMessage


stopFlag

protected int stopFlag
Indicates whether the actor should terminate


stopFlagUpdater

protected static final java.util.concurrent.atomic.AtomicIntegerFieldUpdater stopFlagUpdater


waitingThread

private java.lang.Thread waitingThread


 
Constructor Detail

SequentialProcessingActor

protected SequentialProcessingActor()
Creates a new instance, sets the default actor group.


 
Method Detail

awaitNextMessage

private ActorMessage awaitNextMessage(long endTime)
Holds common functionality for takeMessage() methods.
throws:
InterruptedException If the thread has been interrupted
Parameters:
endTime - End of the timeout, 0 if no timeout was set
Returns:
The next message


checkStopTerminate

protected final void checkStopTerminate()


doLoopCall

private void doLoopCall()


getMessage

private ActorMessage getMessage()
Retrieves the next message from the queue
Returns:
The message


handleTermination

@Override
protected void handleTermination()


hasBeenStopped

@Override
protected final boolean hasBeenStopped()


isActive

@Override
public final boolean isActive()
Checks the current status of the Actor.


loop

protected final void loop(java.lang.Runnable code)
Ensures that the supplied closure will be invoked repeatedly in a loop. The method never returns, but instead frees the processing thread back to the thread pool.
Parameters:
code - The closure to invoke repeatedly


loop

protected final void loop(int numberOfLoops, java.lang.Runnable code)
Ensures that the supplied closure will be invoked repeatedly in a loop. The method never returns, but instead frees the processing thread back to the thread pool.
Parameters:
numberOfLoops - The loop will only be run the given number of times
code - The closure to invoke repeatedly


loop

protected final void loop(int numberOfLoops, groovy.lang.Closure afterLoopCode, java.lang.Runnable code)
Ensures that the supplied closure will be invoked repeatedly in a loop. The method never returns, but instead frees the processing thread back to the thread pool.
Parameters:
numberOfLoops - The loop will only be run the given number of times
afterLoopCode - Code to run after the main actor's loop finishes
code - The closure to invoke repeatedly


loop

protected final void loop(groovy.lang.Closure condition, java.lang.Runnable code)
Ensures that the supplied closure will be invoked repeatedly in a loop. The method never returns, but instead frees the processing thread back to the thread pool.
Parameters:
condition - A condition to evaluate before each iteration starts. If the condition returns false, the loop exits.
code - The closure to invoke repeatedly


loop

protected final void loop(groovy.lang.Closure condition, groovy.lang.Closure afterLoopCode, java.lang.Runnable code)
Ensures that the supplied closure will be invoked repeatedly in a loop. The method never returns, but instead frees the processing thread back to the thread pool.
Parameters:
condition - A condition to evaluate before each iteration starts. If the condition returns false, the loop exits.
afterLoopCode - Code to run after the main actor's loop finishes
code - The closure to invoke repeatedly


loop

private void loop(java.util.concurrent.Callable condition, groovy.lang.Closure afterLoopCode, java.lang.Runnable code)
Ensures that the supplied closure will be invoked repeatedly in a loop. The method never returns, but instead frees the processing thread back to the thread pool.
Parameters:
condition - A condition to evaluate before each iteration starts. If the condition returns false, the loop exits.
afterLoopCode - Code to run after the main actor's loop finishes
code - The closure to invoke repeatedly


pollMessage

protected final ActorMessage pollMessage()
Polls a message from the queues
Returns:
The message


react

protected final void react(groovy.time.Duration duration, groovy.lang.Closure code)
Schedules an ActorAction to take the next message off the message queue and to pass it on to the supplied closure. The method never returns, but instead frees the processing thread back to the thread pool.
Parameters:
duration - Time to wait at most for a message to arrive. The actor terminates if a message doesn't arrive within the given timeout. The TimeCategory DSL to specify timeouts must be enabled explicitly inside the Actor's act() method.
code - The code to handle the next message. The reply() and replyIfExists() methods are available inside the closure to send a reply back to the actor, which sent the original message.


react

protected final void react(groovy.lang.Closure code)
Schedules an ActorAction to take the next message off the message queue and to pass it on to the supplied closure. The method never returns, but instead frees the processing thread back to the thread pool.
Parameters:
code - The code to handle the next message. The reply() and replyIfExists() methods are available inside the closure to send a reply back to the actor, which sent the original message.


react

protected final void react(long timeout, java.util.concurrent.TimeUnit timeUnit, groovy.lang.Closure code)
Schedules an ActorAction to take the next message off the message queue and to pass it on to the supplied closure. The method never returns, but instead frees the processing thread back to the thread pool.
Parameters:
timeout - Time to wait at most for a message to arrive, expressed in the unit given by timeUnit. The actor terminates if a message doesn't arrive within the given timeout.
timeUnit - a TimeUnit determining how to interpret the timeout parameter
code - The code to handle the next message. The reply() and replyIfExists() methods are available inside the closure to send a reply back to the actor, which sent the original message.


react

protected final void react(long timeout, groovy.lang.Closure code)
Schedules an ActorAction to take the next message off the message queue and to pass it on to the supplied closure. The method never returns, but instead frees the processing thread back to the thread pool. Also adds reply() and replyIfExists() methods to the currentActor and the message. These methods will call send() on the target actor (the sender of the original message). When invoked on the actor, reply()/replyIfExists() will send a reply to the senders of all currently processed messages; when invoked on a message, they will send a reply to the sender of that particular message only.
Parameters:
timeout - Time in milliseconds to wait at most for a message to arrive. The actor terminates if a message doesn't arrive within the given timeout.
code - The code to handle the next message. The reply() and replyIfExists() methods are available inside the closure to send a reply back to the actor, which sent the original message.


receive

protected final java.lang.Object receive()
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.
Returns:
The message retrieved from the queue, or null, if the timeout expires.


receive

protected final java.lang.Object receive(long timeout, java.util.concurrent.TimeUnit units)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.
Parameters:
timeout - how long to wait before giving up, in the time unit given by units
units - a TimeUnit determining how to interpret the timeout parameter
Returns:
The message retrieved from the queue, or null, if the timeout expires.


receive

protected final java.lang.Object receive(groovy.time.BaseDuration duration)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.
Parameters:
duration - how long to wait before giving up
Returns:
The message retrieved from the queue, or null, if the timeout expires.


receiveImpl

protected java.lang.Object receiveImpl()
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.
Returns:
The message retrieved from the queue, or null, if the timeout expires.


receiveImpl

protected java.lang.Object receiveImpl(long timeout, java.util.concurrent.TimeUnit units)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.
Parameters:
timeout - how long to wait before giving up, in the time unit given by units
units - a TimeUnit determining how to interpret the timeout parameter
Returns:
The message retrieved from the queue, or null, if the timeout expires.


retrieveNextMessage

private ActorMessage retrieveNextMessage()
Takes the next message from the outputQueue, decrements the counter and possibly throws control exceptions
Returns:
The next message


run

@Override
@SuppressWarnings({"ThrowCaughtLocally", "OverlyLongMethod"})
public void run()


runReaction

public final void runReaction(ActorMessage message, groovy.lang.Closure code)


schedule

private void schedule()
Schedules the current actor for processing on the actor group's thread pool.


scheduleLoop

protected void scheduleLoop()


send

@Override
public final MessageStream send(java.lang.Object message)


setParallelGroup

@Override
public final void setParallelGroup(PGroup group)
Sets the actor's group. It can only be invoked before the actor is started.
Parameters:
group - new group


silentStart

@Override
public Actor silentStart()


start

@Override
public final SequentialProcessingActor start()
Starts the Actor. No messages can be sent or received before an Actor is started.
Returns:
this (the actor itself) to allow method chaining


stop

@Override
public final Actor stop()
Sends a stop message to the actor. All messages already in the queue will be processed before the actor stops, but no new messages will be accepted after that point.
Returns:
this (the actor itself) to allow method chaining


sweepNextMessage

@Override
protected final ActorMessage sweepNextMessage()
Removes the head of the message queue
Returns:
The head message, or null, if the message queue is empty


takeMessage

protected final ActorMessage takeMessage()
Takes a message from the queues. Blocks until a message is available.
throws:
InterruptedException If the thread gets interrupted.
Returns:
The message


takeMessage

protected ActorMessage takeMessage(long timeout, java.util.concurrent.TimeUnit timeUnit)
Takes a message from the queues. Blocks until a message is available or the given timeout elapses.
throws:
InterruptedException If the thread gets interrupted.
Parameters:
timeout - Max time to wait for a message
timeUnit - The units for the timeout
Returns:
The message


terminate

@Override
public final Actor terminate()
Terminate the Actor. The background thread will be interrupted, unprocessed messages will be passed to the afterStop method, if it exists. Has no effect if the Actor is not started.
Returns:
this (the actor itself) to allow method chaining


throwIfNeeded

private void throwIfNeeded(ActorMessage toProcess)
Checks the supplied message and throws either STOP or TERMINATE, if the message is a Stop or Terminate message respectively.
Parameters:
toProcess - The next message to process by the actors


transferQueues

private void transferQueues()
Transfers messages from the input queue into the output queue, reversing the order of the elements.


unwrapMessage

private static java.lang.Object unwrapMessage(java.lang.Object msg)


verifyLoopCondition

private static boolean verifyLoopCondition(java.util.concurrent.Callable condition)


 

Copyright © 2008–2010 Václav Pech. All Rights Reserved.