groovyx.gpars.actor
Class Actor

java.lang.Object
  extended by groovyx.gpars.serial.WithSerialId
      extended by groovyx.gpars.actor.impl.MessageStream
          extended by groovyx.gpars.actor.Actor
All Implemented Interfaces:
java.io.Serializable
Direct Known Subclasses:
AbstractLoopingActor, Actor.RemoteActor, ReplyingMessageStream

public abstract class Actor
extends MessageStream

Actors are active objects that borrow a thread from a thread pool. The Actor class provides the means to send messages to the actor, to start and stop the background thread, and to check its status.

Author:
Vaclav Pech, Alex Tkachman
See Also:
Serialized Form

Nested Class Summary
static class Actor.MyRemoteHandle
           
static class Actor.RemoteActor
           
 
Nested classes/interfaces inherited from class groovyx.gpars.actor.impl.MessageStream
MessageStream.RemoteMessageStream, MessageStream.SendTo
 
Field Summary
protected static java.lang.String ACTOR_HAS_ALREADY_BEEN_STARTED
           
private static java.lang.String AFTER_START
           
static java.lang.String CANNOT_SEND_REPLIES_NO_SENDER_HAS_BEEN_REGISTERED
           
private static java.lang.ThreadLocal<Actor> currentActorPerThread
          Maps each thread to the actor it currently processes.
protected  java.lang.Thread currentThread
           
private static java.lang.Object[] EMPTY_ARGUMENTS
           
private  DataflowExpression<java.lang.Object> joinLatch
           
private static java.lang.String ON_DELIVERY_ERROR
           
private  groovy.lang.Closure onStop
           
protected  groovyx.gpars.group.PGroup parallelGroup
          The parallel group to which the message stream belongs
private static java.lang.String RESPONDS_TO
           
private static long serialVersionUID
           
protected static ActorMessage START_MESSAGE
           
protected static ActorMessage STOP_MESSAGE
           
protected static ActorMessage TERMINATE_MESSAGE
           
static java.lang.String TIMEOUT
           
protected static ActorMessage TIMEOUT_MESSAGE
           
protected static java.util.Timer timer
          Timer holding timeouts for react methods
 
Fields inherited from class groovyx.gpars.serial.WithSerialId
serialHandle
 
Constructor Summary
protected Actor()
           
protected Actor(DataflowExpression<java.lang.Object> joinLatch)
          Constructor to be used by deserialization
protected Actor(DataflowExpression<java.lang.Object> joinLatch, groovyx.gpars.group.PGroup parallelGroup)
           
 
Method Summary
private  boolean callDynamic(java.lang.String method, java.lang.Object[] args)
           
protected  ActorMessage createActorMessage(java.lang.Object message)
           
protected  RemoteHandle createRemoteHandle(SerialHandle handle, SerialContext host)
           
protected static void deregisterCurrentActorWithThread()
          Deregisters the actor registered with the current thread
 DataflowExpression<java.lang.Object> getJoinLatch()
          Join-point for this actor
 groovyx.gpars.group.PGroup getParallelGroup()
          Retrieves the group to which the actor belongs
protected  void handleException(java.lang.Throwable exception)
           
protected  void handleInterrupt(java.lang.InterruptedException exception)
           
protected  void handleStart()
           
protected  void handleTermination()
           
protected  void handleTimeout()
           
protected abstract  boolean hasBeenStopped()
           
abstract  boolean isActive()
          Checks the current status of the Actor.
 boolean isActorThread()
          Checks whether the current thread is the actor's current thread.
 void join()
          Joins the actor.
 void join(groovy.time.BaseDuration duration)
          Joins the actor.
 void join(long timeout, java.util.concurrent.TimeUnit unit)
          Joins the actor.
 void join(MessageStream listener)
          Notifies the listener when the actor has finished
 void onStop(groovy.lang.Closure onStop)
          Sets the on-stop handler for this actor
protected static void registerCurrentActorWithThread(Actor currentActor)
          Registers the actor with the current thread
<T> MessageStream
sendAndContinue(T message, groovy.lang.Closure closure)
          Sends a message and executes the continuation when a reply becomes available.
 void setParallelGroup(groovyx.gpars.group.PGroup group)
          Sets the parallel group.
abstract  Actor silentStart()
          Starts the Actor without sending the START_MESSAGE message to speed the start-up.
abstract  Actor start()
          Starts the Actor and sends it the START_MESSAGE to run any afterStart handlers.
abstract  Actor stop()
          Sends a stop message to the Actor.
protected abstract  ActorMessage sweepNextMessage()
          Removes the head of the message queue
(package private)  java.util.List<ActorMessage> sweepQueue()
          Clears the message queue returning all the messages it held.
abstract  Actor terminate()
          Terminates the Actor.
static Actor threadBoundActor()
          Retrieves the actor registered with the current thread
 
Methods inherited from class groovyx.gpars.actor.impl.MessageStream
call, getRemoteClass, leftShift, send, send, send, sendAndWait, sendAndWait, sendAndWait
 
Methods inherited from class groovyx.gpars.serial.WithSerialId
getOrCreateSerialHandle, writeReplace
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

currentActorPerThread

private static final java.lang.ThreadLocal<Actor> currentActorPerThread
Maps each thread to the actor it currently processes. Used in the send() method to remember the sender of each message for potential replies


serialVersionUID

private static final long serialVersionUID
See Also:
Constant Field Values

CANNOT_SEND_REPLIES_NO_SENDER_HAS_BEEN_REGISTERED

public static final java.lang.String CANNOT_SEND_REPLIES_NO_SENDER_HAS_BEEN_REGISTERED
See Also:
Constant Field Values

joinLatch

private final DataflowExpression<java.lang.Object> joinLatch

parallelGroup

protected volatile groovyx.gpars.group.PGroup parallelGroup
The parallel group to which the message stream belongs


START_MESSAGE

protected static final ActorMessage START_MESSAGE

STOP_MESSAGE

protected static final ActorMessage STOP_MESSAGE

TERMINATE_MESSAGE

protected static final ActorMessage TERMINATE_MESSAGE

AFTER_START

private static final java.lang.String AFTER_START
See Also:
Constant Field Values

RESPONDS_TO

private static final java.lang.String RESPONDS_TO
See Also:
Constant Field Values

ON_DELIVERY_ERROR

private static final java.lang.String ON_DELIVERY_ERROR
See Also:
Constant Field Values

EMPTY_ARGUMENTS

private static final java.lang.Object[] EMPTY_ARGUMENTS

TIMEOUT

public static final java.lang.String TIMEOUT
See Also:
Constant Field Values

TIMEOUT_MESSAGE

protected static final ActorMessage TIMEOUT_MESSAGE

onStop

private volatile groovy.lang.Closure onStop

currentThread

protected volatile java.lang.Thread currentThread

ACTOR_HAS_ALREADY_BEEN_STARTED

protected static final java.lang.String ACTOR_HAS_ALREADY_BEEN_STARTED
See Also:
Constant Field Values

timer

protected static final java.util.Timer timer
Timer holding timeouts for react methods

Constructor Detail

Actor

protected Actor()

Actor

protected Actor(DataflowExpression<java.lang.Object> joinLatch)
Constructor to be used by deserialization

Parameters:
joinLatch - The instance of DataflowExpression to use for join operation

Actor

protected Actor(DataflowExpression<java.lang.Object> joinLatch,
                groovyx.gpars.group.PGroup parallelGroup)
Method Detail

getParallelGroup

public final groovyx.gpars.group.PGroup getParallelGroup()
Retrieves the group to which the actor belongs

Returns:
The group

setParallelGroup

public void setParallelGroup(groovyx.gpars.group.PGroup group)
Sets the parallel group. It can only be invoked before the actor is started.

Parameters:
group - new group

sendAndContinue

public final <T> MessageStream sendAndContinue(T message,
                                               groovy.lang.Closure closure)
Sends a message and executes the continuation when a reply becomes available.

Parameters:
message - message to send
closure - closure to execute when a reply becomes available
Returns:
The message that came in reply to the original send.
Throws:
java.lang.InterruptedException - if interrupted while waiting
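The "send, then run a continuation on the reply" behaviour described above can be pictured with a small plain-Java model. This is only a sketch under assumptions: ContinuationSketch and its responder function are hypothetical stand-ins, not GPars classes, and the sketch returns a CompletableFuture (so a caller can wait for the continuation) rather than a MessageStream.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical model of a non-blocking send with a reply continuation; not GPars code.
final class ContinuationSketch {
    // Stand-in for the actor's message handler: computes a reply for each message.
    private final Function<String, String> responder;

    ContinuationSketch(Function<String, String> responder) {
        this.responder = responder;
    }

    // Returns immediately; the continuation runs once the reply is available.
    CompletableFuture<Void> sendAndContinue(String message, Consumer<String> continuation) {
        return CompletableFuture
                .supplyAsync(() -> responder.apply(message)) // reply computed on another thread
                .thenAccept(continuation);                   // continuation receives the reply
    }

    public static void main(String[] args) {
        ContinuationSketch echo = new ContinuationSketch(m -> "echo: " + m);
        // join() is used here only so the demo does not exit before the reply arrives.
        echo.sendAndContinue("hello", reply -> System.out.println(reply)).join();
    }
}
```

The caller is not blocked between sending and receiving, which is the main contrast with the inherited sendAndWait methods.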

silentStart

public abstract Actor silentStart()
Starts the Actor without sending the START_MESSAGE message to speed the start-up. The potential custom afterStart handlers won't be run. No messages can be sent or received before an Actor is started.

Returns:
same actor

start

public abstract Actor start()
Starts the Actor and sends it the START_MESSAGE to run any afterStart handlers. No messages can be sent or received before an Actor is started.

Returns:
same actor
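The difference between start() and silentStart() can be pictured with a small plain-Java model. This is only a sketch under assumptions: StartSketch, its afterStart hook, the log, and the exception thrown on a second start are hypothetical, not GPars code. A real actor delivers START_MESSAGE through its message queue on a pooled thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the two start variants; not GPars code.
final class StartSketch {
    private final List<String> log = new ArrayList<>();
    private boolean started;

    // Stand-in for a user-supplied afterStart handler.
    private void afterStart() {
        log.add("afterStart");
    }

    // Like start(): marks the actor as started and runs the afterStart handler.
    StartSketch start() {
        begin();
        afterStart();
        return this;
    }

    // Like silentStart(): marks the actor as started but skips the handler.
    StartSketch silentStart() {
        begin();
        return this;
    }

    private void begin() {
        // Assumed behaviour: a second start is rejected.
        if (started) throw new IllegalStateException("already started");
        started = true;
        log.add("started");
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        System.out.println(new StartSketch().start().log());
        System.out.println(new StartSketch().silentStart().log());
    }
}
```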

stop

public abstract Actor stop()
Sends a stop message to the Actor. The actor finishes processing the current message, and all unprocessed messages are passed to the afterStop method, if one exists. No new messages are accepted from that point on. Has no effect if the Actor has not been started.

Returns:
same actor

terminate

public abstract Actor terminate()
Terminates the Actor. Unprocessed messages are passed to the afterStop method, if one exists. No new messages are accepted from that point on. Has no effect if the Actor has not been started.

Returns:
same actor
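The shutdown rules stated for stop() and terminate() (unprocessed messages go to afterStop, no new messages are accepted afterwards, no effect before start) can be modelled in a few lines of single-threaded Java. This is only a sketch under assumptions: StopSketch, its afterStop callback, and the exception type are hypothetical, not GPars code, and the sketch does not model the thread that processes the current message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical single-threaded model of the stop rules; not GPars code.
final class StopSketch {
    private final Queue<Object> mailbox = new ArrayDeque<>();
    private final Consumer<List<Object>> afterStop; // stand-in for an afterStop handler
    private boolean started;
    private boolean stopped;

    StopSketch(Consumer<List<Object>> afterStop) {
        this.afterStop = afterStop;
    }

    StopSketch start() {
        started = true;
        return this;
    }

    // Messages are only accepted between start and stop (exception type is an assumption).
    void send(Object message) {
        if (!started || stopped) {
            throw new IllegalStateException("actor is not accepting messages");
        }
        mailbox.add(message);
    }

    // No effect unless started; otherwise hands the unprocessed messages to afterStop.
    StopSketch stop() {
        if (!started || stopped) return this;
        stopped = true;
        List<Object> unprocessed = new ArrayList<>(mailbox);
        mailbox.clear();
        afterStop.accept(unprocessed);
        return this;
    }
}
```

A caller that wants to inspect dropped work would pass a handler such as `leftovers::addAll` to the constructor and read the list after stop() returns.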

isActive

public abstract boolean isActive()
Checks the current status of the Actor.

Returns:
current status of the Actor.

join

public final void join()
                throws java.lang.InterruptedException
Joins the actor. Waits for its termination.

Throws:
java.lang.InterruptedException - when interrupted while waiting

join

public final void join(MessageStream listener)
Notifies the listener when the actor has finished

Parameters:
listener - listener to notify
Throws:
java.lang.InterruptedException - if interrupted while waiting

join

public final void join(long timeout,
                       java.util.concurrent.TimeUnit unit)
                throws java.lang.InterruptedException
Joins the actor. Waits for its termination.

Parameters:
timeout - maximum time to wait
unit - time unit of the timeout
Throws:
java.lang.InterruptedException - if interrupted while waiting

join

public final void join(groovy.time.BaseDuration duration)
                throws java.lang.InterruptedException
Joins the actor. Waits for its termination.

Parameters:
duration - timeout to wait
Throws:
java.lang.InterruptedException - if interrupted while waiting

getJoinLatch

public DataflowExpression<java.lang.Object> getJoinLatch()
Join-point for this actor

Returns:
The DataflowExpression instance, which is used to join this actor
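The join overloads above all wait on a single join-point that is released when the actor terminates. The idea can be sketched in plain Java with a CountDownLatch. This is an assumption made for illustration: the page shows the real join-point is a DataflowExpression, JoinLatchSketch is a hypothetical name, and the timed join here returns a boolean whereas the documented method returns void.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a join-point released on termination; not GPars code.
final class JoinLatchSketch {
    private final CountDownLatch finished = new CountDownLatch(1);
    private final Thread worker;

    JoinLatchSketch(Runnable body) {
        this.worker = new Thread(() -> {
            try {
                body.run();
            } finally {
                finished.countDown(); // release every joiner, even if the body failed
            }
        });
    }

    JoinLatchSketch start() {
        worker.start();
        return this;
    }

    // Blocks until the body has terminated.
    void join() throws InterruptedException {
        finished.await();
    }

    // Blocks for at most the given time; true means the body terminated in time.
    boolean join(long timeout, TimeUnit unit) throws InterruptedException {
        return finished.await(timeout, unit);
    }
}
```

Because the latch stays open once released, any number of threads can join, before or after termination, and all of them return.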

registerCurrentActorWithThread

protected static void registerCurrentActorWithThread(Actor currentActor)
Registers the actor with the current thread

Parameters:
currentActor - The actor to register

deregisterCurrentActorWithThread

protected static void deregisterCurrentActorWithThread()
Deregisters the actor registered with the current thread


threadBoundActor

public static Actor threadBoundActor()
Retrieves the actor registered with the current thread

Returns:
The associated actor
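registerCurrentActorWithThread, deregisterCurrentActorWithThread and threadBoundActor together describe a per-thread registry, which matches the ThreadLocal type of the currentActorPerThread field. A minimal plain-Java sketch of that pattern follows. BoundActor and ThreadBoundSketch are hypothetical names, and the code is a model of the pattern, not the GPars implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an actor; not the GPars class.
final class BoundActor {
    final String name;

    BoundActor(String name) {
        this.name = name;
    }
}

// Hypothetical per-thread registry built on ThreadLocal; not GPars code.
final class ThreadBoundSketch {
    private static final ThreadLocal<BoundActor> CURRENT = new ThreadLocal<>();

    // Associates the actor with the calling thread only.
    static void register(BoundActor actor) {
        CURRENT.set(actor);
    }

    // Clears the association for the calling thread.
    static void deregister() {
        CURRENT.remove();
    }

    // Returns the actor bound to the calling thread, or null if there is none.
    static BoundActor threadBoundActor() {
        return CURRENT.get();
    }

    // Reports what a freshly created thread sees, to show that bindings are per thread.
    static BoundActor seenFromAnotherThread() {
        AtomicReference<BoundActor> seen = new AtomicReference<>();
        Thread other = new Thread(() -> seen.set(threadBoundActor()));
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen.get();
    }
}
```

A binding made on one thread is invisible to every other thread, which is what lets a send() call find "the actor currently running on this thread" without any shared lookup table.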

createActorMessage

protected final ActorMessage createActorMessage(java.lang.Object message)

hasBeenStopped

protected abstract boolean hasBeenStopped()

createRemoteHandle

protected RemoteHandle createRemoteHandle(SerialHandle handle,
                                          SerialContext host)
Overrides:
createRemoteHandle in class WithSerialId

handleStart

protected void handleStart()

handleTermination

protected void handleTermination()

onStop

public final void onStop(groovy.lang.Closure onStop)
Sets the on-stop handler for this actor

Parameters:
onStop - The code to invoke when stopping

handleException

protected void handleException(java.lang.Throwable exception)

handleInterrupt

protected void handleInterrupt(java.lang.InterruptedException exception)

handleTimeout

protected void handleTimeout()

callDynamic

private boolean callDynamic(java.lang.String method,
                            java.lang.Object[] args)

sweepNextMessage

protected abstract ActorMessage sweepNextMessage()
Removes the head of the message queue

Returns:
The head message, or null, if the message queue is empty

sweepQueue

final java.util.List<ActorMessage> sweepQueue()
Clears the message queue returning all the messages it held.

Returns:
The messages stored in the queue
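The relationship between sweepNextMessage() and sweepQueue() can be sketched as "poll one" versus "poll until empty". The code below is a plain-Java illustration under assumptions: SweepSketch and enqueue are hypothetical, messages are plain strings rather than ActorMessage instances, and ConcurrentLinkedQueue is a choice made for the sketch, not a statement about the queue GPars uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of the two sweep operations; not GPars code.
final class SweepSketch {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();

    void enqueue(String message) {
        queue.add(message);
    }

    // Removes and returns the head message, or null if the queue is empty.
    String sweepNextMessage() {
        return queue.poll();
    }

    // Empties the queue and returns everything it held, in arrival order.
    List<String> sweepQueue() {
        List<String> swept = new ArrayList<>();
        String message;
        while ((message = sweepNextMessage()) != null) {
            swept.add(message);
        }
        return swept;
    }
}
```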

isActorThread

public final boolean isActorThread()
Checks whether the current thread is the actor's current thread.

Returns:
True if invoked from within an actor thread
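A check like isActorThread() can be modelled by comparing the calling thread with a remembered thread reference, in the spirit of the volatile currentThread field listed above. This is a hedged sketch: ActorThreadSketch and its process method are hypothetical, and how GPars actually maintains currentThread is not described on this page.

```java
// Hypothetical model of an "am I on the actor's thread?" check; not GPars code.
final class ActorThreadSketch {
    // Thread currently running a handler for this actor, or null when idle.
    private volatile Thread currentThread;

    // Runs a handler as if it were message processing on a borrowed thread.
    void process(Runnable handler) {
        currentThread = Thread.currentThread();
        try {
            handler.run();
        } finally {
            currentThread = null; // the thread goes back to the pool
        }
    }

    // True only when called from the thread that is running a handler right now.
    boolean isActorThread() {
        return Thread.currentThread() == currentThread;
    }
}
```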

Copyright © 2008–2010 Václav Pech. All Rights Reserved.