org.codehaus.gpars

groovyx.gpars.actor
Class AbstractPooledActor

java.lang.Object
  groovyx.gpars.serial.WithSerialId
      groovyx.gpars.actor.impl.MessageStream
          groovyx.gpars.actor.Actor
              groovyx.gpars.actor.impl.ReplyingMessageStream
                  groovyx.gpars.actor.impl.SequentialProcessingActor
                      groovyx.gpars.actor.AbstractPooledActor

@Deprecated
@SuppressWarnings({"ThrowCaughtLocally", "UnqualifiedStaticUsage"})
class AbstractPooledActor
extends SequentialProcessingActor

AbstractPooledActor provides the default implementation of a stateful actor. Refer to DynamicDispatchActor or ReactiveActor for examples of stateless actors. AbstractPooledActor represents a standalone active object (actor), which reacts asynchronously to messages sent to it from outside through the send method, which preserving its internal implicit state. Each Actor has its own message queue and a thread pool shared with other Actors by means of an instance of the PGroup, which they have in common. The PGroup instance is responsible for the pool creation, management and shutdown. All work performed by an Actor is divided into chunks, which are sequentially submitted as independent tasks to the thread pool for processing. Whenever an Actor looks for a new message through the react method, the actor gets detached from the thread, making the thread available for other actors. Thanks to the ability to dynamically attach and detach threads to actors, Actors can scale far beyond the limits of the underlying platform on number of concurrently available threads. The receive method can be used to read a message from the queue without giving up the thread. If no message is available, the call to receive blocks until a message arrives or the supplied timeout expires. The loop method allows to repeatedly invoke a closure and yet perform each of the iterations sequentially in different thread from the thread pool. To support continuations correctly the react and loop methods never return.

 import static groovyx.gpars.actor.Actors.actor
 
 def actor = actor {
     loop {
         react { message ->
             println message
         }
         // This line will never be reached.
     }
     // This line will never be reached.
 }.start()
 
 actor.send 'Hi!'
 

This requires the code to be structured accordingly.

 def adder = actor {
     loop {
         react { a ->
             react { b ->
                 println a+b
                 replyIfExists a+b  // Sends reply, if b was sent by a PooledActor.
             }
         }
         // This line will never be reached.
     }
     // This line will never be reached.
 }.start()
 

The closures passed to the react method can call reply or replyIfExists, which will send a message back to the originator of the currently processed message. The replyIfExists method unlike the reply method will not fail if the original message wasn't sent by an actor nor if the original sender actor is no longer running. The reply and replyIfExists methods are also dynamically added to the processed messages.

 react { a ->
     react { b ->
         reply 'message'  //sent to senders of a as well as b
         a.reply 'private message'  //sent to the sender of a only
     }
 }
 

The react method accepts timeouts as well.

 react(10, TimeUnit.MINUTES) {
     println 'Received message: ' + it
 }
 

If no message arrives within the given timeout, the onTimeout lifecycle handler is invoked, if exists, and the Actor.TIMEOUT message is returned. Each Actor has at any point in time at most one active instance of ActorAction associated, which abstracts the current chunk of actor's work to perform. Once a thread is assigned to the ActorAction, it moves the actor forward till loop or react is called. These methods schedule another ActorAction for processing and throw dedicated exception to terminate the current ActorAction.

Each Actor can define lifecycle observing methods, which will be called by the Actor's background thread whenever a certain lifecycle event occurs.

Authors:
Vaclav Pech, Alex Tkachman, Dierk Koenig


Field Summary
private static java.lang.String THE_ACTOR_HAS_BEEN_STOPPED

private static java.lang.String THE_ACTOR_HAS_NOT_BEEN_STARTED

private static long serialVersionUID

 
Fields inherited from class SequentialProcessingActor
S_ACTIVE_MASK, S_FINISHED_MASK, S_FINISHING_MASK, S_NOT_STARTED, S_RUNNING, S_STOPPED, S_STOPPING, S_STOP_TERMINATE_MASK, S_TERMINATED, S_TERMINATING, afterLoopCode, loopCode, loopCondition, stopFlag, stopFlagUpdater
 
Fields inherited from class ReplyingMessageStream
obj2Sender
 
Fields inherited from class Actor
ACTOR_HAS_ALREADY_BEEN_STARTED, START_MESSAGE, STOP_MESSAGE, TERMINATE_MESSAGE, TIMEOUT_MESSAGE, currentThread, parallelGroup, timer
 
Constructor Summary
AbstractPooledActor()

 
Method Summary
protected void act()

This method represents the body of the actor.

private void checkStoppedFlags()

private void collectRequiredMessages(java.util.Collection messages, int toReceive)

private java.lang.Object enhanceAndUnwrap(ActorMessage message)

private void enhanceReplies(java.lang.Iterable messages)

Adds reply and replyIfExists methods to the current Actor and the message.

protected void handleStart()

protected void receive(groovy.lang.Closure handler)

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

