By default, all the input files provided to a MapReduce job are interpreted and parsed by a single InputFormat class. If your input files have different structures and need to be parsed by different InputFormat classes, you can use the MultipleInputs class, which lets you specify the InputFormat and Mapper on a per-path basis.
//Add path1, read with TextInputFormat, to the list of inputs for Mapper1
MultipleInputs.addInputPath(conf, path1, TextInputFormat.class, Mapper1.class);
//Add path2, read with SequenceFileInputFormat, to the list of inputs for Mapper2
MultipleInputs.addInputPath(conf, path2, SequenceFileInputFormat.class, Mapper2.class);
Likewise, you can use the MultipleOutputs class to write additional outputs of types other than the job's default output. This requires you to pass the OutputCollector to the map() and reduce() methods of the Mapper and Reducer implementations. Each additional output can be configured with its own OutputFormat, its own key class, and its own value class. Output filenames can be derived from the output keys and values.
//Defines an additional single text-based output "text" for the job
MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, LongWritable.class, Text.class);
//Defines an additional multi SequenceFile-based output "seq" for the job
MultipleOutputs.addMultiNamedOutput(conf, "seq", SequenceFileOutputFormat.class,
    LongWritable.class, Text.class);
Mapper
The user-defined Mapper class extends the Mapper<KeyIn, ValueIn, KeyOut, ValueOut> base class from the org.apache.hadoop.mapreduce package and performs user-defined work during the first phase of the MapReduce job execution. From the implementation point of view, a Mapper implementation takes input data in the form of a series of input key-value pairs (k1, v1) and produces a set of intermediate output key-value pairs (k2, v2) that eventually are used as inputs for the shuffle and sort operations. For each InputSplit generated by the InputFormat for the MapReduce job, a separate instance of Mapper is instantiated in a separate instance of the Java Virtual Machine (JVM). These Mapper instances run in parallel and cannot communicate with one another, so each instance is governed by the machine on which it runs.
In our word count MapReduce job example, we use the TextInputFormat class (which is also the default InputFormat). TextInputFormat treats each input line of an InputSplit as a record, with the line's byte offset within the file as the key and the content of the line as the value (see Figure 4.6).
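As a rough illustration of the record layout described above, the following plain-Java sketch turns a small text into (byte offset, line) pairs. It does not use Hadoop: the class name LineRecordSketch and the method toRecords are hypothetical, and the sketch assumes UTF-8 text with \n line endings.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Plain-Java model of line records; this is not Hadoop's TextInputFormat. */
final class LineRecordSketch {

    /** Returns one (byte offset, line content) pair per input line. */
    static List<Map.Entry<Long, String>> toRecords(String content) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        long offset = 0;   // byte offset of the current line (the key)
        int start = 0;     // character index where the current line begins
        while (start < content.length()) {
            int end = content.indexOf('\n', start);
            boolean hasNewline = end >= 0;
            if (!hasNewline) {
                end = content.length();
            }
            String line = content.substring(start, end);
            records.add(new SimpleImmutableEntry<>(offset, line));
            // Advance by the encoded length of the line plus its newline byte.
            offset += line.getBytes(StandardCharsets.UTF_8).length + (hasNewline ? 1 : 0);
            start = end + 1;
        }
        return records;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, String> r : toRecords("Hello World\nBye World\n")) {
            System.out.println(r.getKey() + " -> " + r.getValue());
        }
        // Prints:
        // 0 -> Hello World
        // 12 -> Bye World
    }
}
```

The second key is 12 because the first line occupies 11 bytes plus one newline byte.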
FIGURE 4.6 InputSplit to Map task.
The Mapper base class provides four methods that you can override:
setup(Context): Called once at the beginning of the task, before any record is processed, to initialize the Mapper. For example, you can use this method to read from shared resources. The default implementation is a no-op.
map(KeyIn, ValueIn, Context): Executed for each key-value pair of the given InputSplit. In this method, you write the business or application-specific logic of the Mapper to process each record.
cleanup(Context): Called once at the end of the task (after all the input records are processed) to clean up the Mapper's resources. The default implementation is a no-op.
run(Context): Can be overridden for more complete control over the execution of the Mapper. By default, the run() method simply takes each key-value pair supplied by the context and calls the map() method, as discussed earlier. If you want to change this default behavior (such as with multithreaded Mappers), you can override this method.
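The call order of these four methods can be sketched in plain Java as follows. This is a simplified model and not Hadoop's Mapper class: TracingMapper is a hypothetical stand-in, the record index stands in for the key, and the try/finally is this sketch's choice so that cleanup always runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class MapperLifecycleSketch {

    /** Hypothetical stand-in for a Mapper; it only records the call order. */
    static class TracingMapper {
        final List<String> calls = new ArrayList<>();

        void setup() {
            calls.add("setup");
        }

        void map(long key, String value) {
            calls.add("map(" + key + ", " + value + ")");
        }

        void cleanup() {
            calls.add("cleanup");
        }

        /** setup once, map once per record, cleanup once. */
        void run(List<String> records) {
            setup();
            try {
                long key = 0;
                for (String record : records) {
                    map(key++, record);
                }
            } finally {
                cleanup();
            }
        }
    }

    public static void main(String[] args) {
        TracingMapper mapper = new TracingMapper();
        mapper.run(Arrays.asList("first line", "second line"));
        System.out.println(mapper.calls);
        // Prints: [setup, map(0, first line), map(1, second line), cleanup]
    }
}
```

Overriding run() in a real Mapper amounts to replacing this loop, for example to hand records to a pool of worker threads.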
For optimum performance, the map function does not write its output straight to disk as it is produced. Instead, it writes to the in-memory circular buffer assigned to the map task, where the output is presorted for efficiency. By default, each map task has a 100MB circular memory buffer; you can change this size with the io.sort.mb property. When the buffer contents reach a certain threshold, controlled by the io.sort.spill.percent property (80 percent by default), a background thread starts spilling them to disk. The map function can keep writing to the buffer while the spill is in progress. If the MapReduce job has zero Reducer instances, the output of the Mapper is written directly to the OutputFormat (FileSystem) without sorting by keys.
The MapReduce framework also lets you specify, via the job configuration, whether the intermediate outputs are compressed (by default, they are not) and which CompressionCodec is used. Compression speeds up writes to disk, saves disk space, and reduces the load on the network while data is transferred to the Reducer.
Mapper Example
For our MapReduce job example, you can see in Figure 4.7 that the text is stored in different blocks on the data node. TextInputFormat reads these blocks and splits the data into multiple InputSplits; this example has two InputSplits, so two instances of Mapper are instantiated. Each Mapper takes each line from its InputSplit as input and breaks it into words. For each occurrence of a word, it emits a key-value pair consisting of the word and the value 1.
FIGURE 4.7 Map function flow for the word count example.
Listing 4.1 contains the implementation of Mapper for the word count example, which processes one line at a time, as provided by the TextInputFormat. It splits the line into tokens (words) separated by whitespace, via the StringTokenizer class, and emits a key-value pair of <word, 1> for each token, as you saw in Figure 4.7.
LISTING 4.1 Mapper Implementation for the Word Count Example
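// Imports required at the top of the enclosing source file
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public static class WordCountMapper
    extends Mapper<Object, Text, Text, IntWritable> {

  // Reused for every record to avoid allocating new objects per token
  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  @Override
  public void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    // Split the line on whitespace and emit <word, 1> for each token
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, one);
    }
  }
}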
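To try the tokenizing logic of Listing 4.1 without a Hadoop cluster, the following sketch applies the same StringTokenizer loop to a plain String and collects the emitted pairs in a list. The class WordCountMapSketch and its map method are hypothetical names used only for illustration. Plain String and Integer values stand in for Text and IntWritable.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

/** Hadoop-free model of the map step in the word count example. */
final class WordCountMapSketch {

    /** Emits one (word, 1) pair per whitespace-separated token of the line. */
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            emitted.add(new SimpleImmutableEntry<>(itr.nextToken(), 1));
        }
        return emitted;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> pair : map("Hello World Bye World")) {
            System.out.println("<" + pair.getKey() + ", " + pair.getValue() + ">");
        }
        // Prints:
        // <Hello, 1>
        // <World, 1>
        // <Bye, 1>
        // <World, 1>
    }
}
```

Note that the map step emits a separate pair for each occurrence; "World" appears twice here because summing the counts is left to a later phase.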
The MapReduce job is executed across many nodes in the Hadoop cluster. To pass data from one node to another, the data must be serializable and deserializable. This is where the Writable data types used in the listing, such as Text and IntWritable, come in. These classes implement the Writable interface, which provides a simple, efficient serialization protocol based on DataInput and DataOutput.
Note
Serialization is the process of converting structured objects into a byte stream so that they can be persisted to disk or transmitted over a network. Deserialization is the reverse process: it converts the byte stream back into a series of structured objects.
Typically, data type classes that implement the Writable interface can be passed as values. To be passed as keys, your data type classes must implement the WritableComparable<T> interface: keys are sorted before the reduce function is called, so they must be comparable to one another. The WritableComparable<T> interface combines the Writable and java.lang.Comparable<T> interfaces, so classes that implement it can be used not only as keys but also as values.
The MapReduce framework comes with basic data type wrapper classes that already implement the WritableComparable interface (for example, BooleanWritable, ByteWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, Text, VLongWritable, and NullWritable, a placeholder for when the key or value is not needed). If needed, you can create your own custom data type classes by implementing either the Writable or the WritableComparable<T> interface.
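A custom key type might look like the following sketch. To stay compilable without Hadoop on the classpath, the hypothetical class YearMonthKey implements only java.lang.Comparable; in a real job it would declare WritableComparable<YearMonthKey> instead, and its write() and readFields() methods have the same shape as the ones the Writable interface requires. The roundTrip helper is likewise an assumption added for demonstration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical composite key; a real job would implement WritableComparable. */
final class YearMonthKey implements Comparable<YearMonthKey> {
    private int year;
    private int month;

    YearMonthKey() { }  // no-argument constructor, needed before readFields()

    YearMonthKey(int year, int month) {
        this.year = year;
        this.month = month;
    }

    /** Serializes the fields; same shape as Writable.write(DataOutput). */
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
    }

    /** Reads the fields in the order they were written. */
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        month = in.readInt();
    }

    /** Orders keys by year, then by month, so they can be sorted. */
    @Override
    public int compareTo(YearMonthKey other) {
        int byYear = Integer.compare(year, other.year);
        return byYear != 0 ? byYear : Integer.compare(month, other.month);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof YearMonthKey && compareTo((YearMonthKey) o) == 0;
    }

    @Override
    public int hashCode() {
        return 31 * year + month;
    }

    @Override
    public String toString() {
        return year + "-" + month;
    }

    /** Demo helper: serializes a key to bytes and reads it back into a new object. */
    static YearMonthKey roundTrip(YearMonthKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            YearMonthKey copy = new YearMonthKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(new YearMonthKey(2015, 7)));  // prints 2015-7
    }
}
```

Keys should also override equals() and hashCode() consistently with compareTo(), as this sketch does, because equal keys need to be grouped together.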
Note: MapReduce API Versions
Hadoop provides two versions of the MapReduce API. The newer one is in the org.apache.hadoop.mapreduce package; the older one is in the org.apache.hadoop.mapred package. We recommend using the newer API from the org.apache.hadoop.mapreduce package and thus refer to it in our examples.
Number of Mappers
The number of Mappers initiated for a MapReduce job depends on the number of InputSplits generated by FileInputFormat. The number of InputSplits, in turn, depends on a couple of other factors. The first is the isSplitable() method, which determines whether a file is splittable. If this method returns false, the file is not split and is instead processed as a whole by a single Mapper.
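The relationship between file size, split size, and the number of Mappers can be approximated with a short calculation. The sketch below is an estimate under stated assumptions, not FileInputFormat's actual logic: estimateSplits is a hypothetical helper, it handles a single non-empty file, and it ignores details such as how Hadoop treats a small trailing remainder.

```java
/** Back-of-the-envelope estimate of how many splits (and Mappers) one file yields. */
final class MapTaskEstimateSketch {

    /**
     * @param fileLength file size in bytes (must be positive)
     * @param splitSize  split size in bytes (must be positive)
     * @param splittable what isSplitable() would return for this file
     */
    static long estimateSplits(long fileLength, long splitSize, boolean splittable) {
        if (fileLength <= 0 || splitSize <= 0) {
            throw new IllegalArgumentException("fileLength and splitSize must be positive");
        }
        if (!splittable) {
            return 1;  // the whole file goes to a single Mapper
        }
        // Ceiling division, written to avoid overflow for very large split sizes.
        return fileLength / splitSize + (fileLength % splitSize == 0 ? 0 : 1);
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        System.out.println(estimateSplits(300 * mb, 128 * mb, true));   // prints 3
        System.out.println(estimateSplits(300 * mb, 128 * mb, false));  // prints 1
    }
}
```

A job with several input files would sum this estimate over the files, because a split never spans more than one file in this model.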
Ideally, the split size is the same as the block size, which is controlled by the dfs.blocksize property (named dfs.block.size in older releases; the default value is 64MB or 128MB, depending on the Hadoop version). This is appropriate for most situations. However, if you want to change the split size, the following three configuration properties together determine the actual split size:
mapred.min.split.size: The default value is 1. This property takes effect only when you set it to a value larger than dfs.blocksize, which raises the minimum split size above the block size. Changing this property is not recommended in most cases, because a split that spans several blocks is likely to include blocks that are not local to the map task, which works against the data locality policy. For example, if you set its value to 128MB on a cluster where the block size is 64MB, each input split is at least 128MB and contains two 64MB blocks. These two blocks, although part of a single input split, might reside on two different data nodes.
mapred.max.split.size: The default value is the maximum value for the Long data type (Long.MAX_VALUE), or 9223372036854775807 bytes. This property takes effect only when you set it to a value smaller than dfs.block.size, which forces the split to be smaller than the block size. Setting this value too low is also not recommended, because it causes too many Mappers to be created, which increases the overhead of maintaining and monitoring job execution.
dfs.block.size: The default value is either 67108864 (64MB) or 134217728 (128MB).
For the actual split size, mapred.min.split.size sets the lower bound and mapred.max.split.size sets the upper bound on the value taken from dfs.block.size. The actual split size is calculated in the computeSplitSize() method of FileInputFormat:
max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))
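The formula can be checked with a few concrete values. The following sketch restates it as a standalone static method; the class name SplitSizeSketch is hypothetical, and the three scenarios in main are illustrative settings rather than recommendations.

```java
/** Standalone restatement of the split-size formula for experimentation. */
final class SplitSizeSketch {

    /** Returns max(minSize, min(maxSize, blockSize)), all in bytes. */
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long block = 64 * mb;

        // Defaults: min = 1, max = Long.MAX_VALUE, so the block size wins.
        System.out.println(computeSplitSize(block, 1L, Long.MAX_VALUE));        // prints 67108864

        // Minimum raised above the block size: each split spans two blocks.
        System.out.println(computeSplitSize(block, 128 * mb, Long.MAX_VALUE));  // prints 134217728

        // Maximum lowered below the block size: each block yields two splits.
        System.out.println(computeSplitSize(block, 1L, 32 * mb));               // prints 33554432
    }
}
```

With the default settings neither bound is active, which is why the split size normally equals the block size.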
By default, compressed input files in formats such as gzip are not splittable, although some patches are available for this, or you can write custom code to make them splittable.
Note: Recommended Size for InputSplit
Keep each split small enough to allow parallelization but large enough to avoid the overhead of starting and managing too many map tasks. In most cases, using the same size for the input split and the block works efficiently.