public abstract class SequentialProcessingActor extends ReplyingMessageStream implements java.lang.Runnable
| Modifier and Type | Class and Description |
|---|---|
private static class |
SequentialProcessingActor.Node
Represents an element in the message queue.
|
MessageStream.RemoteMessageStream, MessageStream.SendTo

| Modifier and Type | Field and Description |
|---|---|
private SequentialProcessingActor.Node |
inputQueue
Stored incoming messages.
|
private static java.util.concurrent.atomic.AtomicReferenceFieldUpdater<SequentialProcessingActor,SequentialProcessingActor.Node> |
inputQueueUpdater |
private java.util.concurrent.atomic.AtomicBoolean |
ongoingThreadTermination |
private SequentialProcessingActor.Node |
outputQueue
Stores messages ready for processing by the actor.
|
protected static int |
S_ACTIVE_MASK |
protected static int |
S_FINISHED_MASK |
protected static int |
S_FINISHING_MASK |
protected static int |
S_NOT_STARTED |
protected static int |
S_RUNNING |
protected static int |
S_STOP_TERMINATE_MASK |
protected static int |
S_STOPPED |
protected static int |
S_STOPPING |
protected static int |
S_TERMINATED |
protected static int |
S_TERMINATING |
private static long |
serialVersionUID |
private static java.lang.String |
SHOULD_NOT_REACH_HERE |
protected int |
stopFlag
Indicates whether the actor should terminate
|
protected static java.util.concurrent.atomic.AtomicIntegerFieldUpdater<SequentialProcessingActor> |
stopFlagUpdater |
private java.lang.Thread |
waitingThread |
ACTOR_HAS_ALREADY_BEEN_STARTED, CANNOT_SEND_REPLIES_NO_SENDER_HAS_BEEN_REGISTERED, currentThread, parallelGroup, START_MESSAGE, STOP_MESSAGE, TERMINATE_MESSAGE, TIMEOUT, TIMEOUT_MESSAGE, timer
serialHandle

