By applying Foster’s methodology to the Pascal’s Triangle example it has been shown informally how the program can be converted into a program suitable for a distributed computer.
% 1. pascal(0,0,1) combined with distribute statement A:
pascal(T,0,0,1) <-- true, 0 = 0, slice_width(Sw), T is 0 // Sw. % 2. pascal(0,0,1) combined with distribute statement B:
pascal(T+1,0,0,1) <-- true, slice_width(Sw), halo_size(N), curr_task(T), 0 rem N = 0, tasks(T), range((T+1)*Sw - N, (T+1)*Sw, 0), tasks(T+1). % 3. pascal(0,0,1) combined with distribute statement C (always false): pascal(T,0,0,1) <-- true, tasks(T), 0 > 0, 0 \= 0, curr_task(T).
% 4. Rule 1 combined with distribute statement A:
pascal(T,I,0,1) <-- pascal(T,J,0,1), rows(Nn), I is J+1, Nn > I, I = 0, slice_width(Sw), T is 0 // Sw.
% 5. Rule 1 combined with distribute statement B:
pascal(T+1,I,0,1) <-- pascal(T,J,0,1), rows(Nn), I is J+1, Nn > I, slice_width(Sw), halo_size(N), curr_task(T), I rem N = 0,
range((T+1)*Sw - N, (T+1)*Sw, 0), tasks(T+1). % 6. Rule 1 combined with distribute statement C:
pascal(T,I,0,1) <-- pascal(T,J,0,1), rows(Nn), I is J+1, Nn > I, I > 0, I \= 0, curr_task(T).
% 7. Rule 2 combined with distribute statement A:
pascal(T,I,I,1) <-- I = I, pascal(T,J,J,1), rows(Nn), I is J+1, Nn > I, slice_width(Sw), T is I // Sw.
% 8. Rule 2 combined with distribute statement B:
pascal(T+1,I,I,1) <-- pascal(T,J,J,1), rows(Nn), I is J+1, Nn > I, slice_width(Sw), halo_size(N), curr_task(T), I rem N = 0,
range((T+1)*Sw - N, (T+1)*Sw, I), tasks(T+1).
% 9. Rule 2 combined with distribute statement C (always false):
pascal(T,I,I,1) <-- pascal(T,J,J,1), rows(Nn), I is J+1, Nn > I, I > 0, I \= I, curr_task(T).
% 10. Rule 3 combined with distribute statement A:
pascal(T,R+1,C,N1+N2) <-- pascal(T,R,C,N1), rows(Nn), R+1 < Nn, pascal(T,R,C-1,N2), R+1 = C, slice_width(Sw), T is C // Sw. % 11. Rule 3 combined with distribute statement B:
pascal(T+1,R+1,C,N1+N2) <-- pascal(T,R,C,N1), rows(Nn), R+1 < Nn,
pascal(T,R,C-1,N2), slice_width(Sw), halo_size(N), curr_task(T),
(R+1) rem N = 0, range((T+1)*Sw - N, (T+1)*Sw, C), tasks(T+1). % 12. Rule 3 combined with distribute statement C:
pascal(T,R+1,C,N1+N2) <-- pascal(T,R,C,N1), rows(Nn), R+1 < Nn, pascal(T,R,C-1,N2), R+1 > 0, R+1 \= C, curr_task(T).
4.4.1
Combining Program Rules and Distribute Statements
To create a distributed single-program multiple data program, the program rules are combined with the distribute statements, as described in Section 4.2.1.
To specify how the Pascal’s Triangle program is partitioned, an extra attribute is added to the pascal relation. This attribute is the task number of the task where the tuple is stored. If the tuple is stored on a different task, then the tuple is sent to that task. When this extra attribute is added, the pascal(Row,Column,Value) relation becomes pascal(Task,Row,Column,Value).
The result of combining the pascal rules (from Figure 4.2) and the halo distribute rules (from Figure 4.7) is shown in Figure 4.8. The new rules created by combining the pascal rules with the distribute statements are labelled 1 to 12.
Currently, there is no distributed version of JStar. To test the distribute statements, a distributed environment was simulated inside the Starlog interpreter. This distributed environment was simulated by creating separate sets of pascal rules for each task. Each of these sets of rules simulates one task.
In each of these sets of rules, the output of the curr task(T) predicate is hard-coded to the task number for that simulated task. The Task attribute for any of the input tuples from the pascal relation are also hard-coded to the task number of simulated task. This simulates only being able to access tuples that are stored on the same task.
Simulating a distributed environment allowed testing the distribute statements so that errors could be found and corrected. The distributed program produced the same results as the original program for several different test cases. Although testing is not as good as a proof, it is possible to have some confidence that the distributed program works as intended.
4.4.2
Compiling the Distributed Program
To compile the pascal SPMD program in Figure 4.8 a Starlog compiler which supports distributed programs (described in Section 4.2.2) is required.
There are various optimisations that can be done at this point. Some of these rules in can be eliminated or simplified. Rules 3 and 9 can be eliminated because they both have a clause which is always false.
Rule 1 can be simplified to pascal(0,0,0,1) <-- true . Rule 4 can be eliminated because the head of rule 4 simplifies to pascal(0,0,0,1), which has already been proved true by rule 1.
Many of the rules in Figure 4.8 contain a lot of duplication, as they were created by combining each of the program rules with each of the distribute statements. The performance of this code could likely be improved by sharing duplicated code between rules.[14]. The existing Starlog compiler already does
an optimisation called common prefix factorisation[14]. Common prefix factorisation factors out shared sequences of instructions. This similar to the imperative programming language optimisation where repeated if statements with common elements in the conditions can be rewritten so that each element is only evaluated once.
Part of a distributed Starlog program is coordination between each of the tasks. This allows each of the tasks to know that they have received all the tuples they will ever receive at a particular point in the stratification order. For the pascal example, some static analysis of the program can help optimise the coordination between the tasks:
• Each task with a task number of T only sends to task T+1. This means that each task will only
receive messages from T-1 (except for the first task which does not receive any messages). Each task only has to coordinate with the previous task and the next task, which requires less communication than coordinating with all the tasks.
• As haloing is used, data is not sent every row. Every rule in Figure 4.8 only sends on rows where
the remainder of the row number divided by the halo size is zero (i.e. the row number is divisible by the halo size). This is because of the clauses halo size(N), R rem N = 0 in each of the rules, which send the output tuples to the next node. Each task can assume it will not receive any tuples for rows where the row number is not divisible by the halo size.
As the program is stratified by row number, this will be translated into imperative machine code in which the outer loop iterates over the row number. The program would stop every number of rows to perform communication, along the lines of this pseudocode:
nextCommunicationRow = 0; row=0
while row < rows
// communicate this generation? if (row == nextCommunicationRow) {
start sending data (nonblocking) to next receive data from previous
finish sending data
nextCommunicationRow += rowsPerMessage calculate next row