
3.9 Related Event Stream Processing Systems

3.9.1 Apache S4

Apache S4 [S4215, NRNK10], originally developed and pushed by Yahoo! Inc., was a first attempt at real-time computation at large scale. The ESP system is written entirely in Java, which simplifies debugging and bug tracking while keeping the system portable across platforms. S4’s programming model was inspired by the simplicity of MapReduce [DG08]: users implement operators as user-defined functions (UDFs), which are instantiated as processing elements (PEs) at runtime. As in STREAMMINE3G, users are provided with an event as input and a stream object (which mimics a collector object) to emit events to downstream peers. However, contrary to our system, PEs are instantiated based on the emitted keys; hence, a PE operates solely on a single key rather than on a partitioned stream of events comprising a set of keys. In order to mimic event partitioning, the provided KeyFinder interface can be used to assign events to predefined keys based on the attributes provided.
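The per-key instantiation of PEs can be illustrated with a small sketch. It is written in Python for brevity, and all names (`CounterPE`, `key_finder`, `dispatch`) are hypothetical; it does not reproduce S4's actual Java API.

```python
class CounterPE:
    """Hypothetical processing element that is bound to exactly one key."""

    def __init__(self, key):
        self.key = key
        self.count = 0

    def on_event(self, event):
        self.count += 1


def key_finder(event):
    # Assumed stand-in for a KeyFinder: derive the key from an event attribute.
    return event["user"]


def dispatch(events):
    pes = {}
    for event in events:
        key = key_finder(event)
        if key not in pes:
            # One PE instance per distinct key, created on first use.
            pes[key] = CounterPE(key)
        pes[key].on_event(event)
    return pes


pes = dispatch([{"user": "a"}, {"user": "b"}, {"user": "a"}])
```

In this sketch, two PE instances exist after dispatching, one for key `a` and one for key `b`, and each instance only ever sees the events of its own key.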

The flow of events in S4 is specified through streams that are attached to PEs. PEs consume events from their attached streams, and the streams themselves can be made available to any peer PE so that it can emit and route events transparently to interested parties, forming a topology of operators. In order to feed the system with events, a special adapter operator exists which has access to the input stream attached to the first operator in the chain of operators.

For message transport, netty.io [Net15b] is used, a Java-based library providing a convenient abstraction over the non-blocking native I/O interface, similar to boost.asio for C++. However, contrary to STREAMMINE3G, which sends events in batches when facing high volumes of data, S4 always disseminates events individually without any batching. Moreover, S4 reads messages asynchronously without throttling read requests (AUTO_READ mode). This can quickly saturate the input buffers, in which case events are dropped transparently when the operator is overloaded, a behavior also known as load shedding [TcZ07].
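The effect of load shedding can be sketched with a bounded input buffer that silently discards events once it is full. The class `SheddingBuffer` is a hypothetical illustration in Python and not the mechanism used by S4 or netty.io.

```python
from collections import deque


class SheddingBuffer:
    """Bounded input buffer that drops new events once it is full."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.queue = deque()
        self.dropped = 0

    def offer(self, event):
        if len(self.queue) >= self.capacity:
            # Overload: the event is discarded without notifying the sender.
            self.dropped += 1
            return False
        self.queue.append(event)
        return True


buf = SheddingBuffer(capacity=3)
accepted = [buf.offer(i) for i in range(5)]
```

With a capacity of three and no consumer draining the queue, the last two of the five offered events are shed. A throttled reader would instead stop reading from the socket and let back pressure build up at the sender.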

As in STREAMMINE3G, S4 uses Zookeeper to store its cluster configuration reliably. However, the cluster configuration is static and cannot be changed at runtime. Hence, nodes cannot be added or removed during processing in order to accommodate new load situations through elasticity mechanisms. Moreover, an S4 process can host only a single PE; co-location of PEs for better resource utilization can only be achieved by launching several S4 processes on the same node.

While the initial releases of S4 were only equipped with very basic fault tolerance by simply restarting crashed PEs, the latest version 0.6.0 offers state persistence through uncoordinated checkpointing where the fields of a PE are transparently serialized and written to stable storage. Although checkpointing preserves the state of long running operations, S4 does not provide strong consistency since events are neither merged, processed deterministically nor logged for a replay of in-flight events during an initiated recovery. Hence, S4 provides only gap recovery without strong consistency for state as well as events.

Although S4 was open-sourced by Yahoo! Inc. in early 2011 and became an Apache Incubator project just two years later, the project can be considered dead since its last release (0.6.0) in summer 2013.

3.9.2 Apache Storm

A project that evolved at roughly the same time as S4 is Apache Storm [Sto15], which was originally pushed by Twitter Inc. and is written in Clojure [Hal09], a Lisp dialect that runs on the Java Virtual Machine. Operators in Storm are implemented using UDFs in a similar way as in S4 and STREAMMINE3G. However, contrary to S4, where operators (processing elements) operate solely on a single key, Storm operates on a partitioned stream of events comprising several keys. The partitioning is specified through stream groupings, which are essentially partitioners. Instead of providing several streams to an operator in order to have events delivered downstream as done in S4, Storm provides a single collector object to emit events and transparently handles the event dissemination to downstream peers based on the user-specified topology.
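A key-based partitioner of the kind described above can be sketched as a stable hash of the key modulo the number of downstream instances. The function name `partition_for` and the choice of CRC32 are assumptions made for illustration; this is not Storm's implementation.

```python
import zlib


def partition_for(key, num_tasks):
    # A stable hash (unlike Python's randomized built-in str hash) ensures that
    # the same key is routed to the same downstream instance on every run.
    return zlib.crc32(key.encode("utf-8")) % num_tasks


events = [("alice", 1), ("bob", 2), ("alice", 3)]
routed = {}
for key, value in events:
    routed.setdefault(partition_for(key, 4), []).append((key, value))
```

All events carrying the same key end up in the same partition, so a downstream operator instance can keep per-key state without coordinating with its siblings.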

Topologies in Storm are established by specifying the set of upstream operators from which a downstream operator consumes events. The topology definition also contains the partitioning scheme (i.e., the stream grouping) as well as the parallelization degree, in a similar fashion as in STREAMMINE3G. Storm distinguishes between two types of operators: bolts and spouts. While spouts are used solely as input adapters to ingest events, bolts serve as regular operators within a topology that receive, process and emit events to downstream bolts. Earlier versions of Storm were based on zeroMQ [Zer15] for message dissemination across the network, as it provides a high-level abstraction for networking similar to boost.asio. However, since zeroMQ is written in C++, a JNI wrapper library [Lia99] was needed, which introduced additional overhead through (un-)marshaling of data as well as complexity when deploying the system on diverse platforms. Therefore, the authors recently switched to the netty.io framework, which not only simplifies the deployment of the system but also improves performance, as micro-benchmarks have shown [Net15a].

Storm clusters are configured through Zookeeper; however, contrary to S4, new nodes can be added at runtime. In order to mimic elasticity, Storm provides a re-balance feature which evenly redistributes operators (i.e., bolts) across the cluster, taking the newly added resources into account. Although re-balancing allows a scale-out, a contraction of the system can only be achieved by actively crashing running nodes.

In order to cope with system failures, Storm provides the concept of transactional topologies where unacknowledged events are automatically replayed. Hence, Storm employs mechanisms to prevent event loss; accumulated operator state, however, is lost. The authors of Storm propose using an RDBMS instead to store the state of long-running applications. In 2013, Storm became an Apache Incubator project and is currently maintained and supported by a steadily growing community.
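The replay of unacknowledged events can be sketched as a source that retains every emitted event until it has been acknowledged. The class `ReplayingSource` and its methods are hypothetical and only illustrate the principle, not Storm's API.

```python
class ReplayingSource:
    """Hypothetical source that keeps emitted events until they are acked."""

    def __init__(self):
        self.pending = {}  # event id -> payload
        self.next_id = 0

    def emit(self, payload):
        event_id = self.next_id
        self.next_id += 1
        self.pending[event_id] = payload
        return event_id

    def ack(self, event_id):
        # An acknowledged event no longer needs to be retained.
        self.pending.pop(event_id, None)

    def replay(self):
        # Re-emit everything that was never acknowledged, in emission order.
        return [self.pending[i] for i in sorted(self.pending)]


src = ReplayingSource()
ids = [src.emit(p) for p in ("a", "b", "c")]
src.ack(ids[1])
to_replay = src.replay()
```

Only the events that were never acknowledged are replayed. Note that this protects events but not operator state: whatever a downstream operator had accumulated before a crash is not restored by the replay alone.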

3.9.3 Apache Samza

A fairly recent development in the area of ESP systems is Apache Samza [Sam15], which originated at LinkedIn Inc. The system is written in Scala [OSV08] and uses Apache Kafka [Kaf15], a publish-subscribe system, to exchange messages. Operators in Samza are called StreamTasks and operate on a partitioned stream, as in Storm and STREAMMINE3G.

Instead of defining an explicit topology which cannot be modified during runtime as in S4 and Storm, Samza follows a more flexible approach with a loose communication model where StreamTasks communicate solely through named streams. This model allows a deployment and removal of StreamTasks without any interference. In order to feed Samza with new events, StreamConsumers are used which serve as input adapters for the system.

Operators in Samza, i.e., StreamTasks, are deployed using Apache YARN [Yar15], a resource-aware scheduler which evolved from the Hadoop ecosystem. However, contrary to Storm and STREAMMINE3G, which use multi-threading to process events, Samza uses a set of single-threaded processes to execute StreamTasks sequentially. While Storm supports re-balancing where threads and nodes can be added during runtime, Samza follows a more static approach using a fixed resource pool.

For message dissemination, Samza uses Kafka, which employs its own low-level implementation to exchange messages between peer nodes based on raw Java TCP sockets. However, Zookeeper is used to store the configuration of the system.

Samza provides fault tolerance using two mechanisms: periodic checkpointing of state and event logging. Event logging is realized by persistently writing events to stable storage, which is done at the transport layer, i.e., in Kafka, while state persistence is provided by Samza itself. In order to offer exactly-once processing semantics, the state is equipped with a timestamp vector to track the progress of each upstream partition. However, this requires that every partition emits events with strictly monotonically increasing timestamps, as in STREAMMINE3G.
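The role of the timestamp vector can be sketched as follows: the operator state records, per upstream partition, the timestamp of the last event it has applied, and this vector is checkpointed together with the state. During a replay, every event at or below the recorded timestamp is recognized as a duplicate. The class `StatefulOperator` and the sample log are hypothetical and written in Python; the sketch relies on the strictly increasing timestamps per partition mentioned above.

```python
class StatefulOperator:
    """Hypothetical summing operator with a per-partition timestamp vector."""

    def __init__(self, partitions):
        self.total = 0
        self.ts_vector = {p: -1 for p in partitions}

    def process(self, partition, ts, value):
        if ts <= self.ts_vector[partition]:
            return False  # already reflected in the state: skip the duplicate
        self.total += value
        self.ts_vector[partition] = ts
        return True

    def checkpoint(self):
        # State and timestamp vector are saved together as one snapshot.
        return {"total": self.total, "ts_vector": dict(self.ts_vector)}

    @classmethod
    def restore(cls, snapshot):
        op = cls(snapshot["ts_vector"].keys())
        op.total = snapshot["total"]
        op.ts_vector = dict(snapshot["ts_vector"])
        return op


# Logged events as (partition, timestamp, value).
log = [("p0", 1, 10), ("p1", 1, 5), ("p0", 2, 7), ("p1", 2, 3)]

op = StatefulOperator(["p0", "p1"])
for event in log[:2]:
    op.process(*event)
snapshot = op.checkpoint()

# Simulated crash: restore the snapshot and replay the complete log.
recovered = StatefulOperator.restore(snapshot)
applied = [recovered.process(*event) for event in log]
```

After the replay, the recovered state equals the state of a failure-free run, since the first two events are filtered out and the remaining two are applied exactly once.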

Since StreamTasks can explicitly define at runtime the order of upstream partitions from which events are consumed, a deterministic merge can easily be implemented at the operator level, giving the user the possibility to recover from crashes in a precise manner as in STREAMMINE3G. As with Storm, the project benefits from a large community and has recently been promoted to a top-level Apache project.

3.9.4 SEEP

In contrast to the previous projects, which evolved mainly from industry, we will now briefly present SEEP [See15, CFMKP13], an ESP system which originates from academia and is publicly available as open source. SEEP is written completely in Java and shares many commonalities with STREAMMINE3G, as the architecture of the system was designed with a focus on providing fault tolerance and elasticity for stateful operators at the same time.

As for the programming model, users are provided with a simple MapReduce-like interface which provides an event as input and a collector object (as a field) to emit events downstream. Topologies in SEEP are defined using the provided Java API: the operators are defined first and then explicitly wired by specifying the downstream operators to which events should be sent.

The partitioning of data is done explicitly through the send() method, which takes a second parameter, streamId, to specify the downstream operator and partition to which an event should be routed, rather than encapsulating the data partitioning in a separate interface. In order to harness the power of today’s multi-core architectures, SEEP provides multi-threading support for stateless operators.

Messages in SEEP are exchanged over raw Java socket connections. As in STREAMMINE3G, event batching is used to reduce serialization costs and improve network efficiency.
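A size-based batching scheme can be sketched as follows. The class `Batcher`, the JSON encoding and the `send` callback are assumptions chosen for illustration in Python; they do not reflect the wire format of SEEP or STREAMMINE3G.

```python
import json


class Batcher:
    """Collects events and hands them to `send` as one serialized message."""

    def __init__(self, max_batch, send):
        self.max_batch = max_batch
        self.send = send
        self.buffer = []

    def emit(self, event):
        self.buffer.append(event)
        if len(self.buffer) >= self.max_batch:
            self.flush()

    def flush(self):
        if self.buffer:
            # One serialization call and one network message per batch.
            self.send(json.dumps(self.buffer))
            self.buffer = []


sent = []
batcher = Batcher(max_batch=3, send=sent.append)
for i in range(7):
    batcher.emit(i)
batcher.flush()  # push out the final, partially filled batch
```

Seven events result in three messages instead of seven. A practical implementation would typically also flush after a timeout so that a partially filled batch does not delay events indefinitely under low load.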

Contrary to the previously presented systems, SEEP does not rely on Zookeeper for storing its configuration. Furthermore, nodes can be added at runtime, which allows a scale-out of the system. Since SEEP comes with an explicit state management interface, stateful operators can easily be migrated to spare nodes during a scale-out without information loss.

The provided state management interface serves not only for load balancing when scaling out the system but also for fault tolerance through periodic checkpointing. Since events are also logged prior to leaving the node for dissemination to downstream operators, a gap-less recovery as in STREAMMINE3G can be provided. However, precise recovery is only possible by applying a deterministic merge, which is currently only supported through an explicit ordering operator that must be plugged into the operator pipeline right before the operator requiring the ordered input.

3.9.5 IBM System S

A system that originates from industry but is not publicly available as open-source software is IBM’s System S [AAB+06, GAW+08]. The system comprises the Stream Processing Core (SPC) [AAB+06], which can be considered the actual streaming engine, similar to STREAMMINE3G, with a query language layer on top called SPADE [GAW+08]. The language allows the definition of queries, including information about their distribution and parallelization. In addition to a high-level definition of queries using SPADE, users can also implement their custom operators using UDFs, maximizing the user’s flexibility.

SPC shares many similarities with STREAMMINE3G’s architecture and design. First, operators are executed as processing elements (PEs) which form a topology in order to carry out a specific query. Furthermore, topologies can be modified at runtime through stream composition. In order to increase the efficiency of the system, applications can consume public streams, avoiding unnecessary re-computation of previous results. In situations where several succeeding operators are deployed on the same local node, data is passed via simple pointers as in STREAMMINE3G.

In order to handle system failures, System S is also equipped with a set of fault tolerance mechanisms such as active replication and rollback recovery preventing event and state loss.

3.10 Summary

In this chapter, we presented STREAMMINE3G, our approach for fault tolerant and elastic event stream processing. We first introduced the programming model and the general architecture of the system. We then provided details on how we support stateful event processing and achieve scalability through our explicit state management interface and data partitioning, respectively. We furthermore presented several techniques to improve performance and scalability such as event batching, efficient memory management and delegation mechanisms tailored to ESP systems.

With regard to fault tolerance, we presented several protocols and their underlying implementations to efficiently perform checkpointing, event logging and redundant processing in STREAMMINE3G. In order to operate STREAMMINE3G in the cloud, we furthermore described two protocols for stateful operator migration. The chapter concludes with an overview and comparison of recently evolved ESP systems originating from industry and academia.

sive Replication

In this chapter, we will present an approach for lowering the runtime overhead of event ordering in the context of rollback recovery. Event ordering ensures that events are processed deterministically, producing repeatable output. Deterministic execution allows applications to recover precisely, without information loss or duplicated processing.

Our approach is based on the observation that many windowed operators share the property of commutativity. We therefore introduce the notion of an epoch, matching the window of an operator. Instead of enforcing a strict ordering of events, we assign events to epochs and process events within such epochs (i.e., windows) out of order. Checkpointing and recovery are then performed solely at epoch boundaries, providing a deterministic snapshot and allowing the application to recover in a precise manner.

4.1 Motivation

Since ESP applications are long-running operations, one requirement developers of ESP applications often face is fault tolerance. Fault tolerance is not only important to mitigate crash failures of the system but also to allow a migration of the system to a new machine. Since the majority of ESP applications accumulate some form of state over time which must not be lost when facing a system failure or when migrating the application, appropriate mechanisms are required to persist state as well as events. Depending on the underlying ESP system, application developers are offered a set of fault tolerance mechanisms and replication schemes to choose from, such as rollback recovery, which uses a combination of checkpointing and logging, or active replication.

Although active replication offers a quick recovery, it is often not the preferred solution as it consumes almost twice the resources through its redundant processing. Moreover, active replication requires deterministic execution in order to reliably filter out duplicates at downstream operators, which adds another layer of complexity.

tens of seconds where simpler mechanisms for fault tolerance such as rollback recovery can be applied. Moreover, rollback recovery uses a combination of checkpointing and event logging which consumes far less resources than the use of active replication.

Despite the fact that system crashes occur relatively rarely, application developers often favor a precise recovery for their application, where events are neither lost nor processed twice. Precise recovery simplifies the implementation of an application and reduces the developer’s effort for guaranteeing and reasoning about the correctness of the application at hand.

In order to let an application recover precisely, deterministic execution (as in active replication) is required, where events originating from different upstream operators are merged and processed, i.e., passed to the user’s operator code, in a predefined order. The strict order allows operators to produce the exact same sequence of events at any point in time, i.e., even during a replay. The reproducibility of events allows a reliable filtering of duplicates using a timestamp vector associated with the operator state, and a gap-less recovery if event logging is used. Moreover, checkpoints can be taken independently without coordination, which lowers the overhead imposed on the system through checkpointing.
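A deterministic merge of this kind can be sketched as a k-way merge that orders events by timestamp and breaks ties by the identifier of the upstream source. The function `deterministic_merge` is a hypothetical, offline illustration in Python: it assumes that each upstream delivers events with strictly increasing timestamps and that all events are already available, whereas an online operator would have to wait until every upstream has delivered an event before releasing the next one.

```python
import heapq


def deterministic_merge(streams):
    """Merge per-source event lists into one reproducible sequence.

    `streams` maps a source id to a list of (timestamp, payload) tuples that is
    already sorted by strictly increasing timestamp.
    """
    tagged = [
        [(ts, source, payload) for ts, payload in events]
        for source, events in streams.items()
    ]
    # Order by timestamp first and break ties by source id, so the result does
    # not depend on arrival order or on the order in which sources are listed.
    return list(heapq.merge(*tagged, key=lambda e: (e[0], e[1])))


stream_a = [(1, "a1"), (3, "a3")]
stream_b = [(1, "b1"), (2, "b2")]
merged = deterministic_merge({"A": stream_a, "B": stream_b})
merged_again = deterministic_merge({"B": stream_b, "A": stream_a})
```

Both calls yield the same sequence, which is exactly the property a replay relies on. The cost of this strict order in an online setting is the waiting time for the slowest upstream, which motivates the weaker ordering scheme discussed in this chapter.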

Although the use of rollback recovery saves computational resources, users are still required to run their application using deterministic execution in order to benefit from the properties provided by a precise recovery. However, deterministic execution with a strict ordering of events imposes a non-negligible overhead on the system, as we will show in the evaluation (Section 4.3) of this chapter.

The majority of ESP applications perform some form of aggregation where either several events are summarized over some window or several streams are joined in order to identify correlations between events within a fixed window. Many of those operators, such as aggregations and joins, share the property of commutativity, where the order of arrival within a window does not distort the result. Commutativity also applies to non-order-sensitive custom-written operators (using user-defined functions) as well as stateless ones such as simple filters. Since applications that solely employ commutative operators do not require a strictly ordered event stream in order to produce correct results, deterministic execution would serve here solely for the purpose of achieving a precise recovery.
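The difference between a commutative and an order-sensitive operator can be checked on a small example. The following Python sketch, with made-up window contents, evaluates a sum and a "last value" operator over every possible arrival order of the same window.

```python
import itertools

window = [4, 1, 7]  # values of the events that fall into one window

# Commutative: the sum is identical for every arrival order.
sums = {sum(order) for order in itertools.permutations(window)}

# Order-sensitive: "last value seen" depends on the arrival order.
lasts = {order[-1] for order in itertools.permutations(window)}
```

The set `sums` contains a single value, while `lasts` contains one value per event. Only operators of the first kind can tolerate out-of-order processing within a window without changing their output.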

In the following, we will present an alternative event ordering scheme which consumes far less resources than a strict ordering while still allowing applications to recover in a precise manner.

4.2 Epoch-Based Processing

Our approach is based on the observation that many applications use commutative operators and operate on jumping windows. Commutative operators share the property of non-order-sensitivity where the result does not depend on the order of arrival of events within a window. Hence, the key idea of our approach is to process events within a window in arbitrary order while guaranteeing that the windows themselves are processed in order.

In order to ensure the correctness of our weak ordering scheme, the set of events for a window must always be well defined. Events are assigned to such sets, which we call epochs, using their timestamps. An epoch $\omega$ is essentially a time-based tumbling window with two boundaries: a lower bound $b_{lower}$ and an upper bound $b_{upper}$, where both bounds are defined using timestamps with $ts_{b_{lower}} < ts_{b_{upper}}$. The assignment of events to epochs fulfills the following relation:

$$\omega = \{\, e \mid ts_{b_{lower}} < ts_{event} \wedge ts_{event} \le ts_{b_{upper}} \,\}$$
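The epoch relation can be made concrete with a small Python sketch. It assumes integer timestamps and epochs of a fixed length `EPOCH_LENGTH`, so that epoch k covers the half-open interval (k * length, (k + 1) * length]; the constant and the function names are hypothetical and chosen for illustration only.

```python
from collections import defaultdict

EPOCH_LENGTH = 10  # assumed fixed epoch length in timestamp units


def bounds(epoch, length=EPOCH_LENGTH):
    # Lower bound is exclusive, upper bound is inclusive.
    return epoch * length, (epoch + 1) * length


def epoch_of(ts, length=EPOCH_LENGTH):
    # Integer timestamps: lower < ts <= upper holds for the returned epoch.
    return (ts - 1) // length


def sum_per_epoch(events):
    sums = defaultdict(int)
    for ts, value in events:  # arrival order inside an epoch is irrelevant
        sums[epoch_of(ts)] += value
    return [(epoch, sums[epoch]) for epoch in sorted(sums)]  # epochs in order


in_order = [(1, 5), (10, 2), (11, 4), (20, 1)]
out_of_order = [(10, 2), (1, 5), (20, 1), (11, 4)]  # shuffled within epochs
```

Both inputs produce the same per-epoch sums, since an event with timestamp 10 still belongs to the first epoch while an event with timestamp 11 opens the second. The epoch boundaries are therefore the natural points at which a snapshot of the operator state is independent of the arrival order.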