| Modifier | Constructor and Description |
|---|---|
protected |
SequentialProcessingActor()
Creates a new instance, sets the default actor group.
|
| Modifier and Type | Method and Description |
|---|---|
protected abstract void |
act()
This method represents the body of the actor.
|
private ActorMessage |
awaitNextMessage(long endTime)
Holds common functionality for takeMessage() methods.
|
protected void |
checkStopTerminate() |
protected void |
handleTermination() |
protected boolean |
hasBeenStopped() |
boolean |
isActive()
Checks the current status of the Actor.
|
protected ActorMessage |
pollMessage()
Polls a message from the queues
|
protected java.lang.Object |
receive()
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
|
protected java.lang.Object |
receive(groovy.time.BaseDuration duration)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
|
protected java.lang.Object |
receive(long timeout,
java.util.concurrent.TimeUnit units)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
|
protected abstract java.lang.Object |
receiveImpl()
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
|
protected abstract java.lang.Object |
receiveImpl(long timeout,
java.util.concurrent.TimeUnit units)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
|
private ActorMessage |
retrieveNextMessage()
Takes the next message from the outputQueue, decrements the counter and possibly throws control exceptions
|
void |
run() |
MessageStream |
send(java.lang.Object message)
Send message to stream and return immediately
|
void |
setParallelGroup(PGroup group)
Sets the actor's group.
|
Actor |
silentStart()
Starts the Actor without sending the START_MESSAGE message to speed the start-up.
|
SequentialProcessingActor |
start()
Starts the Actor.
|
Actor |
stop()
Sends a stop message to the actor.
|
protected ActorMessage |
sweepNextMessage()
Removes the head of the message queue
|
protected ActorMessage |
takeMessage()
Takes a message from the queues.
|
protected ActorMessage |
takeMessage(long timeout,
java.util.concurrent.TimeUnit timeUnit)
Takes a message from the queues.
|
Actor |
terminate()
Terminate the Actor.
|
private void |
throwIfNeeded(ActorMessage toProcess)
Checks the supplied message and throws either STOP or TERMINATE, if the message is a Stop or Terminate message respectively.
|
private void |
transferQueues()
Transfers messages from the input queue into the output queue, reversing the order of the elements.
|
private static java.lang.Object |
unwrapMessage(java.lang.Object msg) |
getSender, reply, replyIfExists, setSender
createActorMessage, createRemoteHandle, deregisterCurrentActorWithThread, getJoinLatch, getParallelGroup, handleException, handleInterrupt, handleStart, handleTimeout, isActorThread, join, join, join, join, onStop, registerCurrentActorWithThread, sendAndContinue, sendAndPromise, threadBoundActor
call, getRemoteClass, leftShift, reInterrupt, send, send, sendAndWait, sendAndWait, sendAndWait
getOrCreateSerialHandle, writeReplace
private static final long serialVersionUID
private volatile SequentialProcessingActor.Node inputQueue
private SequentialProcessingActor.Node outputQueue
private final java.util.concurrent.atomic.AtomicBoolean ongoingThreadTermination
private static final java.util.concurrent.atomic.AtomicReferenceFieldUpdater<SequentialProcessingActor,SequentialProcessingActor.Node> inputQueueUpdater
private volatile java.lang.Thread waitingThread
protected static final int S_ACTIVE_MASK
protected static final int S_FINISHING_MASK
protected static final int S_FINISHED_MASK
protected static final int S_STOP_TERMINATE_MASK
protected static final int S_NOT_STARTED
protected static final int S_RUNNING
protected static final int S_STOPPING
protected static final int S_TERMINATING
protected static final int S_STOPPED
protected static final int S_TERMINATED
protected volatile int stopFlag
protected static final java.util.concurrent.atomic.AtomicIntegerFieldUpdater<SequentialProcessingActor> stopFlagUpdater
private static final java.lang.String SHOULD_NOT_REACH_HERE
protected SequentialProcessingActor()
public final boolean isActive()
private void throwIfNeeded(ActorMessage toProcess)
toProcess - The next message to be processed by the actor
protected final ActorMessage pollMessage()
protected final ActorMessage takeMessage() throws java.lang.InterruptedException
java.lang.InterruptedException - If the thread gets interrupted.
protected ActorMessage takeMessage(long timeout, java.util.concurrent.TimeUnit timeUnit) throws java.lang.InterruptedException
timeout - Max time to wait for a message
timeUnit - The units for the timeout
java.lang.InterruptedException - If the thread gets interrupted.
private ActorMessage awaitNextMessage(long endTime) throws java.lang.InterruptedException
endTime - End of the timeout, 0 if no timeout was set
java.lang.InterruptedException - If the thread has been interrupted
private ActorMessage retrieveNextMessage()
private void transferQueues()
public final void setParallelGroup(PGroup group)
setParallelGroup in class Actor
group - new group
public final MessageStream send(java.lang.Object message)
MessageStream
send in class MessageStream
message - message to send
protected final boolean hasBeenStopped()
hasBeenStopped in class Actor
protected void handleTermination()
handleTermination in class Actor
public Actor silentStart()
Actor
silentStart in class Actor
public final SequentialProcessingActor start()
public final Actor stop()
public final Actor terminate()
protected abstract java.lang.Object receiveImpl()
throws java.lang.InterruptedException
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.
protected abstract java.lang.Object receiveImpl(long timeout,
java.util.concurrent.TimeUnit units)
throws java.lang.InterruptedException
timeout - how long to wait before giving up, expressed in the time unit given by units
units - a TimeUnit determining how to interpret the timeout parameter
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.
protected final java.lang.Object receive()
throws java.lang.InterruptedException
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.
protected final java.lang.Object receive(long timeout,
java.util.concurrent.TimeUnit units)
throws java.lang.InterruptedException
timeout - how long to wait before giving up, expressed in the time unit given by units
units - a TimeUnit determining how to interpret the timeout parameter
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.
private static java.lang.Object unwrapMessage(java.lang.Object msg)
protected final java.lang.Object receive(groovy.time.BaseDuration duration)
throws java.lang.InterruptedException
duration - how long to wait before giving up
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.
protected final ActorMessage sweepNextMessage()
sweepNextMessage in class Actor
protected abstract void act()
Actor's message handling code.
public void run()
run in interface java.lang.Runnable
protected final void checkStopTerminate()