
The prior section described how to achieve reliable access to mutable data in a distributed hash table. In this section, we explore how to perform reliable processing over this data. Together, these allow us to return correct and complete answers to queries, even in the presence of network and node failures. This is in contrast to prior peer-to-peer query engines, such as PIER (Huebsch et al., 2005) and Seaweed (Narayanan et al., 2008), which were best-effort. Their focus was on scaling to large numbers of nodes, and to do so they had to sacrifice strict consistency. In our work on query processing for collaborative data sharing, we can exploit the fact that we are limited to a smaller scale to ensure reliable processing. In the case of ORCHESTRA, this means reliable execution of the queries that implement the update store and perform update exchange. For other distributed applications, this means any “single-block” SQL query, i.e., any query consisting of joins, scalar function evaluation, predicate evaluation, and possibly a single level of aggregation.

As in this prior work on peer-to-peer query engines, we suffer from the general problems of wide-area distributed systems: higher lag times, constrained bandwidth, and generally bursty communication. Like those systems, we adopt a push-driven style of distributed query processing. The operators at each node either receive data directly from a local scan of persistent storage, or receive tuples as they arrive from other nodes in the system. This ensures that as much processing is done as possible given the available data, and it enables flexible operator scheduling in the event of delays. Also like PIER, most operators are partitioned-parallel, using hash ranges as the partitioning function; a small fraction of each operator executes on each node. This gives the system high scalability, assuming a good hash function. Partitioned parallelism happens naturally for scans, given the data layout described in the previous section. For other operators, it necessitates repartitioning to ensure that tuples that need to end up at the same node do so. For example, in the case of a distributed join, the tuples must be rehashed on (possibly a subset of) the join attributes, to ensure that any tuples that should join are able to; similarly, for an aggregation, they must be rehashed on a subset of the grouping attributes. All data is ultimately collected at the query initiator node, which may do final processing, such as the last stage of aggregation, or a final sort.
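The push-driven style described above can be sketched in a single process. This is a minimal illustration under assumed names (`Operator`, `push`, `end_of_stream`, `local_scan`, and the operator classes are hypothetical, not the system's actual interfaces): each operator does its work as soon as a tuple arrives and hands the result to its parent, instead of pulling tuples from its children.

```python
from typing import Callable, Dict, List, Optional

Row = Dict[str, object]  # one tuple, as attribute name -> value


class Operator:
    """Hypothetical push-driven operator interface."""

    def __init__(self, parent: Optional["Operator"] = None):
        self.parent = parent

    def push(self, row: Row) -> None:
        # Invoked by a child operator or a local scan when a tuple is ready.
        raise NotImplementedError

    def end_of_stream(self) -> None:
        # Stateless operators have nothing buffered, so they just forward it.
        if self.parent is not None:
            self.parent.end_of_stream()


class Select(Operator):
    def __init__(self, predicate: Callable[[Row], bool], parent: Operator):
        super().__init__(parent)
        self.predicate = predicate

    def push(self, row: Row) -> None:
        if self.predicate(row):
            self.parent.push(row)


class Project(Operator):
    def __init__(self, attrs: List[str], parent: Operator):
        super().__init__(parent)
        self.attrs = attrs

    def push(self, row: Row) -> None:
        self.parent.push({a: row[a] for a in self.attrs})


class Collect(Operator):
    """Root of a local pipeline: records what reaches it."""

    def __init__(self) -> None:
        super().__init__(None)
        self.rows: List[Row] = []
        self.done = False

    def push(self, row: Row) -> None:
        self.rows.append(row)

    def end_of_stream(self) -> None:
        self.done = True


def local_scan(partition: List[Row], consumer: Operator) -> None:
    # A leaf scan pushes each locally stored tuple as soon as it is read,
    # then announces that its partition is exhausted.
    for row in partition:
        consumer.push(row)
    consumer.end_of_stream()
```

For example, `local_scan(rows, Select(pred, Project(["x"], Collect())))` carries each tuple all the way to the root before the next one is read, so no operator ever waits on a slower input it does not need.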

Hashing-Based Distributed Query Execution

A query plan consists of a number of operators. Most are implementations of standard relational operators, though a few are specialized to our storage layer or to hashing-based partitioned-parallel databases. Our system implements the following operators:

Covering index scan retrieves data directly from the index nodes, if only key attributes are required, bypassing the data storage nodes.

Distributed scan executes at both index nodes and data storage nodes, similarly to Algorithm 4.1. The index nodes filter index pages to eliminate tuples that do not pass a predicate over the relation’s key attributes, and then send the IDs of the passing tuples to the data storage nodes. The tuples from each index page are stored nearby on disk, and are retrieved using the tuple IDs in a single pass through the hash ID range for that page. Instead of being sent back to the query initiator, the resulting tuples are pushed through the query plan.

Select implements selection on intermediate results.

Project is the standard projection operator.

Join is a pipelined hash join (Raschid and Su, 1986).

Aggregate is a blocking, hash-based grouping operator, which supports re-aggregation of partially aggregated intermediate results.

Ship sends the tuples it receives to the query initiator.

Rehash partitions its input among the system nodes by hashing on some subset of the tuples’ attributes.

Compute-function performs scalar function evaluation, such as arithmetic or string concatenation.

Spool buffers its input tuples at the query initiator, so they can be retrieved once the query is finished.
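Re-aggregation of partially aggregated results, as supported by the Aggregate operator, depends on keeping a mergeable state per group. The sketch below illustrates that general technique rather than the system’s code; the function names and the choice of AVG, carried as a (sum, count) pair, are assumptions made for the example.

```python
from typing import Dict, Iterable, Tuple

Partial = Tuple[float, int]  # (running sum, running count) for one group


def partial_aggregate(rows: Iterable[Tuple[str, float]]) -> Dict[str, Partial]:
    """First-level aggregation over (group_key, value) pairs at one node."""
    state: Dict[str, Partial] = {}
    for key, value in rows:
        s, c = state.get(key, (0.0, 0))
        state[key] = (s + value, c + 1)
    return state


def reaggregate(partials: Iterable[Dict[str, Partial]]) -> Dict[str, Partial]:
    """Merge partial states produced by several nodes or earlier stages."""
    merged: Dict[str, Partial] = {}
    for state in partials:
        for key, (s, c) in state.items():
            ms, mc = merged.get(key, (0.0, 0))
            merged[key] = (ms + s, mc + c)
    return merged


def finalize_avg(state: Dict[str, Partial]) -> Dict[str, float]:
    # Only the final stage turns the mergeable state into the answer.
    return {key: s / c for key, (s, c) in state.items()}
```

With inputs `[("east", 10), ("west", 4)]` on one node and `[("east", 20), ("east", 30)]` on another, the merged average for "east" is 20.0; averaging the two per-node averages instead would wrongly give 17.5, which is why the partial state rather than the partial answer is shipped.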

The query is “driven” by some combination of the leaf-level scan operators described above; each is novel to our system, as it exploits the specific versioned indexing scheme used in our storage system. Such operators are typically run concurrently across all of the nodes in the system, each operating on the data partition stored at its node.

From there, the retrieved data may be passed locally through a series of pipelined operators, such as joins or function evaluation. Recall that most operators execute in a partitioned-parallel fashion, meaning that a logical operator in the query plan is represented by a collection of physical operators, one at each node, each executing over a portion of the data in the system. Processing continues using a given partitioning of the data (i.e., using a particular subset of tuple attributes as input to the hash function) until the data needs to be repartitioned to enable a different computation. This happens using either a ship operator or a rehash operator. The ship operator sends the data it receives to the query initiator; this is needed to get the results to the spool operator that collects the results of a query. The rehash operator repartitions its input tuples by their hash IDs in the networking substrate and sends them to other nodes in the system. Rehashing is commonly used to enable joins or aggregation, when a relation needs to be repartitioned on a join or grouping key. The rehash operator routes tuples to a destination node by first hashing the key using the SHA-1 hash function, then consulting the snapshot of the query routing table described previously.
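The routing step of the rehash operator can be sketched as follows. For illustration only, we assume that the routing table snapshot is a sorted list of range start points over the 160-bit SHA-1 space, each owned by one node, and that key values are encoded by joining their `repr` strings; the real encoding and table layout may differ, and `RoutingSnapshot`, `hash_key`, and `rehash_destination` are hypothetical names.

```python
import bisect
import hashlib
from typing import List, Sequence, Tuple


class RoutingSnapshot:
    """Immutable copy of the key-space partitioning (hypothetical layout).

    `ranges` holds (start, node) pairs sorted by start; each node owns the
    half-open interval from its start up to the next start, and the last
    node owns everything up to the top of the SHA-1 space. The first start
    must be 0 so that every hash value has an owner.
    """

    def __init__(self, ranges: Sequence[Tuple[int, str]]):
        self.starts = [s for s, _ in ranges]
        self.nodes = [n for _, n in ranges]
        assert self.starts[0] == 0 and self.starts == sorted(self.starts)

    def owner(self, hash_id: int) -> str:
        # Rightmost range whose start is <= hash_id.
        i = bisect.bisect_right(self.starts, hash_id) - 1
        return self.nodes[i]


def hash_key(values: Sequence[object]) -> int:
    # SHA-1 over the rehash attributes; the byte encoding is an assumption.
    data = "\x1f".join(repr(v) for v in values).encode("utf-8")
    return int.from_bytes(hashlib.sha1(data).digest(), "big")


def rehash_destination(row: dict, key_attrs: List[str],
                       snapshot: RoutingSnapshot) -> str:
    return snapshot.owner(hash_key([row[a] for a in key_attrs]))
```

Because every node routes with the same snapshot, a tuple of r1 rehashed on `y` and a tuple of r2 partitioned on `x` with the same value are guaranteed to meet at the same node.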

Each operator sends an end-of-stream notification to its parent operator when it finishes executing. Scans can easily detect when they are done, and most other operators simply propagate an end-of-stream notification downstream after they receive it (perhaps first performing some final computation to produce results, as in an aggregate operator). However, detecting end-of-stream with the rehash operator is slightly tricky: it cannot complete until it has acknowledgment from all downstream nodes that they have received all of the data it sent. Once the spool operator (the root of all query plans, which holds the query results) has received an end-of-stream notification, all other operators must also have finished, and so the query is complete.
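The rehash operator’s completion condition amounts to simple bookkeeping on the sending side. The sketch assumes cumulative per-destination acknowledgments and treats the end-of-stream marker as one more message that must be acknowledged; these choices, and the class and method names, are illustrative assumptions rather than a description of the system’s wire protocol.

```python
from typing import Dict, Iterable


class RehashSender:
    """Sending half of a rehash operator on one node (hypothetical sketch).

    It may report end-of-stream only once (1) its own input has ended and
    (2) every destination has acknowledged every message sent to it,
    including the end-of-stream marker itself.
    """

    def __init__(self, destinations: Iterable[str]) -> None:
        self.sent: Dict[str, int] = {d: 0 for d in destinations}
        self.acked: Dict[str, int] = {d: 0 for d in self.sent}
        self.input_done = False

    def send(self, dest: str) -> None:
        # Stand-in for transmitting one tuple or batch to `dest`.
        self.sent[dest] += 1

    def input_end_of_stream(self) -> None:
        # Forward an end-of-stream marker to every destination.
        self.input_done = True
        for dest in self.sent:
            self.send(dest)

    def on_ack(self, dest: str, count: int) -> None:
        # Acknowledgments are cumulative: `count` messages received so far.
        self.acked[dest] = max(self.acked[dest], count)

    def is_complete(self) -> bool:
        return self.input_done and all(
            self.acked[d] == self.sent[d] for d in self.sent
        )
```

Counting messages rather than merely noting that the input ended is what closes the gap the text describes: tuples still in flight to a slow destination keep the operator from declaring itself finished.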

We now show two examples of query plans for our system. Each is annotated with the intermediate state that might be generated during an execution of that plan. Example 3 shows how a distributed join takes place, and Example 4 shows how a distributed aggregation takes place.

Example 3. Continuing our example of versioned storage given in Example 1, let us consider the following query, which performs a self-join of the relation R(x, y):

SELECT r1.x, r2.y AS z FROM R r1, R r2 WHERE r1.y = r2.x

An ORCHESTRA query execution plan, overlaid with data from the example instance as it would flow through the plan, is shown in Figure 4.7; we assume that node n1 posed the query and is therefore acting as the query originator. Each node begins by scanning two copies of R, one for r1 and one for r2. The copy for r1 is then repartitioned on r1.y. It then joins with r2, which is already partitioned on r2.x, making this a valid distributed join. The resulting tuples are then sent to the query originator, where they are spooled. As mentioned previously, end-of-stream notifications will propagate up through the query plan from the leaves; when they reach the spool operator, the query results are complete.
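The data flow of Example 3 can be simulated in one process. The sketch uses a hypothetical three-node system and a hypothetical instance of R (the instance from Example 1 is not reproduced here), and substitutes SHA-1 modulo the node count for the range-based routing table. The pipelined join is written in its common symmetric form: each arriving tuple is inserted into its own side’s hash table and immediately probed against the other side, and the output pairs are (r1.x, r2.y).

```python
import hashlib
from collections import defaultdict
from typing import Dict, List, Tuple

Pair = Tuple[str, str]  # a tuple of R(x, y)
NODES = ["n1", "n2", "n3"]  # hypothetical three-node system


def node_of(value: str) -> str:
    # Stand-in partitioning: SHA-1 of the key modulo the number of nodes.
    h = int.from_bytes(hashlib.sha1(value.encode("utf-8")).digest(), "big")
    return NODES[h % len(NODES)]


class SymmetricHashJoin:
    """Pipelined hash join for r1.y = r2.x: every arriving tuple is added to
    its own side's table and immediately probed against the other side."""

    def __init__(self) -> None:
        self.left: Dict[str, List[Pair]] = defaultdict(list)   # r1, keyed on y
        self.right: Dict[str, List[Pair]] = defaultdict(list)  # r2, keyed on x
        self.output: List[Pair] = []

    def push_left(self, t: Pair) -> None:
        x, y = t
        self.left[y].append(t)
        for _, z in self.right.get(y, []):
            self.output.append((x, z))

    def push_right(self, t: Pair) -> None:
        x, z = t
        self.right[x].append(t)
        for lx, _ in self.left.get(x, []):
            self.output.append((lx, z))


def run_self_join(r: List[Pair]) -> List[Pair]:
    # R is stored partitioned on its hash key x.
    storage: Dict[str, List[Pair]] = defaultdict(list)
    for t in r:
        storage[node_of(t[0])].append(t)
    joins = {n: SymmetricHashJoin() for n in NODES}
    for n in NODES:
        for t in storage[n]:
            joins[node_of(t[1])].push_left(t)  # r1: rehash on r1.y
            joins[n].push_right(t)             # r2: already partitioned on r2.x
    # Ship: each node sends its join output to the spool at the initiator.
    spool: List[Pair] = []
    for n in NODES:
        spool.extend(joins[n].output)
    return spool
```

Each matching pair is produced exactly once, by whichever of its two tuples reaches the join node second, which is what lets the join emit results before either input is complete.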

Example 4. Now let us consider an aggregation query. Since the relation for the previous example contained no numeric data, let us instead consider the relation StatePop(state, region, population). The primary key of StatePop is state, which is also the hash key used for partitioning. Suppose that node n1 wished to execute the following query:

SELECT region, SUM(population) FROM StatePop GROUP BY region

As before, we present a distributed execution plan, overlaid with data from a possible instance; this is shown in Figure 4.8. This plan performs a distributed scan of the StatePop relation, which produces data partitioned by state ID. The data is then repartitioned by hashing on the region attribute. Note that since there are only two regions present in the data, at least one node will not receive any data; in this case, only n1 and n3 receive data. Once the aggregate operators receive end-of-stream notifications from all of the scan operators, they know that they will receive no more input and can produce their output. These tuples are then shipped to n1, the query originator, which collects the results of the query.
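The blocking behavior in Example 4 can be sketched as follows. The instance below is hypothetical, and the region-to-node assignment (east to n1, west to n3) is a fixed lookup standing in for SHA-1 rehashing, chosen only so that, as in the text, n2 receives no data; the class and function names are likewise assumptions.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Set, Tuple

Row = Tuple[str, str, int]  # (state, region, population)


class BlockingSumAggregate:
    """GROUP BY region, SUM(population) at one node.

    Blocking: it emits nothing until end-of-stream has arrived from every
    scan operator that might still send it tuples.
    """

    def __init__(self, scan_nodes: Set[str]) -> None:
        self.waiting = set(scan_nodes)
        self.sums: Dict[str, int] = defaultdict(int)

    def push(self, row: Row) -> None:
        _, region, population = row
        self.sums[region] += population

    def end_of_stream(self, from_node: str) -> List[Tuple[str, int]]:
        self.waiting.discard(from_node)
        if self.waiting:
            return []  # more input may still arrive
        return sorted(self.sums.items())


def run_query(
    partitions: Dict[str, List[Row]], owner_of_region: Callable[[str], str]
) -> List[Tuple[str, int]]:
    nodes = set(partitions)
    aggs = {n: BlockingSumAggregate(nodes) for n in nodes}
    # Distributed scan (data partitioned by state), rehashed on region.
    for rows in partitions.values():
        for row in rows:
            aggs[owner_of_region(row[1])].push(row)
    # Each scan finishes; aggregates emit only after hearing from all scans,
    # and their output is shipped to the initiator's spool.
    spool: List[Tuple[str, int]] = []
    for scan_node in partitions:
        for n in nodes:
            spool.extend(aggs[n].end_of_stream(scan_node))
    return sorted(spool)
```

The aggregate at n2 still waits for every scan to finish before concluding that it has nothing to report; without that rule, an aggregate could emit a partial sum while tuples for its group were still in flight.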

Of course, the queries presented in Examples 3 and 4 are relatively simple. Each contains only one repartitioning of the data through rehashing. More involved queries, containing multiple joins possibly combined with aggregation, will make considerably more use of the rehash operator to distribute intermediate state so it can be used in a series of operators. In the examples, there were only a few possible ways of translating the query into a query plan, and the ones chosen are intuitively among the best. With more complex queries, where operations may be reordered and there may be multiple partitioning schemes that make sense, the query optimizer plays a critical role in choosing a good plan, based on data statistics and information about the participants’ capabilities. We will discuss in more detail how it does this in Section 4.4.


Architecture for Performance and Failure Detection

Several aspects of our query processing architecture are enabled by our custom hash-based substrate. Prior work has used existing “off-the-shelf” DHTs: for example, PIER uses Bamboo (Rhea et al., 2004), while Seaweed uses Pastry. However, we additionally develop several techniques at the query execution level that are vital for performance and correctness.

First, for efficiency, the query processor benefits from the fact that our substrate uses TCP to manage connections between machines. This allows for automatic flow control in the event of a congested network. Of course, we could alternatively have used UDP and implemented flow control via periodic handshaking. However, with the use of non-blocking I/O routines (to avoid the large numbers of threads inherent in using blocking I/O over many channels), we saw no evidence that maintaining even hundreds of open TCP connections imposed any significant overhead.

Second, for rapid failure detection, we use frequent UDP-based pings. While existing DHTs also use a so-called “heartbeat” to detect node failure, they typically do not detect failures quickly enough for our purposes. We need these notifications of node failure to percolate quickly up to the query execution layer, so it can compensate for them in some way. In existing DHTs, failover is often totally transparent to the application executing above the networking layer.
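The bookkeeping behind ping-based detection is small: a node is suspected once no reply has been heard from it for longer than a timeout, so detection delay is bounded by roughly the timeout plus the interval between checks, which is why frequent pings and a short timeout detect failures faster than a slow heartbeat. The sketch shows only that bookkeeping; the timeout value, the times in the usage example, and all names are assumptions, and socket handling is omitted.

```python
from typing import Dict, Iterable, Set


class PingFailureDetector:
    """Timeout-based failure suspicion driven by ping replies.

    The network layer (assumed, not shown) calls `on_pong` whenever a UDP
    ping reply arrives; `check` is called periodically with the current time.
    """

    def __init__(self, nodes: Iterable[str], timeout: float, now: float) -> None:
        self.timeout = timeout
        self.last_heard: Dict[str, float] = {n: now for n in nodes}
        self.failed: Set[str] = set()

    def on_pong(self, node: str, now: float) -> None:
        # Once a node is declared failed, late replies do not revive it
        # for the current query.
        if node not in self.failed:
            self.last_heard[node] = now

    def check(self, now: float) -> Set[str]:
        """Return the nodes newly suspected since the previous check."""
        newly = {
            n for n, t in self.last_heard.items()
            if n not in self.failed and now - t > self.timeout
        }
        self.failed |= newly
        return newly
```

The set returned by `check` is what would be reported upward to the query execution layer, instead of being hidden behind transparent DHT failover.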

Third, for failure recovery, the query processor is given direct information about the state of the routing tables. A snapshot of the routing tables is taken by the query initiator as it invokes the query; recall that, since in ORCHESTRA all nodes have global knowledge, this is a complete snapshot of the partitioning of the DHT key space. This snapshot is disseminated along with the query plan to all nodes, in order to ensure absolute consistency of the routing tables. If one or more nodes fail in the middle of execution, the difference in the routing tables is reported back to the query initiator, so that it can incrementally recompute only the lost portion of the query state. We describe this feature in detail in Section 4.4.
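The difference between the original snapshot and the post-failure routing table is what allows only the lost portion of the state to be recomputed. A minimal sketch of computing that difference follows. It assumes ranges are explicit half-open intervals and that the initiator knows which nodes replicate each range, and it spreads the inherited ranges with a least-loaded choice, one simple way of dividing the recomputation work; the function and parameter names are hypothetical.

```python
from typing import Dict, List, Set, Tuple

Range = Tuple[int, int]  # half-open [start, end) of the hash key space


def reassign_failed_ranges(
    snapshot: Dict[Range, str],
    replicas: Dict[Range, List[str]],
    failed: Set[str],
) -> Dict[Range, str]:
    """Return only the changed entries: failed ranges -> new owners.

    `replicas[r]` lists the nodes holding a copy of range r's data (an
    assumption about how replica placement is exposed). Each lost range
    goes to the least-loaded surviving replica, ties broken by list order.
    """
    changes: Dict[Range, str] = {}
    load: Dict[str, int] = {}
    for r in sorted(snapshot):
        if snapshot[r] not in failed:
            continue  # unaffected ranges keep their original owner
        survivors = [n for n in replicas[r] if n not in failed]
        if not survivors:
            raise RuntimeError(f"no surviving replica for range {r}")
        new_owner = min(survivors, key=lambda n: load.get(n, 0))
        load[new_owner] = load.get(new_owner, 0) + 1
        changes[r] = new_owner
    return changes
```

Returning only the changed entries mirrors the idea of reporting the difference: nodes whose ranges are untouched keep their state and do no extra work.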

Fourth, for performance, the query processor batches tuples into blocks by destination, compressing them (using lightweight Zip-based compression) and marshalling them in a format that exploits their commonalities. This makes query processing much more efficient than if it were built over a DHT with many smaller messages, and it reduces CPU and bandwidth use.
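Batching by destination can be sketched as follows. This illustration rests on stated assumptions: it uses JSON for marshalling and Python’s `zlib` module (DEFLATE, the algorithm commonly used in Zip files) as a stand-in for the lightweight Zip-based compression mentioned above, and it writes attribute names once per block as a simple example of exploiting commonalities among tuples. `make_batches` and `read_batch` are hypothetical names.

```python
import json
import zlib
from collections import defaultdict
from typing import Dict, List, Sequence, Tuple


def make_batches(
    routed: Sequence[Tuple[str, Tuple]], attrs: Sequence[str]
) -> Dict[str, bytes]:
    """Group (destination, tuple) pairs into one compressed block per node.

    Attribute names appear once per block rather than once per tuple.
    """
    by_dest: Dict[str, List[Tuple]] = defaultdict(list)
    for dest, row in routed:
        by_dest[dest].append(row)
    blocks: Dict[str, bytes] = {}
    for dest, rows in by_dest.items():
        payload = json.dumps({"attrs": list(attrs), "rows": rows}).encode("utf-8")
        blocks[dest] = zlib.compress(payload)
    return blocks


def read_batch(block: bytes) -> List[dict]:
    # Receiver side: decompress, unmarshal, and rebuild named tuples.
    doc = json.loads(zlib.decompress(block).decode("utf-8"))
    return [dict(zip(doc["attrs"], row)) for row in doc["rows"]]
```

One message per destination per batch, instead of one per tuple, amortizes per-message overhead, and repetitive values within a block are exactly what the compressor removes.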

Finally, for correctness, each tuple is annotated with information about which source nodes supplied the data from which the tuple was derived. This is used to prevent duplicate answers when recovering from a failed node.

Handling Node Membership Changes

The major challenge of reliable query processing is how to handle changes to the node set. Recall from Section 4.2 that the query initiator takes a snapshot of the routing table (which, for us, is the complete key space partitioning) during query initiation. It disseminates this snapshot along with the query plan so that all machines will use a consistent assignment between hash values and nodes. The query initiator is itself deciding which nodes will participate in the query execution; this is similar to the work performed by a distributed group membership protocol, such as those of Reiter (1996) or Birman and Joseph (1987).

Node arrival

Suppose a node joins the system in the midst of execution. In a DHT, such a change immediately affects the routing of the system: messages begin to be forwarded to the new node, which may not have participated in any prior computation. In principle, one might develop special protocols by which the new node would be “brought up to speed” by its neighbors. However, this becomes quite complex when multiple nodes join at different times. Instead, we let the query complete on its initial set of nodes, and only make use of the new node when a fresh query (with a new routing table snapshot) is invoked. This approach provides simplicity and avoids expensive synchronization.

Node departure or failure

Our use of TCP connections between nodes is often adequate to detect a network partition or a node failure; we assume that failures are complete (a failure of the machine, or at least a crash of our software running on it) rather than incorrect operation. If a sending node (and query operator) drops its connection before sending an end-of-stream message, or a receiving node drops its connection before query completion, then this represents a failure. As mentioned previously, the system also performs periodic ping operations in the background to detect a machine to which the TCP connection remains (or at least appears to remain) open, but on which our distributed query processor is no longer successfully running, perhaps due to a software crash. In either case, ignoring the failure at the node, or the connectivity failure between the node and the rest of the system, will lead to missing or possibly incorrect answers. This leads us to the problem of recomputation, described in the next subsection.

Recovery from Failure

Our system supports two forms of recovery from failure. One option, upon detecting a node failure, is to terminate and restart any in-process queries. Assuming low failure rates, we will ultimately get the answers this way; the indexing structure described in Section 4.3 will ensure that the scans in the restarted query are correct, and therefore, unless another node fails, the query will complete with accurate results. This approach is straightforward to implement in ORCHESTRA, since, in contrast to systems like PIER, we can detect which queries are still in flight.

When failure during query execution is more likely, as in longer-lived queries running on large numbers of nodes, better performance (i.e., correct query answers returned sooner) might be obtained by performing incremental recomputation, where we only repeat the computations affected by the failed node, using a different node that has data replicated from the failed one. The key challenge here is that simply recomputing will likely create some number of duplicate tuples, which in turn will lead either to duplicate answers or (in many cases) to incorrect aggregate results.

After a failure, any derived state in the system that originated from the failed nodes is likely to be inconsistent, due to propagation and computation delays. We can either re-invoke the computation from the failed nodes and then remove duplicate answers, or remove all state derived from the failed nodes’ data before performing the recomputation. We adopt the latter approach because of the difficulty of detecting which tuples are duplicates. As was hinted at previously, this means we must track which intermediate and final results are derived from data processed at one of the failed nodes. We tag each tuple in the system with the set of nodes that have processed it (or any tuple used to create it), and maintain these sets of nodes as the tuples propagate through the operator graph. As we validate experimentally, this can be done with minimal overhead. Granted, even a small overhead will probably increase the average time to query completion if failures are rare; however, in many instances we feel that having less variable query execution times, and in particular reducing the maximum time it may take to execute a query, is worth a slight increase in the average execution time.
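The tagging scheme can be sketched as follows: each tuple carries the set of nodes that processed it or any tuple used to create it, a derived tuple takes the union of its inputs’ sets, and after a failure an operator discards every buffered tuple whose set intersects the failed nodes. The class and function names are hypothetical, and a set of node-name strings is an assumed representation chosen for readability.

```python
from dataclasses import dataclass
from typing import FrozenSet, List, Set, Tuple


@dataclass(frozen=True)
class Tagged:
    values: Tuple
    nodes: FrozenSet[str]  # every node that processed this tuple or its inputs


def process_at(t: Tagged, node: str) -> Tagged:
    # Any operator running at `node` adds that node to the tag.
    return Tagged(t.values, t.nodes | {node})


def join_tuples(left: Tagged, right: Tagged, node: str) -> Tagged:
    # A derived tuple inherits the union of its inputs' tags.
    return Tagged(left.values + right.values, left.nodes | right.nodes | {node})


def purge(state: List[Tagged], failed: Set[str]) -> List[Tagged]:
    """Drop every buffered tuple that depends on a failed node."""
    return [t for t in state if not (t.nodes & failed)]
```

Because the join result inherits its inputs’ tags, purging on a failure of n2 removes both the tuple scanned at n2 and everything derived from it, so recomputing n2’s share from a replica cannot produce a duplicate.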

We divide incremental recomputation into four stages.

1. Determine the change in assignment of ranges to nodes. When a node or set of nodes fails, other nodes “inherit” a portion of the hash key space from the failed nodes. The query initiator computes a new routing table from the original one, assigning the ranges owned by the failed nodes to the remaining ones. If the failed nodes’ data is available on more than one replica, the initiator will divide the task of recomputing the missing answers evenly among them.

2. Drop all intermediate results dependent on data from the failed nodes. To prevent duplicate answers, we scan the internal state of all operators and discard any tuples that are tagged as