groovyx.gpars.actor
Class AbstractPooledActor

java.lang.Object
  extended by groovyx.gpars.serial.WithSerialId
      extended by groovyx.gpars.actor.impl.MessageStream
          extended by groovyx.gpars.actor.Actor
              extended by groovyx.gpars.actor.impl.ReplyingMessageStream
                  extended by groovyx.gpars.actor.impl.SequentialProcessingActor
                      extended by groovyx.gpars.actor.AbstractPooledActor
All Implemented Interfaces:
java.io.Serializable, java.lang.Runnable
Direct Known Subclasses:
RunnableBackedPooledActor, SingleRunActor

Deprecated.

@Deprecated
public abstract class AbstractPooledActor
extends SequentialProcessingActor

AbstractPooledActor provides the default implementation of a stateful actor. Refer to DynamicDispatchActor or ReactiveActor for examples of stateless actors. AbstractPooledActor represents a standalone active object (actor), which reacts asynchronously to messages sent to it from outside through the send method, which preserving its internal implicit state. Each Actor has its own message queue and a thread pool shared with other Actors by means of an instance of the PGroup, which they have in common. The PGroup instance is responsible for the pool creation, management and shutdown. All work performed by an Actor is divided into chunks, which are sequentially submitted as independent tasks to the thread pool for processing. Whenever an Actor looks for a new message through the react method, the actor gets detached from the thread, making the thread available for other actors. Thanks to the ability to dynamically attach and detach threads to actors, Actors can scale far beyond the limits of the underlying platform on number of concurrently available threads. The receive method can be used to read a message from the queue without giving up the thread. If no message is available, the call to receive blocks until a message arrives or the supplied timeout expires. The loop method allows to repeatedly invoke a closure and yet perform each of the iterations sequentially in different thread from the thread pool. To support continuations correctly the react and loop methods never return.

 import static groovyx.gpars.actor.Actors.actor
 
 def actor = actor {
     loop {
         react { message ->
             println message
         }
         // This line will never be reached.
     }
     // This line will never be reached.
 }.start()
 
 actor.send 'Hi!'
 

This requires the code to be structured accordingly.

 def adder = actor {
     loop {
         react { a ->
             react { b ->
                 println a+b
                 replyIfExists a+b  // Sends reply, if b was sent by a PooledActor.
             }
         }
         // This line will never be reached.
     }
     // This line will never be reached.
 }.start()
 

The closures passed to the react method can call reply or replyIfExists, which will send a message back to the originator of the currently processed message. The replyIfExists method unlike the reply method will not fail if the original message wasn't sent by an actor nor if the original sender actor is no longer running. The reply and replyIfExists methods are also dynamically added to the processed messages.

 react { a ->
     react { b ->
         reply 'message'  //sent to senders of a as well as b
         a.reply 'private message'  //sent to the sender of a only
     }
 }
 

The react method accepts timeouts as well.

 react(10, TimeUnit.MINUTES) {
     println 'Received message: ' + it
 }
 

If no message arrives within the given timeout, the onTimeout lifecycle handler is invoked, if exists, and the Actor.TIMEOUT message is returned. Each Actor has at any point in time at most one active instance of ActorAction associated, which abstracts the current chunk of actor's work to perform. Once a thread is assigned to the ActorAction, it moves the actor forward till loop or react is called. These methods schedule another ActorAction for processing and throw dedicated exception to terminate the current ActorAction.

Each Actor can define lifecycle observing methods, which will be called by the Actor's background thread whenever a certain lifecycle event occurs.

Author:
Vaclav Pech, Alex Tkachman, Dierk Koenig
See Also:
Serialized Form

Nested Class Summary
 
Nested classes/interfaces inherited from class groovyx.gpars.actor.Actor
Actor.MyRemoteHandle, Actor.RemoteActor
 
Nested classes/interfaces inherited from class groovyx.gpars.actor.impl.MessageStream
MessageStream.RemoteMessageStream, MessageStream.SendTo
 
