CloudEx coordinator and processor tasks can accept input and provide output as shown in Figure 5.4. Additionally, every processor task has a partitioning function that can also accept input and provide output. CloudEx provides the ability for users to define how the input for tasks and partitioning functions is populated. It also provides the ability for users to set the output of some tasks and partitioning functions as an input to others. This approach enables users to create a flow of tasks that are dependent on the execution of others and collectively perform a particular job.
5.2.1
The Job Context
Whilst executing a job, the coordinator maintains a job context, which is an in-memory collection of key-value pairs. The job context, illustrated in Figure 5.4, plays a central role in data sharing between the various tasks and partitioning functions. The coor-
Task1 Job End Taskj Taskn Taskj Taskj
T1 2 Job Context
T
Direct Input Metadata Input Output Partitioning Function Task
Keyx Valuex
Processors Processors
Keyy Valuey
Coordinator Task Processor Task
Keyz Valuez Coordinator Cloud Services T2 Metadata Processors Key k Valuek
Figure 5.4: CloudEx tasks input and output.
dinator populates the input for tasks and partitioning functions from the job context. Moreover the output of tasks and partitioning functions can also be saved back to it. The coordinator tasks can directly accept input from and provide output to the job context. Processor tasks can also accept input from the job context, however, this input is sent remotely to the processors through the metadata server. Partitioning functions for processor tasks are executed by the coordinator and hence have direct access to the job context. Consequently, these functions can directly accept input from and provide output to the job context. When defining tasks, values in the job context can be referenced as input or output for the tasks, by simply referring to them by their keys. This approach enables users to wire the output of some tasks as an input to others to create dependencies as will be explained in the following section.
5.2.2
Input and Output Resolution
When defining a CloudEx task, its input is defined as either constants or variable names. These variables’ names are used as keys to lookup a value from the job context. For example, if a task is defined to have the following key-value pairs as input:
{threshold:50, domain:example.com, table:#key1, file:#key2}
and the job context contains the following key-value pairs:
{key1:sample-data, key2:data.txt, key3:some-value, key4...}
The coordinator will treat all the values as constants, except those prefixed with #, those are treated as variables. In this example, the values for keys threshold and domain are treated as constants. However, for the other two keys table and file, the coordinator will remove # prefix and will then lookup key1 and key2 in the job context. These will resolve to sample-data and data.txt respectively and the task will be initialised with the following input data:
input = {threshold:50, domain:example.com, table:sample-data, file:data.txt}
The same approach is applied to the input of partitioning functions attached to pro- cessor tasks. Additionally, this approach is also used to resolve the input for processor tasks. However, this input is not directly populated by the coordinator, instead it is sent to the processor with the task metadata through the metadata server.
Output of coordinator tasks and partitioning functions can also be specified when defining tasks. The approach for handling the task output is slightly different from the approach described previously for input. For example, if a task has output keys
defined as {key5, key6} and the following output is generated when the task finishes executing:
output = {key5:value5, key6:value6, key7:...}
Then, only the key-values pairs for the two keys specified in the task definition are added to the job context, i.e. {key5:value5, key6:value6}, the rest are ignored. As mentioned previously this variable resolution and input binding approach using the job context enables users to create dependencies between the various tasks. Ad- ditionally, it enables users to create tasks that can dynamically determine and set the computational requirements for upcoming tasks in the job.
5.2.3
Handling Input
Each task can be provided with input arguments, which are stored by the coordinator the job context (discussed in Section 5.2.1). These arguments are provided to the task before it starts executing. Coordinator tasks can be directly populated from the job context as illustrated in Figure 5.4 for task T1.
The partitioning function for a particular processor task can also be provided with input arguments from the job context. This is illustrated in Figure 5.4 for parti- tioning function f2, attached to processor task T2. These arguments are used by the partitioning function to partition the task workload between a number of processors. Unlike coordinator and partitioning function input, which can be directly provided, processor input is provided remotely as metadata input as illustrated in Figure 5.4 for task T2. The CloudEx application (cx) running on the processor receives requests to execute tasks through the metadata server. cx will then initialise the task and use the relevant metadata key-values pairs to provide the task with its input values.
5.2.4
Handling Output
When finished executing, coordinator tasks can provide output as a collection of key- value pairs, this is illustrated in Figure 5.4 for task T1. These key-values pairs are added to the job context so they can be used as input to subsequent tasks. When a task is defined the user specifies which key-values pairs from the task output are to be added to the job context.
Partitioning functions for processor tasks can also provide an output to be added to the job context, this is illustrated in Figure 5.4 for partitioning function f2. This output takes the form of a list of items, the size of this list determines the number of processors to use for the task execution. Each item in the list is then distributed as task input to each one of the processors using the metadata server.
On the grounds that processor tasks are executed remotely in an embarrassingly par- allel fashion, they can not directly provide output back to the coordinator. Therefore CloudEx processor tasks do not provide any output arguments. When required to provide output, these tasks save their output directly to one of the cloud services (such as cloud storage), which can later on be accessed directly and aggregated by using a coordinator task.