TETTERTON, JAMES C. ParaFitter: Automatic Runtime Concurrency Con-figuration for Parallel Scientific Applications(Under the direction of Assistant Professor Xiaosong Ma).
Parallel processing is increasingly relied upon in many disciplines, but traditional parallel computing mechanisms and interfaces lack the inter-activeness that many scientists need in their data processing tasks. These facilities are often located off-site and only accessible via batch scheduling systems. In this project, we explore interactive parallel scientific computing in a distributed environment.
A dissertation submitted to the Graduate Faculty of North Carolina State University
in partial fulfillment of the requirements for the Degree of
Master of Science
Computer Science
Raleigh, NC
2007
Approved By:
Dr. Christopher Healey
Biography
James was born in Sanford, NC in 1977 and lived there until moving to Raleigh, NC to attend North Carolina State University. Due to his strength in math, he came to NCSU as a statistics major. After taking his first computer science class CSC114, an Introduction to C++, he knew programming was his real calling.
James decided to become a lab instructor for CSC114, under Dr. Jo Perry. This is when he discovered the enjoyment of teaching. He was a teaching assistant for CSC114 and CSC116 for 3 years during his undergraduate work. After graduating he was asked to teach the online section of CSC114 for another 2 years. After the course was assigned to another faculty member, he did not stop there. James then stepped back into his teaching assistant shoes for an online section of Operating System Concepts, CSC244/CSC246, and Introduction to Java, CSC257. Later he took advantage of the opportunity to create and co-lecture an online section of Programming Concepts - Java, CSC216, with Dr. Jo Perry. In addition to of James’s part-time teaching job, he has also worked full-time for SAS Institute as a software developer, since graduating in May of 2000.
and presentation of the results.
I am also thankful to my other committee members, Professor David Lalush and Professor Christopher Healey, for their time and assistance during my thesis research. In particular, I thank Professor Lalush for providing us with the application that both mo-tivated my thesis topic and resulted in an excellent test case, and spending a lot of time helping me with the details of the code.
I thank the system group at the Department of Computer Science for providing me with the cluster resources and technical support for my experiments.
Contents
List of Figures vi
List of Tables vii
1 Introduction 1
1.1 Motivation . . . 1
1.2 Sample Application . . . 3
1.3 Summary of Contributions . . . 5
2 Related Work 6 2.1 Grid Computing . . . 6
2.2 Adaptive Scheduling . . . 7
2.3 Master-Worker Parallel Programming . . . 8
2.4 Parallel Frameworks and Libraries . . . 9
3 ParaFitter Design 10 3.1 ParaFitter Overview . . . 10
3.2 ParaFitter Components . . . 12
3.3 Automatic Adaptation of Parallel Execution . . . 12
3.4 Thread Control Algorithm . . . 13
4 Implementation 15 4.1 Framework . . . 15
4.1.1 Example program breakdown . . . 17
4.2 Sample Application Transformation . . . 18
5 Performance Results 20 5.1 Testbed Configuration . . . 20
5.2 Distributed Word Count . . . 21
5.3 Sorting . . . 24
5.4 Biomedical Application . . . 28
List of Figures
1.1 Overview of the CT process [20]. . . 4
1.2 Sample output image from the biomedical application . . . 4
3.1 An overview of the ParaFitter environment and architecture. . . 11
4.1 Serial program breakdown . . . 18
4.2 C code of the original application . . . 19
4.3 Java code of what phase one would be in ParaFitter . . . 19
4.4 C code which is part of Figure 4.2 using ParaFitter as the loop . . . 19
5.1 Surface plot of distributed search benchmark results . . . 25
5.2 Line plot of distributed sort benchmark results . . . 27
5.3 Surface plot of Bitcone for phase one benchmark results . . . 33
5.4 Surface plot of Bitcone for phase two benchmark results . . . 34
A.1 The UML Class Diagram of the shared code between a server and the client 40 A.2 The UML Class Dialog the of server module . . . 41
List of Tables
5.1 Distributed search benchmark results . . . 23
5.2 Distributed search framework results . . . 24
5.3 Sorting benchmark results . . . 28
5.4 Framework results with the sorting application . . . 28
5.5 Bitcone benchmark for phase one . . . 30
5.6 Bitcone benchmark for phase two . . . 31
5.7 Bitcone framework for phase one . . . 31
Chapter 1
Introduction
1.1
Motivation
Powerful parallel computers are increasingly relied upon in scientific exploration, but these high-end facilities lack means of direct user interaction, such as display and mouse. Further, they are shared resources normally located off-site and accessed through batch schedulers. Therefore desktop computing will continue to be an indispensable part of high-performance computing workflows. By allowing direct interaction between end users and their programs/data, personal computers form a window by which people interface daily with high-end compute and storage facilities. Even the heaviest supercomputer users perform part of their tasks, such as data visualization, on personal computers.
However, with the growing speed of high-end supercomputers and instruments (e.g., the aggregate FLOPS supplied by the world’s No.1 supercomputer has increased by 800 times in the past 10 years [26]), and consequently the rapid growth of data volumes generated by these systems [15, 18, 27], it is difficult for individual PCs to catch up even with the highest configuration. As a result, limited throughput in interactive (as opposed to batch-mode) desktop processing often compromises people’s productivity in using powerful remote systems and slows down scientific exploration cycles.
processing environment. With most high performance facilities a programmer or application user would need to submit a job in a batch queue. The batch processing mode loses the interactive operations critical or desired in many applications and decreases scientists’ overall productivity. In fact, sometimes a job may wait longer than its execution time.
Parallel computing in the desktop environments has been facilitated by the technol-ogy development from two aspects. Recently, interfaces such as the Google MapReduce [10] have been proposed to assist parallel data processing. This work shows many real world tasks can be expressed withmapfunctions for iterative processing on sets of objects and re-ducefunctions for merging intermediate results. In less than two years Google programmers created over 900 instances of MapReduce codes. Meanwhile, with distributed job execution systems such as Condor [23] and Entropia [3], people can utilize idle resources on local workstations to run parallel jobs.
The parallel execution scalability of data-processing applications, however, varies from program to program. The maximum speedup achievable depends on the problem nature, the parallel algorithm used, the computation-communication ratio, as well as the hardware settings. For example, many MapReduce codes the “reduce” stage may become a serialization point that limits the overall program throughput and the maximum speedup. This makes it challenging to determine the appropriate amount of resources (often in terms of the number of processors/nodes) to request. While requesting too few processors fails to exploit the available parallelism, requesting too many results in low parallel execution efficiency as well as low resource utilization.
1.2
Sample Application
Our research has been motivated by a computed tomography (or CT) recon-structed imaging application [22, 32] authored by Lalush et al. from Dept. of Biomedical Engineering at North Carolina University. This application demonstrates the target audi-ence ParaFitter is designed to help: scientists who run computation and/or data-intensive codes in a desktop environment interactively, and desire better application performance without going to batch processing or parallel computers.
CT scans are conducted using X-rays, which are produced from a source that rotates 360◦ around a subject. The X-rays then pass through the body of the subject and are partially absorbed by the body tissue detectors opposing the X-ray source. In this particular application one such 360◦ pass around the subject a slice, composes a thousand images. These measurements are fed into a computer, that takes the images and creates one 2D image.
Figure 1.2 shows one slice from a CT scan. Figure 1.1 displays how the process is done. Our sample application is the program used to merge the thousands of images into one clear 2D image.
This application uses an iterative algorithm that has two main parallel phases. The first phase generates a number of normalization files based upon the input images. For example, in one demonstration run there are seventeen partitions of the image, which will produce seventeen normalization files. This area is simple to parallelize because it composes a forloop that iterates over the partitions without inter-iteration dependency.
The next step of the application is a short preparation step that does not require parallelization, but must be done before the second phase starts. It computes the step size to be used in the next phase, from the data generated by the first phase.
Figure 1.1: Overview of the CT process [20].
Figure 1.2: Sample output image from the biomedical application
produced without any significant impact on the image quality.
code to deploy the most suitable level of concurrency, as this optimal concurrency heavily depends on the interaction of the program code and the underlying hardware and operating system. Consequently, the optimal concurrency may change dynamically when the code is ported to a different machine or there is a change in system resources. The approach explored in this thesis, however, tries to capture the optimal level of concurrency based on runtime observations, and enable individual executions of a parallel data processing program to adapt to the runtime system and resource configurations.
1.3
Summary of Contributions
We consider the major contributions of this thesis project as follows:
• We designed and implemented ParaFitter, an autonomic framework for adaptive par-allel computation in distributed environments. ParaFitter dynamically adjusts the level of concurrency used in an application’s parallel execution, according to runtime performance observations.
Chapter 2
Related Work
2.1
Grid Computing
Grid computing [13] is a classification of architecture that enables the sharing of resources across administrative domains. It gives the benefit of clustering large numbers of heterogeneous resources together for the computation performance. Grid computing, though, can be more difficult with traditional parallel programming environments. There needs to be some balance between the Grid and cluster computing environments.
Google introduced an interface called MapReduce [11]. This project is the closest design to what we have done. MapReduce contains two functions: map and reduce. The
map function will associate a key to a value, to generate a set of key/value pairings. The
reducefunction is called as the mapfunction finishes. An example given in the paper was counting the number of occurrences of a word in a large document. Themapfunction would generate each word plus an associated count of the time the word appears in the document. The reduce function would then sum all of the counts together. Each instance of the
map function is distributed around to many nodes, reading input files work and write an output file for the reduce function. The reduceworkers then read those output files and consolidate results for the common keys as specified by thereducefunction. Applications written with the MapReduce interface may utilize ParaFitter to find the more resource-efficient configurations for their distributed processing.
A programmer would write the map and reduce functions and pass them into a
equipped to handle this type of demand, the paper describes that there are 1800 machines for MapReduce. Load balancing and scheduling is not much of a concern when a job only needs one percent of the whole Grid.
MW [16] gives a very similar framework to the one discussed in this thesis. The breakdown of the program is in C++ which will not give a user the ability to take advantage of heterogeneous environments. Our system was designed for the purpose of finding the most idle workstations, no matter what operating system or architecture an idle machine may have. Having our system written in Java gives us access to more idle workstations, which will make up for some short comings Java may have with computational intense applications.
Condor takes advantage of idle and lightly used workstations around a network. Like us, each machine in the network could run a task from another user. This task is run in the background with the desire not to disturb the workstation for the user to destroy any remotely executing job. At the beginning of execution the Condor system assigns one machine within Condor to be the master to all other workers. This master will farm out all jobs to other machines. Results are then returned to the master and resolved. The most recent version of Condor [24] has several ways of executing a parallel application, using MPI, PVM, or master-worker. The MPI environment as described before requires the understanding of how to write a MPI program, same applies to PVM programs.
Entropia [7] is an example of a desktop grid environment advertised as an enterprise grid. Its main bonus is the ability to execute programs within a sandbox. A sandbox is a safe execution environment for the application, where the machine the job is executing on is not in danger of malicious code. The machine is safe from the program. Entropia add in the sandbox by re-writing the binary and using a virtual machine to execute the binary. This provides good safety for the user but lacks flexibility to the user and compiler. The parallelization is left up to Entropia to discover. Discovering parallelisms within a nonparallel written program is a difficult task to do.
2.2
Adaptive Scheduling
job at hand. Piranha is designed using a master-worker architecture, where the master is not the client computer unlike in our design. Piranha therefore does not take advantage of the one machine of most importance, which is fully committed and often well configured for intensive jobs.
DAGMan [8] (Directed Acyclic Graph Manager) is a meta-scheduler for Condor. It organizes inherent dependencies within a job. Directed Acyclic Graphs, or DAGs, are used to discover what input a process has, and what output would be generated that are needed by other tasks. This is built into our programming framework by our interface design. There is an order defined within ParaFitter that gives us the sequence of execution, outside of the parallel execution framework.
2.3
Master-Worker Parallel Programming
The master-worker model for parallel programming has been around for many years. This execution paradigm is easily adaptable to grid and cluster computing and is very attractive for distributed task execution.
Several of the systems mentioned earlier, such as Piranha [14], MW [16], and Condor [23], follow the master-worker execution model. The major difference between these systems and ParaFitter lies in the fact that with the traditional design, the master node is typically a separate, dedicated manager node, while with ParaFitter, the client node where scientists submit their jobs act as the master.
a dedicated master node caused by centralized job management.
2.4
Parallel Frameworks and Libraries
One of the more thorough and yet very complex libraries is HPC++ [21]. HPC++ is a C++ standard template library that provides an extension to C++ for parallel program-ming. The system contains an environment that can use remote machines with CORBA [29]. It does implement several interesting features such as global pointers and global references. HPC++ provides numerous classes to support parallelization of applications. One such example featured in the paper is the multidimensional array class. The application does not perform any configuration of the parallel execution and the number of processors it executes on is static. The library itself is not designed for scientists, but for well-trained parallel programmers.
The Linda [5] framework provides an easy to use design that has been implemented in several programming languages, such as C, Fortran, C++, and Scheme. Linda provides an extension to these languages to describe the layout of a program as a tuple space, then Linda will execute the application in parallel over a shared memory and distributed machines. Each machine will then start a process and wait until a message is sent to the machine, then it will respond. Linda is written as a message passing system. This can provide interprocess communication, but the implementation is more complex then ParaFitter. Linda, along with others, does not take advantage of any local resources and is submitted in a batch manner.
Chapter 3
ParaFitter Design
3.1
ParaFitter Overview
We target the scenario where an application user has colleagues who are willing to give up some of the idle CPU cycles of their desktop computer. Some colleagues may also allow jobs to be run while they are using their machines, as long as the job does not interfere with their work. This way resources can be shared among groups of users without the cost of purchasing or managing a cluster of computers. Also, application user will not need to be concerned with waiting on a job queue.
Long running scientific applications typically contain loops executing over a large array of data, where the iterations are mutually independent. This would give a program the ability to run multiple iterations at one time. There can be multiple “parallel regions” within a program. The program may branch out into multiple parallel processes and reduce their results, then take the output of the reduction as the input to another group of parallel processes.
To maintain the interactiveness of parallel job execution, we must have a flexible scheme that is able to execute a parallel application using any amount of given resources, instead of waiting for a certain amount of resources (e.g. 16 processors) to become available. Therefore, we designed ParaFitter to work in a client-server model where the client can perform the computation task if necessary. Similarly, the parallel execution using external nodes (donated resources) is done using a master-worker model, which may easily scale up to any given number of workers.
Fig-ure 3.1, once the application has been started, the first level of parallelism occurs by allowing server and client tasks to be executed concurrently. The server thread controller and client thread controller are handling the execution of a data collection in different ways. The server thread controller needs to send input data to, execute tasks at, and receive output data from a remote server. The client thread controller initiates task execution without communication since all of the data are already stored locally.
Start job Job initializer
Collection of data and code Client thread controller Server thread controller Sending data Returning result Executing code Executing code
using local threads
Client/Master Node
Workstations
Idle Machine Idle Machine
Figure 3.1: An overview of the ParaFitter environment and architecture.
The client node runs the master module of ParaFitter, where much of the complexity of ParaFitter resides, as shown in Figure 3.1.
The master module takes an application that is written or adapted to use ParaFit-ter inParaFit-terfaces to indicate parallel parts, and the data associated with each piece of compu-tation work. It breaks the application execution down to parallel tasks, and handles the task/data distribution as well as result collection/merging process.
Each server node runs the server module of ParaFitter, which is relatively sim-ple. The server module runs constantly when a server machine is volunteered to provide computation service through ParaFitter. An idle server waits for requests from any client machine. Upon receiving one, the server runs the task and returns the results back to that client.
To ensure ParaFitter works correctly and reliably in a distributed environment, especially with resources that come and go dynamically and multiple user jobs executing simultaneously, the server and client modules must be equipped with additional function-alities. It needs to carry out distributed resource and job management and information service, ensure fault tolerance with job status monitoring and task resubmission, and coor-dinate the resource partitioning among multiple concurrent jobs. For each problem, there exist mature techniques that we can leverage in ParaFitter. These topics, however, are considered beyond the scope of this thesis.
3.3
Automatic Adaptation of Parallel Execution
par-allel execution. Second, this model allows the execution to go on even when no external resources are available. ParaFitter determines the maximum number of parallel tasks that should be created on the local machine and the servers.
We start the client off with two thread controller threads, one managing the client workload and the other handling the server workload. Each of these two controller threads use the same thread control algorithm. It is important to note that once a thread is created, it does not terminate until all the work has been completed. Created threads will be reused to run multiple tasks. Comparing with creating one new thread for each individual task, this approach reduces the thread creation/cleanup overhead.
The thread controlling is performed dynamically, according to runtime perfor-mance observations, to adapt to individual workloads and underlying systems. The intu-ition here is to increase the level of concurrency until we observe no further speedup. The thread controlling program is presented in the next section.
3.4
Thread Control Algorithm
Each ParaFitter thread controller experiments with gradually increasing the level concurrency, simultaneously at the client and the server sides.
A thread controller starts with one thread and assign one work unit (task) to this thread. Once this thread has finished its task, it signals the thread controller about the completion. The thread controller records the system throughput during this interval where only one thread is used, in terms of the number of tasks completed within the unit time. The thread controller then increases the number of concurrent threads to two, and repeat the same throughput calculation when both threads finish their work, for the second time interval. The thread controller continues to increase the number of threads until no more improvement in throughput is observed.
More formally, we define the throughput for the time interval i as Ti = timetaskii,
where taski is the number of tasks during time interval i (which should be equal to the
number of threads during this interval), andtimei is the length of interval i.
Chapter 4
Implementation
4.1
Framework
ParaFitter composes of three modules:
• Shared, which contains the shared objects between the client host machine and the server machines. These classes are also the ones that a programmer uses to parallelize a program. Figure A.1 shows the UML Class Diagram of the shared objects.
• Server, which handles remote task execution on server machines. Figure A.2 shows the UML Class Diagram of the server objects.
• Client, which handles the thread scheduling, communication, and task execution of programs. Figure A.3 shows the UML Class Diagram of the client objects.
An easy-to-understand framework is required for scientific application program-mers. ParaFitter assumes the existence of loops in computation-intensive codes. A user first needs to break loop bodies away down toIWorkobjects, whereIWorkis the basic unit of execution and scheduling. This work object has only one methodexecute(IData data,
IShareData shareData). This method is passed two arguments. One is the data that it will process, and the second is any static data theIWorktask will need. AnIDatais a very simple interface that allow a programmer to encapsulate any object with any data theIWork
of working with the framework. An IDataCollection has work associated with it, so there is a method called IWork getWork(). If there is any data that is the same for all iteration of the task, then it will be associated with the ISharedData getSharedData(). Having this method will reduce the amount of data sent to any remote machine. Instead of storing multiple copies of the same data in an IData reference, it should be stored in the
ISharedData getSharedData().
The IDataCollectionhas two methods, calledstartOfExecution()and
endOfExecution(), which will be executed before and after the parallelized collection has begun or completed, respectively. The methodstartOfExecution()gives the programmer the means to setup the loops, by reading from a file or calculating something from a pre-viousIDataCollection. Respectively, endOfExecution()would give the programmer the opportunity to write to a file or prepare the result of this IDataCollection for the next iteration.
The core methods of IDataCollection, getParallelType(), reduce(IData),
IData getDataIndex(int i), andsize(), are used in the paralleled execution of the col-lection. ThegetParallelType()returns one of three possible values,LocalOnly,ServerOnly, or LocalAndServer. These values indicate what type of parallel execution should be exe-cuted. LocalOnly is for parallelization only on the client machine. ServerOnly describes execution on only remote machines. None will be executed on the client.
LocalAndServer will utilize both the local and remote machines simultaneously, probably the most common attribute.
Thesize()method is a simple method that returns the number of IDataelements there are to execute. IData getDataIndex(int i) will return one IData object to be executed,iwill be greater than or equal to zero and less thansize(). Thereduce(IData)
method is called once the data has completed executing, for the data collection to consolidate the result from anIData object.
of the program. For example, if all of the IData objects were allocated at the start of the execution of an IDataCollectionperhaps usingstartOfExecution(), then there will be less memory for the execution itself. These methods give ParaFitter the ability to only request the objects on demand when the data is actually going to be executed. Once a result has been returned then its answer can be reduced and memory returned, instead of holding onto the memory until the entire collection of data is finished, and callingendOfExecution
to perform the reduction.
The scientist will need to write a very simple class to get the application started initially. First, the scientist will implement an IClient interface on a class. This class will need three methods, Prepare(), Finish(), and getParallelizeData(). Prepare()
will be called at the very beginning of the application for any initial file reading, pars-ing of command line options, or any preparation that can not be done in parallel, but must be done at the beginning of the application. The final step is the Finish() method that combines any remaining results or writing to an output file. The last method is
getParallelizedData(). This method will return an array of IDataCollection objects that need to have a barrier in between each element. Remember, as discussed in the above section, that anIDataCollectionhas oneIWorkassociation which executes over an array of IData references. In most cases each IDataCollection will have different IWork ob-jects (but not in all cases). The reason for this flexibility would be for when a scientific application has several parallel tasks, but with different loops bodies and different data.
4.1.1 Example program breakdown
To better illustrate this process, Figure 4.1, shows an example of a sequential program and how it would be broken apart into the described framework. The goal of this code is to add all elements in a two-dimensional array.
Figure 4.1: Serial program breakdown
4.2
Sample Application Transformation
This application was written in C. With it we demonstrate that an existing non-Java program can fit into non-Java-based framework. As shown in Figure 4.1, if we modify the program to only have the data and work portions of the figure, we can plug in a standalone C program, which is to be executed list[i].lengthtimes.
This scheme allows existing C programs to use ParaFitter by only making minor changes. This is particularly desirable to scientific applications, since C has better perfor-mance over Java in scientific computing. The only Java code needed to be written to use ParaFitter would be for theIWork interface, which simply, executes the C program with a command line argument. An example of the Java code is shown in Figure 4.3. The program
writenormis a C program that writes one normalization image. It takes its input from the command line parameter, stored in the variable setnum. A portion of the original C code that constructed phase one is shown in Figure 4.2.
19
/* For each subset */
for (setnum = 0; setnum < numsets; setnum++) { /* For each view in the current subset */
for (numinset = 0; numinset < views_per_set; numinset++) { /* For each camera */
for (camera_num = 0; camera_num < num_cameras; camera_num++) { /* Backproject the set of ones */
backproject_one_view_atten(
camera[camera_num]->projections + viewnum * projydim * projzdim, image_error, angle, viewnum, camera_view);
} }
/* Write this subset to disk */
write_image(image_error, xdim, ydim, zdim, "norm_image." + setnum); }
int setnum = atoi(argv[1]);
/* For each view in the current subset */
for (numinset = 0; numinset < views_per_set; numinset++) { /* For each camera */
for (camera_num = 0; camera_num < num_cameras; camera_num++) { /* Backproject the set of ones */
backproject_one_view_atten(camera[camera_num]->projections + viewnum * projydim * projzdim, image_error, angle, viewnum, camera_view);
} }
/* Write this subset to disk */
write_image(image_error, xdim, ydim, zdim, "norm_image." + setnum);
public class WriteNormilazationFiles implements IWork { public void execute(IData id, ISharedData sharedData) {
WriteNormilazationFilesData d = (WriteNormilazationFilesData)id;
Process p = Runtime.getRuntime().exec(new String[] {"writenorm", Integer.toString(d.setnum)}); p.waitFor();
} }
Figure 4.2: C code of the original application for (numinset = 0; numinset < views_per_set; numinset++) {
/* For each camera */
for (camera_num = 0; camera_num < num_cameras; camera_num++) { /* Backproject the set of ones */
backproject_one_view_atten(
camera[camera_num]->projections + viewnum * projydim * projzdim, image_error, angle, viewnum, camera_view);
} }
/* Write this subset to disk */
write_image(image_error, xdim, ydim, zdim, "norm_image." + setnum); }
int setnum = atoi(argv[1]);
/* For each view in the current subset */
for (numinset = 0; numinset < views_per_set; numinset++) { /* For each camera */
for (camera_num = 0; camera_num < num_cameras; camera_num++) { /* Backproject the set of ones */
backproject_one_view_atten(camera[camera_num]->projections + viewnum * projydim * projzdim, image_error, angle, viewnum, camera_view);
} }
/* Write this subset to disk */
write_image(image_error, xdim, ydim, zdim, "norm_image." + setnum);
public class WriteNormilazationFiles implements IWork { public void execute(IData id, ISharedData sharedData) {
WriteNormilazationFilesData d = (WriteNormilazationFilesData)id;
Process p = Runtime.getRuntime().exec(new String[] {"writenorm", Integer.toString(d.setnum)}); p.waitFor();
} }
Figure 4.3: Java code of what phase one would be in ParaFitter
/* For each subset */
for (setnum = 0; setnum < numsets; setnum++) { /* For each view in the current subset */
for (numinset = 0; numinset < views_per_set; numinset++) { /* For each camera */
for (camera_num = 0; camera_num < num_cameras; camera_num++) { /* Backproject the set of ones */
backproject_one_view_atten(
camera[camera_num]->projections + viewnum * projydim * projzdim, image_error, angle, viewnum, camera_view);
} }
/* Write this subset to disk */
write_image(image_error, xdim, ydim, zdim, "norm_image." + setnum); }
int setnum = atoi(argv[1]);
/* For each view in the current subset */
for (numinset = 0; numinset < views_per_set; numinset++) { /* For each camera */
for (camera_num = 0; camera_num < num_cameras; camera_num++) { /* Backproject the set of ones */
backproject_one_view_atten(camera[camera_num]->projections + viewnum * projydim * projzdim, image_error, angle, viewnum, camera_view);
} }
/* Write this subset to disk */
write_image(image_error, xdim, ydim, zdim, "norm_image." + setnum);
public class WriteNormilazationFiles implements IWork { public void execute(IData id, ISharedData sharedData) {
WriteNormilazationFilesData d = (WriteNormilazationFilesData)id;
Process p = Runtime.getRuntime().exec(new String[] {"writenorm", Integer.toString(d.setnum)}); p.waitFor();
} }
Chapter 5
Performance Results
5.1
Testbed Configuration
Although our ultimate goal is to address the parallel computing problem in an interactive and heterogeneous desktop environment, as the first step of the project we utilized traditional clusters in prototype development and evaluation. This is necessary before we have a robust, deployable version of the framework and parallelized applications.
The cluster has 16 nodes, each with the following configuration:
• CPU: Two dual-core AMD Opteron 265s
• Motherboard: Tyan B2882-D
• RAM: 2GB PC3200 DDR ECC
• Disk: Seagate ST3808110AS 80GB 7200rpm SATA
• Network: Two onboard gigabit NICs, one onboard 100BT NIC (switch is 100BT)
• Removable media: Slimline IDE CDROM, 1.44MB diskette drive
thread controlling is based on runtime performance measurement and will easily accommo-date the heterogeneity in desktop environments.
In evaluating ParaFitter, we used both synthetic test cases and a real-world ap-plication. ParaFitter was used to dynamically control the level of concurrency for each of the test programs. In the following sections, we report and discuss the performance results for these test programs.
5.2
Distributed Word Count
We used a simple version of a text search problem as an example of embarrassingly parallel data processing applications. We searched the Old and New Testaments of the Bible [19] for a list of the 2000 most commonly used words in the English language [28]. For each of the 2000 words, the output is the total number of times each word appeared in the Bible. The Bible file is sized at 4.2 MB and contains 820,744 words.
This program can be parallelized in different ways. In our experiments, we have each task compare one word against the Bible. In other words, we partition the keywords to be searched and replicated the Bible. The Bible is considered read-only since it never changes, and therefore will only need to be sent once to the remote machines.
Each of the 2000 most commonly used English words are represented in an IData
object along with a local count. The IWork contains a small loop comparing each word in the Bible to the current search keyword of the IData, and updates the local count. As the results are returned, theIDataCollection.reduce()method is called, at the master node. This method adds the local counts to get the global result.
To test the capability of ParaFitter to automatically identify an application’s ideal level of concurrency, we performed an exhaustive benchmarking experiment. We did this by measuring the performance of the application with all the static combinations of client thread numbers and server thread numbers. With our testbed of 16 compute nodes, we set the range of the number of server threads from 1 to 16 and used the same range for the number of client threads. Table 5.2 gives a detailed breakdown of the benchmarking results. To save space, we only show a small number of sample rows.
a configuration of one client thread and zero server thread, gives the sequential execution time of the application.
Table 5.2 shows results from forty executions of ParaFitter on the distributed search. The same information is in this table as in Table 5.2, but you will notice a new variable, “Frequency” Because ParaFitter performs runtime adaptation, its decision making depends on the dynamic behavior of the application program, which may leads it to take different actions during multiple runs of the same program even with exactly the same input. Its different actions, in turn, will generate different impact on the runtime system and cause further deviation in program behavior. Therefore it is not surprising to see ParaFitter reaches different final configuration for the same program/input combination. The “Frequency” field for each configuration records the number of times ParaFitter selects this specific configuration as the optimal one.
Table 5.2 shows that with the highest frequency, ParaFitter picks the configuration using 11 client threads and 16 server threads as the optimal setting. This yields an average total execution time of 33 seconds, which includes the training cost, where ParaFitter gradually increases the number threads. Although this result does not match the best benchmarking results (which is about 21 seconds), overall ParaFitter is able to achieve a good speedup and deploy all 16 servers we have in our testbed automatically
Table 5.1: Distributed search benchmark results
Number Number Average Average Average Average Average of of Client Client Server Server Total Client Server Executed Execution Executed Execution Execution Rank Threads Threads Tasks Time (s) Tasks Time (s) Time (s)
1 2 16 150.0 19.919 1964.00 19.957 21.075 2 3 16 203.4 20.286 1910.80 20.297 21.389
3 1 16 77.4 20.433 2036.60 20.491 21.649
. . .
91 16 16 485.0 30.867 1629.20 30.637 31.966 . . .
101 6 9 377.6 31.657 1736.40 31.593 32.802 102 15 12 486.6 31.752 1627.40 31.567 32.858 103 13 13 501.2 31.970 1613.00 31.871 33.017 104 16 13 500.8 32.282 1613.60 32.054 33.352 105 11 10 492.6 32.299 1621.40 32.132 33.378
. . .
268 7 0 2114.0 182.043 0.00 0.000 183.151
269 3 0 2114.0 183.288 0.00 0.000 184.484
270 2 0 2114.0 238.893 0.00 0.000 240.050
271 1 1 1043.8 247.108 1070.20 247.068 248.389
1 8 16 1 604.00 25.5654 1510.00 25.6706 26.4576 2 25 11 1 658.00 26.3690 1456.00 26.3388 27.2068 3 9 16 6 777.17 27.0480 1337.00 27.0673 27.9919 4 19 10 2 656.50 27.6457 1457.50 27.7412 28.5694 5 10 16 6 656.83 27.9229 1457.50 27.9100 28.7601 6 21 15 1 779.00 28.3017 1335.00 28.1923 29.1134 7 13 16 2 782.50 29.8517 1331.50 29.7925 30.7168 8 25 13 1 691.00 30.0868 1423.00 30.0175 30.9518 9 20 12 1 809.00 30.0464 1306.00 30.1149 30.9654 10 25 12 1 738.00 29.9971 1376.00 30.0595 31.0047
11 11 16 8 826.38 32.2546 1287.88 32.2307 33.0833
12 9 12 1 805.00 32.5333 1309.00 32.5920 33.5326 13 14 16 3 983.00 32.8808 1131.00 32.8319 33.7135 14 21 13 1 785.00 32.9051 1329.00 32.9290 33.7141 15 12 16 7 830.86 33.1603 1283.14 33.1295 34.0116 16 25 14 1 796.00 33.4471 1318.00 33.4714 34.2921 17 16 16 3 863.67 34.2970 1250.33 34.3470 35.1735 18 23 16 1 860.00 34.6278 1254.00 34.6956 35.5550 19 23 15 1 948.00 37.9829 1166.00 37.9339 38.7886 20 21 16 3 991.00 38.1473 1123.00 38.0909 39.0001 21 17 16 1 931.00 38.4961 1183.00 38.3974 39.3110
. . .
5.3
Sorting
Figure 5.1: Surface plot of distributed search benchmark results
In our experiment, we sorted ten million records, each record 10 bytes long. We used two bytes for bucketing, splitting the character range of the first byte of the key into 95 buckets and then that of the second byte into 2 buckets, giving us a total of 190 buckets (or units of work).
Each task will be given a bucket identifier, used to identify the records belonging to its bucket. Once the task has gathered all the records for its bucket, it will execute Java’s built-in sort algorithm. Once the bucket has been sorted, it is returned to the sorted list. The sorted list is not written back to disk, as we focus on the in-memory sorting process.
applications benefits more from using ParaFitter. Because deploying tasks to server does not help improve the overall performance, we reduce our benchmarking and optimization space by using client threads only. Our results show that ParaFitter still finds a good configuration in this reduced configuration space.
Table 5.3 shows the average of five benchmarking executions of our sorting program using different numbers of client threads. We can see from this table that using 16 local threads gives the best execution time. However, the speedup appears to have leveled off after the number of threads increases above 3. This demonstrates that this implementation does not have good scalability.
Table 5.3 shows ParaFitter results, with highest frequency, that 6 client threads are best. ParaFitter did not find the goal of 16 client threads, but was able to perform the execution in less time then the goal of sixteen client threads.
Looking into this apparent anomaly, we were able to determine that this appli-cation has an interesting characteristic. In the benchmarking tests n threads were created all starting at the same time. Which causes heavy contention for the CPU cache and the memory system. ParaFitter, on the other hand, does not suffer from this problem because of the learning nature of this framework. It will start with only one thread, then increase the number of concurrent threads over time. With a gradual increase of load and concurrency, multiple threads work with an interleaved pattern, resulting in better throughput.
We have experimented with a small modification to the benchmarking code to help prove this hypothesis. Once a thread is started, it will first sleep for a random amount of time, between 0 and 5 seconds. In having this minor staggering of code execution, we observed an increase in throughput, although each thread is actually delayed by the artificial slack we inserted.
11 75.624 15 76.735 9 77.92 6 78.044 12 78.194 7 78.515 10 78.615 8 79.25 5 79.941 4 80.264 14 80.291 3 81.434
13 82.8
2 111.563 1 206.69
Table 5.4: Framework results with the sorting application
Number Average
of Total
Client Execution Rank Threads Frequency Time (s)
1 12 5 59.2829
2 10 11 59.4279
3 6 19 60.2450
4 8 17 60.4316
5 11 1 60.6351
6 7 7 60.6395
5.4
Biomedical Application
Next, we discuss results from the real-world biomedical imaging application de-scribe in Section 1.2.
at the data consumed and generated by the application. The input for this application is complex and difficult to describe without related domain knowledge, it suffices to say that it contains a blank image scan of 40.2 KB, an initial estimate image of 8 MB, a projection data file of 11.3 MB, and a matrix file of 39.4 MB. All of these files are used as inputs to produce one 8.0 MB output file, which contains the final estimate from the bit-cone scanning device. (as pictured in Figure 1.2)
Table 5.5 shows the benchmarking results for phase one. It indicates that using 6 client and 10 server threads forms the best configuration for this part of the application. Table 5.4 gives the ParaFitter results. We can see that it adopts the configuration using 7 client and 2 server threads most frequently. The corresponding static configuration only ranks 121 in the benchmarking tests, and the execution time using ParaFitter falls around ranking 136.
Note that in Table 5.5 and Table 5.6, there are only 144 observations. This is because we only executed with even numbers of servers threads due to the relatively long execution time per experiment (each benchmark configuration was execute three times).
The ParaFitter performance with phase one is not satisfactory. We believe the reason is due to the very limited scalability and performance sensitivity of the I/O system in our testbed. During this phase of the application run, our test program writes 40 inter-mediate files in parallel that are needed for phase two. Our test cluster is equipped with NFS (Network File System), which is notoriously vulnerable performance wise with parallel writes. The performance fluctuation caused by I/O contention severely affected ParaFitter’s decision making and caused the large variance in ParaFitter’s final configurations, as well as the gap between its findings and the true optimal configuration.
of of Client Client Server Server Phase One Client Server Executed Execution Executed Execution Execution Rank Threads Threads Tasks Time (s) Tasks Time (s) Time (s)
1 6 10 12 46.881 28 35.766 81.545
2 13 16 13 50.685 27 25.944 84.988
3 12 16 12 46.638 28 28.473 86.347
. . .
120 4 2 22.3333 128.58 17.6667 109.929 166.12 121 7 2 24.6667 133.724 15.3333 96.744 167.364 122 8 2 25.3333 133.583 14.6667 93.943 168.206
. . .
135 4 0 40 156.291 0 0 186.481
136 10 0 40 156.241 0 0 188.242
137 16 0 40 160.148 0 0 191.012
. . .
142 1 2 16.6667 217.429 23.3333 209.558 262.204
143 2 0 40 266.167 0 0 289.53
Table 5.6: Bitcone benchmark for phase two
Number Number Average Average Average Average Average of of Client Client Server Server Phase Two Client Server Executed Execution Executed Execution Execution Rank Threads Threads Tasks Time (s) Tasks Time (s) Time (s)
1 4 10 12 243 28 170.04 282.56
2 14 16 14 278.4 26 143.65 311.96
3 13 16 13 256.23 27 148.54 317.05
. . .
41 16 16 16 324.89 24 92.63 360.6
42 10 12 10.6667 342.8 29.3333 216.84 375.83
43 13 14 13 320.05 27 152.7 376.11
44 7 12 10.6667 342.32 29.3333 216.34 380.18 . . .
142 1 2 16.6667 1318.88 23.3333 1250.72 1397.42
143 2 0 40 1577.32 0 0 1597.22
144 1 0 40 3137.87 0 0 3163.39
Table 5.7: Bitcone framework for phase one
Number Number Average Average Average Average Average of of Client Client Server Server Phase One Client Server Executed Execution Executed Execution Execution Rank Threads Threads Freq. Tasks Time (s) Tasks Time (s) Time (s)
1 5 10 4 17 98.224 23 98.198 131.198
2 5 8 1 17 94.046 23 99.109 132.819
3 7 8 2 23 132.891 17 133.153 164.243
4 6 2 1 29 148.100 11 144.144 175.840
5 7 2 9 28.6667 157.457 11.3333 149.965 187.777
6 2 7 1 29 158.801 11 147.502 189.852
Table 5.8: Bitcone framework for phase two
Number Number Average Average Average Average Average of of Client Client Server Server Phase Two Client Server Executed Execution Executed Execution Execution Rank Threads Threads Freq. Tasks Time (s) Tasks Time (s) Time (s)
1 7 12 2 9.5 319.522 30.5 323.2 364.897
2 9 12 3 12.3333 334.315 27.6667 319.041 366.079
3 10 12 8 12 344.057 28 319.937 376.127
4 11 12 1 31 792.659 9 697.956 824.056
5 13 11 1 31 800.284 9 708.038 827.357
6 10 11 2 30.625 811.065 9.375 732.884 842.461
7 10 8 1 30 865.323 10 777.208 891.666
8 11 7 1 30 868.185 10 781.591 896.401
Chapter 6
Conclusion
In this thesis we described ParaFitter, an autonomous framework that dynamically adapts the number of concurrent tasks to the level of parallelism observed from runtime performance monitoring. This technique allows transparent optimization of applications’ parallel execution considering both performance and resource utilization. By performing case-by-case dynamic thread control, our approach can easily adapts to different applications and resource configurations.
With our experiments, we have demonstrated that our ParaFitter has the poten-tial in capturing the parallelism available from the interplay of an application’s inherent parallelism and the configuration of distributed computing environment. This greatly re-duces the end user’s load in performance tuning, which is often application-dependent and platform-dependent.
Bibliography
[1] Jacques M. Bahi, St´ephane Domas, and Kamel Mazouzi. Jace: A java environment for distributed asynchronous iterative computations. InPDP, pages 350–357, 2004.
[2] OpenMP Architecture Review Board. Openmp — openmp: Simple, portable, scalable smp programming, 1997-2006. Available from URLhttp://www.openmp.org.
[3] Brad Calder, Andrew A. Chien, Ju Wang, and Don Yang. The entropia virtual machine for desktop grids. In VEE ’05: Proceedings of the 1st ACM/USENIX international conference on Virtual execution environments, pages 186–196, New York, NY, USA, 2005. ACM Press.
[4] Nicholas Carriero, Eric Freeman, David Gelernter, and David Kaminsky. Adaptive parallelism and piranha. IEEE Computer, 28(1):40–49, 1995.
[5] Nicholas Carriero and David Gelernter. Linda in context. Commun. ACM, 32(4):444– 458, 1989.
[6] Lin Chao. Preface: Meet the Intel Core Duo processor. Intel Technology Journal, 10(2):iii–iv, May 2006.
[7] Andrew A. Chien, Brad Calder, Stephen Elbert, and Karan Bhatia. Entropia: archi-tecture and performance of an enterprise desktop grid system. Journal of Parallel and Distributed Computing, 63(5):597–610, 2003.
[8] DAGMan. Directed acyclic graph manager. Available from URL
http://www.cs.wisc.edu/condor/dagman/.
shared-memory programming. IEEE Computational Science and Engineering, 05(1):46–55, 1998.
[10] Jeffrey Dean and Sanjay Ghemawat. Mapreduce: Simplified data processing on large clusters. pages 137–150, 2004.
[11] Jeffrey Dean and Sanjay Ghemawat. Mapreduce: Simplified data processing on large clusters. InOSDI, pages 137–150, 2004.
[12] Message Passing Interface Forum. Message passing interface forum. Available from URLhttp://www.mpi-forum.org.
[13] I. Foster. Grid computing. AIP Conference Proceedings, 583:51 – 56, 2001.
[14] David Gelernter and David Kaminsky. Supercomputing out of recycled garbage: Pre-liminary experience with piranha. In6th ACM International Conference on Supercom-puting, pages 417–427, Washington, D.C., 1992.
[15] C.A. Goble, D. De Roure, N.R. Shadbolt, and A.A.A. Fernandes. Enhancing services and applications with knowledge and semantics. In I. Foster and C Kesselman, editors,
The Grid 2: Blueprint for a New Computing Infrastructure, pages 431–458. Morgan-Kaufmann, 2004.
[16] Jean-Pierre Goux, Sanjeev Kulkarni, Jeff Linderoth, and Michael Yoder. An enabling framework for master-worker applications on the computational grid. InHPDC, pages 43–50, 2000.
[17] Jim Gray. Sort benchmark home page. Available from URL
http://research.microsoft.com/barc/SortBenchmark/.
[18] Jim Gray, David T. Liu, Maria Nieto-Santisteban, Alexander S. Szalay, David DeWitt, and Gerd Heber. Scientific data management in the coming decade, 2005.
[19] Project Gutenberg. The bible, old and new testaments, king james version. Available from URLhttp://www.gutenberg.org/etext/10.
[20] Imaginis. Computed tomography imaging (ct scan, cat scan). Available from URL
iterative algorithm. Review of Scientific Instruments, 47(3):1123–1129, 2000.
[23] M. Litzkow, M. Livny, and M. Mutka. Condor: A hunter of idle workstations. In
Proceedings of 8th Internetaional Conference of Distributed Computing Systems, pages 104–111, 1988.
[24] Miron Livny and Marvin Solomon. Condor project homepage. Available from URL
http://www.cs.wisc.edu/condor/.
[25] Deborah T. Marr, Frank Binns, David L. Hill, Glenn Hinton, David A. Koufaty, Alan J. Miller, and Michael Upton. Hyper-threading technology architecture and microarchi-tecture. Intel Technology Journal, 6(1):4–15, February 2002.
[26] Hans Meuer, Erich Strohmaier, Jack Dongarra, and Horst Simon. Top500 supercom-puter sites. Available from URLhttp://www.top500.org.
[27] DOE Office of Science. The office of science data management challenge report, 2004.
[28] Charles K. Ogden. 2000 basic english words. Available from URL
http://ogden.basic-english.org/word2000.html.
[29] OMG. Object management group, 1997-2006. Available from URLwww.omg.org.
[30] Jonathan W. Strickland, Vincent W. Freeh, Xiaosong Ma, and Sudharshan S. Vazhku-dai. Governor: Autonomic throttling for aggressive idle resource scavenging. InICAC ’05: Proceedings of the Second International Conference on Automatic Computing, pages 64–75, Washington, DC, USA, 2005. IEEE Computer Society.
[31] V. S. Sunderam. PVM: a framework for parallel distributed computing. Concurrency, Practice and Experience, 2(4):315–340, 1990.
[32] J. Zhang, Y. Cheng, Y. Z. Lee, B. Gao, Q. Qiu, W. L. Lin, D. Lalush, J. P. Lu, and O. Zhou. A nanotube-based field emission x-ray source for microcomputed tomography.
DataAdapter DataCollectionAdapter <<interface>>
IData
setData(o : Object) : void setExecutionTime(t : long) : void getExecutionTime() : long
setSendingWorkTime(t : long) : void getSendingWorkTime() : long setSendingDataTime(t : long) : void getSendingDataTime() : long
setReceivingDataTime(t : long) : void getReceivingDataTime() : long
<<interface>> IDataCollection
startOfExecution() : void endOfExecution() : void getWork() : IWork
getSharedData() : ISharedData getParallelType() : int
getDataIndex(i : int) : IData reduce(data : IData) : void size() : int
ISharedData IWork
execute(id : IData,sharedData : ISharedData) : void
A.2 Server API
<<process>> ExecutionServer
<<thread>> ServerThread
ExecutionServerImplementation
<<create>> ExecutionServerImplementation() setWork(work : IWork) : void
setSharedData(shared : ISharedData) : void getData() : IData
setData(data : IData) : void stillWorking() : boolean run() : void
heartBeat() : boolean
<<interface>> IExecutionServer
<<interface>> TaskCompletedListener
taskCompleted(thread : ExecutionThread) : void
<<thread>> ClientLoadBalancerThread
<<thread>> ServerLoadBalancerThread
<<implementation>> ExecutionThread
getRunTimeAndReset() : long
<<thread>> ClientExecutionThread
<<thread>> ServerExecutionThread Finish() : void
getParallelizeData() : IDataCollection[]