The choices to tackle near-real-time use cases are pretty open-ended. Stream processing can provide low latency, with budding SQL capabilities, but it requires the queries to be predefined in order to work well. Proprietary warehouses have a lot of features (e.g., transactions and indexes) and can support ad hoc and predefined queries, but they are typically limited in scale and expensive. Batch processing can tackle massive scale and provides mature SQL support via Spark SQL/Hive, but the processing styles typically involve higher latency. With such fragmentation, users often end up making their choices based on available hardware and operational support within their organizations. We will circle back to these challenges at the conclusion of this post.
For now, I’d like to outline some technical benefits to tackling near-real-time use cases via “mini” batch jobs run every x minutes, using Spark/MR as opposed to running stream-processing jobs. Analogous to “micro” batches in Spark Streaming (operating at second-by-second granularity), “mini” batches operate at minute-by-minute granularity. Throughout the post, I use the term incremental processing collectively to refer to this style of processing.
Increased efficiency
Incrementally processing new data in “mini” batches could be a much more efficient use of resources for the organization. Let’s take a concrete example, where we have a stream of Kafka events coming in at 10K/sec and we want to count the number of messages in the last 15 minutes across some dimensions. Most stream-processing pipelines use an external result store (e.g., Cassandra, ElasticSearch) to keep aggregating the count, and keep the YARN/Mesos containers running the whole time. This makes sense in the less-than-five-minute latency windows such pipelines operate on. In practice, typical YARN container start-up costs tend to be around a minute.
In addition, to scale the writes to the result stores, we often end up buffering and batching updates, and this protocol needs the containers to be long-running.
Figure 3-7. Comparison of processing via stream processing engines versus incremental “mini” batch jobs. Credit: Vinoth Chandar.
However, in the near-real-time context, these decisions may not be the best ones. To achieve the same effect, you can use short-lived containers and improve the overall resource utilization. In Figure 3-7, the stream processor performs six million updates over 15 minutes to the result store. But in the incremental processing model, we perform an in-memory merge once and make only one update to the result store, while using the containers for only five minutes.
The incremental processing model is three times more CPU-efficient, and several orders of magnitude more efficient in updating the result store. Basically, instead of waiting for work and eating up CPU and memory, the processing wakes up often enough to finish pending work, grabbing resources on demand.
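The contrast between the two update patterns can be sketched in a few lines of Python. This is an illustration under stated assumptions, not the pipeline described here: the `ResultStore` class, the function names, and the event data are all hypothetical, and a plain dictionary stands in for a store such as Cassandra. The sketch writes once per distinct key rather than literally once, which is the same idea at a finer grain.

```python
from collections import Counter


class ResultStore:
    """Hypothetical stand-in for an external result store; it counts writes."""

    def __init__(self):
        self.counts = {}
        self.writes = 0

    def increment(self, key, delta):
        self.counts[key] = self.counts.get(key, 0) + delta
        self.writes += 1


def per_event_updates(events, store):
    """Streaming style: one store write per incoming event."""
    for dimension in events:
        store.increment(dimension, 1)


def mini_batch_update(events, store):
    """Incremental style: merge in memory, then one write per distinct key."""
    merged = Counter(events)
    for dimension, count in merged.items():
        store.increment(dimension, count)


# Made-up events, each reduced to the dimension it is counted under.
events = ["sf", "nyc", "sf", "sf", "nyc", "blr"] * 1000

streaming_store = ResultStore()
per_event_updates(events, streaming_store)

batch_store = ResultStore()
mini_batch_update(events, batch_store)
```

Both stores end up with identical counts, but `streaming_store.writes` grows with the number of events while `batch_store.writes` grows only with the number of distinct dimensions.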
Built on top of existing SQL engines
Over time, a slew of SQL engines have evolved in the Hadoop/big data space (e.g., Hive, Presto, SparkSQL) that provide better expressibility for complex questions against large volumes of data. These systems have been deployed at massive scale and have hardened over time in terms of query planning, execution, and so forth. On the other hand, SQL on stream processing is still in early stages. By performing incremental processing using existing, much more mature SQL engines in the Hadoop ecosystem, we can leverage the solid foundations that have gone into building them.
For example, joins are very tricky in stream processing, in terms of aligning the streams across windows. In the incremental processing model, the problem naturally becomes simpler due to relatively longer windows, allowing more room for the streams to align across a processing window. On the other hand, if correctness is more important, SQL provides an easier way to expand the join window selectively and reprocess.
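The effect of window length on a join can be seen in a toy example. The sketch below is only an illustration: the function names, the `(key, timestamp)` event shape, and the sample data are assumptions made here, and real engines handle alignment with far more machinery. It joins two event lists per tumbling window, so a pair whose halves fall into different windows is missed.

```python
def windowed_join(left, right, window_start, window_end):
    """Inner-join two (key, ts) event lists using only events inside the window."""

    def in_window(ts):
        return window_start <= ts < window_end

    right_keys = {key for key, ts in right if in_window(ts)}
    return sorted(key for key, ts in left if in_window(ts) and key in right_keys)


def join_over_tumbling_windows(left, right, horizon, width):
    """Run the windowed join over consecutive windows of the given width."""
    matches = []
    for start in range(0, horizon, width):
        matches.extend(windowed_join(left, right, start, start + width))
    return matches


# Hypothetical events: (key, timestamp in seconds).
left_events = [("a", 5), ("b", 50), ("c", 58)]
right_events = [("a", 20), ("b", 70), ("c", 62)]

# One-minute windows: "b" and "c" straddle a window boundary and are missed.
short_window_matches = join_over_tumbling_windows(left_events, right_events, 900, 60)

# A single 15-minute window: all three pairs fall inside it.
long_window_matches = join_over_tumbling_windows(left_events, right_events, 900, 900)
```

With these inputs the one-minute windows match only `a`, while the 15-minute window matches `a`, `b`, and `c`.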
Another important advancement in such SQL engines is the support for columnar file formats like ORC/Parquet, which have significant advantages for analytical workloads. For example, joining two Kafka topics with Avro records would be much more expensive than joining two Hive/Spark tables backed by ORC/Parquet file formats. This is because with Avro records, you would end up deserializing the entire record, whereas columnar file formats only read the columns in the record that are needed by the query. For example, if we are simply projecting 10 fields out of a total of 1,000 in an Avro-encoded Kafka event, we still end up paying the CPU and IO cost for all fields. Columnar file formats can typically be smart about pushing the projection down to the storage layer (Figure 3-8).
Figure 3-8. Comparison of CPU/IO cost of projecting 10 fields out of 1,000 total, as Kafka events versus columnar files on HDFS. Credit: Vinoth Chandar.
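A toy model makes the projection cost difference tangible. The sketch below does not use Avro, ORC, or Parquet: it represents a row-oriented layout as a list of dictionaries and a column-oriented layout as a dictionary of lists, and counts how many values each layout has to touch. The names, sizes, and the "values touched" metric are assumptions chosen for illustration only.

```python
NUM_FIELDS = 1000
NUM_RECORDS = 50
WANTED = [f"f{i}" for i in range(10)]

# Row-oriented layout: each record keeps all of its fields together.
records = [
    {f"f{j}": i * NUM_FIELDS + j for j in range(NUM_FIELDS)}
    for i in range(NUM_RECORDS)
]

# Column-oriented layout: one list of values per field.
columns = {f"f{j}": [rec[f"f{j}"] for rec in records] for j in range(NUM_FIELDS)}


def project_rows(records, wanted):
    """Decode every field of every record, then keep only the wanted ones."""
    values_touched = 0
    out = []
    for rec in records:
        decoded = dict(rec)  # stands in for deserializing the whole record
        values_touched += len(decoded)
        out.append({name: decoded[name] for name in wanted})
    return out, values_touched


def project_columns(columns, wanted):
    """Read only the wanted columns and reassemble rows from them."""
    values_touched = 0
    selected = {}
    for name in wanted:
        selected[name] = columns[name]
        values_touched += len(columns[name])
    num_rows = len(selected[wanted[0]])
    out = [{name: selected[name][i] for name in wanted} for i in range(num_rows)]
    return out, values_touched


row_result, row_cost = project_rows(records, WANTED)
col_result, col_cost = project_columns(columns, WANTED)
```

Both projections return the same rows, but in this model the row-oriented path touches 100 times as many values as the column-oriented path, mirroring the 10-out-of-1,000 ratio.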
Fewer moving parts
The famed Lambda architecture that is broadly implemented today has two components: speed and batch layers, usually managed by two separate implementations (from code to infrastructure). For example, Storm is a popular choice for the speed layer, and MapReduce could serve as the batch layer. In practice, people often rely on the speed layer to provide fresher (and potentially inaccurate) results, whereas the batch layer corrects the results of the speed layer at a later time, once the data is deemed complete. With incremental processing, we have an opportunity to implement the Lambda architecture in a unified way at the code level as well as the infrastructure level.
The idea illustrated in Figure 3-9 is fairly simple. You can use SQL, as discussed, or a batch-processing framework such as Spark to implement your processing logic uniformly. The resulting table gets incrementally built by way of executing the SQL on the “new data,” just like stream processing, to produce a “fast” view of the results. The same SQL can be run periodically on all of the data to correct any inaccuracies (remember, joins are tricky!) and produce a more “complete” view of the results. In both cases, we will be using the same Hadoop infrastructure for executing computations, which can bring down overall operational cost and complexity.
Figure 3-9. Computation of a result table, backed by a fast view via incremental processing and a more complete view via traditional batch processing. Credit: Vinoth Chandar.
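The fast and complete views can be sketched with one query run over two different ranges of data. The example below is a minimal illustration, not the setup described here: it swaps in Python's built-in `sqlite3` module for Hive or Spark SQL, and the table names, the `ts` checkpoint scheme, and the sample events are all hypothetical. A late-arriving event with an old timestamp is missed by the incremental run and picked up by the full run.

```python
import sqlite3

# The same aggregation serves both views; only the lower time bound changes.
AGG_SQL = "SELECT city, COUNT(*) FROM events WHERE ts > ? GROUP BY city"


def run_incremental(conn, checkpoint):
    """Run AGG_SQL on rows newer than the checkpoint and merge into fast_view."""
    for city, n in conn.execute(AGG_SQL, (checkpoint,)).fetchall():
        cur = conn.execute(
            "UPDATE fast_view SET n = n + ? WHERE city = ?", (n, city)
        )
        if cur.rowcount == 0:
            conn.execute("INSERT INTO fast_view (city, n) VALUES (?, ?)", (city, n))
    (latest,) = conn.execute(
        "SELECT COALESCE(MAX(ts), ?) FROM events", (checkpoint,)
    ).fetchone()
    return latest  # becomes the checkpoint for the next mini batch


def run_complete(conn):
    """Run AGG_SQL on all rows and rebuild complete_view from scratch."""
    conn.execute("DELETE FROM complete_view")
    rows = conn.execute(AGG_SQL, (-1,)).fetchall()
    conn.executemany("INSERT INTO complete_view (city, n) VALUES (?, ?)", rows)


def read_view(conn, table):
    return dict(conn.execute(f"SELECT city, n FROM {table}").fetchall())


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (city TEXT, ts INTEGER)")
conn.execute("CREATE TABLE fast_view (city TEXT PRIMARY KEY, n INTEGER)")
conn.execute("CREATE TABLE complete_view (city TEXT PRIMARY KEY, n INTEGER)")

# First mini batch.
conn.executemany(
    "INSERT INTO events VALUES (?, ?)", [("sf", 1), ("sf", 2), ("nyc", 3)]
)
checkpoint = run_incremental(conn, 0)

# Second mini batch, including a late event that carries an old timestamp.
conn.executemany(
    "INSERT INTO events VALUES (?, ?)", [("nyc", 4), ("sf", 5), ("nyc", 2)]
)
checkpoint = run_incremental(conn, checkpoint)

# Periodic full run over all of the data.
run_complete(conn)

fast = read_view(conn, "fast_view")
complete = read_view(conn, "complete_view")
```

Here `fast` undercounts `nyc` by one because the late event fell behind the checkpoint, and `complete` corrects it. Timestamp-based checkpoints are just one possible way to define "new data" and are used here only to keep the sketch short.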