Advanced Message Passing

In the past two columns, we've looked at the theory and basics of parallel computing. This month we get to the real world with a program that performs matrix multiplication.

In the past two columns, we’ve looked at the theory and basics of parallel computing. This month we get to the real world with a program that performs matrix multiplication.

Parallel programming applications must make efficient use of memory and minimize inter-process communication. This requires well-conceived data structures and careful consideration of message passing strategies. In the Open Source world, you basically have two choices for your superstructure: PVM (Parallel Virtual Machine, http://www.epm.ornl.gov/pvm/) and MPI (Message Passing Interface, http://www.lam-mpi.org/ and http://www-unix.mcs.anl.gov/mpi/mpich/).

Message Passing

Point-to-point communication (message passing between individual processes) uses the MPI_Send() and MPI_Recv() routines or the pvm_send() and pvm_recv() routines. These come in blocking and non-blocking varieties.

An alternative approach is to use simultaneous message passing involving some or all processes. This method, called collective communication, uses routines called MPI_Bcast() and MPI_Reduce() for MPI and pvm_mcast() for PVM. The MPI_Bcast() and pvm_mcast() routines distribute the same data to all processes while MPI_Reduce() is used to perform some mathematical operation on a variable spread across all processes. This month we’ll look at how MPI handles these tasks.

MPI_Reduce() has pre-defined operations to compute the sum or product of a variable, find the maximum or minimum value of a variable, or perform logical or bit-wise AND, OR, or XOR operations; however, a user-defined operation may also be performed. Collective communications are blocking, but the specification strongly suggests that they should not be relied upon for program synchronization.

A Real-World Example

Many scientific computational codes use matrix multiplication to solve a problem. Although parallel versions of subroutines that provide this functionality are easily available, you can learn a great deal about message passing strategies by writing these types of routines from scratch.

Given two matrices, A (an m x n matrix) and B (an r x p matrix), their product, AB = C, is defined only when r = n. That is, the number of rows of matrix B must equal the number of columns of matrix A. The product, C, is an m x p matrix. Each element in C is the inner product (or dot product) of a row vector of A with a column vector of B.

Figure One: Matrix Multiplication

When multiplying matrices with pencil and paper, you multiply the elements in the first row of A with the elements in the first column of B and add together these products to obtain the first element of C. This is denoted by the blue boxes in Figure One. The remaining elements of C are calculated in similar fashion using rows of A and columns of B.

Two Algorithms

You might want to have each process calculate one or more elements of C by passing each both a given row of A and a given column of B. Then each element could be returned to the root process and assembled into the final matrix C. This strategy will work, but it requires sending the same rows or columns to multiple processes, and that requires the root process to keep track of which elements of C will be returned from each process.

An alternative strategy would be to send a column of A and a row of B to each process. In this case each process would calculate one of the products contained in each element of C. These products could be summed by a global reduction routine to produce the final result. (See the red boxes in Figure One.) This strategy is easier to program and minimizes communications prior to the calculations, but it requires more communications afterward; it thus relies upon an efficient implementation of the reduction algorithm in the message passing system. Most MPI implementations use a tree-based algorithm to collate the data so that the reduction operations are actually distributed when more than four processes are involved in the operation.

While MPI_Bcast() is useful for distributing parameters or data common to calculations being performed by all processes, it is not useful for distributing subsets of data in a regular fashion to all processes. This is accomplished in MPI using the MPI_Scatter() routine. This routine effectively subsets the data and sends individual subsets to each process in turn according to its rank. A variant of this routine, called MPI_Scatterv(), can be used to send a varying amount of data to each process. Similarly, the MPI_Gather() and MPI_Gatherv() routines are used to collect subsets of data from processes and assemble them into a larger array.

Listing One contains the code for this alternative matrix multiplication strategy. To keep the code small for this column, the matrix sizes — M, N, and P — are pre-defined, the code is all in main(), the data are read from files, and the program requires N processes to be used in the calculation. This code can easily be converted into a subroutine that can handle any size matrix and utilize any number of processes less than or equal to N.

Listing One: mmult.c

1 #include <stdio.h>
2 #include <unistd.h>
3 #include “mpi.h”
5 #define M2
6 #define N3
7 #define P4
9 int main(int argc, char **argv)
10 {
11 FILE *fpa, *fpb;
12 int me, nprocs, namelen, i, j;
13 float a[M][N], b[N][P], c[M][P], d[M][P];
14 float atrans[N][M], avec[M], bvec[P];
15 char processor_name[MPI_MAX_PROCESSOR_NAME];
16 MPI_Status status;
18 MPI_Init(&argc, &argv);
19 MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
20 MPI_Comm_rank(MPI_COMM_WORLD, &me);
21 MPI_Get_processor_name(processor_name, &namelen);
23 if (!me) {
24 if (!(fpa=fopen(“a.dat”, “r”)) ||
!(fpb=fopen(“b.dat”, “r”)))
25 exit(-1);
26 printf(“a =\n”);
27 for (i = 0; i < M; i++) {
28 for (j = 0; j < N; j++) {
29 fscanf(fpa, ” %f “, &a[i][j]);
30 atrans[j][i] = a[i][j];
31 printf(” %f”, a[i][j]);
32 }
33 printf(“\n”);
34 }
35 printf(“b =\n”);
36 for (i = 0; i < N; i++) {
37 for (j = 0; j < P; j++) {
38 fscanf(fpb, ” %f “, &b[i][j]);
39 printf(” %f”, b[i][j]);
40 }
41 printf(“\n”);
42 }
43 fclose(fpa);
44 fclose(fpb);
45 }
47 if (MPI_Scatter(atrans, M, MPI_FLOAT, avec, M,
49 fprintf(stderr, “Error scattering A\n”);
51 if (MPI_Scatter(b, P, MPI_FLOAT, bvec, P,
53 fprintf(stderr, “Error scattering B\n”);
55 for (i = 0; i < M; i++)
56 for (j = 0; j < P; j++)
57 c[i][j] = avec[i] * bvec[j];
59 if (MPI_Allreduce(c, d, (M*P), MPI_FLOAT,
61 fprintf(stderr, “Error reducing\n”);
63 if (!me) {
64 printf(“a * b =\n”);
65 for (i = 0; i < M; i++) {
66 for (j = 0; j < P; j++)
67 printf(” %f”, d[i][j]);
68 printf(“\n”);
69 }
70 }
72 MPI_Finalize();
73 }

In Listing One (pg. 43), the matrices A and B are read from files (lines 23-45), and then MPI_Scatter() is used to send a column of A and a row of B to each process (lines 47-53). Because we know that C stores matrices (arrays) in memory a row at a time, we’ll send a column of A by building its transpose (rows and columns are switched) as we read in the elements (line 30), and send a row of the transpose. This will correspond to a column of A.

In lines 47-48 and lines 51-52, MPI_Scatter() is passed the address of the data buffer to send (atrans and b); the number elements to send to each process (M and P); the data type being sent (MPI_FLOAT); the address of the receive buffer (avec and bvec); the number of elements to receive (M and P); the data type to receive (MPI_FLOAT); the rank of the sending process (0); and the communicator (MPI_COMM_WORLD).

Next each process calculates its own product for each element of C in lines 55-57. These products must be summed across all processes in order to arrive at the result. MPI_Allreduce() is used to accomplish this in lines 59-60. Like MPI_Reduce(), MPI_Allreduce() performs an operation (again, these can be pre-defined or user-defined) on the data from each process. It’s passed the address of the send buffer (c), the address of the receive buffer (d), the number of elements to send (M*P), the data type (MPI_FLOAT), the operation (MPI_SUM), and the communicator (MPI_COMM_WORLD). Now every process has the answer stored in the array d.

The program is compiled and run as shown in Figure Two. Sure enough, it works!

Figure Two: Compiling and running mmult.c

[forrest@beowulf mmult]$ mpicc -O -o mmult mmult.c
[forrest@beowulf mmult]$ mpirun -np 3 mmult
a =
6.811383 30.782171 20.334776
4.417596 34.088512 26.125584
b =
9.833102 36.618156 18.256323 1.296968
25.917124 20.557819 41.508060 35.924686
41.968475 7.957412 24.677752 37.039406
a * b =
1718.181885 1044.046753 1903.875610 1867.861938
2023.365723 1070.441650 2140.317627 2198.024658

The Art of (Parallel) Programming

Obviously, you wouldn’t use a parallel computer to solve such a tiny matrix multiplication problem, but this small problem is perfect for testing an algorithm which may now be scaled up and applied to very large matrices. Careful thought and considerable creativity are required to create scalable parallel algorithms; as has been said before, the process is as much art as it is science.

These last three moth’s worth of columns have provided a thorough introduction to parallel programming techniques using the MPI and PVM message passing interfaces. This knowledge will serve you whether you’re writing code for a small Beowulf cluster or for the largest commercial parallel computers on Earth.

Forrest Hoffman is a computer modeling and simulation researcher at Oak Ridge National Laboratory. He can be reached at forrest@esd.ornl.gov

Comments are closed.