Data serialization— working with text
3.2 Processing common serialization formats
XML and JSON are industry-standard data interchange formats. Their ubiquity in the technology industry is evidenced by their heavy adoption in data storage and exchange. In this section we’ll look at how you can read and write these data formats in MapReduce.
Figure 3.6 The annotated RecordWriter class overview. RecordWriter<K,V> is an abstract class whose type parameters define the reduce output keys and values. write(K key, V value) writes a logical key/value record to the destination data sink, and close(TaskAttemptContext context) cleans up any resources related to the destination data sink.
3.2.1 XML
XML has existed since 1998 as a mechanism to represent data that’s readable by machine and human alike. It became a universal language for data exchange between systems and is employed by many standards today, such as SOAP and RSS, and it’s used as an open data format for products such as Microsoft Office.
TECHNIQUE 8 MapReduce and XML
MapReduce comes bundled with an InputFormat that works with text, but it doesn’t come with one that supports XML. Working on a single XML file in parallel in MapReduce is tricky because XML doesn’t contain a synchronization marker in its data format.2
■Problem
You want to work with large XML files in MapReduce and be able to split and process them in parallel.
■Solution
Mahout’s XmlInputFormat can be used to work with XML files in HDFS with MapReduce. It reads records that are delimited by specific XML begin and end tags. This technique also explains how to emit XML as MapReduce output.
■Discussion
MapReduce doesn’t contain built-in support for XML, so we’ll turn to another Apache project—Mahout, a machine learning system—to provide an XML InputFormat. To showcase the XML InputFormat, you can write a MapReduce job that uses Mahout’s XML input format to read property names and values from Hadoop’s configuration files. The first step is to set up the job configuration:
// Define the string form of the XML start tag. Your job is taking Hadoop config
// files as input, where each configuration entry uses the property tag.
conf.set("xmlinput.start", "<property>");
// Define the string form of the XML end tag.
conf.set("xmlinput.end", "</property>");
// Set the Mahout XML input format class.
job.setInputFormatClass(XmlInputFormat.class);
2 A synchronization marker is typically some binary data used to demarcate record boundaries. It allows a reader to perform a random seek into a file and determine where the next record starts by reading until a synchronization marker is found.
Mahout’s XML input format is rudimentary; you need to tell it the exact start and end XML tags that will be searched for in the file, and files are split (and records extracted) using the following approach:
1 Files are split into discrete sections along HDFS block boundaries for data locality.
2 Each map task operates on a specific input split. The map task seeks to the start of the input split, and then continues to process the file until it hits the first xmlinput.start.
3 The content between xmlinput.start and xmlinput.end is repeatedly emitted until the end of the input split is reached.
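The splitting approach in the numbered list above can be sketched without any Hadoop dependencies. The following is only an illustration of the idea, not Mahout's implementation: the class XmlSplitScanSketch and the method recordsInSplit are hypothetical names, the file is assumed to fit in a byte array, and a split is modeled as a simple byte range.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Hadoop-free sketch of tag-delimited record extraction for one input split. */
class XmlSplitScanSketch {

    /** Returns every record whose start tag begins inside [splitStart, splitEnd). */
    static List<String> recordsInSplit(byte[] file, int splitStart, int splitEnd,
                                       String startTag, String endTag) {
        byte[] start = startTag.getBytes(StandardCharsets.UTF_8);
        byte[] end = endTag.getBytes(StandardCharsets.UTF_8);
        List<String> records = new ArrayList<>();
        int pos = splitStart;
        while (true) {
            // Step 2: scan forward for the next start tag that begins inside this split.
            int recordStart = indexOf(file, start, pos);
            if (recordStart < 0 || recordStart >= splitEnd) {
                break;
            }
            // Step 3: read through the end tag, even if it lies beyond the split end.
            int endTagStart = indexOf(file, end, recordStart + start.length);
            if (endTagStart < 0) {
                break; // unterminated record at the end of the file
            }
            int recordEnd = endTagStart + end.length;
            records.add(new String(file, recordStart, recordEnd - recordStart,
                    StandardCharsets.UTF_8));
            pos = recordEnd;
        }
        return records;
    }

    /** Naive byte-pattern search; returns -1 when the pattern isn't found. */
    private static int indexOf(byte[] data, byte[] pattern, int from) {
        for (int i = Math.max(from, 0); i <= data.length - pattern.length; i++) {
            int j = 0;
            while (j < pattern.length && data[i + j] == pattern[j]) {
                j++;
            }
            if (j == pattern.length) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        byte[] file = ("<configuration><property><name>a</name></property>"
                + "<property><name>b</name></property></configuration>")
                .getBytes(StandardCharsets.UTF_8);
        int boundary = 30; // pretend a block boundary falls inside the first record
        System.out.println(recordsInSplit(file, 0, boundary, "<property>", "</property>"));
        System.out.println(recordsInSplit(file, boundary, file.length, "<property>", "</property>"));
    }
}
```

Because a record belongs to the split in which its start tag begins, a record that straddles a boundary is read in full by the earlier split and skipped by the later one, so each record is processed once.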
Next you need to write a mapper to consume Mahout’s XML input format. The XML element in Text form has been supplied, so you’ll need to use an XML parser to extract content from the XML.3
Listing 3.1 Extracting content with Java’s StAX parser
// Assumes static imports of XMLStreamConstants.START_ELEMENT and CHARACTERS,
// and a logger field named log.
public static class Map extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String document = value.toString();
    System.out.println("'" + document + "'");
    try {
      XMLStreamReader reader =
          XMLInputFactory.newInstance().createXMLStreamReader(
              new ByteArrayInputStream(document.getBytes()));
      String propertyName = "";
      String propertyValue = "";
      String currentElement = "";
      while (reader.hasNext()) {
        int code = reader.next();
        switch (code) {
          case START_ELEMENT:
            currentElement = reader.getLocalName();
            break;
          case CHARACTERS:
            if (currentElement.equalsIgnoreCase("name")) {
              propertyName += reader.getText();
            } else if (currentElement.equalsIgnoreCase("value")) {
              propertyValue += reader.getText();
            }
            break;
        }
      }
      reader.close();
      // Wrap the strings in Text to match the mapper's declared output types.
      context.write(new Text(propertyName.trim()), new Text(propertyValue.trim()));
    } catch (Exception e) {
      log.error("Error processing '" + document + "'", e);
    }
  }
}
3 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/xml/XMLMapReduceReader.java.
The map is given a Text instance, which contains a String representation of the data between the start and end tags. In this code, you use Java’s built-in Streaming API for XML (StAX) parser to extract the key and value for each property and output them.
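To try the StAX extraction logic outside of a MapReduce job, a minimal standalone sketch follows. It uses only the JDK's javax.xml.stream API; the class PropertyStaxSketch and the method parseProperty are hypothetical names. Unlike the mapper, this sketch clears the current element name on END_ELEMENT, so whitespace between elements is never appended to the name or value.

```java
import java.io.StringReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

/** Hypothetical standalone version of the mapper's StAX parsing logic. */
class PropertyStaxSketch {

    /** Returns {name, value} extracted from a single property element. */
    static String[] parseProperty(String propertyXml) {
        StringBuilder name = new StringBuilder();
        StringBuilder value = new StringBuilder();
        String currentElement = "";
        try {
            XMLStreamReader reader = XMLInputFactory.newInstance()
                    .createXMLStreamReader(new StringReader(propertyXml));
            while (reader.hasNext()) {
                switch (reader.next()) {
                    case XMLStreamConstants.START_ELEMENT:
                        currentElement = reader.getLocalName();
                        break;
                    case XMLStreamConstants.END_ELEMENT:
                        // Text that follows a closing tag belongs to no element.
                        currentElement = "";
                        break;
                    case XMLStreamConstants.CHARACTERS:
                        if (currentElement.equalsIgnoreCase("name")) {
                            name.append(reader.getText());
                        } else if (currentElement.equalsIgnoreCase("value")) {
                            value.append(reader.getText());
                        }
                        break;
                    default:
                        break;
                }
            }
            reader.close();
        } catch (XMLStreamException e) {
            throw new IllegalArgumentException("Could not parse: " + propertyXml, e);
        }
        return new String[] {name.toString().trim(), value.toString().trim()};
    }

    public static void main(String[] args) {
        String[] nameValue = parseProperty(
                "<property>\n  <name>hadoop.tmp.dir</name>\n"
                        + "  <value>/usr/local/hadoop/tmp</value>\n</property>");
        System.out.println(nameValue[0] + "\t" + nameValue[1]);
    }
}
```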
If you run the MapReduce job against Cloudera’s core-site.xml and use the HDFS cat command to show the output, you’ll see the following:
$ hadoop fs -put $HADOOP_HOME/conf/core-site.xml core-site.xml
$ hip hip.ch3.xml.XMLMapReduceReader \
    --input core-site.xml \
    --output output
$ hadoop fs -cat output/part*
fs.default.name hdfs://localhost:8020
hadoop.tmp.dir /usr/local/hadoop/tmp
...
This output shows that you’ve successfully worked with XML as an input serialization format with MapReduce. Not only that, you can support huge XML files because the input format supports splitting XML.
Writing XML
Having successfully read XML, the next question is how to write XML. In your reducer, you have callbacks that occur before and after your main reduce method is called, which you can use to emit a start and end tag, as shown in the following example.4
Listing 3.2 A reducer to emit start and end tags
public static class Reduce
    extends Reducer<Text, Text, Text, Text> {

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    // Use the setup method to write the root element start tag.
    context.write(new Text("<configuration>"), null);
  }

  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    // Use the cleanup method to write the root element end tag.
    context.write(new Text("</configuration>"), null);
  }

  private Text outputKey = new Text();

  public void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for (Text value : values) {
      // Construct a child XML element for each key/value combination
      // provided in the reducer.
      outputKey.set(constructPropertyXml(key, value));
      // Emit the XML element.
      context.write(outputKey, null);
    }
  }

  public static String constructPropertyXml(Text name, Text value) {
    return String.format(
        "<property><name>%s</name><value>%s</value></property>",
        name, value);
  }
}
4 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/xml/XmlMapReduceWriter.java.
This could also be embedded in an OutputFormat, but I’ll leave that as a project for you to experiment with. Writing an OutputFormat class is covered in section 3.5.1.
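One detail to watch when emitting XML by string formatting, as constructPropertyXml does, is that characters such as & and < in a name or value aren't escaped, which can produce malformed output. As a hedged alternative, the sketch below builds the same element with the JDK's XMLStreamWriter, which escapes character data. The class name PropertyXmlWriterSketch is hypothetical, and the method takes String arguments rather than Text so that it runs without Hadoop on the classpath.

```java
import java.io.StringWriter;
import javax.xml.stream.XMLOutputFactory;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;

/** Hypothetical variant of constructPropertyXml that escapes character data. */
class PropertyXmlWriterSketch {

    static String constructPropertyXml(String name, String value) {
        StringWriter out = new StringWriter();
        try {
            XMLStreamWriter writer = XMLOutputFactory.newInstance().createXMLStreamWriter(out);
            writer.writeStartElement("property");
            writer.writeStartElement("name");
            writer.writeCharacters(name);   // escapes & and < in the text
            writer.writeEndElement();
            writer.writeStartElement("value");
            writer.writeCharacters(value);
            writer.writeEndElement();
            writer.writeEndElement();
            writer.flush();
            writer.close();
        } catch (XMLStreamException e) {
            throw new IllegalStateException("Could not write property " + name, e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(constructPropertyXml("fs.default.name", "hdfs://localhost:8020"));
        System.out.println(constructPropertyXml("example.expr", "a < b && c"));
    }
}
```

In the reducer, the result could be passed to outputKey.set(...) in the same way as the original helper's return value.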
Pig
If you want to work with XML in Pig, the Piggy Bank library5 (a user-contributed library of useful Pig code) contains an XMLLoader. It works much like this technique and captures all of the content between a start and end tag, supplying it as a single byte array field in a Pig tuple.
Hive
Currently, no means exist for working with XML in Hive. You’d have to write a custom SerDe, which we’ll cover in chapter 9. 6
■ Summary
Mahout’s XmlInputFormat certainly helps you work with XML. But it’s sensitive to an exact string match of both the start and end element names. If the element tag can contain attributes with variable values, or if the generation of the element can’t be controlled and could result in XML namespace qualifiers being used, then this approach may not work for you. Also problematic will be situations where the element name you specify is used as a descendant child element.
If you have control over the XML laid out in the input, this exercise can be simplified by having a single XML element per line. This will let you use the built-in MapReduce text-based input formats (such as TextInputFormat), which treat each line as a record and split to preserve that demarcation.
Another option worth considering is that of a preprocessing step, where you could convert the original XML into a separate line per XML element, or convert it into an altogether different data format, such as a SequenceFile or Avro, both of which solve the splitting problem for you.
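A preprocessing step of this kind could look like the following minimal sketch, which streams through an XML document with StAX and produces one line per record element. The names XmlOnePerLineSketch and toLines are hypothetical, and the sketch makes simplifying assumptions that hold for Hadoop-style configuration files but not for XML in general: record elements carry no attributes or namespaces, and line breaks inside text can be collapsed to a single space.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;

/** Hypothetical preprocessing sketch: rewrite each record element onto a single line. */
class XmlOnePerLineSketch {

    static List<String> toLines(String xml, String recordElement) {
        List<String> lines = new ArrayList<>();
        try {
            XMLInputFactory factory = XMLInputFactory.newInstance();
            // Deliver adjacent text (including resolved entities) as one CHARACTERS event.
            factory.setProperty(XMLInputFactory.IS_COALESCING, Boolean.TRUE);
            XMLStreamReader reader = factory.createXMLStreamReader(new StringReader(xml));
            StringBuilder current = null; // non-null while inside a record element
            int depth = 0;
            while (reader.hasNext()) {
                int event = reader.next();
                if (event == XMLStreamConstants.START_ELEMENT) {
                    if (current == null && reader.getLocalName().equals(recordElement)) {
                        current = new StringBuilder();
                        depth = 0;
                    }
                    if (current != null) {
                        // Assumption: attributes are absent, so only the name is re-emitted.
                        current.append('<').append(reader.getLocalName()).append('>');
                        depth++;
                    }
                } else if (event == XMLStreamConstants.CHARACTERS) {
                    // Drop indentation between elements; collapse line breaks inside text.
                    if (current != null && !reader.isWhiteSpace()) {
                        current.append(escape(
                                reader.getText().replaceAll("\\s*[\\r\\n]+\\s*", " ")));
                    }
                } else if (event == XMLStreamConstants.END_ELEMENT && current != null) {
                    current.append("</").append(reader.getLocalName()).append('>');
                    if (--depth == 0) {
                        lines.add(current.toString());
                        current = null;
                    }
                }
            }
            reader.close();
        } catch (XMLStreamException e) {
            throw new IllegalArgumentException("Malformed XML input", e);
        }
        return lines;
    }

    /** Re-escapes the characters that the parser resolved. */
    private static String escape(String text) {
        return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    public static void main(String[] args) {
        String xml = "<configuration>\n  <property>\n    <name>a</name>\n"
                + "    <value>1</value>\n  </property>\n</configuration>";
        toLines(xml, "property").forEach(System.out::println);
    }
}
```

For large inputs you would write each line to the output as it's completed rather than collecting the lines in memory.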
Now that you have a handle on how to work with XML, let’s tackle another popular serialization format, JSON.
3.2.2 JSON
JSON shares the machine- and human-readable traits of XML and has existed since the early 2000s. It’s less verbose than XML, and it doesn’t have the rich typing and validation features available in XML.
5 Piggy Bank (user-defined Pig functions): https://cwiki.apache.org/confluence/display/PIG/PiggyBank.
6 SerDe is a shortened form of Serializer/Deserializer; it’s the mechanism that allows Hive to read and write data in HDFS.
TECHNIQUE 9 MapReduce and JSON
Imagine you have some code that’s downloading JSON data from a streaming REST service, and every hour it writes a file into HDFS. The amount of data being downloaded is large, so each file produced is multiple gigabytes in size.
You’ve been asked to write a MapReduce job that can take as input these large JSON files. What you have here is a problem in two parts: first, MapReduce doesn’t come with an InputFormat that works with JSON; second, how does one even go about splitting JSON?
Figure 3.7 shows the problem with splitting JSON. Imagine that MapReduce created a split as shown in the figure. The map task that operates on this input split will perform a seek to the start of the input split, and then needs to determine the start of the next record. With file formats such as JSON and XML, it’s challenging to know when the next record starts due to the lack of a synchronization marker, or any other indicator that identifies the start of a record.
JSON is harder to partition into distinct segments than a format such as XML because JSON doesn’t have a token (like an end tag in XML) to denote the start or end of a record.
■Problem
You want to work with JSON inputs in MapReduce, and also to ensure that input JSON files can be partitioned for concurrent reads.
■Solution
The Elephant Bird LzoJsonInputFormat input format is used as a basis to create an input format class to work with JSON elements. This technique also discusses another approach using my open source project that can work with multiline JSON.
Figure 3.7 Example of issue with JSON and multiple input splits (the figure marks input split N)
■ Discussion
Elephant Bird (https://github.com/kevinweil/elephant-bird), an open source project that contains useful utilities for working with LZOP compression, has an LzoJsonInputFormat that can read JSON, though it requires that the input file be LZOP-compressed. You can use the Elephant Bird code as a template for your own JSONInputFormat that doesn’t have the LZOP compression requirement.
This solution assumes that each JSON record is on a separate line. Your JsonInputFormat is simple and does nothing other than construct and return a JsonRecordReader, so we’ll skip over that code. The JsonRecordReader emits LongWritable, MapWritable key/value pairs to the mapper, where the MapWritable is a map of JSON element names and their values. Let’s take a look at how this RecordReader works. It uses the LineRecordReader, which is a built-in MapReduce reader that emits a record for each line. To convert the line to a MapWritable, the reader uses the following method:7
public static boolean decodeLineToJson(JSONParser parser, Text line,
                                       MapWritable value) {
  try {
    JSONObject jsonObj = (JSONObject) parser.parse(line.toString());
    for (Object key : jsonObj.keySet()) {
      Text mapKey = new Text(key.toString());
      Text mapValue = new Text();
      if (jsonObj.get(key) != null) {
        mapValue.set(jsonObj.get(key).toString());
      }
      value.put(mapKey, mapValue);
    }
    return true;
  } catch (ParseException e) {
    LOG.warn("Could not json-decode string: " + line, e);
    return false;
  } catch (NumberFormatException e) {
    LOG.warn("Could not parse field into number: " + line, e);
    return false;
  }
}
The reader uses the json-simple parser (http://code.google.com/p/json-simple/) to parse the line into a JSON object, and then iterates over the keys in the JSON object and puts them, along with their associated values, into a MapWritable. The mapper is given the JSON data in LongWritable, MapWritable pairs and can process the data accordingly.
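Delegating to a line-oriented reader is what makes one-object-per-line JSON splittable: a newline serves as the record boundary that JSON itself lacks. The following Hadoop-free sketch illustrates the general idea of line-based split handling. LineSplitSketch and linesInSplit are hypothetical names, and this is a simplification rather than the LineRecordReader source (which also handles compression and custom delimiters, for example).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of how a line-oriented reader assigns lines to input splits. */
class LineSplitSketch {

    /** Returns the lines that a reader responsible for [splitStart, splitEnd) emits. */
    static List<String> linesInSplit(byte[] file, int splitStart, int splitEnd) {
        List<String> lines = new ArrayList<>();
        int pos = splitStart;
        if (splitStart != 0) {
            // Not the first split: the line we landed in (or that starts exactly here)
            // is read by the previous split's reader, so skip past the next newline.
            pos = nextLineStart(file, pos);
        }
        // Keep reading while a line starts at or before the split end, which lets
        // this reader finish a line that straddles the boundary.
        while (pos < file.length && pos <= splitEnd) {
            int next = nextLineStart(file, pos);
            int lineEnd = (file[next - 1] == '\n') ? next - 1 : next;
            lines.add(new String(file, pos, lineEnd - pos, StandardCharsets.UTF_8));
            pos = next;
        }
        return lines;
    }

    /** Returns the offset just past the next newline, or the file length if none. */
    private static int nextLineStart(byte[] data, int from) {
        int i = from;
        while (i < data.length && data[i] != '\n') {
            i++;
        }
        return Math.min(i + 1, data.length);
    }

    public static void main(String[] args) {
        byte[] file = "{\"a\":1}\n{\"b\":2}\n{\"c\":3}\n".getBytes(StandardCharsets.UTF_8);
        int boundary = 10; // falls in the middle of the second line
        System.out.println(linesInSplit(file, 0, boundary));
        System.out.println(linesInSplit(file, boundary, file.length));
    }
}
```

Each line is emitted by exactly one split, so a per-line decoder such as decodeLineToJson always receives a complete JSON object, provided that no object contains a raw line break.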
The following shows an example JSON object:
{
  "results" : [
    {
      "created_at" : "Thu, 29 Dec 2011 21:46:01 +0000",
      "from_user" : "grep_alex",
      "text" : "RT @kevinweil: After a lot of hard work by ..."
    },
    {
      "created_at" : "Mon, 26 Dec 2011 21:18:37 +0000",
      "from_user" : "grep_alex",
      "text" : "@miguno pull request has been merged, thanks again!"
    }
  ]
}
7 GitHub source: https://github.com/alexholmes/hiped2/blob/master/src/main/java/hip/ch3/json/JsonInputFormat.java.
This technique assumes one JSON object per line. The following code shows the JSON file you’ll work with in this example:
{"created_at" : "Thu, 29 Dec 2011 21:46:01 +0000","from_user" : ...
{"created_at" : "Mon, 26 Dec 2011 21:18:37 +0000","from_user" : ...
Now copy the JSON file into HDFS and run your MapReduce code. The MapReduce code writes each JSON key/value pair to the output:
$ hadoop fs -put test-data/json/tweets.json tweets.json
$ hip hip.ch3.json.JsonMapReduce \
    --input tweets.json \
    --output output
$ hadoop fs -cat output/part*
text RT @kevinweil: After a lot of hard work by ...
from_user grep_alex
created_at Thu, 29 Dec 2011 21:46:01 +0000
text @miguno pull request has been merged, thanks again!
from_user grep_alex
created_at Mon, 26 Dec 2011 21:18:37 +0000
Writing JSON
An approach similar to what we looked at in section 3.2.1 for writing XML could also be used to write JSON.
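As a rough illustration of the string-building half of that approach, the sketch below turns a set of fields into a single-line JSON object, which keeps the output compatible with the line-based reading technique in this section. JsonLineWriterSketch and toJsonLine are hypothetical names, every value is written as a JSON string, and in practice a JSON library would normally take care of the escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that serializes string fields as one JSON object per line. */
class JsonLineWriterSketch {

    static String toJsonLine(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> field : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(escape(field.getKey())).append("\":\"")
              .append(escape(field.getValue())).append('"');
        }
        return sb.append('}').toString();
    }

    /** Escapes quotes, backslashes, and control characters so the object stays on one line. */
    private static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, String> tweet = new LinkedHashMap<>();
        tweet.put("from_user", "grep_alex");
        tweet.put("text", "@miguno pull request has been merged, thanks again!");
        System.out.println(toJsonLine(tweet));
    }
}
```

In a reducer, the returned string could be wrapped in a Text and written as the output key with a null value, mirroring listing 3.2 but without the root start and end tags.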
Pig
Elephant Bird contains a JsonLoader and an LzoJsonLoader, which you can use to work with JSON in Pig. These loaders work with line-based JSON. Each Pig tuple contains a chararray field for each JSON element in the line.
Hive
Hive contains a DelimitedJSONSerDe class which can serialize JSON, but unfortunately can’t deserialize it, so you can’t load data into Hive using this SerDe.
■Summary
This solution assumes that the JSON input is structured with one line per JSON object. How would you work with JSON objects that span multiple lines? An experimental project on GitHub8 works with multiple input splits over a single JSON file. This approach searches for a specific JSON member and retrieves the containing object.
You can also review a Google Code project called hive-json-serde (http://code.google.com/p/hive-json-serde/), which can support both serialization and deserialization.
As you can see, using XML and JSON in MapReduce is kludgy and has rigid requirements about how to lay out your data. Support for these two formats in MapReduce is also complex and error-prone, because neither lends itself naturally to splitting. Clearly, you need to look at alternative file formats that have built-in support for splittability.
The next step is to look at more sophisticated file formats that are better suited to working with MapReduce, such as Avro and SequenceFile.