ESP systems are used for a wide range of applications such as real-time bidding (RTB), fraud detection and click stream analysis. Some of those applications are classified as critical systems, as a system failure and the resulting temporary unavailability may cause high financial losses. Hence, fault tolerance is essential for those applications. However, applications offering services designed exclusively for leisure can also benefit from fault tolerance, as improved service availability in turn increases customer satisfaction and revenue, for example if ads are delivered with the service or optional paid features are offered.
Since the amount of data being processed by an ESP application exceeds the processing power of a single machine, several tens to hundreds of machines are generally needed to carry out a specific data processing task. However, the more machines a system comprises, the higher the probability that at least one of them experiences a fault, which compromises the whole task if not properly handled. Hence, fault tolerance is equally important for all applications running on top of distributed systems such as ESP systems. In the following, we first review the failure model and failure detection mechanisms and then provide an overview of commonly used techniques for providing fault tolerance in ESP systems.
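The effect of the machine count on the failure probability can be illustrated with a short calculation. The following is a minimal sketch under the assumption that machines fail independently and with the same probability; the function name and the per-machine failure probability of 1% are hypothetical values chosen for illustration only.

```python
def prob_any_failure(p_single: float, n_machines: int) -> float:
    """Probability that at least one of n machines fails.

    Assumes independent failures with identical probability p_single
    (an illustrative simplification, not a measured value).
    """
    if not 0.0 <= p_single <= 1.0:
        raise ValueError("p_single must be a probability")
    if n_machines < 0:
        raise ValueError("n_machines must be non-negative")
    # Complement of "no machine fails".
    return 1.0 - (1.0 - p_single) ** n_machines


if __name__ == "__main__":
    for n in (1, 10, 100):
        print(n, round(prob_any_failure(0.01, n), 3))
```

Under these assumptions, a fault that is rare on a single machine becomes more likely than not once a system spans about a hundred machines.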
2.2.1 Failure Model and Detection
Prior to the design and implementation of a fault tolerance mechanism in a distributed system, it is important to first agree on the set of failures the system must be able to tolerate and recover from. In a distributed system, both processes and communication channels may fail. The literature [CD88, HT94] distinguishes three categories of failures: omission failures, arbitrary failures and timing failures.
Omission failures refer to the case where a process has stopped or crashed, i.e., does not execute any further, or where messages are lost, which is generally caused by a lack of space in the input buffer of the receiver or by broken channels. Other processes may not be able to detect these states correctly, as this would require some form of heartbeat and timing mechanism where a reply is expected to arrive within a certain interval of time. However, such mechanisms cannot detect failures reliably in asynchronous systems such as ESP systems, where message delays may be incorrectly interpreted as process halts or crashes.
In contrast to omission failures, arbitrary failures (also called Byzantine failures) cover the worst case, in which any type of error may occur. For example, a process might emit messages with faulty content or might even set wrong values in variables and thereby exhibit arbitrary behavior.
The last class of failures comprises timing failures, where processes do not execute or respond to messages within a certain interval of time. This class of failures is only applicable to synchronous systems and is therefore not considered further here.
The overhead and complexity introduced by handling these types of failures vary greatly from class to class. For example, tolerating f crash (i.e., omission) failures requires at least f + 1 nodes for the system to continue operating, while agreeing on a single value (i.e., reaching consensus) requires at least 3 · f + 1 nodes in order to tolerate f failures in the arbitrary failure model. Hence, it is important to choose the failure model that best matches the environment and the requirements of the system at hand.
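The difference in resource requirements between the two failure models can be made concrete with a small helper. This is only a sketch of the two bounds stated above; the function name and the model labels "crash" and "arbitrary" are hypothetical.

```python
def min_nodes(f: int, model: str) -> int:
    """Minimum number of nodes needed to tolerate f failures.

    Encodes the two bounds quoted in the text:
    f + 1 for crash failures, 3 * f + 1 for arbitrary failures.
    """
    if f < 0:
        raise ValueError("f must be non-negative")
    if model == "crash":
        return f + 1
    if model == "arbitrary":
        return 3 * f + 1
    raise ValueError(f"unknown failure model: {model!r}")


if __name__ == "__main__":
    for f in (1, 2, 3):
        print(f, min_nodes(f, "crash"), min_nodes(f, "arbitrary"))
```

For a single tolerated failure, the crash model needs two nodes while the arbitrary model needs four, and the gap widens with every additional failure to be tolerated.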
Fortunately, there exists a set of techniques for transforming failures of one type into failures covered by a different failure model. This broadens the range of failures handled by one failure class while saving resources at the same time. For example, value failures caused by hardware errors such as bit flips can be detected through encoded processing [WF07] and transformed into process crashes, which require fewer resources to handle than failures in the arbitrary failure model. Moreover, local watchdogs (in hardware or software) can be used to crash nodes that behave arbitrarily slowly. These examples suggest that the crash-failure model is sufficient for the majority of application scenarios in ESP systems; it is therefore the model considered throughout the remainder of this work.
Once the failure model has been defined and narrowed down to process crashes, a failure detector is needed in order to trigger recovery and compensation actions in a system. Various approaches for constructing failure detectors have been proposed in the literature, such as [Fet03]; in practice, however, simpler solutions such as heartbeating are generally used. Examples of state-of-the-art big data systems using simple heartbeating (incorporated into Zookeeper [HKJR10]) for failure detection range from Apache Hadoop [Had15] as a representative of MapReduce [DG08] to Apache S4 [NRNK10], Storm [Sto15] and Samza [Sam15] representing open-source ESP systems.
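The idea behind heartbeating can be sketched in a few lines. The class below is a hypothetical, simplified timeout-based detector and does not reproduce the mechanism of Zookeeper or any of the systems named above; the class name, the timeout value and the injectable clock are assumptions made for illustration.

```python
import time
from typing import Callable, Dict, List


class HeartbeatDetector:
    """Suspects a node if no heartbeat arrived within timeout_s seconds."""

    def __init__(self, timeout_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.timeout_s = timeout_s
        self.clock = clock  # injectable so the sketch can be tested
        self.last_seen: Dict[str, float] = {}

    def heartbeat(self, node: str) -> None:
        """Record that a heartbeat from `node` has just been received."""
        self.last_seen[node] = self.clock()

    def suspected(self) -> List[str]:
        """Return nodes whose last heartbeat is older than the timeout."""
        now = self.clock()
        return sorted(node for node, seen in self.last_seen.items()
                      if now - seen > self.timeout_s)


if __name__ == "__main__":
    fake_now = [0.0]  # simulated time in seconds
    detector = HeartbeatDetector(timeout_s=3.0, clock=lambda: fake_now[0])
    detector.heartbeat("a")
    detector.heartbeat("b")
    fake_now[0] = 2.0
    detector.heartbeat("a")  # only node "a" keeps sending heartbeats
    fake_now[0] = 4.0
    print(detector.suspected())
```

Note that a suspected node has not necessarily crashed: in an asynchronous system, a delayed heartbeat is indistinguishable from a missing one, so the timeout trades detection latency against false suspicions.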
2.2.2 Recovery Guarantees
ESP systems process continuous streams of events where operators are used to filter, extract or accumulate events in order to transform low quality information into high quality information, or to trigger certain actions. During the course of data processing, operators may accumulate state which must be made resilient in order to survive system crashes.
One way of making an operator resilient is periodic checkpointing of its accumulated state. However, in order to completely mask a failure, in-flight events must be made resilient as well, i.e., they must not be lost and must be replayable in a predefined order so that the exact same output is generated. In-flight events are the events that were processed after the last checkpoint was taken and prior to the occurrence of a crash. ESP systems that completely mask failures, i.e., guarantee (i) neither a loss of events nor duplicated processing, (ii) the recovery of operator state and (iii) the reprocessing of events in the same order regardless of a failure, provide the strongest possible guarantee to their users, i.e., precise recovery.
However, not every ESP application requires such strong guarantees in order to continue processing and fulfill its objective. An alternative with slightly weaker guarantees is rollback recovery [HBR+05], where neither in-flight events nor state is lost; however, the reprocessing of in-flight events might take a different execution path after a crash than it would have taken in a failure-free execution, and hence produce different results. In contrast to precise and rollback recovery, gap recovery preserves operator state but no events for replay, while in amnesia neither the operator state nor in-flight events are preserved, so that an operator is always restarted with a fresh state and resumes event processing using the next available input.
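The four guarantees can be contrasted using a toy operator that sums its input. The following sketch is a deliberate simplification: the function name and the example values are hypothetical, and because summation is order-insensitive, the difference between precise and rollback recovery (the replay order and execution path) is not visible in the result.

```python
from typing import List


def recovered_state(guarantee: str, checkpoint: int,
                    in_flight: List[int]) -> int:
    """State of a summing operator after a crash and recovery.

    checkpoint: sum covered by the last checkpoint.
    in_flight:  events processed after that checkpoint, before the crash.
    """
    if guarantee in ("precise", "rollback"):
        # State is restored and all in-flight events are replayed.
        return checkpoint + sum(in_flight)
    if guarantee == "gap":
        # State is restored, in-flight events are lost.
        return checkpoint
    if guarantee == "amnesia":
        # Neither state nor in-flight events survive.
        return 0
    raise ValueError(f"unknown guarantee: {guarantee!r}")


if __name__ == "__main__":
    for g in ("precise", "rollback", "gap", "amnesia"):
        print(g, recovered_state(g, checkpoint=6, in_flight=[4, 5]))
```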
2.2.3 Fault Tolerance Approaches
Fault tolerance in ESP systems is mainly achieved through replication, either of the state an operator maintains or of the processing of events itself. Hence, approaches for implementing fault tolerance in ESP systems are based on one of these two replication mechanisms or on a combination of both.
In active replication, a second copy of an operator (called the secondary) exists and processes the same input stream of events as its primary peer, as shown in Figure 2.4. Active replication can transparently mask failures, as events are continuously processed by both operator instances, which produce identical outputs for downstream operators. However, active replication is not trivial to implement and comes with a high resource footprint. First, in order to produce identical results, the computation needs to be strictly deterministic. This can only be achieved if the input streams are identical, which requires coordination across replicas through an atomic broadcast protocol [CASD95]. However, since coordination among nodes is costly, simpler mechanisms such as a deterministic merge [AS00] can be used, guaranteeing identical input sequences at a much lower overhead. Second, as the computation is replicated, active replication requires at least twice the resources in terms of both computation and network bandwidth, which are wasted during failure-free execution.
Figure 2.4: Active replication - before (left) and after (right) a crash.
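How a merge can yield identical input sequences at both replicas without coordination can be sketched as follows. This is not the algorithm of [AS00] but a simplified illustration under stated assumptions: every event carries a timestamp and a source identifier, each input stream is already ordered by timestamp, and ties are broken by the source identifier. The function name and the event layout are hypothetical.

```python
import heapq
from typing import Iterable, List, Tuple

Event = Tuple[int, str, str]  # (timestamp, source id, payload)


def deterministic_merge(*streams: Iterable[Event]) -> List[Event]:
    """Merge timestamp-ordered streams into one total order.

    The order depends only on (timestamp, source id), not on the order
    in which the streams happen to be passed in or arrive.
    """
    return list(heapq.merge(*streams, key=lambda e: (e[0], e[1])))


if __name__ == "__main__":
    stream_a = [(1, "A", "x"), (4, "A", "y")]
    stream_b = [(2, "B", "u"), (4, "B", "v")]
    primary = deterministic_merge(stream_a, stream_b)
    secondary = deterministic_merge(stream_b, stream_a)
    print(primary == secondary)
    print(primary)
```

Both replicas derive the same sequence locally, so a deterministic operator fed with it produces identical output on the primary and the secondary.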
An alternative to active replication is upstream backup [HBR+05], where only one operator instance is running and processing events at a time. However, in order to survive crashes, events must additionally be buffered at upstream operators so that they can be replayed to a newly created instance of an operator, as depicted in Figure 2.5. Although this approach saves the computational costs of redundant processing, it comes with the disadvantage of long recovery times. During recovery, a new instance of the operator needs to be created first, and events buffered at upstream operators must then be retransmitted and reprocessed in order to resume event processing without information loss. Since memory at upstream operators is bounded and reprocessing of events takes a substantial amount of time, this approach is impractical, especially for long-running operations. Hence, mixed approaches are common, where both replication of state and replication of processing are used.
Figure 2.5: Upstream backup - before (left) and after (right) a crash.
One of the approaches that makes use of state replication is passive replication. In passive replication, the state of an operator is periodically checkpointed to stable storage. This can be either a local disk, if the system under consideration only suffers from process crashes and merely needs to be restarted by a watchdog mechanism after a crash, or a fault-tolerant distributed filesystem (DFS) spanning multiple nodes, if hardware failures need to be tolerated as well. In addition to state replication, events are buffered at upstream operators as in upstream backup. However, contrary to upstream backup, the buffers at upstream operators are pruned (via an acknowledgment message) with each successful checkpoint in order to keep the buffers small and to speed up the event replay during recovery, as shown in Figure 2.6. Although this approach has several advantages over classical upstream backup, the recovery time can still vary greatly, as it depends primarily on the frequency of checkpoints and the size of the operator state. For instance, recovering an operator with a state size of 4 GB from local disk would require at least 40 seconds, assuming a throughput of 100 MB/s for reading a checkpoint from a conventional magnetic disk. Hence, several hybrid approaches have evolved over time with the goal of lowering the recovery time.
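The interplay of checkpointing, acknowledgment-based pruning and replay can be sketched with a toy pipeline. All class and function names below are hypothetical, the operator is a simple sum, and "stable storage" is modelled as a plain dictionary; the sketch also assumes that 1 GB equals 1000 MB, in line with the 40-second estimate above.

```python
from collections import deque
from typing import Deque, Dict, List, Tuple

Event = Tuple[int, int]  # (sequence number, value)


class Upstream:
    """Upstream operator that buffers events until they are acknowledged."""

    def __init__(self) -> None:
        self.buffer: Deque[Event] = deque()
        self.next_seq = 0

    def emit(self, value: int) -> Event:
        event = (self.next_seq, value)
        self.buffer.append(event)  # keep a copy for a possible replay
        self.next_seq += 1
        return event

    def ack(self, seq: int) -> None:
        """Prune all buffered events covered by a checkpoint up to seq."""
        while self.buffer and self.buffer[0][0] <= seq:
            self.buffer.popleft()

    def replay(self) -> List[Event]:
        return list(self.buffer)


class SumOperator:
    def __init__(self, state: int = 0, last_seq: int = -1) -> None:
        self.state = state
        self.last_seq = last_seq

    def process(self, event: Event) -> None:
        self.last_seq, value = event
        self.state += value

    def checkpoint(self) -> Dict[str, int]:
        return {"state": self.state, "last_seq": self.last_seq}


def min_restore_seconds(state_mb: float, throughput_mb_s: float) -> float:
    """Lower bound on the time needed to read a checkpoint back."""
    return state_mb / throughput_mb_s


if __name__ == "__main__":
    upstream, op = Upstream(), SumOperator()
    stable_storage: Dict[str, int] = {}
    for v in [1, 2, 3, 4, 5]:
        op.process(upstream.emit(v))
        if v == 3:  # take a checkpoint, then acknowledge it upstream
            stable_storage = op.checkpoint()
            upstream.ack(stable_storage["last_seq"])

    # Crash: the operator is lost. Restore state and replay the buffer.
    recovered = SumOperator(**stable_storage)
    for event in upstream.replay():
        recovered.process(event)
    print(recovered.state, len(upstream.buffer))
    print(min_restore_seconds(4000, 100))
```

Only the two events emitted after the checkpoint remain buffered and need to be replayed, whereas plain upstream backup would have to keep and reprocess all five.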
Figure 2.6: Passive replication - before (left) and after (right) a crash.
One such hybrid approach is passive standby, where checkpoints are stored at a suspended operator replica instead of in external stable storage. Since such a replica already holds an up-to-date state, only events between the last checkpoint and the time of the crash must be pulled and replayed, as shown in Figure 2.7.
Figure 2.7: Passive standby - before (left) and after (right) a crash.
Contrary to passive standby, the secondary peer in active standby receives and processes events from upstream operators as in active replication; however, the output produced by the secondary peer is not sent downstream, which saves bandwidth, as depicted in Figure 2.8. Active standby has the advantage of a quick failover, as only the connection to the downstream operator must be established in order to resume event processing. However, the approach has almost the same costs with regard to processing and network resources as active replication. Hence, the advantages and the associated costs must be carefully weighed when choosing an appropriate fault tolerance approach for the ESP application at hand. In Chapter 7, we provide a more condensed overview of the resource requirements and recovery times of these approaches, and propose a fault tolerance controller that relieves the system user of the burden of choosing the most appropriate scheme with respect to resource requirements and recovery times.
Figure 2.8: Active standby - before (left) and after (right) a crash.