
Organizing and optimizing data in HDFS


■Summary

There are plenty of things to like about MultipleOutputs: its support for both “old” and “new” MapReduce APIs and its support for multiple output format classes. But using MultipleOutputs does carry with it some constraints that you should be aware of:

■ Be cautious when using MultipleOutputs in a mapper: remember that you’ll end up with NumberOfMappers * NumberOfPartitions output files, which in my experience can bring down clusters with large numbers of both values!

■ Each partition incurs the overhead of an HDFS file handle for the duration of the task.

■ You can often end up with a large number of small files that accumulate across multiple uses of your partitioner. You’ll probably want to make sure that you have a compaction strategy in place to mitigate this problem (see section 4.1.4 for more details).

■ Although Avro comes with the AvroMultipleOutputs class, it’s quite slow due to some inefficiencies in the code.
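To make the first constraint concrete, here is a back-of-the-envelope sketch of the file-count arithmetic. It is not Hadoop code: the class name, method name, and task counts are hypothetical, and it assumes the worst case in which every map task writes at least one record to every partition.

```java
// Back-of-the-envelope output file count; all names and numbers are illustrative.
class OutputFileCount {

    // MultipleOutputs in a mapper: each map task may create one file per partition.
    static long multipleOutputsInMapper(long mappers, long partitions) {
        return Math.multiplyExact(mappers, partitions);
    }

    public static void main(String[] args) {
        long mappers = 1_000;   // assumed number of map tasks
        long partitions = 365;  // assumed number of partitions, e.g. one per day
        System.out.println(multipleOutputsInMapper(mappers, partitions)); // prints 365000
    }
}
```

Even modest values for both numbers produce hundreds of thousands of files, each of which also holds an open HDFS file handle while its task runs.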

In addition to the MultipleOutputs approach, Hadoop also comes with a MultipleOutputFormat class that has features similar to MultipleOutputs. Its primary pitfalls are that it only supports the old MapReduce API and only one output format can be used for all the partitions.

Another partitioning strategy that you can employ is to use the MapReduce partitioner, which can help mitigate the large number of files that may be produced using MultipleOutputs.

TECHNIQUE 27 Using a custom MapReduce partitioner

Another partitioning approach is to use the partitioning facilities built into MapReduce. By default, MapReduce uses a hash partitioner that calculates the hash of each map output key and performs a modulo over the number of reducers to determine which reducer the record should be sent to. You can control how partitioning occurs by writing your own custom partitioner and then routing records according to your partitioning scheme.

This technique has an added benefit over the previous technique in that you’ll generally end up with fewer output files because each reducer will only create a single output file, as opposed to MultipleOutputs, where each map or reduce task will generate N output files, one for each partition.
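The default hash partitioning described above can be sketched in plain Java as follows. The mask-and-modulo expression is the one Hadoop’s HashPartitioner uses; the class name and the use of String keys in place of a Hadoop key type are assumptions made so the sketch runs without Hadoop on the classpath.

```java
// Plain-Java sketch of hash partitioning; String stands in for a Hadoop key type.
class HashPartitionSketch {

    // Mask off the sign bit so the result is never negative, then take the modulo.
    static int partitionFor(Object key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    public static void main(String[] args) {
        int numReducers = 10;
        for (String date : new String[] {"2000-01-03", "2001-01-02", "2002-01-02"}) {
            System.out.println(date + " -> reducer " + partitionFor(date, numReducers));
        }
    }
}
```

Nothing in this scheme stops two different dates from hashing to the same reducer, which is why a custom partitioner is needed when each partition must land in its own output file.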

■Problem

You want to partition your input data.

■Solution

Write a custom partitioner that partitions records to the appropriate reducer.

■Discussion

Let’s look at the custom partitioner first. It exposes a helper method to the MapReduce driver that allows you to define a mapping from a date to a partition, and it writes this mapping to the job configuration. Then, when MapReduce loads the partitioner, it calls the setConf method; in this partitioner you’ll read the mappings into a map, which is subsequently used when partitioning.3

    public static class DatePartitioner extends Partitioner<Text, Text>
        implements Configurable {

      public static final String CONF_PARTITIONS = "partition.map";
      public static final String PARTITION_DELIM = ":";

      private Configuration conf;
      private java.util.Map<Text, Integer> datePartitions = Maps.newHashMap();

      // Driver-side helper: appends one "date:partition" entry to the
      // comma-separated list stored in the job configuration.
      public static void addPartitionToConfig(
          Configuration conf, String date, int partition) {
        String addition = String.format("%s%s%d",
            date, PARTITION_DELIM, partition);
        String existing = conf.get(CONF_PARTITIONS);
        conf.set(CONF_PARTITIONS,
            existing == null ? addition : existing + "," + addition);
      }

      // Called when MapReduce loads the partitioner: reads the
      // date-to-reducer entries from the configuration into a map.
      @Override
      public void setConf(Configuration conf) {
        this.conf = conf;
        for (String partition : conf.getStrings(CONF_PARTITIONS)) {
          String[] parts = partition.split(PARTITION_DELIM);
          datePartitions.put(new Text(parts[0]),
              Integer.valueOf(parts[1]));
        }
      }

      // Looks up the reducer for each map output key. A date that was never
      // registered yields null here and fails with a NullPointerException
      // when the result is unboxed.
      @Override
      public int getPartition(Text date, Text stock, int numPartitions) {
        return datePartitions.get(date);
      }

      @Override
      public Configuration getConf() {
        return conf;
      }
    }
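The configuration round trip is easy to try without a cluster. The following sketch mirrors the encoding used by addPartitionToConfig and the parsing done in setConf, with a plain String standing in for the job configuration entry. The class and method names are hypothetical, and the guard for a missing value is an addition that is not part of the listing above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hadoop-free sketch of the "date:partition" encoding; a String stands in for
// the job configuration entry. Names are illustrative only.
class PartitionConfigRoundTrip {
    static final String DELIM = ":";

    // Mirrors addPartitionToConfig: append one entry to a comma-separated list.
    static String add(String existing, String date, int partition) {
        String addition = String.format("%s%s%d", date, DELIM, partition);
        return existing == null ? addition : existing + "," + addition;
    }

    // Mirrors setConf: split the list and rebuild the date-to-partition map.
    static Map<String, Integer> parse(String encoded) {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        if (encoded == null || encoded.isEmpty()) {
            return partitions; // nothing configured (guard added for this sketch)
        }
        for (String entry : encoded.split(",")) {
            String[] parts = entry.split(DELIM);
            partitions.put(parts[0], Integer.valueOf(parts[1]));
        }
        return partitions;
    }

    public static void main(String[] args) {
        String encoded = add(add(null, "2000-01-03", 0), "2001-01-02", 1);
        System.out.println(encoded);        // prints 2000-01-03:0,2001-01-02:1
        System.out.println(parse(encoded)); // prints {2000-01-03=0, 2001-01-02=1}
    }
}
```

Note that this encoding only works while partition keys contain neither the delimiter nor a comma, which holds for dates in this format.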

Your driver code needs to set up the custom partitioner configuration. The partitions in this example are dates, and you want to make sure that each reducer will correspond to a unique date. The stocks example data has 10 unique dates, so you configure your job with 10 reducers. You also call the partition helper function that was defined previously to set up the configuration that maps each unique date to a unique reducer.4

3 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/CustomPartitionerJob.java.

4 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/CustomPartitionerJob.java.

Annotations for the DatePartitioner listing:

■ addPartitionToConfig is a helper function for the driver to add date-to-reducer partitions.

■ setConf loads the date-to-reducer details from configuration into a map.

■ getPartition pulls, for each map output, the reducer (partition) that the record should be sent to.


    Configuration conf = super.getConf();

    // The 10 dates that correspond to the input data.
    List<String> dates = Lists.newArrayList(
        "2000-01-03", "2001-01-02", "2002-01-02", "2003-01-02", "2004-01-02",
        "2005-01-03", "2006-01-03", "2007-01-03", "2008-01-02", "2009-01-02");

    // Associate each date with its own reducer (partition).
    for (int partition = 0; partition < dates.size(); partition++) {
      DatePartitioner.addPartitionToConfig(conf,
          dates.get(partition), partition);
    }

    Job job = new Job(conf);

    // Use the custom partitioner for this job.
    job.setPartitionerClass(DatePartitioner.class);
    ...

The mapper does little other than extract the stock date from the input data and emit it as the output key:5

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
      private Text date = new Text();

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        StockPriceWritable stock =
            StockPriceWritable.fromLine(value.toString());
        // Emit the date as the output key and the stock line as the value.
        date.set(stock.getDate());
        context.write(date, value);
      }
    }

The command to run the preceding example is as follows:

$ hip hip.ch4.CustomPartitionerJob --input stocks.txt --output output

This job will generate 10 output files, each containing the stocks for that day.
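As a rough guide to which output file holds which day, the sketch below pairs each date with the file its reducer would write. It assumes the job keeps the default part-r-NNNNN naming that the new-API file output formats use for reducer output, and that the dates are registered in the same order as in the driver. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Maps each date's partition index to the reducer output file name,
// assuming default "part-r-NNNNN" naming. Names are illustrative only.
class OutputFileNames {

    static String reducerOutputFile(int partition) {
        return String.format("part-r-%05d", partition);
    }

    public static void main(String[] args) {
        List<String> dates = Arrays.asList("2000-01-03", "2001-01-02", "2002-01-02");
        for (int partition = 0; partition < dates.size(); partition++) {
            // First line printed: 2000-01-03 -> part-r-00000
            System.out.println(dates.get(partition) + " -> " + reducerOutputFile(partition));
        }
    }
}
```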

■Summary

Using the MapReduce framework to naturally partition your data gives you a couple of advantages:

■ Data in your partitions will be sorted because the shuffle will ensure that all data streamed to a reducer will be sorted. This allows you to use optimized join strategies on your data.

■ You can deduplicate data in the reducer, again as a benefit of the shuffle phase.

5 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/CustomPartitionerJob.java.

Annotations for the driver and mapper listings:

■ The dates list defines the 10 dates that correspond to the input data.

■ The loop indicates, for each date, the reducer (partition) that the date is associated with.

■ setPartitionerClass specifies the custom partitioner that will be used for the job.

■ The mapper emits the date as the output key and the stock as the output value.

The main problem to look out for with this technique is data skew. You want to make sure that you can spread the load across reducers as much as possible, which may be a challenge if there’s a natural skew in your data. For example, if your partitions are days, then it’s possible that the majority of your records will be for a single day, and you may have only a few records for either a previous or following day. In this case, you’ll ideally want to partition records in a way that allocates the majority of the reducers to a single day, and then maybe one or two for the previous or following days. You can also sample your inputs and dynamically determine the optimal number of reducers based on your sample data.
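One way to turn sampled counts into a reducer allocation is sketched below. This is an illustrative greedy scheme rather than anything Hadoop provides: every day gets one reducer, and each spare reducer goes to the day with the most sampled records per reducer. The class name, method name, dates, and counts are all hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Greedy, skew-aware allocation of reducers to days based on sampled counts.
class SkewAwareAllocation {

    static Map<String, Integer> allocate(Map<String, Long> sampledCounts, int totalReducers) {
        if (sampledCounts.isEmpty() || totalReducers < sampledCounts.size()) {
            throw new IllegalArgumentException("need at least one day and one reducer per day");
        }
        Map<String, Integer> reducers = new LinkedHashMap<>();
        for (String day : sampledCounts.keySet()) {
            reducers.put(day, 1); // every day starts with a single reducer
        }
        for (int spare = totalReducers - sampledCounts.size(); spare > 0; spare--) {
            String busiest = null;
            double worstLoad = -1;
            for (Map.Entry<String, Long> e : sampledCounts.entrySet()) {
                // Sampled records each of this day's reducers would handle.
                double load = (double) e.getValue() / reducers.get(e.getKey());
                if (load > worstLoad) {
                    worstLoad = load;
                    busiest = e.getKey();
                }
            }
            reducers.merge(busiest, 1, Integer::sum);
        }
        return reducers;
    }

    public static void main(String[] args) {
        Map<String, Long> sample = new LinkedHashMap<>();
        sample.put("2009-01-01", 50L);   // hypothetical sampled record counts
        sample.put("2009-01-02", 900L);
        sample.put("2009-01-03", 50L);
        // prints {2009-01-01=1, 2009-01-02=8, 2009-01-03=1}
        System.out.println(allocate(sample, 10));
    }
}
```

A partitioner built on such an allocation would still need a rule for spreading one day’s records across its assigned reducers, for example by hashing a secondary field, and the day’s output would then span several files.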

Once you’ve produced your partitioned output, the next challenge is how to deal with the potentially large number of small files that have resulted from the partitioning.

4.1.4 Compacting

Sometimes having small files in HDFS can’t be avoided: maybe you’re using a partitioning technique similar to those described previously, or maybe your data organically lands in HDFS in small file sizes. Either way, you’ll be exposing some weaknesses in HDFS and MapReduce, including the following:

■ Hadoop’s NameNode keeps all the HDFS metadata in memory for fast metadata operations. Yahoo! estimated that each file, on average, occupies 600 bytes of space in memory,6 which translates to a metadata overhead of 60 GB for 100 million files, all of which needs to be stored in the NameNode’s memory. That’s a lot of memory for a single process, even with today’s mid-tier server RAM capacities.

■ If your input to a MapReduce job is a large number of files, the number of mappers that will run (assuming your files are text or splittable) would be equivalent to the number of blocks that these files occupy. If you run a MapReduce job whose input is thousands or millions of files, your job will spend more time at the kernel layer dealing with creating and destroying your map task processes than it will on its work.

■ Finally, if you’re running in a controlled environment where there’s a scheduler, you may have a cap on the number of tasks your MapReduce job can use. Because each file (by default) results in at least one map task, this could cause your job to be rejected by the scheduler.
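The NameNode memory figure in the first point is simple arithmetic on the 600-bytes-per-file estimate, as the following sketch shows. The class and method names are hypothetical, gigabytes are decimal (10^9 bytes), and real heap usage also depends on block counts and JVM overhead, so treat the results as rough.

```java
// Rough NameNode metadata estimate from the 600-bytes-per-file average.
class NameNodeMemoryEstimate {
    static final long BYTES_PER_FILE = 600; // average per-file estimate quoted in the text

    static double heapGigabytes(long fileCount) {
        return fileCount * (double) BYTES_PER_FILE / 1_000_000_000.0; // decimal GB
    }

    public static void main(String[] args) {
        System.out.println(heapGigabytes(100_000_000L));   // prints 60.0
        System.out.println(heapGigabytes(1_000_000_000L)); // prints 600.0
    }
}
```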

If you’re thinking you won’t have this problem, think again. What percentage of your files are smaller than the HDFS block size?7 And how much smaller are they—50%, 70%, 90%? What if your big data project takes off and suddenly you need to be able to scale to handle

6 According to Yahoo! statistics, each block or file inode uses less than 200 bytes of memory, and on average each file occupies 1.5 blocks with a 3x replication factor. See Yahoo!’s page titled “Scalability of the Hadoop Distributed File System,” http://developer.yahoo.com/blogs/hadoop/posts/2010/05/scalability_of_the_hadoop_dist/ and a JIRA ticket titled “Name-node memory size estimates and optimization proposal,” https://issues.apache.org/jira/browse/HADOOP-1687.
