This section describes the design of SpatialSSJP, a system that proactively avoids congestion in an operator graph containing a stream-static join operator, with the aim of achieving high resource utilization and avoiding frequent reconfigurations.
6.2.1 Usage Model and Baseline System
While the main purpose of parallelizing SPEs is to achieve low latency and high throughput, many scenarios require access to static information (i.e., information stored on disk), which compromises the performance of the SPEs [118]. Stream-static joins are attractive in many such scenarios.
We focus on highly dynamic and scalable scenarios in which fast-arriving, spatially tagged data points need to be enriched with static master data (a.k.a. data-at-rest) for deeper insights.
For example, NYC taxicab trips (represented, most importantly, by pickup and drop-off points) are distributed in a form that includes only the GPS longitude/latitude coordinates, without the names of the regions to which those traces belong. The names of the zones (i.e., boroughs or districts in city management terms) are distributed separately in a static table. That table normally stores each zone as a polygon, given as the set of points that delimit it (a.k.a. bounding box). An example scenario is a query that asks to “generate an interactive heatmap showing trajectories of taxis in-motion to see the trend and decide on city planning”. Determining the neighborhood of every tuple then requires solving the Point in Polygon (PIP) problem (a.k.a. geofencing [125]), which in turn requires a stream-static join. Because the volume of streaming data can exceed what a single map can display efficiently (e.g., when generating heatmaps), it is preferable to take only a portion of the arriving data and join it with the static table.
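The PIP step can be illustrated with a minimal sketch, assuming each zone is given as an ordered list of (longitude, latitude) vertices. It uses the textbook ray-casting test and a naive scan over the static table; the function names and the two rectangular zones are hypothetical and serve only as an illustration, not as the join implementation used in this work.

```python
from typing import Dict, List, Optional, Tuple

Point = Tuple[float, float]   # (longitude, latitude)
Polygon = List[Point]         # vertices in order; the closing edge is implied


def point_in_polygon(point: Point, polygon: Polygon) -> bool:
    """Ray-casting test: count edge crossings of a horizontal ray cast to the right."""
    x, y = point
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Does this edge straddle the horizontal line through the point?
        if (y1 > y) != (y2 > y):
            # x-coordinate at which the edge crosses that horizontal line
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


def zone_of(point: Point, zones: Dict[str, Polygon]) -> Optional[str]:
    """Naive stream-static join step: scan the static zone table for a match."""
    for name, polygon in zones.items():
        if point_in_polygon(point, polygon):
            return name
    return None


# Hypothetical static table with two rectangular zones (made-up coordinates).
ZONES: Dict[str, Polygon] = {
    "zone_a": [(0.0, 0.0), (2.0, 0.0), (2.0, 2.0), (0.0, 2.0)],
    "zone_b": [(2.0, 0.0), (4.0, 0.0), (4.0, 2.0), (2.0, 2.0)],
}
```

Scanning every polygon for every tuple is linear in the size of the static table, which is one reason a spatial index is normally placed in front of the exact PIP test.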
However, the deluge of continuously arriving geo-referenced data streams challenges the capacity of current SPEs to achieve (near) real-time interactive visualization (e.g., through heatmaps). Spatial-aware online sampling is therefore necessary for proper data reduction, striking an acceptable balance between accuracy and latency QoS targets. This reduction requires either clustering or aggregating, which in a streaming setting means joining spatially tagged tuples with data-at-rest, hence the stream-static join. Traditional sampling designs (such as SRS; see section 5.2.2) do not consider the spatial characteristics of the arriving tuples, which renders the visualization erroneous.
Approximation is a valuable solution in highly dynamic environments. For the low-latency QoS goal, our baseline is a standard-compliant system that applies backpressure on top of an emerging de facto standard SPE, specifically SpSS. It resembles the backpressure mechanism of Spark Streaming, which works by applying a PID controller (known in Spark Streaming as the PID rate estimator). We have retrofitted a similar PID controller so that it integrates transparently with SpSS and operates under the SQL-like API. This baseline also uses an SRS-based sampler instead of SAOS.
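To make the PID-based baseline more concrete, the following is a simplified sketch of a PID rate estimator, loosely modeled on the general shape of the one in Spark Streaming. The class name, field names, gains and minimum rate are illustrative assumptions, not the values or the code of the retrofitted controller described here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PIDRateEstimator:
    """Simplified PID rate estimator (illustrative gains, not tuned values)."""
    batch_interval_ms: float
    proportional: float = 1.0   # weight of the current error (assumed)
    integral: float = 0.2       # weight of the accumulated backlog (assumed)
    derivative: float = 0.0     # weight of the error trend (assumed)
    min_rate: float = 100.0     # lower bound on the rate, tuples/s (assumed)
    latest_time_ms: float = -1.0
    latest_rate: float = -1.0
    latest_error: float = 0.0
    first_run: bool = True

    def compute(self, time_ms: float, num_elements: int,
                processing_delay_ms: float,
                scheduling_delay_ms: float) -> Optional[float]:
        """Return a new ingestion rate (tuples/s), or None if no update is possible."""
        if (time_ms <= self.latest_time_ms or num_elements <= 0
                or processing_delay_ms <= 0):
            return None
        # Rate at which the last batch was actually processed.
        processing_rate = num_elements / processing_delay_ms * 1000.0
        if self.first_run:
            # The first observation only initializes the controller state.
            self.latest_time_ms = time_ms
            self.latest_rate = processing_rate
            self.latest_error = 0.0
            self.first_run = False
            return None
        delay_since_update_s = (time_ms - self.latest_time_ms) / 1000.0
        # Proportional term: gap between the allowed rate and the achieved rate.
        error = self.latest_rate - processing_rate
        # Integral term: backlog implied by the scheduling delay, as a rate.
        historical_error = (scheduling_delay_ms * processing_rate
                            / self.batch_interval_ms)
        # Derivative term: how quickly the error is changing.
        d_error = (error - self.latest_error) / delay_since_update_s
        new_rate = max(self.latest_rate
                       - self.proportional * error
                       - self.integral * historical_error
                       - self.derivative * d_error,
                       self.min_rate)
        self.latest_time_ms = time_ms
        self.latest_rate = new_rate
        self.latest_error = error
        return new_rate
```

In a sampling-based system such as the one described in this chapter, a rate produced this way would still have to be mapped to a sampling fraction, for instance by dividing it by the observed arrival rate and clamping the result to (0, 1].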
As a baseline for the accuracy QoS goal, we have transparently incorporated into the layers of SpSS a model-based controller, based on SRS theory, that calculates a new sampling fraction after every trigger and serves it to an SRS-based sampler (as opposed to our SAOS sampler).
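As an illustration of how SRS theory can turn an accuracy target into a sampling fraction, the sketch below applies the textbook sample-size formula for estimating a mean, with a finite population correction. The function name, its parameters and the default z-value (95% confidence) are assumptions made for this example; the exact model used by the baseline controller may differ.

```python
import math


def srs_sampling_fraction(std_dev: float, margin_of_error: float,
                          population_size: int, z: float = 1.96) -> float:
    """Sampling fraction needed to estimate a mean within +/- margin_of_error.

    Textbook SRS sizing: n0 = (z * s / e)^2, corrected for a finite
    population of size N as n = n0 / (1 + n0 / N).
    """
    if margin_of_error <= 0 or population_size <= 0:
        raise ValueError("margin_of_error and population_size must be positive")
    n0 = (z * std_dev / margin_of_error) ** 2   # size for an infinite population
    n = n0 / (1.0 + n0 / population_size)       # finite population correction
    return min(1.0, math.ceil(n) / population_size)
```

For example, with a standard deviation of 10, a margin of error of 1 and 10,000 tuples in the trigger, the function returns a fraction of 0.037 (370 tuples). In practice the standard deviation would have to be estimated from the tuples observed in earlier triggers.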
6.2.2 SpatialSSJP Overview
We have designed SpatialSSJP¹² (short for Spatial-aware approximate Join Processor), an adaptive QoS- and spatial-aware framework for processing spatial stream-static joins efficiently. Our system employs hybrid model-based controllers to handle, both reactively and proactively, the overload caused by burst spikes in spatial data streaming workloads.
12 The source code of SpatialSPE (including rateController) is available at: https://github.com/IsamAljawarneh/SpatialSSJP
We combine a novel rate controller with our spatial-aware sampling method (SAOS, from our previous work [101]; see section 5.3.4 for details). Figure 6.1 gives a general overview of SpatialSSJP as a context diagram.
An expert expresses the continuous spatial query (which implicitly requires a stream-static join operation) together with a query running budget. A budget takes the form of either a latency or an accuracy QoS guarantee. Data arrives continuously from heterogeneous geo-referenced sources and is fed to the system at regular time intervals (e.g., batch intervals, a.k.a. trigger intervals in SpSS terms). We have placed our SAOS method in a front stage, where it receives a signal from the rate controller of SpatialSSJP indicating the new sampling fraction. SAOS then selects a proportionate sample and emits it to the stream-static join operator, which forwards the intermediate result to the approximator. The approximator completes the approximate computation cycle and serves an incremental result with rigorous error bounds to the user. At the same time, the join operator sends statistics of the latest trigger to the rate controller, which uses them to calculate a new sampling rate and serves it to SAOS to be applied in the next trigger.
SpatialSSJP comprises three main components:
• Stream-static join operator. This component performs the stream-static join over the sampled subset. While the stream-static join processor in SpSS is simple and conceptually appealing, it suffers from computational limitations when applied to spatially tagged datasets. Our join operator is therefore a retrofitted version of an operator offered by a spatial-aware library on top of Spark, known as Magellan¹³ [12, 13]. Magellan is designed to handle static-static spatial joins using z-order curves; building on its primitive static-static join features, we have retrofitted it so that it supports stream-static joins.
• Rate controller. The rate controller depends on the QoS goals fed to the system by an expert user. It is composed of two sub-components: a latency-aware rate controller and an accuracy-aware rate controller. For the latency-aware rate controller, we have incorporated a hybrid (i.e., proactive and reactive) model-based feedback loop mechanism that prunes the arriving data load to avoid system failure and meet the latency QoS targets. The controller calculates a new rate (which is then mapped to a sampling fraction) and feeds it back to SAOS, forcing it to limit the rate of data accepted for processing in the next trigger. The accuracy-aware rate controller employs a model-based statistical approach to compute a sampling fraction that meets the accuracy requirement (expressed as a ‘margin of error’ value, explained in section 6.3.2). Notice that both controllers share one reconfigurable parameter, the sampling fraction. Because the sampling module is a front stage, the overhead caused by the rate controllers is negligible, which is highly desirable in control theory.
• Approximator. This component receives the output of the join operator and uses it to incrementally update a required statistical target variable, for example the “average trip distance for all Uber trips (e.g., during a specific time-based window) originating in a specific district of Amman, Jordan”.
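The interplay of the three components across triggers can be sketched as a small simulation. Everything in it is a hypothetical stand-in: Bernoulli sampling replaces SAOS, a dictionary lookup replaces the spatial join, a simple proportional rule replaces the hybrid rate controller, and latency is simulated as a fixed cost per sampled tuple. It shows only the direction of the data and control flow, not the actual SpatialSSJP implementation.

```python
import random
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

Trip = Tuple[str, float]  # (zone key carried by the tuple, trip distance in km)


@dataclass
class RunningMean:
    """Approximator state: incrementally maintained mean per zone."""
    count: Dict[str, int] = field(default_factory=dict)
    total: Dict[str, float] = field(default_factory=dict)

    def update(self, zone: str, value: float) -> None:
        self.count[zone] = self.count.get(zone, 0) + 1
        self.total[zone] = self.total.get(zone, 0.0) + value

    def mean(self, zone: str) -> float:
        return self.total[zone] / self.count[zone]


def next_fraction(fraction: float, latency_ms: float, target_ms: float) -> float:
    """Stand-in controller: scale the fraction by target/observed latency."""
    return max(0.01, min(1.0, fraction * target_ms / latency_ms))


def run_trigger(batch: List[Trip], fraction: float, zone_names: Dict[str, str],
                state: RunningMean, rng: random.Random,
                cost_ms_per_tuple: float = 0.5) -> float:
    """One trigger: sample, join with the static table, update the estimate."""
    sample = [t for t in batch if rng.random() < fraction]  # stand-in sampler
    for key, distance in sample:
        zone = zone_names.get(key)          # stand-in stream-static join
        if zone is not None:
            state.update(zone, distance)    # approximator update
    return len(sample) * cost_ms_per_tuple  # simulated latency statistic


def simulate(num_triggers: int = 5, batch_size: int = 1000,
             target_ms: float = 100.0, seed: int = 7):
    rng = random.Random(seed)
    zone_names = {"z1": "Downtown", "z2": "Airport"}  # hypothetical static table
    state = RunningMean()
    fraction = 1.0
    history = []  # (fraction applied, simulated latency) per trigger
    for _ in range(num_triggers):
        batch = [(rng.choice(["z1", "z2"]), rng.uniform(1.0, 10.0))
                 for _ in range(batch_size)]
        latency = run_trigger(batch, fraction, zone_names, state, rng)
        history.append((fraction, latency))
        if latency > 0:
            # Statistics of this trigger set the fraction of the next one.
            fraction = next_fraction(fraction, latency, target_ms)
    return history, state
```

With these made-up settings, the first trigger processes the whole batch at a simulated 500 ms, so the controller lowers the fraction to 0.2 for the second trigger, which brings the simulated latency close to the 100 ms target.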