
Time for action – using ChainMapper for field validation/analysis

In document Hadoop Beginner's Guide(2013) pdf (Page 129-133)

Let's use this principle and employ the ChainMapper class to help us provide some record validation within our job:

1. Create the following class as UFORecordValidationMapper.java:

import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;

// Passes through only records that split into exactly six tab-separated fields
public class UFORecordValidationMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, LongWritable, Text> {

    public void map(LongWritable key, Text value,
            OutputCollector<LongWritable, Text> output,
            Reporter reporter) throws IOException {
        String line = value.toString();
        // Emit the record unchanged only if it passes validation
        if (validate(line))
            output.collect(key, value);
    }

    private boolean validate(String str) {
        String[] parts = str.split("\t");
        if (parts.length != 6)
            return false;
        return true;
    }
}

2. Create the following as UFOLocation.java:

import java.io.IOException;
import java.util.regex.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.*;

public class UFOLocation {

    // Emits (two-letter state abbreviation, 1) for each location that ends in one
    public static class MapClass extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, LongWritable> {

        private final static LongWritable one = new LongWritable(1);
        // Two letters followed only by non-letters (e.g. ")" or ".") up to the end
        private static Pattern locationPattern = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$");

        public void map(LongWritable key, Text value,
                OutputCollector<Text, LongWritable> output,
                Reporter reporter) throws IOException {
            String line = value.toString();
            // Safe to index: the validation mapper upstream guarantees six fields
            String[] fields = line.split("\t");
            String location = fields[2].trim();

            if (location.length() >= 2) {
                Matcher matcher = locationPattern.matcher(location);
                if (matcher.find()) {
                    int start = matcher.start();
                    String state = location.substring(start, start + 2);
                    output.collect(new Text(state.toUpperCase()), one);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        JobConf conf = new JobConf(config, UFOLocation.class);
        conf.setJobName("UFOLocation");
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);

        // First link: validation mapper, <LongWritable, Text> -> <LongWritable, Text>
        JobConf mapconf1 = new JobConf(false);
        ChainMapper.addMapper(conf, UFORecordValidationMapper.class,
                LongWritable.class, Text.class, LongWritable.class, Text.class,
                true, mapconf1);

        // Second link: location mapper, <LongWritable, Text> -> <Text, LongWritable>
        JobConf mapconf2 = new JobConf(false);
        ChainMapper.addMapper(conf, MapClass.class,
                LongWritable.class, Text.class, Text.class, LongWritable.class,
                true, mapconf2);
        conf.setMapperClass(ChainMapper.class);

        conf.setCombinerClass(LongSumReducer.class);
        conf.setReducerClass(LongSumReducer.class);

        FileInputFormat.setInputPaths(conf, args[0]);
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

3. Compile both files:

$ javac UFORecordValidationMapper.java UFOLocation.java

4. Jar up the class files and submit the job to Hadoop:

$ jar cvf ufo.jar UFO*.class
$ hadoop jar ufo.jar UFOLocation ufo.tsv output

5. Copy the output file to the local filesystem and examine it:

$ hadoop fs -get output/part-00000 locations.txt
$ more locations.txt

What just happened?

There's quite a bit happening here, so let's look at it one piece at a time.

The first mapper is our simple validation mapper. The class implements the Mapper interface of the older org.apache.hadoop.mapred API, and its map method emits the input record unchanged only if a utility validation method accepts it. We split the check out into a separate method to highlight the functionality of the mapper, but it could easily have been written inside the map method itself. For simplicity, we keep to our previous validation strategy of counting fields and discarding lines that don't break into exactly six tab-delimited fields.
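To see the validation rule in isolation, we can lift it out of Hadoop entirely. The following sketch (the class RecordValidationSketch and its sample strings are hypothetical, made up for illustration) applies the same six-field test to plain strings. It also exposes a detail of String.split worth knowing: the one-argument form discards trailing empty strings, so a line with six fields whose last field is empty is counted as having five and is rejected. Passing a negative limit keeps the empty fields; whether such records ought to be accepted is a judgment call, not something the original job does.

```java
final class RecordValidationSketch {

    // Same rule as UFORecordValidationMapper: exactly six tab-separated fields.
    static boolean validate(String line) {
        return line.split("\t").length == 6;
    }

    // Variant (not in the original job): a negative limit makes split keep
    // trailing empty strings, so an empty last field still counts as a field.
    static boolean validateKeepingEmptyFields(String line) {
        return line.split("\t", -1).length == 6;
    }

    public static void main(String[] args) {
        String complete = "a\tb\tc\td\te\tf";   // six non-empty fields
        String emptyLast = "a\tb\tc\td\te\t";   // six fields, the last one empty
        String tooShort = "a\tb";               // two fields

        System.out.println(validate(complete));                    // true
        System.out.println(validate(emptyLast));                   // false
        System.out.println(validateKeepingEmptyFields(emptyLast)); // true
        System.out.println(validate(tooShort));                    // false
    }
}
```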

Note that the ChainMapper class has unfortunately been one of the last components to be migrated to the context object API; as of Hadoop 1.0, it can only be used with the older API. It remains a valid concept and a useful tool, but until Hadoop 2.0, where it will finally be migrated into the org.apache.hadoop.mapreduce.lib.chain package, its use requires the older approach.

The other file contains another mapper implementation and an updated driver in the main method. The mapper looks for a two-letter sequence at the end of the location field in a UFO sighting report. Manual examination of the data shows that most location fields are of the form city, state, where the standard two-character abbreviation is used for the state.

Some records, however, add trailing parentheses, periods, or other punctuation; the regular expression allows any non-letter characters after the abbreviation, so these still match. Other records are simply not in this format. For our purposes, we are happy to discard those and focus on the records that have the trailing two-character state abbreviation we are looking for. The map method extracts the abbreviation from the location field using another regular expression and emits its capitalized form along with a count of one. The driver for the job has the most changes, as the previous configuration involving a single map class is replaced with multiple calls on the ChainMapper class.
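The location extraction can likewise be exercised without a cluster. This sketch (StateExtractorSketch is a hypothetical name, and the sample locations are invented) reuses the mapper's regular expression and returns the abbreviation the mapper would emit, or null where it would emit nothing. Note the fourth sample: because the pattern only asks for two letters followed by non-letters at the end of the string, a spelled-out state name such as Oregon also matches and yields its last two letters. That is one plausible source of unexpected keys in the job's output.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class StateExtractorSketch {

    // Same pattern as UFOLocation.MapClass: two letters followed only by
    // non-letters up to the end of the string.
    private static final Pattern LOCATION_PATTERN = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$");

    // Returns the upper-cased two-letter suffix, or null when the mapper
    // would emit nothing for this location.
    static String extractState(String location) {
        String trimmed = location.trim();
        if (trimmed.length() < 2) {
            return null;
        }
        Matcher matcher = LOCATION_PATTERN.matcher(trimmed);
        if (!matcher.find()) {
            return null;
        }
        int start = matcher.start();
        return trimmed.substring(start, start + 2).toUpperCase();
    }

    public static void main(String[] args) {
        String[] samples = {
            "Iowa City, IA",     // IA
            "Milwaukee, WI)",    // WI: trailing ")" is allowed by [^a-zA-Z]*
            "Lake Tahoe, ca.",   // CA: upper-cased
            "Portland, Oregon",  // ON: last two letters of the full name
            "12345"              // null: no letters at all
        };
        for (String s : samples) {
            System.out.println(s + " -> " + extractState(s));
        }
    }
}
```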

The general model is to create a new configuration object for each mapper, then add the mapper to the ChainMapper class along with a specification of its input and output key/value types and a reference to the overall job configuration object.
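Conceptually, the chain runs all its mappers inside a single map task, handing each record emitted by one mapper straight to the next. The following in-memory model is only a sketch of that data flow, not how Hadoop implements it (ChainSimulationSketch and its methods are hypothetical, the input lines are invented, and List.of needs Java 9 or later). Each stage returns a list of zero or more outputs in place of an OutputCollector, and the final loop sums a count of one per key, which is what LongSumReducer does with the location mapper's LongWritable values.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ChainSimulationSketch {

    private static final Pattern LOCATION_PATTERN = Pattern.compile("[a-zA-Z]{2}[^a-zA-Z]*$");

    // Stage 1, like UFORecordValidationMapper: pass the line through or drop it.
    static List<String> validate(String line) {
        List<String> out = new ArrayList<>();
        if (line.split("\t").length == 6) {
            out.add(line);
        }
        return out;
    }

    // Stage 2, like UFOLocation.MapClass: emit the state key (the count of 1 is implicit).
    static List<String> locate(String line) {
        List<String> out = new ArrayList<>();
        String location = line.split("\t")[2].trim();
        if (location.length() >= 2) {
            Matcher m = LOCATION_PATTERN.matcher(location);
            if (m.find()) {
                out.add(location.substring(m.start(), m.start() + 2).toUpperCase());
            }
        }
        return out;
    }

    // Feeds every output of one stage into the next, then sums 1 per key
    // (the LongSumReducer step). TreeMap keeps keys sorted, as the shuffle would.
    static Map<String, Long> run(List<String> lines, List<Function<String, List<String>>> chain) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            List<String> records = List.of(line);
            for (Function<String, List<String>> stage : chain) {
                List<String> next = new ArrayList<>();
                for (String r : records) {
                    next.addAll(stage.apply(r));
                }
                records = next;
            }
            for (String key : records) {
                counts.merge(key, 1L, Long::sum);
            }
        }
        return counts;
    }

    // Validation first, location second: the same order as the driver's addMapper calls.
    static Map<String, Long> runUfoChain(List<String> lines) {
        List<Function<String, List<String>>> chain =
                List.of(ChainSimulationSketch::validate, ChainSimulationSketch::locate);
        return run(lines, chain);
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "a\tb\tIowa City, IA\td\te\tf",
                "a\tb\tDes Moines, IA.\td\te\tf",
                "a\tb\tSeattle, WA\td\te\tf",
                "too\tfew\tfields");              // dropped by the validation stage
        System.out.println(runUfoChain(lines));  // {IA=2, WA=1}
    }
}
```

Returning a list from each stage mirrors the fact that a mapper may call collect() zero, one, or many times for a single input record.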

Notice that the two mappers have different signatures. Both take as input a key of type LongWritable and a value of type Text, which are also the output types of UFORecordValidationMapper. The location mapper (UFOLocation.MapClass), however, outputs the reverse: a key of type Text and a value of type LongWritable.

The important thing here is to match the output of the final mapper in the chain (UFOLocation.MapClass) with the input expected by the reduce class (LongSumReducer).

When using the ChainMapper class, the mappers in the chain can have different input and output types as long as the following are true:

• For all but the final mapper, each map output matches the input of the subsequent mapper in the chain
• For the final mapper, its output matches the input of the reducer
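The two rules above can be stated as a small check. The sketch below (ChainTypeCheckSketch and Stage are hypothetical names) describes each stage only by the simple names of its key and value types and walks the chain, comparing each mapper's output with the next stage's input and treating the reducer as the stage after the last mapper. It is a model of the rules, not Hadoop's own validation code; real classes would be compared as Class objects rather than strings.

```java
import java.util.List;

final class ChainTypeCheckSketch {

    // One stage of the job, described only by the names of its key/value types.
    static final class Stage {
        final String name, inKey, inValue, outKey, outValue;

        Stage(String name, String inKey, String inValue, String outKey, String outValue) {
            this.name = name;
            this.inKey = inKey;
            this.inValue = inValue;
            this.outKey = outKey;
            this.outValue = outValue;
        }
    }

    // Each mapper's output must match the next mapper's input, and the last
    // mapper's output must match the reducer's input. Returns null when the
    // chain is consistent, otherwise a description of the first mismatch.
    static String check(List<Stage> mappers, Stage reducer) {
        for (int i = 0; i < mappers.size(); i++) {
            Stage current = mappers.get(i);
            Stage next = (i + 1 < mappers.size()) ? mappers.get(i + 1) : reducer;
            if (!current.outKey.equals(next.inKey) || !current.outValue.equals(next.inValue)) {
                return current.name + " emits <" + current.outKey + ", " + current.outValue
                        + "> but " + next.name + " expects <" + next.inKey + ", " + next.inValue + ">";
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Stage validation = new Stage("UFORecordValidationMapper",
                "LongWritable", "Text", "LongWritable", "Text");
        Stage location = new Stage("UFOLocation.MapClass",
                "LongWritable", "Text", "Text", "LongWritable");
        Stage reducer = new Stage("LongSumReducer",
                "Text", "LongWritable", "Text", "LongWritable");

        // Order used in the driver: consistent, prints null
        System.out.println(check(List.of(validation, location), reducer));
        // Swapped order: the location mapper's output no longer fits the next stage
        System.out.println(check(List.of(location, validation), reducer));
    }
}
```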

We compile these classes and put them in the same jar file. This is the first time we have bundled the output from more than one Java source file together. As may be expected, there is no magic here; the usual rules on jar files, path, and class names apply. Because in this case we have both our classes in the same package, we don't have to worry about an additional import in the driver class file.

We then run the MapReduce job and examine the output, which is not quite as expected.
