java.lang.Object
groovyx.gpars.serial.WithSerialId
groovyx.gpars.actor.impl.MessageStream
groovyx.gpars.actor.Actor
groovyx.gpars.actor.impl.ReplyingMessageStream
groovyx.gpars.actor.impl.SequentialProcessingActor
groovyx.gpars.actor.AbstractPooledActor
@Deprecated public abstract class AbstractPooledActor
AbstractPooledActor provides the default implementation of a stateful actor. Refer to DynamicDispatchActor or ReactiveActor for examples of stateless actors.

AbstractPooledActor represents a standalone active object (actor), which reacts asynchronously to
messages sent to it from outside through the send method, while preserving its internal implicit
state. Each Actor has its own message queue and a thread pool shared with other Actors
by means of the PGroup instance they have in common. The PGroup instance is
responsible for creating, managing and shutting down the pool.

All work performed by an Actor is divided into chunks, which are sequentially submitted as
independent tasks to the thread pool for processing. Whenever an Actor looks for a new message
through the react method, the actor is detached from its thread, making the thread available to
other actors. Because threads are attached to and detached from actors dynamically, the number of
Actors can scale far beyond the number of threads the underlying platform can run concurrently.

The receive method can be used to read a message from the queue without giving up the thread.
If no message is available, the call to receive blocks until a message arrives or the supplied
timeout expires. The loop method repeatedly invokes a closure, performing the iterations
sequentially, though each iteration may run on a different thread from the thread pool.
To support continuations correctly, the react and loop methods never return.
import static groovyx.gpars.actor.Actors.actor

def actor = actor {
    loop {
        react { message ->
            println message
        }
        // This line will never be reached.
    }
    // This line will never be reached.
}.start()
actor.send 'Hi!'
This requires the code to be structured accordingly.
def adder = actor {
    loop {
        react { a ->
            react { b ->
                println a + b
                replyIfExists a + b  // Sends reply, if b was sent by a PooledActor.
            }
        }
        // This line will never be reached.
    }
    // This line will never be reached.
}.start()
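
Where the continuation style is awkward, the same adder can be written with receive, which blocks the pooled thread instead of releasing it. The following is a minimal sketch, not a reference implementation: BlockingAdder and its sum property are hypothetical names introduced for illustration, and the sketch assumes that the actor stops once act() returns, so that join() completes.

```groovy
import groovyx.gpars.actor.AbstractPooledActor

// Hypothetical example class; only AbstractPooledActor and its methods come from GPars.
class BlockingAdder extends AbstractPooledActor {
    def sum  // written by the actor's thread, read by the caller after join()

    protected void act() {
        receive { a ->
            receive { b ->
                sum = a + b
            }
        }
        // Unlike react, receive returns, so this line is reached.
        println "Sum: $sum"
    }
}

def adder = new BlockingAdder()
adder.start()
adder.send 1
adder.send 2
adder.join()  // assumption: returns once act() has finished and the actor has stopped
```

The trade-off is that a pooled thread stays occupied while the actor waits, so this style is better suited to a small number of actors than to the large populations that react allows.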
The closures passed to the react method can call reply or replyIfExists, which
send a message back to the originator of the currently processed message. Unlike reply, the
replyIfExists method does not fail if the original message was not sent by an actor or
if the sending actor is no longer running. The reply and replyIfExists methods
are also dynamically added to the processed messages.
react { a ->
    react { b ->
        reply 'message'            // sent to the senders of a as well as b
        a.reply 'private message'  // sent to the sender of a only
    }
}
The react method accepts timeouts as well.
// TimeUnit is java.util.concurrent.TimeUnit
react(10, TimeUnit.MINUTES) {
    println 'Received message: ' + it
}
If no message arrives within the given timeout, the onTimeout lifecycle handler is invoked, if
it exists, and the Actor.TIMEOUT message is returned.
Each Actor has at most one active instance of ActorAction associated with it at any point in time;
the ActorAction abstracts the current chunk of the actor's work. Once a thread is assigned to the
ActorAction, it moves the actor forward until loop or react is called. These methods schedule
another ActorAction for processing and throw a dedicated exception to terminate the current ActorAction.
Each Actor can define lifecycle observing methods, which will be called by the Actor's background thread whenever a certain lifecycle event occurs.
afterStart() - called immediately after the Actor's background thread has been started, before the act method is called for the first time.
afterStop(List undeliveredMessages) - called right after the actor is stopped, passing in all the messages remaining in the queue.
onInterrupt(InterruptedException e) - called when the actor's thread gets interrupted. Thread interruption will result in stopping the actor in any case.
onTimeout() - called when a react method times out. The actor will be terminated.
onException(Throwable e) - called when an exception occurs in the actor's thread. Throwing an exception from this method will stop the actor.
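
The lifecycle handlers listed above can be sketched as ordinary methods on a subclass. This is an illustrative example under stated assumptions, not the library's prescribed usage: TracingActor and its events list are hypothetical names, and the sketch assumes that the handlers are picked up when declared as public methods with the names and parameters listed above.

```groovy
import groovyx.gpars.actor.AbstractPooledActor

// Hypothetical example class; the handler names and parameters follow the list above.
class TracingActor extends AbstractPooledActor {
    // Records lifecycle events in the order in which the actor's thread reports them.
    final List events = Collections.synchronizedList([])

    protected void act() {
        receive { msg -> events << "received $msg" }
    }

    void afterStart() {
        events << 'afterStart'
    }

    void afterStop(List undeliveredMessages) {
        events << "afterStop, undelivered: ${undeliveredMessages.size()}"
    }

    void onException(Throwable e) {
        events << "onException: ${e.message}"
    }
}

def tracer = new TracingActor()
tracer.start()
tracer.send 'ping'
tracer.join()  // assumption: returns once the actor has stopped
println tracer.events
```

Collecting events in a list rather than printing them directly makes the ordering easy to inspect after the actor has finished.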
| Nested Class Summary |
|---|
| Nested classes/interfaces inherited from class groovyx.gpars.actor.Actor |
|---|
| Actor.MyRemoteHandle, Actor.RemoteActor |
| Nested classes/interfaces inherited from class groovyx.gpars.actor.impl.MessageStream |
|---|
| MessageStream.RemoteMessageStream, MessageStream.SendTo |
| Field Summary | |
|---|---|
| private static long | serialVersionUID - Deprecated. |
| private static java.lang.String | THE_ACTOR_HAS_BEEN_STOPPED - Deprecated. |
| private static java.lang.String | THE_ACTOR_HAS_NOT_BEEN_STARTED - Deprecated. |
| Fields inherited from class groovyx.gpars.actor.impl.SequentialProcessingActor |
|---|
| afterLoopCode, loopCode, loopCondition, S_ACTIVE_MASK, S_FINISHED_MASK, S_FINISHING_MASK, S_NOT_STARTED, S_RUNNING, S_STOP_TERMINATE_MASK, S_STOPPED, S_STOPPING, S_TERMINATED, S_TERMINATING, stopFlag, stopFlagUpdater |
| Fields inherited from class groovyx.gpars.actor.impl.ReplyingMessageStream |
|---|
| obj2Sender |
| Fields inherited from class groovyx.gpars.actor.Actor |
|---|
| ACTOR_HAS_ALREADY_BEEN_STARTED, currentThread, parallelGroup, START_MESSAGE, STOP_MESSAGE, TERMINATE_MESSAGE, TIMEOUT, TIMEOUT_MESSAGE, timer |
| Fields inherited from class groovyx.gpars.serial.WithSerialId |
|---|
| serialHandle |
| Constructor Summary | |
|---|---|
| AbstractPooledActor() | Deprecated. |
| Method Summary | |
|---|---|
| protected abstract void | act() - Deprecated. This method represents the body of the actor. |
| private void | collectRequiredMessages(java.util.Collection<ActorMessage> messages, int toReceive) - Deprecated. |
| private java.lang.Object | enhanceAndUnwrap(ActorMessage message) - Deprecated. |
| private void | enhanceReplies(java.lang.Iterable<ActorMessage> messages) - Deprecated. Adds reply and replyIfExists methods to the current Actor and the message. |
| protected void | handleStart() - Deprecated. |
| private void | checkStoppedFlags() - Deprecated. |
| protected void | receive(groovy.lang.Closure handler) - Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
| protected void | receive(groovy.time.Duration duration, groovy.lang.Closure handler) - Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
| protected void | receive(long timeout, java.util.concurrent.TimeUnit timeUnit, groovy.lang.Closure handler) - Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
| protected java.lang.Object | receiveImpl() - Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
| protected java.lang.Object | receiveImpl(long timeout, java.util.concurrent.TimeUnit units) - Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
| private static java.lang.Object[] | retrievePayloadOfMessages(java.util.List<ActorMessage> messages) - Deprecated. |
| Methods inherited from class groovyx.gpars.actor.impl.SequentialProcessingActor |
|---|
| handleTermination, hasBeenStopped, checkStopTerminate, isActive, loop, loop, loop, loop, loop, pollMessage, react, react, react, react, receive, receive, receive, run, send, setParallelGroup, scheduleLoop, silentStart, start, stop, sweepNextMessage, takeMessage, takeMessage, terminate |
| Methods inherited from class groovyx.gpars.actor.impl.ReplyingMessageStream |
|---|
| getSender, getSenders, reply, replyIfExists, runEnhancedWithRepliesOnMessages |
| Methods inherited from class groovyx.gpars.actor.Actor |
|---|
| createActorMessage, createRemoteHandle, deregisterCurrentActorWithThread, getJoinLatch, getParallelGroup, handleException, handleInterrupt, handleTimeout, isActorThread, join, join, join, join, onStop, registerCurrentActorWithThread, sendAndContinue, sweepQueue, threadBoundActor |
| Methods inherited from class groovyx.gpars.actor.impl.MessageStream |
|---|
| call, getRemoteClass, leftShift, send, send, sendAndWait, sendAndWait, sendAndWait |
| Methods inherited from class groovyx.gpars.serial.WithSerialId |
|---|
| getOrCreateSerialHandle, writeReplace |
| Methods inherited from class java.lang.Object |
|---|
| clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
| Field Detail |
|---|
private static final java.lang.String THE_ACTOR_HAS_NOT_BEEN_STARTED
private static final java.lang.String THE_ACTOR_HAS_BEEN_STOPPED
private static final long serialVersionUID
| Constructor Detail |
|---|
public AbstractPooledActor()
| Method Detail |
|---|
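protected abstract void act()
This method represents the body of the actor. It provides an extension point for subclasses to supply the Actor's message handling code.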
private void enhanceReplies(java.lang.Iterable<ActorMessage> messages)
Adds reply and replyIfExists methods to the current Actor and the message.
These methods will call send on the target actor (the sender of the original message).
Calling reply/replyIfExists on the actor sends the reply to the senders of all currently
processed messages; calling reply/replyIfExists on a message sends the reply to the
sender of that particular message only.
Parameters:
messages - List of ActorMessage wrapping the sender actor, who we need to be able to respond to, plus the original message
protected final java.lang.Object receiveImpl()
        throws java.lang.InterruptedException
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
Overrides:
receiveImpl in class SequentialProcessingActor
Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.
protected final java.lang.Object receiveImpl(long timeout,
        java.util.concurrent.TimeUnit units)
        throws java.lang.InterruptedException
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
Overrides:
receiveImpl in class SequentialProcessingActor
Parameters:
timeout - how long to wait before giving up, in units of units
units - a TimeUnit determining how to interpret the timeout parameter
Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.
private java.lang.Object enhanceAndUnwrap(ActorMessage message)
private void checkStoppedFlags()
protected final void receive(groovy.lang.Closure handler)
        throws java.lang.InterruptedException
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
Parameters:
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.
Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.
private void collectRequiredMessages(java.util.Collection<ActorMessage> messages,
        int toReceive)
        throws java.lang.InterruptedException
Throws:
java.lang.InterruptedException
protected final void receive(long timeout,
        java.util.concurrent.TimeUnit timeUnit,
        groovy.lang.Closure handler)
        throws java.lang.InterruptedException
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
Parameters:
timeout - how long to wait before giving up, in units of timeUnit
timeUnit - a TimeUnit determining how to interpret the timeout parameter
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.
Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.
private static java.lang.Object[] retrievePayloadOfMessages(java.util.List<ActorMessage> messages)
protected final void receive(groovy.time.Duration duration,
        groovy.lang.Closure handler)
        throws java.lang.InterruptedException
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
Parameters:
duration - how long to wait before giving up
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.
Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.
protected void handleStart()
Overrides:
handleStart in class Actor
Copyright © 2008–2010 Václav Pech. All Rights Reserved.