
The Context object enables the Mapper, Reducer, and Driver to interact with the rest of the Hadoop system even though they are executed in different processes. It includes configuration data for the job, as well as interfaces that enable it to collect and emit output (context.write(key, value)) and to exchange required system- and job-wide information. You can use the Context object for any of these actions:

Report progress

Set application-level status messages

Update counters

Indicate they are alive

Note: MapReduce for Hadoop 1.0 and Hadoop 2.0

MapReduce went through a complete overhaul in Hadoop 2.0 and is now called MapReduce 2. However, the MapReduce programming model has not changed; you can still use the MapReduce APIs as discussed throughout the book. YARN provides a new resource management and job scheduling model, and its implementation executes MapReduce jobs. In most cases, your existing MapReduce jobs will run without any changes. In some cases, minor changes and recompilation might be needed.

Summary

MapReduce is a fault-tolerant, batch-oriented, parallel programming framework. It enables developers to write a program that processes a massive amount of data in a distributed manner across hundreds or thousands of nodes in the cluster, so that even a large volume of data is processed in a reasonable amount of time.

In this hour, you looked in detail at the MapReduce programming paradigm, its architecture, the different components of a MapReduce job, and the MapReduce job execution flow. In the next hour, you explore advanced concepts related to MapReduce (such as MapReduce streaming, MapReduce joins, distributed caches, different types of failures, and performance optimization for MapReduce jobs) and YARN of Hadoop 2.0 (architecture, different components, job execution pipeline in YARN, different types of failures, and so on).

Q&A

Q. What are the benefits of using the MapReduce framework?

A. The MapReduce framework offers several benefits over the traditional way of programming. For example, the MapReduce framework uses the concept of data locality, which means that it moves the computation near the data instead of moving the data to the computation. In doing so, it significantly improves performance. The framework breaks a bigger problem into smaller problems and executes these smaller problems on several nodes in parallel, to ensure that the problem can be solved in a reasonable amount of time. It scales to hundreds or thousands of nodes for highly parallel execution. The framework has built-in fault tolerance capabilities, so if any task fails, the framework itself recovers and re-executes it transparently.

Q. What is the difference between a JobTracker and a TaskTracker?

A. The MapReduce framework is based on a master/slave architecture in which the master is called the JobTracker and the slaves are called TaskTrackers. A JobTracker is a daemon that runs on the master node for submitting, scheduling, monitoring, and tracking the execution of the MapReduce jobs across TaskTrackers; it also re-executes failed tasks, up to a predefined limit of retries. A TaskTracker is a daemon that runs on slave nodes and accepts tasks (Map, Reduce, and Shuffle operations) from the JobTracker for execution.

Q. What does a Mapper do?

A. A Mapper function processes input data to generate intermediate key-value pairs as output. Basically, input data is split into smaller chunks or key-value pairs, and the Map function is applied to each chunk or key-value pair separately; the Map functions run in parallel, each on its own set of key-value pairs. The Mapper function creates a new output list (intermediate key-value pairs) by applying a function to individual elements of the input data.

Q. What does a Reducer do?

A. A Reducer merges all the values that share the same key to produce the final output for that key. The framework shuffles and sorts the results generated by the Map functions to make them available to the Reducer for further processing; this means that the Reduce function can start only after the Mappers complete. In other words, the Reduce function iterates over the input values to produce an aggregate value as output for each specific key. Usually zero or one output value is generated per Reduce function invocation.
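The Map, shuffle-and-sort, and Reduce steps described in the last two answers can be sketched in plain Java. This is a minimal single-process illustration of the data flow, not Hadoop code: the class name MapReduceSketch and its methods are hypothetical, no Hadoop classes are used, and word counting is assumed as the example problem.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of map -> shuffle/sort -> reduce (hypothetical names, no Hadoop classes).
class MapReduceSketch {

    // Map: emit an intermediate (word, 1) pair for every word in one input line.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String word : line.trim().toLowerCase().split("\\s+")) {
            if (!word.isEmpty()) {
                pairs.add(new SimpleEntry<>(word, 1));
            }
        }
        return pairs;
    }

    // Shuffle and sort: group all values by key; the TreeMap keeps keys sorted.
    static TreeMap<String, List<Integer>> shuffleAndSort(List<Map.Entry<String, Integer>> pairs) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce: iterate over the values of one key and produce a single aggregate value.
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        return sum;
    }

    // Runs the three steps in order; reduce starts only after every line has been mapped.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        for (String line : lines) {
            intermediate.addAll(map(line));
        }
        Map<String, Integer> output = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : shuffleAndSort(intermediate).entrySet()) {
            output.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("the quick brown fox", "the lazy dog")));
    }
}
```

In a real job, the framework performs the grouping step across the network and calls your Reduce function once per key; here a sorted map stands in for that machinery.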

Q. What does a Partitioner do?

A. A Partitioner determines to which Reducer a given intermediate key-value pair from the Mappers should go. Based on the partitioning process being used, map outputs from all the Mappers are shuffled to the appropriate Reducers.
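To make the partitioning decision concrete, here is a minimal sketch of hash-based partitioning in plain Java. The arithmetic is intended to mirror what Hadoop's default HashPartitioner does (hash the key, clear the sign bit, take the remainder by the number of Reduce tasks), but the class name HashPartitionSketch and the use of String keys are assumptions made for illustration only.

```java
// Plain-Java sketch of hash partitioning (hypothetical class; no Hadoop types).
class HashPartitionSketch {

    // Clear the sign bit so the hash is non-negative, then map it onto
    // one of the numReduceTasks partitions.
    static int getPartition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int reducers = 4;
        // The repeated key "apple" always lands on the same reducer.
        for (String key : new String[] {"apple", "banana", "cherry", "apple"}) {
            System.out.println(key + " -> reducer " + getPartition(key, reducers));
        }
    }
}
```

Because the partition depends only on the key, every pair with the same key reaches the same Reducer, no matter which Mapper emitted it.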

Q. What does a Driver do?

A. A Driver is a program for initializing and configuring MapReduce jobs. Within the Driver, you specify the job-specific configuration and also specify all the MapReduce components (input and output formats, Mapper and Reducer, use of a Combiner, use of a custom partitioner, and so on). The Driver instructs the MapReduce framework to execute your code on a set of specified input files and also controls where the output files will be placed. The Driver can also get the status of the MapReduce job execution.

Q. What is the use of the Context object?

A. The Context object allows the Mapper, Reducer, and Driver to interact with the rest of the Hadoop system, even though they are executed in different processes across different nodes of the cluster. You can use the Context object to report progress, set application-level status messages, update Counters, indicate whether the Mapper and Reducer are alive, and more.

Quiz

1. What is the difference between the Combiner and the Reducer?

2. How are resources from a TaskTracker considered for use?

3. How does the JobTracker identify a TaskTracker for task execution?

4. Explain the concept of data node and TaskTracker co-location.

5. How can you pass configuration information when executing your MapReduce jobs without actually compiling every time?

Answers

1. Both the Combiner and the Reducer normally perform the same operation; the difference lies in the dataset they work on and where they run. The Combiner runs right after the Mapper function and works on the dataset generated by that specific Mapper function. The Combiner is optional, but it is added to the flow to optimize performance by reducing the amount of data that needs to move across nodes in the cluster. The Reducer, on the other hand, runs on the dataset (after the shuffle and sort) that comes from all the Mapper functions and produces the final output.

2. Each TaskTracker in the cluster has a set of computational slots (Map slots as well as Reduce slots). The available slots indicate the number of tasks that the specific TaskTracker can accept for processing simultaneously.

The configuration properties mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum both have a default value of 2. This means that two Mappers (two Map slots) and two Reducers (two Reduce slots) can run in parallel on a TaskTracker at any given time.

3. When assigning a task to a TaskTracker for processing, the JobTracker first tries to locate a TaskTracker with an available free slot on the same server that has the data node containing the data (to ensure data locality). If it doesn’t find this TaskTracker, it looks for a TaskTracker on another node in the same rack before it goes across the racks to locate a TaskTracker.

4. Typically, both the TaskTracker and data node daemons run on the same node. This way, the JobTracker can try to process the data on the same node where the data exists (to ensure data locality). Typically (by default configuration), three replicas of a data block are stored on three different nodes (or data nodes). When the JobTracker needs this data block to serve a request, it considers the TaskTrackers on those nodes in order from closest to farthest. If it finds an available slot on any of these nodes, it processes the request there; otherwise, it looks for a TaskTracker with an available slot on another node in the same rack before it goes across racks (which is very rare). In the latter cases, data movement might occur across nodes. Although assigning a task to a TaskTracker on another rack is infrequent, it might be required when no local slot is available where the data exists, so that the submitted request is not blocked.

5. The Tool interface supports the handling of generic command-line options: You can pass your configuration information from the CLI when you actually execute your MapReduce job. The hard-coded configuration in the Driver program is overridden by these passed-in configuration values while executing the program.
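The preference order described in answers 3 and 4 (same node first, then same rack, then another rack) can be sketched as a small selection routine. This is a simplified plain-Java model under stated assumptions: the Tracker class, the distance values 0/1/2, and the pick method are all hypothetical names, and the real JobTracker assigns work in response to TaskTracker heartbeats rather than by scanning a list like this.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model of locality-aware task placement (hypothetical names throughout).
class LocalitySketch {

    // Hypothetical summary of a TaskTracker: where it runs and how many slots are free.
    static final class Tracker {
        final String host;
        final String rack;
        final int freeSlots;

        Tracker(String host, String rack, int freeSlots) {
            this.host = host;
            this.rack = rack;
            this.freeSlots = freeSlots;
        }
    }

    // 0 = tracker is on a node holding a replica, 1 = same rack as a replica, 2 = another rack.
    static int distance(Tracker t, List<String> replicaHosts, List<String> replicaRacks) {
        if (replicaHosts.contains(t.host)) {
            return 0;
        }
        if (replicaRacks.contains(t.rack)) {
            return 1;
        }
        return 2;
    }

    // Pick the closest tracker that has a free slot; returns null if none has one.
    static Tracker pick(List<Tracker> trackers, List<String> replicaHosts, List<String> replicaRacks) {
        Tracker best = null;
        int bestDistance = Integer.MAX_VALUE;
        for (Tracker t : trackers) {
            if (t.freeSlots <= 0) {
                continue; // no capacity on this tracker
            }
            int d = distance(t, replicaHosts, replicaRacks);
            if (d < bestDistance) {
                best = t;
                bestDistance = d;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Tracker> trackers = Arrays.asList(
                new Tracker("nodeA", "rack1", 0),  // holds the data but is full
                new Tracker("nodeB", "rack1", 1),  // same rack, one free slot
                new Tracker("nodeC", "rack2", 2)); // other rack, two free slots
        Tracker chosen = pick(trackers, Arrays.asList("nodeA"), Arrays.asList("rack1"));
        System.out.println("Task goes to " + chosen.host);
    }
}
```

With the sample data in main, the node that holds the block has no free slot, so the sketch falls back to the tracker in the same rack rather than crossing racks.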

Hour 5. MapReduce—Advanced Concepts and YARN

What You’ll Learn in This Hour:

DistributedCache

Hadoop Streaming

MapReduce Joins

Bloom Filter

Performance Improvement

Handling Failures

Counter

YARN

Uber-Tasking Optimization

Failures in YARN

Resource Manager High Availability and Automatic Failover in YARN

In Hour 4, “The MapReduce Job Framework and Job Execution Pipeline,” you learned about the MapReduce programming paradigm, its architecture, the different components of a MapReduce job, and MapReduce job execution flow. This hour primarily focuses on advanced concepts related to MapReduce (including MapReduce streaming, MapReduce joins, distributed caches, different types of failures, and performance optimization for MapReduce jobs).

In this hour, you also explore YARN, which brings a major architectural change to the Hadoop platform and opens a new window for scalability, performance, and multitenancy. You dive deep into the YARN architecture, its different components, and its job execution pipeline, and you look at how YARN transparently handles different types of failures.

DistributedCache

The DistributedCache feature of the MapReduce framework enables you to share static data globally among all nodes in the cluster. Unlike reading from HDFS, in which data from a single file typically is read by a single Mapper, the distributed cache comes in handy when you want to share some data (for example, global lookup data, or JAR files [or archives] that contain executable code) with all the Mappers of the job, or when all nodes in the cluster might need access to initialization data or code libraries.

Note

If you have lookup data that you want to reference during processing on all the nodes, you can use the DistributedCache feature to send it to all the nodes. For example, you might be processing a sales transaction and need to reference customer information (lookup) during processing.

Distributed cache files can be either private or public. Private distributed cache files are cached in a local directory private to the user whose jobs need these files (shared by all tasks and jobs of the specific user only). Public distributed cache files are cached in a global directory; file access is set up so that they are publicly visible to all users and are shared by tasks and jobs of all users on all the machines of the cluster.

To use distributed cache functionality, the data location should be set before the MapReduce job starts. To do this, use the DistributedCache class while setting up your job: call the DistributedCache.addCacheFile() method to add files that should be sent to all nodes in the cluster (or to the machines where the actual execution is to start) and made available for local use.

Likewise, to access these distributed cache files on a machine where a Mapper is running, call the DistributedCache.getLocalCacheFiles() method to get the list of paths, local to the current node, for the copies of the cached files. Then you can use regular Java file I/O mechanisms, such as java.io.FileInputStream, to access and read the file at each returned path.
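The last step, reading a cached file with regular Java file I/O, can be sketched as follows. This is a minimal example under stated assumptions: the class LookupLoaderSketch, the loadLookup method, and the "id,name" file layout are hypothetical, and a temporary file stands in for a path that a real job would obtain from DistributedCache.getLocalCacheFiles(). No Hadoop classes are used, so the sketch runs on its own.

```java
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Loads a small lookup file from a node-local path into memory (hypothetical names).
class LookupLoaderSketch {

    // Reads "id,name" lines into a map; lines without a comma are skipped.
    static Map<String, String> loadLookup(String localPath) throws IOException {
        Map<String, String> lookup = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(localPath), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split(",", 2);
                if (parts.length == 2) {
                    lookup.put(parts[0].trim(), parts[1].trim());
                }
            }
        }
        return lookup;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a cached file: in a real job the path would come from the distributed cache.
        Path customers = Files.createTempFile("customers", ".csv");
        Files.write(customers, Arrays.asList("C001,Alice", "C002,Bob"), StandardCharsets.UTF_8);

        Map<String, String> lookup = loadLookup(customers.toString());
        System.out.println("Customer C002 is " + lookup.get("C002"));

        Files.delete(customers);
    }
}
```

In a Mapper, you would typically load the lookup once during task initialization and then consult the in-memory map for each sales record, instead of rereading the file for every input pair.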