public abstract class SequentialProcessingActor extends ReplyingMessageStream implements java.lang.Runnable
Modifier and Type | Class and Description |
---|---|
private static class |
SequentialProcessingActor.Node
Represents an element in the message queue.
|
Actor.MyRemoteHandle, Actor.RemoteActor
MessageStream.RemoteMessageStream, MessageStream.SendTo
Modifier and Type | Field and Description |
---|---|
private SequentialProcessingActor.Node |
inputQueue
Stored incoming messages.
|
private static java.util.concurrent.atomic.AtomicReferenceFieldUpdater<SequentialProcessingActor,SequentialProcessingActor.Node> |
inputQueueUpdater |
private java.util.concurrent.atomic.AtomicBoolean |
ongoingThreadTermination |
private SequentialProcessingActor.Node |
outputQueue
Stores messages ready for processing by the actor.
|
protected static int |
S_ACTIVE_MASK |
protected static int |
S_FINISHED_MASK |
protected static int |
S_FINISHING_MASK |
protected static int |
S_NOT_STARTED |
protected static int |
S_RUNNING |
protected static int |
S_STOP_TERMINATE_MASK |
protected static int |
S_STOPPED |
protected static int |
S_STOPPING |
protected static int |
S_TERMINATED |
protected static int |
S_TERMINATING |
private static long |
serialVersionUID |
private static java.lang.String |
SHOULD_NOT_REACH_HERE |
protected int |
stopFlag
Indicates whether the actor should terminate
|
protected static java.util.concurrent.atomic.AtomicIntegerFieldUpdater<SequentialProcessingActor> |
stopFlagUpdater |
private java.lang.Thread |
waitingThread |
ACTOR_HAS_ALREADY_BEEN_STARTED, CANNOT_SEND_REPLIES_NO_SENDER_HAS_BEEN_REGISTERED, currentThread, parallelGroup, START_MESSAGE, STOP_MESSAGE, TERMINATE_MESSAGE, TIMEOUT, TIMEOUT_MESSAGE, timer
serialHandle
Modifier | Constructor and Description |
---|---|
protected |
SequentialProcessingActor()
Creates a new instance, sets the default actor group.
|
Modifier and Type | Method and Description |
---|---|
protected abstract void |
act()
This method represents the body of the actor.
|
private ActorMessage |
awaitNextMessage(long endTime)
Holds common functionality for takeMessage() methods.
|
protected void |
checkStopTerminate() |
protected void |
handleTermination() |
protected boolean |
hasBeenStopped() |
boolean |
isActive()
Checks the current status of the Actor.
|
protected ActorMessage |
pollMessage()
Polls a message from the queues
|
protected java.lang.Object |
receive()
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
|
protected java.lang.Object |
receive(groovy.time.BaseDuration duration)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
|
protected java.lang.Object |
receive(long timeout,
java.util.concurrent.TimeUnit units)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
|
protected abstract java.lang.Object |
receiveImpl()
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
|
protected abstract java.lang.Object |
receiveImpl(long timeout,
java.util.concurrent.TimeUnit units)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
|
private ActorMessage |
retrieveNextMessage()
Takes the next message from the outputQueue, decrements the counter and possibly throws control exceptions
|
void |
run() |
MessageStream |
send(java.lang.Object message)
Send message to stream and return immediately
|
void |
setParallelGroup(PGroup group)
Sets the actor's group.
|
Actor |
silentStart()
Starts the Actor without sending the START_MESSAGE message to speed the start-up.
|
SequentialProcessingActor |
start()
Starts the Actor.
|
Actor |
stop()
Sends a stop message to the actor.
|
protected ActorMessage |
sweepNextMessage()
Removes the head of the message queue
|
protected ActorMessage |
takeMessage()
Takes a message from the queues.
|
protected ActorMessage |
takeMessage(long timeout,
java.util.concurrent.TimeUnit timeUnit)
Takes a message from the queues.
|
Actor |
terminate()
Terminate the Actor.
|
private void |
throwIfNeeded(ActorMessage toProcess)
Checks the supplied message and throws either STOP or TERMINATE, if the message is a Stop or Terminate message respectively.
|
private void |
transferQueues()
Transfers messages from the input queue into the output queue, reversing the order of the elements.
|
private static java.lang.Object |
unwrapMessage(java.lang.Object msg) |
getSender, reply, replyIfExists, setSender
createActorMessage, createRemoteHandle, deregisterCurrentActorWithThread, getJoinLatch, getParallelGroup, handleException, handleInterrupt, handleStart, handleTimeout, isActorThread, join, join, join, join, onStop, registerCurrentActorWithThread, sendAndContinue, sendAndPromise, threadBoundActor
call, getRemoteClass, leftShift, reInterrupt, send, send, sendAndWait, sendAndWait, sendAndWait
getOrCreateSerialHandle, writeReplace
private static final long serialVersionUID
private volatile SequentialProcessingActor.Node inputQueue
private SequentialProcessingActor.Node outputQueue
private final java.util.concurrent.atomic.AtomicBoolean ongoingThreadTermination
private static final java.util.concurrent.atomic.AtomicReferenceFieldUpdater<SequentialProcessingActor,SequentialProcessingActor.Node> inputQueueUpdater
private volatile java.lang.Thread waitingThread
protected static final int S_ACTIVE_MASK
protected static final int S_FINISHING_MASK
protected static final int S_FINISHED_MASK
protected static final int S_STOP_TERMINATE_MASK
protected static final int S_NOT_STARTED
protected static final int S_RUNNING
protected static final int S_STOPPING
protected static final int S_TERMINATING
protected static final int S_STOPPED
protected static final int S_TERMINATED
protected volatile int stopFlag
protected static final java.util.concurrent.atomic.AtomicIntegerFieldUpdater<SequentialProcessingActor> stopFlagUpdater
private static final java.lang.String SHOULD_NOT_REACH_HERE
protected SequentialProcessingActor()
public final boolean isActive()
private void throwIfNeeded(ActorMessage toProcess)
toProcess
- The next message to be processed by the actor
protected final ActorMessage pollMessage()
protected final ActorMessage takeMessage() throws java.lang.InterruptedException
java.lang.InterruptedException
- If the thread gets interrupted.
protected ActorMessage takeMessage(long timeout, java.util.concurrent.TimeUnit timeUnit) throws java.lang.InterruptedException
timeout
- Max time to wait for a message
timeUnit
- The units for the timeout
java.lang.InterruptedException
- If the thread gets interrupted.
private ActorMessage awaitNextMessage(long endTime) throws java.lang.InterruptedException
endTime
- End of the timeout, 0 if no timeout was set
java.lang.InterruptedException
- If the thread has been interrupted
private ActorMessage retrieveNextMessage()
private void transferQueues()
public final void setParallelGroup(PGroup group)
setParallelGroup
in class Actor
group
- new group
public final MessageStream send(java.lang.Object message)
MessageStream
send
in class MessageStream
message
- message to send
protected final boolean hasBeenStopped()
hasBeenStopped
in class Actor
protected void handleTermination()
handleTermination
in class Actor
public Actor silentStart()
Actor
silentStart
in class Actor
public final SequentialProcessingActor start()
public final Actor stop()
public final Actor terminate()
protected abstract java.lang.Object receiveImpl() throws java.lang.InterruptedException
java.lang.InterruptedException
- If the thread is interrupted during the wait. Should propagate up to stop the thread.
protected abstract java.lang.Object receiveImpl(long timeout, java.util.concurrent.TimeUnit units) throws java.lang.InterruptedException
timeout
- how long to wait before giving up, in units of `units`
units
- a TimeUnit determining how to interpret the timeout parameter
java.lang.InterruptedException
- If the thread is interrupted during the wait. Should propagate up to stop the thread.
protected final java.lang.Object receive() throws java.lang.InterruptedException
java.lang.InterruptedException
- If the thread is interrupted during the wait. Should propagate up to stop the thread.
protected final java.lang.Object receive(long timeout, java.util.concurrent.TimeUnit units) throws java.lang.InterruptedException
timeout
- how long to wait before giving up, in units of `units`
units
- a TimeUnit determining how to interpret the timeout parameter
java.lang.InterruptedException
- If the thread is interrupted during the wait. Should propagate up to stop the thread.
private static java.lang.Object unwrapMessage(java.lang.Object msg)
protected final java.lang.Object receive(groovy.time.BaseDuration duration) throws java.lang.InterruptedException
duration
- how long to wait before giving up, expressed as a duration
java.lang.InterruptedException
- If the thread is interrupted during the wait. Should propagate up to stop the thread.
protected final ActorMessage sweepNextMessage()
sweepNextMessage
in class Actor
protected abstract void act()
Actor's message handling code.
public void run()
run
in interface java.lang.Runnable
protected final void checkStopTerminate()