5.1 Actors Principles - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.0-SNAPSHOT
5.1 Actors Principles
Actors share a pool of threads, which are dynamically assigned to actors when the actors need to react to messages sent to them. A thread is returned to the pool once a message has been processed and the actor is idle, waiting for more messages to arrive. Because actors are detached from the underlying physical threads, a relatively small thread pool can serve a potentially unlimited number of actors. This virtually unlimited scalability in the number of actors is the main advantage of event-based actors.

Here are some examples of how to use actors. This is how you create an actor that prints out all messages that it receives.

import static groovyx.gpars.actor.Actors.*

def console = actor {
    loop {
        react {
            println it
        }
    }
}
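Alternatively, extend DefaultActor and override the act() method. An actor created this way must be started explicitly.

import groovyx.gpars.actor.DefaultActor

class CustomActor extends DefaultActor {
    @Override
    protected void act() {
        loop {
            react {
                println it
            }
        }
    }
}

def console = new CustomActor()
console.start()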
Messages can be sent to the actor in several ways.

console.send('Message')
console 'Message'
console.sendAndWait 'Message'                                                     //Wait for a reply
console.sendAndContinue 'Message', {reply -> println "I received reply: $reply"}  //Forward the reply to a function
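Note that the console actor above only prints its messages and never replies, so a sendAndWait call on it would have nothing to wait for. The following is a minimal sketch of a request and reply round trip, assuming GPars is on the classpath. The echo actor and its message texts are hypothetical names chosen for illustration, and the timed sendAndWait overload is used on the assumption that it is available in your GPars version.

```groovy
import static groovyx.gpars.actor.Actors.actor
import java.util.concurrent.TimeUnit

// Hypothetical echo actor: replies to every message it receives
def echo = actor {
    loop {
        react { msg ->
            reply("echo: ${msg}".toString())
        }
    }
}

// Blocks the calling thread until the actor replies
def answer = echo.sendAndWait('hello')

// Same call, but gives up waiting after the given timeout (assumed overload)
def timedAnswer = echo.sendAndWait('again', 5, TimeUnit.SECONDS)

println answer
println timedAnswer

echo.stop()
```

Only the sendAndWait and sendAndContinue variants are meaningful against an actor that replies; a plain send from a non-actor thread gives the actor no sender to reply to.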
Creating an asynchronous service
import static groovyx.gpars.actor.Actors.*

final def decryptor = actor {
    loop {
        react {String message ->
            reply message.reverse()
        }
    }
}

def console = actor {
    decryptor.send 'lellarap si yvoorG'
    react {
        println 'Decrypted message: ' + it
    }
}

console.join()
A timeout in milliseconds can be passed to react. If no message arrives in time, the handler receives Actor.TIMEOUT instead.

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.Actors

def friend = Actors.actor {
    react {
        //this doesn't reply -> caller won't receive any answer in time
        println it
        //reply 'Hello' //uncomment this to answer conversation
        react {
            println it
        }
    }
}

def me = Actors.actor {
    friend.send('Hi')
    //wait for answer 1sec
    react(1000) {msg ->
        if (msg == Actor.TIMEOUT) {
            friend.send('I see, busy as usual. Never mind.')
            stop()
        } else {
            //continue conversation
            println "Thank you for $msg"
        }
    }
}

me.join()
The timeout can also be handled in an onTimeout() handler, here added dynamically through the actor's metaclass.

import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.Actors

def friend = Actors.actor {
    react {
        //this doesn't reply -> caller won't receive any answer in time
        println it
        //reply 'Hello' //uncomment this to answer conversation
        react {
            println it
        }
    }
}

def me = Actors.actor {
    friend.send('Hi')

    delegate.metaClass.onTimeout = {->
        friend.send('I see, busy as usual. Never mind.')
        stop()
    }

    //wait for answer 1sec
    react(1000) {msg ->
        if (msg != Actor.TIMEOUT) {
            //continue conversation
            println "Thank you for $msg"
        }
    }
}

me.join()
class MyActor extends DefaultActor {
    public void onTimeout() {
        …
    }

    protected void act() {
        …
    }
}
Actors guarantee thread-safety for non-thread-safe code
Actors guarantee that at most one thread processes the actor's body at any time. Under the covers, memory is also synchronized each time a thread is assigned to an actor, so code in the body can safely modify the actor's state without any extra synchronization or locking.

class MyCounterActor extends DefaultActor {
    private Integer counter = 0

    protected void act() {
        loop {
            react {
                counter++
            }
        }
    }
}
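To make the guarantee above observable, here is a minimal sketch in which several plain threads send increments to one actor concurrently. It assumes GPars is on the classpath. CountingActor, the 'inc' and 'get' messages, and the thread and message counts are all hypothetical choices for illustration, not part of the original example.

```groovy
import groovyx.gpars.actor.DefaultActor

// Hypothetical variant of the counter actor that can also report its state
class CountingActor extends DefaultActor {
    private int counter = 0   // unsynchronized state, touched only inside the actor body

    protected void act() {
        loop {
            react { msg ->
                if (msg == 'get') reply counter   // report the current value
                else counter++                    // any other message counts as an increment
            }
        }
    }
}

def counterActor = new CountingActor()
counterActor.start()

// Four sender threads, 1000 increments each
def senders = (1..4).collect {
    Thread.start {
        1000.times { counterActor.send 'inc' }
    }
}
senders*.join()

// 'get' is enqueued after all increments, so it is processed last
def total = counterActor.sendAndWait('get')
println "Counted $total messages"

counterActor.stop()
```

The senders run in parallel, yet the counter needs no lock because the actor handles its messages one at a time.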
Simple calculator
A slightly more realistic example of an event-driven actor that receives two numeric messages, sums them up and sends the result to the console actor.

import groovyx.gpars.group.DefaultPGroup

//not necessary, just showing that a single-threaded pool can still handle multiple actors
def group = new DefaultPGroup(1)

final def console = group.actor {
    loop {
        react {
            println 'Result: ' + it
        }
    }
}

final def calculator = group.actor {
    react {a ->
        react {b ->
            console.send(a + b)
        }
    }
}

calculator.send 2
calculator.send 3

calculator.join()
group.shutdown()
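The calculator runs two actors on a single-threaded pool. The same idea scales further, as the following minimal sketch suggests: many event-driven actors sharing a very small pool. It assumes GPars is on the classpath. The pool size of 2, the count of 1000 actors, the 'ping' message and the latch are hypothetical choices for illustration.

```groovy
import groovyx.gpars.group.DefaultPGroup
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

def group = new DefaultPGroup(2)            // only two threads in the pool
int actorCount = 1000
def latch = new CountDownLatch(actorCount)  // counts handled messages

// Each actor handles exactly one message and then terminates
def actors = (1..actorCount).collect {
    group.actor {
        react { msg ->
            latch.countDown()
        }
    }
}

actors.each { it.send 'ping' }

// Wait until every actor has handled its message, or give up after 10 seconds
boolean allHandled = latch.await(10, TimeUnit.SECONDS)
println "All $actorCount actors handled their message: $allHandled"

group.shutdown()
```

An idle actor waiting in react holds no thread, which is why the two pooled threads are enough to serve all of them.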
Concurrent Merge Sort Example
For comparison I'm also including a more involved example performing a concurrent merge sort of a list of integers using actors. You can see that, thanks to the flexibility of Groovy, we came pretty close to the Scala model, although I still miss Scala pattern matching for message handling.

import groovyx.gpars.group.DefaultPGroup
import static groovyx.gpars.actor.Actors.actor

Closure createMessageHandler(def parentActor) {
    return {
        react {List<Integer> message ->
            assert message != null
            switch (message.size()) {
                case 0..1:
                    parentActor.send(message)
                    break
                case 2:
                    if (message[0] <= message[1]) parentActor.send(message)
                    else parentActor.send(message[-1..0])
                    break
                default:
                    def splitList = split(message)

                    def child1 = actor(createMessageHandler(delegate))
                    def child2 = actor(createMessageHandler(delegate))
                    child1.send(splitList[0])
                    child2.send(splitList[1])

                    react {message1 ->
                        react {message2 ->
                            parentActor.send merge(message1, message2)
                        }
                    }
            }
        }
    }
}

def console = new DefaultPGroup(1).actor {
    react {
        println "Sorted array:\t${it}"
        System.exit 0
    }
}

def sorter = actor(createMessageHandler(console))
sorter.send([1, 5, 2, 4, 3, 8, 6, 7, 3, 9, 5, 3])
console.join()

def split(List<Integer> list) {
    int listSize = list.size()
    int middleIndex = listSize / 2
    def list1 = list[0..<middleIndex]
    def list2 = list[middleIndex..listSize - 1]
    return [list1, list2]
}

List<Integer> merge(List<Integer> a, List<Integer> b) {
    int i = 0, j = 0
    final int newSize = a.size() + b.size()
    List<Integer> result = new ArrayList<Integer>(newSize)

    while ((i < a.size()) && (j < b.size())) {
        if (a[i] <= b[j]) result << a[i++]
        else result << b[j++]
    }

    if (i < a.size()) result.addAll(a[i..-1])
    else result.addAll(b[j..-1])
    return result
}
Actor lifecycle methods
Each actor can define lifecycle-observing methods, which are called whenever the corresponding lifecycle event occurs.
- afterStart() - called right after the actor has been started.
- afterStop(List undeliveredMessages) - called right after the actor is stopped, passing in all the unprocessed messages from the queue.
- onInterrupt(InterruptedException e) - called when the actor's thread gets interrupted. Thread interruption will result in stopping the actor in any case.
- onTimeout() - called when no messages are sent to the actor within the timeout specified for the currently blocking react method.
- onException(Throwable e) - called when an exception occurs in the actor's event handler. The actor will stop after returning from this method.
class MyActor extends DefaultActor {
    public void afterStart() {
        …
    }

    public void onTimeout() {
        …
    }

    protected void act() {
        …
    }
}

def myActor = actor {
    delegate.metaClass.onException = {
        log.error('Exception occurred', it)
    }
    …
}
To help performance, you may consider using the silentStart() method instead of start() when starting a DynamicDispatchActor or a ReactiveActor. Calling silentStart() bypasses some of the start-up machinery and, as a result, also skips the call to the afterStart() method. Due to its stateful nature, DefaultActor cannot be started silently.
Pool management
Actors can be organized into groups, and by default there is always an application-wide pooled actor group available. Just as the Actors abstract factory can be used to create actors in the default group, custom groups can be used as abstract factories to create new actor instances belonging to these groups.

def myGroup = new DefaultPGroup()

def actor1 = myGroup.actor {
    …
}

def actor2 = myGroup.actor {
    …
}
def myGroup = new DefaultPGroup(10) //the pool will contain 10 threads
… (n+1 threads in the default pool after startup)

Actors.defaultActorPGroup.resize 1  //use one-thread pool

… (1 thread in the pool)

Actors.defaultActorPGroup.resetDefaultSize()

… (n+1 threads in the pool)

Actors.defaultActorPGroup.shutdown()
def daemonGroup = new DefaultPGroup()

def actor1 = daemonGroup.actor {
    …
}

def nonDaemonGroup = new NonDaemonPGroup()

def actor2 = nonDaemonGroup.actor {
    …
}

class MyActor extends DefaultActor {
    def MyActor() {
        this.parallelGroup = nonDaemonGroup
    }

    void act() {...}
}
def coreActors = new NonDaemonPGroup(5)  //5 non-daemon threads pool
def helperActors = new DefaultPGroup(1)  //1 daemon thread pool

def priceCalculator = coreActors.actor {
    …
}

def paymentProcessor = coreActors.actor {
    …
}

def emailNotifier = helperActors.actor {
    …
}

def cleanupActor = helperActors.actor {
    …
}

//increase size of the core actor group
coreActors.resize 6

//shutdown the group's pool once you no longer need the group to release resources
helperActors.shutdown()
Common trap: App terminates while actors do not receive messages
Most likely you're using daemon threads and pools, which is the default setting, and your main thread finishes. Calling actor.join() on any, some or all of your actors blocks the main thread until the actor terminates and thus keeps all your actors running. Alternatively, use instances of NonDaemonPGroup and assign some of your actors to these groups.

def nonDaemonGroup = new NonDaemonPGroup()
def myActor = nonDaemonGroup.actor {...}

def nonDaemonGroup = new NonDaemonPGroup()

class MyActor extends DefaultActor {
    def MyActor() {
        this.parallelGroup = nonDaemonGroup
    }

    void act() {...}
}

def myActor = new MyActor()
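The join() alternative mentioned above can be sketched as follows. This is a minimal illustration, assuming GPars is on the classpath. The worker actor, the 'job' message and the received list are hypothetical names. The actor uses the default (daemon) group, so without the join() call the script could exit before the message is handled.

```groovy
import static groovyx.gpars.actor.Actors.actor
import java.util.concurrent.CopyOnWriteArrayList

// Thread-safe list so the main thread can inspect what the actor handled
def received = new CopyOnWriteArrayList()

// Runs on a daemon thread from the default pool
def worker = actor {
    react { msg ->
        received << msg   // no loop: the actor terminates after this one message
    }
}

worker.send 'job'

// Keep the main thread alive until the actor has terminated
worker.join()

println "Processed: $received"
```

Joining works well for actors that eventually terminate. For actors that loop forever, a NonDaemonPGroup as shown above is usually the better fit.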
Blocking Actors
Instead of event-driven, continuation-style actors, you may in some scenarios prefer blocking actors. A blocking actor holds a single pooled thread for its whole lifetime, including the time spent waiting for messages. Blocking actors avoid some of the thread management overhead, since they never compete for threads after start. They also let you write straight-line code without the continuation style, since they read messages only through blocking calls to the receive method. Obviously, the number of blocking actors running concurrently is limited by the number of threads available in the shared pool. On the other hand, blocking actors typically provide better performance than continuation-style actors, especially when the actor's message queue rarely gets empty.

import static groovyx.gpars.actor.Actors.blockingActor

def decryptor = blockingActor {
    while (true) {
        receive {message ->
            if (message instanceof String) reply message.reverse()
            else stop()
        }
    }
}

def console = blockingActor {
    decryptor.send 'lellarap si yvoorG'
    println 'Decrypted message: ' + receive()
    decryptor.send false
}

[decryptor, console]*.join()