(Quick Reference)

7.9 Kanban Flow - Reference Documentation

Authors: The Whole GPars Gang

Version: 1.1.0

7.9 Kanban Flow

APIs: KanbanFlow | KanbanLink | KanbanTray | ProcessingNode

KanbanFlow

A KanbanFlow is a composed object that uses dataflow abstractions to define dependencies between multiple concurrent producer and consumer operators.

Each link between a producer and a consumer is defined by a KanbanLink .

Inside each KanbanLink, the communication between producer and consumer follows the KanbanFlow pattern as described in The KanbanFlow Pattern (recommended read). They use objects of type KanbanTray to send products downstream and signal requests for further products back to the producer.

The figure below shows a KanbanLink with one producer, one consumer and five trays numbered 0 to 4. Tray number 0 has been used to take a product from producer to consumer, has been emptied by the consumer and is now sent back to the producer's input queue. Trays 1 and 2 wait carry products waiting for consumption, trays 3 and 4 wait to be used by producers.

A KanbanFlow object links producers to consumers thus creating KanbanLink objects. In the course of this activity, a second link may be constructed where the producer is the same object that acted as the consumer in a formerly created link such that the two links become connected to build a chain.

Here is an example of a KanbanFlow with only one link, e.g. one producer and one consumer. The producer always sends the number 1 downstream and the consumer prints this number.

import static groovyx.gpars.dataflow.ProcessingNode.node
import groovyx.gpars.dataflow.KanbanFlow

def producer = node { down -> down 1 } def consumer = node { up -> println up.take() }

new KanbanFlow().with { link producer to consumer start() // run for a while stop() }

For putting a product into a tray and sending the tray downstream, one can either use the send() method, the << operator, or use the tray as a method object. The following lines are equivalent:

node { down -> down.send 1 }
node { down -> down << 1 }
node { down -> down 1 }

When a product is taken from the input tray with the take() method, the empty tray is automatically released.

You should call take() only once!

If you prefer to not using an empty tray for sending products downstream (as typically the case when a ProcessingNode acts as a filter), you must release the tray in order to keep it in play. Otherwise, the number of trays in the system decreases. You can release a tray either by calling the release() method or by using the ~ operator (think "shake it off"). The following lines are equivalent:

node { down -> down.release() }
node { down -> ~down }

Trays are automatically released, if you call any of the take() or send() methods.

Various linking structures

In addition to a linear chains, a KanbanFlow can also link a single producer to multiple consumers (tree) or multiple producers to a single consumer (collector) or any combination of the above that results in a directed acyclic graph (DAG).

The KanbanFlowTest class has many examples for such structures, including scenarios where a single producer delegates work to multiple consumers with

  • a work-stealing strategy where all consumers get their pick from the downstream,
  • a master-slave strategy where a producer chooses from the available consumers, and
  • a broadcast strategy where a producer sends all products to all consumers.

Cycles are forbidden by default but when enabled, they can be used as so-called generators. A producer can even be his own consumer that increases a product value in every cycle. The generator itself remains state-free since the value is only stored as a product riding on a tray. Such a generator can be used for e.g. lazy sequences or as a the "heartbeat" of a subsequent flow.

The approach of generator "loops" can equally be applied to collectors, where a collector does not maintain any internal state but sends a collection onto itself, adding products at each call.

Generally speaking, a ProcessingNode can link to itself for exporting state to the tray/product that it sends to itself. Access to the product is then thread-safe by design.

Composing KanbanFlows

Just as KanbanLink objects can be chained together to form a KanbanFlow , flows themselves can be composed again to form new greater flows from existing smaller ones.

def firstFlow = new KanbanFlow()
def producer  = node(counter)
def consumer  = node(repeater)
firstFlow.link(producer).to(consumer)

def secondFlow = new KanbanFlow() def producer2 = node(repeater) def consumer2 = node(reporter) secondFlow.link(producer2).to(consumer2)

flow = firstFlow + secondFlow

flow.start()

Customizing concurrency characteristics

The amount of concurrency in a kanban system is determined by the number of trays (sometimes called WIP = work in progress). With no trays in the streams, the system does nothing.

  • With one tray only, the system is confined to sequential execution.
  • With more trays, concurrency begins.
  • With more trays than available processing units, the system begins to waste resources.

The number of trays can be controlled in various ways. They are typically set when starting the flow.

flow.start(0) // start without trays
flow.start(1) // start with one tray per link in the flow
flow.start()  // start with the optimal number of trays

In addition to the trays, the KanbanFlow may also be constrained by its underlying ThreadPool . A pool of size 1 for example will not allow much concurrency.

KanbanFlows use a default pool that is dimensioned by the number of available cores. This can be customized by setting the pooledGroup property.

Test:
KanbanFlowTest
Demos:
DemoKanbanFlow
DemoKanbanFlowBroadcast
DemoKanbanFlowCycle
DemoKanbanLazyPrimeSequenceLoops