Organizing and optimizing data in HDFS
TECHNIQUE 29 Using Avro to store multiple small binary files
Similarly, filecrush uses the block size to determine the output file sizes. By default, it won’t create output files that occupy more than eight blocks, but this can be custom- ized with the --max-file-blocks argument.
■Summary
Filecrush is a simple and quick way to combine small files together. It supports any type of input or output files as long as there are associated input format and output format classes. Unfortunately, it doesn’t work with Hadoop 2, and there hasn’t been much activity in the project over the last few years, so these points may rule out this utility for your environment.
The example presented in this technique works well in situations where the direc- tory being crushed is an external Hive table, or if you’re running it against a directory in a standard location where other users in a cluster expect your data to exist.
Currently, the filecrush project doesn’t work with Hadoop 2. If you’re looking for a solution for Hadoop 2, take a look at another HDFS compactor that I’m currently working on at https://github.com/alexholmes/hdfscompact.
Because filecrush requires input and output formats, one use case where it falls short is if you’re working with binary data and you need a way to combine small binary files together.
TECHNIQUE 29 Using Avro to store multiple small binary files
Let’s say that you’re working on a project akin to Google Images, where you crawl the web and download image files from websites. Your project is internet-scale, so you’re downloading millions of files and storing them individually in HDFS. You already know that HDFS doesn’t work well with a large number of small files, but you’re dealing with binary data, so the previous technique doesn’t fit your needs.
This technique shows how you can use Avro as a container file format for binary data in HDFS.
■Problem
You want to store a large number of binary files in HDFS, and to do so without hitting the NameNode memory limits.
■Solution
The easiest way to work with small binary files in HDFS is to package them into a larger containing file. For this technique, you’ll read all of the files in a directory stored on local disk and save them in a single Avro file in HDFS. You’ll also see how to use the Avro file in MapReduce to process the contents of the original files.
■Discussion
Figure 4.2 shows the first part of this technique, where you create the Avro file in HDFS. In doing so, you create fewer files in HDFS, which means less data to be stored in NameNode memory, which also means you can store more stuff.
Avro is a data serialization and RPC library invented by Doug Cutting, the creator of Hadoop. Avro has strong schema-evolution capabilities that give it an advantage
over competitors such as SequenceFile. Avro and its competitors were covered exten- sively in chapter 3.
Take a look at the Java code in the following listing, which will create the Avro file.9
public class SmallFilesWrite {
public static final String FIELD_FILENAME = "filename"; public static final String FIELD_CONTENTS = "contents"; private static final String SCHEMA_JSON =
"{\"type\": \"record\", \"name\": \"SmallFilesTest\", " + "\"fields\": ["
+ "{\"name\":\" + FIELD_FILENAME + "\", \"type\":\"string\"}," + "{\"name\":\" + FIELD_CONTENTS + "\", \"type\":\"bytes\"}]}";
public static final Schema SCHEMA = Schema.parse(SCHEMA_JSON); public static void writeToAvro(File srcPath,
OutputStream outputStream) throws IOException { DataFileWriter<Object> writer = new DataFileWriter<Object>( new GenericDatumWriter<Object>()) .setSyncInterval(100);
Listing 4.1 Read a directory containing small files and produce a single Avro file in HDFS
9 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/ SmallFilesWrite.java. Individual files. Read small files. Write into a single Avro file.
Avro files consist of a number of Avro
records. In this example, each Avro record represents a
single file.
The multiplexer
is responsible for reading the small files from the local filesystem and packaging
them in an Avro file in HDFS. Local filesystem Multiplexer filename:string contents:byte[] b a c Avro file Avro record HDFS b a c
Each Avro record consists of the original
filename and also the file contents.
Figure 4.2 Storing small files in Avro allows you to store more.
Avro uses JSON to define the data structure schema, which in this example is defined in the SCHEMA_JSON variable. You define two items per record: the filename you’re storing, and the raw contents of the file.
Create an Avro writer.
153
TECHNIQUE 29 Using Avro to store multiple small binary files
writer.setCodec(CodecFactory.snappyCodec()); writer.create(SCHEMA, outputStream); for (Object obj :
FileUtils.listFiles(srcPath, null, false)) { File file = (File) obj;
String filename = file.getAbsolutePath();
byte content[] = FileUtils.readFileToByteArray(file); GenericRecord record = new GenericData.Record(SCHEMA); record.put(FIELD_FILENAME, filename); record.put(FIELD_CONTENTS, ByteBuffer.wrap(content)); writer.append(record); System.out.println( file.getAbsolutePath() + ": " + DigestUtils.md5Hex(content)); } IOUtils.cleanup(null, writer); IOUtils.cleanup(null, outputStream); }
public static void main(String... args) throws Exception { Configuration config = new Configuration();
FileSystem hdfs = FileSystem.get(config); File sourceDir = new File(args[0]); Path destFile = new Path(args[1]); OutputStream os = hdfs.create(destFile); writeToAvro(sourceDir, os);
} }
Compression dependency To run the code in this chapter, you’ll need to have both the Snappy and LZOP compression codecs installed on your host. Please refer to the appendix for details on how to install and configure them.
Let’s see what happens when you run this script against Hadoop’s config directory (replace $HADOOP_CONF_DIR with the directory containing your Hadoop configuration files):
$ hip hip.ch4.SmallFilesWrite $HADOOP_CONF_DIR test.avro /etc/hadoop/conf/ssl-server.xml.example: cb6f1b218... /etc/hadoop/conf/log4j.properties: 6920ca49b9790cb... /etc/hadoop/conf/fair-scheduler.xml: b3e5f2bbb1d6c... ...
Looks promising—let’s make sure that the output file is in HDFS:
$ hadoop fs -ls test.avro
2011-08-20 12:38 /user/aholmes/test.avro
Compress Avro content using the Snappy codec.
Associate the schema and output stream with the writer.
For each file in the input directory, create a new Avro record specifying your schema. Then write the filename and contents to the record using the names you defined in the schema.
GenericRecord is Avro’s generic wrapper around a single record. Set the filename field for the record.
Set the raw file bytes in the record.
Write the record to the writer (and its associated stream, which in this case will write into HDFS).
As you’re writing the file contents, you’ll also produce an MD5 hash so that later you can visually compare that your writing and reading are correct.
To be sure everything’s working as expected, you can also write some code that will read the Avro file from HDFS and output the MD5 hash for each file’s content:10
public class SmallFilesRead {
private static final String FIELD_FILENAME = "filename"; private static final String FIELD_CONTENTS = "contents";
public static void readFromAvro(InputStream is) throws IOException { DataFileStream<Object> reader =
new DataFileStream<Object>(
is, new GenericDatumReader<Object>()); for (Object o : reader) {
GenericRecord r = (GenericRecord) o; System.out.println( r.get(FIELD_FILENAME) + ": " + DigestUtils.md5Hex( ((ByteBuffer) r.get(FIELD_CONTENTS)).array())); } IOUtils.cleanup(null, is); IOUtils.cleanup(null, reader); }
public static void main(String... args) throws Exception { Configuration config = new Configuration();
FileSystem hdfs = FileSystem.get(config); Path destFile = new Path(args[0]); InputStream is = hdfs.open(destFile); readFromAvro(is);
} }
This code is simpler than the write. Because Avro writes the schema into every Avro file, you don’t need to give Avro any information about the schema as part of deserial- ization. Give the code a spin:
$ hip hip.ch4.SmallFilesRead test.avro
/etc/hadoop/conf/ssl-server.xml.example: cb6f1b21... /etc/hadoop/conf/fair-scheduler.xml: b3e5f2bbb1d6... ...
At this point you have Avro files in HDFS. Even though this chapter is about HDFS, the next thing you’ll likely want to do is process the files that you wrote in MapReduce. Let’s look at how to do that, writing a map-only MapReduce job that can read the Avro records as input and write out a text file containing the filenames and MD5 hashes of the file contents, as shown in figure 4.3.
10GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/ SmallFilesRead.java.
Create an Avro reader object by supplying the InputStream of the file in HDFS. Note that you don’t need to supply schema information because Avro encodes that in the Avro file.
Loop through every record in the Avro file.
Cast each record to a GenericRecord instance. Retrieve the filename and content from the record.
The hashes are the same as those you generated with the write, so things are looking good.
155
TECHNIQUE 29 Using Avro to store multiple small binary files
The next listing shows the code for this MapReduce job.11
public class SmallFilesMapReduce {
public static void main(String... args) throws Exception { JobConf job = new JobConf();
job.setJarByClass(SmallFilesMapReduce.class); Path input = new Path(args[0]);
Path output = new Path(args[1]);
output.getFileSystem(job).delete(output, true); AvroJob.setInputSchema(job, SmallFilesWrite.SCHEMA); job.setInputFormat(AvroInputFormat.class); job.setOutputFormat(TextOutputFormat.class); job.setMapperClass(Map.class); FileInputFormat.setInputPaths(job, input); FileOutputFormat.setOutputPath(job, output); job.setNumReduceTasks(0); JobClient.runJob(job); }
public static class Mapper implements
Mapper<AvroWrapper<GenericRecord>, NullWritable,
Listing 4.2 A MapReduce job that takes as input Avro files containing the small files
11GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch4/ SmallFilesMapReduce.java.
This map-only job reads each Avro record
from HDFS…
… and produces output that contains the original filename and a MD5 hash
of the contents of the file.
Filename. MD5 hash. Mapper 1 Avro records. Avro file in HDFS a 8e9591a43c… b c00f7cd895… c 93987fleb3… Mapper 2 … ……… … ……… Input split 1 Text file in HDFS Input split 2 b a c e d f …
Figure 4.3 Map job to read Avro files and write out a text file
Avro has a convenience method to help set the appropriate job configuration settings for Avro input files.
Set the Avro-specific input format for your job.
The Avro file uses the basic building-block GenericRecord objects, so you define this type as your input type for the mapper.
Text, Text> { @Override
public void map(AvroWrapper<GenericRecord> key, NullWritable value,
OutputCollector<Text, Text> output, Reporter reporter) throws IOException { outKey.set( key.datum().get( SmallFilesWrite.FIELD_FILENAME).toString()); outValue.set(DigestUtils.md5Hex( ((ByteBuffer) key.datum().get(SmallFilesWrite.FIELD_CONTENTS)) .array())); output.collect(outKey, outValue); } } }
If you run this MapReduce job over the Avro file you created earlier, the job log files will contain your filenames and hashes:
$ hip hip.ch4.SmallFilesMapReduce \ --input test.avro \
--output output
$ hadoop fs -cat output/part*
/etc/hadoop/conf/capacity-scheduler.xml: 0601a2... /etc/hadoop/conf/taskcontroller.cfg: 5c2c191420... /etc/hadoop/conf/configuration.xsl: e4e5e17b4a8... ...
In this technique, it was assumed that you were working with a file format (such as image files) that couldn’t have separate files concatenated together. If your files can be concatenated, you should consider that option. If you go this route, try your best to make sure that the file size is at least as large as the HDFS block size to minimize the data stored in NameNode.
■ Summary
You could have used Hadoop’s SequenceFile as a mechanism to hold your small files. SequenceFile is a more mature technology, having been around longer than Avro files. But SequenceFiles are Java-specific, and they don’t provide the rich interopera- bility and versioning semantics you get with Avro.
Google’s Protocol Buffers, as well as Apache Thrift (which originated from Face- book), can also be used to store small files. But neither has a input format that works with native Thrift or Protocol Buffers files.
Another approach you could use is to write the files into a zip file. The downsides to this approach are first that you’d have to write a custom input format12 to process the zip file, and second that zip files aren’t splittable (as opposed to Avro files and
12There has been a ticket open since 2008 asking for a zip input format implementation; see https:// issues.apache.org/jira/browse/MAPREDUCE-210.
Extract your data from the GenericRecord using the simple get methods.
Compare the hashes here with the output of the SmallFilesRead utility executed earlier in this technique, and you’ll see they are the same, which verifies that the files are identical.
157
TECHNIQUE 29 Using Avro to store multiple small binary files
SequenceFiles). This could be mitigated by generating multiple zip files and attempt- ing to make them close to the HDFS block size.
Hadoop also has a CombineFileInputFormat that can feed multiple input splits (across multiple files) into a single map task, which greatly decreases the number of map tasks needed to run.
You also could have created a tarball file containing all the files, and then pro- duced a separate text file that contained the locations of the tarball file in HDFS. This text file would be supplied as the input to the MapReduce job, and the mapper would open the tarball directly. But that approach would circumvent the locality in Map- Reduce, because the mappers would be scheduled to execute on the node that con- tained the text file, and would therefore likely need to read the tarball blocks from remote HDFS nodes, incurring unnecessary network I/O.
Hadoop Archive files (HARs) are Hadoop files specifically created to solve the problem of small files. They are a virtual filesystem that sits on top of HDFS. The disad- vantages of HAR files are that they can’t be optimized for local disk access in Map- Reduce, and they can’t be compressed.
Hadoop version 2 supports HDFS Federation, where HDFS is partitioned into multi- ple distinct namespaces, with each independently managed by a separate NameNode. This, in effect, means that the overall impact of keeping block information in memory can be spread across multiple NameNodes, thereby supporting a much larger number of small files. Hortonworks has a good blog post that contains more details about HDFS Federation (“An Introduction to HDFS Federation” [August 23, 2011], http:// hortonworks.com/an-introduction-to-hdfs-federation/).
Finally, MapR, which provides a Hadoop distribution, has its own distributed file- system that supports large numbers of small files. Using MapR for your distributed storage is a big change to your system, so it’s unlikely you’ll move to MapR to mitigate this problem with HDFS.
You may encounter times when you’ll want to work with small files in Hadoop, and using them directly would result in bloated NameNode memory use and MapReduce jobs that run slowly. This technique helps you mitigate these issues by packaging small files into larger container files. I picked Avro for this technique because of its support for splittable files and compression and its expressive schema language, which will help with versioning.
What if you have the opposite problem, where your files are big and you want to be more efficient about how you store your data? Our coverage of compression in Hadoop (section 4.2) will come to your rescue in these situations. But before we get to that section, let’s continue with our look at data organization and discover some tips on how to move data atomically in HDFS.
4.1.5 Atomic data movement
Activities such as partitioning and compacting tend to follow a similar pattern—they produce output files in a staging directory, and then need to atomically move them to
their final destination once all the output files have been successfully staged. This may bring up some questions:
■ What trigger do you use to determine that you’re ready to perform the atomic
move?
■ How do you move data atomically in HDFS?
■ What impact does your data movement have on any readers of the final data? It may be tempting to perform the atomic move as a postprocessing step within your MapReduce driver, but what will happen if the client process dies before the Map- Reduce application completes? This is where using the OutputCommitter in Hadoop is useful, because you can perform any atomic file movement as part of your job, as opposed to using the driver. An example of the OutputCommitter is shown in sec- tion 3.5.2.
The next question is how you can move data atomically in HDFS. For the longest time, it was thought that the rename method on the DistributedFileSystem class (which is the concrete implementation supporting HDFS) was atomic. But it turns out that there are situations where this isn’t an atomic operation. This was remedied in HADOOP-6240, but for backward compatibility reasons, the rename method wasn’t updated. As a result, the rename method is still not truly atomic; instead, you need to use a new API. As you can see, the code is cumbersome and it only works with newer versions of Hadoop:
DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(new Configuration()); fs.rename(src, dest, Options.Rename.NONE);
One thing that’s missing from HDFS is the ability to atomically swap directories. This would be useful in situations such as compacting, where you need to replace the entire contents of a directory that is being used by other processes such as Hive. There’s an open JIRA ticket titled “Atomic Directory Swapping Operation” (https:// issues.apache.org/jira/browse/HDFS-5902) that will hopefully provide this ability in the future.
It’s important that you factor the points discussed here into the design of your sys- tem. And if you’re using a third-party utility or library, try to determine whether it’s atomically moving data.
This concludes our look at data organization techniques. Let’s switch to another important data management topic in Hadoop, that of data compression.
4.2
Efficient storage with compression
Data compression is a mechanism that reduces data to a more compact form to save on storage space and to make it more efficient to transfer the data. Compression is an important aspect of dealing with files, and it becomes all the more important when deal- ing with the data sizes that Hadoop supports. Your goal with Hadoop is to be as efficient
159