
7.8 Synchronous Variables and Channels - Reference Documentation

Authors: The Whole GPars Gang

Version: 1.0.0

7.8 Synchronous Variables and Channels

With asynchronous dataflow channels, the communicating parties remain completely independent, except that readers have to wait until a value is available for consumption. Writers don't wait for their messages to be consumed, and readers obtain a value as soon as one is available when they ask. Synchronous channels, on the other hand, synchronize writers with readers as well as multiple readers among themselves. This is particularly useful when you need to increase the level of determinism. The writer-to-reader partial ordering imposed by asynchronous communication is complemented by a reader-to-writer partial ordering when using synchronous communication. In other words, you are guaranteed that whatever the reader did before reading a value from a synchronous channel happened before whatever the writer did after writing the value. Also, with synchronous communication writers can never get too far ahead of readers, which simplifies reasoning about the system and reduces the need to throttle data production in order to avoid system overload.
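The difference in writer behaviour can be sketched as follows. This is a minimal illustration rather than part of the original guide: it assumes GPars is on the classpath, and the `events` list and the explicit pool size are choices made for this example only. An asynchronous DataflowQueue accepts writes while no reader is present, whereas a write to a SyncDataflowQueue returns only after a reader has asked for the value.

```groovy
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.SyncDataflowQueue
import groovyx.gpars.group.NonDaemonPGroup

// Asynchronous channel: writes return immediately, even though nobody reads yet.
final DataflowQueue asyncChannel = new DataflowQueue()
(1..3).each { asyncChannel << it }
assert asyncChannel.length() == 3   // all three messages are buffered

// Synchronous channel: the write blocks until a reader takes the value.
// Explicit pool size (an assumption of this sketch): one thread per blocking task.
def group = new NonDaemonPGroup(2)
final SyncDataflowQueue syncChannel = new SyncDataflowQueue()

// Hypothetical helper list that records the order of events
final List events = Collections.synchronizedList([])

def writer = group.task {
    syncChannel << 'message'
    events << 'write returned'
}

def reader = group.task {
    sleep 300                       // the reader arrives late on purpose
    events << 'reader asked'
    syncChannel.val
}

[writer, reader]*.join()
group.shutdown()

// What the reader did before reading precedes what the writer did after writing.
assert events == ['reader asked', 'write returned']
```

The final assertion relies only on the reader-to-writer ordering described above, not on the length of the sleep.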

Synchronous dataflow queue

The SyncDataflowQueue class should be used for point-to-point (1:1 or n:1) communication. Each message written to the queue will be consumed by exactly one reader. Writers are blocked until their message is consumed; readers are blocked until there is a value available for them to read.

import groovyx.gpars.dataflow.SyncDataflowQueue
import groovyx.gpars.group.NonDaemonPGroup

/**
 * Shows how synchronous dataflow queues can be used to throttle a fast producer when serving data to a slow consumer.
 * Unlike asynchronous channels, synchronous channels block both the writer and the readers until all parties are ready to exchange messages.
 */

def group = new NonDaemonPGroup()

final SyncDataflowQueue channel = new SyncDataflowQueue()

def producer = group.task {
    (1..30).each {
        channel << it
        println "Just sent $it"
    }
    channel << -1   //signals the end of the data
}

def consumer = group.task {
    while (true) {
        sleep 500  //simulating a slow consumer
        final Object msg = channel.val
        if (msg == -1) return
        println "Received $msg"
    }
}

consumer.join()

group.shutdown()
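The n:1 case mentioned above can be sketched in the same way. The following is an illustrative example under stated assumptions, not code from the original guide: GPars is assumed to be on the classpath, and the writer names, the message values and the pool size of 3 are made up for the illustration. Two writers share one queue and a single reader takes every message exactly once.

```groovy
import groovyx.gpars.dataflow.SyncDataflowQueue
import groovyx.gpars.group.NonDaemonPGroup

// Explicit pool size (an assumption of this sketch): one thread per blocking task.
def group = new NonDaemonPGroup(3)
final SyncDataflowQueue channel = new SyncDataflowQueue()

// Two writers share the same channel (n:1).
def writerA = group.task { [1, 2].each { channel << it } }
def writerB = group.task { [11, 12].each { channel << it } }

// A single reader takes all four messages, one at a time.
def reader = group.task {
    def seen = []
    4.times { seen << channel.val }
    seen
}

final List received = reader.val    // blocks until the reader task has finished
[writerA, writerB]*.join()
group.shutdown()

// Each writer blocks until its message is consumed, so its own order is preserved.
assert received.indexOf(1) < received.indexOf(2)
assert received.indexOf(11) < received.indexOf(12)

// The interleaving between writers is not fixed, so compare the sorted values.
assert received.sort() == [1, 2, 11, 12]
```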

Synchronous dataflow broadcast

The SyncDataflowBroadcast class should be used for publish-subscribe (1:n or n:m) communication. Each message written to the broadcast will be consumed by all subscribed readers. Writers are blocked until their message is consumed by all readers; readers are blocked until there is a value available for them to read and all the other subscribed readers have asked for the message as well. With SyncDataflowBroadcast, all readers process the same message at the same time and wait for one another before getting the next one.

import groovyx.gpars.dataflow.SyncDataflowBroadcast
import groovyx.gpars.group.NonDaemonPGroup

/**
 * Shows how synchronous dataflow broadcasts can be used to throttle a fast producer when serving data to slow consumers.
 * Unlike asynchronous channels, synchronous channels block both the writer and the readers until all parties are ready to exchange messages.
 */

def group = new NonDaemonPGroup()

final SyncDataflowBroadcast channel = new SyncDataflowBroadcast()

def subscription1 = channel.createReadChannel()
def fastConsumer = group.task {
    while (true) {
        sleep 10  //simulating a fast consumer
        final Object msg = subscription1.val
        if (msg == -1) return
        println "Fast consumer received $msg"
    }
}

def subscription2 = channel.createReadChannel()
def slowConsumer = group.task {
    while (true) {
        sleep 500  //simulating a slow consumer
        final Object msg = subscription2.val
        if (msg == -1) return
        println "Slow consumer received $msg"
    }
}

def producer = group.task {
    (1..30).each {
        println "Sending $it"
        channel << it
        println "Sent $it"
    }
    channel << -1   //signals the end of the data
}

[fastConsumer, slowConsumer]*.join()

group.shutdown()
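To check the claim that every subscribed reader consumes every message, a shorter variant of the example can collect what each subscriber saw. This is a sketch under stated assumptions rather than code from the original guide: GPars is assumed to be on the classpath, and the `readThree` helper, the message count and the pool size are hypothetical choices for the illustration.

```groovy
import groovyx.gpars.dataflow.SyncDataflowBroadcast
import groovyx.gpars.group.NonDaemonPGroup

// Explicit pool size (an assumption of this sketch): one thread per blocking task.
def group = new NonDaemonPGroup(3)
final SyncDataflowBroadcast channel = new SyncDataflowBroadcast()

// Subscribe before the producer starts, so that both readers see every message.
def subscription1 = channel.createReadChannel()
def subscription2 = channel.createReadChannel()

// Hypothetical helper: reads three messages from a subscription and returns them.
def readThree = { subscription ->
    def seen = []
    3.times { seen << subscription.val }
    seen
}

def consumer1 = group.task { readThree.call(subscription1) }
def consumer2 = group.task { readThree.call(subscription2) }
def producer = group.task { (1..3).each { channel << it } }

final List seenByFirst = consumer1.val
final List seenBySecond = consumer2.val
producer.join()
group.shutdown()

// Both subscribers received the complete sequence, in the order it was written.
assert seenByFirst == [1, 2, 3]
assert seenBySecond == [1, 2, 3]
```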

Synchronous dataflow variable

Unlike DataflowVariable, which is asynchronous and only blocks the readers until a value is bound to the variable, the SyncDataflowVariable class provides a one-shot data exchange mechanism that blocks the writer and all readers until a specified number of waiting parties is reached.

import groovyx.gpars.dataflow.SyncDataflowVariable
import groovyx.gpars.group.NonDaemonPGroup

final NonDaemonPGroup group = new NonDaemonPGroup()

final SyncDataflowVariable value = new SyncDataflowVariable(2)  //two readers required to exchange the message

def writer = group.task {
    println "Writer about to write a value"
    value << 'Hello'
    println "Writer has written the value"
}

def reader = group.task {
    println "Reader about to read a value"
    println "Reader has read the value: ${value.val}"
}

def slowReader = group.task {
    sleep 5000
    println "Slow reader about to read a value"
    println "Slow reader has read the value: ${value.val}"
}

[reader, slowReader]*.join()

group.shutdown()
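The blocking behaviour of the writer can also be verified by recording the order of events instead of printing them. The following is a minimal sketch, not code from the original guide: it assumes GPars is on the classpath, and the `events` list, the delays and the pool size are hypothetical choices for the illustration. The write should return only after both required readers have asked for the value.

```groovy
import groovyx.gpars.dataflow.SyncDataflowVariable
import groovyx.gpars.group.NonDaemonPGroup

// Explicit pool size (an assumption of this sketch): one thread per blocking task.
def group = new NonDaemonPGroup(3)
final SyncDataflowVariable value = new SyncDataflowVariable(2)   // two readers required

// Hypothetical helper list that records the order of events
final List events = Collections.synchronizedList([])

def writer = group.task {
    value << 'Hello'
    events << 'written'             // reached only after both readers have arrived
}

// Two readers that arrive after different delays (in milliseconds).
def readers = [100, 400].collect { delay ->
    group.task {
        sleep delay
        events << 'reader asked'
        value.val
    }
}

final List results = readers*.val   // blocks until both readers have the value
writer.join()
group.shutdown()

assert results == ['Hello', 'Hello']
// The writer is released last, after both readers have asked for the value.
assert events == ['reader asked', 'reader asked', 'written']
```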