Recall from our earlier discussion that Orchestra supports a batched publish/import cycle, in which each participant stores its own updates in the CDSS, disjoint from those of all other participants. There is no need for traditional concurrency control mechanisms, because conflicts among concurrent updates are resolved by each participant during the import stage (via reconciliation). We therefore do not focus on such techniques. Although this work is motivated by the needs of Orchestra, the system described here supports general reliable storage of relational data in any setting where concurrency control is not needed.
However, there is still a notion of global consistency. We assign a logical timestamp, the epoch, which advances after each batch of updates is published by a peer. When a participant performs an import or poses a distributed query, it does so with respect to the data available at the specific epoch in which the import starts. The participant should receive the effects of all state published up to that epoch, and no state published thereafter (until its next import). Maintaining the current epoch does require some additional synchronization. As mentioned in Section 3.4, complete consistency requires a distributed counter, maintained through standard distributed-systems techniques such as Paxos (Lamport, 1998) or PBFT (Castro and Liskov, 2002). However, such “heavyweight” consensus protocols are needed only once per publish/import cycle. Once an epoch is associated with an operation, all of its reads and writes can be made relative to that epoch, using the techniques we describe here.
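The visibility rule can be illustrated with a toy, single-process sketch. It assumes a plain in-memory log of (key, epoch, value) records instead of the distributed storage described in the rest of this chapter, ignores deletions, and uses hypothetical names (`VersionedLog`, `publish`, `snapshot`). A reader at epoch e sees, for each key, the latest version published at or before e, and nothing published later.

```python
from collections import defaultdict


class VersionedLog:
    """Toy, single-process log: published versions are appended, never overwritten."""

    def __init__(self):
        # key -> list of (epoch, value) pairs, in publication order
        self._versions = defaultdict(list)

    def publish(self, epoch, updates):
        """Record a batch of updates (dict: key -> value) published at `epoch`."""
        for key, value in updates.items():
            self._versions[key].append((epoch, value))

    def snapshot(self, epoch):
        """State seen by an import or query that starts at `epoch`."""
        state = {}
        for key, versions in self._versions.items():
            visible = [(e, v) for e, v in versions if e <= epoch]
            if visible:
                # the latest version published no later than `epoch` wins
                state[key] = max(visible, key=lambda ev: ev[0])[1]
        return state


log = VersionedLog()
log.publish(0, {"a": "b", "f": "z"})
log.publish(1, {"f": "a"})
print(log.snapshot(0))  # {'a': 'b', 'f': 'z'}
print(log.snapshot(1))  # {'a': 'b', 'f': 'a'}
```

Because old versions are never overwritten, a query at epoch 0 keeps seeing R(f, z) even after epoch 1 has been published.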
Of course, in order to support queries over versioned data, we must develop a storage and access layer capable of managing such data. There are several key challenges here:
• Between database versions, we want to efficiently reuse storage for data values that have not changed.
• We must track which tuples belong to the desired version of a database. Such metadata should be co-located with the data in a way that minimizes the need for communication during query processing.
• Each tuple must be uniquely identifiable using a tuple identifier that includes its version. Yet, for efficiency of computation, we must partition data along a set of key attributes (as with a clustered index). It must be possible to convert from the tuple ID to the tuple key, so that a tuple can be retrieved by its ID; therefore a tuple’s hash key must be derived from (possibly a subset of) the attributes in its ID.
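A minimal sketch of the last requirement follows, assuming tuple IDs of the form (key attribute values, epoch) and a SHA-1-based 160-bit key space; the hash function, the separator, and the names `TupleID` and `hash_key` are illustrative choices, not taken from the system. Because the hash ignores the epoch, every version of a tuple lands on the same node, and the node to contact can be computed from a tuple ID alone.

```python
import hashlib
from typing import NamedTuple, Tuple


class TupleID(NamedTuple):
    key: Tuple[str, ...]  # values of the key attributes
    epoch: int            # version: epoch in which the tuple was last modified


def hash_key(tid: TupleID, hash_attrs: Tuple[int, ...] = (0,)) -> int:
    """DHT position derived only from the chosen key attributes, never from the epoch."""
    material = "\x1f".join(tid.key[i] for i in hash_attrs)  # unit separator between values
    digest = hashlib.sha1(material.encode("utf-8")).digest()
    return int.from_bytes(digest, "big")  # position in a 160-bit key space


old = TupleID(key=("f",), epoch=0)
new = TupleID(key=("f",), epoch=1)
assert old != new                      # the two versions are distinct tuples...
assert hash_key(old) == hash_key(new)  # ...but map to the same place in the DHT
```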
We maintain all versions of the database in a log-like structure across the participants: instead of replacing a tuple, we update our records to reference the new version rather than the old one, which remains in storage. Disk space is rarely a constraint today, and the benefits of full versioning, such as support for historical queries, typically outweigh the storage cost. We distribute this log partitioned along (possibly a subset of) each tuple’s key attributes. Tracking temporal information is integral to temporal databases, which are surveyed by Özsoyoglu and Snodgrass (1995). Associating versioned (or timestamped) information with a key is a standard approach in both peer-to-peer systems and databases; examples include Haeberlen et al. (2005) in the former and Stonebraker (1987) and Buneman et al. (2002) in the latter.
Each node, therefore, may contain many versions of each tuple. If the set of nodes is in flux, nodes may join or leave between the time a tuple is inserted or updated and the time it is used in a query; consequently, a node may not hold the correct version of a particular tuple. We assume that background replication is sufficient to ensure that each tuple exists somewhere in the system, but not necessarily where the standard content-addressable networking scheme would find it. We therefore need to be able to determine, for a particular epoch, the collection of tuple IDs present in a relation. Once the system has this information, scanning a relation is relatively simple. The tuple IDs are partitioned using the DHT routing table and sent to the nodes that own them. Those nodes then probe their persistent storage to find the full tuple associated with each ID. If a node has data for the tuple ID, it scans that tuple. Otherwise, because of replication lag and network churn, it holds only stale data for that key or none at all. The node that should own the tuple therefore searches outward from itself, looking first at nodes nearby in the key space and then at those farther away, until it finds a copy of the data for that ID. If the tuple is found, it is copied to the node that should own it and scanned there, preserving the data partitioning; otherwise, we have confirmation that data has been lost due to insufficient replication. Such an approach therefore never fails silently.
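The outward search can be sketched as follows, under simplifying assumptions: nodes are modeled as a Python list in ring order, "nearby in the key space" is approximated by ring position, and each node's persistent storage is a dict from tuple ID to tuple. The function names are hypothetical; a real implementation would locate neighbors through the DHT routing table and contact them over the network.

```python
from typing import Dict, List, Tuple

TupleID = Tuple[str, int]  # (key, epoch of last modification)


def search_order(owner: int, n: int) -> List[int]:
    """Ring positions ordered by distance from `owner`: itself, then neighbors outward."""
    order = [owner]
    for d in range(1, n // 2 + 1):
        for idx in ((owner + d) % n, (owner - d) % n):
            if idx not in order:
                order.append(idx)
    return order


def fetch(stores: List[Dict[TupleID, tuple]], owner: int, tid: TupleID) -> tuple:
    """Return the tuple with ID `tid`, repairing the owner's copy if it is missing."""
    for idx in search_order(owner, len(stores)):
        # Exact-ID probe: an older version of the same key never satisfies it.
        tup = stores[idx].get(tid)
        if tup is not None:
            if idx != owner:
                stores[owner][tid] = tup  # copy to the owner to preserve partitioning
            return tup
    # Every node was searched, so the loss is reported rather than hidden.
    raise LookupError(f"tuple {tid} lost: insufficient replication")


stores = [{("f", 0): ("f", "z")}, {}, {("f", 1): ("f", "a")}]
print(fetch(stores, 0, ("f", 1)))  # ('f', 'a'), found on a neighbor and copied to node 0
```

Note that node 0 holds the stale version ⟨f, 0⟩, but because the probe matches the full tuple ID, that version is never returned in place of ⟨f, 1⟩.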
A key property we adopt from CFS (Dabek et al., 2001) is that, once there is enough information to begin a request, it is always clear what data should be present in the distributed storage layer. In our case, this means that the current epoch has been determined by one of the distributed protocols discussed above; in CFS, it means that the root block has been retrieved, which determines which versions of all files will be used. In either case, stale data will never be retrieved. If expected data is not found at the node that should own it, this is most likely due to network churn. The request can either be retried after background replication has moved state around, or the system can proactively try to retrieve the missing state from other nearby nodes. Our query-oriented approach, described above, attempts to retrieve the necessary data proactively.
The key feature of our approach to reliable storage is its two complementary storage layers. Primary storage contains a log of all versioned tuples, tagged with version numbers. Secondary storage contains what we term the index, which maps from versions of a relation to the versions of particular tuples present in that relation. Implementing primary storage directly over a raw distributed hash table (DHT) is an obvious choice: because we require that the hash attributes be a subset of the primary key, it is possible to determine which node should store the full version of a tuple given its ID. In this thesis, we do not consider alternative ways of storing the versioned tuple log. There are, however, many possible ways of storing the index. At one extreme, one could store the list of tuple IDs as a single object in the DHT. This could be very expensive to update, however, since any update to a relation would require the (slightly) modified list of tuple IDs to be rewritten in its entirety.
We have experimented with several hierarchical implementations of the index, which introduce a level of indirection so that unmodified portions of the index can be used by a relation at multiple epochs. We were initially inspired by filesystem i-nodes, the CFS filesystem (Dabek et al., 2001), and log-structured filesystems, where, for append operations and small changes, the page-level data in a large file mostly remains unchanged. All of these schemes use a versioned structure to track the contents of a file, which closely resembles our index. With this prior work in mind, we decided to divide a relation into multiple index pages, each of which is given a unique ID. A master record for a relation records which index pages are used by that relation at a particular epoch. When a relation is updated, only the index pages that change need to be rewritten, and a new master record is written out that refers to a mixture of old and new pages. The new versions of updated tuples must also be written out to primary storage, of course.
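A minimal sketch of this copy-on-write scheme follows, assuming the whole index fits in one process's memory. `IndexStore`, `commit`, and the page-ID layout (relation, epoch, sequence number, which mirrors Example 1 below but omits the hash ID) are hypothetical. Committing an epoch writes only the changed pages plus a new master record; every other slot keeps pointing at pages written in earlier epochs.

```python
from typing import Dict, List, Optional, Tuple

TupleID = Tuple[str, int]       # (key, epoch)
PageID = Tuple[str, int, int]   # (relation, epoch written, sequence number in that epoch)


class IndexStore:
    """Hypothetical in-memory stand-in for index pages and per-epoch master records."""

    def __init__(self):
        self.pages: Dict[PageID, List[TupleID]] = {}
        self.masters: Dict[Tuple[str, int], List[PageID]] = {}

    def commit(self, relation: str, epoch: int, prev_epoch: Optional[int],
               changed: Dict[int, List[TupleID]]) -> List[PageID]:
        """Write the master record for `epoch`, rewriting only the pages in `changed`.

        `changed` maps a slot of the previous master record (or the next free slot)
        to the complete new contents of that page.
        """
        master = list(self.masters.get((relation, prev_epoch), []))
        for seq, (slot, contents) in enumerate(sorted(changed.items())):
            pid = (relation, epoch, seq)
            self.pages[pid] = contents       # only changed pages are written
            if slot < len(master):
                master[slot] = pid           # replace an old page
            elif slot == len(master):
                master.append(pid)           # grow the relation by one page
            else:
                raise ValueError(f"slot {slot} leaves a gap in the master record")
        self.masters[(relation, epoch)] = master
        return master


store = IndexStore()
m0 = store.commit("R", 0, None, {0: [("a", 0)], 1: [("f", 0)]})
m1 = store.commit("R", 1, 0, {1: [("f", 1)]})
print(m0)  # [('R', 0, 0), ('R', 0, 1)]
print(m1)  # [('R', 0, 0), ('R', 1, 0)]  first page is shared with epoch 0
```

The page superseded at epoch 1 is not deleted, so the master record for epoch 0 still resolves to a complete index.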
This leaves the question of how to organize tuple IDs onto index pages. If they are organized in a particular way, we may be able to exploit that organization when updating or querying a relation. One option, of course, is not to organize the index pages in any particular way. This would make deleting or updating tuples in a relation expensive, since all index pages might have to be searched to find the page that must change when a tuple is updated. It would, however, allow the system to cluster frequently updated keys on the same index page, maximizing the reuse of index pages by ensuring that as many pages as possible remain unchanged from epoch to epoch. A second option is to sort the tuple IDs by their primary key in the relation (i.e., by their logical keys) and partition them into pages based on that ordering, so that each page holds a contiguous range of the primary key. Then it is easy to find which page contains a key when updating or deleting a tuple, eliminating the potentially lengthy search described above. This approach also potentially allows range predicates over the primary key to be evaluated at the level of the index, eliminating tuples as early as possible in a scan. A third option is to sort the tuple IDs by their ID in the DHT key space and partition them into pages so that each page holds a contiguous range of the key space. This has the same benefits for updates as clustering the tuple IDs by their primary key, but does not allow range predicates over the primary key to be evaluated in the index. It does, however, allow each page to be stored near the tuples it references in the DHT key space. If the number of pages is large enough and the number of nodes is small enough, then with high probability a node will own the index pages for the tuples it owns. Preliminary experiments showed that the benefits of this last approach for scan performance are substantial, and that its performance was much better than that of the second option.
We therefore cluster index pages by DHT ID in our implementation, and all experimental results shown in this chapter use this approach.
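The third option can be sketched as below. The sketch assumes SHA-1 positions in a 160-bit key space, fixed-size pages, and a page placed at the arithmetic midpoint of its range, ignoring ranges that wrap around the ring; `dht_id`, `build_pages`, and `placement` are hypothetical helpers. Since a single version of a relation contains each key at most once, sorting by DHT position yields non-overlapping ranges, and a page stored at its range's midpoint usually falls on the same node as the tuples it lists.

```python
import hashlib
from typing import List, Tuple

TupleID = Tuple[str, int]  # (key, epoch)


def dht_id(key: str) -> int:
    """Position of a key in an assumed 160-bit SHA-1 key space."""
    return int.from_bytes(hashlib.sha1(key.encode("utf-8")).digest(), "big")


def build_pages(tuple_ids: List[TupleID], page_size: int) -> List[Tuple[int, int, List[TupleID]]]:
    """Sort tuple IDs by DHT position and cut them into pages of contiguous ranges.

    Each page is returned as (lowest position, highest position, tuple IDs).
    """
    ordered = sorted(tuple_ids, key=lambda tid: dht_id(tid[0]))
    pages = []
    for start in range(0, len(ordered), page_size):
        chunk = ordered[start:start + page_size]
        pages.append((dht_id(chunk[0][0]), dht_id(chunk[-1][0]), chunk))
    return pages


def placement(lo: int, hi: int) -> int:
    """Store a page at the middle of its range, normally on the node owning its tuples."""
    return (lo + hi) // 2


pages = build_pages([(k, 0) for k in "abcdef"], page_size=2)
for lo, hi, ids in pages:
    print(f"{placement(lo, hi):040x}", ids)
```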
Figure 4.3 shows the main data structures used to ensure consistency. All data structures are replicated using the underlying network substrate, so the failure of any node causes all of its functionality to be assumed transparently by one or more neighboring nodes. All nodes in the system perform a number of tasks based on the data stored at them. In its capacity as a data storage node, each participant holds a portion of the versioned tuple log by storing a mapping from tuple IDs (recall that a tuple ID is the primary key plus a version identifier) to full tuples; this is the primary storage mentioned above. The hierarchical index is shared among three types of nodes. The relation coordinator for a particular relation at a particular epoch holds the master record for that relation and epoch, a list of the page IDs used in that version of the index. It also holds the inverse page ID, which identifies a B+ tree that allows the system to quickly determine which page a particular primary key would fall onto. This is kept separate from the master record because small changes to a relation over time may not require changing the boundaries in the DHT key space between pages, so the lookup tree can be reused; for this reason, it references positions in the list of page IDs instead of actual page IDs. An index node holds index pages, keyed by page ID. Each page holds the tuple IDs for a contiguous range of the DHT key space. These tuple IDs can be used to probe the data storage nodes for the full version of each tuple in the relation. Recall that we place each index page at the same node as the tuples it references, by storing the page at the middle of the range of tuple keys it encompasses. This is why the network substrate, as discussed in Section 4.2, assigns a large, contiguous region of the key space to each node: it means that the vast majority of tuple keys are never sent over the network. If each node were instead responsible for many smaller ranges, this would no longer be the case, and performance would suffer. Thus, for most tuples, the same participant is both the index node and the data storage node.

Figure 4.3: Storage scheme to ensure version consistency and efficient retrieval. Rounded rectangles indicate the key used to contact each node (whose state is indicated with square-cornered rectangles).
…efficiently supporting relatively small changes to tables. Modifying a tuple in a relation requires us to look up the page holding the old version of the tuple using an inverse node, modify that page to include the ID of the new tuple version in place of the old one, and write out the modified page as the new index page for the region of the table surrounding the updated tuple. The entire contents of the new tuple must also be written out to the network. The system then creates a new version record linking to the updated index page and to all of the unaffected pages from the previous version. No modifications happen at the data storage nodes except to add the new version of the modified tuple; the previously used versions of all other tuples continue to be referenced by the reused or updated index pages.
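A minimal sketch of this update path follows, assuming in-memory dicts for index pages and primary storage, a SHA-1 key space, and a sorted list of page upper bounds searched with `bisect` as a stand-in for the B+ tree held under the inverse page ID. All names (`page_slot`, `update_tuple`, `TOP`) are hypothetical, and only a single tuple is updated; a batch would first group its tuples by page.

```python
import bisect
import hashlib
from typing import Dict, List, Tuple

TupleID = Tuple[str, int]       # (key, epoch of last modification)
PageID = Tuple[str, int, int]   # (relation, epoch written, sequence number)
TOP = 2**160 - 1                # last position of an assumed 160-bit key space


def dht_id(key: str) -> int:
    return int.from_bytes(hashlib.sha1(key.encode("utf-8")).digest(), "big")


def page_slot(upper_bounds: List[int], key: str) -> int:
    """Inverse lookup: slot in the master record whose page covers `key`.

    upper_bounds[i] is the last DHT position covered by page i, and the final
    entry must be TOP. Slots, not page IDs, are returned, so the same bounds stay
    valid across epochs as long as page boundaries do not move.
    """
    return bisect.bisect_left(upper_bounds, dht_id(key))


def update_tuple(pages: Dict[PageID, List[TupleID]], primary: Dict[TupleID, tuple],
                 master: List[PageID], upper_bounds: List[int],
                 new_tid: TupleID, new_tuple: tuple, new_pid: PageID) -> List[PageID]:
    """Install one new tuple version and return the master record for its epoch."""
    key = new_tid[0]
    slot = page_slot(upper_bounds, key)
    # Copy the affected page, replacing any older version of this key.
    contents = [tid for tid in pages[master[slot]] if tid[0] != key] + [new_tid]
    contents.sort(key=lambda tid: dht_id(tid[0]))
    pages[new_pid] = contents     # modified page is written under a fresh ID
    primary[new_tid] = new_tuple  # new version goes to primary storage; old one stays
    new_master = list(master)
    new_master[slot] = new_pid    # all other slots keep referencing old pages
    return new_master
```

Keeping slots rather than page IDs in the lookup structure is what lets the same bounds be reused after `update_tuple` swaps in a new page ID.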
Example 1. Suppose we have three participants, each storing a partition of a simple, one-table database, R(x, y), where x is the key and y is a non-key attribute. Node n1 is responsible for the range [0x00…, 0x55…], n2 for [0x55…, 0xAA…], and n3 for [0xAA…, 0x00…]. In this example, the tuple ID is the key attribute of a tuple together with the epoch in which it was last modified, e.g., ⟨f, 1⟩ for R(f, a). The index page ID consists of the relation name, the epoch in which the page was last modified, and a unique identifier for that relation and epoch, such as ⟨R, 1, 1⟩ for the second index page created for relation R during epoch 1. It also includes the hash ID at which the index page is stored.
In the first epoch (epoch 0), a participant inserts the tuples R(a, b) and R(f, z); the system state after this operation is shown in Figure 4.4a. In epoch 1, a participant inserts R(b, c), R(e, e), and R(c, f) while also changing R(f, z) to R(f, a); this is shown in Figure 4.4b. In epoch 2, a participant inserts R(d, d). The final state of the system is shown in Figure 4.4c. All of the structures stored in the system at epoch 2, including state from previous epochs, are shown in Figure 4.5.
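The version bookkeeping in Example 1 can be replayed in a few lines of Python. This sketch deliberately ignores partitioning and index pages, whose placement depends on hash values that appear only in the figures; it tracks just which tuple IDs make up R at each epoch and what primary storage holds.

```python
# Batches published in Example 1: epoch -> {x: y}; every entry inserts or updates R(x, y).
batches = {
    0: {"a": "b", "f": "z"},
    1: {"b": "c", "e": "e", "c": "f", "f": "a"},
    2: {"d": "d"},
}

primary = {}   # tuple ID (x, epoch) -> full tuple R(x, y); entries are never overwritten
current = {}   # x -> ID of the latest version of that tuple
index_at = {}  # epoch -> tuple IDs that make up R at that epoch

for epoch in sorted(batches):
    for x, y in batches[epoch].items():
        tid = (x, epoch)
        primary[tid] = (x, y)
        current[x] = tid
    index_at[epoch] = sorted(current.values())

print(index_at[2])   # [('a', 0), ('b', 1), ('c', 1), ('d', 2), ('e', 1), ('f', 1)]
print(len(primary))  # 7: the six current versions plus the superseded ('f', 0)
```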
Pseudocode for performing a lookup appears as Algorithm 4.1. Retrieval starts at the relation coordinator for the requested epoch, from which a list of index nodes can be obtained. A scan request is then sent to each index node, along with any predicates that can be evaluated over the key attributes present in the tuple IDs (the so-called sargable predicates). The index nodes apply all such predicates to the list of tuple IDs on each index page and request that the matching tuples be retrieved. This operation is highly parallelizable; the only step performed at a single node is sending the scan requests, which is very fast.
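The parallel structure of the lookup can be sketched in Python, with threads standing in for index nodes and in-memory dicts standing in for the relation coordinator's master records, the index pages, and primary storage. The names are hypothetical, and the grouping of tuple IDs into pages in the usage lines is made up rather than taken from Figure 4.4. Unlike the real protocol, tuples are gathered by the caller instead of being sent straight to the requester by the data storage nodes.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Tuple

TupleID = Tuple[str, int]       # (key, epoch)
PageID = Tuple[str, int, int]   # (relation, epoch written, sequence number)


def scan_page(contents: List[TupleID], primary: Dict[TupleID, tuple],
              pred: Callable[[str], bool]) -> List[tuple]:
    """Index-node work: evaluate the sargable predicate on the key inside each tuple ID,
    then fetch only the matching tuples from primary storage."""
    return [primary[tid] for tid in contents if pred(tid[0])]


def retrieve(masters: Dict[Tuple[str, int], List[PageID]], pages: Dict[PageID, List[TupleID]],
             primary: Dict[TupleID, tuple], relation: str, epoch: int,
             pred: Callable[[str], bool]) -> List[tuple]:
    page_ids = masters[(relation, epoch)]  # obtained from the relation coordinator
    with ThreadPoolExecutor() as pool:     # one scan request per page, run in parallel
        parts = list(pool.map(lambda pid: scan_page(pages[pid], primary, pred), page_ids))
    return [t for part in parts for t in part]


primary = {("a", 0): ("a", "b"), ("b", 1): ("b", "c"), ("c", 1): ("c", "f"),
           ("d", 2): ("d", "d"), ("e", 1): ("e", "e"), ("f", 1): ("f", "a"),
           ("f", 0): ("f", "z")}  # the stale version stays in primary storage
pages = {("R", 1, 0): [("a", 0), ("b", 1), ("c", 1)],
         ("R", 2, 0): [("d", 2), ("e", 1), ("f", 1)]}
masters = {("R", 2): [("R", 1, 0), ("R", 2, 0)]}
print(retrieve(masters, pages, primary, "R", 2, lambda x: x >= "c"))
# [('c', 'f'), ('d', 'd'), ('e', 'e'), ('f', 'a')]
```

Only tuple IDs whose keys pass the predicate cause a probe of primary storage, which is why pushing sargable predicates down to the index nodes eliminates tuples early.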
Example 2. Figure 4.6 shows how the lookup procedure works for our example instance. First, the lookup request from n2 for relation R at epoch 2 is hashed to find the node (in this case, n1) that is
Figure 4.4: State of the versioned relation R at each of the epochs for Example 1: (a) state at epoch 0, (b) state at epoch 1, (c) state at epoch 2. Data is partitioned across nodes by the key (the first attribute), which is a subset of the tuple ID. Redundant copies of replicated data are not shown, nor are versions of tuples that appear only at previous epochs. The left brackets indicate which nodes a tuple is stored on, while the right brackets indicate which index page a tuple’s ID is on.

Figure 4.5: All of the state stored in the system for versioned relation R after all of the operations described in Example 1 have taken place.
of index pages that contain the tuple IDs for that version of the relation. The request to scan those pages is sent to the index nodes that hold their contents, in this case n1 and n3. Those index nodes then send requests on to the data storage nodes that contain the full tuples (stored as a mapping from tuple ID to full tuple), asking them to scan the desired tuples given their IDs. The data storage nodes then retrieve the desired tuples and return them to the requester (not shown). Note that only two of the six tuple IDs were actually sent over the network, due to the colocation of index pages and tuple data.
As mentioned before, this approach avoids any possibility of seeing stale data due to replication lag. Suppose that, for some reason, n1 had not yet received a copy of the master record for R at epoch 2. It would search other nodes nearby in the system until it found a copy before proceeding. Similarly, if n1 had not yet received the data for ⟨f, 1⟩, it would never simply return the data for ⟨f, 0⟩; it knows that data is stale because ⟨f, 0⟩ does not appear in the index page. It would instead try to retrieve the full tuple for ⟨f, 1⟩ from the network before proceeding.
Algorithm 4.1 D R(R, e, f(k̄))
Input: R (relation), e (epoch), f(k̄) (filter function over key k̄)
Output: Matching tuples t ∈ R satisfying f(k̄).
1: relCoord ← h(⟨R, e⟩)
2: Contact Relation Coordinator at relCoord, retrieve pageIDList
3: for page ∈ pageIDList do
4:     Ask Index node at h(⟨e, (page.max + page.min)/2⟩) to scan page page
5:     Index node retrieves page contents Tuples
6:     Index node filters Tuples with f(k̄) → fTuples
7:     for t ∈ fTuples do
8:         Index node requests that Data Storage node at h(t.key) scan the tuple t
9:         Data Storage node sends t to node that requested scan, bypassing the Index node and Re-