7.4 Shutting Down Dataflow Networks - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.2.1
7.4 Shutting Down Dataflow Networks
Shutting down a network of dataflow processors (operators and selectors) can be a non-trivial task, especially if you need a generic mechanism that leaves no messages unprocessed.
Dataflow operators and selectors can be terminated in three ways:
- by calling the terminate() method on all operators that need to be terminated
- by sending a poison message
- by setting up a network of activity monitors that will shut down the network after all messages have been processed
Shutting down the thread pool
If you use a custom PGroup to maintain a thread pool for your dataflow network, remember to shut down the pool once the network has terminated. Otherwise the thread pool will keep consuming system resources and, if it uses non-daemon threads, it will prevent the JVM from exiting.
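A minimal sketch of this clean-up, assuming GPars is on the classpath. The channel and operator names (input, output, doubler) and the pool size of 4 are illustrative only:

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.group.DefaultPGroup

// A custom group with its own thread pool (pool size chosen arbitrarily)
def group = new DefaultPGroup(4)

def input = new DataflowQueue()
def output = new DataflowQueue()

// The operator runs on threads owned by the custom group
def doubler = group.operator(inputs: [input], outputs: [output]) { x -> bindOutput x * 2 }

def result = null
try {
    input << 21
    result = output.val          // blocks until the operator has produced a value
} finally {
    doubler.terminate()          // stop the network first ...
    doubler.join()
    group.shutdown()             // ... then release the pool's threads
}
```

Placing group.shutdown() in a finally block is one way to make sure the pool is released even when the code that feeds the network throws.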
Emergency shutdown
You can call terminate() on any operator/selector to immediately shut it down. Provided you keep track of all your processors, perhaps by adding them to a list, the fastest way to stop the network would be:

    allMyProcessors*.terminate()

    def op1 = operator(inputs: [a, b, c], outputs: [d, e]) {x, y, z -> }
    def op2 = selector(inputs: [d], outputs: [f, out]) { }
    def op3 = prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

    [op1, op2, op3]*.terminate()  // Terminate all operators by calling the terminate() method on them

    op1.join()
    op2.join()
    op3.join()
Shutting down the whole JVM through System.exit() will obviously also shut down the dataflow network. However, no lifecycle listeners will be invoked in that case.
Stopping operators gently
Operators handle incoming messages repeatedly. The only safe moment to stop an operator without the risk of losing any messages is right after it has finished processing messages and is about to look for more in its input channels. This is exactly what the terminateAfterNextRun() method does. It schedules the operator for shutdown after the next set of messages has been handled.
The unprocessed messages stay in the input channels, which allows you to handle them later, perhaps with a different operator/selector or in some other way. With terminateAfterNextRun() you will not lose any input messages. This is particularly handy when you use a group of operators/selectors to load-balance messages coming from a channel. Once the workload decreases, terminateAfterNextRun() can be used to safely reduce the pool of load-balancing operators.
Detecting shutdown
Operators and selectors offer a handy join() method for those who need to block until the operator terminates. This is the easiest way to wait until the whole dataflow network shuts down, irrespective of the shutdown method used.

    allMyProcessors*.join()
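The gentle stop and join() described above can be sketched as follows, assuming GPars is on the classpath. The names input, output and worker are illustrative. The operator is scheduled for shutdown before any message is sent, so it should handle exactly one message and leave the second one in its input channel:

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import static groovyx.gpars.dataflow.Dataflow.operator

def input = new DataflowQueue()
def output = new DataflowQueue()

def worker = operator(inputs: [input], outputs: [output]) { x -> bindOutput x * 10 }

// Schedule the shutdown first, then send two messages
worker.terminateAfterNextRun()
input << 1
input << 2

// Block until the operator has handled one message and stopped
worker.join()

def processed = output.val   // result of the single run
def leftover = input.val     // the second message was never consumed by the operator
```

The leftover message remains available to whatever reads the input channel next, for example another operator started later.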
PoisonPill
PoisonPill is a common term for a strategy that uses special-purpose messages to stop the entities that receive them. GPars offers the PoisonPill class, which has exactly this effect on operators and selectors. Since PoisonPill is a ControlMessage, it is invisible to the operator's body and custom code does not need to handle it in any way. DataflowEventListeners may react to ControlMessages through the controlMessageArrived() handler method.

    def op1 = operator(inputs: [a, b, c], outputs: [d, e]) {x, y, z -> }
    def op2 = selector(inputs: [d], outputs: [f, out]) { }
    def op3 = prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

    a << PoisonPill.instance  // Send the poison

    op1.join()
    op2.join()
    op3.join()
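For a purely linear pipeline, the poison pill approach can be sketched as a complete script, assuming GPars is on the classpath. The names source, middle, sink, doubler and incrementer are illustrative. Because each operator here has a single input, the pill travels behind the data and both values should be processed before the operators stop:

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.operator.PoisonPill
import static groovyx.gpars.dataflow.Dataflow.operator

def source = new DataflowQueue()
def middle = new DataflowQueue()
def sink = new DataflowQueue()

def doubler = operator(inputs: [source], outputs: [middle]) { x -> bindOutput x * 2 }
def incrementer = operator(inputs: [middle], outputs: [sink]) { x -> bindOutput x + 1 }

source << 1
source << 2
source << PoisonPill.instance    // travels down the stream after the data

// Both operators stop once the pill has passed through them
[doubler, incrementer]*.join()

def first = sink.val
def second = sink.val
def last = sink.val              // the pill itself is forwarded to the output channel
```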
Given the potential variety of operator networks and their asynchronous nature, a good termination strategy is that operators and selectors should only ever terminate themselves. All ways of terminating them from outside (either by calling the terminate() method or by sending poison down the stream) may result in messages being lost somewhere in the pipes when the reading operators terminate before they fully handle the messages waiting in their input channels.
Immediate poison pill
To let selectors shut down immediately after receiving a poison pill, the notion of an immediate poison pill has been introduced. A normal, non-immediate poison pill merely closes the input channel it arrived through, leaving the selector alive as long as at least one input channel remains open, whereas an immediate poison pill stops the selector instantly. Obviously, once the selector reads an immediate poison pill, unprocessed messages in its other input channels will not be handled by it.
With an immediate poison pill you can safely shut down networks with selectors involved in feedback loops.

    def op1 = selector(inputs: [a, b, c], outputs: [d, e]) {value, index -> }
    def op2 = selector(inputs: [d], outputs: [f, out]) { }
    def op3 = prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

    a << PoisonPill.immediateInstance

    [op1, op2, op3]*.join()
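A minimal sketch of a two-input selector stopped by an immediate poison pill sent to only one of its inputs, assuming GPars is on the classpath. The names left, right, merged and merger are illustrative:

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.operator.PoisonPill
import static groovyx.gpars.dataflow.Dataflow.selector

def left = new DataflowQueue()
def right = new DataflowQueue()
def merged = new DataflowQueue()

// Forwards whatever arrives on either input
def merger = selector(inputs: [left, right], outputs: [merged]) { value -> bindOutput value }

left << 'hello'
def forwarded = merged.val

// The 'right' channel is still open, yet the immediate pill stops the selector at once
left << PoisonPill.immediateInstance
merger.join()
```

With PoisonPill.instance in place of the immediate variant, the selector would be expected to stay alive until the right channel receives a pill as well, so the join() call would keep blocking.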
Poison with counting
When sending a poison pill down the operator network you may need to be notified when all the operators, or a specified number of them, have been stopped. The CountingPoisonPill class serves exactly this purpose:

    operator(inputs: [a, b, c], outputs: [d, e]) {x, y, z -> }
    selector(inputs: [d], outputs: [f, out]) { }
    prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

    // Send the poison, indicating the number of operators that need to be terminated before we can continue
    final pill = new CountingPoisonPill(3)
    a << pill

    // Wait for all operators to terminate
    pill.join()
    // At least 3 operators should be terminated by now

The termination property of the pill can also be observed asynchronously or polled:

    // Send the poison, indicating the number of operators that need to be terminated before we can continue
    final pill = new CountingPoisonPill(3)
    pill.termination.whenBound {println "Reporting asynchronously that the network has been stopped"}
    a << pill

    if (pill.termination.bound) println "Wow, that was quick. We are done already!"
    else println "Things are being slow today. The network is still running."

    // Wait for all operators to terminate
    assert pill.termination.get()
    // At least 3 operators should be terminated by now
An immediate variant of CountingPoisonPill is also available: ImmediateCountingPoisonPill. It will safely and instantly shut down dataflow networks, even those with selectors involved in feedback loops, which a normal, non-immediate poison pill would not be able to do.

    def op1 = selector(inputs: [a, b, c], outputs: [d, e]) {value, index -> }
    def op2 = selector(inputs: [d], outputs: [f, out]) { }
    def op3 = prioritySelector(inputs: [e, f], outputs: [b]) {value, index -> }

    final pill = new ImmediateCountingPoisonPill(3)
    a << pill
    pill.join()
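As a self-contained sketch of counting termination on a two-operator pipeline, assuming GPars is on the classpath (the names source, middle, sink, parser and squarer are illustrative):

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.operator.CountingPoisonPill
import static groovyx.gpars.dataflow.Dataflow.operator

def source = new DataflowQueue()
def middle = new DataflowQueue()
def sink = new DataflowQueue()

def parser = operator(inputs: [source], outputs: [middle]) { text -> bindOutput text.toInteger() }
def squarer = operator(inputs: [middle], outputs: [sink]) { n -> bindOutput n * n }

source << '7'

// Two operators are expected to be stopped by this pill
final pill = new CountingPoisonPill(2)
source << pill

// Blocks until both operators have been stopped by the pill
pill.join()

def stopped = pill.termination.bound
def squared = sink.val
```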
Poison strategies
To correctly shut down a network using PoisonPill you must identify the appropriate set of channels to send PoisonPill to. PoisonPill spreads through the network in the usual way, travelling through the channels and processors down the stream. Typically the right channels to send PoisonPill to are those that serve as data sources for the network. This may be difficult to achieve in the general case or for complex networks. On the other hand, for networks with a prevalent direction of message flow PoisonPill provides a very straightforward way to shut down the whole network gracefully.
Load-balancing architectures, which use multiple operators reading messages off a shared channel (queue), will also prevent poison shutdown from working properly, since only one of the reading operators will get to read the poison message. You may consider using forked operators instead, by setting the maxForks property to a value greater than 1. Another alternative is to manually split the message stream into multiple channels, each of which would be consumed by one of the original operators.
Termination tips and tricks
Notice that GPars tasks return a DataflowVariable, which gets bound to a value as soon as the task finishes. The 'terminator' operator below leverages the fact that DataflowVariables are implementations of the DataflowReadChannel interface and thus can be consumed by operators. As soon as both tasks finish, the operator sends a PoisonPill down the q channel to stop the consumer once it has processed all data.

    import groovyx.gpars.dataflow.DataflowQueue
    import groovyx.gpars.dataflow.operator.PoisonPill
    import groovyx.gpars.group.NonDaemonPGroup

    def group = new NonDaemonPGroup()

    final DataflowQueue q = new DataflowQueue()

    // final destination
    def customs = group.operator(inputs: [q], outputs: []) { value ->
        println "Customs received $value"
    }

    // big producer
    def green = group.task {
        (1..100).each {
            q << 'green channel ' + it
            sleep 10
        }
    }

    // little producer
    def red = group.task {
        (1..10).each {
            q << 'red channel ' + it
            sleep 15
        }
    }

    // fires once both producer tasks have finished
    def terminator = group.operator(inputs: [green, red], outputs: []) { t1, t2 ->
        q << PoisonPill.instance
    }

    customs.join()
    group.shutdown()
Keeping PoisonPill inside a given network
If your network passes values through channels to entities outside of it, you may need to stop the PoisonPill messages at the network boundaries. This can be easily achieved by putting a single-input single-output filtering operator on each such channel.

    operator(networkLeavingChannel, otherNetworkEnteringChannel) {value ->
        if (!(value instanceof PoisonPill)) bindOutput value
    }

The same filter can be expressed with the Pipeline DSL:

    networkLeavingChannel.filter { !(it instanceof PoisonPill) } into otherNetworkEnteringChannel
Check out the Pipeline DSL section to find out more on pipelines.
Graceful shutdown
GPars provides a generic way to shut down a dataflow network. Unlike the previously mentioned mechanisms, this approach keeps the network running until all the messages have been handled and then gracefully shuts all operators down, letting you know when this happens. You have to pay a modest performance penalty, though. This is unavoidable, since we need to keep track of what's happening inside the network.

    import groovyx.gpars.dataflow.DataflowBroadcast
    import groovyx.gpars.dataflow.DataflowQueue
    import groovyx.gpars.dataflow.operator.component.GracefulShutdownListener
    import groovyx.gpars.dataflow.operator.component.GracefulShutdownMonitor
    import groovyx.gpars.group.DefaultPGroup
    import groovyx.gpars.group.PGroup

    PGroup group = new DefaultPGroup(10)
    final a = new DataflowQueue()
    final b = new DataflowQueue()
    final c = new DataflowQueue()
    final d = new DataflowQueue<Object>()
    final e = new DataflowBroadcast<Object>()
    final f = new DataflowQueue<Object>()
    final result = new DataflowQueue<Object>()

    final monitor = new GracefulShutdownMonitor(100)

    def op1 = group.operator(inputs: [a, b], outputs: [c], listeners: [new GracefulShutdownListener(monitor)]) {x, y ->
        sleep 5
        bindOutput x + y
    }
    def op2 = group.operator(inputs: [c], outputs: [d, e], listeners: [new GracefulShutdownListener(monitor)]) {x ->
        sleep 10
        bindAllOutputs 2*x
    }
    def op3 = group.operator(inputs: [d], outputs: [f], listeners: [new GracefulShutdownListener(monitor)]) {x ->
        sleep 5
        bindOutput x + 40
    }
    def op4 = group.operator(inputs: [e.createReadChannel(), f], outputs: [result], listeners: [new GracefulShutdownListener(monitor)]) {x, y ->
        sleep 5
        bindOutput x + y
    }

    100.times{a << 10}
    100.times{b << 20}

    final shutdownPromise = monitor.shutdownNetwork()

    100.times{assert 160 == result.val}

    shutdownPromise.get()
    [op1, op2, op3, op4]*.join()

    group.shutdown()
Please make sure that no new messages enter the dataflow network after the shutdown has been initiated, since this may cause the network to never terminate. The shutdown process should only be started after all data producers have ceased sending additional messages to the monitored network.
The shutdownNetwork() method returns a Promise so that you can do the usual set of tricks with it: block waiting for the network to terminate using the get() method, register a callback using the whenBound() method, or make it trigger a whole set of activities through the then() method.
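A small sketch of using the returned Promise both with a callback and with a blocking call, assuming GPars is on the classpath. The single-operator network and the names input, output, incrementer and notified are illustrative. All messages are sent before the shutdown is initiated, in line with the advice above:

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.operator.component.GracefulShutdownListener
import groovyx.gpars.dataflow.operator.component.GracefulShutdownMonitor
import groovyx.gpars.group.DefaultPGroup

def group = new DefaultPGroup(4)
def input = new DataflowQueue()
def output = new DataflowQueue()
def monitor = new GracefulShutdownMonitor(100)

def incrementer = group.operator(inputs: [input], outputs: [output],
        listeners: [new GracefulShutdownListener(monitor)]) { x -> bindOutput x + 1 }

// All producers finish before the shutdown starts
10.times { input << it }

def shutdownPromise = monitor.shutdownNetwork()

// Asynchronous notification: the callback binds a variable we can inspect later
def notified = new DataflowVariable()
shutdownPromise.whenBound { notified << true }

// Blocking wait for the network to terminate
shutdownPromise.get()
incrementer.join()

def results = (1..10).collect { output.val }
group.shutdown()
```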
Limitations of graceful shutdown
- For GracefulShutdownListener to work correctly, its messageArrived() event handler must see the original value that arrived through the input channel. Since some event listeners may alter messages as they pass through, it is advisable to put the GracefulShutdownListener first in the list of listeners on each dataflow processor.
- Graceful shutdown will not work for those rare operators whose listeners turn control messages into plain value messages in the controlMessageArrived() event handler.
- Load-balancing architectures, which use multiple operators reading messages off a shared channel (queue), will also prevent graceful shutdown from working properly. You may consider using forked operators instead, by setting the maxForks property to a value greater than 1. Another alternative is to manually split the message stream into multiple channels, each of which would be consumed by one of the original operators.