
A MapReduce job consists of several components, and when you execute it, the job passes through several distinct phases. Figure 4.5 shows these components and illustrates the high-level execution flow of a MapReduce job.

FIGURE 4.5 MapReduce job—high-level execution flow.

In this section, we discuss the following components (classes) and identify the role each plays in the MapReduce job execution flow:

• InputFormat
• OutputFormat
• Multiple Input and Output Format
• Mapper
• Partitioner
• Combiner
• Driver
• Context Object

We also use the following two examples to explain the execution pipeline of MapReduce jobs:

• MapReduce job for counting the words in an input file: This job, which comes as a sample with Hadoop, counts the number of times each word appears in the input file. The output is a text file in which each line contains a word and the total number of times that word occurred in the input file (a key-value pair).

• MapReduce job for getting the total sales amount for each city: The input file for this job contains daily sales amounts for each city. The MapReduce program groups these records by city and computes the total sales for each city. The output is a text file in which each line contains a city and its total sales amount as a tab-separated key-value pair.
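Both example jobs share the same shape: a map step that emits key-value pairs, a shuffle that groups values by key in sorted key order, and a reduce step that sums each group. The plain-Java simulation below runs both jobs. It is not Hadoop code, and the class and method names are hypothetical. The sales input layout is also an assumption, because the text does not specify it: each line is taken to hold a city and an amount separated by a tab. BigDecimal keeps the currency sums exact.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Plain-Java simulation of the two example jobs (not the Hadoop API).
class ExampleJobsSketch {

    // Runs map -> shuffle/sort -> reduce and returns "key<TAB>total" output lines.
    static List<String> run(List<String> input,
                            Function<String, List<Map.Entry<String, BigDecimal>>> mapper) {
        // Map phase: each input line becomes zero or more (key, value) pairs.
        // Shuffle and sort: group values by key; TreeMap keeps the keys sorted.
        TreeMap<String, List<BigDecimal>> grouped = new TreeMap<>();
        for (String line : input) {
            for (Map.Entry<String, BigDecimal> pair : mapper.apply(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
            }
        }
        // Reduce phase: sum the values collected for each key.
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, List<BigDecimal>> group : grouped.entrySet()) {
            BigDecimal total = BigDecimal.ZERO;
            for (BigDecimal v : group.getValue()) total = total.add(v);
            output.add(group.getKey() + "\t" + total.toPlainString());
        }
        return output;
    }

    // Word count mapper: emit (word, 1) for each whitespace-separated token.
    static List<Map.Entry<String, BigDecimal>> wordCountMap(String line) {
        List<Map.Entry<String, BigDecimal>> pairs = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) pairs.add(Map.entry(word, BigDecimal.ONE));
        }
        return pairs;
    }

    // Sales mapper, assuming a hypothetical "city<TAB>amount" line layout.
    static List<Map.Entry<String, BigDecimal>> salesMap(String line) {
        String[] fields = line.split("\t");
        if (fields.length != 2) return List.of(); // skip malformed records
        return List.of(Map.entry(fields[0].trim(), new BigDecimal(fields[1].trim())));
    }

    public static void main(String[] args) {
        run(List.of("the cat sat", "the hat"), ExampleJobsSketch::wordCountMap)
            .forEach(System.out::println);
        run(List.of("Chicago\t100.50", "Boston\t50.00", "Chicago\t200.25"), ExampleJobsSketch::salesMap)
            .forEach(System.out::println);
    }
}
```

Only the mapper differs between the two jobs; the grouping and summing logic is shared, which mirrors how a single reducer class can serve both jobs in Hadoop.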

Note

The MapReduce framework is suitable for processing not only structured data but also unstructured data. You can use the Mapper part of the framework to bring structure to unstructured data for further processing.
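As a small illustration of this idea, the following plain-Java map function turns free-form log text into structured (level, 1) pairs that a reducer could then count. The log layout and the level names are assumptions chosen for the example, and the class name is hypothetical; lines without a recognizable level are simply dropped.

```java
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical map function that brings structure to unstructured log text.
class LogLevelMapperSketch {
    // Assumption: a log level appears somewhere in the line as a whole word.
    private static final Pattern LEVEL = Pattern.compile("\\b(ERROR|WARN|INFO)\\b");

    // Emits (level, 1) for a line that mentions a level; emits nothing otherwise.
    static Optional<Map.Entry<String, Integer>> map(String line) {
        Matcher m = LEVEL.matcher(line);
        return m.find() ? Optional.of(Map.entry(m.group(1), 1)) : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(map("2014-03-01 12:00:01 ERROR disk full on node7"));
        System.out.println(map("free-form text with no level"));
    }
}
```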

Note that Figure 4.5 shows partitioning and shuffling separately for clarity, although they actually happen together: map outputs are shuffled to the appropriate Reducers according to the partitioning scheme in use.
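The routing of map outputs to Reducers can be sketched with the formula used by Hadoop's default HashPartitioner: (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. The plain-Java sketch below applies it to ordinary Java strings. Because Hadoop hashes its own key types (such as Text), the partition numbers here will not match a real job, but the property that matters is the same: every occurrence of a key lands on the same Reducer. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of hash partitioning (the formula mirrors Hadoop's default HashPartitioner).
class PartitionSketch {

    // Masking with Integer.MAX_VALUE clears the sign bit, so negative hash codes
    // still yield a partition number in [0, numReduceTasks).
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Routes each map-output key to the list for its reducer.
    static Map<Integer, List<String>> route(List<String> mapOutputKeys, int numReduceTasks) {
        Map<Integer, List<String>> byReducer = new TreeMap<>();
        for (String key : mapOutputKeys) {
            byReducer.computeIfAbsent(partitionFor(key, numReduceTasks), p -> new ArrayList<>()).add(key);
        }
        return byReducer;
    }

    public static void main(String[] args) {
        System.out.println(route(List.of("Boston", "Chicago", "Boston", "Denver"), 2));
    }
}
```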

In addition, in Figure 4.5, the InputFormat and OutputFormat are defined by the Driver component of the MapReduce job. You can think of the Driver component as a main program that initializes a MapReduce job and defines the job-specific configuration.

GO TO You can find more about the Driver component in the “Driver” section of this hour.

InputFormat

The MapReduce framework uses the InputFormat class to define how to read data from a file and make it available to the map functions. The InputFormat splits the input, and the framework spawns one map task for each InputSplit it generates. The InputFormat also validates the input specification of the job, and it relies on two abstract classes from the org.apache.hadoop.mapreduce package to perform its two main operations:

• InputSplit: Represents a logical fragment of the input data of the files. The InputFormat divides the input into these fragments, called InputSplits, and each InputSplit makes up the input to an individual Mapper. An InputSplit doesn’t copy the data; it just contains a reference to the actual data.

• RecordReader: Reads records from an InputSplit and submits them (as key-value pairs) to the map function. It converts the byte-oriented view of the input (provided by the InputSplit) to a record-oriented view, to make it available to the Mapper for processing. It is responsible for handling record boundaries and presenting the tasks with keys and values, and it ensures that records that span InputSplit boundaries are not missed.
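The following plain-Java sketch shows what "a reference to the actual data" means: each split is only a (path, start, length) triple computed from the file size, and no bytes are read or copied. The class and method names are hypothetical, and the fixed-size policy is a simplification; Hadoop's FileInputFormat also takes the block size and the configured minimum and maximum split sizes into account.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch: an InputSplit is only a reference to a byte range of a file.
class SplitSketch {

    // Hypothetical stand-in for a file split: no data, just where the data lives.
    static final class SplitRef {
        final String path;
        final long start;
        final long length;

        SplitRef(String path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return path + ":" + start + "+" + length;
        }
    }

    // Cuts a file of fileLength bytes into consecutive splits of at most splitSize bytes.
    static List<SplitRef> computeSplits(String path, long fileLength, long splitSize) {
        if (splitSize <= 0) throw new IllegalArgumentException("splitSize must be positive");
        List<SplitRef> splits = new ArrayList<>();
        for (long start = 0; start < fileLength; start += splitSize) {
            splits.add(new SplitRef(path, start, Math.min(splitSize, fileLength - start)));
        }
        return splits;
    }

    public static void main(String[] args) {
        // One map task would be spawned per split printed here.
        computeSplits("/data/sales.txt", 300, 128).forEach(System.out::println);
    }
}
```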

Note: InputSplits Versus Data Blocks

InputSplits are a logical division of records, whereas HDFS data blocks are a physical division of the input data of the files. In practice, the two might not be perfectly aligned, even when both are the same size: records can cross data block boundaries. The MapReduce framework still ensures that every record is processed. When a record is stored across two data blocks, the Mapper that processes the record from the main block fetches the remaining part of the record from the other data block.
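One way to guarantee that every record is processed exactly once, even when records straddle a boundary, is sketched below. It follows a convention similar to the one used by Hadoop's line-oriented record reader: a split skips the partial line at its start (the previous split owns it) and finishes the last line it starts, even if that means reading past its own end. This is plain Java over an in-memory ASCII string, with the split offsets standing in for block or split boundaries; the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of boundary-safe line reading for one split of an in-memory ASCII file.
class BoundaryReaderSketch {

    // Returns the lines owned by the split [start, end): every line whose first character
    // lies in that range, read to completion even if it extends past 'end'.
    static List<String> readSplit(String data, int start, int end) {
        List<String> lines = new ArrayList<>();
        int pos = start;
        // A split that does not begin at a line start skips the partial line;
        // the previous split reads that line to completion.
        while (pos > 0 && pos < data.length() && data.charAt(pos - 1) != '\n') {
            pos++;
        }
        while (pos < end && pos < data.length()) {
            int newline = data.indexOf('\n', pos);
            int lineEnd = (newline == -1) ? data.length() : newline;
            lines.add(data.substring(pos, lineEnd)); // may extend past 'end'
            pos = lineEnd + 1;
        }
        return lines;
    }

    public static void main(String[] args) {
        String data = "alpha\nbravo\ncharlie\ndelta\n";
        // The boundary at offset 8 falls in the middle of "bravo".
        System.out.println(readSplit(data, 0, 8));
        System.out.println(readSplit(data, 8, data.length()));
    }
}
```

Because each line is owned by exactly one split (the one in which it starts), concatenating the results of all splits yields every line once, with no duplicates and no gaps.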

Several InputFormat implementations are already available for standard data reading, including the TextInputFormat class to read lines of text files, the SequenceFileInputFormat class to read Hadoop’s binary SequenceFile format (flat files of binary key-value pairs), and the DBInputFormat class to read data from a relational database. The built-in TextInputFormat class, for example, reads lines of text files and produces, for each line, a key (the byte offset of the line within the file) and a value (the contents of the line up to, but not including, the terminating "\n"). If you want to process application-specific or custom data, you can also define your own custom InputFormat implementation.
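The key that TextInputFormat produces is a byte offset, not a line number or character index, so multi-byte UTF-8 characters shift it. The plain-Java sketch below produces (offset, line) pairs of the kind described above. It is an illustration only: the names are hypothetical, and it treats only "\n" as a line terminator.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of TextInputFormat-style records: key = byte offset, value = line text.
class OffsetLineSketch {

    static List<Map.Entry<Long, String>> toRecords(byte[] data) {
        List<Map.Entry<Long, String>> records = new ArrayList<>();
        int lineStart = 0;
        for (int i = 0; i <= data.length; i++) {
            boolean atEnd = (i == data.length);
            if (atEnd || data[i] == '\n') {
                if (atEnd && lineStart == data.length) break; // no trailing partial line
                // The value excludes the terminating "\n".
                String line = new String(data, lineStart, i - lineStart, StandardCharsets.UTF_8);
                records.add(Map.entry((long) lineStart, line));
                lineStart = i + 1;
            }
        }
        return records;
    }

    public static void main(String[] args) {
        // "\u00e9" is a two-byte character in UTF-8, so the second key is 7, not 6.
        byte[] data = "h\u00e9llo\nworld\n".getBytes(StandardCharsets.UTF_8);
        toRecords(data).forEach(r -> System.out.println(r.getKey() + "\t" + r.getValue()));
    }
}
```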

Now suppose that you have a text file in which records are not on separate lines but instead all appear on a single line, separated by a special character. In this case, you can write your own custom implementation of the InputFormat class that parses the file into records by splitting on that specific character.
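The core of such a custom reader is the parsing step. Here is a plain-Java sketch of it, assuming the special character is '~' (a placeholder; the text does not name one) and reusing the hypothetical city-and-amount records. A real Hadoop implementation would wrap this logic in InputFormat and RecordReader subclasses and would also have to handle records that span split boundaries.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the parsing step inside a custom delimiter-based record reader.
class DelimitedRecordSketch {

    // Splits content into records on 'delimiter'; an empty trailing record is dropped.
    static List<String> parseRecords(String content, char delimiter) {
        List<String> records = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < content.length(); i++) {
            if (content.charAt(i) == delimiter) {
                records.add(content.substring(start, i));
                start = i + 1;
            }
        }
        if (start < content.length()) {
            records.add(content.substring(start));
        }
        return records;
    }

    public static void main(String[] args) {
        // Hypothetical single-line file with '~' between records.
        String line = "Chicago\t100.50~Boston\t50.00~Chicago\t200.25~";
        parseRecords(line, '~').forEach(System.out::println);
    }
}
```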

Note: InputSplit Versus the Map Function

An InputSplit, provided by the InputFormat, is a chunk of the input data that is processed by a single map task. Depending on the implementation, each InputSplit can be further divided into records, and the map function processes each record (a key-value pair) separately. In the case of the DBInputFormat class, for example, an InputSplit might be a range of rows from a table.

FileInputFormat is the base class for all file-based InputFormat implementations. With it, you can define which files are included as the input to a MapReduce job and specify an implementation for generating splits for the input files (the getSplits(JobConf, int) method). Its subclasses implement the job of dividing an InputSplit into records.

GO TO You can see an example of FileInputFormat in the “Driver” section later in this hour.

Note: Large Number of Small Files Versus Small Number of Large Files

Hadoop is designed to store and process a small number of large files, and it does not perform as well with a large number of small files. Having a small number of large files is ideal, but you might not always have that situation. With a large number of small files, resources are wasted: more map tasks must be initiated, and the JobTracker has more overhead in keeping track of them. In this case, consider extending the CombineFileInputFormat abstract base class, which combines many files into each split so that fewer map tasks are initiated and each one has more data to process. The CombineFileInputFormat class is also aware of node and rack locality and takes it into account when combining files, which minimizes data movement and speeds up processing of the MapReduce job.
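The effect of combining small files can be sketched with a simple greedy packing in plain Java: files are added to the current split until the next one would push it past a maximum split size. This is only an illustration of the idea, not CombineFileInputFormat's algorithm. It ignores the node and rack grouping the real class performs, it never breaks a large file into pieces, and the names and sizes are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of packing many small files into fewer splits (locality is ignored).
class CombineSplitsSketch {

    // fileSizes: file name -> size in bytes, in the order files should be considered.
    // A file larger than maxSplitSize simply gets a split of its own in this sketch.
    static List<List<String>> combine(Map<String, Long> fileSizes, long maxSplitSize) {
        List<List<String>> splits = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentSize = 0;
        for (Map.Entry<String, Long> file : fileSizes.entrySet()) {
            if (!current.isEmpty() && currentSize + file.getValue() > maxSplitSize) {
                splits.add(current); // close the current split
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(file.getKey());
            currentSize += file.getValue();
        }
        if (!current.isEmpty()) splits.add(current);
        return splits;
    }

    public static void main(String[] args) {
        Map<String, Long> files = new LinkedHashMap<>();
        files.put("a.log", 40L);
        files.put("b.log", 40L);
        files.put("c.log", 40L);
        files.put("d.log", 40L);
        files.put("e.log", 10L);
        // Five small files become two splits: two map tasks instead of five.
        System.out.println(combine(files, 100L));
    }
}
```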

OutputFormat

Hadoop provides OutputFormat implementations that validate the output specification of the job. For example, an OutputFormat checks whether the output directory already exists (to avoid overwriting it) and provides the RecordWriter implementation that writes the output <key, value> pairs to an output file. The job’s output files are stored in a file system.
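Both OutputFormat duties (checking the output specification and writing pairs through a record writer) can be sketched in plain Java with java.nio.file. This is not Hadoop's API: the class and method names are hypothetical, a local temporary directory stands in for HDFS, and the tab separator and part-r-00000 file name follow the conventions of Hadoop's text output.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of an OutputFormat's two duties (not the Hadoop API).
class TextOutputSketch {

    // Output-spec check: refuse to run if the output directory already exists,
    // so a previous job's results are never overwritten.
    static void checkOutputSpecs(Path outputDir) {
        if (Files.exists(outputDir)) {
            throw new IllegalStateException("Output directory " + outputDir + " already exists");
        }
    }

    // Record-writer-style output: one "key<TAB>value" line per pair.
    static Path write(Path outputDir, Map<String, ?> pairs) {
        checkOutputSpecs(outputDir);
        Path part = outputDir.resolve("part-r-00000");
        try {
            Files.createDirectories(outputDir);
            try (Writer w = Files.newBufferedWriter(part, StandardCharsets.UTF_8)) {
                for (Map.Entry<String, ?> e : pairs.entrySet()) {
                    w.write(e.getKey() + "\t" + e.getValue() + "\n");
                }
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return part;
    }

    // Reads a whole output file back as UTF-8 text.
    static String read(Path file) {
        try {
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        Path out = Paths.get(System.getProperty("java.io.tmpdir"), "wordcount-out-" + System.nanoTime());
        Map<String, Integer> counts = new TreeMap<>(Map.of("cat", 1, "the", 2));
        System.out.print(read(write(out, counts)));
        try {
            write(out, counts); // second run: the directory now exists
        } catch (IllegalStateException expected) {
            System.out.println(expected.getMessage());
        }
    }
}
```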

These output formats correspond to the input formats. For example, the TextInputFormat input format has the corresponding TextOutputFormat output format for writing the reduce function output to plain-text files. Likewise, the SequenceFileInputFormat input format has the corresponding SequenceFileOutputFormat output format, which writes the reduce function output to flat files consisting of binary key-value pairs. The DBInputFormat input format has the corresponding DBOutputFormat output format, which writes the reduce function output to a SQL table, and so on.

Note: Avoiding Input File Splits

If you don’t want your input files split into InputSplits, and you want each file to be processed as a whole by a single map task, you can override the isSplitable(FileSystem, Path) method of the FileInputFormat class so that it returns false. This comes in handy if your input consists of XML files and you want each whole file (the entire XML document) to be processed as one input by a single map function.
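Why this matters for XML can be shown in plain Java with the JDK's built-in DOM parser: the whole document parses, but the two halves produced by cutting it at an arbitrary offset do not, which is roughly what two mappers would see if the file were split. The XML content is made up for the example, and the class and method names are hypothetical.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.xml.sax.InputSource;
import org.xml.sax.helpers.DefaultHandler;

// Plain-Java illustration of why XML input is often processed one whole file per map task.
class WholeXmlFileSketch {

    // True if the text parses as a complete, well-formed XML document.
    static boolean isWellFormed(String xml) {
        try {
            DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
            // DefaultHandler's fatalError() rethrows the parse exception; installing it
            // also replaces the parser's default error reporting.
            builder.setErrorHandler(new DefaultHandler());
            builder.parse(new InputSource(new StringReader(xml)));
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String xml = "<orders><order id=\"1\"/><order id=\"2\"/></orders>";
        int cut = xml.length() / 2; // arbitrary split point, as a byte-range split would produce
        System.out.println("whole file: " + isWellFormed(xml));
        System.out.println("first half: " + isWellFormed(xml.substring(0, cut)));
        System.out.println("second half: " + isWellFormed(xml.substring(cut)));
    }
}
```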