Pig has a number of specialized JOINs that, used in their appropriate circumstances, bring enough performance improvements to organize a cult around. (TODO: make funny).
Specialized Pig Join #1: The REPLICATED JOIN
If you are joining a large dataset with a small-enough one, Pig can often execute the operation using a Mapper-Only job, eliminating the costly startup time and network transfer of a Reduce step. This is commonplace enough and the performance impact large enough that it is always worth considering whether this type of JOIN is applicable.
Imagine visiting the opera while the United Nations is in town. The smart theater owner will prepare librettos in, say, a dozen languages, enough to delight the many thousands of attendees. A bad way to distribute them would be to arrange kiosks, one per language, throughout the lobby. Every aficionado would first have to fight their way through the crowd to find the appropriate kiosk, then navigate across the theater to find their seats.
Our theater owner, being smart, instead handles the distribution as follows: Ushers stand at every entrance, armed with stacks of librettos; at every entrance, all the languages are represented. This means that, as each attendee files in, they simply select the appropriate one from what is on hand, then head to their seat without delay.
Optimizing Hadoop Dataflows | 165
A Mapper-Only JOIN works analogously. Every Mapper reads the small dataset into a lookup table: a hash map keyed by the JOIN key (this is why you will also see it referred to as a HashMap JOIN). Every Mapper loads the contents of the smaller dataset in full into its own local lookup table (which is why it is also known as a Replicated JOIN). The minor cost of replicating that dataset to every single Mapper is often repaid by a huge improvement in processing speed, because it eliminates the entire Reduce stage. The constraint, however, is that the smaller dataset must fit entirely in RAM. The usher’s task is manageable when there is one type of libretto for each of a dozen languages but would be unmanageable if there were one type of libretto for each of several thousand home towns.
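To make the mechanism concrete, here is a minimal Python sketch of what each Mapper does in a replicated JOIN. It is an illustration under stated assumptions, not Pig's implementation: the function names and the sample rows are hypothetical. (In Pig itself you request this strategy with the USING 'replicated' clause on JOIN.)

```python
from collections import defaultdict

def build_lookup(small_rows, key_index):
    """Load the whole small dataset into an in-memory hash map keyed by the join key."""
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key_index]].append(row)
    return lookup

def replicated_join(big_rows, big_key_index, lookup):
    """Stream the large dataset once, emitting one joined row per match (inner join)."""
    for row in big_rows:
        for match in lookup.get(row[big_key_index], ()):
            yield row + match

# Hypothetical sample data: (attendee, language) and (language, libretto title).
attendees = [("Ana", "pt"), ("Bo", "zh"), ("Cy", "pt"), ("Di", "xx")]
librettos = [("pt", "Libreto"), ("zh", "Juben")]

# Every "Mapper" would build this same table from its own copy of the small dataset.
lookup = build_lookup(librettos, key_index=0)
joined = list(replicated_join(attendees, 1, lookup))
```

Note that the large dataset is never held in memory; only the lookup table is, which is why the size of the small dataset is the binding constraint.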
How much is too much? Watch for excessive GC activity. (TODO: Pig probably has a warning too - find out what it is). Within the limits of available RAM, you can use fewer Mappers with more available RAM; the Hadoop tuning chapter (TODO: REF) shows you how. Don’t be too aggressive, though; datasets have a habit of growing over time and you would hate to spend Thanksgiving day reworking the jobs that process retail sales data because you realized they would not stand up to the Black Friday rush.
There is a general principle here: It is obvious there is a class of problems which only crop up past a certain threshold of data. What may not be obvious, until you’ve learned it the hard way, is that the external circumstances most likely to produce that flood of extra data are also the circumstances that leave you least able to address the problem.
Specialized Pig Join #2: The MERGE JOIN
A JOIN of two datasets, each in strict total order by the JOIN key, can also be done as a Mapper-Only job by doing a modified merge sort. You must ensure not only that the files are in sort order but that the lexicographic order of the file names matches the order in which their parts should be read. If you do so, Pig can proceed as follows. A first pass samples each file from the right-hand dataset to learn the distribution of keys throughout the files. The second stage performs the actual JOIN. Each Mapper reads from two streams: its assigned split within the left-hand dataset and the appropriate sections of the right-hand dataset. The Mapper’s job is then very simple: it grabs a group of records from the right-hand stream and a group of records from the left-hand stream and compares their keys. If they match, they are joined. If they do not match, it reads from the stream with the too-low key until it either produces the matching group or sails past it, in which case it similarly reads from the other stream.
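The group-by-group comparison described above can be sketched in a few lines of Python. This is a simplified illustration rather than Pig's code: it assumes both inputs are already sorted by the join key, ignores the sampling pass entirely, and uses hypothetical sample rows. (In Pig itself you request this strategy with the USING 'merge' clause on JOIN.)

```python
from itertools import groupby
from operator import itemgetter

def merge_join(left_rows, right_rows, key=itemgetter(0)):
    """Inner-join two iterables that are each already sorted by the join key."""
    left_groups = groupby(left_rows, key=key)
    right_groups = groupby(right_rows, key=key)
    left = next(left_groups, None)
    right = next(right_groups, None)
    while left is not None and right is not None:
        left_key, left_group = left
        right_key, right_group = right
        if left_key == right_key:
            # Keys match: join every left record with every right record.
            right_list = list(right_group)
            for left_row in left_group:
                for right_row in right_list:
                    yield left_row + right_row
            left = next(left_groups, None)
            right = next(right_groups, None)
        elif left_key < right_key:
            # Left key is too low: read ahead on the left stream.
            left = next(left_groups, None)
        else:
            # Right key is too low: read ahead on the right stream.
            right = next(right_groups, None)

# Hypothetical sample data, each list sorted by its first field.
left = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]
right = [(2, "x"), (3, "y"), (4, "z"), (4, "w")]
result = list(merge_join(left, right))
```

Each stream is read strictly forward and only one group per side is held at a time, which is what lets the underlying system feed the Mapper sequentially.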
As we’ve discussed a few times, reading data in straight streams like this lets the underlying system supply data at the fastest possible rate. What’s more, the first-pass indexing scheme means most tasks will be “Map-local”, that is, run on a machine whose data node hosts a copy of that block. In all, you require a short Mapper-Only task to sample the right-hand dataset and a network throughput cost that is O(N) in the size of the second dataset. The constraint is, of course, that this only works with total-ordered data on the same key. For a “Gold” dataset (one that you expect to use as source data for a number of future jobs) we typically spend the time to do a last-pass total sort of the dataset against the most likely JOIN key. It is a nice convenience for future users of the dataset, helps in sanity checking and improves the odds that you will be able to use the more efficient MERGE JOIN.
166 | Chapter 19: Advanced Pig
Exercises
1. Quoting the Pig docs: “You will also see better performance if the data in the left table is partitioned evenly across part files (no significant skew and each part file contains at least one full block of data).”
Why is this?
2. Each of the following snippets goes against the Pig documentation’s recommendations in one clear way.
• Rewrite it according to best practices.
• Compare the run time of your improved script against the bad version shown here.
Things like this, from http://pig.apache.org/docs/r0.9.2/perf.html:
a. (fails to use a map-side join)
b. (join large on small, when it should join small on large)
c. (many `FOREACH`es instead of one expanded-form `FOREACH`)
d. (expensive operation before LIMIT)
For each, use weather data on weather stations.
CHAPTER 20
Hadoop Internals
For 16 chapters now, we’ve been using Hadoop and Storm+Trident from the outside. The biggest key to writing efficient dataflows is to understand the interface and fundamental patterns of use, not the particulars of how these frameworks execute the dataflow. However, even the strongest abstractions, pushed far enough, can leak, and so at some point it’s important to understand these internal details. These next few chapters concentrate on equipping you to understand your jobs’ performance and on practical tips for improving it; if you’re looking for more, by far the best coverage of this material is found in (TODO: Add links Tom White’s Hadoop: The Definitive Guide and Eric Sammer’s Hadoop Operations).
Let’s first focus on the internals of Hadoop.
HDFS (NameNode and DataNode)
It’s time to learn how the HDFS stores your data; how the Map/Reduce framework launches and coordinates job attempts; and what happens within the framework as your Map/Reduce process executes.
The HDFS provides three remarkable guarantees: durability (your data is never lost or corrupted), availability (you can always access it) and efficiency (you can consume the data at a high rate, especially from Map/Reduce jobs and other clients). The center of action, as you might guess, is the NameNode. The NameNode is a permanently running daemon process that tracks the location of every block on the network of DataNodes, along with its name and other essential metadata. (If you’re familiar with the File Allocation Table (FAT) of a traditional file system, it’s like a more active version of that. FOOTNOTE: [If you’re not familiar with what a FAT is, then it’s like the system you’re reading about but for a file system.])
(NOTE: check something … appending)
When a client wishes to create a file, it contacts the NameNode with the desired path and high-level metadata. The NameNode records that information in an internal table and identifies the DataNodes that will hold the data. The NameNode then replies with that set of DataNodes, identifying one of them as the initial point of contact. (When we say “client”, that’s anything accessing the NameNode, whether it’s a Map/Reduce job, one of the Hadoop filesystem commands or any other program.) The file is now exclusively available to the client for writing but will remain invisible to anybody else until the write has concluded (TODO: Is it when the first block completes or when the initial write completes?).
In its request, the client may independently prescribe a replication factor, file permissions, and block size _ (TODO: fill in)
The client now connects to the indicated DataNode and begins sending data. Once the client has written a full block’s worth of data, the DataNode transparently finalizes that block and begins another (TODO: check that it’s the DataNode that does this). As it finishes each block or at the end of the file, the DataNode independently prepares a checksum of that block, radios it back to the NameNode and begins replicating its contents to the other DataNodes. (TODO: Is there an essential endoffile ritual?) This is all transparent to the client, which is able to send data as fast as it can cram it through the network pipe.
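As a rough illustration of the idea of cutting a written stream into fixed-size blocks and checksumming each one, here is a toy Python sketch. It is not how HDFS is implemented: the block size, the use of CRC32 over a whole block, and all names here are assumptions chosen only to keep the example small.

```python
import zlib

def split_into_blocks(data: bytes, block_size: int):
    """Cut a byte stream into fixed-size blocks; only the last block may be short."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]

def checksum(block: bytes) -> int:
    """Toy per-block checksum (CRC32 from the standard library)."""
    return zlib.crc32(block)

# Hypothetical 10-byte "file" written with a 4-byte block size.
payload = b"x" * 10
blocks = split_into_blocks(payload, block_size=4)

# What a toy DataNode might report for each finished block: (index, length, checksum).
block_report = [(index, len(block), checksum(block)) for index, block in enumerate(blocks)]

# A replica whose bytes were damaged no longer matches the recorded checksum.
damaged = b"xxxy"
is_corrupt = checksum(damaged) != block_report[0][2]
```

Because the checksum is recorded once when the block is finalized, any later reader or background scan can detect a damaged replica without consulting the original writer.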
Once you’ve created a file, its blocks are immutable: as opposed to a traditional file system, there is no mechanism for modifying its internal contents. This is not a limitation; it’s a feature. Making the file system immutable not only radically simplifies its implementation, it makes the system more predictable operationally and simplifies client access. For example, you can have multiple jobs and clients access the same file knowing that a client in California hasn’t modified a block being read in Tokyo (or even worse, simultaneously modified by someone in Berlin). (TODO: When does append become a thing?)
The end of the file means the end of its data but not the end of the story. Each DataNode periodically reads a subset of its blocks to verify their checksums and sends a “heartbeat” back to the NameNode with the (hopefully) happy news. (TODO: fill in).
There are several reasons a NameNode will begin replicating a block. If a DataNode’s heartbeat reports an incorrect block checksum, the NameNode will remove that DataNode from the list of replica holders for that block, triggering its replication from one of the remaining DataNodes for that block. If the NameNode has not received a heartbeat from a given DataNode within the configured timeout, it will begin replicating all of that DataNode’s blocks; if that DataNode comes back online, the NameNode calmly welcomes it back into the cluster, cancelling replication of the valid blocks that DataNode holds. Furthermore, if the difference in the amount of data between the most populated and least populated DataNodes becomes larger than a certain threshold, or the replication factor for a file is increased, it will rebalance; you can optionally trigger one earlier using the hadoop balancer command.
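The first two triggers above (a bad checksum report and a missed heartbeat) can be modeled with a small bookkeeping class. This is a toy sketch, not the NameNode's actual logic: the class, its fields, the timeout value and the node and block names are all hypothetical.

```python
from dataclasses import dataclass, field

@dataclass
class ToyNameNode:
    replication_factor: int = 3
    heartbeat_timeout: float = 30.0                      # hypothetical value, in seconds
    replicas: dict = field(default_factory=dict)         # block id -> set of DataNode names
    last_heartbeat: dict = field(default_factory=dict)   # DataNode name -> last timestamp

    def heartbeat(self, node, now, corrupt_blocks=()):
        """Record a heartbeat; drop the node as a holder of any block it reports as corrupt."""
        self.last_heartbeat[node] = now
        for block_id in corrupt_blocks:
            self.replicas[block_id].discard(node)

    def live_nodes(self, now):
        """DataNodes heard from within the timeout."""
        return {n for n, t in self.last_heartbeat.items() if now - t <= self.heartbeat_timeout}

    def under_replicated(self, now):
        """Map each block with too few live replicas to the number of copies it is missing."""
        live = self.live_nodes(now)
        missing = {}
        for block_id, holders in self.replicas.items():
            healthy = len(holders & live)
            if healthy < self.replication_factor:
                missing[block_id] = self.replication_factor - healthy
        return missing

nn = ToyNameNode(replication_factor=2,
                 replicas={"blk_1": {"dn1", "dn2"}, "blk_2": {"dn2", "dn3"}})
for name in ("dn1", "dn2", "dn3"):
    nn.heartbeat(name, now=0)
all_healthy = nn.under_replicated(now=10)

# dn1 reports a bad checksum for blk_1; dn3 goes silent.
nn.heartbeat("dn1", now=20, corrupt_blocks=["blk_1"])
nn.heartbeat("dn2", now=20)
after_corruption = nn.under_replicated(now=25)
after_timeout = nn.under_replicated(now=40)

# dn3 comes back online, so blk_2 no longer needs a new copy.
nn.heartbeat("dn3", now=45)
after_return = nn.under_replicated(now=45)
```

In this toy model, "cancelling replication" simply falls out of recomputing the missing-copy count once the returning DataNode's heartbeat is recorded.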