Batch layer: Illustration
7.2 Common pitfalls of data-processing tools
7.3.6 Custom predicate operations
You’ll frequently need to create additional predicate types to implement your business logic. Toward this end, JCascalog exposes simple interfaces to define new filters, func- tions, and aggregators. Most importantly, this is all done with regular Java code by implementing the appropriate interfaces.
FILTERS
We’ll begin with filters. A filter predicate requires a single method named isKeep that returns true if the input tuple should be kept, and false if it should be filtered. The fol- lowing is a filter that keeps all tuples where the input is greater than 10:
public static class GreaterThanTenFilter extends CascalogFilter { public boolean isKeep(FlowProcess process, FilterCall call) {
return call.getArguments().getInteger(0) > 10; }
}
FUNCTIONS
Next up are functions. Like filters, a function predicate implements a single method—in this case named operate. A function takes in a set of inputs and then emits zero or more tuples as output. Here’s a simple function that increments its input value by one:
public static class IncrementFunction extends CascalogFunction { public void operate(FlowProcess process, FunctionCall call) {
int v = call.getArguments().getInteger(0); call.getOutputCollector().add(new Tuple(v + 1)); }
}
A verbose explanation
You may have noticed that this example computes an average by doing a count, sum, and division. This was solely for the purposes of illustration—these operations can be abstracted into an Average aggregator, as you saw earlier in this chapter. You may have also noticed that some variables are never used after a point, yet still remain in the resulting tuple sets. For example, the ?b variable is not used after the
LT predicate is applied, but it’s still grouped along with the other variables. In reality, JCascalog will drop any variables once they’re no longer needed so that they aren’t serialized or transferred over the network. This is the optimization mentioned in the previous chapter that can be applied to any pipe diagram.
Obtains the first element of the input tuple and treats the value as an integer
Obtains the value from the input tuple
Emits a new tuple with the incremented value
Figure 7.12 shows the result of applying this function to a set of tuples.
Recall from earlier that a function can act as a filter if it emits zero tuples for a given tuple. Here’s a function that attempts to parse an integer from a string, filtering out the tuple if the parsing fails:
public static class TryParseInteger extends CascalogFunction { public void operate(FlowProcess process, FunctionCall call) {
String s = call.getArguments().getString(0); try { int i = Integer.parseInt(s); call.getOutputCollector().add(new Tuple(i)); } catch(NumberFormatException e) {} } }
Figure 7.13 illustrates this function applied to a tuple set. You can observe that one tuple is filtered by the process.
Finally, if a function emits multiple output tuples, each output tuple is appended to its own copy of the input arguments. As an example, here’s the Split function from word count:
public static class Split extends CascalogFunction {
public void operate(FlowProcess process, FunctionCall call) { String sentence = call.getArguments().getString(0);
for(String word: sentence.split(" ")) {
call.getOutputCollector().add(new Tuple(word)); } } } ?a "a" "a" 1 4 ?b 1 "b" 5 ?c 2 2 ?a "a" "a" 1 4 ?b 1 "b" .predicate(new IncrementFunction(), "?b") .out("?c")
Figure 7.12 The IncrementFunction predicate applied to some sample tuples
Regards input value as a string Emits value as integer if parsing succeeds Emits nothing if parsing fails ?a "aaa" "3" 1 4 ?b 1 "2" 3 2 1 "2" ?c "3" ?b 4 ?a
.predicate(new TryParseInteger(), "?a") .out("?c")
Figure 7.13 The TryParseInteger function filters rows where ?a can’t be converted to an integer value.
For simplicity, splits into words using a single whitespace Emits each
word as a separate tuple
Figure 7.14 shows the result of applying this function to a set of sentences. You can see that each input sentence gets duplicated for each word it contains.
AGGREGATORS
The last class of customizable predicate operations is aggregators. As we mentioned earlier, there are three types of aggregators, each with different properties regarding composition and performance.
Perhaps rather obviously, the first type of aggregator is literally called an aggregator. An aggregator looks at one tuple at a time for each tuple in a group, adjusting some internal state for each observed tuple. The following is an implementation of sum as an aggregator:
public static class SumAggregator extends CascalogAggregator { public void start(FlowProcess process, AggregatorCall call) {
call.setContext(0); }
public void aggregate(FlowProcess process, AggregatorCall call) { int total = (Integer) call.getContext();
call.setContext(total + call.getArguments().getInteger(0)); }
public void complete(FlowProcess process, AggregatorCall call) { int total = (Integer) call.getContext();
call.getOutputCollector().add(new Tuple(total)); }
}
The next type of aggregator is called a buffer. A buffer receives an iterator to the entire set of tuples for a group. Here’s an implementation of sum as a buffer:
public static class SumBuffer extends CascalogBuffer {
public void operate(FlowProcess process, BufferCall call) { Iterator<TupleEntry> it = call.getArgumentsIterator(); int total = 0; while(it.hasNext()) { TupleEntry t = it.next(); total+=t.getInteger(0); } "data" ?s "the big dog"
"data" "data"
"dog" "the big dog"
?w "the" "big" "the big dog"
?s "the big dog"
.predicate(new Split(), "?s").out("?w")
Figure 7.14 The Split function can emit multiple tuples from a single input tuple.
Initializes the aggregator internal state
Called for each tuple; updates the internal state to store the running sum
Once all tuples are processed, emits a tuple with the final result
The tuple set is accessible via an iterator.
call.getOutputCollector().add(new Tuple(total)); }
}
Buffers are easier to write than aggregators because you only need to implement one method rather than three. But unlike buffers, aggregators can be chained in a query.
Chaining means you can compute multiple aggregations at the same time for the same group. Buffers can’t be used along with any other aggregator type, but aggregators can be used with other aggregators.
In the context of the MapReduce framework, both buffers and aggregators rely on reducers to perform the actual computation for these operators. This is illustrated in figure 7.15.
JCascalog packs together as many operations as possible into map and reduce tasks, but these aggregator operators are solely performed by reducers. This necessi- tates a network-intensive approach because all data for the computation must flow from the mappers to the reducers. Furthermore, if there were only a single group (such as if you were counting the number of tuples in a dataset), all the tuples would have to be sent to a single reducer for aggregation, defeating the purpose of using a parallel computation system.
Fortunately, the last type of aggregator operation can do aggregations more scal- ably and efficiently. These aggregators are analogous to combiner aggregators from pipe diagrams, though in JCascalog they’re called parallel aggregators. A parallel aggre- gator performs an aggregation incrementally by doing partial aggregations in the map tasks.
Figure 7.16 shows the division of labor for sum when implemented as a parallel aggregator. Not every aggregator can be implemented as a parallel aggregator, but when it’s possible, you can achieve huge performance gains by avoiding all that net- work I/O.
A single function iterates over all tuples and emits the output tuple.
Map input 4 3 Map output 4 3 9 Map input 1 1 9 Map output 1 1 9 1 4 1 3 Reduce input 18 Reduce output Netw ork
For buffers and aggregators, the reducers are responsible for the computation.
All data is thus transferred to the reducers, affecting performance.
b
c
To write your own parallel aggregator, you must implement two functions:
■ The init function maps the arguments from a single tuple to a partial aggrega- tion for that tuple.
■ The combine function specifies how to combine two partial aggregations into a single aggregation value.
The following code implements sum as a parallel aggregator:
public static class SumParallel implements ParallelAgg {
public void prepare(FlowProcess process, OperationCall call) {} public List<Object> init(List<Object> input) {
return input; }
public List<Object> combine(List<Object> input1, List<Object> input2) {
int val1 = (Integer) input1.get(0); int val2 = (Integer) input2.get(0);
return Arrays.asList((Object) (val1 + val2)); }
}
Parallel aggregators can be chained with other parallel aggregators or regular aggre- gators. But when chained with regular aggregators, parallel aggregators are unable to do partial aggregations in the map tasks and will act like regular aggregators.
You’ve now seen all the abstractions that comprise JCascalog subqueries: predi- cates, functions, filters, and aggregators. The power of these abstractions lies in how they promote reuse and composability. Let’s now take a look at the various composi- tion techniques possible with JCascalog.
Map input 4 3 Map output 7 9 Map input 1 1 Map output 11 7 11 Reduce input 18 Reduce output Netw ork
For parallel aggregators, maps perform intermediate work where possible.
Performance is improved because less data transfers across the network, and reducers are responsible for less work.
b
c
Figure 7.16 Execution of a sum parallel aggregator at the MapReduce level
For sum, the partial aggregation is just the value in the argument.
To combine two partial aggregations, simply sum the values.