standard. TheMPI-ULFMteam publishes the latest updates about the project on the fault tolerance research hub (fault-tolerance.org).
A reference implementation of MPI-ULFMis available based on the open source MPI implementation OpenMPI[2018]. The first release, ULFM-1, was based on an old version of OpenMPI (v1.7) compatible with the MPI-2 standard, which does not support non-blocking collectives. ULFM-1 also did not have proper support for thread safety. ULFM-2 was released in April 2017 based on OpenMPI (v3.0), compatible with the MPI-3 standard. It provides non-blocking collectives and lightly tested support for fault tolerance with multiple threads. ULFM-2 is used as a basis for our experimental analysis in Section3.5.
The popularity ofMPI-ULFMhas been increasing, and many researchers started relying on it for building resilient MPI applications. Pauli et al.[2013] used ULFM for resilient Monte Carlo simulations. Ali et al.[2014,2015] implemented exact and approximate recovery methods for 2D and 3D Partial Differential Equation (PDE) solvers over ULFM.Laguna et al.[2014] provided a programmability evaluation for ULFM in the context of a resilient molecular dynamics application.
The development of MPI-ULFM served as a promising opportunity for us to shift RX10 to supercomputer scale. In the following sections, we detail how we integratedMPI-ULFMwith X10 to provide failure notification and optimized fault- tolerant collectives at large scales. To the best of our knowledge, our work is the first to evaluate ULFM in the context of a high-level language.
3.2
X10 over MPI
In this section, we describe how X10 uses MPI with a focus on initializing X10 places, handling active messages, and handling collective operations.
3.2.1 Initialization
X10 programs can choose between sockets, MPI, or PAMI as a compilation target. WhenMPIis chosen, X10 places are launched as normalMPIprocesses. They start by initializing the default communicatorMPI_COMM_WORLDby callingMPI_INIT_THREAD(..) and specifying the desired thread support. MPIprovides four levels of thread sup- port:
• MPI_THREAD_SINGLE: for single-threaded MPI processes.
• MPI_THREAD_FUNNELED: for multi-threaded MPI processes, when MPI calls are done by the main thread only.
• MPI_THREAD_SERIALIZED: for multi-threaded MPI processes, when only one thread at a time can call MPI.
• MPI_THREAD_MULTIPLE: for multi-threaded MPI processes, when multiple threads can call MPI concurrently.
X10 places are multi-threaded, and all the threads can invoke communication actions. Thus only MPI_THREAD_SERIALIZEDandMPI_THREAD_MULTIPLE are feasible for X10. X10RT-MPI uses MPI_THREAD_MULTIPLE by default, unless the MPI imple- mentation does not support it or the user explicitly chooses the serialized mode. Due to lack of stable support forMPI_THREAD_MULTIPLEin currentMPI-ULFMimple- mentations, we changed the default behaviour of X10RT-MPI to use the serialized threading mode for anyMPI-ULFMimplementation. After a successful initialization, the default communicatorMPI_COMM_WORLDis used throughout the execution of the X10 program.
3.2.2 Active Messages
An active message encapsulates a function to be executed at a destination process. Theatconstruct is used to send active messages in X10. For example,at(q)async S; sends an active message to execute statementSat placeqasynchronously. X10 relies onX10RTto perform the required communication for exchanging active messages.
at(dst) asyncS;
x10rt_send_msg ( &msg );
err1 = MPI_Isend( &msg, …, &mpi_req );
err3= MPI_Iprobe (MPI_ANY_SOURCE, … ); err4 = MPI_Irecv ( &rcv_msg, …, &mpi_req ); msg_received( &rcv_msg );
push_to_scheduler( S );
err2= MPI_Test ( &mpi_req, &send_done, … );
err5= MPI_Test ( &mpi_req, &recv_done, … );
msg = serialize( S ); S = deserialize ( &rcv_msg );
X10RT X10RT-MPI Runtime Sender Receiver x10rt_mpi_send_msg ( &msg ); 1 2 3 4 5 10 9 8 7 6 11
Figure 3.2: X10 active messages implementation over MPI. Some of the method names shown in the figure are slightly different from the actual implementation for more clarity. The error variables colored in red, at steps 5, 6, and 8, mark the points at which MPI-ULFM may notify process failures, as will be discussed in Section3.3.2.
Figure 3.2shows the flow of actions by the X10 runtime and its MPI transport layer for sending an active message. On invokingat(q)async S;, the active message Sis serialised into a send buffer namedmsgthat is then passed to the abstract method x10rt_send_msg(&msg)inX10RT. This method invokes the corresponding concrete implementationx10rt_mpi_send_msg(&msg), which uses MPI to send the buffermsg. X10 uses non-blocking two-sided communication operations for transferring data and active messages between places. The sender place invokesMPI_Isend(..) and passes, among other parameters, the send buffermsgand a handle namedmpi_req that can be used for checking the status of this send request (see Figure3.2-step 4). MPI applications can useMPI_Wait(..)to block a process that issued a non-blocking
§3.2 X10 over MPI 45
request until the request completes. However, as mentioned in Section3.1, blocking an X10 place may lead to deadlocks. Therefore, X10 uses the alternative non-blocking methodMPI_Test(..)to check the completion status of its send and receive requests given their handles. Each place maintains a queue of pending requests and peri- odically checks their completion status. In Figure 3.2, the sender repeatedly calls MPI_Test(..)in step 5 to check the completion status of the send request issued in step 4. When the send request completes, X10RT-MPI removes its handle from the pending requests queue and expects the message to eventually reach its destination. Each X10 place actively probes for incoming messages from other places by re- peatedly calling the non-blocking functionMPI_Iprobe(..)and passing the wildcard valueMPI_ANY_SOURCEas the source parameter (step 6). WhenMPI_Iprobedetects the arrival of a message, X10RT-MPI extracts the message’s metadata including its source and number of bytes and then invokes the non-blocking request MPI_Irecv(..) to receive the message (step 7). A handle to this receive request is stored in the pend- ing requests queue and is periodically checked for completion usingMPI_Test(..) (step 8). When the message is received, X10RT-MPIpasses it to the X10 runtime by callingmsg_received(&msg)(step 9). The message is then deserialised to create the activityS(step 10), which is finally pushed to the work-stealing scheduler of X10 to eventually execute it (step 11).
It is worth noting that MPI has its own definition of what makes a request com- plete. A receive request is considered complete after the message is fully received. However, a send request is considered complete when the application can reuse the send buffer. If the send buffer is relatively small, MPI may copy it to an internal buffer and declare the request complete before it starts sending the message. If the send buffer is beyond MPI’s capacity, MPI will not be able to copy it to an internal buffer; it will not declare the request complete until after the message is actually sent to prevent the user from altering the send buffer. In the case of buffering, the source place may fail after declaring a send request complete but before sending the message. This possibility is taken into consideration inRX10. The protocols used for tracking the active messages assume that a message may be lost due to the failure of its source or its destination. These protocols will be described in details in Chapter4.
To summarise, X10 implements active messages using non-blocking MPI functions (MPI_Isend,MPI_Irecv, andMPI_Test). Each place is actively scanning its inbox for incoming messages from other places usingMPI_Iprobe(MPI_ANY_SOURCE,..).
3.2.3 Team Collectives
The APGAS model of X10 is well-suited for bulk-synchronousSPMDprograms that rely on collective operations for synchronization or data sharing. The following text quoted from [Hamouda et al.,2016] briefly describes how the X10 team implemented collective operations in X10. It refers to two types ofMPIcollective functions: blocking and non-blocking. A blocking collective function blocks the calling thread until the collective operation completes. A non-blocking collective function returns after initiating the collective operation without waiting for the operation to complete.
X10 contains a collectiveAPI similar to MPI, located in x10.util.Team, offering collective operations such as barrier, broadcast, all-to-all, etc. Team attempts to make use of any collective capabilities available in the underly- ing transport. For transports that provide native collectives,Teammaps its operations to the transport collective implementations. For transports that do not provide collectives, such as TCP/IP sockets,Teamprovidesemulated collective implementations. An interesting combination arises when the un- derlying transport supports some, but not all, of the functionality needed by X10. The X10 thread model requires non-blocking operations from the network transport, because there may be runnable tasks in the thread’s work queue, and a blocking network call will prevent that runnable work from completing, leading to possible deadlock. MPI-3 offers non-blocking collectives, but other than barrier these are optional, and MPI-2 only sup- ports blocking collectives. For best performance we still want to make use of these, so our implementation calls an emulated barrier immediately before issuing the blocking collective. This allows us to line up all places so that when they reach the blocking operation, they are all in a position to pass through the collective immediately. [Hamouda et al.,2016]
Figure3.3shows an example program usingTeamfor a reduction operation and the three differentTeamimplementations: emulated, native blocking and native non- blocking.
valteam = newTeam(Place.places()); for(p inPlace.places()) at(p) async{
valsrc = newRail[Int]( SIZE, (i:Long)=> i asInt); valdst = newRail[Int]( SIZE );
team.allreduce(src, 0, dst, 0, SIZE, Team.ADD); } 1 2 3 4 5 6 X10RT has native collectives? team.emu_allreduce(..); X10RT has non-blocking collectives? X10RT.allreduce(..); team.emu_barrier(..); X10RT.allreduce(..); No Yes No Yes
§3.2 X10 over MPI 47
3.2.3.1 The Emulated Implementation
The emulatedTeamimplementation organizes the places in a binary tree that facili- tates recursive communication between the nodes in a unidirectional way (e.g. for broadcast and gather) or a bidirectional way (e.g. for barrier and allreduce). The collective starts by blocking the parent places in a busy waiting loop until they re- ceive upward notifications from their children, as shown in Line2of the pseudocode below. When a parent receives notifications from its children, it switches to the child role and passes the notification to its own parent. A child place notifies its parent by creating a finish that governs a remote asynchronous task at the parent place (Line4), which serves two purposes. First, it notifies the parent that the child has entered the collective (Line5), which implies that the places in the child’s sub-tree have also entered the collective. Second, it blocks the child until the upper part of the tree enters the collectives (Line6). The completion of thefinishreturns control back to the child indicating the completion of the upward notification phase. At this point, the child switches to the parent role and releases its own children who are blocked locallyin Line6waiting for the downward notification. If the collective requires data to be transferred, these may be piggybacked on the notification messages.
1 /*as a parent:*/
2 wait_for_upward_notification(); 3 /*as a child:*/
4 finish at (parent) async {
5 deliver_upward_notification(); 6 wait_for_downward_notification(); 7 }
8 /*as a parent:*/
9 deliver_downward_notification();
3.2.3.2 The Native Implementation
WhenTeamis used in native mode, it implements the collective interfaces by delegat- ing to the underlyingMPIimplementation.Teamsurrounds each call to a nativeMPI
collective with a local finish that serves one purpose — ensuring the completion of the native collective that may be non-blocking. It emulates the execution of the
MPI collective as if it is done by an async statement. It does so by first calling a runtime function that increments the number offinishtasks, then calling the X10RT- MPI function that invokes theMPIcollective. X10RT-MPI receives a reference to the surroundingfinishand uses it to notify the termination of the emulatedasyncstate- ment after theMPIcollective completes. A notification of a blocking MPI collective is sent immediately after the blocking call returns. On the other hand, a non-blocking collective is notified only after MPI_Test reports its completion, in the same way active messages are checked for completion (see Section3.2.2). The same mechanism is applied with thePAMItransport.