MPI Communicators and Groups

In the previous two columns, we discussed the master/slave programming idiom and how to pass derived data types in messages. This month, we continue our exploration of advanced uses of the Message Passing Interface (MPI) with a look at MPI communicators and groups, two MPI features that provide communications contexts and modularity for parallel codes.

Communicators, What Are They Good For?

A communicator can be thought of as a communications “channel,” like a channel on a CB radio. An MPI process can “tune” to a communicator to send and receive messages, and, like a CB radio channel, messages sent via a communicator are received only by processes tuned to that communicator.

Up to now, all of the MPI codes we’ve seen have used MPI_COMM_WORLD as the default communicator, and all processes tune to it. Available immediately upon initialization of MPI (via MPI_Init()), MPI_COMM_WORLD defines a default, global context for message passing. (Here, a context refers to a safe realm of message passing in MPI.)

However, an MPI application can create multiple, separate communicators to insulate messages associated with one set of tasks or group of processes from those associated with another. In fact, if only a subset of processes is involved in a particular calculation, a new communicator comprising that subset is required. Fortunately, new MPI communicators are easily created and destroyed.

MPI communicators come in two flavors: intra-communicators for operations within a single group of processes, and inter-communicators for point-to-point communication between two groups of processes. We’ll start with intra-communicators, and discuss inter-communicators in next month’s column. As usual, the MPICH implementation (available at http://www-unix.mcs.anl.gov/mpimpich/index.html) is used for the examples. Error checking is absent in the examples due to space limitations.

Manipulating Intra-Communicators

Message tags are typically used to distinguish various types of messages from each other. However, it is also possible to use the same tag for two different messages in two different contexts by using two intra-communicators. This is most easily achieved by duplicating the MPI_COMM_WORLD communicator (which is an intra-communicator) so that the new context refers to the same group of processes, namely all of them.

The code in Listing One performs simultaneous message passing in two different contexts by sending two different messages to the neighboring process of higher rank (in a ring). Moreover, it demonstrates the independence of collective and point-to-point communications by performing two global reduction operations (one in each context) immediately following calls to non-blocking receives and corresponding non-blocking sends.




Listing One: context.c, an example of message passing in two contexts


#include <stdio.h>
#include "mpi.h"

#define SAME_TAG 1

int main(int argc, char **argv)
{
    int rank, size, namelen, one, two, buf1, buf2, sum1, sum2;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    MPI_Comm comm_two;
    MPI_Request request[4];
    MPI_Status stati[4];

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Get_processor_name(processor_name, &namelen);
    printf("Hello world! I'm rank %d of %d on %s\n", rank, size,
        processor_name);
    MPI_Comm_dup(MPI_COMM_WORLD, &comm_two);

    one = rank + 1;
    two = (rank + 1) * 2;
    /* Two non-blocking receives of the same type from the same source with
     * the same tag, but for two different contexts. */
    MPI_Irecv(&buf1, 1, MPI_INT, (rank == (size - 1) ? 0 : rank + 1),
        SAME_TAG, MPI_COMM_WORLD, &request[0]);
    MPI_Irecv(&buf2, 1, MPI_INT, (rank == (size - 1) ? 0 : rank + 1),
        SAME_TAG, comm_two, &request[1]);
    MPI_Isend(&two, 1, MPI_INT, (rank == 0 ? (size - 1) : rank - 1),
        SAME_TAG, comm_two, &request[2]);
    MPI_Isend(&one, 1, MPI_INT, (rank == 0 ? (size - 1) : rank - 1),
        SAME_TAG, MPI_COMM_WORLD, &request[3]);
    /* Collective communication mixed with point-to-point communication.
     * MPI guarantees that a single communicator can do safe point-to-point
     * and collective communication. */
    MPI_Allreduce(&one, &sum1, 1, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
    MPI_Allreduce(&two, &sum2, 1, MPI_INT, MPI_SUM, comm_two);
    MPI_Waitall(4, request, stati);

    printf("%d: Received buf1=%d and buf2=%d, sum1=%d and sum2=%d\n", rank,
        buf1, buf2, sum1, sum2);

    MPI_Comm_free(&comm_two);
    MPI_Finalize();
    return 0;
}

As usual, we start with the basic parallel “Hello world!” code from previous examples. First, MPI is initialized using MPI_Init(); the rank of the process is obtained by calling MPI_Comm_rank(); the number of processes started is obtained from MPI_Comm_size(); the processor name (usually the host name) is obtained by calling MPI_Get_processor_name(); and this information is subsequently printed along with “Hello world!”

Next, a new intra-communicator is created by calling MPI_Comm_dup(). The new communicator, called comm_two, will comprise the same group of processes as the original (in this case all of the processes, since MPI_COMM_WORLD was duplicated) but in a new context. The values of one and two are set to unique values based on the rank of each process; these will serve as the messages for the point-to-point communications and will be used in the global reductions.

To continue, two non-blocking receives with the same message tag are executed, one for each communicator. The receive operations obtain single integers from their right-hand neighbors in a ring fashion (i.e., from the process with the next largest rank, or from process zero for the process with the largest rank of all). Next, two corresponding non-blocking sends with the same message tag are executed, again one for each communicator. The sends pass single integers to their left-hand neighbors in the ring; with four processes, for example, rank 0 receives from rank 1 and sends to rank 3.

While these point-to-point communications are occurring, two global reduction operations (one in each context) are executed: the one values are summed using the MPI_COMM_WORLD communicator and the two values are summed using the comm_two communicator. Then the MPI_Waitall() routine is called to block until the four point-to-point operations are completed. The arguments to MPI_Waitall() are the number of pending operations (4), the array of operation handles (request), and an array for storing status codes (stati).

The values received in the point-to-point communication (stored in buf1 and buf2) are then printed along with the two sums (stored in sum1 and sum2). The new communicator is subsequently destroyed by calling MPI_Comm_free() prior to finalizing MPI with MPI_Finalize() and ending the program.

Figure One shows the results of compiling and running this code with four processes. As we can see, each process correctly received the values passed from its neighbor. In addition, both global sums are computed correctly (1+2+3+4=10 and 2+4+6+8=20), and the sums were passed to all processes. As usual, the output is in no particular order. Overlapping point-to-point and collective operations worked as expected, and the two communicators prevent any confusion between messages exchanged with the same message tag.




Figure One: The output of context.c (Listing One)


[forrest@node01]$ mpicc -O -o context context.c
[forrest@node01]$ mpirun -np 4 context
Hello world! I'm rank 0 of 4 on node01
0: Received buf1=2 and buf2=4, sum1=10 and sum2=20
Hello world! I'm rank 2 of 4 on node03
2: Received buf1=4 and buf2=8, sum1=10 and sum2=20
Hello world! I'm rank 1 of 4 on node02
1: Received buf1=3 and buf2=6, sum1=10 and sum2=20
Hello world! I'm rank 3 of 4 on node04
3: Received buf1=1 and buf2=2, sum1=10 and sum2=20

Having multiple contexts is particularly useful when coupling together models that may have overlapping communications and message tags. Additionally, this sort of modularity is required for any kind of parallel library that may be called from a user’s code. A separate context insulates the communications performed by library routines from those performed by the user’s code.

Listing Two contains user code that calls routines to perform a matrix transpose from an example parallel library shown in Listing Three. The user code initializes MPI, obtains the rank, size, and processor name, and then calls mylib_init(), passing it the MPI_COMM_WORLD communicator. mylib_init() checks to see if it has already been initialized, and, if not, creates a new intra-communicator (comm_private) for its own use and returns.




Listing Two: context2.c, an example of a user code context


#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"
#include "mylib.h"

int main(int argc, char **argv)
{
    int rank, size, namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int i, *row;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Get_processor_name(processor_name, &namelen);
    /* Initialize my parallel math library */
    mylib_init(MPI_COMM_WORLD);

    row = (int *)malloc(size * sizeof(int));
    printf("%d: Row [", rank);
    for (i = 0; i < size; i++) {
        row[i] = rank * size + i;
        printf(" %d", row[i]);
    }
    printf(" ]\n");
    /* Call my parallel transpose routine */
    mylib_transpose(row, size);
    printf("%d: Transpose [", rank);
    for (i = 0; i < size; i++) {
        printf(" %d", row[i]);
    }
    printf(" ]\n");
    free(row);

    /* Finalize my parallel math library */
    mylib_finalize();
    MPI_Finalize();
    return 0;
}

Next, the user code builds a row of the NxN matrix on each process, where N is simply the number of processes in this trivial example. Each process prints its row vector, then calls the mylib_transpose() routine (passing row and size). The library routine performs the matrix transposition by passing a single element from each row to each process; the result is stored in the temporary column vector called col, and is subsequently stuffed into the row vector passed in.
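
The header mylib.h included by the user code is not shown in the column. A minimal version, inferred here from the library routines in Listing Three (an assumption, not the author’s original file), would simply declare the three entry points:

/* mylib.h -- hypothetical header inferred from mylib.c (Listing Three) */
#ifndef MYLIB_H
#define MYLIB_H

#include "mpi.h"

void mylib_init(MPI_Comm comm_caller);   /* duplicate the caller's communicator */
int *mylib_transpose(int *row, int len); /* transpose one matrix row per process */
void mylib_finalize(void);               /* free the library's private communicator */

#endif /* MYLIB_H */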




Figure Two: The output of Listings Two and Three


[forrest@node01]$ mpicc -O -c mylib.c
[forrest@node01]$ ar rv libmylib.a mylib.o
r - mylib.o
[forrest@node01]$ mpicc -O -o context2 context2.c -L. -lmylib
[forrest@node01]$ mpirun -np 4 context2
0: Row [ 0 1 2 3 ]
0: Transpose [ 0 4 8 12 ]
2: Row [ 8 9 10 11 ]
2: Transpose [ 2 6 10 14 ]
3: Row [ 12 13 14 15 ]
3: Transpose [ 3 7 11 15 ]
1: Row [ 4 5 6 7 ]
1: Transpose [ 1 5 9 13 ]

The user code prints out its new row vector, and then mylib_finalize() is called to free up the private communicator and reset the init_flag. Finally, MPI is finalized, and the program ends. The results of compiling and running context2.c are shown in Figure Two. Although the example does not actually contain any overlapping communication, it demonstrates the strategy generally used for achieving safe communications spaces in parallel libraries.




Listing Three: mylib.c, a library using its own context


#include <stdio.h>
#include <stdlib.h>
#include "mpi.h"

int init_flag = 0, rank, size;
MPI_Comm comm_private;

void mylib_init(MPI_Comm comm_caller)
{
    if (!init_flag) {
        MPI_Comm_dup(comm_caller, &comm_private);
        MPI_Comm_rank(comm_private, &rank);
        MPI_Comm_size(comm_private, &size);
        init_flag = 1;
    }
    return;
}

int *mylib_transpose(int *row, int len)
{
    int i, *col;

    col = (int *)malloc(len * sizeof(int));
    MPI_Alltoall(row, 1, MPI_INT, col, 1, MPI_INT, comm_private);

    for (i = 0; i < len; i++)
        row[i] = col[i];
    free(col);
    return row;
}

void mylib_finalize()
{
    if (init_flag)
        MPI_Comm_free(&comm_private);
    init_flag = 0;
    return;
}
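
One refinement a real parallel library might add (this is a sketch of my own, assuming the globals from Listing Three, and not part of the author’s code) is to verify that MPI has actually been initialized before duplicating the caller’s communicator, using the standard MPI_Initialized() routine:

/* Hypothetical, more defensive variant of mylib_init() from Listing Three.
 * It assumes the same globals (init_flag, comm_private, rank, size). */
void mylib_init_checked(MPI_Comm comm_caller)
{
    int mpi_ready;

    MPI_Initialized(&mpi_ready);  /* has the caller run MPI_Init() yet? */
    if (!mpi_ready) {
        fprintf(stderr, "mylib_init_checked: MPI is not initialized\n");
        return;
    }
    if (!init_flag) {
        MPI_Comm_dup(comm_caller, &comm_private);
        MPI_Comm_rank(comm_private, &rank);
        MPI_Comm_size(comm_private, &size);
        init_flag = 1;
    }
}

A library structured this way never touches MPI_COMM_WORLD directly, so its messages can never collide with the caller’s, no matter what tags either side uses.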

MPI Process Groups

In MPI, groups define the participating processes or membership in the context of a communicator. These group constructs can be manipulated to create specialized communicators for subsets of processes. All group manipulations are local and they can be different across processes; however, group definitions used for communicator creation must be consistent. Moreover, communicator creation must occur on all processes, even those that are not members of the corresponding group.

The default or base group created during MPI initialization is the one associated with the initial communicator, MPI_COMM_WORLD. Once you obtain the handle of the base group using MPI_Comm_group(), you can manipulate it (including or excluding individual process ranks or ranges of ranks) to create a new group. Once the new group has been constructed, you can create a new communicator for that group.
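
Listing Four (below) builds its group with MPI_Group_excl(). As a quick illustration of the other common constructors, the following sketch (my own example, not one of the column’s listings; run it with at least four processes) builds groups with MPI_Group_incl() and MPI_Group_range_incl():

/* group-incl.c -- illustrative sketch showing two other standard ways to
 * build groups from the base group of MPI_COMM_WORLD. Not from the column. */
#include <stdio.h>
#include "mpi.h"

int main(int argc, char **argv)
{
    MPI_Group world_group, pair_group, range_group;
    int keep[] = { 0, 2 };              /* keep ranks 0 and 2 */
    int ranges[][3] = { { 1, 3, 1 } };  /* keep ranks 1 through 3, stride 1 */
    int npair, nrange;

    MPI_Init(&argc, &argv);
    MPI_Comm_group(MPI_COMM_WORLD, &world_group);

    MPI_Group_incl(world_group, 2, keep, &pair_group);
    MPI_Group_range_incl(world_group, 1, ranges, &range_group);
    MPI_Group_size(pair_group, &npair);
    MPI_Group_size(range_group, &nrange);
    printf("pair group has %d members, range group has %d members\n",
        npair, nrange);

    /* Groups are local objects; to communicate, a communicator would be
     * created for each group with MPI_Comm_create(), as in Listing Four. */
    MPI_Group_free(&range_group);
    MPI_Group_free(&pair_group);
    MPI_Group_free(&world_group);
    MPI_Finalize();
    return 0;
}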

The program in Listing Four creates a new communicator for slave processes (all processes except for the one of rank zero, the master process) using the group construct. The new communicator provides a “slave universe” scope for a collective reduction operation. A second reduction operation occurs using the MPI_COMM_WORLD communicator involving all processes.




Listing Four: group-slaves.c, an example of creating a new communicator


#include <stdio.h>
#include "mpi.h"

int main(int argc, char **argv)
{
    int rank[2], size[2], namelen, xranks[] = { 0 };
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    MPI_Group mpi_group_world, group_slaves;
    MPI_Comm comm_slaves;
    int send_val, recv_val, send_val2, recv_val2;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank[0]);
    MPI_Comm_size(MPI_COMM_WORLD, &size[0]);
    MPI_Get_processor_name(processor_name, &namelen);

    MPI_Comm_group(MPI_COMM_WORLD, &mpi_group_world);
    MPI_Group_excl(mpi_group_world, 1, xranks, &group_slaves);
    MPI_Comm_create(MPI_COMM_WORLD, group_slaves, &comm_slaves);

    printf("Hello world! I'm rank %d of %d on %s\n", rank[0], size[0],
        processor_name);
    if (rank[0]) {
        MPI_Comm_rank(comm_slaves, &rank[1]);
        MPI_Comm_size(comm_slaves, &size[1]);
        printf("In the slave universe I'm rank %d of %d on %s\n",
            rank[1], size[1], processor_name);
        send_val = size[1];
        MPI_Reduce(&send_val, &recv_val, 1, MPI_INT, MPI_SUM, 0,
            comm_slaves);
        if (!rank[1]) printf("Slave leader received reduced value %d\n",
            recv_val);
    }
    send_val2 = size[0];
    MPI_Reduce(&send_val2, &recv_val2, 1, MPI_INT, MPI_SUM, 0,
        MPI_COMM_WORLD);
    if (!rank[0]) printf("Master received reduced value %d\n", recv_val2);

    if (comm_slaves != MPI_COMM_NULL) MPI_Comm_free(&comm_slaves);
    MPI_Group_free(&group_slaves);
    MPI_Group_free(&mpi_group_world);
    MPI_Finalize();
    return 0;
}

As usual, the program begins by initializing MPI and obtaining the rank, size, and processor name. Then the group handle corresponding to the default/base group is obtained by calling MPI_Comm_group() with the MPI_COMM_WORLD communicator. Stored in mpi_group_world, the group structure is passed to MPI_Group_excl(), which creates a new group structure (stored in group_slaves) that includes all members of the original group except those contained in the xranks array. In this case, xranks contains only one member: a zero signifying the first or master process.

So on each process, a new group construct is made containing all but the process of rank 0. Next, a new communicator is built by calling MPI_Comm_create(), passing it the default communicator (MPI_COMM_WORLD), the new group construct (group_slaves), and a pointer for storing the new communicator comm_slaves.

All processes then print their rank, size, and processor name in the original default context. Next, the slaves call MPI_Comm_rank() and MPI_Comm_size() in the new context using the comm_slaves communicator and print out these values. We can see from Figure Three that the rank and size in the new context are one less than those in the default context.




Figure Three: The output of Listing Four


[forrest@node01]$ mpicc -O -o group-slaves group-slaves.c
[forrest@node01]$ mpirun -np 4 group-slaves
Hello world! I'm rank 0 of 4 on node01
Master received reduced value 16
Hello world! I'm rank 1 of 4 on node02
In the slave universe I'm rank 0 of 3 on node02
Slave leader received reduced value 9
Hello world! I'm rank 3 of 4 on node04
In the slave universe I'm rank 2 of 3 on node04
Hello world! I'm rank 2 of 4 on node03
In the slave universe I'm rank 1 of 3 on node03

The slaves subsequently set send_val to the size of the new group and then call MPI_Reduce() to sum the values across all slave processes, leaving the result on the process of rank zero in the new context (the slave leader). Next, all processes (including the master) set send_val2 to the size of the full group and call MPI_Reduce() in the default context (using the MPI_COMM_WORLD communicator) to sum the values across all processes. The master then prints the sum.

At the end of the program, the slaves destroy the new communicator (comm_slaves) by calling MPI_Comm_free(). Notice that although the master called MPI_Comm_create() just like all the other processes, its comm_slaves communicator was set to MPI_COMM_NULL because it was not a member of the new group. As a result, the master must not call MPI_Comm_free() on a communicator it doesn’t possess, which is why the code tests for MPI_COMM_NULL first. Finally, the two group constructs are destroyed using MPI_Group_free(), MPI is finalized, and the program ends.
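
As an aside, the same master/slave partition can be produced without any explicit group manipulation by calling MPI_Comm_split(), which places processes that supply the same “color” value into the same new communicator. The following sketch is my own illustration, not one of the column’s listings:

/* split-slaves.c -- illustrative sketch: the master/slave split of Listing
 * Four done with MPI_Comm_split() instead of group manipulation. */
#include <stdio.h>
#include "mpi.h"

int main(int argc, char **argv)
{
    int rank, size, color, subrank, subsize;
    MPI_Comm subcomm;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* Color 0 for the master (rank 0), color 1 for the slaves; the key
     * argument (rank) orders the processes within each new communicator. */
    color = (rank == 0) ? 0 : 1;
    MPI_Comm_split(MPI_COMM_WORLD, color, rank, &subcomm);

    MPI_Comm_rank(subcomm, &subrank);
    MPI_Comm_size(subcomm, &subsize);
    printf("%d: color %d, sub-rank %d of %d\n", rank, color, subrank, subsize);

    MPI_Comm_free(&subcomm);
    MPI_Finalize();
    return 0;
}

One difference from Listing Four: MPI_Comm_split() returns a valid (one-member) communicator to the master rather than MPI_COMM_NULL, so every process frees its result.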

Stay Tuned for More…

This introduction to groups and communicators in MPI demonstrates the basic mechanism for achieving parallel code modularity and providing safe communications contexts for a variety of code constructs. Additional routines are available for manipulating both communicators and groups. More of these routines will be discussed in next month’s column, along with special communicators designed for message passing between groups (called inter-communicators).

And be sure to watch out for next month’s special cluster issue of Linux Magazine. It will be chock full of articles from guest authors, writing about all aspects of parallel computing with Linux.



Forrest Hoffman is a computer modeling and simulation researcher at Oak Ridge National Laboratory. He can be reached at forrest@climate.ornl.gov.
