5.5 Classic Examples - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.1.0
5.5 Classic Examples
A few examples of actors in use
Examples
- The Sieve of Eratosthenes
- Sleeping Barber
- Dining Philosophers
- Word Sort
- Load Balancer
The Sieve of Eratosthenes
Problem description
import groovyx.gpars.actor.DynamicDispatchActor

/**
 * Demonstrates concurrent implementation of the Sieve of Eratosthenes using actors
 *
 * In principle, the algorithm consists of concurrently run chained filters,
 * each of which detects whether the current number can be divided by a single prime number.
 * (generate nums 2, 3, 4, 5, ...) -> (filter by mod 2) -> (filter by mod 3) -> (filter by mod 5) -> (filter by mod 7) -> (filter by mod 11) -> (caution! Primes falling out here)
 * The chain is built (grows) on the fly, whenever a new prime is found.
 */

int requestedPrimeNumberBoundary = 1000

//The chain is seeded with a filter for 2, so 2 itself is never reported through "Found"
final def firstFilter = new FilterActor(2).start()

/**
 * Generating candidate numbers and sending them to the actor chain
 */
(2..requestedPrimeNumberBoundary).each {
    firstFilter it
}
//Blocks until the 'Done' reply has travelled back from the end of the chain
firstFilter.sendAndWait 'Poison'

/**
 * Filter out numbers that can be divided by a single prime number
 */
final class FilterActor extends DynamicDispatchActor {
    private final int myPrime
    private def follower

    def FilterActor(final myPrime) { this.myPrime = myPrime; }

    /**
     * Try to divide the received number with the prime. If the number cannot be divided, send it along the chain.
     * If there's no-one to send it to, I'm the last in the chain, the number is a prime and so I will create and chain
     * a new actor responsible for filtering by this newly found prime number.
     */
    def onMessage(int value) {
        if (value % myPrime != 0) {
            if (follower) follower value
            else {
                println "Found $value"
                follower = new FilterActor(value).start()
            }
        }
    }

    /**
     * Stop the actor on poison reception
     */
    def onMessage(def poison) {
        if (follower) {
            //Remember the sender now; the continuation closure runs later, when the follower has replied
            def sender = sender
            follower.sendAndContinue(poison, {this.stop(); sender?.send('Done')})  //Pass the poison along and stop after a reply
        } else {  //I am the last in the chain
            stop()
            reply 'Done'
        }
    }
}
Sleeping Barber
Problem description
import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.group.DefaultPGroup

final def group = new DefaultPGroup()

/**
 * The barber: serves the customer carried by an Enter message and then asks
 * the sender of that message for the next one by replying with Next.
 */
final def barber = group.actor {
    final def random = new Random()
    loop {
        react {message ->
            switch (message) {
                case Enter:
                    message.customer.send new Start()
                    println "Barber: Processing customer ${message.customer.name}"
                    doTheWork(random)
                    message.customer.send new Done()
                    reply new Next()
                    break
                case Wait:
                    println "Barber: No customers. Going to have a sleep"
                    break
            }
        }
    }
}

/**
 * Simulates a haircut by sleeping for a random whole number of seconds (0 to 9).
 */
private def doTheWork(Random random) {
    Thread.sleep(random.nextInt(10) * 1000)
}

//Declared before it is assigned because the actor body below refers to waitingRoom itself
final Actor waitingRoom

/**
 * The waiting room: queues up to 'capacity' customers and hands them to the barber one at a time.
 */
waitingRoom = group.actor {
    final int capacity = 5
    final List<Customer> waitingCustomers = []
    boolean barberAsleep = true

    loop {
        react {message ->
            switch (message) {
                case Enter:
                    if (waitingCustomers.size() == capacity) {
                        reply new Full()
                    } else {
                        waitingCustomers << message.customer
                        if (barberAsleep) {
                            assert waitingCustomers.size() == 1
                            barberAsleep = false
                            //Wake the barber up by asking ourselves to dispatch the next customer
                            waitingRoom.send new Next()
                        }
                        else reply new Wait()
                    }
                    break
                case Next:
                    if (waitingCustomers.size() > 0) {
                        def customer = waitingCustomers.remove(0)
                        barber.send new Enter(customer: customer)
                    } else {
                        barber.send new Wait()
                        barberAsleep = true
                    }
                    break
            }
        }
    }
}

/**
 * A customer announces itself to the waiting room and then reacts to the
 * status messages it receives until it is either served or turned away.
 */
class Customer extends DefaultActor {
    String name
    Actor localBarbers

    void act() {
        localBarbers << new Enter(customer: this)
        loop {
            react {message ->
                switch (message) {
                    case Full:
                        println "Customer: $name: The waiting room is full. I am leaving."
                        stop()
                        break
                    case Wait:
                        println "Customer: $name: I will wait."
                        break
                    case Start:
                        println "Customer: $name: I am now being served."
                        break
                    case Done:
                        println "Customer: $name: I have been served."
                        stop()
                        break
                }
            }
        }
    }
}

