Chapter 3. Processing Data in Hadoop
In the previous chapters we’ve covered considerations around modeling data in Hadoop and how to move data in and out of Hadoop. Once we have data loaded and modeled in Hadoop, we’ll of course want to access and work with that data. In this chapter we review the frameworks available for processing data in Hadoop.
With processing, just like everything else with Hadoop, we have to understand the available options before deciding on a specific framework. Having these options lets us select the correct tool for the job, but it also adds confusion for those new to the ecosystem. This chapter is written with the goal of giving you the knowledge to select the correct tool based on your specific use cases.
We will open the chapter by reviewing the main execution engines — the frameworks directly responsible for executing data processing tasks on Hadoop clusters. This includes the well-established MapReduce framework, as well as newer options such as data flow engines like Spark.
We’ll then move to higher-level abstractions such as Hive, Pig, Crunch, and Cascading. These tools are designed to provide easier-to-use abstractions over lower-level frameworks such as MapReduce.
For each processing framework, we’ll provide:
An overview of the framework
A simple example using the framework
Rules for when to use the framework
Recommended resources for further information on the framework
After reading this chapter, you will gain an understanding of the various data processing options, but not deep expertise in any of them. Our goal in this chapter is to give you confidence that you are selecting the correct tool for your use case. If you want more detail, we’ll provide references for you to dig deeper into a particular tool.
Shared Nothing Architectures
Before we dive into the specifics of each framework, note one thing they all have in common: as much as possible, they attempt to implement a shared nothing architecture. In distributed systems, this is an architecture where each node is completely independent of other nodes in the system. There are no shared resources that can become bottlenecks. The lack of shared resources refers to physical resources such as memory, disks, and CPUs: instead of using centralized storage, Hadoop’s processing frameworks use the distributed HDFS storage. It also refers to the lack of shared data: in these frameworks, each node processes a distinct subset of the data, so there’s no need to manage access to shared data. Shared nothing architectures are very scalable: because there are no shared resources, addition of nodes adds resources to the system and does not introduce further contention. These architectures are also fault-tolerant: each node is independent, so there are no single points of failure, and the system can quickly recover from a failure of an individual node. As you read this chapter, notice how each framework preserves the principles of shared nothing architecture even as its other details differ.
MapReduce
The MapReduce model was introduced in a white paper by Jeffrey Dean and Sanjay Ghemawat from Google called MapReduce: Simplified Data Processing on Large Clusters. This paper described a programming model and an implementation for
processing and generating large data sets. This programming model provided a way to develop applications to process large data sets in parallel, without many of the
programming challenges usually associated with developing distributed, concurrent applications. The shared nothing architecture described by this model provided a way to implement a system that could be scaled through the addition of more nodes, while also providing fault tolerance when individual nodes or processes failed.
MapReduce Overview
The MapReduce programming paradigm breaks processing into two basic phases: a map phase and a reduce phase. The input and output of each phase are key-value pairs.
The processes executing the map phase are called mappers. Mappers are Java processes (JVMs) that normally start up on nodes that also contain the data they will process. Data locality is an important principle of MapReduce; with large data sets, moving the processing to the servers that contain the data is much more efficient than moving the data across the network. Typical processing performed in mappers includes parsing, transformation, and filtering. When the mapper has processed the input data, it outputs key-value pairs to the next phase, the sort and shuffle.
In the sort and shuffle phase, data is sorted and partitioned. We will discuss the details of how this works later in the chapter. This partitioned and sorted data is sent over the network to reducer JVMs that read the data ordered and partitioned by the keys. When a reducer process gets these records, the reduce() function can do any number of operations on the data, but most likely the reducer will write out some portion of the data, or an aggregate of it, to a store like HDFS or HBase.
To summarize, there are two sets of JVMs. One gets data unsorted and the other gets data sorted and partitioned. There are many more parts to MapReduce that we will touch on in a minute, but Figure 3-1 shows what has been described so far.
Figure 3-1. MapReduce sort and shuffle
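The flow described above can be sketched in a single process. The following is only an illustration of the three phases using plain Java collections, not Hadoop code; the class name MiniMapReduce and its methods are hypothetical, and word count is assumed as the job.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-process illustration of the map, sort-and-shuffle, and reduce phases.
// Hypothetical names throughout; this does not use the Hadoop API.
final class MiniMapReduce {

    // Map phase: emit a (word, 1) pair for every word in one input line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Sort and shuffle: group the values by key, with keys kept in sorted order.
    static TreeMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce phase: aggregate all values that share a key.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Run the three phases in order over a list of input lines.
    static TreeMap<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
        for (String line : lines) {
            mapped.addAll(map(line));
        }
        TreeMap<String, Integer> result = new TreeMap<>();
        shuffle(mapped).forEach((key, values) -> result.put(key, reduce(key, values)));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("the cat", "and the hat")));
    }
}
```

In a real cluster the map and reduce calls run in separate JVMs and the grouping step moves data over the network; the sketch only shows what each phase receives and produces.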
The following are some typical characteristics of MapReduce processing:
Mappers process input in key-value pairs and are only able to process a single pair at a time. The number of mappers is set by the framework, not the developer.
Mappers pass key-value pairs as output to reducers, but can’t pass information to other mappers. Reducers can’t communicate with other reducers either.
Mappers and reducers typically don’t use much memory and the JVM heap size is set relatively low.
Each reducer typically, although not always, has a single output stream — by default a set of files named part-r-00000, part-r-00001, and so on, in a single HDFS directory.
The output of the mapper and the reducer is written to disk. If the output of the reducer requires additional processing, the entire data set will be written to disk and then read again. This pattern is called a synchronization barrier and is one of the major reasons MapReduce is considered inefficient for iterative processing of data.
Before we go into the lower-level details of MapReduce, it is important to note that MapReduce has two major weaknesses that make it a poor option for iterative algorithms. The first is the startup time. Even if you are doing almost nothing in the MapReduce processing, 10 to 30 seconds are lost to startup cost alone. Second, MapReduce writes to disk frequently in order to facilitate fault tolerance. Later on in this chapter when we study Spark, we will learn that all this disk I/O isn’t required. Figure 3-2 illustrates how many times MapReduce reads and writes to disk during typical processing.
Figure 3-2. MapReduce I/O
One of the things that makes MapReduce so powerful is the fact that it is made not just of map and reduce tasks, but rather multiple components working together. Each one of these components can be extended by the developer. Therefore, in order to make the most out of MapReduce, it is important to understand its basic building blocks in detail. In the next section we’ll start with a detailed look into the map phase in order to work toward this understanding.
There are a number of good references that provide more detail on MapReduce than we can go into here, including implementations of various algorithms. Some good resources are Hadoop: The Definitive Guide, Hadoop in Practice, and MapReduce Design Patterns by Donald Miner and Adam Shook (O’Reilly).
Map phase
Next, we provide a detailed overview of the major components involved in the map phase of a MapReduce job.
InputFormat
MapReduce jobs access their data through the InputFormat class. This class implements
two important methods:
getSplits()
This method implements the logic of how input will be distributed between the map processes. The most commonly used InputFormat is TextInputFormat, which generates an input split per block and gives the location of the block to the map task. The framework then executes a mapper for each of the splits. This is why developers usually assume the number of mappers in a MapReduce job is equal to the number of blocks in the data set it will process.
This method determines the number of map processes and the cluster nodes on which they will execute, but because it can be overridden by the developer of the
MapReduce job, you have complete control over the way in which files are read. For example, the NMapInputFormat in the HBase code base allows you to directly set the
number of mappers executing the job.
createRecordReader()
This method (named getRecordReader() in the older org.apache.hadoop.mapred API) provides a reader to the map task that allows it to access the data it will process. Because the developer can override this method, MapReduce can support any data type. As long as you can provide a method that reads the data into a Writable object, you can process it with the MapReduce framework.
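To make the one-split-per-block idea concrete, here is a minimal sketch of the arithmetic in plain Java. It is not the Hadoop implementation, which also tracks block locations and other settings; SplitPlanner, Split, and planSplits are hypothetical names, and the file and block sizes are made-up inputs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of one-split-per-block planning; not the Hadoop API.
final class SplitPlanner {

    // A split is described here only by its byte offset and length in the file.
    static final class Split {
        final long offset;
        final long length;

        Split(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }

        @Override
        public String toString() {
            return "[offset=" + offset + ", length=" + length + "]";
        }
    }

    // Produce one split per block; the last split may be shorter than a block.
    static List<Split> planSplits(long fileLength, long blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<Split> splits = new ArrayList<>();
        for (long offset = 0; offset < fileLength; offset += blockSize) {
            splits.add(new Split(offset, Math.min(blockSize, fileLength - offset)));
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        // A 300 MB file with 128 MB blocks yields three splits, and so three mappers.
        for (Split split : planSplits(300 * mb, 128 * mb)) {
            System.out.println(split);
        }
    }
}
```

An InputFormat that wants a different number of mappers would simply return a different list of splits, which is the kind of control the text attributes to overriding getSplits().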
RecordReader
The RecordReader class reads the data blocks and returns key-value records to the map
task. The implementation of most RecordReaders is surprisingly simple: a RecordReader
instance is initialized with the start position in the file for the block it needs to read and the URI of the file in HDFS. After seeking to the start position, each call to nextKeyValue()
will find the next row delimiter and read the next record. This pattern is illustrated in Figure 3-3.
Figure 3-3. MapReduce RecordReader
The MapReduce framework and other ecosystem projects provide RecordReader
implementations for many file formats: text delimited, SequenceFile, Avro, Parquet, and more. There are even RecordReaders that don’t read any data — NMapInputFormat returns
a NullWritable as the key and value to the mapper. This is to make sure the map()
method gets called once.
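The seek-then-scan pattern described for RecordReader can be sketched over an in-memory byte array. This is a simplified stand-in, not the Hadoop class: LineSplitReader and readAll are hypothetical names, the newline delimiter is assumed, and the handling of records that straddle a split boundary follows the common convention that the earlier split finishes the record and the later split skips it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, in-memory stand-in for a line-oriented RecordReader.
final class LineSplitReader {
    private final byte[] data;   // the whole "file"
    private final int end;       // first byte offset past this reader's split
    private int pos;             // current read position
    private long currentKey;     // byte offset where the current record starts
    private String currentValue; // text of the current record

    LineSplitReader(byte[] data, int start, int length) {
        this.data = data;
        this.end = Math.min(data.length, start + length);
        this.pos = start;
        if (start != 0) {
            // The previous split's reader reads through the first delimiter at or
            // after this offset, so discard everything up to and including it.
            readLine();
        }
    }

    // Read bytes up to the next '\n' and leave pos just past the delimiter.
    private String readLine() {
        int lineStart = pos;
        while (pos < data.length && data[pos] != '\n') {
            pos++;
        }
        String line = new String(data, lineStart, pos - lineStart, StandardCharsets.UTF_8);
        if (pos < data.length) {
            pos++; // step over the delimiter
        }
        return line;
    }

    // Advance to the next record; a record is ours if it starts at or before end.
    boolean nextKeyValue() {
        if (pos > end || pos >= data.length) {
            return false;
        }
        currentKey = pos;
        currentValue = readLine();
        return true;
    }

    long getCurrentKey() {
        return currentKey;
    }

    String getCurrentValue() {
        return currentValue;
    }

    // Convenience helper: collect every record value belonging to one split.
    static List<String> readAll(byte[] data, int start, int length) {
        LineSplitReader reader = new LineSplitReader(data, start, length);
        List<String> records = new ArrayList<>();
        while (reader.nextKeyValue()) {
            records.add(reader.getCurrentValue());
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] file = "alpha\nbeta\ngamma\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(readAll(file, 0, 8));               // first split
        System.out.println(readAll(file, 8, file.length - 8)); // second split
    }
}
```

The point of the boundary rule is that every record is returned by exactly one reader, no matter where the split boundary falls inside the file.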
Mapper.setup()
Before the map method of the map task gets called, the mapper’s setup() method is
called once. This method is used by developers to initialize variables and file handles that will later get used in the map process. Very frequently the setup() method is used to
retrieve values from the configuration object.
Every component in Hadoop is configured via a Configuration object, which contains key-value pairs and is passed to the map and reduce JVMs when the job is executed. The contents of this object can be found in job.xml. By default the Configuration object contains information regarding the cluster that every JVM requires for successful execution, such as the URI of the NameNode and the process coordinating the job (e.g., the JobTracker when Hadoop is running within the MapReduce v1 framework or the ApplicationMaster when it’s running with YARN).
Values can be added to the Configuration object during job setup, before the map and reduce tasks are launched. Once the job is running, the mappers and reducers can access the Configuration object at any time to retrieve these values. Here is a simple example of a setup() method that gets a Configuration value to populate a member variable:
public String fooBar;
public static final String FOO_BAR_CONF = "custom.foo.bar.conf";

@Override
public void setup(Context context) throws IOException {
  // Read the custom value from the job Configuration once per task.
  fooBar = context.getConfiguration().get(FOO_BAR_CONF);
}
Note that anything you put in the Configuration object can be read through the JobTracker (in MapReduce v1) or ApplicationMaster (in YARN). These processes have a web UI that is often left unsecured and readable to anyone with access to its URL, so we recommend against passing sensitive information such as passwords through the Configuration object. A better method is to pass the URI of a password file in HDFS, which can have proper access permissions. The map and reduce tasks can then read the content of the file and get the password if the user executing the MapReduce job has sufficient privileges.
Mapper.map()
The map() method is the heart of the mapper. Even if you decide to use the defaults and
not implement any other component of the map task, you will still need to implement a
map() method. This method has three inputs: key, value, and a context. The key and value are provided by the RecordReader and contain the data that the map() method
should process. The context is an object that provides common actions for a mapper:
sending output to the reducer, reading values from the Configuration object, and
incrementing counters to report on the progress of the map task.
When the map task writes to the reducer, the data it is writing is buffered and sorted. MapReduce will attempt to sort it in memory, with the available space defined by the
io.sort.mb configuration parameter. If the memory buffer is too small, not all the output
data can be sorted in memory. In this case the data is spilled to the local disk of the node where the map task is running and sorted on disk.
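The buffer-sort-spill behavior can be illustrated with a small sketch. It is a simplification under stated assumptions: the buffer capacity is counted in records rather than megabytes, spills are kept in memory lists rather than local files, and SpillingSorter with its methods is a hypothetical name, not a Hadoop class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of a bounded sort buffer that spills sorted runs.
final class SpillingSorter {
    private final int capacity;                                   // stand-in for the sort buffer size
    private final List<String> buffer = new ArrayList<>();        // records not yet spilled
    private final List<List<String>> spills = new ArrayList<>();  // stand-in for spill files on local disk

    SpillingSorter(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        this.capacity = capacity;
    }

    // Collect one output key; when the buffer is full, sort it and spill it.
    void collect(String key) {
        buffer.add(key);
        if (buffer.size() >= capacity) {
            spill();
        }
    }

    private void spill() {
        if (buffer.isEmpty()) {
            return;
        }
        List<String> run = new ArrayList<>(buffer);
        Collections.sort(run);
        spills.add(run);
        buffer.clear();
    }

    int spillCount() {
        return spills.size();
    }

    // Spill whatever is left, then merge all sorted runs into one sorted output.
    List<String> finish() {
        spill();
        // Each queue entry is {runIndex, positionInRun}, ordered by the key it points at.
        PriorityQueue<int[]> heads = new PriorityQueue<>(
                (a, b) -> spills.get(a[0]).get(a[1]).compareTo(spills.get(b[0]).get(b[1])));
        for (int i = 0; i < spills.size(); i++) {
            heads.add(new int[] {i, 0});
        }
        List<String> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            int[] head = heads.poll();
            List<String> run = spills.get(head[0]);
            merged.add(run.get(head[1]));
            if (head[1] + 1 < run.size()) {
                heads.add(new int[] {head[0], head[1] + 1});
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        SpillingSorter sorter = new SpillingSorter(2);
        for (String word : Arrays.asList("the", "cat", "and", "the", "hat")) {
            sorter.collect(word);
        }
        System.out.println(sorter.finish());
    }
}
```

A larger buffer means fewer runs to merge, which is the intuition behind tuning the sort buffer size when map tasks spill heavily.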
Partitioner
The partitioner implements the logic of how data is partitioned between the reducers. The default partitioner will simply take the key, hash it using a standard hash function, and divide by the number of reducers. The remainder will determine the target reducer for the record. When keys are well distributed, this spreads data roughly evenly between the reducers, which helps reducers begin and end at about the same time. But if there is any requirement for keeping certain values together for processing by the reducers, you will need to override the default and implement a custom partitioner.
One such example is a secondary sort. Suppose that you have a time series, for example stock market pricing information. You may wish to have each reducer scan all the trades of a given stock ticker ordered by the time of the trade in order to look for correlations in pricing over time. In this case you will define the key as ticker-time. The default partitioner could send records belonging to the same stock ticker to different reducers, so you will also want to implement your own partitioner to make sure the ticker symbol is used for partitioning the records to reducers, but the timestamp is not used. Here is a simple code example of how this type of partitioner method would be implemented:
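public static class CustomPartitioner extends Partitioner<Text, Text> {
  @Override
  public int getPartition(Text key, Text value, int numPartitions) {
    // Assumes the key starts with a fixed-width, five-character ticker symbol.
    String ticker = key.toString().substring(0, 5);
    // Mask the sign bit so a negative hashCode() cannot yield a negative partition.
    return (ticker.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}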
We simply extract the ticker symbol out of the key and use only the hash of this part for partitioning instead of the entire key.
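The difference between hashing the whole key and hashing only the ticker can be checked without a cluster. The sketch below is plain Java with hypothetical names (TickerPartitioning and its methods), and it assumes composite keys of the form TICKER-timestamp with a dash as the separator, which is one possible encoding of the ticker-time key.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper functions that mirror the partitioning arithmetic.
final class TickerPartitioning {

    // Default-style partitioning: hash the entire key, mask the sign bit,
    // and take the remainder by the number of partitions.
    static int partitionByWholeKey(String key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    // Custom partitioning: hash only the ticker part of a "TICKER-timestamp" key
    // (the dash-separated format is an assumption made for this sketch).
    static int partitionByTicker(String compositeKey, int numPartitions) {
        int dash = compositeKey.indexOf('-');
        String ticker = dash < 0 ? compositeKey : compositeKey.substring(0, dash);
        return partitionByWholeKey(ticker, numPartitions);
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("AAPL-0930", "AAPL-0931", "GOOG-0930", "GOOG-1045");
        for (String key : keys) {
            System.out.println(key
                    + " wholeKey=" + partitionByWholeKey(key, 4)
                    + " byTicker=" + partitionByTicker(key, 4));
        }
    }
}
```

With ticker-only partitioning, every record for a given ticker lands on the same reducer regardless of its timestamp, which is the property a secondary sort relies on.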
Mapper.cleanup()
The cleanup() method is called after the map() method has executed for all records. This
is a good place to close files and to do some last-minute reporting — for example, to write a message to the log with final status.
Combiner
Combiners in MapReduce can provide an easy method to reduce the amount of network traffic between the mappers and reducers. Let’s look at the famous word count example. In word count, the mapper takes each input line, splits it into individual words and writes out each word with “1” after it, to indicate current count, like the following:
the => 1
cat => 1
and => 1
the => 1
hat => 1
If a combiner is defined, it can aggregate the values produced by the mapper. It executes locally on the same node where the mapper executes, so this aggregation reduces the output that is later sent through the network to the reducer. The reducer will still have to aggregate the results from different mappers, but this will be over significantly smaller data sets. It is important to remember that you have no control over whether the combiner will execute; the framework may run it zero, one, or several times. Therefore, the output of the combiner has to be identical in format to the output of the mapper, because the reducer will have to process either of them. Also note that the combiner executes after the output of the mapper is already sorted, so you can assume that the input of the combiner is sorted.
In our example, this would be the output of a combiner:
and => 1
cat => 1
hat => 1
the => 2
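The following sketch shows why the combiner’s output must have the same shape as the mapper’s output: the reduce step gives the same totals whether or not the combiner ran. It is plain Java, not Hadoop code; WordCountCombiner and its methods are hypothetical names, and the second mapper’s words are made up for the illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical word count combiner and reducer over (word, count) pairs.
final class WordCountCombiner {

    // Mapper output for a list of words: one (word, 1) pair per word.
    static List<Map.Entry<String, Integer>> ones(String... words) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : words) {
            pairs.add(new AbstractMap.SimpleEntry<>(word, 1));
        }
        return pairs;
    }

    // Shared aggregation: sum the counts per word, with words in sorted order.
    private static TreeMap<String, Integer> sumByKey(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            sums.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return sums;
    }

    // Combiner: runs on one mapper's output and emits pairs in the same format.
    static List<Map.Entry<String, Integer>> combine(List<Map.Entry<String, Integer>> pairs) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : sumByKey(pairs).entrySet()) {
            out.add(new AbstractMap.SimpleEntry<>(entry.getKey(), entry.getValue()));
        }
        return out;
    }

    // Reducer: accepts pairs from any number of mappers, combined or not.
    static TreeMap<String, Integer> reduce(List<Map.Entry<String, Integer>> pairs) {
        return sumByKey(pairs);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapperA = ones("the", "cat", "and", "the", "hat");
        List<Map.Entry<String, Integer>> mapperB = ones("the", "hat");

        List<Map.Entry<String, Integer>> shuffled = new ArrayList<>(combine(mapperA));
        shuffled.addAll(combine(mapperB));

        System.out.println("combiner output for mapper A: " + combine(mapperA));
        System.out.println("reducer output: " + reduce(shuffled));
    }
}
```

Summation works as a combiner because it is associative and commutative; an aggregate such as a plain average would need a different intermediate format to stay correct.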
Reducer
The reduce task is not as complex as the map task, but there are a few components of which you should be aware.
Shuffle
Before the reduce stage begins, the reduce tasks copy the output of the mappers from the map nodes to the reduce nodes. Since each reducer needs to aggregate data from multiple mappers, we can’t have each reducer just read the data locally in the same way that map tasks do. Copying data over the network is mandatory, so a high-throughput network within the cluster will improve processing times significantly. This is the main reason why using a combiner can be very effective; aggregating the results of the mapper before sending them over the network will speed up this phase significantly.
Reducer.setup()
The reducer setup() step is very similar to the map setup(). The method executes before
the reducer starts processing individual records and is typically used to initialize variables and file handles.
Reducer.reduce()
Similar to the map() method in the mapper, the reduce() method is where the reducer
does most of the data processing. There are a few significant differences in the inputs, however:
The keys are sorted.
The value parameter has changed to values. So for one key the input will be all the
values for that key, allowing you to then perform any type of aggregation and
processing for all the values of the key. It is important to remember that a key and all its values will never be split across more than one reducer; this seems obvious, but often developers are surprised when one reducer takes significantly longer to finish than the rest. This is typically the result of this reducer processing a key that has
significantly more values than the rest. This kind of skew in the way data is partitioned is a very common cause of performance concerns, and as a result a skilled MapReduce developer will invest significant effort in making sure data is partitioned between the