Message Passing with MPI and PVM

Parallel computing solves large problems quickly by making many processors work simultaneously on smaller subtasks or subsets of the data. It is often applied to problems that cannot be solved by more conventional means, such as a serial algorithm running on a single PC or workstation.

Most parallel applications require communication, called message passing, between subtask processes. Message passing is used for things such as loading or distributing initial data, exchanging data while running an algorithm, and collecting final results, which are usually distributed across the memories local to each processor at the end of execution.

In last month’s column, we focused on the basic concepts of parallel programming, including problem decomposition and granularity. We presented examples of program initiation and termination using both PVM (Parallel Virtual Machine) and MPI (Message Passing Interface), the two most popular message passing application programming interfaces (APIs).




Nodes, Processors, and Processes

Computer architectures are becoming increasingly complex and hierarchical. Today, most computers larger than a desktop PC contain more than one processor. Beowulf clusters consist of individual computers called nodes. Most clusters today have nodes with multiple processors called symmetric multiprocessor (SMP) nodes. Although individual processors within a single node have their own private cache memory, they can access and share the main memory in that node. Message passing provides a mechanism for the exchange of data in memory distributed across the nodes of a cluster.

By default, PVM, MPICH, and LAM all initiate one process on each node and then cycle through the list of nodes in round-robin fashion, starting additional processes until the requested number has been started. On SMP nodes, process placement is handled automatically by the kernel scheduler, so all processors are used if the correct number of processes (or tasks) is created on that node. In such a distributed computing environment, all processes communicate over the network even when they are executing on the same node; the faster shared memory buffers available within a single multiprocessor node are not used.
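
With MPICH's default (ch_p4) device, for example, the nodes and the number of processes to start on each of them can be listed in a machines file handed to mpirun. The file name and host names below are only illustrative; this is a minimal sketch of the idea, not a prescription for any particular cluster.

# machines -- one host per line; "node1:2" requests two
# processes on a dual-processor SMP node
node1:2
node2:2
node3
node4

[forrest@beowulf mpi]$ mpirun -np 6 -machinefile machines mpi_message1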

On the other hand, on large multiprocessor systems (which do not include Beowulf clusters), special versions of PVM and MPI that can take advantage of shared memory data exchange (instead of performing network communication) are used. Computational scientists who really want to squeeze every last ounce of performance out of distributed SMPs will use PVM or MPI for network communication and some other API (like pthreads or OpenMP) for multiprocessor shared memory communication within the same node. This takes considerable effort, and such software engineering investments are typically reserved for only the most critically important programs. Most Beowulf programmers use the standard network versions of PVM or MPI, are careful to initiate the right number of tasks on each node, and rely on the kernel to migrate processes on SMP boxes included in the virtual machine.

This month, we’ll show how MPI and PVM perform message passing. This can be performed with simple send and receive operations (referred to as point-to-point communication) or through broadcast, gather/scatter, and global reduction operations (called collective communication).

Send and receive operations come in two flavors: blocking and non-blocking. Blocking routines do not return until the operation has completed, so once a blocking send or receive returns, the associated buffer may be safely reused or examined.

Non-blocking routines return immediately and provide a handle that may be probed later to check for communication completion. Send and receive buffers used in non-blocking routines should not be accessed until the handle is checked to ensure that the operation is complete.
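
To make the difference concrete, here is a short sketch (not one of the column's listings) in which each non-zero rank posts a non-blocking MPI_Isend() of a single double to rank zero, while rank zero posts matching MPI_Irecv() calls and waits on the resulting request handles before reading the buffers. The tag value and buffer contents are arbitrary.

#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

int main(int argc, char **argv)
{
    int me, nprocs, i;
    double value, *results;
    MPI_Request req, *reqs;
    MPI_Status status, *stats;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &me);

    if (me) {
        /* Post a non-blocking send of one double to rank 0 */
        value = (double)me;
        MPI_Isend(&value, 1, MPI_DOUBLE, 0, 1, MPI_COMM_WORLD, &req);
        /* Useful work could overlap here, but "value" must not be
           modified until the request is known to be complete */
        MPI_Wait(&req, &status);
    }
    else {
        results = (double *)malloc((nprocs - 1) * sizeof(double));
        reqs = (MPI_Request *)malloc((nprocs - 1) * sizeof(MPI_Request));
        stats = (MPI_Status *)malloc((nprocs - 1) * sizeof(MPI_Status));
        /* Post one non-blocking receive per sender */
        for (i = 1; i < nprocs; i++)
            MPI_Irecv(&results[i - 1], 1, MPI_DOUBLE, i, 1,
                      MPI_COMM_WORLD, &reqs[i - 1]);
        /* Overlap computation with communication here, then block
           until every receive has completed */
        MPI_Waitall(nprocs - 1, reqs, stats);
        for (i = 1; i < nprocs; i++)
            printf("0: received %lf from rank %d\n", results[i - 1], i);
        free(results); free(reqs); free(stats);
    }

    MPI_Finalize();
    return 0;
}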

Basic MPI Communication

Figure One shows how to compile and run the mpi_message1 program, which uses MPI routines for communication. The program prompts the user for a seed value and uses it to fill an array of initial values, which is then broadcast to all processes. Each process computes a local sum from the received array and sends that sum to the root or master process (the process of rank zero), which computes a cumulative total. Each process prints its local sum, and the root process prints the cumulative total.




Figure One: Running mpi_message1


[forrest@beowulf mpi]$ mpicc -O -o mpi_message1 mpi_message1.c
[forrest@beowulf mpi]$ mpirun -np 6 mpi_message1
Enter some kind of seed value:
1.2345
0: My sum is 0.000000
1: My sum is 19197.565848
5: My sum is 95987.829239
4: My sum is 76790.263391
3: My sum is 57592.697543
2: My sum is 38395.131695
0: Total sum is 287963.487716

Listing One contains the source code for mpi_message1, which demonstrates the most fundamental means of message passing in MPI: the MPI_Bcast(), MPI_Send(), and MPI_Recv() function calls.




Listing One: mpi_message1.c


#include <stdio.h>
#include "mpi.h"

#define ASIZE 100
#define PI 3.141592653589793238462643

int main(int argc, char **argv)
{
    int me, nprocs, namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int i;
    double seed, init_val[ASIZE], val[ASIZE], sum, tsum;
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &me);
    MPI_Get_processor_name(processor_name, &namelen);

    if (!me) { /* Only the first process in the group */
        printf("Enter some kind of seed value:\n");
        scanf("%lf", &seed);
        for (i = 0; i < ASIZE; i++)
            init_val[i] = (double)i * seed * PI;
    }

    /* Broadcast computed initial values to all other processes */
    if (MPI_Bcast(init_val, ASIZE, MPI_DOUBLE, 0, MPI_COMM_WORLD)
        != MPI_SUCCESS)
        fprintf(stderr, "Oops! An error occurred in MPI_Bcast()\n");

    for (i = 0, sum = 0.0; i < ASIZE; i++) {
        val[i] = init_val[i] * me;
        sum += val[i];
    }
    printf("%d: My sum is %lf\n", me, sum);

    /* Send sum back to the first process */
    if (me) { /* All processes except the one of rank 0 */
        MPI_Send(&sum, 1, MPI_DOUBLE, 0, 1, MPI_COMM_WORLD);
    }
    else {
        tsum = sum;
        for (i = 1; i < nprocs; i++) {
            MPI_Recv(&sum, 1, MPI_DOUBLE, MPI_ANY_SOURCE, 1,
                     MPI_COMM_WORLD, &status);
            tsum += sum;
        }
        printf("%d: Total sum is %lf\n", me, tsum);
    }

    MPI_Finalize();
    return 0;
}

MPI_Bcast() is a collective communication routine that sends data from a single process (in this case, the process with rank zero, specified in the fourth parameter) to all processes taking part in the parallel program. This single routine performs the necessary send operations on the originating process and the corresponding receive operations on every process (the originator effectively delivers the message to itself as well).

In this example, initial values are calculated by the rank zero process using a seed value input by the user, then MPI_Bcast() is used to distribute these initial values. MPI_Bcast() is passed the initial values array (init_val), the array size (ASIZE), the data type (MPI_DOUBLE), the rank of the originator (0), and the communicator (MPI_COMM_WORLD). As noted last month, MPI_COMM_WORLD is a special communicator that denotes all of the processes available at initialization.

Next, each process multiplies these initial values by its own rank and sums them (the rank zero process simply gets zero). Each process prints its sum, and all processes other than rank zero send theirs to the rank zero process using MPI_Send(). The rank zero process calls MPI_Recv() a total of nprocs - 1 times to obtain the sums from the other processes and accumulates them in tsum.

The MPI_Send() and MPI_Recv() routines provide point-to-point communication between any two processes. These routines are blocking; they will not return until the operation is complete and the buffer may be safely accessed. The MPI_Send() routine is passed the following: the address of the buffer to send (&sum), the number of elements in the buffer (1), the data type of the elements in the buffer (MPI_DOUBLE), the destination rank (0), a message tag (1), and the communicator (MPI_COMM_WORLD). The MPI_Recv() routine is passed the address of the buffer in which to receive (&sum), the number of elements in the buffer (1), the data type of the elements of the buffer (MPI_DOUBLE), the rank of the originator (in this case MPI_ANY_SOURCE, which acts like a wild card allowing messages to be received from any sender), a message tag (1, here denoting a slave process), the communicator (MPI_COMM_WORLD), and a status structure which contains some information about the message received.

Message tags are arbitrary positive integers chosen by the programmer to label messages. They are handy when processes pass around many different kinds of messages simultaneously. For instance, in a sorting or classification algorithm, a message tag of 1 could be used to signal the master process that an individual slave is ready for more work. A message returned from the master with a tag of 2 could signify a message containing another block of data for processing, while a message with a tag of 3 might inform the slave that no other data is available for processing. Message tags may also be used to selectively receive messages (by requiring that they have a particular tag), and messages may likewise be received selectively by specifying the originator's rank instead of MPI_ANY_SOURCE.
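
The following sketch (not one of the column's listings) implements exactly this kind of tag scheme. The tag names and work values are made up for illustration: each worker announces that it is ready with tag 1, the master answers with one block of work under tag 2, and tag 3 tells the workers to quit.

#include <stdio.h>
#include "mpi.h"

#define TAG_READY 1   /* worker -> master: ready for work */
#define TAG_WORK  2   /* master -> worker: here is a work item */
#define TAG_DONE  3   /* master -> worker: no more work */

int main(int argc, char **argv)
{
    int me, nprocs, i, dummy = 0;
    double work_item;
    MPI_Status status;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &me);

    if (!me) { /* master */
        for (i = 1; i < nprocs; i++) {
            /* Receive a "ready" message from any worker, selecting on
               the tag; status.MPI_SOURCE identifies the sender */
            MPI_Recv(&dummy, 1, MPI_INT, MPI_ANY_SOURCE, TAG_READY,
                     MPI_COMM_WORLD, &status);
            work_item = 100.0 * i;
            MPI_Send(&work_item, 1, MPI_DOUBLE, status.MPI_SOURCE,
                     TAG_WORK, MPI_COMM_WORLD);
        }
        /* Tell every worker that no more work is available */
        for (i = 1; i < nprocs; i++)
            MPI_Send(&dummy, 1, MPI_INT, i, TAG_DONE, MPI_COMM_WORLD);
    }
    else { /* worker */
        MPI_Send(&dummy, 1, MPI_INT, 0, TAG_READY, MPI_COMM_WORLD);
        MPI_Recv(&work_item, 1, MPI_DOUBLE, 0, TAG_WORK,
                 MPI_COMM_WORLD, &status);
        printf("%d: got work item %lf\n", me, work_item);
        MPI_Recv(&dummy, 1, MPI_INT, 0, TAG_DONE, MPI_COMM_WORLD, &status);
    }

    MPI_Finalize();
    return 0;
}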

Global Reduction

PVM and MPI both provide efficient routines that will perform global data reduction operations. These include computing the sum or product of variables and finding the maximum or minimum of variables. MPI also includes predefined logical and bit-wise AND, OR, and XOR operations. These operations are performed using data spread across all processes involved in the parallel computation. In addition, programmers may define their own reduction operation functions in either PVM or MPI.

The mpi_message1 program can be simplified and shortened by replacing MPI_Send() and MPI_Recv(), the point-to-point communication routines that are used to send sums to the root node, with a single MPI_Reduce() routine. Listing Three contains source code for the mpi_message2 program, which is basically the mpi_message1 code that has been modified to use MPI_Reduce().




Listing Three: mpi_message2.c


#include <stdio.h>
#include "mpi.h"

#define ASIZE 100
#define PI 3.141592653589793238462643

int main(int argc, char **argv)
{
    int me, nprocs, namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int i;
    double seed, init_val[ASIZE], val[ASIZE], sum, tsum;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &me);
    MPI_Get_processor_name(processor_name, &namelen);

    if (!me) { /* Only the first process in the group */
        printf("Enter some kind of seed value:\n");
        scanf("%lf", &seed);
        for (i = 0; i < ASIZE; i++)
            init_val[i] = (double)i * seed * PI;
    }

    /* Broadcast computed initial values to all other processes */
    if (MPI_Bcast(init_val, ASIZE, MPI_DOUBLE, 0, MPI_COMM_WORLD)
        != MPI_SUCCESS)
        fprintf(stderr, "Oops! An error occurred in MPI_Bcast()\n");

    for (i = 0, sum = 0.0; i < ASIZE; i++) {
        val[i] = init_val[i] * me;
        sum += val[i];
    }
    printf("%d: My sum is %lf\n", me, sum);

    /* Sum the values stored in sum from each process and provide the
       result in tsum on the process of rank 0 */
    MPI_Reduce(&sum, &tsum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
    if (!me) printf("%d: Total sum is %lf\n", me, tsum);

    MPI_Finalize();
    return 0;
}

The MPI_Reduce() routine is passed the following: the address of the buffer to send (&sum), the address of the buffer in which to receive on the destination process (&tsum), the number of elements in the buffer (1), the data type of the elements in the buffer (MPI_DOUBLE), the desired reduction operation (in this case the predefined MPI_SUM operation), the rank of the destination process (0), and the communicator (MPI_COMM_WORLD).
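
As mentioned earlier, programmers may also register their own reduction operations. The sketch below (not one of the column's listings) shows the MPI mechanism: a function with the MPI_User_function prototype is registered with MPI_Op_create() and then passed to MPI_Reduce() in place of MPI_SUM. The names my_prod and my_op are made up for illustration, and the operation itself merely duplicates what the predefined MPI_PROD already provides.

#include <stdio.h>
#include "mpi.h"

/* User-defined reduction function with the MPI_User_function
   prototype: combine in[] into inout[], element by element.
   This one multiplies, duplicating the predefined MPI_PROD. */
void my_prod(void *invec, void *inoutvec, int *len, MPI_Datatype *dtype)
{
    double *in = (double *)invec, *inout = (double *)inoutvec;
    int i;

    for (i = 0; i < *len; i++)
        inout[i] = in[i] * inout[i];
}

int main(int argc, char **argv)
{
    int me, nprocs;
    double x, result;
    MPI_Op my_op;

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &me);

    x = (double)(me + 1);   /* each process contributes rank + 1 */

    /* Register the function as a commutative operation (second
       argument is 1), use it exactly like MPI_SUM, then free it */
    MPI_Op_create(my_prod, 1, &my_op);
    MPI_Reduce(&x, &result, 1, MPI_DOUBLE, my_op, 0, MPI_COMM_WORLD);
    MPI_Op_free(&my_op);

    if (!me)
        printf("0: product of (rank + 1) over all processes is %lf\n", result);

    MPI_Finalize();
    return 0;
}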

When compiled and run, mpi_message2 will provide the same output as mpi_message1. A similar reduction operation routine called pvm_reduce() is available in PVM.
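
As a hedged sketch of how that call might look, the fragment below assumes each task has joined a PVM group (given the illustrative name "workers") and holds its partial result in sum; ntasks is assumed to hold the total number of tasks.

#include "pvm3.h"

/* Fragment only, for illustration */
double sum;          /* local partial sum; holds the total on the root */
int info, ntasks;    /* ntasks: total number of tasks (assumed known) */

info = pvm_joingroup("workers");   /* "workers" is an illustrative name */
/* ... compute the local sum ... */
pvm_barrier("workers", ntasks);    /* wait until every task has joined */

/* Sum the partial results across the group; the total is returned in
   the root's buffer (group instance 0), and the buffers on the other
   tasks are left undefined */
info = pvm_reduce(PvmSum, &sum, 1, PVM_DOUBLE, 1, "workers", 0);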

And Just Ahead…

This brief introduction to message passing using MPI and PVM is by no means comprehensive. It should, however, provide some insights into how to write parallel code using these APIs. More advanced parallel programming techniques will be discussed in future columns.




Resources

Local Area Multicomputer (LAM) Message Passing Interface (MPI) http://www.lam-mpi.org

MPICH http://www-unix.mcs.anl.gov/mpi/mpich

Parallel Virtual Machine (PVM) http://www.epm.ornl.gov/pvm



Forrest Hoffman is a computer modeling and simulation researcher at Oak Ridge National Laboratory. He can be reached at forrest@esd.ornl.gov.