//Messages
class Enter { Customer customer }
class Full {}
class Wait {}
class Next {}
class Start {}
class Done {}

def customers = []
customers << new Customer(name: 'Joe', localBarbers: waitingRoom).start()
customers << new Customer(name: 'Dave', localBarbers: waitingRoom).start()
customers << new Customer(name: 'Alice', localBarbers: waitingRoom).start()

sleep 15000
customers << new Customer(name: 'James', localBarbers: waitingRoom).start()
sleep 5000
customers*.join()
barber.stop()
waitingRoom.stop()
Dining Philosophers
Problem description
import groovyx.gpars.actor.DefaultActor
import groovyx.gpars.actor.Actors

Actors.defaultActorPGroup.resize 5

/**
 * A philosopher alternates between thinking and trying to eat.
 * To eat, it asks both of its forks with a Take message and waits for the two answers.
 */
final class Philosopher extends DefaultActor {
    private Random random = new Random()

    String name
    def forks = []

    void act() {
        assert 2 == forks.size()
        loop {
            think()
            forks*.send new Take()
            //Pairs of [answer, fork that sent it], collected across the two nested reacts
            def messages = []
            react {a ->
                messages << [a, sender]
                react {b ->
                    messages << [b, sender]
                    if ([a, b].any {Rejected.isCase it}) {
                        println "$name: \tOops, can't get my forks! Giving up."
                        //Hand back the one fork that was granted, if any
                        final def accepted = messages.find {Accepted.isCase it[0]}
                        if (accepted != null) accepted[1].send new Finished()
                    } else {
                        eat()
                        //Release BOTH forks through their recorded senders. A plain reply would
                        //address only the sender of the message being processed (the second answer),
                        //leaving the first fork unavailable for good.
                        messages.each { it[1].send new Finished() }
                    }
                }
            }
        }
    }

    void think() {
        println "$name: \tI'm thinking"
        Thread.sleep random.nextInt(5000)
        println "$name: \tI'm done thinking"
    }

    void eat() {
        println "$name: \tI'm EATING"
        Thread.sleep random.nextInt(2000)
        println "$name: \tI'm done EATING"
    }
}

/**
 * A fork grants itself to the first requester and rejects all others until it is given back.
 */
final class Fork extends DefaultActor {

    String name
    boolean available = true

    void act() {
        loop {
            react {message ->
                switch (message) {
                    case Take:
                        if (available) {
                            available = false
                            reply new Accepted()
                        } else reply new Rejected()
                        break
                    case Finished:
                        assert !available
                        available = true
                        break
                    default: throw new IllegalStateException("Cannot process the message: $message")
                }
            }
        }
    }
}

//Messages
final class Take {}
final class Accepted {}
final class Rejected {}
final class Finished {}

def forks = [
        new Fork(name: 'Fork 1'),
        new Fork(name: 'Fork 2'),
        new Fork(name: 'Fork 3'),
        new Fork(name: 'Fork 4'),
        new Fork(name: 'Fork 5')
]

//Neighbouring philosophers share one fork; the last one closes the circle with the first fork
def philosophers = [
        new Philosopher(name: 'Joe', forks: [forks[0], forks[1]]),
        new Philosopher(name: 'Dave', forks: [forks[1], forks[2]]),
        new Philosopher(name: 'Alice', forks: [forks[2], forks[3]]),
        new Philosopher(name: 'James', forks: [forks[3], forks[4]]),
        new Philosopher(name: 'Phil', forks: [forks[4], forks[0]]),
]

forks*.start()
philosophers*.start()

sleep 10000
forks*.stop()
philosophers*.stop()
Word Sort
Given a folder name, the script will sort words in all files in the folder. The SortMaster actor creates a given number of WordSortActors , splits among them the files to sort words in and collects the results.
Inspired by Scala Concurrency blog post by Michael Galpin
import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.DefaultActor

import java.util.concurrent.CountDownLatch

//Messages
private final class FileToSort { String fileName }
private final class SortResult { String fileName; List<String> words }

//Worker actor
/**
 * Sorts the words of every file it is asked about and replies with a SortResult.
 */
class WordSortActor extends DefaultActor {
    /** Returns the words of the given file ordered by their lower-case form. */
    private List<String> sortedWords(String fileName) {
        parseFile(fileName).sort {it.toLowerCase()}
    }

    /** Reads the file line by line, splitting each line on a single space. */
    private List<String> parseFile(String fileName) {
        List<String> words = []
        new File(fileName).splitEachLine(' ') {words.addAll(it)}
        return words
    }

    void act() {
        loop {
            react {message ->
                switch (message) {
                    case FileToSort:
                        println "Sorting file=${message.fileName} on thread ${Thread.currentThread().name}"
                        reply new SortResult(fileName: message.fileName, words: sortedWords(message.fileName))
                }
            }
        }
    }
}

//Master actor
/**
 * Distributes the files found in docRoot among numActors workers and gathers
 * the sorted word lists into 'sorted'.
 */
final class SortMaster extends DefaultActor {

