• No results found

In addition to supporting the 3 chosen cluster managers (Standalone, Mesos, YARN), and two different modes of resource allocation i.e. static and dynamic resource allocation, Spark provides many configuration parameters that control speculative execution of tasks, size of resources per executor, and the number of parallel tasks that can be created and executed for a giving job. This section provides a detailed description

Table 3.2: Cluster Resources

Node

Virtual Cores Memory Disk

Node Total Cluster Node Total Cluster Node Total Cluster

9 Worker Nodes 8 7 14 GB 13 GB 7.4 TB 7.4 TB

2 Worker Nodes 8 7 35 GB 34 GB 7.7 TB 7.7 TB

Totals 88 77 196 GB 185 GB 82 TB 82 TB

of the parameters chosen for the various set of experiments in this thesis, and how they are configured for comparison between running Spark applications on the three supported cluster managers.

3.2.1

Parameters to Control Resource Scheduling and Allocation

There are several parameters that control how static and dynamic resource allocation behaves. However, for this thesis only two parameters were used, namely spark.dynamicAllocation.enabled, which indicate whether to use static or dynamic resource allocation, andspark.shuffle.service.enabled, which allows safe removal of executors from applications without losing their shuffle data. Both of these parameters need to be set totrue

in order for Spark applications to use dynamic resource allocation.

3.2.2

Parameters to Control Number of Parallel Tasks Per Job

Spark provides several options for controlling the number of RDD partitions (and concurrent tasks) that can be created at any given stage. For applicatioins with stages that involve distributed shuffle operations, such as reduceByKey and join, the spark.default.parallelism configuration parameter can be used to control the number of partitions in RDDs returned by these operations.

In addition, Spark also provides fine grain control over the number of partitions at run time. By chang- ing the numPartitions variable in Spark transformations such as repartition, coalesce, and repartitionAnd- SortWithinPartition, the number of partitions in an RDD at any stage of a job can be controlled. Further- more, thespark.files.maxPartitionBytes configuration parameter can be used to define the maximum number of bytes to pack into a single partition when reading files, which will also control the number of RDD parti- tions. Since Spark creates one task per RDD partition, these parameters control the total number of tasks at any stage of an application. For the experiments in this thesis, spark.files.maxPartitionBytes was used to control the size of Spark RDD partitions (and the number of concurrent tasks) in the three benchmark applications.

3.2.3

Parameters to Control Speculative Execution of Tasks

Speculative execution of tasks is a feature of Spark that enables the detection of slow running tasks and re- scheduling their execution on different executors in order to prevent having failed or slow machines slowing

down the execution of an entire application. Slow running tasks are considered to be tasks running slower in a stage than the median of all successfully completed tasks in that stage according to configuration values. If a node that is running a task crashes, Spark will automatically re-run the task on another node; if the node is still active but slower than other nodes, however, Spark will not re-launch the task on another node by default.

Withspark.speculation set to true, slow running tasks in a stage may be re-launched on another node, and Spark would use the results of the task that finishes first. There are various parameters associated with speculative execution in Spark. These parameters includespark.speculation.interval which controls how often Spark checks for tasks to speculate, spark.speculation.multiplier which is used to control how many times slower a task is than the median to be considered for speculation, andspark.speculation.quantile which controls the fraction of tasks that must be complete before speculation is enabled for a particular stage. For the experiments in this thesis, only the spark.speculation configuration parameter was used to control speculative execution, all other parameters associated with speculation are set to default values.

3.2.4

Parameters to Control Resource Size Per Spark Executor

Two Spark configuration parameters,spark.executor.coresandspark.executor.memory control the number of CPU cores and memory per Spark executor process respectively. The value for spark.executor.cores is set by default to 1 in YARN mode, and all available cores on the worker node in Standalone and Mesos modes. The value of spark.executor.memory is set to 1 GB in all the three cluster managers by default.

Using the default values for these parameters means that only one executor per worker node (with 1 GB memory and all available CPU cores on the worker machine) are allocated to Spark applications by the Standalone and Mesos cluster managers. On the other hand, the YARN cluster manager would allocate multiple executors per worker nodes, however, the executors would use the minimum possible resource (that is 1 Core and 1 GB memory). Setting these parameters allows one to control the amount of resource per executor. This means multiple executors with appropriate resource size may run on a single worker node.