Data storage on the batch layer: Illustration
5.1 Using the Hadoop Distributed File System
5.3.3 A split pail to vertically partition the dataset
The last step is to create a pail structure that implements the vertical partitioning strategy for a graph schema. It’s also the most involved step. All of the following snippets are extracted from the SplitDataPailStructure class that accomplishes this task.
At a high level, the SplitDataPailStructure code inspects the DataUnit class to create a map between Thrift IDs and classes to process the corresponding type. Figure 5.2 demonstrates this map for SuperWebAnalytics.com.
Listing 5.6 A concrete implementation for Data objects
Annotations for listing 5.6: The constructor of the Thrift object must be implemented in the child class. Specifies that the pail stores Data objects. Needed by ThriftPailStructure to create an object for deserialization. This pail structure doesn’t use vertical partitioning.
    union DataUnit {
        1: PersonProperty person_property;
        2: PageProperty page_property;
        3: EquivEdge equiv;
        4: PageViewEdge page_view;
    }
    Map<Short, FieldStructure>
        {1: PropertyStructure, 2: PropertyStructure, 3: EdgeStructure, 4: EdgeStructure}
Figure 5.2 The SplitDataPailStructure field map for the DataUnit class of SuperWebAnalytics.com
The next listing contains the code that generates the field map. It works for any graph schema, not just this example.
    public class SplitDataPailStructure extends DataPailStructure {
        public static HashMap<Short, FieldStructure> validFieldMap =
            new HashMap<Short, FieldStructure>();

        static {
            for(DataUnit._Fields k: DataUnit.metaDataMap.keySet()) {
                FieldValueMetaData md = DataUnit.metaDataMap.get(k).valueMetaData;
                FieldStructure fieldStruct;
                if(md instanceof StructMetaData &&
                   ((StructMetaData) md).structClass.getName().endsWith("Property")) {
                    fieldStruct = new PropertyStructure(((StructMetaData) md).structClass);
                } else {
                    fieldStruct = new EdgeStructure();
                }
                validFieldMap.put(k.getThriftFieldId(), fieldStruct);
            }
        }

        // remainder of class elided
    }
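To see the classification rule in isolation, without Thrift on the classpath, here is a minimal sketch. It assumes a hypothetical stand-in for the Thrift metadata (a plain map from field ID to class name) and records only the name of the structure each field would receive. The names FieldMapSketch, superWebAnalyticsFields, and classify are illustrative and are not part of SplitDataPailStructure.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only: the real code reads DataUnit.metaDataMap via Thrift.
class FieldMapSketch {

    // Hypothetical stand-in for the Thrift metadata of the DataUnit union in
    // figure 5.2: field ID -> simple name of the field's struct class.
    static Map<Short, String> superWebAnalyticsFields() {
        Map<Short, String> fields = new LinkedHashMap<>();
        fields.put((short) 1, "PersonProperty");
        fields.put((short) 2, "PageProperty");
        fields.put((short) 3, "EquivEdge");
        fields.put((short) 4, "PageViewEdge");
        return fields;
    }

    // Applies the same rule as the static initializer: a class whose name
    // ends with "Property" is a property; anything else is treated as an edge.
    static Map<Short, String> classify(Map<Short, String> fieldClassNames) {
        Map<Short, String> fieldMap = new LinkedHashMap<>();
        for (Map.Entry<Short, String> e : fieldClassNames.entrySet()) {
            boolean isProperty = e.getValue().endsWith("Property");
            fieldMap.put(e.getKey(), isProperty ? "PropertyStructure" : "EdgeStructure");
        }
        return fieldMap;
    }

    public static void main(String[] args) {
        // Prints the field map in the same shape as figure 5.2.
        System.out.println(classify(superWebAnalyticsFields()));
    }
}
```

Because the rule depends only on the class-name suffix, adding a new property or edge type to the schema needs no change to this logic, provided the naming convention is followed.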
As mentioned in the code annotation, FieldStructure is an interface shared by both PropertyStructure and EdgeStructure. The definition of the interface is as follows:
    protected static interface FieldStructure {
        public boolean isValidTarget(String[] dirs);
        public void fillTarget(List<String> ret, Object val);
    }
Later we’ll provide the details for the EdgeStructure and PropertyStructure classes. For now, we’re just looking at how this interface is used to accomplish the vertical partitioning of the table:
    // methods are from SplitDataPailStructure
    public List<String> getTarget(Data object) {
        List<String> ret = new ArrayList<String>();
        DataUnit du = object.get_dataunit();
        short id = du.getSetField().getThriftFieldId();
        ret.add("" + id);
        validFieldMap.get(id).fillTarget(ret, du.getFieldValue());
        return ret;
    }

    public boolean isValidTarget(String[] dirs) {
        if(dirs.length==0) return false;
        try {
            short id = Short.parseShort(dirs[0]);
            FieldStructure s = validFieldMap.get(id);
            if(s==null) return false;
            else return s.isValidTarget(dirs);
        } catch(NumberFormatException e) {
            return false;
        }
    }
Annotations for listing 5.7 (Code to generate the field map for a graph schema): FieldStructure is an interface for both edges and properties. The Thrift code inspects and iterates over the DataUnit object. Properties are identified by the class name of the inspected object; if the class name doesn’t end with “Property”, it must be an edge.
Annotations for getTarget: The top-level directory is determined by inspecting the DataUnit. Any further partitioning is passed to the FieldStructure.
The SplitDataPailStructure is responsible for the top-level directory of the vertical partitioning, and it passes the responsibility of any additional subdirectories to the FieldStructure classes. Therefore, once you define the EdgeStructure and PropertyStructure classes, your work will be done.
Edges are structs and hence cannot be further partitioned. This makes the EdgeStructure class trivial:
    protected static class EdgeStructure implements FieldStructure {
        public boolean isValidTarget(String[] dirs) { return true; }
        public void fillTarget(List<String> ret, Object val) { }
    }
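The delegation from the top-level check to an edge structure can be exercised on its own. The following is a minimal sketch that leaves Thrift and Pail out entirely; it assumes only the two edge fields of figure 5.2 (IDs 3 and 4) are registered, and the class name TopLevelValidationSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; Thrift and Pail are deliberately left out.
class TopLevelValidationSketch {

    // Same two-method contract as the FieldStructure interface in the text.
    interface FieldStructure {
        boolean isValidTarget(String[] dirs);
        void fillTarget(List<String> ret, Object val);
    }

    // Mirrors EdgeStructure: accepts any path and adds no subdirectories.
    static class EdgeStructure implements FieldStructure {
        public boolean isValidTarget(String[] dirs) { return true; }
        public void fillTarget(List<String> ret, Object val) { }
    }

    // Assumption: only the edge fields of figure 5.2 (IDs 3 and 4) are registered.
    static final Map<Short, FieldStructure> validFieldMap = new HashMap<>();
    static {
        validFieldMap.put((short) 3, new EdgeStructure());
        validFieldMap.put((short) 4, new EdgeStructure());
    }

    // Same logic as the top-level isValidTarget: check the first directory
    // against the field map, then delegate the rest to the FieldStructure.
    static boolean isValidTarget(String... dirs) {
        if (dirs.length == 0) return false;
        try {
            FieldStructure s = validFieldMap.get(Short.parseShort(dirs[0]));
            return s != null && s.isValidTarget(dirs);
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidTarget("4"));        // registered edge field
        System.out.println(isValidTarget("7"));        // unknown field ID
        System.out.println(isValidTarget("pageview")); // not a number
    }
}
```

Note that because the edge structure accepts every path, a deeper path under an edge directory is also reported as valid; the edge structure simply never creates one.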
But property values are unions, like the DataUnit class. The code similarly uses inspection to create a set of valid Thrift field IDs for the given property class. For completeness we provide the full listing of the class here, but the key points are the construction of the set and the use of this set in fulfilling the FieldStructure contract.
Listing 5.8 The PropertyStructure class
    protected static class PropertyStructure implements FieldStructure {
        private TFieldIdEnum valueId;
        private HashSet<Short> validIds;

        public PropertyStructure(Class prop) {
            try {
                Map<TFieldIdEnum, FieldMetaData> propMeta = getMetadataMap(prop);
                Class valClass = Class.forName(prop.getName() + "Value");
                valueId = getIdForClass(propMeta, valClass);

                validIds = new HashSet<Short>();
                Map<TFieldIdEnum, FieldMetaData> valMeta = getMetadataMap(valClass);
                for(TFieldIdEnum valId: valMeta.keySet()) {
                    validIds.add(valId.getThriftFieldId());
                }
            } catch(Exception e) {
                throw new RuntimeException(e);
            }
        }

        public boolean isValidTarget(String[] dirs) {
            if(dirs.length < 2) return false;
            try {
                short s = Short.parseShort(dirs[1]);
                return validIds.contains(s);
            } catch(NumberFormatException e) {
                return false;
            }
        }

        public void fillTarget(List<String> ret, Object val) {
            ret.add("" + ((TUnion) ((TBase)val)
                .getFieldValue(valueId))
                .getSetField()
                .getThriftFieldId());
        }
    }
Annotations for SplitDataPailStructure.isValidTarget: The validity check first verifies the DataUnit field ID is in the field map. Any additional checks are passed to the FieldStructure.
Annotations for listing 5.8: A Property is a Thrift struct containing a property value field; valueId is the ID for that field. validIds is the set of Thrift IDs of the property value types. The constructor parses the Thrift metadata to get the field ID of the property value, and then parses the metadata to get all valid field IDs of the property value.
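The two-level layout that a property produces can be illustrated without Thrift. The sketch below is a simplification under stated assumptions: the valid value field IDs are passed in directly rather than discovered by inspection, fillTarget receives the ID of the set value field instead of a Thrift object, and the IDs 1, 2, and 3 are hypothetical. PropertyPartitionSketch is an illustrative name, not a class from the listing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: the valid IDs are supplied by the caller instead
// of being discovered from Thrift metadata as PropertyStructure does.
class PropertyPartitionSketch {
    private final Set<Short> validIds;

    PropertyPartitionSketch(Short... valueFieldIds) {
        this.validIds = new HashSet<>(Arrays.asList(valueFieldIds));
    }

    // A property path needs two levels: the DataUnit field ID, then the
    // field ID of the property value.
    boolean isValidTarget(String[] dirs) {
        if (dirs.length < 2) return false;
        try {
            return validIds.contains(Short.parseShort(dirs[1]));
        } catch (NumberFormatException e) {
            return false;
        }
    }

    // Simplification: takes the ID of the set value field directly, where the
    // real fillTarget extracts it from the Thrift property object.
    void fillTarget(List<String> ret, short setValueFieldId) {
        ret.add("" + setValueFieldId);
    }

    public static void main(String[] args) {
        // Hypothetical value field IDs for a person property.
        PropertyPartitionSketch person =
            new PropertyPartitionSketch((short) 1, (short) 2, (short) 3);

        List<String> dirs = new ArrayList<>();
        dirs.add("1");                      // DataUnit field ID of person_property in figure 5.2
        person.fillTarget(dirs, (short) 2); // value field that is set on this fact
        System.out.println(String.join("/", dirs));

        System.out.println(person.isValidTarget(new String[] {"1", "2"}));
        System.out.println(person.isValidTarget(new String[] {"1"}));
    }
}
```

With these assumed IDs, a fact whose value field 2 is set lands under the path 1/2, and a path that stops at the first level is rejected.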
    private static Map<TFieldIdEnum, FieldMetaData> getMetadataMap(Class c) {
        try {
            Object o = c.newInstance();
            return (Map) c.getField("metaDataMap").get(o);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static TFieldIdEnum getIdForClass(
            Map<TFieldIdEnum, FieldMetaData> meta, Class toFind) {
        for(TFieldIdEnum k: meta.keySet()) {
            FieldValueMetaData md = meta.get(k).valueMetaData;
            if(md instanceof StructMetaData) {
                if(toFind.equals(((StructMetaData) md).structClass)) {
                    return k;
                }
            }
        }
        throw new RuntimeException("Could not find " + toFind.toString() +
            " in " + meta.toString());
    }
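The reflective lookup of metaDataMap can be tried out on an ordinary class. The sketch below is a variation, not the listing's exact approach: since metaDataMap is a static field, it passes null as the receiver rather than creating an instance first. FakeGeneratedType is a hypothetical stand-in for a Thrift-generated class, and StaticMetadataSketch and readMetadataMap are illustrative names.

```java
import java.util.Collections;
import java.util.Map;

// Illustrative sketch only: reads a public static field by name via reflection.
class StaticMetadataSketch {

    // Hypothetical stand-in for a Thrift-generated class exposing a public
    // static metaDataMap field.
    public static class FakeGeneratedType {
        public static final Map<String, Short> metaDataMap =
            Collections.singletonMap("example_field", Short.valueOf((short) 1));
    }

    static Map<?, ?> readMetadataMap(Class<?> c) {
        try {
            // The receiver argument is ignored for static fields, so null is allowed.
            return (Map<?, ?>) c.getField("metaDataMap").get(null);
        } catch (ReflectiveOperationException e) {
            // Same policy as the helper in the text: wrap and rethrow unchecked.
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readMetadataMap(FakeGeneratedType.class));
    }
}
```

Skipping the instance avoids requiring an accessible no-argument constructor on the inspected class; whether that matters for a given set of generated classes is something to verify against the Thrift version in use.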
Annotations for listing 5.8 (continued): In PropertyStructure.isValidTarget, the vertical partitioning of a property value has a depth of at least two. fillTarget uses the Thrift IDs to create the directory path for the current fact. getMetadataMap and getIdForClass are helper functions for inspecting Thrift objects.
After that last bit of code, take a break; you’ve earned it. The good news is that this was a one-time cost. Once you’ve defined a pail structure for your master dataset, future interaction with the batch layer will be straightforward. Moreover, this code can be applied to any project where you’ve created a Thrift graph schema.
5.4 Summary
You learned that maintaining a dataset within HDFS involves the common tasks of appending new data to the master dataset, vertically partitioning data into many folders, and consolidating small files. You witnessed that accomplishing these tasks using the HDFS API directly is tedious and prone to human error.
You were then introduced to the Pail abstraction. Pail isolates you from the file formats and directory structure of HDFS, making it easy to do robust, enforced vertical partitioning and perform common operations on your dataset. Using the Pail abstraction ultimately takes very few lines of code. Vertical partitioning happens automatically, and tasks like appends and consolidation are simple one-liners. This means you can focus on how you want to process your records rather than on the details of how to store those records.
With HDFS and Pail, we’ve presented a way of storing the master dataset that meets all the requirements and is elegant to use. Whether you choose to use these tools or not, we hope we’ve set a bar for how elegant this piece of an architecture can be, and that you’ll aim to achieve at least the same level of elegance.
In the next chapter you’ll learn how to leverage the record storage to accomplish the next key step of the Lambda Architecture: computing batch views.
Batch layer
This chapter covers
■ Computing functions on the batch layer
■ Splitting a query into precomputed and on-the-fly components
■ Recomputation versus incremental algorithms
■ The meaning of scalability
■ The MapReduce paradigm
The goal of a data system is to answer arbitrary questions about your data. Any question you could ask of your dataset can be implemented as a function that takes all of your data as input. Ideally, you could run these functions on the fly whenever you query your dataset. Unfortunately, a function that uses your entire dataset as input will take a very long time to run. You need a different strategy if you want your queries answered quickly.
In the Lambda Architecture, the batch layer precomputes the master dataset into batch views so that queries can be resolved with low latency. This requires striking a balance between what will be precomputed and what will be computed at execution time to complete the query. By doing a little bit of computation on the fly to complete queries, you save yourself from needing to precompute absurdly large batch views. The key is to precompute just enough information so that the query can be completed quickly.
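The split between precomputation and on-the-fly work can be made concrete with a small sketch. Everything in it is an assumption chosen for illustration: the PageView record type, the hourly bucket granularity, and the method names. It contrasts a query that scans all the data on every call with one that sums a handful of precomputed buckets.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: an in-memory stand-in for a master dataset and a batch view.
class BatchViewSketch {

    // Hypothetical record type: one view of a URL during a given hour.
    static final class PageView {
        final String url;
        final long hour;
        PageView(String url, long hour) { this.url = url; this.hour = hour; }
    }

    // Query as a function of all the data: scans every record on each call.
    static long countOnTheFly(List<PageView> masterDataset, String url,
                              long startHour, long endHour) {
        long count = 0;
        for (PageView pv : masterDataset) {
            if (pv.url.equals(url) && pv.hour >= startHour && pv.hour <= endHour) {
                count++;
            }
        }
        return count;
    }

    // Precomputation: one count per (url, hour) bucket.
    static Map<String, Map<Long, Long>> buildBatchView(List<PageView> masterDataset) {
        Map<String, Map<Long, Long>> view = new HashMap<>();
        for (PageView pv : masterDataset) {
            view.computeIfAbsent(pv.url, k -> new HashMap<>())
                .merge(pv.hour, 1L, Long::sum);
        }
        return view;
    }

    // On-the-fly completion: sums only the buckets in the requested range.
    static long countFromView(Map<String, Map<Long, Long>> view, String url,
                              long startHour, long endHour) {
        Map<Long, Long> buckets = view.get(url);
        if (buckets == null) return 0;
        long count = 0;
        for (long h = startHour; h <= endHour; h++) {
            count += buckets.getOrDefault(h, 0L);
        }
        return count;
    }

    public static void main(String[] args) {
        List<PageView> data = Arrays.asList(
            new PageView("foo.com", 1), new PageView("foo.com", 1),
            new PageView("foo.com", 2), new PageView("foo.com", 5),
            new PageView("bar.com", 2));
        Map<String, Map<Long, Long>> view = buildBatchView(data);
        System.out.println(countOnTheFly(data, "foo.com", 1, 2));
        System.out.println(countFromView(view, "foo.com", 1, 2));
    }
}
```

Precomputing a count for every possible time range would make the view enormous, while precomputing nothing forces a full scan per query. Hourly buckets sit between the two: the view stays small, and the query only adds up the buckets it needs.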
In the last two chapters, you learned how to form a data model for your dataset and how to store your data in the batch layer in a scalable way. In this chapter you’ll take the next step of learning how to compute arbitrary functions on that data. We’ll start by introducing some motivating examples that we’ll use to illustrate the concepts of computation on the batch layer. Then you’ll learn in detail how to compute indexes of the master dataset that the application layer will use to complete queries. You’ll examine the trade-offs between recomputation algorithms, the style of algorithm emphasized in the batch layer, and incremental algorithms, the kind of algorithms typically used with relational databases. You’ll see what it means for the batch layer to be scalable, and then you’ll learn about MapReduce, a paradigm for scalable and nearly arbitrary batch computation. You’ll see that although MapReduce is a great primitive, it’s quite a low-level abstraction. We’ll finish things off by showing you a higher-level paradigm that can be executed via MapReduce.
6.1 Motivating examples
Let’s consider some example queries to motivate the theoretical discussions in this chapter. These queries illustrate the concepts of batch computation—each example shows how you would compute the query as a function that takes the entire master dataset as input. Later you’ll modify these implementations to use precomputation rather than execute them completely on the fly.