org.codehaus.gpars
java.lang.Object
  groovyx.gpars.serial.WithSerialId
    groovyx.gpars.actor.impl.MessageStream
      groovyx.gpars.actor.Actor
        groovyx.gpars.actor.impl.ReplyingMessageStream
          groovyx.gpars.actor.impl.SequentialProcessingActor
            groovyx.gpars.actor.AbstractPooledActor
@Deprecated @SuppressWarnings({"ThrowCaughtLocally", "UnqualifiedStaticUsage"}) class AbstractPooledActor extends SequentialProcessingActor
AbstractPooledActor provides the default implementation of a stateful actor. Refer to DynamicDispatchActor or ReactiveActor for examples of stateless actors.

AbstractPooledActor represents a standalone active object (actor), which reacts asynchronously to messages sent to it from outside through the send method while preserving its internal implicit state. Each Actor has its own message queue and a thread pool shared with other Actors by means of a PGroup instance that they have in common. The PGroup instance is responsible for the pool's creation, management and shutdown.

All work performed by an Actor is divided into chunks, which are sequentially submitted as independent tasks to the thread pool for processing. Whenever an Actor looks for a new message through the react method, the actor gets detached from the thread, making the thread available for other actors. Thanks to this ability to dynamically attach and detach threads to actors, Actors can scale far beyond the underlying platform's limit on the number of concurrently available threads.

The receive method can be used to read a message from the queue without giving up the thread. If no message is available, the call to receive blocks until a message arrives or the supplied timeout expires.

The loop method repeatedly invokes a closure, performing the iterations sequentially, each potentially on a different thread from the thread pool. To support continuations correctly, the react and loop methods never return.
    import static groovyx.gpars.actor.Actors.actor

    def actor = actor {
        loop {
            react { message ->
                println message
            }
            // This line will never be reached.
        }
        // This line will never be reached.
    }.start()

    actor.send 'Hi!'
This requires the code to be structured accordingly.
    def adder = actor {
        loop {
            react { a ->
                react { b ->
                    println a+b
                    replyIfExists a+b  // Sends reply, if b was sent by a PooledActor.
                }
            }
            // This line will never be reached.
        }
        // This line will never be reached.
    }.start()
The closures passed to the react method can call reply or replyIfExists, which send a message back to the originator of the currently processed message. Unlike reply, the replyIfExists method will not fail if the original message wasn't sent by an actor or if the original sender actor is no longer running. The reply and replyIfExists methods are also dynamically added to the processed messages.
    react { a ->
        react { b ->
            reply 'message'            // sent to senders of a as well as b
            a.reply 'private message'  // sent to the sender of a only
        }
    }
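The difference between reply and replyIfExists can be modelled outside GPars. The following is a minimal Java sketch, assuming a message travels inside an envelope that also carries its sender (similar in spirit to the ActorMessage wrapper mentioned under enhanceReplies). The names ReplySketch, Recipient and Envelope are hypothetical and are not GPars types, and the boolean result of replyIfExists is a choice of this sketch only. It covers the missing-sender case, not the case of a sender that has already stopped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; none of these types are GPars classes.
class ReplySketch {
    // A destination that can accept messages (stands in for an actor).
    interface Recipient {
        void send(Object message);
    }

    // Pairs a payload with whoever sent it; sender is null for non-actor callers.
    static final class Envelope {
        final Object payload;
        final Recipient sender;

        Envelope(Object payload, Recipient sender) {
            this.payload = payload;
            this.sender = sender;
        }

        // Fails loudly when there is nobody to answer.
        void reply(Object answer) {
            if (sender == null) {
                throw new IllegalStateException("No sender to reply to");
            }
            sender.send(answer);
        }

        // Reports false instead of failing when there is nobody to answer.
        boolean replyIfExists(Object answer) {
            if (sender == null) {
                return false;
            }
            sender.send(answer);
            return true;
        }
    }

    // Sends one reply to a known sender, then attempts one with no sender.
    static List<Object> demo() {
        List<Object> inbox = new ArrayList<>();
        Recipient caller = inbox::add;               // the caller collects its replies
        new Envelope(1, caller).reply(2);            // delivered to the caller
        boolean delivered = new Envelope(3, null).replyIfExists(4); // silently skipped
        inbox.add(delivered);
        return inbox;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2, false]
    }
}
```

Calling reply on the second envelope instead would throw, which mirrors the failure mode that replyIfExists is described as avoiding.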
The react method accepts timeouts as well.
react(10, TimeUnit.MINUTES) { println 'Received message: ' + it }
If no message arrives within the given timeout, the onTimeout lifecycle handler is invoked, if it exists, and the Actor.TIMEOUT message is returned.

Each Actor has, at any point in time, at most one active instance of ActorAction associated with it, which abstracts the current chunk of the actor's work to perform. Once a thread is assigned to the ActorAction, it moves the actor forward until loop or react is called. These methods schedule another ActorAction for processing and throw a dedicated exception to terminate the current ActorAction.
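The idea of running an actor as a sequence of chunks on a shared pool, with at most one chunk active at a time, can be sketched in plain Java. This is an illustration of the scheduling pattern under simplifying assumptions, not the GPars implementation: ChunkedActorSketch and its members are hypothetical names, one chunk handles exactly one message, and a chunk ends by returning normally rather than by throwing a dedicated exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical illustration; not part of GPars.
class ChunkedActorSketch {
    private final Queue<Object> mailbox = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean scheduled = new AtomicBoolean(false);
    private final ExecutorService pool;      // may be shared with other actors
    private final Consumer<Object> handler;  // code run for each message

    ChunkedActorSketch(ExecutorService pool, Consumer<Object> handler) {
        this.pool = pool;
        this.handler = handler;
    }

    void send(Object message) {
        mailbox.add(message);
        scheduleIfIdle();
    }

    // Submit a chunk only if none is pending or running for this actor.
    private void scheduleIfIdle() {
        if (!mailbox.isEmpty() && scheduled.compareAndSet(false, true)) {
            pool.execute(this::runChunk);
        }
    }

    // One chunk: handle a single message, then hand the thread back to the pool.
    private void runChunk() {
        try {
            Object message = mailbox.poll();
            if (message != null) {
                handler.accept(message);
            }
        } finally {
            scheduled.set(false);
            scheduleIfIdle(); // the next chunk may run on a different pool thread
        }
    }

    // Demo: one actor on a two-thread pool still sees its messages in order.
    static List<Object> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Object> seen = new ArrayList<>(); // only touched by one chunk at a time
        CountDownLatch done = new CountDownLatch(3);
        ChunkedActorSketch actor = new ChunkedActorSketch(pool, m -> {
            seen.add(m);
            done.countDown();
        });
        actor.send("a");
        actor.send("b");
        actor.send("c");
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b, c]
    }
}
```

Because a thread is held only while a chunk runs, an idle actor costs a queue and a flag rather than a thread, which is the property that lets many actors share a small pool.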
Each Actor can define lifecycle observing methods, which will be called by the Actor's background thread whenever a certain lifecycle event occurs.
Field Summary | |
---|---|
private static java.lang.String | THE_ACTOR_HAS_BEEN_STOPPED |
private static java.lang.String | THE_ACTOR_HAS_NOT_BEEN_STARTED |
private static long | serialVersionUID |
Fields inherited from class SequentialProcessingActor | |
---|---|
S_ACTIVE_MASK, S_FINISHED_MASK, S_FINISHING_MASK, S_NOT_STARTED, S_RUNNING, S_STOPPED, S_STOPPING, S_STOP_TERMINATE_MASK, S_TERMINATED, S_TERMINATING, afterLoopCode, loopCode, loopCondition, stopFlag, stopFlagUpdater |
Fields inherited from class ReplyingMessageStream | |
---|---|
obj2Sender |
Fields inherited from class Actor | |
---|---|
ACTOR_HAS_ALREADY_BEEN_STARTED, START_MESSAGE, STOP_MESSAGE, TERMINATE_MESSAGE, TIMEOUT_MESSAGE, currentThread, parallelGroup, timer |
Constructor Summary | |
---|---|
AbstractPooledActor() | |
Method Summary | |
---|---|
protected void | act() - This method represents the body of the actor. |
private void | checkStoppedFlags() |
private void | collectRequiredMessages(java.util.Collection messages, int toReceive) |
private java.lang.Object | enhanceAndUnwrap(ActorMessage message) |
private void | enhanceReplies(java.lang.Iterable messages) - Adds reply and replyIfExists methods to the current Actor and the message. |
protected void | handleStart() |
protected void | receive(groovy.lang.Closure handler) - Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
protected void | receive(long timeout, java.util.concurrent.TimeUnit timeUnit, groovy.lang.Closure handler) - Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
protected void | receive(groovy.time.Duration duration, groovy.lang.Closure handler) - Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
protected java.lang.Object | receiveImpl() - Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
protected java.lang.Object | receiveImpl(long timeout, java.util.concurrent.TimeUnit units) - Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. |
private static Object[] | retrievePayloadOfMessages(java.util.List messages) |
Methods inherited from class SequentialProcessingActor | |
---|---|
checkStopTerminate, handleTermination, hasBeenStopped, isActive, loop, loop, loop, loop, loop, pollMessage, react, react, react, react, receive, receive, receive, receiveImpl, receiveImpl, run, runReaction, scheduleLoop, send, setParallelGroup, silentStart, start, stop, sweepNextMessage, takeMessage, takeMessage, terminate |
Methods inherited from class ReplyingMessageStream | |
---|---|
getSender, getSenders, reply, replyIfExists, runEnhancedWithRepliesOnMessages |
Methods inherited from class MessageStream | |
---|---|
call, getRemoteClass, leftShift, reInterrupt, send, send, send, sendAndWait, sendAndWait, sendAndWait |
Methods inherited from class WithSerialId | |
---|---|
createRemoteHandle, getOrCreateSerialHandle, getRemoteClass, writeReplace |
Field Detail |
---|
private static final java.lang.String THE_ACTOR_HAS_BEEN_STOPPED
private static final java.lang.String THE_ACTOR_HAS_NOT_BEEN_STARTED
private static final long serialVersionUID
Constructor Detail |
---|
AbstractPooledActor()
Method Detail |
---|
protected void act()
private void checkStoppedFlags()
private void collectRequiredMessages(java.util.Collection messages, int toReceive)
private java.lang.Object enhanceAndUnwrap(ActorMessage message)
private void enhanceReplies(java.lang.Iterable messages)
messages - List of ActorMessage wrapping the sender actor, whom we need to be able to respond to, plus the original message
@Override protected void handleStart()
@SuppressWarnings({"MethodOverloadsMethodOfSuperclass"}) protected final void receive(groovy.lang.Closure handler)
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.
protected final void receive(long timeout, java.util.concurrent.TimeUnit timeUnit, groovy.lang.Closure handler)
timeout - how long to wait before giving up, in units of timeUnit
timeUnit - a TimeUnit determining how to interpret the timeout parameter
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.
@SuppressWarnings({"MethodOverloadsMethodOfSuperclass", "TypeMayBeWeakened"}) protected final void receive(groovy.time.Duration duration, groovy.lang.Closure handler)
duration - how long to wait before giving up
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.
@Override protected final java.lang.Object receiveImpl()
@Override protected final java.lang.Object receiveImpl(long timeout, java.util.concurrent.TimeUnit units)
timeout - how long to wait before giving up, in the time unit given by units
units - a TimeUnit determining how to interpret the timeout parameter
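A blocking retrieval with a timeout, as the timeout and units parameters describe, can be modelled on a standard java.util.concurrent.BlockingQueue. The sketch below only illustrates the waiting behaviour and makes no claim about how receiveImpl is implemented. TimedReceiveSketch is a hypothetical class, and returning null when the timeout expires is an assumption of this sketch, not documented GPars behaviour.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not the GPars implementation.
class TimedReceiveSketch {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    void send(Object message) {
        queue.add(message);
    }

    // Blocks the calling thread until a message arrives or the timeout expires.
    // Returns null on timeout or interruption (a choice of this sketch).
    Object receive(long timeout, TimeUnit units) {
        try {
            return queue.poll(timeout, units);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return null;
        }
    }

    public static void main(String[] args) {
        TimedReceiveSketch mailbox = new TimedReceiveSketch();
        mailbox.send("Hi!");
        // A queued message is returned immediately.
        System.out.println(mailbox.receive(1, TimeUnit.SECONDS));
        // With an empty queue the call waits about 50 ms and then gives up.
        System.out.println(mailbox.receive(50, TimeUnit.MILLISECONDS));
    }
}
```

Unlike the chunked style used by react, the calling thread stays occupied for the whole wait, which matches the description of receive as reading a message without giving up the thread.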
private static Object[] retrievePayloadOfMessages(java.util.List messages)
Copyright © 2008–2010 Václav Pech. All Rights Reserved.