protected void receive(long timeout, java.util.concurrent.TimeUnit timeUnit, groovy.lang.Closure handler)

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

protected void receive(groovy.time.Duration duration, groovy.lang.Closure handler)

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

protected java.lang.Object receiveImpl()

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

protected java.lang.Object receiveImpl(long timeout, java.util.concurrent.TimeUnit units)

Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

private static Object[] retrievePayloadOfMessages(java.util.List messages)

 
Methods inherited from class SequentialProcessingActor
checkStopTerminate, handleTermination, hasBeenStopped, isActive, loop, loop, loop, loop, loop, pollMessage, react, react, react, react, receive, receive, receive, receiveImpl, receiveImpl, run, runReaction, scheduleLoop, send, setParallelGroup, silentStart, start, stop, sweepNextMessage, takeMessage, takeMessage, terminate
 
Methods inherited from class ReplyingMessageStream
getSender, getSenders, reply, replyIfExists, runEnhancedWithRepliesOnMessages
 
Methods inherited from class Actor
createActorMessage, createRemoteHandle, deregisterCurrentActorWithThread, getJoinLatch, getParallelGroup, handleException, handleInterrupt, handleStart, handleTermination, handleTimeout, hasBeenStopped, isActive, isActorThread, join, join, join, join, onStop, registerCurrentActorWithThread, sendAndContinue, setParallelGroup, silentStart, start, stop, sweepNextMessage, sweepQueue, terminate, threadBoundActor
 
Methods inherited from class MessageStream
call, getRemoteClass, leftShift, reInterrupt, send, send, send, sendAndWait, sendAndWait, sendAndWait
 
Methods inherited from class WithSerialId
createRemoteHandle, getOrCreateSerialHandle, getRemoteClass, writeReplace
 

Field Detail

THE_ACTOR_HAS_BEEN_STOPPED

private static final java.lang.String THE_ACTOR_HAS_BEEN_STOPPED


THE_ACTOR_HAS_NOT_BEEN_STARTED

private static final java.lang.String THE_ACTOR_HAS_NOT_BEEN_STARTED


serialVersionUID

private static final long serialVersionUID


 
Constructor Detail

AbstractPooledActor

AbstractPooledActor()


 
Method Detail

act

protected void act()
This method represents the body of the actor. It is called upon actor's start and can exit either normally by return or due to actor being stopped through the stop() method, which cancels the current actor action. Provides an extension point for subclasses to provide their custom Actor's message handling code.


checkStoppedFlags

private void checkStoppedFlags()


collectRequiredMessages

private void collectRequiredMessages(java.util.Collection messages, int toReceive)


enhanceAndUnwrap

private java.lang.Object enhanceAndUnwrap(ActorMessage message)


enhanceReplies

private void enhanceReplies(java.lang.Iterable messages)
Adds reply and replyIfExists methods to the current Actor and the message. These methods will call send on the target actor (the sender of the original message). The reply/replyIfExists methods invoked on the actor will be sent to all currently processed messages, reply/replyIfExists invoked on a message will send a reply to the sender of that particular message only.
Parameters:
messages - List of ActorMessage wrapping the sender actor, who we need to be able to respond to, plus the original message


handleStart

@Override
protected void handleStart()


receive

@SuppressWarnings({"MethodOverloadsMethodOfSuperclass"})
protected final void receive(groovy.lang.Closure handler)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. The message retrieved from the queue is passed into the handler as the only parameter.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.
Parameters:
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.


receive

protected final void receive(long timeout, java.util.concurrent.TimeUnit timeUnit, groovy.lang.Closure handler)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. The message retrieved from the queue is passed into the handler as the only parameter. A null value is passed into the handler, if the timeout expires
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.
Parameters:
timeout - how long to wait before giving up, in units of unit
timeUnit - a TimeUnit determining how to interpret the timeout parameter
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.


receive

@SuppressWarnings({"MethodOverloadsMethodOfSuperclass", "TypeMayBeWeakened"})
protected final void receive(groovy.time.Duration duration, groovy.lang.Closure handler)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. The message retrieved from the queue is passed into the handler as the only parameter. A null value is passed into the handler, if the timeout expires
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.
Parameters:
duration - how long to wait before giving up, in units of unit
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.


receiveImpl

@Override
protected final java.lang.Object receiveImpl()
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.
Returns:
The message retrieved from the queue.


receiveImpl

@Override
protected final java.lang.Object receiveImpl(long timeout, java.util.concurrent.TimeUnit units)
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
throws:
InterruptedException If the thread is interrupted during the wait. Should propagate up to stop the thread.
Parameters:
timeout - how long to wait before giving up, in units of unit
units - a TimeUnit determining how to interpret the timeout parameter
Returns:
The message retrieved from the queue, or null, if the timeout expires.


retrievePayloadOfMessages

private static Object[] retrievePayloadOfMessages(java.util.List messages)


 

Copyright © 2008–2010 Václav Pech. All Rights Reserved.