Understanding inputs and outputs in MapReduce

Data serialization: working with text

3.1 Understanding inputs and outputs in MapReduce

Your data might be XML files sitting behind a number of FTP servers, text log files sitting on a central web server, or Lucene indexes in HDFS.1 How does MapReduce support reading and writing to these different serialization structures across the various storage mechanisms?

Figure 3.1 shows the high-level data flow through MapReduce and identifies the actors responsible for various parts of the flow. On the input side, you can see that some work (Create split) is performed outside of the map phase, and other work is performed as part of the map phase (Read split). All of the output work is performed in the reduce phase (Write output).

1 Apache Lucene is an information retrieval project that stores data in an inverted index data structure opti-


Figure 3.2 shows the same flow with a map-only job. In a map-only job, the MapReduce framework still uses the OutputFormat and RecordWriter classes to write the outputs directly to the data sink.

Let’s walk through the data flow and discuss the responsibilities of the various actors. As we do this, we’ll also look at the relevant code from the built-in TextInputFormat and TextOutputFormat classes to better understand the concepts. The TextInputFormat and TextOutputFormat classes read and write line-oriented text files.

3.1.1 Data input

The two classes that support data input in MapReduce are InputFormat and RecordReader. The InputFormat class is consulted to determine how the input data should be partitioned for the map tasks, and the RecordReader performs the reading of data from the inputs.

INPUTFORMAT

Every job in MapReduce must define its inputs according to contracts specified in the InputFormat abstract class. InputFormat implementers must fulfill three contracts: they describe type information for map input keys and values, they specify how the input data should be partitioned, and they indicate the RecordReader instance that should read the data from the source. Figure 3.3 shows the InputFormat class and how these three contracts are defined.

Figure 3.1 High-level input and output actors in MapReduce

Flow shown in figure 3.1: Input; Create split (InputFormat.getSplits); Read split (RecordReader.nextKeyValue); Map (Mapper.map, emitting k,v); Partition (Partitioner.getPartition); Reduce (Reducer.reduce, receiving k,list(v)); Write output (RecordWriter.write); Output.

Annotations in figure 3.1: The InputFormat and RecordReader are responsible for determining what data to feed into the map function. The map and reduce functions are typically written by the user to address a specific use case. The partitioner's job is to logically funnel map outputs to the reducers. The RecordWriter writes the reduce output to the destination data sink, which contains the final resting place of this MapReduce data flow.

Flow shown in figure 3.2 (map-only job): Input; Create split (InputFormat.getSplits); Read split (RecordReader.nextKeyValue); Map (Mapper.map); Write output (RecordWriter.write); Output.

Arguably, the most crucial contract is that of determining how to divide the input data. In MapReduce nomenclature, these divisions are referred to as input splits. The input splits directly impact the map parallelism, because each split is processed by a single map task. Working with an InputFormat that’s unable to create multiple input splits over a single data source (such as a file) will result in a slow map phase, because the file will be processed sequentially.
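To make the link between splits and map parallelism concrete, here is a minimal sketch in plain Java with no Hadoop dependency. It assumes the simplest possible policy, one split per fixed-size block; the SplitSketch and ByteRange names are hypothetical and are not part of Hadoop, and the real FileInputFormat also takes minimum and maximum split-size settings into account.

```java
import java.util.ArrayList;
import java.util.List;

class SplitSketch {
    // Hypothetical stand-in for a file split: a byte range within one file.
    static class ByteRange {
        final long start;
        final long length;

        ByteRange(long start, long length) {
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return "[start=" + start + ", length=" + length + "]";
        }
    }

    // Assumption: one split per block; the last split may be shorter than a block.
    static List<ByteRange> computeSplits(long fileLength, long blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<ByteRange> splits = new ArrayList<>();
        for (long offset = 0; offset < fileLength; offset += blockSize) {
            splits.add(new ByteRange(offset, Math.min(blockSize, fileLength - offset)));
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // A 300 MB file with 128 MB blocks yields three splits (128, 128, and 44 MB).
        List<ByteRange> splits = computeSplits(300 * mb, 128 * mb);
        System.out.println(splits.size() + " splits, so up to "
            + splits.size() + " map tasks can run in parallel");
        for (ByteRange split : splits) {
            System.out.println(split);
        }
    }
}
```

An input format that can't split a file behaves as if blockSize were at least as large as the file: computeSplits returns a single range, and a single map task reads the whole file.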

The TextInputFormat class (view source at http://mng.bz/h728) provides an implementation of the InputFormat class’s createRecordReader method, but it delegates the calculation of input splits to its parent class, FileInputFormat. The following code shows the relevant parts of the TextInputFormat class:

    // The parent class, FileInputFormat, provides all of the
    // input split functionality.
    public class TextInputFormat
        extends FileInputFormat<LongWritable, Text> {

      @Override
      public RecordReader<LongWritable, Text> createRecordReader(
          InputSplit split, TaskAttemptContext context) {
        // The default record delimiter is newline, but it can be
        // overridden with textinputformat.record.delimiter.
        String delimiter = context.getConfiguration().get(
            "textinputformat.record.delimiter");
        byte[] recordDelimiterBytes = null;
        if (null != delimiter) {
          recordDelimiterBytes = delimiter.getBytes();
        }
        // Construct the RecordReader to read the data from the data source.
        return new LineRecordReader(recordDelimiterBytes);
      }
      ...

    <<abstract>> InputFormat<K,V>
      Type definitions for map input keys and values.

    List<InputSplit> getSplits(JobContext context);
      Partition the input data into input splits.

    RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context);
      Create a RecordReader to read data from the job inputs.

Figure 3.3 The annotated InputFormat class and its three contracts


The code in FileInputFormat (source at http://mng.bz/CZB8) that determines the input splits is a little more complicated. A simplified form of the code is shown in the following example to portray the main elements of the getSplits method:

    public List<InputSplit> getSplits(JobContext job) throws IOException {
      List<InputSplit> splits = new ArrayList<InputSplit>();
      List<FileStatus> files = listStatus(job);

      for (FileStatus file : files) {
        Path path = file.getPath();
        BlockLocation[] blkLocations =
            FileSystem.getFileBlockLocations(file, 0, length);
        long splitSize = file.getBlockSize();

        while (splitsRemaining()) {
          splits.add(new FileSplit(path, ...));
        }
      }
      return splits;
    }

The following code shows how you can specify the InputFormat to use for a MapReduce job:

job.setInputFormatClass(TextInputFormat.class);

RECORDREADER

You’ll create and use the RecordReader class in the map tasks to read data from an input split and to provide each record in the form of a key/value pair for use by mappers. A task is commonly created for each input split, and each task has a single RecordReader that’s responsible for reading the data for that input split. Figure 3.4 shows the abstract methods you must implement.

Annotations for the simplified getSplits listing shown earlier: The listStatus method determines all the input files for the job. The getFileBlockLocations call retrieves all of the file blocks. The size of the splits is the same as the block size for the file, and each file can have a different block size. The while loop creates a split for each file block and adds it to the result.

    <<abstract>> RecordReader<KEYIN,VALUEIN>
      Type definitions for map input keys and values.

    void initialize(InputSplit split, TaskAttemptContext context)
      Initialization, which could involve seeking into a file and
      determining the logical starting point of the next record.

    boolean nextKeyValue()
      Read the next record from file and return a flag indicating
      if the end of the split has been reached.

    KEYIN getCurrentKey()
      Return the current record’s key.

    VALUEIN getCurrentValue()
      Return the current record’s value.

    float getProgress()
      Return the current progress of the reader.

    void close()
      Close any resources associated with the data source.

Figure 3.4 The RecordReader class and its abstract methods
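The way a map task consumes a RecordReader can be sketched in plain Java as a simple pull loop. This is an illustration only: SimpleRecordReader, ListRecordReader, and runMapTask are hypothetical names that mirror a subset of the RecordReader methods, and the toy reader uses a line index as the key rather than a byte offset.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ReaderLoopSketch {
    // Hypothetical, trimmed-down mirror of the RecordReader methods.
    interface SimpleRecordReader<K, V> {
        boolean nextKeyValue();
        K getCurrentKey();
        V getCurrentValue();
        void close();
    }

    // Toy reader over in-memory lines; the key is the line index.
    static class ListRecordReader implements SimpleRecordReader<Long, String> {
        private final Iterator<String> lines;
        private long key = -1;
        private String value;

        ListRecordReader(List<String> lines) {
            this.lines = lines.iterator();
        }

        public boolean nextKeyValue() {
            if (!lines.hasNext()) {
                return false; // end of the (pretend) input split
            }
            key++;
            value = lines.next();
            return true;
        }

        public Long getCurrentKey() { return key; }
        public String getCurrentValue() { return value; }
        public void close() { }
    }

    // Pulls records until nextKeyValue returns false, handing each pair
    // to a stand-in map function that upper-cases the value.
    static List<String> runMapTask(SimpleRecordReader<Long, String> reader) {
        List<String> mapOutput = new ArrayList<>();
        try {
            while (reader.nextKeyValue()) {
                mapOutput.add(reader.getCurrentKey() + "="
                    + reader.getCurrentValue().toUpperCase());
            }
        } finally {
            reader.close();
        }
        return mapOutput;
    }

    public static void main(String[] args) {
        // prints [0=A, 1=B]
        System.out.println(runMapTask(new ListRecordReader(Arrays.asList("a", "b"))));
    }
}
```

The point of the shape is that the reader, not the mapper, decides where one record ends and the next begins; the map function only ever sees complete key/value pairs.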

As shown previously, the TextInputFormat class creates a LineRecordReader to read records from the input splits. The LineRecordReader directly extends the RecordReader class and uses the LineReader class to read lines from the input split. The LineRecordReader uses the byte offset in the file for the map key, and the contents of the line for the map value. The following example shows a simplified version of the LineRecordReader (source at http://mng.bz/mYO7):

    public class LineRecordReader
        extends RecordReader<LongWritable, Text> {
      private LineReader in;
      private LongWritable key = new LongWritable();
      private Text value = new Text();

      public void initialize(InputSplit genericSplit,
          TaskAttemptContext context) throws IOException {
        FileSplit split = (FileSplit) genericSplit;

        // open the file and seek to the start of the split
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        fileIn.seek(start);

        in = new LineReader(fileIn, job);
        if (notAtStartOfFile) {
          start += in.readLine(...);
        }
      }

      public boolean nextKeyValue() throws IOException {
        key.set(pos);
        return in.readLine(value, ...) > 0;
      }
    }

Because the LineReader class is straightforward, we’ll skip that code. The next step is to look at how MapReduce supports data outputs.
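The split-boundary behavior of a line-oriented reader is easier to see in a small stand-alone sketch. The plain-Java class below is an assumption-laden illustration, not the Hadoop implementation: SplitLineReaderSketch and readSplit are hypothetical names, the "file" is an in-memory byte array, and only the newline delimiter is handled. A reader that doesn't start at byte 0 discards everything up to the first newline, and every reader keeps going while its position is at or before the end of its split, so a line that straddles a boundary is read exactly once.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SplitLineReaderSketch {
    private final byte[] data; // stands in for the whole file
    private final long end;    // first byte past this split
    private long pos;          // offset of the next unread byte
    private long key;          // byte offset of the current line
    private String value;      // contents of the current line

    SplitLineReaderSketch(byte[] data, long start, long length) {
        this.data = data;
        this.end = start + length;
        this.pos = start;
        if (start != 0) {
            // Not at the start of the file: skip to just past the next newline.
            // The previous split's reader is responsible for the skipped line.
            readLine();
        }
    }

    // Returns the next line without its newline, or null at end of file.
    private String readLine() {
        if (pos >= data.length) {
            return null;
        }
        int from = (int) pos;
        int i = from;
        while (i < data.length && data[i] != '\n') {
            i++;
        }
        pos = Math.min(i + 1, data.length);
        return new String(data, from, i - from, StandardCharsets.UTF_8);
    }

    boolean nextKeyValue() {
        if (pos > end) {
            return false; // gone beyond the end of the input split
        }
        long lineStart = pos;
        String line = readLine();
        if (line == null) {
            return false; // end of file
        }
        key = lineStart;
        value = line;
        return true;
    }

    // Reads every record of one split as "offset:line" strings.
    static List<String> readSplit(byte[] data, long start, long length) {
        SplitLineReaderSketch reader = new SplitLineReaderSketch(data, start, length);
        List<String> records = new ArrayList<>();
        while (reader.nextKeyValue()) {
            records.add(reader.key + ":" + reader.value);
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] file = "alpha\nbravo\ncharlie\n".getBytes(StandardCharsets.UTF_8);
        // The boundary at byte 8 falls in the middle of "bravo".
        System.out.println(readSplit(file, 0, 8));  // prints [0:alpha, 6:bravo]
        System.out.println(readSplit(file, 8, 12)); // prints [12:charlie]
    }
}
```

The first split reads past its own end to finish "bravo", and the second split throws away the tail of that same line, so between them the two readers deliver each line once with its byte offset as the key.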

3.1.2 Data output

MapReduce uses similar processes for supporting both output and input data. Two classes must exist: an OutputFormat and a RecordWriter. The OutputFormat performs some basic validation of the data sink properties, and the RecordWriter writes each reducer output to the data sink.

OUTPUTFORMAT

Much like the InputFormat class, the OutputFormat class, as shown in figure 3.5, defines the contracts that implementers must fulfill: checking the information related to the job output, providing a RecordWriter, and specifying an output committer, which allows writes to be staged and then made “permanent” upon task or job success. (Output committing is covered in section 3.5.2.)
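The idea of staging writes and making them permanent only on success can be illustrated with ordinary local files. The following plain-Java sketch is a loose analogy under stated assumptions, not how Hadoop's committers are implemented: StagedOutputSketch, writeStaged, newTempDir, and the _staging directory name are all hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

class StagedOutputSketch {
    // Writes to a staging directory first. On success the file is moved into
    // the output directory and its final path is returned; on failure the
    // staged file is discarded and null is returned.
    static Path writeStaged(Path outputDir, String fileName,
                            String contents, boolean taskSucceeded) {
        try {
            Path staging = Files.createDirectories(outputDir.resolve("_staging"));
            Path staged = staging.resolve(fileName);
            Files.write(staged, contents.getBytes(StandardCharsets.UTF_8));
            if (!taskSucceeded) {
                Files.delete(staged); // abort: nothing becomes visible
                return null;
            }
            return Files.move(staged, outputDir.resolve(fileName)); // commit
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helper that creates a scratch output directory for the demo.
    static Path newTempDir() {
        try {
            return Files.createTempDirectory("staged-output-sketch");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path out = newTempDir();
        System.out.println("committed: " + writeStaged(out, "part-0", "k\tv\n", true));
        System.out.println("aborted:   " + writeStaged(out, "part-1", "k\tv\n", false));
    }
}
```

Readers of the output directory never observe a half-written file from a failed task, which is the property the output committer contract exists to provide.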

Annotations for the simplified LineRecordReader listing shown earlier: In initialize, open an InputStream to the input split file, seek to the start of the input split, and create a new LineReader that can read lines from a stream. If you aren’t at the start of the file, figure out where to start reading lines by reading characters until you hit a newline; at that point you’re ready to start supplying lines to the map. After the initialize method is called, nextKeyValue is called repeatedly by the MapReduce framework until it returns false, which signifies the end of the input split. Each call sets the byte offset in the file as the key and reads the next line into the value; if you’ve gone beyond the end of the input split, it returns false.


Just like the TextInputFormat, the TextOutputFormat also extends a base class, FileOutputFormat, which takes care of some complicated logistics such as output committing, which we’ll cover later in this chapter. For now, let’s take a look at the work that TextOutputFormat performs (source at http://mng.bz/lnR0):

    public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
      public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
          throws IOException, InterruptedException {
        boolean isCompressed = getCompressOutput(job);
        String keyValueSeparator = conf.get(
            "mapred.textoutputformat.separator", "\t");
        Path file = getDefaultWorkFile(job, extension);
        FileSystem fs = file.getFileSystem(conf);
        FSDataOutputStream fileOut = fs.create(file, false);
        return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
      }
    }

The following code shows how you can specify the OutputFormat that should be used for a MapReduce job:

job.setOutputFormatClass(TextOutputFormat.class);

RECORDWRITER

You’ll use the RecordWriter to write the reducer outputs to the destination data sink. It’s a simple class, as figure 3.6 illustrates.

    <<abstract>> OutputFormat<K,V>
      Type definitions for reduce output keys and values.

    RecordWriter<K, V> getRecordWriter(TaskAttemptContext context)
      Create an instance to write data to the destination.

    void checkOutputSpecs(JobContext context)
      Verify the output details associated with the MapReduce job
      are correct.

    OutputCommitter getOutputCommitter(TaskAttemptContext context)
      Get the associated output committer. Output committers are
      responsible for “finalizing” the output after successful task
      and job completion.

Figure 3.5 The annotated OutputFormat class

Annotations for the TextOutputFormat listing shown earlier: The default key/value separator is the tab character, but this can be changed with the mapred.textoutputformat.separator configuration setting. The getDefaultWorkFile call creates a unique filename for the reducer in a temporary directory. The fs.create call creates the output file. The method returns a RecordWriter used to write to the file.

The TextOutputFormat returns a LineRecordWriter object, which is an inner class of TextOutputFormat, to perform the writing to the file. A simplified version of that class (source at http://mng.bz/lnR0) is shown in the following example:

    protected static class LineRecordWriter<K, V> extends RecordWriter<K, V> {

      protected DataOutputStream out;

      public synchronized void write(K key, V value) throws IOException {
        writeObject(key);
        out.write(keyValueSeparator);
        writeObject(value);
        out.write(newline);
      }

      private void writeObject(Object o) throws IOException {
        out.write(o);
      }
    }
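The simplified listing above leaves out how keys and values become bytes. As a rough stand-alone sketch in plain Java, the writer below converts each object with toString and UTF-8 encoding, which is an assumption made for illustration; LineWriterSketch and demo are hypothetical names, and special cases such as null keys or values are ignored.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class LineWriterSketch<K, V> {
    private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);

    private final OutputStream out;
    private final byte[] separator;

    LineWriterSketch(OutputStream out, String separator) {
        this.out = out;
        this.separator = separator.getBytes(StandardCharsets.UTF_8);
    }

    // Writes one record as: key, separator, value, newline.
    synchronized void write(K key, V value) throws IOException {
        writeObject(key);
        out.write(separator);
        writeObject(value);
        out.write(NEWLINE);
    }

    // Assumption: every key and value is rendered through toString().
    private void writeObject(Object o) throws IOException {
        out.write(o.toString().getBytes(StandardCharsets.UTF_8));
    }

    // Writes two records to an in-memory sink and returns the resulting text.
    static String demo() {
        try {
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            LineWriterSketch<String, Integer> writer = new LineWriterSketch<>(sink, "\t");
            writer.write("apple", 3);
            writer.write("pear", 7);
            return sink.toString("UTF-8");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(demo()); // two lines: "apple<TAB>3" and "pear<TAB>7"
    }
}
```

Because the sink is just an OutputStream, the same writer works unchanged whether the stream is backed by memory, a local file, or a distributed filesystem.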

Whereas on the map side it’s the InputFormat that determines how many map tasks are executed, on the reducer side the number of tasks is solely based on the value for mapred.reduce.tasks set by the client (or if it isn’t set, the value is picked up from mapred-site.xml, or from mapred-default.xml if it doesn’t exist in the site file).
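The lookup order described above (client setting first, then the site file, then the defaults file) can be mimicked with layered java.util.Properties objects. This is only an analogy for the precedence, not a description of how Hadoop loads its configuration; ReduceTaskLookupSketch, resolveReduceTasks, props, and the numeric values are all hypothetical.

```java
import java.util.Properties;

class ReduceTaskLookupSketch {
    static final String KEY = "mapred.reduce.tasks";

    // Layers the three sources so that client overrides site, and site
    // overrides defaults. The defaults layer is assumed to define the key.
    static int resolveReduceTasks(Properties defaults, Properties site, Properties client) {
        Properties siteLayer = new Properties(defaults);
        siteLayer.putAll(site);
        Properties clientLayer = new Properties(siteLayer);
        clientLayer.putAll(client);
        return Integer.parseInt(clientLayer.getProperty(KEY));
    }

    // Small helper that builds a Properties object from key/value pairs.
    static Properties props(String... keyValues) {
        Properties p = new Properties();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            p.setProperty(keyValues[i], keyValues[i + 1]);
        }
        return p;
    }

    public static void main(String[] args) {
        Properties defaults = props(KEY, "1"); // illustrative value
        Properties site = props(KEY, "4");     // illustrative value
        System.out.println(resolveReduceTasks(defaults, site, props(KEY, "10"))); // prints 10
        System.out.println(resolveReduceTasks(defaults, site, props()));          // prints 4
        System.out.println(resolveReduceTasks(defaults, props(), props()));       // prints 1
    }
}
```

Unlike the number of map tasks, nothing in this lookup depends on the input data, which is why the reducer count has to be chosen deliberately for each job.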

Now that you know what’s involved in working with input and output data in MapReduce, it’s time to apply that knowledge to solving some common data-serialization problems. Your first step in this journey is to learn how to work with common file formats such as XML.