• No results found

The proposed communication layer provides an easy-to-use high level user inter- face for inter-node communication. The combination of this communication layer and the task engine imposes further challenges which need to be handled.

The task engine focuses on applications using fine-grained tasks. Classical com- munication usually communicates in a bulk synchronous way [113]. After a phase of computation a phase of communication follows. However, synchronizations between larger phases of an application are dissolved by the data-driven approach. This means, these synchronizations cannot be used for bulk communication any- more.

Additionally, fine-grained tasks compute the data in small portions piece by piece. Referring to the 𝛼-𝛽 model [57] the communication time t for a message of

2 // [...]

3 // Adds a new item ready for communication

4 void AddReadyItem(data_type const & item) {

5 std::lock_guard<std::mutex> lock(mutex_);

6 ready_items_.emplace_back(item);

7

8 // If the send buffer is full, start sending

9 if (ready_items_.size() == waiting_buf_size_)

10 send();

11 }

12 // Send the ready_items_ to the recipients_

13 void send() {

14 send_pt2pt_wrapper_.SendToRanks(recipients_, std::move(ready_items_));

15 ready_items_.clear();

16 }

17 // [...]

18 };

length n, with bandwidth r and latency t0 is given by:

t = t0+ n ⋅ r . (4.1)

If the message is very small, the latency will dominate the time spent in the communication. This also means, communicating small chunks of data after successfully executing a single task will be slower than sending larger chunks of data after several tasks have finished. Therefore, the data will be buffered in a send queue before sending.

4.3.1 Collecting Send Data from Different Tasks

The send queue is used for buffering data before communication takes place (see Listing 4.19). The send queue is configured upfront by the user by defining the chunk size and the recipients. After the computation of the data inside a task is finished, the data will be added to the send queue. When the desired chunk size is reached, the send queue will automatically send the data to the requested recipients.

Another problem arising from the modified communication scheme is the iden- tification of the data. For bulk synchronous communication, a defined set of ordered data will be send. The corresponding receiver exactly knows the data and the order it arrives (assuming message ordering). As an example the 10 multipole expansions could have the ID starting from 100 to 109. Using the send queue, it can be ensured to send a 10 multipole expansions, but the IDs could be in an arbitrary order. Even worse, if the chunk size was set to a fixed size, it is not known which multipole expansion will be sent in which message.

A workaround could be to employ the MPI tag for communicating the identifiers. This would imply that we have to use a wildcard tag matching on the receiving end of the communication. Additionally, the tag feature itself is limited and therefore this workaround is not a sustainable solution.

The problem can be solved by extending the serialization tools discussed earlier. For the buffered send, a small meta-data object is added before the regular data in memory. This meta-data object consists of the number of elements serialized and the identifiers used internally. This is done internally, the user does not need to specify the extension with meta-data.

4.3.2 Dynamically Receiving Messages

Another issue for the task-based inter-node communication is the receiving of messages. As discussed before, multithreaded MPI introduces several performance drawbacks. Restricting the receiving to a single threads can be advantageous in this case.

A simple solution for the receiving of messages could be to post all non-blocking receives of an MPI process at the beginning and constantly check if a request is finished. However, determining all receive calls upfront might be cumbersome. Besides this, calling all receives upfront might also be a performance bottleneck. Additionally, testing a request usingMPI_Test involves an internal critical section. Therefore it should be avoided to frequently callMPI_Testfrom concurrent threads.

The receiving mechanism offered in the task engine is restricted to a single communication thread. This thread frequently probes MPI (MPI_Iprobe) for new

messages. If a new message is available this message will be received using a non-blocking receive. Afterwards, the communication thread will process the message further (e.g. deserialization). If the sent data is related to a data-event in the static data-flow dispatcher, the communication thread will trigger this event. Thus the static data-flow dispatcher will dispatch the event as usual (like events triggered after the computation). For the dispatcher it makes no difference if the data came from actual computation inside a task or by communication.

4.3.3 Defining Communication Schemes

For a user friendly communication interface in the task engine some things are still missing. As for the static data-flow dispatcher, the user needs to decide what to do with the data. The data-flow dispatcher allows to configure the event handlers triggered for certain events upfront. The same is required for the communication. Whenever new data is computed and ready for sending, a dispatcher needs to be configured deciding where to send the data now or later.

The static data-flow dispatcher is capable of resolving the dispatch at compile- time. Unfortunately, this is not possible for the communication dispatcher. The number of used MPI processes is not fixed at compile-time and thus is neither the distribution of work. To define a communication scheme, this however is a requirement. Therefore, the static data-flow dispatcher was extended with a communication dispatcher resulting in a hybrid (static and dynamic) data-flow dispatcher. The hybrid data-flow dispatcher functions similar to the static data- flow dispatcher. The only difference is that the communication dispatcher is called as well, after the dispatching of static events has been called. The communication scheme used for the communication dispatcher must be defined by the user upfront.

10−2 10−1 Runtime [s] Ideal Scaling Measurement 1 2 4 8 16 32 0.0 0.5 1.0 #MPI Processes Efficiency Ideal Scaling Measurement

Figure 4.6:This benchmark uses the large particle system encompassing 103 680 particles and a multipole order of 15. It was conducted on 32 nodes of JURECA resulting in 768 cores in total.