Scheduler for the Stream Processing
Frameworks on Hadoop Clusters
Mgr. Petr Škoda
Rigorous Thesis
Abstract
Big data processing is a hot topic of today’s computer world. One of the key paradigms behind it is MapReduce, a parallel and massively distributed model inspired by the map and reduce functions commonly used in functional programming. Due to its simplicity and the general availability of standard implementations, the paradigm has been massively adopted on current computer clusters. Yet, MapReduce is not optimal for all big data problems.
My work focuses on an alternative paradigm, stream processing, which has multiple advantages over MapReduce; e.g., it avoids persistent data storage when it is not required. The research aims at overcoming the deficiencies of existing stream processing frameworks that prevent their wider adoption.
In particular, the work deals with the scheduling problems of stream processing applications on heterogeneous clusters. Heterogeneity is a typical characteristic of today’s large data centers (caused by incremental upgrades and combinations of computing architectures, including specialized hardware such as GPUs or FPGAs), and advanced scheduling mechanisms can significantly increase the efficiency of their utilization.
This document discusses the state of the art in the research and development of stream processing and advanced methods of the related scheduling techniques. Special attention is paid to benchmark-based scheduling for distributed stream processing, which also forms the core of my previous work and of the proposed research towards my doctoral thesis.
Finally, the concept of a novel heterogeneity aware scheduler is presented, first intuitively and then in more depth on a theoretical basis. The prototype of the scheduler is then described, and promising results of basic experiments are shown.
Contents
Chapter 1 Introduction
Chapter 2 State of the Art
2.1 Stream Processing
2.1.1 Overview
2.1.2 Components and Architecture
2.1.3 Fault Tolerance
2.2 Resource Allocation and Scheduling
2.2.1 Metrics
2.2.2 Apache Mesos and Apache YARN
2.2.3 Interesting Approaches to Scheduling
2.3 Benchmark-based Scheduling
2.3.1 Benchmarking
2.3.2 Speculative Execution in Hadoop Revisited
2.3.3 Real Benchmarks
2.4 Beyond the State of the Art
Chapter 3 Heterogeneity Aware Scheduler
3.1 Motivation
3.2 Intuitive Example
3.2.1 The Application—Popular Stories
3.2.2 Standard Scheduling
3.2.3 Advanced Scheduling
3.3 Scheduler
3.3.1 Performance of Program to Hardware Class Combinations
3.3.2 Benchmarking and Reschedules
3.3.3 Heterogeneity Aware Scheduling
3.4 Experiments and Results
3.4.1 Prototype scheduler implementation
3.4.2 Experiments
3.4.3 Results
Chapter 4 Previous Work
4.1 ReReSearch
4.2 mOSAIC
4.3 JUNIPER+
Chapter 5 Ongoing Work
Chapter 1
Introduction
As the Internet grows, the amount of data that can be gathered, stored, and processed constantly increases. The first companies that handled Internet-based big data were search providers such as Yahoo! and later Google. Large amounts of data were certainly processed in other industrial branches and scientific disciplines as well, but the main leaders in the development of new cheaper, faster, more flexible, and more available technologies were rather the Internet-based companies1. Later on, besides the search providers, the first so-called Web 2.0 companies arose; these were primarily blogs and afterwards social networks.
From the technological point of view, the first big data was handled by many special-purpose tools, usually implemented for only one type of computation. Large amounts of raw data, e.g., crawled documents, web request logs, etc., were processed this way to compute various kinds of derived data such as inverted indices, various representations of the graph structure of Web documents, summaries of the number of pages crawled per host, and the set of most frequent queries in a given day. Most such computations were conceptually straightforward; however, the input data was usually large and the computations had to be distributed across hundreds or thousands of machines in order to finish in a reasonable amount of time. The issues of how to parallelize the computation, distribute the data, and handle failures conspired to obscure the original simple computation with large amounts of complex code dealing with these secondary issues. A solution, the MapReduce programming model, was developed and introduced by Google. [1]
MapReduce is an abstraction layer that allows expressing simple computations while hiding the messy details of parallelization, fault tolerance, data distribution, and load balancing, which makes it possible to process large data sets in a massively parallel manner. The programming model is based on the following simple concepts: (i) iteration over the input; (ii) computation of key/value pairs from each piece of input; (iii) grouping of all intermediate values by key; (iv) iteration over the resulting groups; (v) reduction of each group [1, 2].
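The five steps can be illustrated with the classic word-count example. The following is a minimal single-process Python sketch, not Hadoop's API; the function names `map_fn`, `reduce_fn`, and `run_mapreduce` are hypothetical, and parallelism, data distribution, and fault tolerance are deliberately left out.
```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple


def map_fn(line: str) -> Iterator[Tuple[str, int]]:
    # (ii) compute key/value pairs from one piece of input
    for word in line.split():
        yield word.lower(), 1


def reduce_fn(key: str, values: List[int]) -> Tuple[str, int]:
    # (v) reduce one group of intermediate values to a single result
    return key, sum(values)


def run_mapreduce(lines: Iterable[str]) -> Dict[str, int]:
    groups: Dict[str, List[int]] = defaultdict(list)
    for line in lines:                      # (i) iterate over the input
        for key, value in map_fn(line):
            groups[key].append(value)       # (iii) group values by key
    # All map work is finished before the first reduce call (the barrier);
    # (iv) iterate over the groups and (v) reduce each of them.
    return dict(reduce_fn(k, vs) for k, vs in sorted(groups.items()))


print(run_mapreduce(["the quick fox", "The lazy dog", "the end"]))
# {'dog': 1, 'end': 1, 'fox': 1, 'lazy': 1, 'quick': 1, 'the': 3}
```
Note that no reduce call can start before the whole input has been mapped and grouped, which is the barrier synchronization that makes MapReduce batch-oriented.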
Today, the MapReduce programming model is widely adopted and is one of the most common ways to process large amounts of data. The adoption is mainly led by Apache Hadoop, an open source implementation of the MapReduce programming model, which has turned into a stack of tools for parallel big data processing.
The Hadoop computations powered by MapReduce have a few drawbacks, one of which is the barrier synchronization between the map and reduce phases. All map tasks have to be completed before any reduce task can start; the computation model is strongly bound to data, its storage, and its location. This behavior makes MapReduce a batch processing tool rather than an online one. To overcome the batch behavior, different ideas were introduced: for example, [3] describes hierarchical and incremental reduction, [4] proposes support for pipelining, and [5] analyzes the suitability of MapReduce for one-pass analytics processing.
1 The main difference between the Internet-based companies and other companies (including scientific applications) in the context of big data is that the former started with simple and easily parallelizable computations and evolved over time into more complex ones, whereas the latter have always needed complex and hardly parallelizable computations.
Returning to big data, there are multiple interesting aspects that affected the development around its processing. We can mention, for instance, the constant growth of smartphones’ popularity hand in hand with the availability of mobile data plans; the great success of social networks of all kinds; and the constantly decreasing costs of various sensors that can be utilized in industrial applications or logistics. What all of these aspects have in common is that they constantly produce large amounts of data (especially when many entities are present) and, simultaneously, there is a growing need to evaluate these data as soon as possible. But still, while MapReduce and its most popular implementation, Apache Hadoop, already play an irreplaceable role at almost every company dealing with big data, MapReduce is not suitable for online processing. This is the place for new ideas and paradigms.
We observed that the batch nature of MapReduce, despite the mentioned efforts, has recently created a demand for technologies better suited for online processing of stream data, for which the original concept does not fit perfectly. At the same time, it is still required to keep the easy parallelization, fault tolerance, data distribution, and load balancing, the benefits well known from Hadoop with MapReduce. All this leads to the rise of new, easily parallelizable stream processing systems.
Although stream processing is currently a target of interest of many stakeholders in the area of big data, its integration into data processing platforms (e.g., Hadoop) is still unsatisfying. A similar situation prevails in the field of advanced scheduling algorithms: there is a lack of suitable algorithms that can effectively employ the capabilities of heterogeneous clusters. The situation is even more alarming when static code acceleration (i.e., FPGA) and GPU acceleration are considered. In this area, the scheduling algorithms currently available for stream processing applications leave many gaps.
My work deals with the scheduling of long-running stream processing applications on large shared heterogeneous clusters of computers. The leading idea is to combine benchmark-based scheduling with other scheduling approaches. Moreover, the work focuses mainly on Apache Hadoop 2.0 clusters because many businesses have already deployed big batch processing solutions on top of Hadoop clusters, and stream processing as such is just another piece of the big data puzzle they have to handle. The same fact recently led the Hadoop team to separate the scheduler (now called YARN) from the rest of the Hadoop tools. As a result, deploying various distributed systems on top of Hadoop clusters is now much easier. At the same time, the new YARN scheduler makes it easy to share clusters among users with fairness in resource scheduling. An enhanced scheduler for stream processing applications on heterogeneous clusters, the result of my ongoing work, can thus be incorporated into the Hadoop ecosystem and exploit all its advantages.
The rest of this document has the following structure. Chapter 2 outlines the state of the art in the fields of stream processing, resource allocation and scheduling in general, and benchmark-based scheduling. Chapter 3 then describes the motivation for the novel scheduler with an intuitive example, the theoretical basis of the scheduler, the prototype implementation, and the experiments with the prototype, including promising results in comparison to Apache Storm’s “standard” scheduler. Chapter 4 summarizes my previous affiliation and work. Lastly, Chapter 5 describes the objectives of my future work.
Chapter 2
State of the Art
The previous chapter described a brief history of big data and its development over time with respect to stream processing. This chapter, State of the Art, first focuses on various aspects of stream processing, after which the scheduling of stream processing jobs on different resources is discussed. Later, benchmarking approaches, a crucial topic for my ongoing work, are introduced. Finally, the problems of the current state of the art are highlighted and discussed.
2.1 Stream Processing
As already mentioned, classical MapReduce is not a silver bullet for online data processing because of its batch-like nature. Yet, there are attempts to transform existing MapReduce solutions into streams. Some examples were already mentioned in the Introduction; we will discuss them in more detail together with older approaches to stream processing. Later, some novel ideas and projects in the field of stream processing will be presented.
2.1.1 Overview
Over the past decade, stream processing has been the subject of active research. With regard to their scalability, the existing approaches can essentially be subdivided into three categories: centralized, distributed, and massively-parallel stream processors. A short overview of each category follows. The overview is based on the extensive summary of stream processing systems in [6].
Centralized
Initially, several centralized systems for stream processing were proposed, such as Aurora [7] and STREAM [8, 9]. Aurora is a database management system (DBMS) for continuous queries that are constructed by connecting a set of predefined operators into a directed acyclic graph (DAG). The stream processing engine schedules the execution of the operators and uses load-shedding, i.e., dropping intermediate tuples, to meet the QoS goals. At the endpoints of the graph, user-defined QoS functions are used to specify the desired latency and the tuples to be dropped. STREAM presents additional strategies for applying load-shedding, such as probabilistic exclusion of tuples. While these systems have useful properties such as respecting latency requirements, they run on a single host and do not scale well with rising data rates and numbers of data sources.
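To make load-shedding concrete, the following Python sketch implements its simplest probabilistic variant: when tuples arrive faster than an operator can process them, each tuple is dropped independently with probability p = 1 - capacity / arrival_rate. This is an illustrative assumption, not Aurora's QoS-driven algorithm or STREAM's actual strategy, and the function names are hypothetical.
```python
import random
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def drop_probability(arrival_rate: float, capacity: float) -> float:
    """Fraction of tuples to shed so that the expected admitted rate
    does not exceed the operator's processing capacity."""
    if arrival_rate <= capacity:
        return 0.0
    return 1.0 - capacity / arrival_rate


def shed(stream: Iterable[T], p: float, rng: random.Random) -> Iterator[T]:
    # Probabilistic exclusion: each tuple is dropped independently with probability p.
    for item in stream:
        if rng.random() >= p:
            yield item


p = drop_probability(arrival_rate=1000.0, capacity=800.0)  # about 0.2
kept = list(shed(range(10_000), p, random.Random(42)))
print(len(kept))  # close to 8000; the exact value depends on the seed
```
Uniform random dropping ignores which tuples matter most; a QoS-aware shedder would instead decide what to drop based on user-defined QoS functions, as described for Aurora above.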
Distributed
Later systems such as Aurora*/Medusa [10] support distributed processing of data streams. An Aurora* system is a set of Aurora nodes that cooperate via an overlay network within the same administrative domain. In Aurora*, the nodes can freely relocate load by decentralized, pairwise exchange of Aurora stream operators. Medusa integrates many participants, such as several sites running Aurora* systems from different administrative domains, into a single federated system. Borealis [11] extends Aurora*/Medusa and introduces, along with its other features, a sophisticated QoS optimization model where the effects of load-shedding on QoS can be computed at every point in the data flow. This enables the optimizer to find better strategies for load-shedding.
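The pairwise exchange of operators in Aurora* can be pictured as two neighboring nodes comparing loads and moving an operator from the busier node to the less busy one. The sketch below uses a hypothetical greedy rule (move the single operator that most reduces the higher of the two loads); the actual Aurora* protocol may use different criteria, and the class and function names are illustrative only.
```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class Node:
    name: str
    operators: Dict[str, float] = field(default_factory=dict)  # operator -> load

    @property
    def load(self) -> float:
        return sum(self.operators.values())


def pairwise_exchange(a: Node, b: Node) -> Optional[str]:
    """Move one operator from the busier node to the other one if that lowers
    the higher of the two loads; return the moved operator, or None."""
    src, dst = (a, b) if a.load >= b.load else (b, a)
    best_op: Optional[str] = None
    best_peak = src.load  # current peak load of the pair
    for op, cost in src.operators.items():
        peak = max(src.load - cost, dst.load + cost)
        if peak < best_peak:
            best_op, best_peak = op, peak
    if best_op is not None:
        dst.operators[best_op] = src.operators.pop(best_op)
    return best_op


n1 = Node("n1", {"filter": 3.0, "join": 5.0, "map": 2.0})
n2 = Node("n2", {"agg": 1.0})
print(pairwise_exchange(n1, n2), n1.load, n2.load)  # join 5.0 6.0
print(pairwise_exchange(n1, n2))                    # None: no move helps any more
```
Because each move strictly lowers the peak load of the pair, repeated exchanges between the same two nodes stop after a finite number of steps.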
Massively-Parallel
The third category of stream processing systems consists of massively-parallel data processing systems. In contrast to the previous two categories, these systems have been designed to run on hundreds or even thousands of nodes and to efficiently transfer large data volumes between them. Traditionally, these systems have been used to process finite blocks of data stored on distributed file systems. However, many of the newer systems, such as Dryad [12], Hyracks [13], CIEL [14], DAGuE [15], or the Nephele framework [16], allow users to assemble complex parallel data flow graphs and to construct pipelines between the individual parts of the flow. Therefore, these parallel data flow systems are in general also suitable for streaming applications.
Current Development
Recently, building on the popularity of MapReduce and the wide spread of Hadoop, a series of systems exploiting the ideas of the MapReduce paradigm in the context of stream processing was introduced. The first work in this area was arguably Hadoop Online, described in [4]. As already mentioned in the Introduction, the developers of Hadoop Online extended the original Hadoop system with the ability to stream intermediate results from the map to the reduce tasks, as well as the possibility to pipeline data across different MapReduce jobs. To facilitate these new features, they extended the semantics of the classic reduce function with time-based sliding windows. Li et al. [17] picked up this idea and further improved the suitability of Hadoop-based systems for continuous streams by replacing the sort-merge implementation for partitioning with a new hash-based approach.
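A time-based sliding window turns the reduce step into an incremental computation: a result is available at any moment and covers only the most recent events. The following Python sketch (the class name is hypothetical; this is not the Hadoop Online API) keeps per-key counts in a hash map and updates them as events enter and leave the window. It also hints at why hash-based grouping suits continuous streams better than sort-merge grouping, which needs a complete, sorted partition before it can produce any output.
```python
from collections import deque
from typing import Deque, Dict, Tuple


class SlidingWindowCount:
    """Per-key event counts over the last `window` time units.

    Counts live in a hash map and are updated on every arrival and expiry,
    so no sorting of the input is needed before a result is available."""

    def __init__(self, window: float) -> None:
        self.window = window
        self.events: Deque[Tuple[float, str]] = deque()
        self.counts: Dict[str, int] = {}

    def add(self, timestamp: float, key: str) -> None:
        # Assumes events arrive in non-decreasing timestamp order.
        self.events.append((timestamp, key))
        self.counts[key] = self.counts.get(key, 0) + 1
        self._evict(now=timestamp)

    def _evict(self, now: float) -> None:
        # Keep only events inside the window (now - window, now].
        while self.events and self.events[0][0] <= now - self.window:
            _, old_key = self.events.popleft()
            self.counts[old_key] -= 1
            if self.counts[old_key] == 0:
                del self.counts[old_key]

    def result(self) -> Dict[str, int]:
        return dict(self.counts)


w = SlidingWindowCount(window=10.0)
for ts, key in [(0.0, "a"), (3.0, "b"), (9.0, "a"), (12.0, "a")]:
    w.add(ts, key)
print(w.result())  # {'a': 2, 'b': 1}: the event at t=0 has left the window
```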
The Muppet system [18] also focuses on the parallel processing of continuous stream data while preserving a MapReduce-like programming abstraction. However, the authors decided to replace the reduce function with a more generic update function to allow for greater flexibility when processing intermediate data with identical keys. Muppet also aims to support near-real-time processing latencies.
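The difference between reduce and an update function can be sketched as follows: update is called once per event and can read and modify a piece of per-key state that persists between calls, so results can be emitted immediately instead of after a whole group has been collected. The Python code below is a hypothetical illustration (all names, including `slate` for the per-key state, are assumptions and not Muppet's API) that counts hashtags in a stream of tweets.
```python
from typing import Any, Callable, Dict, Iterable, Iterator, List, Tuple

Event = Tuple[str, Any]  # (key, value)
Slate = Dict[str, Any]   # per-key state kept between update calls


def run_map_update(
    events: Iterable[Event],
    map_fn: Callable[[Event], Iterable[Event]],
    update_fn: Callable[[Event, Slate], Iterable[Event]],
    slates: Dict[str, Slate],
) -> Iterator[Event]:
    for event in events:
        for key, value in map_fn(event):
            # Unlike reduce, update sees one event at a time together with
            # the state accumulated so far for that key.
            slate = slates.setdefault(key, {})
            yield from update_fn((key, value), slate)


def extract_tags(event: Event) -> List[Event]:
    _, text = event
    return [(word, 1) for word in text.split() if word.startswith("#")]


def count_update(event: Event, slate: Slate) -> List[Event]:
    key, increment = event
    slate["count"] = slate.get("count", 0) + increment
    return [(key, slate["count"])]  # emit the running count immediately


slates: Dict[str, Slate] = {}
tweets = [("t1", "hello #storm"), ("t2", "#storm and #s4"), ("t3", "#storm")]
print(list(run_map_update(tweets, extract_tags, count_update, slates)))
# [('#storm', 1), ('#storm', 2), ('#s4', 1), ('#storm', 3)]
```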
The systems S4 [19] and Storm [20, 21] can also be classified as massively-parallel data processing systems, with a clear emphasis on low latency. Their programming abstraction, however, is not MapReduce; instead, it allows developers to assemble arbitrarily complex DAGs of processing tasks. For example, Twitter Storm does not use intermediate queues to pass data items from one task to another; instead, data items are passed directly between the tasks using batch messages on the network level to achieve a good balance between latency and throughput.
Finally, it is important to note that along with the stream processing paradigm, we can lately observe another movement in the field of low latency computations based on fast message brokers such as Apache Kafka [22] or Apache Flume. Although message brokers are a well-known concept, the new wave of this technology focuses on different objectives, which makes it more suitable for high throughput processing.
Traditional enterprise messaging systems, e.g., IBM WebSphere MQ or JMS specification compliant brokers, emphasize strong delivery guarantees, mostly by pushing data to consumers, and neglect throughput, which makes them very robust but often slow. The new wave of message delivery systems, on the other hand, accentuates throughput and lets consumers pull data as they need it. This opens the way for new distributed applications with high throughput and a straightforward design, constructed on top of these queues. In my opinion, such message delivery systems are another kind of stream processing system with less restricted communication schemes, i.e., DAGs are welcome but not required.
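The pull-based design can be illustrated with an append-only log in which each consumer keeps its own read offset. The Python sketch below is a simplified model of this idea, not the API of Kafka or any other broker, and the class names are hypothetical. Because the log does not track delivery per consumer, a slow consumer simply lags behind, and replaying data is just a matter of resetting an offset.
```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Log:
    """Append-only log; consumers pull records by offset."""
    records: List[str] = field(default_factory=list)

    def append(self, record: str) -> int:
        self.records.append(record)
        return len(self.records) - 1  # offset of the new record

    def read(self, offset: int, max_records: int) -> List[str]:
        return self.records[offset:offset + max_records]


@dataclass
class Consumer:
    log: Log
    offset: int = 0  # each consumer tracks its own position

    def poll(self, max_records: int = 2) -> List[str]:
        batch = self.log.read(self.offset, max_records)
        self.offset += len(batch)
        return batch


log = Log()
for record in ["a", "b", "c", "d", "e"]:
    log.append(record)
fast, slow = Consumer(log), Consumer(log)
print(fast.poll(10))  # ['a', 'b', 'c', 'd', 'e']
print(slow.poll())    # ['a', 'b']: the slow consumer simply lags behind
slow.offset = 0       # replaying is just resetting the offset
print(slow.poll())    # ['a', 'b']
```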
This was a brief overview of “historical” and recent approaches to stream processing. Because this work deals mainly with resource allocation and scheduling, from now on we will discuss only parallel systems.
2.1.2 Components and Architecture
In this section, the focus is on S4 and Storm, because an overview of a wider range of systems would be excessively long and, at the same time, these two can be considered representative examples of modern stream processing architectures.
Across many massively parallel systems, a kind of master-worker pattern is very common. The master node usually receives data or tasks and distributes them over the network of worker nodes. A good example is again MapReduce: after receiving a job descriptor, its master process starts mappers and reducers on different machines; at the same time, the master process is responsible for fault tolerance and for the liveness of the worker nodes (see Fig. 1) [23, 1]. For parallel stream systems, where the streams of data often run for a long time, the placement of tasks can be less frequent, but the concept of master and worker nodes stays unchanged. One interesting difference is that a MapReduce job eventually finishes, whereas stream processing topologies run forever (or until you explicitly shut them off) [21].
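The master's role in task placement and liveness tracking can be sketched with heartbeats: workers report periodically, and the master declares a worker dead once its last heartbeat is older than a timeout and re-places that worker's tasks. The Python code below is a hypothetical, single-threaded model (the names, the timeout policy, and the least-loaded placement rule are assumptions), not the mechanism of any particular framework.
```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Master:
    timeout: float
    last_heartbeat: Dict[str, float] = field(default_factory=dict)
    assignment: Dict[str, Set[str]] = field(default_factory=dict)  # worker -> tasks

    def heartbeat(self, worker: str, now: float) -> None:
        self.last_heartbeat[worker] = now
        self.assignment.setdefault(worker, set())

    def assign(self, task: str) -> str:
        # Place the task on the live worker with the fewest tasks (ties by name).
        if not self.assignment:
            raise RuntimeError("no live workers")
        worker = min(self.assignment, key=lambda w: (len(self.assignment[w]), w))
        self.assignment[worker].add(task)
        return worker

    def check_liveness(self, now: float) -> Dict[str, str]:
        """Forget workers whose heartbeat timed out and re-place their tasks."""
        dead = [w for w, t in self.last_heartbeat.items() if now - t > self.timeout]
        orphaned: List[str] = []
        for w in dead:
            del self.last_heartbeat[w]
            orphaned.extend(sorted(self.assignment.pop(w)))
        return {task: self.assign(task) for task in orphaned}


m = Master(timeout=5.0)
m.heartbeat("w1", now=0.0)
m.heartbeat("w2", now=0.0)
print([m.assign(t) for t in ["map-0", "map-1", "map-2"]])  # ['w1', 'w2', 'w1']
m.heartbeat("w2", now=4.0)        # w1 stays silent
print(m.check_liveness(now=6.0))  # {'map-0': 'w2', 'map-2': 'w2'}
```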
When we look deeper into the structure of parallel systems, we can see that the computations are divided into jobs. The arrangement of a job can be described by a graph with vertices representing the job’s individual tasks and edges denoting communication channels between them. For example, from a high-level perspective, the graph representation of a typical MapReduce job would consist of a set of Map vertices connected to a set of Reduce vertices. Some frameworks have generalized the MapReduce model to arbitrary directed acyclic graphs (DAGs); some even allow graph structures containing loops [14]. [6]
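The graph view of a job can be made concrete in a few lines. The following Python sketch (hypothetical function names) builds the graph of a MapReduce job, in which every Map vertex has a channel to every Reduce vertex, and computes a topological order with Kahn's algorithm, i.e., an order in which tasks can run so that each one starts only after all of its inputs are produced. Graphs with loops, which some frameworks allow, are rejected by this check.
```python
from collections import deque
from typing import Dict, List


def mapreduce_job_graph(n_map: int, n_reduce: int) -> Dict[str, List[str]]:
    """Adjacency lists: every Map vertex has a channel to every Reduce vertex."""
    maps = [f"map{i}" for i in range(n_map)]
    reduces = [f"reduce{j}" for j in range(n_reduce)]
    graph = {m: list(reduces) for m in maps}
    graph.update({r: [] for r in reduces})
    return graph


def topological_order(graph: Dict[str, List[str]]) -> List[str]:
    """Kahn's algorithm: an execution order respecting all channels.
    Raises ValueError if the graph contains a cycle (i.e., is not a DAG)."""
    indegree = {v: 0 for v in graph}
    for targets in graph.values():
        for t in targets:
            indegree[t] += 1
    ready = deque(v for v, d in indegree.items() if d == 0)
    order: List[str] = []
    while ready:
        v = ready.popleft()
        order.append(v)
        for t in graph[v]:
            indegree[t] -= 1
            if indegree[t] == 0:
                ready.append(t)
    if len(order) != len(graph):
        raise ValueError("graph contains a cycle")
    return order


print(topological_order(mapreduce_job_graph(2, 2)))
# ['map0', 'map1', 'reduce0', 'reduce1']
```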
Stream processing systems are very similar to other parallel systems: the computation is mostly described as a DAG. There are slight differences in the communication layer: some systems use queues and intermediate message brokers, while others use direct point-to-point (process-to-process) communication. Let me now describe the example topologies of S4 and Storm.
S4
Fig. 2 S4: The Word Count Example, reproduced from [19]
In S4, the main computational unit is called a processing element (PE). The communication between PEs is based on events. Every PE consumes just those events that correspond to the value on which it is keyed (one of the parameters of each PE is a keyed attribute) or all events of its type (for PEs without a keyed attribute). A PE may produce output events. Note that a PE is instantiated for each value of the key attribute (so multiple PEs of the same kind can be placed for load balancing). This instantiation is performed by the platform. For example, in the word counting application (Fig. 2), WordCountPE is instantiated for each word in the input. When a new word is seen in an event, S4 creates a new instance of the PE corresponding to that word. S4 provides several built-in PEs for standard tasks such as aggregate, join, and others. Unneeded PEs can be automatically removed during computation according to their Time-To-Live2. [19]
A higher level in the hierarchy of S4 is the Processing Node (PN), which is an abstraction layer for a physical node in the cluster. PNs are hosts for PEs; they are responsible for listening to events, executing operations on the incoming events, dispatching events with the assistance of the communication layer, and emitting output events. S4 routes each event to PNs based on a hash function of the values of all known keyed attributes in that event. A single event may be routed to multiple PNs. An event listener in the PN passes incoming events to the processing element container, which invokes the appropriate PEs in the appropriate order. [19]
2 If no events for that PE object arrive within a specified period of time, the PE becomes eligible for removal.
Apache Storm
In Storm, the main computational unit where the user code runs is called a Bolt. Besides that, Storm has another actor named Spout that serves as a source of data to the whole computational setup, as it can read data from a queue3 or other sources (new spout types can be easily implemented). There can be multiple Spouts in one topology. The data inside a Storm topology are sent as tuples; an unbounded sequence of tuples is called a stream. A bolt consumes any number of input streams, does some processing, and possibly emits new streams. Complex stream transformations, such as computing a stream of trending topics from a stream of tweets, require multiple steps and thus multiple bolts. Bolts can do anything from running functions, filtering tuples, and doing streaming aggregations and streaming joins to talking to databases, and more. [21]
3 Currently, only the Kestrel queue is supported out of the box.
Fig. 3 Storm: The Rolling Top Words Topology, reproduced from [24]
Networks of spouts and bolts are packaged into a “topology”, which is the top-level abstraction that is submitted to the Storm cluster for execution. The topology is a graph of stream transformations where each node is a spout or a bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt that subscribed to that stream. Links between the nodes of the topology indicate how tuples should be passed around, e.g., if there is a link between Spout A and Bolt B, a link from Spout A to Bolt C, and a link from Bolt B to Bolt C,
then every time Spout A emits a tuple, it will send it to both Bolt B and Bolt C. All of Bolt B’s output tuples will go to Bolt C as well. Each node in the Storm topology executes in parallel. The user can specify how many parallel instances are wanted for each node, and Storm then spawns that number of threads across the cluster to do the execution. [21]
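The subscription semantics of the example above (Spout A feeding Bolts B and C, Bolt B feeding Bolt C) can be modeled with a small Python class. This is a toy model, not Storm's API: all names are hypothetical, and choosing a parallel instance by hashing the tuple's first field is an assumption that resembles Storm's fields grouping (and S4's routing by a hash of keyed attributes); Storm also offers other groupings. `zlib.crc32` is used because Python's built-in string hash is randomized per process.
```python
import zlib
from collections import defaultdict
from typing import DefaultDict, Dict, List, Tuple


class Topology:
    """Toy model of Storm-style subscriptions and parallel instances."""

    def __init__(self) -> None:
        self.subscribers: DefaultDict[str, List[str]] = defaultdict(list)
        self.parallelism: Dict[str, int] = {}
        # (bolt, instance index) -> tuples delivered to that instance
        self.received: DefaultDict[Tuple[str, int], List[tuple]] = defaultdict(list)

    def add_node(self, name: str, parallelism: int = 1) -> None:
        self.parallelism[name] = parallelism

    def subscribe(self, bolt: str, source: str) -> None:
        self.subscribers[source].append(bolt)

    def emit(self, source: str, tup: tuple) -> None:
        # Every subscribed bolt receives the tuple; inside that bolt, one of its
        # parallel instances is picked by a deterministic hash of the first field.
        for bolt in self.subscribers[source]:
            instance = zlib.crc32(str(tup[0]).encode()) % self.parallelism[bolt]
            self.received[(bolt, instance)].append(tup)


t = Topology()
t.add_node("A")
t.add_node("B", parallelism=2)
t.add_node("C", parallelism=3)
t.subscribe("B", source="A")
t.subscribe("C", source="A")
t.subscribe("C", source="B")
t.emit("A", ("storm", 1))  # reaches both B and C
t.emit("B", ("storm", 2))  # B's output reaches C as well
```
Since both tuples carry the same key, they land on the same instance of Bolt C, which is what makes per-key state (such as a running word count) possible in a parallel bolt.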
2.1.3 Fault Tolerance
A system designed to run on a large number of commodity computers must be capable of detecting and reacting to possible faults that might occur during its regular operation. Hadoop, for example, uses a naïve strategy based on storing all intermediate results to durable storage before making progress. While such a naïve strategy is safe, it can be extremely expensive, especially for systems with high data throughput. At the same time, saving data to persistent storage is one of the main factors causing the undeniable batch nature of MapReduce-based systems. The form of fault tolerance handling can significantly affect the overall performance of a distributed system because in very large clusters, i.e., thousands of nodes, failures are quite probable and they affect a significant portion of the jobs running on such a cluster [25].
If we take a look at the systems previously mentioned in Section 2.1.1, we can say that CIEL [14], Nephele [6], Dryad [12], DAGuE [26, 15], Hyracks [13], and Storm [21] are designed to be fully fault tolerant, while S4 [19] is only partially fault tolerant. S4 is intended for dedicated homogeneous clusters and for computations where small errors are acceptable (e.g., click-through rate computation on streamed data); thus, node faults are handled just by moving processes to a standby server. The state of the processes, which is stored in local memory, is lost during the handoff. On the new server, the state is regenerated from the input streams, yet some data may be lost.
The systems with full fault tolerance usually support transactional processing of streamed data: the system confirms that the computation on a piece of input data was successful and complete (in case of failure, the computation is restarted for the given data). Nephele, unlike the others, saves checkpointing data to a distributed file system, which can be classified as the naïve approach. The rest of the systems named above use techniques based on replaying a small portion of the computation, starting from intermediate results backtracked through the DAG of the computation. In the worst case, when no intermediate results are available, the computation for the given source data is restarted from the beginning on all jobs impacted by the failure.
For example, in the case of a task failure, CIEL automatically re-executes the task. However, if it has failed because its inputs were stored on a failed worker, the task is no longer runnable. In that case, CIEL recursively re-executes predecessor tasks until all of the failed task’s dependencies are resolved. To achieve this, the master invalidates the locations in the object table for each missing input and lazily re-evaluates the missing inputs. Other tasks that depend on data from the failed worker will also fail, and these are similarly re-executed by the master. [14]
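CIEL's recursive re-execution can be expressed as a lazy, depth-first recovery of missing objects. In the Python sketch below, which uses hypothetical names and assumes that each task produces a single object and that source data is stored durably, the master's invalidation of object-table entries is modeled by removing objects from the `available` set; requesting a lost object re-runs only those predecessor tasks whose outputs are also missing.
```python
from typing import Callable, Dict, List, Set


def ensure_available(
    obj: str,
    producer: Dict[str, str],       # object -> task that produces it
    inputs: Dict[str, List[str]],   # task -> objects the task consumes
    available: Set[str],            # objects whose stored copy is still valid
    run: Callable[[str], None],     # re-executes one task
) -> None:
    """Lazily re-create a missing object: first make sure all inputs of its
    producing task exist (recursing as needed), then re-run that task."""
    if obj in available:
        return
    task = producer[obj]  # source data is assumed durable, hence always available
    for dep in inputs[task]:
        ensure_available(dep, producer, inputs, available, run)
    run(task)
    available.add(obj)


# Chain: input -t1-> a -t2-> b -t3-> c; a and b were lost with a failed worker.
producer = {"a": "t1", "b": "t2", "c": "t3"}
inputs = {"t1": ["input"], "t2": ["a"], "t3": ["b"]}
executed: List[str] = []
ensure_available("c", producer, inputs, {"input"}, executed.append)
print(executed)  # ['t1', 't2', 't3']
```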
Storm, on the other hand, traces the “tree of tuples”. Whenever a new tuple is created in a Spout or Bolt, the tree of computations for the originating tuple is updated, so it is always evident which parts of the DAG have been accomplished. All vertices (Bolts) of the tuple’s tree must then finish by acknowledgement4. When the acknowledgement of some tuple times out, the computation is restarted from the originating Spout, where the original tuples must be kept until the full acknowledgement of the tuple’s tree.
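Storm's documentation on guaranteed message processing describes how the tuple tree can be tracked without storing it explicitly: an acker keeps one 64-bit value per spout tuple and XORs into it the id of every tuple that is created in the tree and of every tuple that is acknowledged, so the value returns to zero once all tuples have been acknowledged (apart from a negligible chance of random ids cancelling out). The Python sketch below models only this bookkeeping; timeouts and replay are omitted, and the class and method names are hypothetical.
```python
import random
from typing import Dict


class Acker:
    """XOR-based tracking of tuple trees, one 64-bit value per spout tuple."""

    def __init__(self, seed: int = 0) -> None:
        self.pending: Dict[int, int] = {}  # spout tuple id -> XOR of unacked ids
        self.rng = random.Random(seed)

    def new_id(self) -> int:
        while True:  # random non-zero 64-bit id
            value = self.rng.getrandbits(64)
            if value:
                return value

    def emit_from_spout(self) -> int:
        root = self.new_id()
        self.pending[root] = root  # the spout tuple itself is still unacked
        return root

    def update(self, root: int, xor_value: int) -> bool:
        """XOR in ids of newly anchored and newly acked tuples; the tree is
        fully processed once the value returns to zero."""
        self.pending[root] ^= xor_value
        if self.pending[root] == 0:
            del self.pending[root]
            return True
        return False


acker = Acker()
root = acker.emit_from_spout()
child = acker.new_id()
# Bolt B acks the spout tuple and emits one child anchored to it.
done_after_b = acker.update(root, root ^ child)
# Bolt C acks the child and emits nothing further.
done_after_c = acker.update(root, child)
print(done_after_b, done_after_c)  # False True
```
The memory needed per spout tuple stays constant regardless of how large the tuple tree grows, which is why this scheme scales to long-running topologies.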
2.2 Resource Allocation and Scheduling
In the context of High-Performance Computing (HPC) and shared clusters, resource allocation and scheduling are slightly overlapping terms. Resource allocation deals mainly with the problem of gathering resources and assigning them to different requestors. A resource can be a computing node, CPU time, memory, HDD bandwidth, network bandwidth, or something less common such as GPU time or FPGA time. In HPC terms, a requestor of resources is typically a framework that needs to run a job on a shared cluster of computers5. The complexity of resource allocation is illustrated in Fig. 4. Scheduling, on the other hand, addresses the question of “when to place which tasks on which previously obtained resources.” A typical scenario is: the user submits a job to a framework, the framework asks the resource allocation system for resources, and after receiving them, the framework’s scheduler takes care of placing the tasks on the resources over the course of the job’s computation. [27, 28, 29, 30]
Fig. 4 Resource Allocation Strategies (RAS) in cloud computing, reproduced from [29]
HPC cluster architectures are ideally suited to workloads where work can be divided into independent pieces and distributed accordingly. Many workloads, however, can only be parallelized to some extent: they typically contain dependencies that require some serial execution. At the same time, these workloads may also be broken into pieces of significantly different sizes. While producing a schedule using a policy such as First In First Out (FIFO) may be trivial, scheduling over a big cluster to meet quality of service requirements or a level of optimality in the presence of dependencies can be difficult [27]. Optimal scheduling in the general case is an NP-complete problem. Therefore, optimal scheduling is intractable at the scale of large clusters where heterogeneity and network delays are also present. Instead, heuristic [31, 32] or machine learning [33, 34, 35] scheduling policies are better suited. [36]
4 Bolts at the ends of the DAG typically save some results to persistent storage or pass results out of the Storm infrastructure.
Another concern is Quality of Service (QoS): both users and providers want guarantees that the computation starts or finishes within some reasonable time interval. Poor QoS can be caused by poor scheduling decisions leading to low throughput or unacceptably long task waiting times. Metrics are essential for providers to be able to monitor the QoS they are delivering, and these metrics should be appropriate to the needs of their users. Providing QoS is challenging because workloads are usually unpredictable and the resource usage of already running jobs varies over time. [36]
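To illustrate why heuristic scheduling policies are used instead of optimal ones, the following Python sketch applies the minimum completion time (MCT) heuristic, a simple and well-known greedy policy for heterogeneous machines, and compares it with an exhaustive search for the optimal makespan. The choice of MCT, the machine names, and the runtime estimates are assumptions for illustration only; they are not the heuristics of [31, 32].
```python
from itertools import product
from typing import Dict, List, Tuple

Runtime = Dict[Tuple[str, str], float]  # (task, machine) -> estimated runtime


def mct_schedule(tasks: List[str], runtime: Runtime,
                 machines: List[str]) -> Tuple[Dict[str, str], float]:
    """Minimum completion time heuristic: take tasks in FIFO order and put each
    on the machine where it would finish earliest. Returns (assignment, makespan)."""
    ready_at = {m: 0.0 for m in machines}
    assignment: Dict[str, str] = {}
    for task in tasks:
        best = min(machines, key=lambda m: ready_at[m] + runtime[(task, m)])
        ready_at[best] += runtime[(task, best)]
        assignment[task] = best
    return assignment, max(ready_at.values())


def optimal_makespan(tasks: List[str], runtime: Runtime,
                     machines: List[str]) -> float:
    """Exhaustive search over all len(machines) ** len(tasks) assignments."""
    best = float("inf")
    for choice in product(machines, repeat=len(tasks)):
        load = {m: 0.0 for m in machines}
        for task, m in zip(tasks, choice):
            load[m] += runtime[(task, m)]
        best = min(best, max(load.values()))
    return best


machines = ["cpu", "gpu"]
tasks = ["t1", "t2", "t3"]
runtime = {("t1", "cpu"): 4.0, ("t1", "gpu"): 1.0,
           ("t2", "cpu"): 3.0, ("t2", "gpu"): 2.5,
           ("t3", "cpu"): 2.0, ("t3", "gpu"): 6.0}
print(mct_schedule(tasks, runtime, machines))
# ({'t1': 'gpu', 't2': 'cpu', 't3': 'cpu'}, 5.0)
print(optimal_makespan(tasks, runtime, machines))  # 3.5
```
The greedy schedule takes time proportional to the number of tasks times the number of machines, whereas the exhaustive search explores machines to the power of tasks assignments. In this small example, the greedy choice yields a makespan of 5.0 instead of the optimal 3.5.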
2.2.1 Metrics
In order to objectively compare schedulers, metrics are required. Different metrics are more or less relevant to different stakeholders in the system. Three metrics that represent the industrial stakeholder perspectives are selected here. The metrics relevant to system administrators correspond to utilization, and the metrics that represent the users’ point of view correspond to responsiveness and fairness. Another commonly used category is relative metrics, which compare schedulers by counting the number of “best” schedules (according to another metric) over a number of scenarios in a problem space. An extensive analysis of these and other metrics can be found in [36], on which this brief description is based.
• Utilization: measures how much of the maximum potential of the platform is actually being used. It is desirable to avoid idle resources while the work queue is non-empty. Utilization can be measured by a number of different metrics. The workload makespan (1), defined as the time at which all the work in the workload has been completed, is widely used. Even though it is not suitable for measuring responsiveness or fairness, the workload makespan accurately reflects utilization for static scheduling problems. For dynamic scheduling, the Average Utilization (2) [37] can be used, calculated over some time interval. These intervals can be monitored over time (e.g., daily or weekly average utilization). Note that Gcores is the number of processing units.
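$$\mathit{Makespan} = \max_{j \in W} t_j^{\mathrm{finish}} \qquad (1)$$
$$U_{\mathrm{avg}} = \frac{\sum_{i} e_i(t_0, t_1)}{G_{\mathrm{cores}} \cdot (t_1 - t_0)} \qquad (2)$$
where W is the set of jobs in the workload, t_j^finish is the completion time of job j, and e_i(t_0, t_1) is the time task i spent executing on a processing unit within the interval [t_0, t_1].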
• Responsiveness: compares how successful the scheduler is in keeping job latency low. There is always a minimum time a job takes to execute, determined by its critical path. However, the time spent queueing or in network transfers affects the responsiveness of a job. Responsiveness metrics can be used to measure how successful the scheduler is during periods of heavy load. Cumulative Completion (3) is a metric that rewards early completion of work, and therefore good average responsiveness [31]. It sums the execution times of completed jobs at each time tick of the execution, and thus provides better insight into how the computation progressed than the utilization metrics do. For dynamic scheduling, the metric must be extended to work over a time window. A more accurate metric for responsiveness is the Schedule Length Ratio (4) [38], which compares the ideal response time of the critical path on one core (JCP) with the actual response time (Jresponse). This metric is thus independent of the total execution time and of the parallelism available in the job.
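$$CC = \sum_{\tau = 1}^{T} \sum_{j \in C(\tau)} e_j \qquad (3)$$
$$SLR_j = \frac{J_{\mathrm{response}}}{J_{\mathrm{CP}}} \qquad (4)$$
where T is the number of time ticks, C(τ) is the set of jobs completed by tick τ, and e_j is the execution time of job j.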
• Fairness is only rarely discussed in the literature; the best source is [39]. Burkimsher et al. [36] explain that there can be a tradeoff between fairness and utilization in a non-preemptive system. Consequently, metrics are needed to quantify the level of fairness and to ensure that the tradeoff is managed appropriately. There may be an underlying assumption that raising utilization maximizes responsiveness, and that fairness will therefore be near optimal as well. The average Schedule Length Ratio can be used to gauge the responsiveness a scheduler achieves with a given workload. Tight clustering of the responsiveness values may indicate a fair distribution of grid resources to jobs. Moreover, the importance of fairness is connected to QoS.
• Relative: a metric where, for a given problem instance, the performance of all considered schedulers is compared using a given metric, usually the workload makespan. This is repeated over a number of problem instances. The “best” scheduler is then considered to be the one with the highest number of wins over the problem space. Relative metrics are often useful for real-world scheduling problems, because finding the optimal schedule is computationally intractable.
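The four metric categories can be illustrated with a short Python sketch. It is a minimal illustration under simplifying assumptions, not the exact formulation of [36] or [37]: each task is assumed to occupy exactly one processing unit between its start and end times, a job's execution time is taken as the sum of its task durations, and all names (Task, makespan, average_utilization, cumulative_completion, schedule_length_ratio, relative_wins) are hypothetical.

```python
from collections import Counter
from dataclasses import dataclass

@dataclass
class Task:
    job: str
    start: float  # time the task started on a processing unit
    end: float    # time the task finished

def makespan(tasks):
    """Eq. (1): time at which all work in the workload has completed."""
    return max(t.end for t in tasks)

def average_utilization(tasks, g_cores, t0, t1):
    """Eq. (2): busy core-time inside [t0, t1] over G_cores * (t1 - t0)."""
    busy = sum(max(0.0, min(t1, t.end) - max(t0, t.start)) for t in tasks)
    return busy / (g_cores * (t1 - t0))

def _per_job(tasks):
    """Completion time and total execution time (sum of task durations) per job."""
    finish, work = {}, {}
    for t in tasks:
        finish[t.job] = max(finish.get(t.job, 0.0), t.end)
        work[t.job] = work.get(t.job, 0.0) + (t.end - t.start)
    return finish, work

def cumulative_completion(tasks, horizon):
    """Eq. (3): at every tick 1..horizon, add the execution times of finished jobs."""
    finish, work = _per_job(tasks)
    return sum(work[j] for tick in range(1, horizon + 1)
               for j in finish if finish[j] <= tick)

def schedule_length_ratio(tasks, submit, critical_path):
    """Eq. (4): J_response / J_CP per job; 1.0 is the ideal value."""
    finish, _ = _per_job(tasks)
    return {j: (finish[j] - submit[j]) / critical_path[j] for j in finish}

def relative_wins(results):
    """Relative metric: results[instance][scheduler] = makespan; ties count for all."""
    wins = Counter()
    for per_scheduler in results.values():
        best = min(per_scheduler.values())
        wins.update(s for s, m in per_scheduler.items() if m == best)
    return wins

tasks = [Task("a", 0, 4), Task("a", 0, 2), Task("b", 2, 6)]
print(makespan(tasks), average_utilization(tasks, 2, 0, 6))
print(cumulative_completion(tasks, 6))
print(schedule_length_ratio(tasks, {"a": 0, "b": 0}, {"a": 4, "b": 4}))
```

In this toy workload job b finishes at time 6 although its critical path is 4, so its Schedule Length Ratio of 1.5 exposes the queueing delay that the makespan alone hides.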
2.2.2 Apache Mesos and Apache YARN
As discussed earlier, Hadoop is already one of the crucial technologies for big data processing on large clusters. More importantly, it has become the place within an organization where engineers and researchers have instantaneous and almost unrestricted access to vast amounts of computational resources and stores of company data. This is caused by MapReduce’s tight binding to the storage6. With this wide adoption, the developer community extended the MapReduce programming model beyond the capabilities of the cluster management substrate. A common pattern submits “map-only” jobs to spawn arbitrary processes in the cluster. Examples of such (ab)uses include forking web servers and gang-scheduled computation of iterative workloads. To leverage the physical resources, developers often resorted to clever workarounds to sidestep the limits of the MapReduce API. The main reason for these misuses of Hadoop and MapReduce was the monolithic architecture that coupled the resource management functions with the programming model. [40]
Apache Mesos and Apache YARN are discussed in this section mainly because both target MapReduce clusters shared across different frameworks such as MPI, Dryad, or Storm. Other interesting projects dealing with clusters for multiple frameworks are Omega [41], Corona [42], and Cosmos [43]. However, these systems focus more on the specific needs of the companies that created them (Google, Facebook, Microsoft) than on general scenarios. A more detailed comparison can be found in [40].
Apache Mesos
Mesos is a thin, fault-tolerant resource sharing layer that enables fine-grained sharing across diverse cluster computing frameworks by giving them a common interface for accessing cluster resources. It is written in C++ and has a small codebase of about 10 000 lines.
Because cluster frameworks are both highly diverse and rapidly evolving, the overriding design philosophy of Mesos was to define a minimal interface that enables efficient resource sharing across frameworks, and otherwise to push control of task scheduling and execution to the frameworks. Firstly, this allows frameworks to implement diverse approaches to various problems in the cluster (e.g., achieving data locality, dealing with faults) and to evolve these solutions independently. Secondly, it keeps Mesos simple and minimizes the rate of change required of the system, which makes it easier to keep Mesos scalable and robust. [44]
Mesos consists of a master process that manages slave daemons running on each cluster node, and frameworks that run tasks on these slaves (see Fig. 5). The master implements fine-grained sharing across frameworks using resource offers. Each resource offer is a list of free resources on multiple slaves. The offers are generated according to an organizational policy, defined in pluggable allocation modules (Fig. 5). Frameworks running on Mesos consist of two components: a scheduler that registers with the master to be offered resources, and an executor process that is launched on the slave nodes to run the framework’s tasks. Each framework can accept or reject an offer and ask for another one. Frameworks cannot send detailed requests to Mesos; instead, they can state which offers are unacceptable by defining simple filters or whitelists of acceptable nodes. The rejection, filter, and whitelist mechanisms enable frameworks to support arbitrarily complex resource constraints while keeping Mesos simple and scalable. [44]
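The offer mechanism can be sketched in a few lines of Python. This is a toy model under stated assumptions, not the Mesos API: the class names, the single-pass allocate loop, and the rule that a framework launches as many identical tasks as fit into each offer are hypothetical. Only the division of responsibilities follows the description above: the master offers, and the framework accepts, rejects, or filters by a whitelist.

```python
from dataclasses import dataclass, field

@dataclass
class Offer:
    slave: str
    cpus: float
    mem_gb: float

@dataclass
class FrameworkScheduler:
    """Framework-side scheduler: Mesos only offers, the framework decides."""
    name: str
    task_cpus: float
    task_mem_gb: float
    pending_tasks: int
    whitelist: set = field(default_factory=set)   # acceptable slaves; empty = any
    launched: list = field(default_factory=list)  # slave of every launched task

    def resource_offers(self, offers):
        """Launch tasks on acceptable offers; unused resources are declined."""
        for offer in offers:
            if self.whitelist and offer.slave not in self.whitelist:
                continue  # reject the whole offer
            while (self.pending_tasks > 0 and offer.cpus >= self.task_cpus
                   and offer.mem_gb >= self.task_mem_gb):
                offer.cpus -= self.task_cpus
                offer.mem_gb -= self.task_mem_gb
                self.pending_tasks -= 1
                self.launched.append(offer.slave)
        return offers  # what is left goes back to the master

def allocate(free, frameworks):
    """Master side: offer all free resources to each framework in policy order.
    free maps slave -> [cpus, mem_gb]; returns what remains unallocated."""
    for fw in frameworks:  # the order stands in for the allocation module
        offers = [Offer(s, c, m) for s, (c, m) in free.items()]
        free = {o.slave: [o.cpus, o.mem_gb] for o in fw.resource_offers(offers)}
    return free

hadoop = FrameworkScheduler("hadoop", 1, 2, pending_tasks=3)
mpi = FrameworkScheduler("mpi", 2, 2, pending_tasks=2, whitelist={"s2"})
print(allocate({"s1": [4, 8], "s2": [2, 4]}, [hadoop, mpi]))
print(hadoop.launched, mpi.launched)
```

The master never learns why the MPI framework ignored slave s1; it only sees the declined resources, which is exactly what keeps the Mesos interface minimal.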
Fig. 5 To the left, Mesos architecture diagram with two running frameworks (Hadoop and MPI); to the right, an example of a resource offer; reproduced from [44]
Resource allocation in Mesos is handled by pluggable modules, so that organizations can tailor allocation to their needs. Out of the box, there are two allocation modules: one that performs fair sharing based on a generalization of max-min fairness for multiple resources, and one that implements strict priorities. Similar policies are used in Hadoop and Dryad. When needed, e.g., when long tasks block the whole cluster, Mesos can revoke (kill) them; this is called preemption. [44]
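The generalization of max-min fairness used by the fair-sharing module of Mesos is Dominant Resource Fairness (DRF). The Python sketch below implements the progressive-filling formulation of DRF: the user with the lowest dominant share repeatedly receives one more task until the next task no longer fits. Users stand for frameworks here; the function name drf and the integer resource vectors are illustrative assumptions, and the real Mesos allocator works through resource offers rather than this closed loop.

```python
from fractions import Fraction

def drf(capacity, demands):
    """Dominant Resource Fairness by progressive filling.
    capacity: total amount of each resource (integers);
    demands: user -> per-task demand vector (same resource order).
    Returns the number of tasks launched for each user."""
    used = [0] * len(capacity)
    alloc = {u: [0] * len(capacity) for u in demands}
    tasks = {u: 0 for u in demands}

    def dominant_share(u):
        # largest fraction of any single resource held by user u
        return max(Fraction(a, c) for a, c in zip(alloc[u], capacity))

    while True:
        # the user with the lowest dominant share gets the next task
        u = min(demands, key=lambda x: (dominant_share(x), x))
        d = demands[u]
        if any(x + y > c for x, y, c in zip(used, d, capacity)):
            return tasks  # the next task no longer fits: allocation ends
        used = [x + y for x, y in zip(used, d)]
        alloc[u] = [x + y for x, y in zip(alloc[u], d)]
        tasks[u] += 1

# 9 CPUs and 18 GB; user A needs <1 CPU, 4 GB> per task, user B <3 CPUs, 1 GB>
print(drf([9, 18], {"A": [1, 4], "B": [3, 1]}))
```

User A ends with 3 tasks (dominant resource memory, 12/18) and user B with 2 tasks (dominant resource CPU, 6/9), so both dominant shares equal 2/3. Exact fractions are used so that ties are not broken by floating-point noise.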
Performance isolation between frameworks on the same node is achieved by leveraging existing mechanisms of the underlying OS, namely Linux Containers or Solaris Projects, which can limit the CPU, memory, network bandwidth, and (in new Linux kernels) IO usage of a process tree. [44]
Apache YARN
YARN is a result of Hadoop’s evolution. Originally, Hadoop used a master-slave architecture with one JobTracker and a TaskTracker on each worker node. The drawback of this solution was a very tight connection to the MapReduce programming model, where running non-MapReduce jobs was always a compromise. To overcome this, Hadoop 2.0 is delivered with its own scheduler, YARN, which therefore lies on the same level as Mesos. The new scheduler allows the whole Hadoop ecosystem to be extended by different frameworks and, at the same time, opens Hadoop clusters to third-party frameworks needed by different users.
YARN lifts some functions into a platform layer responsible for resource management, leaving coordination of logical execution plans to a host of framework implementations (Fig. 6). By separating these responsibilities, previously combined in the JobTracker’s charter, the central allocator can use an abstract description of tenants’ requirements while remaining unaware of the semantics of each allocation. That responsibility is delegated to an ApplicationMaster (AM), which coordinates the logical plan of a single job by requesting resources from the ResourceManager (RM), generating a physical plan from the resources it receives, and coordinating the execution of that plan around faults. [40]
Fig. 6 YARN Architecture (the system components in blue, and two applications running in yellow and pink); reproduced from [40]
Typically, an AM needs to harness the resources (CPUs, RAM, disks, etc.) available on multiple nodes to complete a job. To obtain containers, the AM issues resource requests to the RM. These requests include locality preferences and properties of the containers. The RM attempts to satisfy the resource requests coming from each application according to availability and scheduling policies. When a resource is allocated on behalf of an AM, the RM generates a lease for the resource, which is pulled by a subsequent AM heartbeat. [40]
The ResourceManager (RM) matches a global model of the cluster state against the resource requirements reported by running applications. This makes it possible to tightly enforce global scheduling properties (different schedulers in YARN focus on different global properties, such as capacity or fairness), but it requires the scheduler to obtain an accurate understanding of the applications’ resource requirements. A resource request from the AM contains: the number of containers, the resources per container7 (currently the amount of RAM and the number of CPU cores), locality preferences (at node, rack, or global granularity), and the priority of the request within the application. The RM can also ask AMs to return some resources; the AMs, in turn, have some flexibility in fulfilling such preemption requests, for example, by migrating some tasks or checkpointing them before returning the resource. The ResourceManager is not responsible for coordinating application execution or task fault tolerance; these are the responsibilities of the framework’s ApplicationMaster. [40]
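A resource request of this shape, and matching it with gradually relaxed locality, can be sketched as follows. The model is a deliberate simplification and an assumption, not the YARN API: ResourceRequest and allocate are hypothetical names, a single pass replaces the heartbeat-driven protocol, and the RM simply tries the preferred nodes first, then nodes on the preferred racks, and finally any node when relaxation is allowed.

```python
from dataclasses import dataclass, field

@dataclass
class ResourceRequest:
    num_containers: int
    mem_mb: int
    vcores: int
    nodes: list = field(default_factory=list)  # node-level preferences
    racks: list = field(default_factory=list)  # rack-level preferences
    relax_locality: bool = True                # fall back to any node
    priority: int = 0                          # lower value is served first here

def allocate(requests, free, rack_of):
    """One simplified RM pass; free maps node -> (mem_mb, vcores) and is updated.
    Returns container leases as (node, mem_mb, vcores) tuples; containers that
    cannot be placed stay unserved in this sketch."""
    leases = []
    for req in sorted(requests, key=lambda r: r.priority):
        for _ in range(req.num_containers):
            candidates = [n for n in req.nodes if n in free]            # node-local
            candidates += [n for n in free if rack_of[n] in req.racks]  # rack-local
            if req.relax_locality:
                candidates += list(free)                                # off-rack
            for node in candidates:
                mem, cores = free[node]
                if mem >= req.mem_mb and cores >= req.vcores:
                    free[node] = (mem - req.mem_mb, cores - req.vcores)
                    leases.append((node, req.mem_mb, req.vcores))
                    break
    return leases

free = {"n1": (4096, 4), "n2": (2048, 2), "n3": (8192, 8)}
rack_of = {"n1": "r1", "n2": "r1", "n3": "r2"}
request = ResourceRequest(4, 2048, 2, nodes=["n2"], racks=["r1"])
print(allocate([request], free, rack_of))
```

The first container lands on the preferred node n2, the next two fall back to n1 on the same rack, and only the fourth leaves rack r1. This mirrors why a request-based design suits locality-driven allocation.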
The ApplicationMaster is the process that coordinates the framework’s execution in the cluster, but it is itself run in the cluster just like any other container. A component of the RM negotiates the container that spawns this bootstrap process. After building a model of the framework’s requirements, the AM encodes preferences and constraints in a heartbeat message to the RM. In response to subsequent heartbeats, the AM receives container leases on bundles of resources bound to particular nodes in the cluster. Based on the containers it receives from the RM, the AM may update its execution plan to accommodate perceived abundance or scarcity. The AM optimizes data locality by re-requesting containers with diminished weight for nodes that are not desirable as hosts for its tasks. [40]
The NodeManager (NM) is the “worker” daemon in YARN. It authenticates container leases, manages containers’ dependencies, monitors their execution, and provides a set of services to containers. Operators configure it to report the memory, CPU, and other resources available at the node and allocated for YARN. The NM mainly takes care of the node’s liveness, the availability of all necessary packages, and the cleanup after each task, including killing the task on the RM’s or the AM’s demand. [40]
Conclusion
While both Mesos and YARN schedule at two levels, there are two very significant differences. First, Mesos is an offer-based resource manager, whereas YARN takes a request-based approach. YARN allows the AM to ask for resources based on various criteria, including location, and allows the requester to modify its future requests based on what it was given and on its current usage. YARN’s approach is better suited to locality-based allocation, even though the results of Mesos are considered good in [44]. Second, instead of a per-job intra-framework scheduler, Mesos leverages a pool of central schedulers (e.g., classic Hadoop or MPI). YARN enables late binding of containers to tasks, where each individual job can perform local optimizations, and seems more amenable to rolling upgrades (since each job can run on a different version of the framework). On the other hand, a per-job ApplicationMaster might result in greater overhead than the Mesos approach. [40, 44]
2.2.3 Interesting Approaches to Scheduling
Besides the common schedulers, such as random, FIFO, capacity, or fair, there are some unusual ones. These mostly target specific behaviors of particular frameworks, but they still show interesting ways to improve scheduling and resource allocation decisions.
Hadoop’s standard scheduler is equipped with simple speculative re-execution of tasks that may be late or failed, based on monitoring the tasks’ progress. Because this proved to be problematic on heterogeneous clusters, Zaharia et al. [45] propose the LATE scheduler for speculative execution. LATE re-executes tasks according to their estimated finish time, so the performance of the current node is taken into account. At the same time, speculative copies are not scheduled on nodes known to be slow. LATE’s approach has proved to be more effective on heterogeneous clusters than Hadoop’s standard scheduler.
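LATE’s core heuristic estimates each running task’s progress rate as its progress score divided by its running time, and its time left as (1 − progress score) / progress rate. A speculative copy goes to the slow task with the longest estimated time left, and never to a node considered slow. The Python sketch below follows that idea; the slow-task percentile, the cap on concurrent speculative tasks, and the function names are simplified, hypothetical parameters rather than Hadoop configuration keys.

```python
from dataclasses import dataclass

@dataclass
class RunningTask:
    task_id: str
    progress: float  # progress score in [0, 1]
    elapsed: float   # seconds the task has been running (> 0)

def progress_rate(t):
    return t.progress / t.elapsed

def time_left(t):
    """LATE estimate: (1 - progress score) / progress rate."""
    rate = progress_rate(t)
    return (1.0 - t.progress) / rate if rate > 0 else float("inf")

def pick_speculative(tasks, node_is_slow, running_speculative,
                     speculative_cap=10, slow_task_percentile=0.25):
    """Return the task to speculatively re-execute on the asking node, or None."""
    if node_is_slow or running_speculative >= speculative_cap or not tasks:
        return None  # never speculate on slow nodes or beyond the cap
    rates = sorted(progress_rate(t) for t in tasks)
    threshold = rates[int(slow_task_percentile * (len(rates) - 1))]
    slow = [t for t in tasks if progress_rate(t) <= threshold]
    return max(slow, key=time_left)  # longest approximate time to end

tasks = [RunningTask("A", 0.9, 90), RunningTask("B", 0.2, 100),
         RunningTask("C", 0.5, 50), RunningTask("D", 0.6, 60)]
print(pick_speculative(tasks, node_is_slow=False, running_speculative=0))
```

Task B has made 20 % progress in 100 s, so its estimated time left is 400 s, far more than the others; a progress-score-only heuristic could not tell whether it is slow or simply started late.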
Another unusual scheduling approach is used in the DAGuE distributed DAG engine [15]. The engine takes the definitions of tasks in the DAG and tries to optimize the performance of the distributed system as a whole by scheduling tasks with a large amount of shared data as close to each other as possible. Tasks can even share the same memory, with respect to the NUMA8 architecture, if multiple cores sharing the same memory are available in the cluster. DAGuE transparently shares data across processors, or even across nodes through the network. An application in the DAGuE system is defined by a description of all tasks and the DAG in a special language, and by C code for each task. A distributed scheduler analyzes the DAG and chooses which vertices of the DAG to run on which resources. Work stealing is also permitted. On Cholesky factorization, the DAGuE engine performed and scaled better than classical approaches, e.g., ScaLAPACK9.
A somewhat similar approach of data transfer optimization is taken by the Adaptive Online Scheduler for Storm [46]. Two adaptive schedulers are proposed: a topology-based scheduler and a traffic-based scheduler. The topology-based scheduler examines the topology of the stream processing application and does its best to place receivers of data streams on the same physical node as the emitters of that data. This is performed at the time of task scheduling. While the best placement of a particular task is given by the topology itself, the scheduler works with the resources assigned by the resource allocator, so the decision is always a compromise10. The traffic-based scheduler, moreover, takes into account the actual traffic between each emitter and receiver: tasks are placed on the same physical node when the traffic between them is high. The traffic must be monitored during the run of the application, and scheduling decisions are improved over time by rescheduling tasks. It is worth noting that the traffic-based scheduler also considers the overall computational power of each node to avoid non-network bottlenecks when rescheduling. The performance of both adaptive schedulers is slightly better than that of the default Storm scheduler.
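The traffic-based idea can be sketched as a greedy pass over the measured inter-task traffic. This is an illustrative assumption, not the algorithm of [46]: it sorts communicating task pairs by traffic, keeps each task next to its partner while that node has a free slot, otherwise falls back to the least-loaded node, and it ignores the CPU load that the real scheduler also considers. Function and parameter names are hypothetical, and the cluster is assumed to have enough slots for all tasks.

```python
def traffic_based_placement(traffic, nodes, slots_per_node):
    """Greedy co-location of heavily communicating tasks.
    traffic: (task_a, task_b) -> measured tuples/s between the two tasks.
    Assumes len(nodes) * slots_per_node is enough for all tasks."""
    placement = {}
    load = {n: 0 for n in nodes}

    def least_loaded():
        return min(nodes, key=lambda n: load[n])

    # heaviest communication first, so it is the most likely to stay node-local
    for (a, b), _ in sorted(traffic.items(), key=lambda kv: -kv[1]):
        for task, partner in ((a, b), (b, a)):
            if task in placement:
                continue
            node = placement.get(partner)
            if node is None or load[node] >= slots_per_node:
                node = least_loaded()  # partner unplaced or its node is full
            placement[task] = node
            load[node] += 1
    return placement

traffic = {("spout", "split"): 1000, ("split", "count"): 800, ("count", "sink"): 10}
print(traffic_based_placement(traffic, ["n1", "n2"], slots_per_node=2))
```

With two slots per node, only the 800 tuples/s between split and count cross the network. Rerunning such a pass on fresh traffic measurements corresponds to the rescheduling over time described above.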
8 Non-uniform memory access.
9 ScaLAPACK (Scalable Linear Algebra PACKage), an MPI-based framework for linear algebra, http://www.netlib.org/scalapack/
10 From the perspective of a topology-based scheduler, the best placement is obviously the whole topology on one physical node. This follows from the naïve decision perspective, which is based only on network bandwidth optimization.
2.3 Benchmark-based Scheduling
For heterogeneous clusters, however, the crucial approach is scheduling supported by benchmarking. Over time, clusters stop being homogeneous; instead, they contain different kinds of hardware. This is often caused by the fact that clusters are gradually enlarged by new nodes, which are always the best available on the market. Even clusters of stable size are usually upgraded, often rack by rack. This way, clusters come to consist of older and newer machines that differ in available memory and especially in CPU performance [45]. The situation is getting even worse with the growing interest in GPU computation, where a cluster may contain only a limited number of nodes equipped with powerful graphics cards.
Even though schedulers use different techniques to divide CPU performance (mostly just one core per task or container), the differences in the underlying hardware may be significant (the performance of one core differs across CPU generations and also depends on the clock frequency). Heterogeneity makes many classical scheduling algorithms less accurate, because the same computation takes different times in equal containers in the cluster, e.g., the poor performance of the standard Hadoop scheduler on heterogeneous clusters described in 2.2.3 Interesting Approaches.
At the same time, the different capabilities of nodes may, if correctly employed, bring even better results than homogeneous node deployments. Ren et al. [47] discussed the benefits of heterogeneous processors (e.g., a processor with one or two fast cores and six or more slow cores). Besides improved power efficiency, this processor architecture allows for better response times under high load in interactive systems such as websites, because multiple slow processors can simply serve more tasks of low CPU intensity than one fast processor. Analogously, more nodes with slower CPUs should perform better on large amounts of low-CPU-intensity computations over fast data streams than fewer nodes with fast CPUs.
The main idea of benchmark-based scheduling has two parts. First, the capabilities of each node of the cluster are investigated in terms of CPU performance, network bandwidth, or permanent storage bandwidth. Afterward, the scheduler is able to make better decisions and can consider various tradeoffs based on knowledge of each node’s capabilities, e.g., to schedule a task far from its data because all nodes holding the necessary data have over-utilized CPUs while their network bandwidth is free.
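The tradeoff from this example can be made concrete with a simple cost model. The sketch assumes, hypothetically, that benchmarking yields a CPU throughput and a network bandwidth per node, that the current queueing delay on each node is known, and that queueing, transfer, and computation simply add up; a real scheduler would need a richer model, and all names are illustrative.

```python
from dataclasses import dataclass

@dataclass
class NodeProfile:          # hypothetical fields obtained by benchmarking
    name: str
    cpu_ops_per_s: float    # benchmarked CPU throughput
    net_mb_per_s: float     # benchmarked network bandwidth
    cpu_queue_s: float      # current wait until a core becomes free
    has_data: bool          # whether the task's input is stored locally

def estimated_finish(node, task_ops, input_mb):
    """Queueing + (remote) data transfer + computation, assumed sequential."""
    transfer = 0.0 if node.has_data else input_mb / node.net_mb_per_s
    return node.cpu_queue_s + transfer + task_ops / node.cpu_ops_per_s

def choose_node(nodes, task_ops, input_mb):
    return min(nodes, key=lambda n: estimated_finish(n, task_ops, input_mb))

nodes = [
    NodeProfile("local", 1e9, 100.0, cpu_queue_s=120.0, has_data=True),
    NodeProfile("remote", 2e9, 100.0, cpu_queue_s=0.0, has_data=False),
]
print(choose_node(nodes, task_ops=1e11, input_mb=1000.0).name)
```

The data-local node would finish after 120 + 100 = 220 s, whereas moving 1000 MB to the idle, faster node costs 10 s of transfer plus 50 s of computation, so the sketch deliberately schedules the task away from its data.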
The following subsections first introduce benchmarking methods, giving a basic overview of benchmarking and, more specifically, of the execution time estimation problem. After that, works applying the described execution time estimation methods are discussed.
2.3.1 Benchmarking
In the context of heterogeneity, the execution times of the same tasks differ across the nodes of a cluster. For better scheduling decisions, it would be useful to predict the execution time on a specific node. This is a complex problem, because different programs have different bottlenecks (e.g., some programs mostly need a fast CPU, while others rather require fast permanent storage), but some suitable solutions exist. This is known as the execution time estimation problem.
In the literature, there are three major classes of solutions to the execution time estimation problem: code analysis [48], analytic benchmarking/code profiling [49, 50, 51], and statistical prediction [52, 53].
Code Analysis
In code analysis, an execution time estimate is found through analysis of the task’s source code. A given code analysis technique is typically limited to a specific code type or a limited class of architectures. Thus, these methods are not very applicable to a broad definition of heterogeneous computing. [54]
Analytic Benchmarking/Code Profiling
Analytic benchmarking defines a number of primitive code types. On each machine, benchmarks are obtained that determine the performance of the machine for each code type. Code profiling then attempts to determine the composition of a task in terms of the same code types. The analytic benchmarking data and the code profiling data are then combined to produce an execution time estimate.
Analytic benchmarking/code profiling has two disadvantages. First, it lacks a proven mechanism for producing an execution time estimate from the benchmarking and profiling data over a wide range of algorithms and architectures. Second, it cannot easily compensate for variations in the input data set. However, analytic benchmarking is a powerful comparative tool in that it can determine the relative performance differences between machines. [54]
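The combination step can be illustrated by the simplest possible model, assumed here for illustration only: a task’s profile gives the amount of work per code type, a machine’s benchmarks give its rate per code type, and the estimate is the sum of work divided by rate. As noted above, no such model is proven across algorithms and architectures, but even this one ranks machines relative to each other. The code types and function names are hypothetical.

```python
def estimate_time(profile, benchmarks):
    """profile: code type -> work of that type in the task (code profiling);
    benchmarks: code type -> rate of the machine for that type (analytic
    benchmarking). Assumes the code types execute one after another."""
    return sum(work / benchmarks[kind] for kind, work in profile.items())

def rank_machines(profile, machines):
    """Return machine names ordered from the fastest estimate to the slowest."""
    return sorted(machines, key=lambda m: estimate_time(profile, machines[m]))

profile = {"vector": 8e9, "scalar": 2e9}   # operations per code type
machines = {
    "m1": {"vector": 4e9, "scalar": 1e9},  # operations per second
    "m2": {"vector": 1e9, "scalar": 2e9},
}
print(rank_machines(profile, machines))
```

Machine m2 is faster on scalar code, yet this vector-heavy task is estimated at 4 s on m1 and 9 s on m2. Note that the estimate ignores the input data set, which is the second disadvantage mentioned above.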
Statistical Prediction Algorithms
Statistical prediction algorithms, the third class of execution time estimation algorithms,
make predictions using past observations. A set of past observations is kept for each machine and used to make new execution time predictions. The matching and scheduling algorithm uses these predictions (and other information) to choose a machine to execute the task. While the task executes on the chosen machine, the execution time is measured, and this measurement is subsequently added to the set of previous observations. Therefore, as the number of observations increases, the estimates produced by a statistical algorithm improve.
Statistical methods have the advantage that they can compensate for parameters of the input data (such as the problem size) and do not need any direct knowledge of the internal design of the algorithm or the machine. However, statistical techniques lack an internal method of sharing observations between machines. If observations could be shared between machines, the execution time estimate on a machine with few observations could be improved by using observations from machines with similar performance characteristics. [54]
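A minimal statistical predictor of this class can be sketched in Python. The modelling choices are illustrative assumptions: observations are (problem size, execution time) pairs, and each machine’s history is fitted by ordinary least squares, time = a + b · size, so that the prediction accounts for the problem size. In line with the drawback just mentioned, histories are not shared between machines. The class name ExecutionTimePredictor is hypothetical.

```python
from collections import defaultdict

class ExecutionTimePredictor:
    """Per-machine history of (problem size, measured seconds) pairs;
    predictions come from a least-squares line refitted on every call."""

    def __init__(self):
        self.history = defaultdict(list)

    def observe(self, machine, size, seconds):
        """Add a measurement taken after a task finished on this machine."""
        self.history[machine].append((size, seconds))

    def predict(self, machine, size):
        obs = self.history[machine]
        if not obs:
            return None  # no observations yet: nothing to predict from
        n = len(obs)
        mean_x = sum(x for x, _ in obs) / n
        mean_y = sum(y for _, y in obs) / n
        sxx = sum((x - mean_x) ** 2 for x, _ in obs)
        if sxx == 0:
            return mean_y  # all sizes equal: fall back to the mean time
        b = sum((x - mean_x) * (y - mean_y) for x, y in obs) / sxx
        return mean_y + b * (size - mean_x)

p = ExecutionTimePredictor()
for size, secs in [(100, 12.0), (200, 22.0), (300, 32.0)]:
    p.observe("m1", size, secs)
print(p.predict("m1", 400), p.predict("m2", 400))
```

Machine m2 gets no estimate at all until it runs a task itself, even if it is identical to m1; sharing observations between similar machines would remove exactly this cold-start problem.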
2.3.2 Speculative Execution in Hadoop Revisited
Getting back to the problem of Hadoop’s speculative execution (2.2.3 Interesting Approaches), we can see other solutions to this problem. Bortnikov et al. [25] present a machine learning approach for predicting slowdowns of MapReduce tasks. On Yahoo!’s large cluster, they identified that the slowdowns of individual tasks are highly correlated with overall job latencies. The correlation is not direct, since mappers and reducers are typically executed in multiple waves. However, significant task slowdowns tend to indicate bottlenecks in the job execution as well. Measurements on production data show that as many as 90 % of speculative tasks are useless: they are launched too late to finish before the original stragglers and end up being killed by the system. For a given task-node pair 〈t, n〉, the proposed slowdown predictor estimates the slowdown. In this context, both the task and the node are modeled as feature vectors assembled from MR-level and system metrics.
From the benchmarking perspective, a statistical prediction algorithm is used. Because MapReduce divides jobs into many map tasks and a few reduce tasks, many of which are essentially the same (at least they run the same code), patterns of similar tasks on different nodes can be found even across various jobs executed on the same cluster.
Intuitively, two similar tasks are identified by the same “signature”. As an approximation, sibling mappers or reducers in the same job with roughly the same input and output sizes are considered similar. More precisely, each task has a characterizing profile, which can be considered a vector of attributes; two tasks are similar if their profiles are similar. Alongside the task profiles, nodes have profiles too, based on their hardware configuration. Finally, machine learning is employed to build the prediction model from the accumulated logs and the profiles of the tasks and the hardware.
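The similarity idea can be sketched with a k-nearest-neighbour regressor over concatenated task and node profiles. This is a stand-in chosen for brevity, not the model used by Bortnikov et al.; the features, the Euclidean distance on unscaled values, and k are hypothetical. In practice the features would have to be normalized so that no single attribute dominates the distance.

```python
import math

def predict_slowdown(history, task_profile, node_profile, k=3):
    """history: (task_profile, node_profile, observed_slowdown) tuples from logs.
    Returns the mean slowdown of the k past <task, node> pairs closest to the
    queried pair (Euclidean distance on the concatenated profiles)."""
    query = list(task_profile) + list(node_profile)
    nearest = sorted(
        history, key=lambda h: math.dist(query, list(h[0]) + list(h[1]))
    )[:k]
    return sum(slowdown for _, _, slowdown in nearest) / len(nearest)

# task profile: (input GB, output GB); node profile: (cores, GHz)
history = [
    ((1.0, 0.1), (8, 2.0), 1.0),
    ((1.1, 0.1), (8, 2.0), 1.2),
    ((1.0, 0.1), (4, 1.2), 2.5),
    ((1.0, 0.1), (4, 1.2), 2.7),
]
print(predict_slowdown(history, (1.05, 0.1), (4, 1.2), k=2))
```

A mapper similar to its logged siblings is predicted to run about 2.6 times slower on the older node type, which a speculative-execution policy could use to launch a backup copy early rather than too late.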
2.3.3 Real Benchmarks
Although the previous method falls into the category of benchmarking, it does not perform any tests on the hardware itself. Instead, it exploits the similarity of MapReduce tasks. A different approach was taken by Gupta et al. in ThroughputScheduler [55]. With ThroughputScheduler, the cluster is probed by a series of tests after each hardware or topology change (such changes are assumed to be rare, so the testing can be done offline). Then, while MapReduce jobs are running, the profiles of their tasks are learned online, without any disturbance of the computation. The Bayesian active learning scheme is split into two phases: learning and exploitation.
Fig. 7 Algorithm SELECTBESTJOB(NodeN; ListOfJobs), reproduced from [55]
To enable online learning, task completion time samples from actual production jobs are collected. With every new sample, the belief about the resource profile of the tasks is updated. Statistical methods are used to decide the level of certainty, and when a predefined11 level is exceeded, the exploitation phase of the learned knowledge can begin.
In the exploitation phase, the scheduling algorithm SelectBestJob uses the capabilities of each node and the learned resource profiles of the jobs’ tasks to improve the performance of MapReduce on the heterogeneous cluster (Fig. 7). The main idea is to select from the queue the jobs most compatible with a specific machine and run them first (Fig. 8). For example, consider Nodes A and B, where Node B is almost 7.5 times faster than Node A in terms of CPU but only 2.5 times faster in terms of disk. Hence, intuitively, disk-intensive jobs are better scheduled on Node A, since the relatively higher CPU performance of Node B is better used for CPU-intensive jobs (if there are any). To account for this relativity of optimal resource matching, both jobs and machines are normalized so that their total requirements and capabilities sum to one for each resource (e.g., computing and storage).
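The normalization and the node-specific job choice can be sketched in Python. The scoring rule is an assumption for illustration, not necessarily the exact rule of SelectBestJob in [55]: after normalizing each resource to sum to one across jobs and across machines, the job with the lowest estimated normalized running time on the given node (the sum over resources of demand divided by capability) is chosen. The function names and the two-resource profiles are hypothetical.

```python
def normalize(profiles):
    """Scale each resource so that it sums to one over all entries."""
    resources = list(next(iter(profiles.values())))
    totals = {r: sum(p[r] for p in profiles.values()) for r in resources}
    return {name: {r: p[r] / totals[r] for r in resources}
            for name, p in profiles.items()}

def select_best_job(node, jobs, machines):
    """Pick the queued job with the lowest estimated normalized time on node."""
    capability = normalize(machines)[node]
    demand = normalize(jobs)
    return min(demand, key=lambda j: sum(demand[j][r] / capability[r]
                                         for r in capability))

# Node B: 7.5x the CPU and 2.5x the disk performance of Node A
machines = {"A": {"cpu": 1.0, "disk": 1.0}, "B": {"cpu": 7.5, "disk": 2.5}}
jobs = {"cpu_job": {"cpu": 10.0, "disk": 1.0},
        "disk_job": {"cpu": 1.0, "disk": 10.0}}
print(select_best_job("A", jobs, machines), select_best_job("B", jobs, machines))
```

With the ratios from the example, the sketch sends the disk-intensive job to Node A and the CPU-intensive job to Node B, reproducing the intuition behind the normalization.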
Fig. 8 Algorithm THROUGHPUTSCHEDULER(Cluster, Request), reproduced from [55]
Although ThroughputScheduler performs significantly better than the original fair scheduler, especially on heterogeneous clusters, it has the same problem as the statistical prediction algorithm from 2.3.2 Speculative Execution in Hadoop Revisited: the solution is only suitable for MapReduce jobs.
2.4 Beyond the State of the Art
The current state of the art can be characterized as a slow adoption of the new principles required for processing massive data streams. Stream processing systems are gaining the attention of organizations that need to process big data, and stream processing can be said to be taking an increasing share of the market where MapReduce-based data processing was used in the past. At the same time, the reality of large clusters (even cloud-based ones) shows that they are no longer homogeneous (see 2.2.3, 2.3, and 3.1). Benchmark-based scheduling approaches are the remedy for cluster heterogeneity.
Stream processing is still a newcomer among widely used paradigms for big data processing; the most widespread paradigm is MapReduce. The current situation around stream processing systems is unsatisfactory because all existing systems are primarily designed for operation on dedicated clusters, and the only way to use them on a shared cluster is to integrate them with scheduling systems such as Mesos or YARN (see 2.2.2). Although connectors for these schedulers are often available, the problem remains, since stream processing is most suitable for a segment of jobs that are currently processed using the MapReduce paradigm, and MapReduce mostly runs without any other scheduling substrate. One possible solution is brought by the new concept of Hadoop 2.0 (the most widely used implementation of the MapReduce paradigm), where the YARN scheduler is opened to other applications.
There are multiple efforts to exploit the heterogeneity of clusters, but most of them target MapReduce, where, in the Hadoop implementation, the problem of non-homogeneous clusters is already significant (see the introduction of 2.3). Solutions to this problem based on statistical prediction (see 2.3.2) and benchmarking (see 2.3.3) exist, but none of them is directly suitable for the stream processing paradigm.
On the other hand, the long-running nature of stream jobs, together with the heterogeneity of their resource requirements, can be exploited for better utilization of heterogeneous clusters. The current situation, however, does not allow for such a scenario because there is a lack of advanced schedulers able to fulfill this potential of stream applications. Further research on benchmark-based scheduling algorithms is needed. At the same time, many concepts already known from other fields of big data processing can be a good starting point for resource-aware scheduling capable of effectively utilizing heterogeneous hardware.
Existing advanced scheduling approaches for MapReduce and stream processing differ in effectiveness (see 2.2.3). Each one is suitable for a specialized set of problems, where it may bring some advantage over standard schedulers. Unfortunately, these ideas are fragmented over multiple different systems (e.g., Hadoop or DAGuE), and there has been no attempt to combine at least some of them into a single, and thus more flexible, scheduler. Benchmark-based scheduling for stream processing systems, for example, may profit from combination with the topology-based or traffic-based scheduler from 2.2.3. The reason is that the sets of problems partly overlap, i.e., all the mentioned schedulers can use data from performance monitoring together with information about resource capabilities found by benchmarking.
The deficiencies and possible improvements discussed in this section form the basis for the objectives of my work. The following chapters describe the motivation of this work in more detail and show the way leading to a prototype of the heterogeneity aware scheduler for stream processing frameworks.
Chapter 3 Heterogeneity Aware Scheduler
3.1 Motivation
In recent years, the popularity of MapReduce and Hadoop rose to new levels. Almost any entity dealing with so-called “big data” uses Hadoop for data processing. Its popularity was mostly caused by the simple model of parallelization, which provides massively distributed data processing through an easy and understandable interface. The interface is not limited to the MapReduce programming model; there are various tools on top of MapReduce, such as Apache Pig (a scripting language) or Apache Hive (an SQL-like query language). For increasingly diverse companies, Hadoop has become the data and computational agorá, the de facto place where data and computational resources are shared and accessed [40].
Recent trends have shown that batch processing of persistently stored data via Hadoop is not sufficient for a vast number of applications. The reasons are obvious: storing the data is often unnecessary, and it prolongs both the computation over the data itself and the time from data arrival to the results of the computation. Stream processing is the solution. At the same time, it is natural to incorporate stream processing into the existing Hadoop ecosystem because, firstly, many clusters are already running Hadoop and will have to run it in the future, and secondly, interaction of data obtained by stream processing with other data currently stored or processed in Hadoop clusters12 is inevitable.
The first steps towards connecting Hadoop and stream processing have recently been taken. There are third-party schedulers that make Hadoop clusters available to other data processing frameworks, and Hadoop itself now has a dedicated scheduler called YARN, which is also extensible for new frameworks.
Although stream processing is on the rise, there has been no progress in scheduling approaches supporting streams; especially the situation concerning heterogeneous clusters is still unsatisfying, and new solutions have to be proposed. An adoption of benchmark-based scheduling for wider classes of problems, such as stream processing, can be the right cure.
3.2 Intuitive Example
To better illustrate the scheduling problems expected in the current state of the art of stream processing on heterogeneous clusters, this section presents an example application and its possible behavior. We chose Apache Storm as the stream processing substrate because of its partial integration into Apache Hadoop and its simple and