    String docRoot = '/'
    int numActors = 1

    List<List<String>> sorted = []
    //Opened once doneLatch has been created, so waitUntilDone() never sees it unset
    private CountDownLatch startupLatch = new CountDownLatch(1)
    //Counts one pending result per dispatched file
    private CountDownLatch doneLatch

    private void beginSorting() {
        int cnt = sendTasksToWorkers()
        doneLatch = new CountDownLatch(cnt)
    }

    private List createWorkers() {
        return (1..numActors).collect {new WordSortActor().start()}
    }

    /**
     * Hands the regular files in docRoot to the workers in round-robin order.
     * @return the number of files dispatched
     */
    private int sendTasksToWorkers() {
        List<Actor> workers = createWorkers()
        int cnt = 0
        new File(docRoot).eachFile {
            //Skip sub-directories: a worker cannot read them as text and would never reply,
            //which would leave waitUntilDone() blocked
            if (it.isFile()) {
                workers[cnt % numActors] << new FileToSort(fileName: it.path)
                cnt += 1
            }
        }
        return cnt
    }

    /** Blocks the caller until a result has been received for every dispatched file. */
    public void waitUntilDone() {
        startupLatch.await()
        doneLatch.await()
    }

    void act() {
        beginSorting()
        startupLatch.countDown()
        loop {
            react {
                switch (it) {
                    case SortResult:
                        sorted << it.words
                        doneLatch.countDown()
                        println "Received results for file=${it.fileName}"
                }
            }
        }
    }
}

//start the actors to sort words
def master = new SortMaster(docRoot: 'c:/tmp/Logs/', numActors: 5).start()
master.waitUntilDone()
println 'Done'

File file = new File("c:/tmp/Logs/sorted_words.txt")
file.withPrintWriter { printer ->
    master.sorted.each { printer.println it }
}
Load Balancer
Demonstrates work balancing among adaptable set of workers. The load balancer receives tasks and queues them in a temporary task queue. When a worker finishes his assignment, it asks the load balancer for a new task.
If the load balancer doesn't have any tasks available in the task queue, the worker is stopped. If the number of tasks in the task queue exceeds certain limit, a new worker is created to increase size of the worker pool.
import groovyx.gpars.actor.Actor
import groovyx.gpars.actor.DefaultActor

/**
 * Demonstrates work balancing among adaptable set of workers.
 * The load balancer receives tasks and queues them in a temporary task queue.
 * When a worker finishes his assignment, it asks the load balancer for a new task.
 * If the load balancer doesn't have any tasks available in the task queue, the worker is stopped.
 * If the number of tasks in the task queue exceeds certain limit, a new worker is created
 * to increase size of the worker pool.
 */
final class LoadBalancer extends DefaultActor {
    int workers = 0
    List taskQueue = []
    private static final QUEUE_SIZE_TRIGGER = 10

    void act() {
        loop {
            react { message ->
                switch (message) {
                    case NeedMoreWork:
                        if (taskQueue.size() == 0) {
                            println 'No more tasks in the task queue. Terminating the worker.'
                            reply DemoWorker.EXIT
                            workers -= 1
                        } else reply taskQueue.remove(0)
                        break
                    case WorkToDo:
                        taskQueue << message
                        //Start a worker when there is none at all or when the backlog has grown too long
                        if ((workers == 0) || (taskQueue.size() >= QUEUE_SIZE_TRIGGER)) {
                            println 'Need more workers. Starting one.'
                            workers += 1
                            new DemoWorker(this).start()
                        }
                }
                println "Active workers=${workers}\tTasks in queue=${taskQueue.size()}"
            }
        }
    }
}

/**
 * A worker repeatedly asks its balancer for work and terminates when it receives EXIT.
 */
final class DemoWorker extends DefaultActor {
    final static Object EXIT = new Object()
    private static final Random random = new Random()

    Actor balancer

    def DemoWorker(balancer) {
        this.balancer = balancer
    }

    void act() {
        loop {
            this.balancer << new NeedMoreWork()
            react {
                switch (it) {
                    case WorkToDo:
                        processMessage(it)
                        break
                    case EXIT: terminate()
                }
            }
        }
    }

    /**
     * Simulates processing of a task by sleeping for a random time below five seconds.
     */
    private void processMessage(message) {
        //No lock around the sleep: java.util.Random is safe to share between threads, and
        //holding a lock on the shared instance while sleeping would make workers run one at a time
        Thread.sleep random.nextInt(5000)
    }
}

//Messages
final class WorkToDo {}
final class NeedMoreWork {}

final Actor balancer = new LoadBalancer().start()

//produce tasks
for (i in 1..20) {
    Thread.sleep 100
    balancer << new WorkToDo()
}

//produce tasks in a parallel thread
Thread.start {
    for (i in 1..10) {
        Thread.sleep 1000
        balancer << new WorkToDo()
    }
}

Thread.sleep 35000  //let the queues get empty
balancer << new WorkToDo()
balancer << new WorkToDo()
Thread.sleep 10000

balancer.stop()
balancer.join()