7.5 Pipeline DSL - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.0-SNAPSHOT
7.5 Pipeline DSL
A DSL for building operator pipelines
Building dataflow networks can be further simplified. GPars offers handy shortcuts for the common scenario of building (mostly linear) pipelines of operators.

def toUpperCase = {s -> s.toUpperCase()}

final DataflowReadChannel encrypt = new DataflowQueue()
final DataflowReadChannel encrypted = encrypt | toUpperCase | {it.reverse()} | {'###encrypted###' + it + '###'}

encrypt << "I need to keep this message secret!"
encrypt << "GPars can build linear operator pipelines really easily"

println encrypted.val
println encrypted.val
The same pipeline can equivalently be expressed with the chainWith() method:

def toUpperCase = {s -> s.toUpperCase()}

final DataflowReadChannel encrypt = new DataflowQueue()
final DataflowReadChannel encrypted = encrypt.chainWith toUpperCase chainWith {it.reverse()} chainWith {'###encrypted###' + it + '###'}

encrypt << "I need to keep this message secret!"
encrypt << "GPars can build linear operator pipelines really easily"

println encrypted.val
println encrypted.val
Combining pipelines with straight operators
Since each operator pipeline has an entry and an exit channel, pipelines can be wired into more complex operator networks. Only your imagination can limit your ability to mix pipelines with channels and operators in the same network definitions.

def toUpperCase = {s -> s.toUpperCase()}
def save = {text ->
    //Just pretending to be saving the text to disk, database or whatever
    println 'Saving ' + text
}

final DataflowReadChannel toEncrypt = new DataflowQueue()
final DataflowReadChannel encrypted = toEncrypt.chainWith toUpperCase chainWith {it.reverse()} chainWith {'###encrypted###' + it + '###'}

final DataflowQueue fork1 = new DataflowQueue()
final DataflowQueue fork2 = new DataflowQueue()
splitter(encrypted, [fork1, fork2])  //Split the data flow

fork1.chainWith save  //Hook in the save operation

//Hook in a sneaky decryption pipeline
final DataflowReadChannel decrypted = fork2.chainWith {it[15..-4]} chainWith {it.reverse()} chainWith {it.toLowerCase()}
        .chainWith {'Groovy leaks! Check out a decrypted secret message: ' + it}

toEncrypt << "I need to keep this message secret!"
toEncrypt << "GPars can build operator pipelines really easy"

println decrypted.val
println decrypted.val
The type of the channel is preserved across the whole pipeline. For example, if you start chaining off a synchronous channel, all the channels in the pipeline will be synchronous. In that case the whole chain blocks, including the writer at the head of the pipeline, until someone reads data off its tail.

final SyncDataflowQueue queue = new SyncDataflowQueue()
final result = queue.chainWith {it * 2}.chainWith {it + 1}.chainWith {it * 100}

Thread.start {
    5.times {
        println result.val
    }
}

queue << 1
queue << 2
queue << 3
queue << 4
queue << 5
Joining pipelines
Two pipelines (or channels) can be connected using the into() method:

final DataflowReadChannel encrypt = new DataflowQueue()
final DataflowWriteChannel messagesToSave = new DataflowQueue()
encrypt.chainWith toUpperCase chainWith {it.reverse()} into messagesToSave

task {
    encrypt << "I need to keep this message secret!"
    encrypt << "GPars can build operator pipelines really easy"
}

task {
    2.times {
        println "Saving " + messagesToSave.val
    }
}
Forking the data flow
When a need comes to copy the output of a pipeline/channel into more than one following pipeline/channel, the split() method will help you:

final DataflowReadChannel encrypt = new DataflowQueue()
final DataflowWriteChannel messagesToSave = new DataflowQueue()
final DataflowWriteChannel messagesToLog = new DataflowQueue()

encrypt.chainWith toUpperCase chainWith {it.reverse()}.split(messagesToSave, messagesToLog)
Tapping into the pipeline
Like split(), the tap() method allows you to fork the data flow into multiple channels. Tapping, however, is slightly more convenient in some scenarios, since it treats one of the two new forks as the successor of the pipeline.

queue.chainWith {it * 2}.tap(logChannel).chainWith{it + 1}.tap(logChannel).into(PrintChannel)
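As a minimal runnable sketch of tapping, the following (hypothetical) example logs the intermediate value of a two-step pipeline; the channel names queue, logChannel and printChannel are illustrative and not part of the GPars API:

```groovy
import groovyx.gpars.dataflow.DataflowQueue

final DataflowQueue queue = new DataflowQueue()
final DataflowQueue logChannel = new DataflowQueue()
final DataflowQueue printChannel = new DataflowQueue()

//Double each value, tap the intermediate result into logChannel, then increment it
queue.chainWith {it * 2}.tap(logChannel).chainWith {it + 1}.into(printChannel)

queue << 1
assert 2 == logChannel.val    //the tapped intermediate value
assert 3 == printChannel.val  //the final result
```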
Merging channels
Merging allows you to join multiple read channels as inputs for a single dataflow operator. The function passed as the second argument needs to accept as many arguments as there are channels being merged - each holding a value from the corresponding channel.

maleChannel.merge(femaleChannel) {m, f -> m.marry(f)}.into(mortgageCandidatesChannel)
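A self-contained sketch of the same idea, pairing up values from two input channels and summing them (channel names are illustrative):

```groovy
import groovyx.gpars.dataflow.DataflowQueue

final DataflowQueue channelA = new DataflowQueue()
final DataflowQueue channelB = new DataflowQueue()
final DataflowQueue sums = new DataflowQueue()

//The two-argument closure receives one value from each merged channel
channelA.merge(channelB) {a, b -> a + b}.into(sums)

channelA << 1
channelB << 10
assert 11 == sums.val
```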
Separation
Separation is the opposite operation to merge. The supplied closure returns a list of values, each of which will be written to the output channel at the corresponding position index.

queue1.separate([queue2, queue3, queue4]) {a -> [a-1, a, a+1]}
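Fed with input data, the snippet above could run as follows (a minimal sketch with the queues declared explicitly):

```groovy
import groovyx.gpars.dataflow.DataflowQueue

final DataflowQueue queue1 = new DataflowQueue()
final DataflowQueue queue2 = new DataflowQueue()
final DataflowQueue queue3 = new DataflowQueue()
final DataflowQueue queue4 = new DataflowQueue()

//Each input value fans out into three output channels, one list element per channel
queue1.separate([queue2, queue3, queue4]) {a -> [a - 1, a, a + 1]}

queue1 << 10
assert 9 == queue2.val
assert 10 == queue3.val
assert 11 == queue4.val
```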
Choices
The binaryChoice() and choice() methods allow you to send a value to one out of two (or many) output channels, as indicated by the return value from a closure.

queue1.binaryChoice(queue2, queue3) {a -> a > 0}
queue1.choice([queue2, queue3, queue4]) {a -> a % 3}
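A runnable sketch of binaryChoice(), assuming a true result from the closure routes the value to the first channel and a false result to the second (channel names are illustrative):

```groovy
import groovyx.gpars.dataflow.DataflowQueue

final DataflowQueue queue1 = new DataflowQueue()
final DataflowQueue positives = new DataflowQueue()
final DataflowQueue nonPositives = new DataflowQueue()

//true routes to the first output channel, false to the second
queue1.binaryChoice(positives, nonPositives) {a -> a > 0}

queue1 << 10
queue1 << -10
assert 10 == positives.val
assert -10 == nonPositives.val
```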
Filtering
The filter() method allows you to filter data in the pipeline using boolean predicates.

final DataflowQueue queue1 = new DataflowQueue()
final DataflowQueue queue2 = new DataflowQueue()

final odd = {num -> num % 2 != 0 }

queue1.filter(odd) into queue2
(1..5).each {queue1 << it}
assert 1 == queue2.val
assert 3 == queue2.val
assert 5 == queue2.val
Null values
If a chained function returns a null value, it is normally passed along the pipeline as a valid value. To indicate to the operator that no value should be passed further down the pipeline, a NullObject.nullObject instance must be returned.

final DataflowQueue queue1 = new DataflowQueue()
final DataflowQueue queue2 = new DataflowQueue()

final odd = {num ->
    if (num == 5) return null  //null values are normally passed on
    if (num % 2 != 0) return num
    else return NullObject.nullObject  //this value gets blocked
}

queue1.chainWith odd into queue2
(1..5).each {queue1 << it}
assert 1 == queue2.val
assert 3 == queue2.val
assert null == queue2.val
Customizing the thread pools
All of the Pipeline DSL methods allow for custom thread pools or PGroups to be specified:

channel | {it * 2}

channel.chainWith(closure)
channel.chainWith(pool) {it * 2}
channel.chainWith(group) {it * 2}

channel.into(otherChannel)
channel.into(pool, otherChannel)
channel.into(group, otherChannel)

channel.split(otherChannel1, otherChannel2)
channel.split(otherChannels)
channel.split(pool, otherChannel1, otherChannel2)
channel.split(pool, otherChannels)
channel.split(group, otherChannel1, otherChannel2)
channel.split(group, otherChannels)

channel.tap(otherChannel)
channel.tap(pool, otherChannel)
channel.tap(group, otherChannel)

channel.merge(otherChannel)
channel.merge(otherChannels)
channel.merge(pool, otherChannel)
channel.merge(pool, otherChannels)
channel.merge(group, otherChannel)
channel.merge(group, otherChannels)

channel.filter(closure)
channel.filter(pool, closure)
channel.filter(group, closure)

channel.binaryChoice(trueBranch, falseBranch)
channel.binaryChoice(pool, trueBranch, falseBranch)
channel.binaryChoice(group, trueBranch, falseBranch)

channel.choice(branches)
channel.choice(pool, branches)
channel.choice(group, branches)

channel.separate(outputs)
channel.separate(pool, outputs)
channel.separate(group, outputs)
The pipeline builder
The Pipeline class offers an intuitive builder for operator pipelines. The greatest benefit of using the Pipeline class compared to chaining the channels directly is the ease with which a custom thread pool/group can be applied to all the operators along the constructed chain. The available methods and overloaded operators are identical to the ones available on channels directly.

import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.operator.Pipeline
import groovyx.gpars.scheduler.DefaultPool
import groovyx.gpars.scheduler.Pool

final DataflowQueue queue = new DataflowQueue()
final DataflowQueue result1 = new DataflowQueue()
final DataflowQueue result2 = new DataflowQueue()
final Pool pool = new DefaultPool(false, 2)

final negate = {-it}

final Pipeline pipeline = new Pipeline(pool, queue)

pipeline | {it * 2} | {it + 1} | negate
pipeline.split(result1, result2)

queue << 1
queue << 2
queue << 3

assert -3 == result1.val
assert -5 == result1.val
assert -7 == result1.val

assert -3 == result2.val
assert -5 == result2.val
assert -7 == result2.val

pool.shutdown()