3.2 Map-Reduce - Reference Documentation
Authors: The Whole GPars Gang
Version: 1.2.1
3.2 Map-Reduce
The Parallel Collection Map/Reduce DSL gives GPars a more functional flavor. In general, the Map/Reduce DSL may be used for the same purposes as the xxxParallel() family of methods and has very similar semantics. However, Map/Reduce can perform considerably faster when you need to chain multiple methods to process a single collection in several steps:
println 'Number of occurrences of the word GROOVY today: ' + urls.parallel
    .map {it.toURL().text.toUpperCase()}
    .filter {it.contains('GROOVY')}
    .map {it.split()}
    .map {it.findAll {word -> word.contains 'GROOVY'}.size()}
    .sum()
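To make the comparison with the xxxParallel() methods concrete, here is a minimal sketch that runs the same two-step pipeline both ways. It assumes GPars is on the classpath; the input range and the variable names are made up for illustration.

```groovy
import static groovyx.gpars.GParsPool.withPool

// Illustrative input; any collection works
def numbers = (1..20).toList()

// xxxParallel() style: every call returns an ordinary collection,
// so each step starts again from a plain list
def viaParallelMethods = withPool {
    numbers.collectParallel { it * it }
           .findAllParallel { it % 2 == 0 }
}

// Map/Reduce style: the steps are chained on the parallel wrapper and
// converted back to a plain collection once, at the end
def viaMapReduce = withPool {
    numbers.parallel
           .map { it * it }
           .filter { it % 2 == 0 }
           .collection
}

println viaParallelMethods
println viaMapReduce
```

Both variants should produce the even squares of 1..20. On an input this small no speed difference will be visible; the sketch only shows how the two styles line up.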
The parallel property of a collection returns a wrapper offering the following methods:
- map()
- reduce()
- filter()
- size()
- sum()
- min()
- max()
- sort()
- groupBy()
- combine()
To get back a plain Groovy collection, retrieve the collection property:
def myNumbers = (1..1000).parallel.filter{it % 2 == 0}.map{Math.sqrt it}.collection
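Several of the listed methods can be tried out on a small range. The following is a sketch only, assuming GPars is on the classpath; the map name stats and the sample values are illustrative and not part of the API.

```groovy
import static groovyx.gpars.GParsPool.withPool

def stats = withPool {
    // Squares of 1..10, kept as a parallel wrapper for further calls
    def squares = (1..10).parallel.map { it * it }
    [
        size   : squares.size(),
        sum    : squares.sum(),
        min    : squares.min(),
        max    : squares.max(),
        // reduce() folds the elements pairwise with the supplied closure
        product: (1..5).parallel.reduce { a, b -> a * b },
        // sort() with a one-argument closure orders by the returned value
        sorted : [3, 1, 2].parallel.sort { it }.collection
    ]
}

println stats
```

The closure passed to reduce() should be associative, since the order in which pairs are combined is not fixed when the work is split across threads.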
Avoid side-effects in functions
Once again we need to warn you: to avoid nasty surprises, keep the closures you pass to the Map/Reduce functions stateless and free of side-effects.
Availability
This feature is only available in the Fork/Join-based GParsPool, not in GParsExecutorsPool.
Classical Example
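A classical example, inspired by http://github.com/thevery, counting occurrences of words in a string:
import static groovyx.gpars.GParsPool.withPool

def words = "This is just a plain text to count words in"
print count(words)

def count(arg) {
    withPool {
        // Split the text into words so that words, not characters, are counted
        return arg.tokenize().parallel
            .map{[it, 1]}                        // pair each word with a count of 1
            .groupBy{it[0]}.getParallel()        // group the pairs by word
            .map {it.value=it.value.size();it}   // replace each group with its size
            .sort{-it.value}.collection          // most frequent words first
    }
}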
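The same example, implemented with the more general combine operation:
import static groovyx.gpars.GParsPool.withPool

def words = "This is just a plain text to count words in"
print count(words)

def count(arg) {
    withPool {
        // Split the text into words so that words, not characters, are counted
        return arg.tokenize().parallel
            .map{[it, 1]}                                          // pair each word with a count of 1
            .combine(0) {sum, value -> sum + value}.getParallel()  // add up the counts per word
            .sort{-it.value}.collection                            // most frequent words first
    }
}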
Combine
The combine operation expects on its input a list of tuples (two-element lists), considered to be key-value pairs (such as [[key1, value1], [key2, value2], [key1, value3], [key3, value4] … ]), with potentially repeating keys. When invoked, combine merges the values for identical keys using the provided accumulator function and produces a map from the original (unique) keys to their accumulated values. E.g. [[a, b], [c, d], [a, e], [c, f]] will be combined into [a : b+e, c : d+f], while the '+' operation on the values needs to be provided by the user as the accumulation closure.
The accumulation function argument specifies the function used to combine (accumulate) the values belonging to the same key. An initial accumulator value needs to be provided as well. Since the combine method processes items in parallel, the initial accumulator value will be reused multiple times, so the provided value must allow for reuse: it should be either a cloneable or immutable value, or a closure returning a fresh initial accumulator each time it is requested. Good combinations of accumulator functions and reusable initial values include:
accumulator = {List acc, value -> acc << value} initialValue = []
accumulator = {List acc, value -> acc << value} initialValue = {-> []}
accumulator = {int sum, int value -> sum + value} initialValue = 0
accumulator = {int sum, int value -> sum + value} initialValue = {-> 0}
accumulator = {ShoppingCart cart, Item value -> cart.addItem(value)} initialValue = {-> new ShoppingCart()}
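Two of the combinations above can be sketched on a tiny input. This assumes GPars is on the classpath; the pairs list and the names sums and groups are invented for the illustration.

```groovy
import static groovyx.gpars.GParsPool.withPool

// Key-value tuples with repeating keys
def pairs = [['a', 1], ['c', 2], ['a', 3], ['c', 4]]

def (sums, groups) = withPool {
    // Immutable initial value: safe to reuse for every key
    def s = pairs.parallel.combine(0) { sum, value -> sum + value }

    // Mutable accumulator: the closure hands out a fresh list each time
    def g = pairs.parallel.combine({ -> [] }) { List acc, value -> acc << value }

    [s, g]
}

println sums
println groups
```

Here sums maps each key to the total of its values, while groups maps each key to a list of its values. The order of the values inside each list is not something this sketch relies on.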
The keys will be mutually compared using their equals and hashCode methods. Consider using @Canonical or @EqualsAndHashCode to annotate classes that you use as keys. Just like with all hash maps in Groovy, be sure you're using a String, not a GString, as a key!
For more involved scenarios where you combine() complex objects, a good strategy is to have a class that can be used as a key for the common use cases and apply different keys for uncommon cases.
import groovy.transform.ToString
import groovy.transform.TupleConstructor

import static groovyx.gpars.GParsPool.withPool

@TupleConstructor
@ToString
class PricedCar implements Cloneable {
    String model
    String color
    Double price

    boolean equals(final o) {
        if (this.is(o)) return true
        if (getClass() != o.class) return false

        final PricedCar pricedCar = (PricedCar) o

        if (color != pricedCar.color) return false
        if (model != pricedCar.model) return false

        return true
    }

    int hashCode() {
        int result
        result = (model != null ? model.hashCode() : 0)
        result = 31 * result + (color != null ? color.hashCode() : 0)
        return result
    }

    @Override
    protected Object clone() {
        return super.clone()
    }
}

def cars = [new PricedCar('F550', 'blue', 2342.223),
            new PricedCar('F550', 'red', 234.234),
            new PricedCar('Da', 'white', 2222.2),
            new PricedCar('Da', 'white', 1111.1)]

withPool {
    //Combine by model
    def result =
        cars.parallel.map {
            [it.model, it]
        }.combine(new PricedCar('', 'N/A', 0.0)) {sum, value ->
            sum.model = value.model
            sum.price += value.price
            sum
        }.values()

    println result

    //Combine by model and color (the PricedCar's equals and hashCode)
    result =
        cars.parallel.map {
            [it, it]
        }.combine(new PricedCar('', 'N/A', 0.0)) {sum, value ->
            sum.model = value.model
            sum.color = value.color
            sum.price += value.price
            sum
        }.values()

    println result
}
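The earlier advice on annotating key classes and on avoiding GString keys can be sketched as follows. The ModelKey class, the sales data and the variable names are hypothetical, introduced only for this illustration, and GPars is assumed to be on the classpath.

```groovy
import groovy.transform.EqualsAndHashCode

import static groovyx.gpars.GParsPool.withPool

// Hypothetical key type; the annotation generates equals() and hashCode()
// from the two properties
@EqualsAndHashCode
class ModelKey {
    String model
    String color
}

// Illustrative data only
def sales = [
    [model: 'F550', color: 'blue',  price: 10],
    [model: 'F550', color: 'blue',  price: 5],
    [model: 'Da',   color: 'white', price: 7]
]

// Dedicated key class: equal model and color end up under the same key
def totals = withPool {
    sales.parallel
         .map { [new ModelKey(model: it.model, color: it.color), it.price] }
         .combine(0) { sum, price -> sum + price }
}

// Interpolated keys: toString() turns the GString into a plain String
def byLabel = withPool {
    sales.parallel
         .map { ["${it.model}/${it.color}".toString(), it.price] }
         .combine(0) { sum, price -> sum + price }
}

println totals
println byLabel
```

A separate key class keeps the accumulated value free to be anything, such as a number here, instead of reusing the domain object as both key and accumulator.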