java.lang.Object
  groovyx.gpars.serial.WithSerialId
    groovyx.gpars.actor.impl.MessageStream
      groovyx.gpars.actor.Actor
        groovyx.gpars.actor.impl.ReplyingMessageStream
          groovyx.gpars.actor.impl.SequentialProcessingActor
            groovyx.gpars.actor.AbstractPooledActor
@Deprecated public abstract class AbstractPooledActor
AbstractPooledActor provides the default implementation of a stateful actor. Refer to DynamicDispatchActor or ReactiveActor for examples of stateless actors.

AbstractPooledActor represents a standalone active object (actor), which reacts asynchronously to messages sent to it from outside through the send method while preserving its internal implicit state. Each Actor has its own message queue and a thread pool shared with other Actors by means of an instance of the PGroup that they have in common. The PGroup instance is responsible for the pool creation, management and shutdown.

All work performed by an Actor is divided into chunks, which are sequentially submitted as independent tasks to the thread pool for processing. Whenever an Actor looks for a new message through the react method, the actor gets detached from the thread, making the thread available for other actors. Thanks to the ability to dynamically attach and detach threads to actors, Actors can scale far beyond the limits of the underlying platform on the number of concurrently available threads.

The receive method can be used to read a message from the queue without giving up the thread. If no message is available, the call to receive blocks until a message arrives or the supplied timeout expires.

The loop method allows a closure to be invoked repeatedly, with the iterations performed sequentially but potentially each on a different thread from the thread pool. To support continuations correctly, the react and loop methods never return.

    import static groovyx.gpars.actor.Actors.actor

    def actor = actor {
        loop {
            react { message ->
                println message
            }
            // This line will never be reached.
        }
        // This line will never be reached.
    }.start()

    actor.send 'Hi!'

This requires the code to be structured accordingly.

    def adder = actor {
        loop {
            react { a ->
                react { b ->
                    println a + b
                    replyIfExists a + b  // Sends reply, if b was sent by a PooledActor.
                }
            }
            // This line will never be reached.
        }
        // This line will never be reached.
    }.start()

The closures passed to the react method can call reply or replyIfExists, which will send a message back to the originator of the currently processed message. Unlike the reply method, the replyIfExists method will not fail if the original message wasn't sent by an actor or if the original sender actor is no longer running. The reply and replyIfExists methods are also dynamically added to the processed messages.

    react { a ->
        react { b ->
            reply 'message'            // sent to senders of a as well as b
            a.reply 'private message'  // sent to the sender of a only
        }
    }

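The difference between replying through the actor and replying through a single message can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: Inbox, Envelope, ReplyScope and ReplyDemo are hypothetical names invented for illustration, none of them are GPars classes, and a missing sender is represented simply as null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for an actor that can receive replies; not a GPars class.
final class Inbox {
    final List<Object> received = new ArrayList<>();

    void send(Object message) {
        received.add(message);
    }
}

// Hypothetical message wrapper remembering who sent the payload.
final class Envelope {
    final Object payload;
    final Inbox sender; // null models a message that was not sent by an actor

    Envelope(Object payload, Inbox sender) {
        this.payload = payload;
        this.sender = sender;
    }

    // Fails when there is nobody to reply to.
    void reply(Object message) {
        if (sender == null) {
            throw new IllegalStateException("Message has no sender to reply to");
        }
        sender.send(message);
    }

    // Silently skips the reply when there is nobody to reply to.
    void replyIfExists(Object message) {
        if (sender != null) {
            sender.send(message);
        }
    }
}

// Models the actor-level reply: it fans out to the senders of all messages in scope.
final class ReplyScope {
    private final List<Envelope> current;

    ReplyScope(List<Envelope> current) {
        this.current = current;
    }

    void reply(Object message) {
        for (Envelope e : current) {
            e.reply(message);
        }
    }
}

final class ReplyDemo {
    static String demo() {
        Inbox alice = new Inbox();
        Inbox bob = new Inbox();
        Envelope a = new Envelope(1, alice);
        Envelope b = new Envelope(2, bob);

        new ReplyScope(Arrays.asList(a, b)).reply("message"); // reaches alice and bob
        a.reply("private message");                           // reaches alice only

        Envelope anonymous = new Envelope(3, null);
        anonymous.replyIfExists("ignored");                   // no sender, no failure
        boolean replyFailed = false;
        try {
            anonymous.reply("fails");
        } catch (IllegalStateException expected) {
            replyFailed = true;
        }
        return alice.received + " " + bob.received + " " + replyFailed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the actor-level reply is just a loop over the per-message reply, which is one plausible way to read the description above; how GPars implements it internally is not shown here.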
The react method accepts timeouts as well.

    react(10, TimeUnit.MINUTES) {
        println 'Received message: ' + it
    }

If no message arrives within the given timeout, the onTimeout lifecycle handler is invoked, if one exists, and the Actor.TIMEOUT message is returned.

Each Actor has at any point in time at most one active instance of ActorAction associated with it, which abstracts the current chunk of the actor's work to perform. Once a thread is assigned to the ActorAction, it moves the actor forward until loop or react is called. These methods schedule another ActorAction for processing and throw a dedicated exception to terminate the current ActorAction.
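The idea of splitting an actor's work into chunks that borrow pool threads one at a time can be illustrated without GPars. The following Java sketch is an assumption-laden model built only on java.util.concurrent: PooledMailbox and PooledMailboxDemo are hypothetical names, and the exception-based termination of an ActorAction is deliberately not reproduced. It shows only that messages can be handled strictly one after another while no thread is held between messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical illustration of a pool-backed mailbox; not a GPars class.
final class PooledMailbox<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    // True while a chunk is queued or running, so at most one chunk is active.
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final ExecutorService pool;
    private final Consumer<T> handler;

    PooledMailbox(ExecutorService pool, Consumer<T> handler) {
        this.pool = pool;
        this.handler = handler;
    }

    void send(T message) {
        queue.add(message);
        trySchedule();
    }

    // Submits a new chunk only if there is work and no chunk is already active.
    private void trySchedule() {
        if (!queue.isEmpty() && scheduled.compareAndSet(false, true)) {
            pool.execute(this::runOneChunk);
        }
    }

    // One chunk handles one message, then gives the thread back to the pool.
    private void runOneChunk() {
        try {
            T message = queue.poll();
            if (message != null) {
                handler.accept(message);
            }
        } finally {
            scheduled.set(false);
            trySchedule(); // pick up messages that arrived while this chunk ran
        }
    }
}

final class PooledMailboxDemo {
    static List<Integer> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Integer> seen = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(5);
        PooledMailbox<Integer> mailbox = new PooledMailbox<Integer>(pool, n -> {
            seen.add(n);      // safe: chunks never overlap
            done.countDown();
        });
        try {
            for (int i = 1; i <= 5; i++) {
                mailbox.send(i);
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because only one chunk is ever scheduled per mailbox, the handler can keep unsynchronized state, while many such mailboxes could share the same small pool.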
Each Actor can define lifecycle observing methods, which will be called by the Actor's background thread whenever a certain lifecycle event occurs.
- afterStart(): called immediately after the Actor's background thread has been started, before the act method is called the first time.
- afterStop(List undeliveredMessages): called right after the actor is stopped, passing in all the messages from the queue.
- onInterrupt(InterruptedException e): called when the actor's thread gets interrupted. Thread interruption will result in stopping the actor in any case.
- onTimeout(): called when a react method times out. The actor will be terminated.
- onException(Throwable e): called when an exception occurs in the actor's thread. Throwing an exception from this method will stop the actor.
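To make the ordering of these callbacks concrete, here is a small Java sketch of a thread-bound message loop that invokes hooks with the same names. It is not GPars code: LifecycleLoop and LifecycleLoopDemo are hypothetical, the loop runs on the caller's thread, and it assumes for simplicity that a timeout, an interruption, or an exception each end the loop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an actor with lifecycle hooks; not a GPars class.
abstract class LifecycleLoop implements Runnable {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final long timeoutMillis;

    LifecycleLoop(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    void send(Object message) {
        queue.add(message);
    }

    protected abstract void onMessage(Object message) throws Exception;

    // Hooks default to no-ops so subclasses override only what they need.
    protected void afterStart() {}
    protected void afterStop(List<Object> undeliveredMessages) {}
    protected void onTimeout() {}
    protected void onInterrupt(InterruptedException e) {}
    protected void onException(Throwable e) {}

    @Override
    public void run() {
        afterStart();
        try {
            while (true) {
                // poll returns null when the timeout elapses with no message
                Object message = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (message == null) {
                    onTimeout();
                    return;
                }
                onMessage(message);
            }
        } catch (InterruptedException e) {
            onInterrupt(e);
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } catch (Exception e) {
            onException(e);
        } finally {
            // Whatever is still queued is handed over as undelivered.
            List<Object> undelivered = new ArrayList<>();
            queue.drainTo(undelivered);
            afterStop(undelivered);
        }
    }
}

final class LifecycleLoopDemo {
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        LifecycleLoop loop = new LifecycleLoop(50) {
            @Override protected void afterStart() {
                events.add("afterStart");
            }
            @Override protected void onMessage(Object m) {
                if ("boom".equals(m)) {
                    throw new IllegalStateException("boom");
                }
                events.add("message:" + m);
            }
            @Override protected void onException(Throwable e) {
                events.add("onException:" + e.getMessage());
            }
            @Override protected void afterStop(List<Object> undelivered) {
                events.add("afterStop:" + undelivered);
            }
        };
        loop.send("a");
        loop.send("boom");
        loop.send("left over");
        loop.run(); // runs on the calling thread for the sake of the demo
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The demo records afterStart, one handled message, the exception callback, and finally afterStop with the message that was never delivered.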
Nested Class Summary |
---|
Nested classes/interfaces inherited from class groovyx.gpars.actor.Actor |
---|
Actor.MyRemoteHandle, Actor.RemoteActor |
Nested classes/interfaces inherited from class groovyx.gpars.actor.impl.MessageStream |
---|
MessageStream.RemoteMessageStream, MessageStream.SendTo |
Field Summary | |
---|---|
private static long | serialVersionUID Deprecated. |
private static java.lang.String | THE_ACTOR_HAS_BEEN_STOPPED Deprecated. |
private static java.lang.String | THE_ACTOR_HAS_NOT_BEEN_STARTED Deprecated. |
Fields inherited from class groovyx.gpars.actor.impl.SequentialProcessingActor |
---|
afterLoopCode, loopCode, loopCondition, S_ACTIVE_MASK, S_FINISHED_MASK, S_FINISHING_MASK, S_NOT_STARTED, S_RUNNING, S_STOP_TERMINATE_MASK, S_STOPPED, S_STOPPING, S_TERMINATED, S_TERMINATING, stopFlag, stopFlagUpdater |
Fields inherited from class groovyx.gpars.actor.impl.ReplyingMessageStream |
---|
obj2Sender |
Fields inherited from class groovyx.gpars.actor.Actor |
---|
ACTOR_HAS_ALREADY_BEEN_STARTED, currentThread, parallelGroup, START_MESSAGE, STOP_MESSAGE, TERMINATE_MESSAGE, TIMEOUT, TIMEOUT_MESSAGE, timer |
Fields inherited from class groovyx.gpars.serial.WithSerialId |
---|
serialHandle |
Constructor Summary | |
---|---|
AbstractPooledActor() | Deprecated. |
Method Summary | |
---|---|
protected abstract void | act() Deprecated. This method represents the body of the actor. |
private void | collectRequiredMessages(java.util.Collection<ActorMessage> messages, int toReceive) Deprecated. |
private java.lang.Object | enhanceAndUnwrap(ActorMessage message) Deprecated. |
private void | enhanceReplies(java.lang.Iterable<ActorMessage> messages) Deprecated. Adds reply and replyIfExists methods to the current Actor and the message. |
protected void | handleStart() Deprecated. |
private void | checkStoppedFlags() Deprecated. |
protected void | receive(groovy.lang.Closure handler) Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
protected void | receive(groovy.time.Duration duration, groovy.lang.Closure handler) Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
protected void | receive(long timeout, java.util.concurrent.TimeUnit timeUnit, groovy.lang.Closure handler) Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
protected java.lang.Object | receiveImpl() Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
protected java.lang.Object | receiveImpl(long timeout, java.util.concurrent.TimeUnit units) Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
private static java.lang.Object[] | retrievePayloadOfMessages(java.util.List<ActorMessage> messages) Deprecated. |
Methods inherited from class groovyx.gpars.actor.impl.SequentialProcessingActor |
---|
handleTermination, hasBeenStopped, checkStopTerminate, isActive, loop, loop, loop, loop, loop, pollMessage, react, react, react, react, receive, receive, receive, run, send, setParallelGroup, scheduleLoop, silentStart, start, stop, sweepNextMessage, takeMessage, takeMessage, terminate |
Methods inherited from class groovyx.gpars.actor.impl.ReplyingMessageStream |
---|
getSender, getSenders, reply, replyIfExists, runEnhancedWithRepliesOnMessages |
Methods inherited from class groovyx.gpars.actor.Actor |
---|
createActorMessage, createRemoteHandle, deregisterCurrentActorWithThread, getJoinLatch, getParallelGroup, handleException, handleInterrupt, handleTimeout, isActorThread, join, join, join, join, onStop, registerCurrentActorWithThread, sendAndContinue, sweepQueue, threadBoundActor |
Methods inherited from class groovyx.gpars.actor.impl.MessageStream |
---|
call, getRemoteClass, leftShift, send, send, sendAndWait, sendAndWait, sendAndWait |
Methods inherited from class groovyx.gpars.serial.WithSerialId |
---|
getOrCreateSerialHandle, writeReplace |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Field Detail |
---|
private static final java.lang.String THE_ACTOR_HAS_NOT_BEEN_STARTED
private static final java.lang.String THE_ACTOR_HAS_BEEN_STOPPED
private static final long serialVersionUID
Constructor Detail |
---|
public AbstractPooledActor()
Method Detail |
---|
protected abstract void act()

This method represents the body of the actor. Subclasses implement it to supply the Actor's message handling code.

private void enhanceReplies(java.lang.Iterable<ActorMessage> messages)

Adds reply and replyIfExists methods to the current Actor and the message. These methods will call send on the target actor (the sender of the original message). A reply or replyIfExists invoked on the actor is sent to the senders of all currently processed messages, whereas reply or replyIfExists invoked on a message sends a reply to the sender of that particular message only.

Parameters:
messages - List of ActorMessage wrapping the sender actor, who we need to be able to respond to, plus the original message

protected final java.lang.Object receiveImpl() throws java.lang.InterruptedException

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

Overrides:
receiveImpl in class SequentialProcessingActor

Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.

protected final java.lang.Object receiveImpl(long timeout, java.util.concurrent.TimeUnit units) throws java.lang.InterruptedException

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

Overrides:
receiveImpl in class SequentialProcessingActor

Parameters:
timeout - how long to wait before giving up, in units of units
units - a TimeUnit determining how to interpret the timeout parameter

Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.

private java.lang.Object enhanceAndUnwrap(ActorMessage message)

private void checkStoppedFlags()

protected final void receive(groovy.lang.Closure handler) throws java.lang.InterruptedException

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

Parameters:
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.

Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.

private void collectRequiredMessages(java.util.Collection<ActorMessage> messages, int toReceive) throws java.lang.InterruptedException

Throws:
java.lang.InterruptedException

protected final void receive(long timeout, java.util.concurrent.TimeUnit timeUnit, groovy.lang.Closure handler) throws java.lang.InterruptedException

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

Parameters:
timeout - how long to wait before giving up, in units of timeUnit
timeUnit - a TimeUnit determining how to interpret the timeout parameter
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.

Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.

private static java.lang.Object[] retrievePayloadOfMessages(java.util.List<ActorMessage> messages)

protected final void receive(groovy.time.Duration duration, groovy.lang.Closure handler) throws java.lang.InterruptedException

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

Parameters:
duration - how long to wait before giving up
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.

Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.

protected void handleStart()

Overrides:
handleStart in class Actor
Copyright © 2008–2010 Václav Pech. All Rights Reserved.