Field Summary
private static long serialVersionUID
          Deprecated.  
private static java.lang.String THE_ACTOR_HAS_BEEN_STOPPED
          Deprecated.  
private static java.lang.String THE_ACTOR_HAS_NOT_BEEN_STARTED
          Deprecated.  
 
Fields inherited from class groovyx.gpars.actor.impl.SequentialProcessingActor
afterLoopCode, loopCode, loopCondition, S_ACTIVE_MASK, S_FINISHED_MASK, S_FINISHING_MASK, S_NOT_STARTED, S_RUNNING, S_STOP_TERMINATE_MASK, S_STOPPED, S_STOPPING, S_TERMINATED, S_TERMINATING, stopFlag, stopFlagUpdater
 
Fields inherited from class groovyx.gpars.actor.impl.ReplyingMessageStream
obj2Sender
 
Fields inherited from class groovyx.gpars.actor.Actor
ACTOR_HAS_ALREADY_BEEN_STARTED, currentThread, parallelGroup, START_MESSAGE, STOP_MESSAGE, TERMINATE_MESSAGE, TIMEOUT, TIMEOUT_MESSAGE, timer
 
Fields inherited from class groovyx.gpars.serial.WithSerialId
serialHandle
 
Constructor Summary
AbstractPooledActor()
          Deprecated.  
 
Method Summary
protected abstract  void act()
          Deprecated. This method represents the body of the actor.
private  void collectRequiredMessages(java.util.Collection<ActorMessage> messages, int toReceive)
          Deprecated.  
private  java.lang.Object enhanceAndUnwrap(ActorMessage message)
          Deprecated.  
private  void enhanceReplies(java.lang.Iterable<ActorMessage> messages)
          Deprecated. Adds reply and replyIfExists methods to the current Actor and the message.
protected  void handleStart()
          Deprecated.  
private  void checkStoppedFlags()
          Deprecated.  
protected  void receive(groovy.lang.Closure handler)
          Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
protected  void receive(groovy.time.Duration duration, groovy.lang.Closure handler)
          Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
protected  void receive(long timeout, java.util.concurrent.TimeUnit timeUnit, groovy.lang.Closure handler)
          Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
protected  java.lang.Object receiveImpl()
          Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
protected  java.lang.Object receiveImpl(long timeout, java.util.concurrent.TimeUnit units)
          Deprecated. Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.
private static java.lang.Object[] retrievePayloadOfMessages(java.util.List<ActorMessage> messages)
          Deprecated.  
 
Methods inherited from class groovyx.gpars.actor.impl.SequentialProcessingActor
handleTermination, hasBeenStopped, checkStopTerminate, isActive, loop, loop, loop, loop, loop, pollMessage, react, react, react, react, receive, receive, receive, run, send, setParallelGroup, scheduleLoop, silentStart, start, stop, sweepNextMessage, takeMessage, takeMessage, terminate
 
Methods inherited from class groovyx.gpars.actor.impl.ReplyingMessageStream
getSender, getSenders, reply, replyIfExists, runEnhancedWithRepliesOnMessages
 
Methods inherited from class groovyx.gpars.actor.Actor
createActorMessage, createRemoteHandle, deregisterCurrentActorWithThread, getJoinLatch, getParallelGroup, handleException, handleInterrupt, handleTimeout, isActorThread, join, join, join, join, onStop, registerCurrentActorWithThread, sendAndContinue, sweepQueue, threadBoundActor
 
Methods inherited from class groovyx.gpars.actor.impl.MessageStream
call, getRemoteClass, leftShift, send, send, sendAndWait, sendAndWait, sendAndWait
 
Methods inherited from class groovyx.gpars.serial.WithSerialId
getOrCreateSerialHandle, writeReplace
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

THE_ACTOR_HAS_NOT_BEEN_STARTED

private static final java.lang.String THE_ACTOR_HAS_NOT_BEEN_STARTED
Deprecated. 
See Also:
Constant Field Values

