5.3 Resilient Application Frameworks
5.3.2 Resilient Iterative Framework
The resilient iterative framework can be used to implement fault-tolerant iterative algorithms based on coordinated in-memory checkpointing. It enables the developer to focus on the algorithmic aspects of the application and rely on the framework to orchestrate fault tolerance. At the same time, it lets the application optimize performance by controlling the checkpointing scope.
We applied multi-resolution resilience by building the iterative framework based on the PlaceLocalStore (Section 5.3.1.1). We chose this particular store type because atomic multi-place updates are not needed and because global recovery is well aligned with coordinated checkpointing. Table 5.5 lists the APIs of the iterative framework.
Table 5.5: The resilient iterative application framework APIs.
Class                     Function                                  Returns
IterativeApp              isFinished()                              Boolean
                          step()                                    void
                          checkpoint()                              HashMap[K,V]
                          restore(ckptState:HashMap[K,V])           void
                          remake(changes:ChangeDescription)         Any
GlobalIterativeExecutor   make(ckptInterval:Long, spareCount:Long)  IterativeExecutor
                          activePlaces()                            PlaceGroup
                          execute(app:IterativeApp)                 void
SPMDIterativeExecutor     make(ckptInterval:Long, spareCount:Long)  IterativeExecutor
                          activePlaces()                            PlaceGroup
                          execute(app:IterativeApp)                 void
The programmer implements an iterative algorithm according to the specification of the IterativeApp interface:
• isFinished(): defines the application’s convergence condition.
• step(): defines the logic of a single iteration.
• checkpoint(): inserts critical application data in a key-value map for checkpointing.
• restore(ckptState): updates the program state given the last checkpointed state.
• remake(changes): provides the program with information about changes impacting the places, to rebuild its distributed data structures.
The programmer implements the above methods without attention to failures. The step and isFinished functions are the same in non-resilient and resilient modes. X10 applications use global data structures mapped to a group of places. The remake method enables the program to reconfigure its global data structures to adapt to the new place organization after a failure.
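To make the division of responsibilities concrete, the following is a minimal sketch of the five methods, written as a single-process Python stand-in rather than X10. The class HalvingApp, its fields, and the dictionary passed to remake (standing in for ChangeDescription) are hypothetical names chosen for illustration and are not part of the framework.

```python
from abc import ABC, abstractmethod


class IterativeApp(ABC):
    """Python stand-in for the IterativeApp interface of Table 5.5."""

    @abstractmethod
    def is_finished(self) -> bool: ...

    @abstractmethod
    def step(self) -> None: ...

    @abstractmethod
    def checkpoint(self) -> dict: ...

    @abstractmethod
    def restore(self, ckpt_state: dict) -> None: ...

    @abstractmethod
    def remake(self, changes: dict) -> None: ...


class HalvingApp(IterativeApp):
    """Hypothetical toy app: halve a residual until it is below a tolerance."""

    def __init__(self, residual: float, tol: float, places: list):
        self.residual = residual
        self.tol = tol
        self.places = list(places)
        self.iteration = 0

    def is_finished(self) -> bool:
        # Convergence condition only; no failure handling here.
        return self.residual < self.tol

    def step(self) -> None:
        # Logic of a single iteration.
        self.residual /= 2.0
        self.iteration += 1

    def checkpoint(self) -> dict:
        # Only the state needed to resume goes into the key-value map.
        return {"residual": self.residual, "iteration": self.iteration}

    def restore(self, ckpt_state: dict) -> None:
        self.residual = ckpt_state["residual"]
        self.iteration = ckpt_state["iteration"]

    def remake(self, changes: dict) -> None:
        # 'changes' is an assumed stand-in for ChangeDescription.
        self.places = list(changes["new_active_places"])


app = HalvingApp(residual=1.0, tol=0.1, places=[0, 1, 2])
saved = app.checkpoint()
while not app.is_finished():
    app.step()
```

Note that step and is_finished contain no resilience logic; only checkpoint, restore, and remake are specific to the resilient mode.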
The framework provides two classes that execute an IterativeApp resiliently: GlobalIterativeExecutor and SPMDIterativeExecutor. Both are initialized with a user-defined checkpointing interval and a number of spare places. The framework manages the group of active places, which the program can obtain by calling the function activePlaces().
5.3.2.1 The Global Iterative Executor
The GlobalIterativeExecutor can be used for applications with arbitrary communication patterns. It expects the programmer to define the step function as a global function that internally distributes the step work across the active places in any way. Listing 5.4 outlines the execution procedure followed by the global executor:
Listing 5.4: The global executor.
 1 def execute(app:IterativeApp) {
 2   var i:Long = 1;
 3   var err:Boolean = false;
 4   while (true) {
 5     try {
 6       if (err) {
 7         handlePlaceChanges(app);
 8         globalRestore(app);
 9         i = 1;
10         err = false;
11       }
12       if(app.isFinished()) break;
13       finish app.step(); // global step
14       i++;
15       if(i % ckptInterval == 0) globalCheckpoint(app);
16     } catch (e:Exception) {
17       if(e.isDPE()) err = true; else throw e;
18     }
19   }
20 }
The finish at Line 13 waits for the completion of all child tasks created by the step function and raises any exceptions detected during step execution to the framework. The executor aims to guard the application from failures occurring during step execution, checkpointing, or restoration as long as the failure is not catastrophic. Detected exceptions that are not of type DeadPlaceException are considered catastrophic and cause the executor to stop and raise the error to the program (Listing 5.4, Line 17).
Checkpointing
Using a PlaceManager pm, a PlaceLocalStore rs, and a checkpoint identifier key, checkpointing is performed as follows:
21 def globalCheckpoint(app:IterativeApp) {
22   // alternating key for double-buffering
23   val k = key.equals("red") ? "black" : "red";
24   finish for (p in pm.activePlaces()) at (p) async {
25     rs.set(k, app.checkpoint());
26   }
27   key = k;
28 }
Double-buffering is used to guard against failures that occur during checkpointing. We maintain the last two checkpoints of each place, identified as black and red in Line 23. The color of the last valid checkpoint is stored in key. When a new checkpoint is created, the other color is used for checkpointing the state of all the active places (Lines 24–26). Only after checkpointing completes successfully is the key variable modified to carry the color of the new checkpoint (Line 27). A failure during checkpointing causes the code to skip over Line 27, thereby leaving the previous checkpoint as the valid one for recovery.
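The commit-after-write behaviour described above can be illustrated with a small single-process Python simulation. This is a sketch under stated assumptions, not the framework's code: the class DoubleBufferedCheckpointer, the PlaceFailure exception (standing in for DeadPlaceException), the failing_place parameter used to inject a failure, and the initial value of key are all hypothetical.

```python
class PlaceFailure(Exception):
    """Stand-in for X10's DeadPlaceException (assumption for this sketch)."""


class DoubleBufferedCheckpointer:
    def __init__(self, places):
        self.places = list(places)
        # One small key-value store per place, playing the role of rs.
        self.store = {p: {} for p in self.places}
        # Colour of the last valid checkpoint; the initial value is arbitrary.
        self.key = "black"

    def checkpoint(self, state_of, failing_place=None):
        # Write into the colour that is NOT the last valid checkpoint.
        k = "black" if self.key == "red" else "red"
        for p in self.places:
            if p == failing_place:
                # Injected failure: the commit below is skipped.
                raise PlaceFailure(p)
            self.store[p][k] = state_of(p)
        # Commit only after every place has written successfully.
        self.key = k

    def restore(self, p):
        return self.store[p][self.key]


ckpt = DoubleBufferedCheckpointer(places=[0, 1, 2])
ckpt.checkpoint(lambda p: {"iter": 10, "place": p})  # first checkpoint, "red"

try:
    # Second checkpoint targets "black" but place 1 fails midway.
    ckpt.checkpoint(lambda p: {"iter": 20, "place": p}, failing_place=1)
except PlaceFailure:
    pass

recovered = ckpt.restore(0)
```

After the failed second checkpoint, place 0 holds a partially written black buffer, yet key still names red, so recovery reads the complete earlier checkpoint.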
Recovery
When a recoverable failure is detected in Line 17, the err variable is set to true to direct the execution towards the recovery path. The executor recovers the application by first calling the handlePlaceChanges function in Line 7. As shown in the code below, this function rebuilds the group of active places (Line 30), recovers the resilient store (Line 31; a catastrophic error will be raised if both replicas of a place are lost), and calls the remake function to allow the application to reconfigure its global data structures (Line 32). However, it does not handle rolling back the application state. To recover the application state, the executor calls the globalRestore function outlined below. It creates a task at each place that retrieves the last checkpoint from the resilient store and feeds it to the program by calling the restore function (Line 36). As both the structure of the application and its data have now been recovered, execution can resume normally.
29 def handlePlaceChanges(app:IterativeApp) {
30   val changes = pm.rebuildActivePlaces();
31   rs.recover(changes);
32   app.remake(changes);
33 }
34 def globalRestore(app:IterativeApp) {
35   finish for(p in pm.activePlaces()) at(p) async {
36     app.restore(rs.get(key));
37   }
38 }
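The interplay between the main loop of Listing 5.4 and the recovery path can be traced with a single-process Python simulation. The sketch below mirrors the loop structure only; CounterApp, DeadPlace, and the fail_at_steps injection mechanism are hypothetical, and taking an initial checkpoint before the loop is an assumption made here so that a restore is always possible.

```python
class DeadPlace(Exception):
    """Stand-in for DeadPlaceException (assumption for this sketch)."""


class CounterApp:
    """Hypothetical app: count up to a target, failing on chosen step calls."""

    def __init__(self, target, fail_at_steps):
        self.count = 0
        self.target = target
        self.fail_at = set(fail_at_steps)
        self.steps_executed = 0  # total step() calls, including failed ones

    def is_finished(self):
        return self.count >= self.target

    def step(self):
        self.steps_executed += 1
        if self.steps_executed in self.fail_at:
            raise DeadPlace()
        self.count += 1

    def checkpoint(self):
        return {"count": self.count}

    def restore(self, state):
        self.count = state["count"]

    def remake(self, changes):
        pass  # nothing to rebuild in a single-process simulation


def execute(app, ckpt_interval):
    saved = app.checkpoint()  # assumed initial checkpoint
    i = 1
    err = False
    restores = 0
    while True:
        try:
            if err:
                app.remake(None)    # role of handlePlaceChanges
                app.restore(saved)  # role of globalRestore
                restores += 1
                i = 1
                err = False
            if app.is_finished():
                break
            app.step()
            i += 1
            if i % ckpt_interval == 0:
                saved = app.checkpoint()
        except DeadPlace:
            err = True  # recoverable: take the recovery path next round
    return restores


app = CounterApp(target=10, fail_at_steps=[7])
restores = execute(app, ckpt_interval=4)
```

In this run the seventh step call fails after a checkpoint was taken at count 3, so the application rolls back to 3 and repeats three iterations before continuing to the target.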
5.3.2.2 The SPMD Iterative Executor
The SPMDIterativeExecutor is optimized for a bulk-synchronous application, which executes as a series of synchronized steps across all the active places. A global step for such an application would create a fan-out finish at all the places to execute only one iteration. The repeated creation of remote micro tasks for each step would unnecessarily harm the performance. Listing 5.5 shows how the SPMD executor avoids this overhead by creating a coarse-grained task at each place that can execute multiple iterations, take periodic checkpoints, and perform part of the recovery work (Lines 14–28). Note that a program that uses the SPMD executor needs to implement the step function as a local function handling the work of a single place.
Listing 5.5: The SPMD executor.
 1 def execute(app:IterativeApp) {
 2   var err:Boolean = false;
 3   while (true) {
 4     try {
 5       val res:Boolean;
 6       if (err) {
 7         handlePlaceChanges(app);
 8         res = true;
 9         err = false;
10       } else {
11         res = false;
12       }
13       val team = new Team(pm.activePlaces());
14       finish for (p in pm.activePlaces()) at(p) async {
15         if (res) {
16           app.restore(rs.get(key));
17         }
18         var i:Long = 1;
19         while (!app.isFinished()) {
20           finish app.step(); // local step
21           i++;
22           if(i % ckptInterval == 0) {
23             val k = key.equals("red") ? "black" : "red";
24             rs.set(k, app.checkpoint());
25             team.agree(1);
26             key = k;
27           }
28         }
29       }
30       break;
31     } catch (e:Exception) {
32       if(e.isDPE()) err = true; else throw e;
33     }
34   }
35 }
In a failure-free execution, only one fan-out finish (Line 14) is used for the entire computation. When a failure occurs, all the tasks terminate, and a new fan-out finish is created after recovering the application’s structure. Recovering the application data is done inside the main fan-out finish (Line 16).
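The difference in task-creation overhead between the two executors can be estimated by replaying the loop counters of Listings 5.4 and 5.5. The following Python sketch is a rough count under explicit assumptions: the run is failure-free, each global step uses exactly one fan-out finish, and each global checkpoint uses one more. The function names are hypothetical.

```python
def global_fanouts(num_steps, ckpt_interval):
    """Fan-out finishes in a failure-free run of the global executor."""
    fanouts = 0
    i = 1
    for _ in range(num_steps):
        fanouts += 1  # one fan-out for the global step (assumption)
        i += 1
        if i % ckpt_interval == 0:
            fanouts += 1  # one fan-out for globalCheckpoint
    return fanouts


def spmd_fanouts(num_steps, ckpt_interval):
    """Fan-out finishes in a failure-free run of the SPMD executor."""
    # A single coarse-grained task per place covers all iterations
    # and all checkpoints.
    return 1


places = 64  # illustrative place count
global_tasks = global_fanouts(100, 10) * places
spmd_tasks = spmd_fanouts(100, 10) * places
```

Under these assumptions, 100 iterations with a checkpoint interval of 10 on 64 places create 7040 remote tasks with the global executor and 64 with the SPMD executor.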
Checkpointing
When a place reaches a checkpointing iteration, it independently saves its state in the resilient store using the opposite color of the last checkpoint (Lines 23–24). Before the places switch to the new checkpoint (by updating the key variable in Line 26), global coordination is needed to ensure the successful creation of the new checkpoint at all the places. That is achieved using the fault-tolerant collective agreement function
provided by MPI-ULFM (see Section 3.4.4.3). This function is the only collective function that provides uniform failure reporting across all places. If one place is dead, the call to team.agree in Line 25 will throw a DeadPlaceException at all the places, and the recovery steps will consequently start.
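The write, agree, then commit sequence can be mimicked in Python with threads standing in for places. This is an analogy only, not MPI-ULFM: the sketch uses threading.Barrier, whose abort() makes every current and future wait() raise BrokenBarrierError, to imitate uniform failure reporting. The function run_checkpoint_round and the dead_place parameter are hypothetical.

```python
import threading


def run_checkpoint_round(num_places, dead_place=None):
    """One checkpoint round: each place writes, agrees, then commits."""
    barrier = threading.Barrier(num_places)  # plays the role of team.agree
    store = [dict() for _ in range(num_places)]
    keys = ["black"] * num_places  # last valid colour at each place
    outcome = [None] * num_places

    def place_task(p):
        if p == dead_place:
            # A dead place never reaches the agreement; breaking the
            # barrier reports the failure to every other participant.
            barrier.abort()
            outcome[p] = "dead"
            return
        k = "black" if keys[p] == "red" else "red"
        store[p][k] = {"place": p}  # write the new checkpoint
        try:
            barrier.wait(timeout=5)
            keys[p] = k  # commit only after agreement succeeds
            outcome[p] = "committed"
        except threading.BrokenBarrierError:
            outcome[p] = "failure reported"

    threads = [threading.Thread(target=place_task, args=(p,))
               for p in range(num_places)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return keys, outcome


ok_keys, ok_outcome = run_checkpoint_round(4)
bad_keys, bad_outcome = run_checkpoint_round(4, dead_place=2)
```

Because the barrier can only release when all parties arrive, no surviving place commits the new colour in the failing round, so every place still agrees on the previous checkpoint.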
Recovery
When a global failure is reported by team.agree, the fan-out finish at Line 14 terminates, and control returns to the executor’s main thread. Handling a recoverable failure starts by recovering the application’s structure by calling handlePlaceChanges. After that, a new fan-out finish starts, in which the first step to be performed at all places is recovering the application’s state. That is more efficient than calling the global executor’s globalRestore method, which creates a separate fan-out finish for data recovery only.