Data serialization— working with text
3.3 Big data serialization formats
Unstructured text works well when you're working with scalar or tabular data. Semistructured text formats such as XML and JSON can model more sophisticated data structures that include composite fields or hierarchical data. But when you're working with big data volumes, you'll need serialization formats with compact serialized forms that natively support partitioning and have schema evolution features.
In this section we'll compare the serialization formats that work best with big data, and then look at how you can use them with MapReduce.
3.3.1 Comparing SequenceFile, Protocol Buffers, Thrift, and Avro
In my experience, the following characteristics are important when selecting a data serialization format:
■ Code generation—Some serialization formats are accompanied by libraries with code-generation abilities that allow you to generate rich objects, making it easier for you to interact with your data. The generated code also provides the added benefit of type-safety to make sure that your consumers and producers are working with the right data types.
■ Schema evolution—Data models evolve over time, and it's important that your data formats support your need to modify your data models. Schema evolution allows you to add, modify, and in some cases delete attributes, while at the same time providing backward and forward compatibility for readers and writers.
■ Language support—It’s likely that you’ll need to access your data in more than one programming language, and it’s important that the mainstream languages have support for a data format.
■ Transparent compression—Data compression is important given the volumes of data you'll work with, and a desirable data format has the ability to internally compress and decompress data on writes and reads. It's a much bigger headache for you as a programmer if the data format doesn't support compression, because it means that you'll have to manage compression and decompression as part of your data pipeline (as is the case when you're working with text-based file formats).
■ Splittability—Newer data formats understand the importance of supporting multiple parallel readers that are reading and processing different chunks of a large file. It's crucial that file formats contain synchronization markers (and thereby support the ability for a reader to perform a random seek and scan to the start of the next record).
■ Support in MapReduce and the Hadoop ecosystem—A data format that you select must have support in MapReduce and other critical Hadoop ecosystem projects, such as Hive. Without this support, you'll be responsible for writing the code to make a file format work with these systems.
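The schema evolution criterion is easier to see in a concrete case. The following minimal sketch is hand-rolled and is not how any of the formats in this section encode schemas; the VersionedStockRecord class and its version-byte scheme are hypothetical. A version 2 writer adds a volume field. The reader accepts both versions and supplies a default for data written before the field existed, which is backward compatibility.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical record format used only to illustrate schema evolution:
// each record starts with a schema version byte.
final class VersionedStockRecord {
    static final byte V1 = 1; // fields: symbol, close
    static final byte V2 = 2; // fields: symbol, close, volume (added later)

    final String symbol;
    final double close;
    final int volume; // 0 when the record was written with schema V1

    VersionedStockRecord(String symbol, double close, int volume) {
        this.symbol = symbol;
        this.close = close;
        this.volume = volume;
    }

    // An "old" producer that only knows schema V1.
    static byte[] writeV1(String symbol, double close) {
        return serialize(out -> {
            out.writeByte(V1);
            out.writeUTF(symbol);
            out.writeDouble(close);
        });
    }

    // A "new" producer that writes schema V2.
    byte[] writeV2() {
        return serialize(out -> {
            out.writeByte(V2);
            out.writeUTF(symbol);
            out.writeDouble(close);
            out.writeInt(volume);
        });
    }

    // A new consumer that reads both versions (backward compatibility).
    static VersionedStockRecord read(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            byte version = in.readByte();
            String symbol = in.readUTF();
            double close = in.readDouble();
            int volume = (version >= V2) ? in.readInt() : 0; // default for old data
            return new VersionedStockRecord(symbol, close, volume);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private interface Body {
        void writeTo(DataOutputStream out) throws IOException;
    }

    private static byte[] serialize(Body body) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            body.writeTo(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Formats with schema evolution support do this bookkeeping for you, so you don't need hand-written version checks like these.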
Table 3.1 compares the more popular data serialization frameworks to see how they stack up against each other. The following discussion looks at each of these formats in more detail.
SequenceFile
The SequenceFile format was created to work with MapReduce, Pig, and Hive, and therefore integrates well with all of those tools. Its shortcomings are mainly its lack of code generation and versioning support, as well as limited language support.
Protocol Buffers
The Protocol Buffers format has been used heavily by Google for interoperability. Its strengths are its versioning support and compact binary format. Downsides include its lack of support in MapReduce (or in any third-party software) for reading files generated by Protocol Buffers serialization. Not all is lost, however; we'll look at how Elephant Bird uses Protocol Buffers serialization within a higher-level container file in section 3.3.3.

Table 3.1 Feature comparison of data serialization frameworks

Library | Code generation | Schema evolution | Language support | Transparent compression | Splittable | Native support in MapReduce | Pig and Hive support
SequenceFile | No | No | Java, Python | Yes | Yes | Yes | Yes
Protocol Buffers | Yes (optional) | Yes | C++, Java, Python, Perl, Ruby | No | No | No | No
Thrift | Yes (mandatory) | Yes | C, C++, Java, Python, Ruby, Perl | No (a) | No | No | No
Avro | Yes (optional) | Yes | C, C++, Java, Python, Ruby, C# | Yes | Yes | Yes | Yes
Parquet | No | Yes | Java, Python (C++ planned in 2.0) | Yes | Yes | Yes | Yes

a Thrift does support compression, but not in the Java library.
Thrift
Thrift was developed at Facebook as a data-serialization and RPC framework. It doesn't have support in MapReduce for its native data-serialization format, but it can support different wire-level data representations, including JSON and various binary encodings. Thrift also includes an RPC layer with various types of servers, including a nonblocking implementation. We'll ignore the RPC capabilities for this chapter and focus on the data serialization.
Avro
The Avro format is Doug Cutting’s creation to help address the shortcomings of SequenceFile.
Parquet
Parquet is a columnar file format with rich Hadoop system support, and it works well with data models such as Avro, Protocol Buffers, and Thrift. Parquet is covered in depth in section 3.4.
Based on these evaluation criteria, Avro seems to be the best fit as a data serialization framework in Hadoop. SequenceFile is a close second due to its inherent compatibility with Hadoop (it was designed for use with Hadoop).
You can review a useful jvm-serializers project at https://github.com/eishay/jvm-serializers/wiki/, which runs various benchmarks to compare file formats based on items such as serialization and deserialization times. It contains benchmarks for Avro, Protocol Buffers, and Thrift, along with a number of other frameworks.
After looking at how the various data-serialization frameworks compare, we'll dedicate the next few sections to working with them. We'll start off with a look at SequenceFile.
3.3.2 SequenceFile
Because SequenceFile was created for use with MapReduce, this format arguably offers the highest level of integration support in conjunction with MapReduce, Pig, and Hive. SequenceFile is a splittable binary file format that stores data in the form of key/value pairs. All SequenceFiles share the same header format, as shown in figure 3.8.
SequenceFiles come in three types, which vary based on how you apply compression. In addition, each type has its own corresponding Writer class.
Uncompressed
Uncompressed SequenceFiles are written using the SequenceFile.Writer class. There's generally no advantage to this format over the compressed ones, because compression reduces your storage footprint and is usually more efficient for reads and writes. The file format is shown in figure 3.9.
Figure 3.8 SequenceFile header format (fields: version, key class name, value class name, is compressed?, is block compressed?, compression codec, metadata, sync)
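The following sketch illustrates the header in figure 3.8 by checking its first field. It relies on the fact that SequenceFiles begin with the three bytes SEQ followed by a one-byte version number. The SequenceFileSniffer class is hypothetical. It stops at that point and doesn't try to decode the class names, compression flags, metadata, or sync marker.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Sketch: inspects only the first header field from figure 3.8. SequenceFiles
// begin with the bytes 'S', 'E', 'Q' followed by a one-byte format version.
final class SequenceFileSniffer {

    /** Returns the version byte, or -1 if the stream doesn't start like a SequenceFile. */
    static int headerVersion(InputStream stream) {
        // The caller owns (and closes) the stream; this reads only four bytes.
        DataInputStream in = new DataInputStream(stream);
        byte[] magic = new byte[4];
        try {
            in.readFully(magic);
        } catch (EOFException e) {
            return -1; // fewer than four bytes: can't be a SequenceFile header
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        boolean isSeq = magic[0] == 'S' && magic[1] == 'E' && magic[2] == 'Q';
        return isSeq ? magic[3] : -1;
    }
}
```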
Record-compressed
Record-compressed SequenceFiles are written using the SequenceFile.RecordCompressWriter class. When a record is added to the SequenceFile, it's immediately compressed and written to the file. The disadvantage of this approach is that your compression ratio will suffer compared to block compression. This file format, which is essentially the same as that of uncompressed SequenceFiles, is shown in figure 3.9.
Block-compressed
Block-compressed SequenceFiles are written using the SequenceFile.BlockCompressWriter class. Records are buffered until they reach the compression block size, which defaults to 1,000,000 bytes and can be overridden with the io.seqfile.compress.blocksize property. At that point the whole block is compressed and written. The advantage of this approach is that it's more aggressive: the whole block is compressed rather than each individual record, which results in good overall compression. The file format is shown in figure 3.10.
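The following sketch shows why block compression achieves a better ratio. It uses the JDK's java.util.zip.Deflater (DEFLATE is also the algorithm behind Hadoop's DefaultCodec) to compare compressing each record separately against compressing a whole buffer of records at once. It illustrates the trade-off and is not Hadoop's implementation. The CompressionComparison class is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.Deflater;

// Illustration, not Hadoop code: record-level vs. block-level compression.
final class CompressionComparison {

    // Number of bytes DEFLATE (with zlib framing) produces for the input.
    static int deflatedSize(byte[] input) {
        Deflater deflater = new Deflater();
        try {
            deflater.setInput(input);
            deflater.finish();
            byte[] buffer = new byte[1024];
            int total = 0;
            while (!deflater.finished()) {
                total += deflater.deflate(buffer);
            }
            return total;
        } finally {
            deflater.end(); // release native zlib memory
        }
    }

    // Each record compressed on its own, as in record compression.
    static int recordCompressedSize(List<String> records) {
        int total = 0;
        for (String record : records) {
            total += deflatedSize(record.getBytes(StandardCharsets.UTF_8));
        }
        return total;
    }

    // All records buffered and compressed together, as in block compression.
    static int blockCompressedSize(List<String> records) {
        ByteArrayOutputStream block = new ByteArrayOutputStream();
        for (String record : records) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            block.write(bytes, 0, bytes.length);
        }
        return deflatedSize(block.toByteArray());
    }
}
```

With repetitive data such as stock records, the block-compressed total is typically a small fraction of the per-record total. Each individually compressed record pays its own stream overhead and can't exploit redundancy that it shares with neighboring records.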
You only need one Reader class (SequenceFile.Reader) to read all three types of SequenceFiles. Even the Writer is abstracted, because you can call SequenceFile.createWriter to choose the preferred format, and it returns a base class that can be used for writing regardless of compression.
Figure 3.9 File format for record-compressed and uncompressed SequenceFiles (layout: header, record 1, record 2, sync, record 3, ...; each record holds the record length, key length, key, and value; both formats share this layout, and the only difference is that record-compressed files compress the value)
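The record layout in figure 3.9 can be sketched in a few lines of Java. This is a simplified model. Keys and values are treated as already-serialized byte arrays, the file header and sync markers are omitted, and the RecordLayout class is hypothetical. In Hadoop's implementation, the record length counts the key and value bytes together, and this sketch follows that convention.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Simplified per-record layout: record length, key length, key, value.
final class RecordLayout {

    static byte[] encode(byte[] key, byte[] value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(key.length + value.length); // record length (key + value)
            out.writeInt(key.length);                // key length
            out.write(key);
            out.write(value); // a record-compressed file would store compressed bytes here
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Returns {key, value} decoded from one encoded record. */
    static byte[][] decode(byte[] record) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(record))) {
            int recordLength = in.readInt();
            int keyLength = in.readInt();
            byte[] key = new byte[keyLength];
            byte[] value = new byte[recordLength - keyLength];
            in.readFully(key);
            in.readFully(value);
            return new byte[][] {key, value};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```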
Figure 3.10 Block-compressed file format (layout: header, block 1, sync, block 2, sync, block 3, ...; each block holds the number of records N, followed by the key lengths, keys, value lengths, and values, each preceded by its own length; each of these four fields contains N entries and is compressed)
SequenceFiles have a pluggable serialization framework. Written keys and values must have a related org.apache.hadoop.io.serializer.Serializer and Deserializer for marshaling and unmarshaling. Hadoop comes with four serializers: Avro, Java, Tether (for binary data contained within a TetherData class), and Writable (the default serializer).9
Custom SequenceFile serialization
If you want your SequenceFile to contain objects that aren't Writable or Serializable, you'll need to implement your own Serializer and register it. You register it by updating core-site.xml and appending the class name of the custom serialization implementation to the io.serializations property.
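The lookup that io.serializations drives can be modeled in a few lines. In Hadoop, SerializationFactory walks the configured serializations in order and uses the first one whose accept method returns true for the class. The sketch below follows that first-match rule. Its names (SerializerRegistry, SerializationPlugin) are hypothetical and are not Hadoop's API.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of pluggable serialization lookup (not Hadoop's classes).
final class SerializerRegistry {

    // Stand-in for a serialization implementation named in io.serializations.
    interface SerializationPlugin {
        String name();
        boolean accept(Class<?> type);
    }

    private final List<SerializationPlugin> plugins; // order is significant

    SerializerRegistry(List<SerializationPlugin> plugins) {
        this.plugins = new ArrayList<>(plugins);
    }

    /** First-match lookup: returns the first plugin that accepts the type, or null. */
    SerializationPlugin find(Class<?> type) {
        for (SerializationPlugin plugin : plugins) {
            if (plugin.accept(type)) {
                return plugin;
            }
        }
        return null;
    }

    /** Example plugin that, like Java serialization, accepts any Serializable class. */
    static SerializationPlugin javaSerialization() {
        return new SerializationPlugin() {
            @Override public String name() { return "java"; }
            @Override public boolean accept(Class<?> type) {
                return Serializable.class.isAssignableFrom(type);
            }
        };
    }
}
```

Because lookup is first-match, order matters. A serialization appended to the end of the list handles only classes that none of the earlier entries accept.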
SequenceFiles are splittable because a synchronization marker is written approximately every 6 KiB (1 kibibyte = 1024 bytes) in the file for record-based files, and before every block for block-based files.
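The following sketch shows why periodic sync markers make a file splittable. It's a simplified model rather than SequenceFile's exact byte layout. It loosely follows SequenceFile's convention of writing a -1 escape followed by a 16-byte sync value in place of a record length. SyncSplitDemo and its methods are hypothetical. Each reader scans forward from its split's start offset to the first marker and stops when it reaches a marker at or beyond the split's end, so every record is read by exactly one split.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Simplified model of sync-marker splitting. Records are length-prefixed; before
// every `syncEvery` records the writer emits an escape length of -1 followed by
// a 16-byte random marker.
final class SyncSplitDemo {
    static final int ESCAPE = -1;
    static final int MARKER_LENGTH = 16;

    private final byte[] marker = new byte[MARKER_LENGTH];

    SyncSplitDemo(long seed) {
        new Random(seed).nextBytes(marker); // random bytes are unlikely to occur in the data
    }

    byte[] write(List<String> records, int syncEvery) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < records.size(); i++) {
                if (i % syncEvery == 0) {
                    out.writeInt(ESCAPE);
                    out.write(marker);
                }
                byte[] data = records.get(i).getBytes(StandardCharsets.UTF_8);
                out.writeInt(data.length);
                out.write(data);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the records owned by the split [start, end). */
    List<String> readSplit(byte[] file, int start, int end) {
        List<String> records = new ArrayList<>();
        int markerPos = indexOf(file, marker, start); // scan forward to a sync point
        if (markerPos < 0 || markerPos >= end) {
            return records; // no sync point in this split; the previous reader covers it
        }
        ByteBuffer buffer = ByteBuffer.wrap(file); // big-endian, like DataOutputStream
        int pos = markerPos + MARKER_LENGTH;
        while (pos < file.length) {
            int length = buffer.getInt(pos);
            if (length == ESCAPE) {
                int nextMarker = pos + 4;
                if (nextMarker >= end) {
                    break; // this sync point belongs to the next split
                }
                pos = nextMarker + MARKER_LENGTH;
                continue;
            }
            records.add(new String(file, pos + 4, length, StandardCharsets.UTF_8));
            pos += 4 + length;
        }
        return records;
    }

    private static int indexOf(byte[] data, byte[] pattern, int from) {
        outer:
        for (int i = Math.max(from, 0); i <= data.length - pattern.length; i++) {
            for (int j = 0; j < pattern.length; j++) {
                if (data[i + j] != pattern[j]) {
                    continue outer;
                }
            }
            return i;
        }
        return -1;
    }
}
```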
Now let's look at how to use SequenceFiles in MapReduce.
TECHNIQUE 10 Working with SequenceFiles
Working with text in MapReduce can start to get tricky when you have to support complex types of data, which may include nonscalar data types such as lists or dictionaries. In addition, large compressed text files require some additional wrangling if MapReduce's data locality properties are important to you. These challenges can be overcome by using a file format such as SequenceFile.
■ Problem
You want to work with a structured file format in MapReduce that you can use to model complex data structures and that also supports compression and splittable inputs.
■ Solution
This technique looks at how the SequenceFile file format can be used from both standalone applications and MapReduce.
■ Discussion
The SequenceFile format offers a high level of integration with computational tools such as MapReduce and can also model complex data structures. We’ll examine how to read and write SequenceFiles, and also how to use them with MapReduce, Pig, and Hive.
We'll work with the stock data for this technique. The most common serialization method used with SequenceFiles is Writable, so you'll need to create a Writable to represent the stock data. The key elements of writing a complex Writable are implementing the Writable interface and defining serialization and deserialization methods, as shown in the following listing.10
9 Writable is an interface in Hadoop used to support general-purpose data serialization, and it's used for sending data across the wire between Hadoop components. Yahoo has a good introduction to Writables at http://developer.yahoo.com/hadoop/tutorial/module5.html#writable.
10 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/StockPriceWritable.java.
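Listing 3.3 A Writable implementation to represent a stock price

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import au.com.bytecode.opencsv.CSVParser;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

public class StockPriceWritable
    implements WritableComparable<StockPriceWritable>, Cloneable {
  String symbol;
  String date;
  double open;
  double high;
  double low;
  double close;
  int volume;
  double adjClose;

  // Hadoop instantiates Writables reflectively, so a no-arg constructor is required
  public StockPriceWritable() {
  }

  public StockPriceWritable(String symbol, String date, double open, double high,
                            double low, double close, int volume, double adjClose) {
    this.symbol = symbol;
    this.date = date;
    this.open = open;
    this.high = high;
    this.low = low;
    this.close = close;
    this.volume = volume;
    this.adjClose = adjClose;
  }

  public String getSymbol() {
    return symbol;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    WritableUtils.writeString(out, symbol);
    WritableUtils.writeString(out, date);
    out.writeDouble(open);
    out.writeDouble(high);
    out.writeDouble(low);
    out.writeDouble(close);
    out.writeInt(volume);
    out.writeDouble(adjClose);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    symbol = WritableUtils.readString(in);
    date = WritableUtils.readString(in);
    open = in.readDouble();
    high = in.readDouble();
    low = in.readDouble();
    close = in.readDouble();
    volume = in.readInt();
    adjClose = in.readDouble();
  }

  // WritableComparable requires an ordering; symbol then date is assumed here
  @Override
  public int compareTo(StockPriceWritable other) {
    int cmp = symbol.compareTo(other.symbol);
    return cmp != 0 ? cmp : date.compareTo(other.date);
  }

  public static StockPriceWritable fromLine(String line) throws IOException {
    CSVParser parser = new CSVParser();
    String[] parts = parser.parseLine(line);
    return new StockPriceWritable(
        parts[0], parts[1],
        Double.valueOf(parts[2]), Double.valueOf(parts[3]),
        Double.valueOf(parts[4]), Double.valueOf(parts[5]),
        Integer.valueOf(parts[6]), Double.valueOf(parts[7]));
  }
}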
Write out the fields of this Writable in byte form to the output stream.
Read the stock fields in binary form into the Writable fields. Note that this method reads fields in the same order as they were written in the write method.
This helper method builds a StockPriceWritable from a CSV line. It uses the open source OpenCSV project to parse the CSV.
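The following JDK-only sketch shows the write/readFields contract in isolation. MiniStockPrice is hypothetical. It keeps three of the stock fields and uses DataOutput.writeUTF in place of Hadoop's WritableUtils.writeString, so it doesn't produce the same bytes as listing 3.3. It still demonstrates the same rule: readFields must read fields in exactly the order that write emitted them, because the serialized form carries no field names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// JDK-only illustration of the Writable-style write/readFields pattern.
final class MiniStockPrice {
    String symbol;
    String date;
    double close;

    void write(DataOutput out) throws IOException {
        out.writeUTF(symbol);
        out.writeUTF(date);
        out.writeDouble(close);
    }

    void readFields(DataInput in) throws IOException {
        // Must mirror the order used in write(); there are no field tags.
        symbol = in.readUTF();
        date = in.readUTF();
        close = in.readDouble();
    }

    static MiniStockPrice roundTrip(MiniStockPrice original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            original.write(out);
            out.flush();
            // An empty instance is populated by readFields, which is why
            // Writables need a no-arg constructor.
            MiniStockPrice copy = new MiniStockPrice();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```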
Now that you have your Writable, you'll need to write some code that will create a SequenceFile. You'll read the stocks file from the local disk, create a StockPriceWritable for each line, and write it to your SequenceFile, using the stock symbol as your key:11
// conf (Configuration), outputPath (Path), and inputFile (File) are
// defined by the surrounding program (see the GitHub source)
SequenceFile.Writer writer = SequenceFile.createWriter(conf,
    SequenceFile.Writer.file(outputPath),
    SequenceFile.Writer.keyClass(Text.class),
    SequenceFile.Writer.valueClass(StockPriceWritable.class),
    SequenceFile.Writer.compression(
        SequenceFile.CompressionType.BLOCK,
        new DefaultCodec())
);
try {
  Text key = new Text();
  for (String line : FileUtils.readLines(inputFile)) {
    StockPriceWritable stock = StockPriceWritable.fromLine(line);
    System.out.println("Stock = " + stock);
    key.set(stock.getSymbol());
    writer.append(key, stock);
  }
} finally {
  writer.close();
}
Great! Now how do you go about reading the files created with your writer?12
SequenceFile.Reader reader = new SequenceFile.Reader(conf,
    SequenceFile.Reader.file(inputFile));
try {
  System.out.println("Is block compressed = " + reader.isBlockCompressed());
  Text key = new Text();
  StockPriceWritable value = new StockPriceWritable();
  while (reader.next(key, value)) {
    System.out.println(key + "," + value);
  }
} finally {
  reader.close();
}
11 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/seqfile/writable/SequenceFileStockWriter.java.
12 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/seqfile/writable/SequenceFileStockReader.java.
Create a new SequenceFile writer, specifying that you want block-level compression. Also set the types for the keys and values you'll be writing (in this case Text and StockPriceWritable). Any Hadoop compression codec can be used with SequenceFiles; see chapter 4 for more details on compression.
Read all the lines in the input file and convert each one into a key/value pair, using the stock symbol as the key.
Create the StockPriceWritable instance, using the fromLine helper method in the StockPriceWritable class. Append a record to the SequenceFile.
Create a reader that can read records from the SequenceFile. Note that you don’t need to specify that you used block-level compression in the file or what key/value types are contained in the file.
The next method on the reader returns true until it hits the end of the file. Each call also populates the key and value objects you pass in with the contents of the next record.