THE_ACTOR_HAS_BEEN_STOPPED

private static final java.lang.String THE_ACTOR_HAS_BEEN_STOPPED
Deprecated. 
See Also:
Constant Field Values

serialVersionUID

private static final long serialVersionUID
Deprecated. 
See Also:
Constant Field Values
Constructor Detail

AbstractPooledActor

public AbstractPooledActor()
Deprecated. 
Method Detail

act

protected abstract void act()
Deprecated. 
This method represents the body of the actor. It is called upon actor's start and can exit either normally by return or due to actor being stopped through the stop() method, which cancels the current actor action. Provides an extension point for subclasses to provide their custom Actor's message handling code.


enhanceReplies

private void enhanceReplies(java.lang.Iterable<ActorMessage> messages)
Deprecated. 
Adds reply and replyIfExists methods to the current Actor and the message. These methods will call send on the target actor (the sender of the original message). The reply/replyIfExists methods invoked on the actor will be sent to all currently processed messages, reply/replyIfExists invoked on a message will send a reply to the sender of that particular message only.

Parameters:
messages - List of ActorMessage wrapping the sender actor, who we need to be able to respond to, plus the original message

receiveImpl

protected final java.lang.Object receiveImpl()
                                      throws java.lang.InterruptedException
Deprecated. 
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

Specified by:
receiveImpl in class SequentialProcessingActor
Returns:
The message retrieved from the queue.
Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.

receiveImpl

protected final java.lang.Object receiveImpl(long timeout,
                                             java.util.concurrent.TimeUnit units)
                                      throws java.lang.InterruptedException
Deprecated. 
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive.

Specified by:
receiveImpl in class SequentialProcessingActor
Parameters:
timeout - how long to wait before giving up, in units of unit
units - a TimeUnit determining how to interpret the timeout parameter
Returns:
The message retrieved from the queue, or null, if the timeout expires.
Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.

enhanceAndUnwrap

private java.lang.Object enhanceAndUnwrap(ActorMessage message)
Deprecated. 

checkStoppedFlags

private void checkStoppedFlags()
Deprecated. 

receive

protected final void receive(groovy.lang.Closure handler)
                      throws java.lang.InterruptedException
Deprecated. 
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. The message retrieved from the queue is passed into the handler as the only parameter.

Parameters:
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.
Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.

collectRequiredMessages

private void collectRequiredMessages(java.util.Collection<ActorMessage> messages,
                                     int toReceive)
                              throws java.lang.InterruptedException
Deprecated. 
Throws:
java.lang.InterruptedException

receive

protected final void receive(long timeout,
                             java.util.concurrent.TimeUnit timeUnit,
                             groovy.lang.Closure handler)
                      throws java.lang.InterruptedException
Deprecated. 
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. The message retrieved from the queue is passed into the handler as the only parameter. A null value is passed into the handler, if the timeout expires

Parameters:
timeout - how long to wait before giving up, in units of unit
timeUnit - a TimeUnit determining how to interpret the timeout parameter
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.
Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.

retrievePayloadOfMessages

private static java.lang.Object[] retrievePayloadOfMessages(java.util.List<ActorMessage> messages)
Deprecated. 

receive

protected final void receive(groovy.time.Duration duration,
                             groovy.lang.Closure handler)
                      throws java.lang.InterruptedException
Deprecated. 
Retrieves a message from the message queue, waiting, if necessary, for a message to arrive. The message retrieved from the queue is passed into the handler as the only parameter. A null value is passed into the handler, if the timeout expires

Parameters:
duration - how long to wait before giving up, in units of unit
handler - A closure accepting the retrieved message as a parameter, which will be invoked after a message is received.
Throws:
java.lang.InterruptedException - If the thread is interrupted during the wait. Should propagate up to stop the thread.

handleStart

protected void handleStart()
Deprecated. 
Overrides:
handleStart in class Actor

Copyright © 2008–2010 Václav Pech. All Rights Reserved.