Time for action – reduce-side join using MultipleInputs
7. Examine the result file.
$ hadoop fs -cat /user/garry/outputs/part-r-00000
John Allen      3   124.929998
Abigail Smith   3   127.929996
April Stevens   1   499.989990
Nasser Hafez    1   13.420000
What just happened?
Firstly, we created the data files to be used in this example. We created two small data sets, as this makes it easier to track the result output. The first data set we defined was the account details, with four columns, as follows:
The account ID
The client name
The type of account
The date the account was opened
We then created a sales data set with three columns:
The account ID of the purchaser
The value of the sale
The date of the sale
Naturally, real account and sales records would have many more fields than the ones mentioned here. After creating the files, we placed them onto HDFS.
We then created the ReduceJoin.java file, which looks very much like the previous MapReduce jobs we have used. However, a few aspects of this job make it special and allow us to implement a join.
Firstly, the class defines two mappers. As we have seen before, jobs can have multiple mappers executed in a chain, but in this case we wish to apply a different mapper to each of the input locations. Accordingly, we handle the sales and account data in the SalesRecordMapper and AccountRecordMapper classes, respectively. We used the MultipleInputs class from the org.apache.hadoop.mapreduce.lib.input package as follows:
MultipleInputs.addInputPath(job, new Path(args[0]),
    TextInputFormat.class, SalesRecordMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]),
    TextInputFormat.class, AccountRecordMapper.class);
As you can see, unlike previous examples where we added a single input location, the MultipleInputs class allows us to add multiple sources and associate each with a distinct input format and mapper.
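To make the routing concrete, here is a plain-Java analogy of what the two addInputPath calls set up. It is a sketch, not the Hadoop API: the class MultipleInputsSketch, the helper taggingMapper, the input names sales and accounts, the sample records, and the tab-separated layout with the account ID in the first field are all hypothetical. Each input location is bound to its own mapper, and every mapper's output is then grouped by key, much as the shuffle does before the reduce phase.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Plain-Java analogy of MultipleInputs (NOT the Hadoop API): each input
// location is bound to its own mapper, and all mapper outputs are then
// grouped by key, as the shuffle does before the reducers run.
final class MultipleInputsSketch {

    // Hypothetical mapper factory: splits a tab-separated line and emits
    // {account ID, "<tag>\t<second field>"}.
    static Function<String, String[]> taggingMapper(String tag) {
        return line -> {
            String[] fields = line.split("\t");
            return new String[] {fields[0], tag + "\t" + fields[1]};
        };
    }

    // Runs every line of each input through the mapper bound to that input,
    // then groups the emitted values by key (keys sorted, like the shuffle).
    static Map<String, List<String>> run(Map<String, List<String>> inputs,
                                         Map<String, Function<String, String[]>> mappers) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (Map.Entry<String, List<String>> input : inputs.entrySet()) {
            Function<String, String[]> mapper = mappers.get(input.getKey());
            for (String line : input.getValue()) {
                String[] kv = mapper.apply(line);
                grouped.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
            }
        }
        return grouped;
    }

    // Hypothetical sample inputs, bound positionally like args[0] and args[1].
    static Map<String, List<String>> demo() {
        Map<String, List<String>> inputs = new LinkedHashMap<>();
        inputs.put("sales", List.of("001\t35.99\t2012-03-15", "001\t12.49\t2012-03-20"));
        inputs.put("accounts", List.of("001\tJohn Allen\tStandard\t2012-03-15"));

        Map<String, Function<String, String[]>> mappers = new LinkedHashMap<>();
        mappers.put("sales", taggingMapper("sales"));
        mappers.put("accounts", taggingMapper("account"));
        return run(inputs, mappers);
    }

    public static void main(String[] args) {
        // Key 001 ends up with two tagged sales values and one tagged account value.
        System.out.println(demo());
    }
}
```

Binding the inputs the other way round would tag sales lines as account records and vice versa, so in this analogy, as in the real job, the order of the paths matters.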
The mappers are pretty straightforward: the SalesRecordMapper class emits output of the form <account number>, <sales value>, while the AccountRecordMapper class emits output of the form <account number>, <client name>. The reducer, where the actual join happens, therefore receives both the sales values and the client name for each account number.
Notice that both mappers actually emit more than just the required values. The SalesRecordMapper class prefixes its value output with the tag sales, while the AccountRecordMapper class uses the tag account.
If we look at the reducer, we can see why this is so. The reducer receives every value for a given key, but without these explicit tags it would not know whether a given value came from the sales or the account mapper, and hence would not know how to treat it.
The ReduceJoinReducer class therefore treats the values in the Iterable object differently, depending on which mapper they came from. The value from the AccountRecordMapper class (there should be only one) is used to populate the client name in the final output. For the sales records, of which there are likely to be several since most clients buy more than a single item, the reducer counts the number of orders and sums their combined value. The output from the reducer is therefore a key of the account holder name and a value string containing the number of orders and the total order value.
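The reducer's join logic can be sketched in plain Java, without Hadoop, to show how the tags drive the processing. The sketch assumes each value has the form <tag>, a tab, then the payload, using the tags sales and account described above; the class name JoinReducerSketch and the sample values are hypothetical, and sale values are summed as double.

```java
import java.util.List;
import java.util.Locale;

// Plain-Java sketch of the reduce-side join logic (no Hadoop dependency).
// Assumes each value is "<tag>\t<payload>" with tag "sales" or "account".
final class JoinReducerSketch {

    // Joins all tagged values for ONE account key and returns the output line
    // "<client name>\t<order count>\t<total value>".
    static String reduce(Iterable<String> taggedValues) {
        String name = "";
        int count = 0;
        double total = 0.0;
        for (String value : taggedValues) {
            String[] parts = value.split("\t", 2); // [tag, payload]
            if (parts[0].equals("sales")) {
                // Each sales record adds one order and its value.
                count++;
                total += Double.parseDouble(parts[1]);
            } else if (parts[0].equals("account")) {
                // The single account record supplies the client name.
                name = parts[1];
            }
        }
        // Locale.ROOT keeps '.' as the decimal separator on every platform.
        return name + "\t" + String.format(Locale.ROOT, "%d\t%f", count, total);
    }

    public static void main(String[] args) {
        // Hypothetical values for one key, in arbitrary shuffle order.
        List<String> values = List.of("sales\t10.0", "account\tJohn Allen", "sales\t2.5");
        // Prints John Allen, 2 and 12.500000, separated by tabs.
        System.out.println(reduce(values));
    }
}
```

Because each value carries its own tag, the order in which values arrive does not matter; Hadoop does not guarantee the order of values within a key unless a secondary sort is configured.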
We compile and execute the class; notice how we provide three arguments representing the two input directories as well as the single output directory. Because the job binds the first argument to SalesRecordMapper and the second to AccountRecordMapper, we must also ensure we specify the directories in the right order; there is no dynamic mechanism to determine which type of file is in which location.
After execution, we examine the output file and confirm that it does indeed contain the overall totals for named clients as expected.
DataJoinMapperBase and TaggedMapOutput
There is a way of implementing a reduce-side join in a more sophisticated and object-oriented fashion. Within the org.apache.hadoop.contrib.utils.join package are classes such as DataJoinMapperBase and TaggedMapOutput that provide an encapsulated means of deriving the tags for map output and having them processed at the reducer. This mechanism means you don't have to define explicit tag strings as we did previously and then carefully parse out the data received at the reducer to determine from which mapper the data came; there are methods in the provided classes that encapsulate this functionality. This capability is particularly valuable with numeric or other non-textual data. To create our own explicit tags as in the previous example, we would have to convert types such as integers into strings so that we could add the prefix tag, which is less efficient than keeping the numeric types in their native form and relying on the additional classes to carry the tag.
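The benefit of keeping the tag separate from the data can be illustrated with a plain-Java analogy. This is not the contrib API: Tagged, Source and TypedTagSketch are hypothetical names, chosen only to show a tag travelling next to a typed value, so that the reducer can sum doubles directly instead of splitting and parsing prefixed strings.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogy (NOT the org.apache.hadoop.contrib API): the tag travels
// next to a typed value, so numeric data never has to become a prefixed string.
final class TypedTagSketch {

    // Hypothetical tag type identifying which input a value came from.
    enum Source { SALES, ACCOUNT }

    // Hypothetical wrapper pairing a tag with its data in native form.
    static final class Tagged<T> {
        final Source tag;
        final T data;

        Tagged(Source tag, T data) {
            this.tag = tag;
            this.data = data;
        }
    }

    // Sums the sales values as doubles; no string splitting or parsing needed.
    static double totalSales(List<Tagged<?>> values) {
        double total = 0.0;
        for (Tagged<?> value : values) {
            if (value.tag == Source.SALES) {
                total += (Double) value.data;
            }
        }
        return total;
    }

    // Hypothetical values for one account key, in arbitrary order.
    static List<Tagged<?>> demo() {
        List<Tagged<?>> values = new ArrayList<>();
        values.add(new Tagged<>(Source.SALES, 10.0));
        values.add(new Tagged<>(Source.ACCOUNT, "John Allen"));
        values.add(new Tagged<>(Source.SALES, 2.5));
        return values;
    }

    public static void main(String[] args) {
        System.out.println(totalSales(demo())); // prints 12.5
    }
}
```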
The framework allows for quite sophisticated tag generation, as well as concepts such as tag grouping that we didn't implement previously. Using this mechanism requires additional work, including overriding extra methods and using a different map base class. For straightforward joins such as the previous example, this framework may be overkill, but if you find yourself implementing very complex tagging logic, it may be worth a look.