Writing Hybrid MPI/OpenMP Code

The last few “Extreme Linux” columns have focused on multiprocessing using OpenMP. While often used in scientific models for shared memory parallelism on symmetric multi-processor (SMP) machines, OpenMP can also be used in conjunction with the Message Passing Interface (MPI) to provide a second level of parallelism for improved performance on Linux clusters having SMP compute nodes. Programs that mix OpenMP and MPI are often referred to as hybrid codes.

Both MPI and OpenMP are portable across a wide variety of platforms from laptops to the largest supercomputers on the planet. MPI codes are usually written to dynamically decompose the problem among MPI processes. These processes run their piece of the problem on different processors usually having their own (distributed) memory. OpenMP codes, on the other hand, have directives that describe how parallel sections or loop iterations are to be split up among threads. The team of threads share a single memory space, but may also have some of their own private stack variables.
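The difference is easiest to see side by side. The fragment below is an illustrative sketch (not taken from the listing later in this column): one routine sums only the contiguous block of a hypothetical array owned by a single MPI rank, while the other lets a team of OpenMP threads split the iterations of one loop over shared data.

#define N 1000000   /* illustrative problem size */

/* MPI style: rank 'me' of 'nprocs' sums only its own contiguous block.
 * The per-rank partial sums would later be combined with MPI_Reduce(). */
double sum_mpi_block(const double *data, int me, int nprocs)
{
    int chunk = N / nprocs;
    int lo = me * chunk;
    int hi = (me == nprocs - 1) ? N : lo + chunk;
    double part = 0.0;
    for (int i = lo; i < hi; i++)
        part += data[i];
    return part;
}

/* OpenMP style: the threads share data[]; the directive splits the loop
 * iterations among them and combines the per-thread sums at the join. */
double sum_openmp(const double *data)
{
    double total = 0.0;
    #pragma omp parallel for reduction(+: total)
    for (int i = 0; i < N; i++)
        total += data[i];
    return total;
}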

MPI programs should be written to generate the same answers no matter what decomposition is used, even if that decomposition is the entire problem space running in a single process on a single processor (in other words, serially). OpenMP programs should be written to generate the same answers no matter how many threads are used, and should yield the same answers even when compiled without OpenMP enabled.

Hybrid MPI/OpenMP codes present additional challenges over developing with just one or the other. Hybrid programs should generate the same answers no matter what decomposition is used for the distributed memory MPI parallelism and no matter how many OpenMP threads are used. Some hybrid codes — usually those that are carefully designed — are written so that they may be compiled as serial programs without even requiring the use of the MPI library. The example code included here is designed to run with any combination of MPI or OpenMP enabled.

Show Me the Code!

Since OpenMP spawns (or forks) a team of threads at the beginning of a parallel region and joins (or kills) them at the end of the parallel region, OpenMP parallelism should, in general, be exploited at as high a level as possible in the parallel code. Likewise, MPI should be used in a manner that minimizes communications, typically by decomposing the problem so that most of the work is done independently, if possible, by MPI processes over a subset of data.

In some cases it’s most effective to use MPI and OpenMP to parallelize processing at the same level. And that’s exactly what’s done in the hybrid.c code shown in Listing One.




Listing One: hybrid.c, an OpenMP and MPI hybrid


#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <math.h>
#ifdef USE_MPI
#include <mpi.h>
#endif /* USE_MPI */
#ifdef _OPENMP
#include <omp.h>
#endif /* _OPENMP */

int read_slab_info()
{
    /* This should read info from a file or something,
       but we fake it */
    return 80;
}

double process_slab(int snum)
{
    /* snum identifies the slab; it is ignored in this fake workload */
    int i, j;
    double x = 0.0;

    for (i = 0; i < 10000; i++)
        for (j = 0; j < 10000; j++)
            x += sqrt((i-j)*(i-j) / (sqrt((i*i) + (j*j)) + 1));

    return x;
}

void exit_on_error(char *message)
{
    fprintf(stderr, "%s\n", message);
#ifdef USE_MPI
    MPI_Finalize();
#endif /* USE_MPI */
    exit(1);
}

int main(int argc, char **argv)
{
    int i, j, p, me, nprocs, num_threads, num_slabs, spp;
    int *my_slabs, *count;
    double x, sum;
#ifdef _OPENMP
    int np;
#endif /* _OPENMP */
#ifdef USE_MPI
    int namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
#endif /* USE_MPI */

#ifdef USE_MPI
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    MPI_Comm_rank(MPI_COMM_WORLD, &me);
    MPI_Get_processor_name(processor_name, &namelen);
#else /* USE_MPI */
    nprocs = 1;
    me = 0;
#endif /* USE_MPI */

#ifdef _OPENMP
    np = omp_get_num_procs();
    omp_set_num_threads(np);
    num_threads = omp_get_max_threads();
#else /* _OPENMP */
    num_threads = 1;
#endif /* _OPENMP */

    printf("Process %d of %d", me, nprocs);
#ifdef USE_MPI
    printf(" running on %s", processor_name);
#endif /* USE_MPI */
#ifdef _OPENMP
    printf(" using OpenMP with %d threads", num_threads);
#endif /* _OPENMP */
    printf("\n");

    /* Master process reads slab data */
    if (!me) num_slabs = read_slab_info();
#ifdef USE_MPI
    if (MPI_Bcast(&num_slabs, 1, MPI_INT, 0,
        MPI_COMM_WORLD) != MPI_SUCCESS)
        exit_on_error("Error in MPI_Bcast()");
#endif /* USE_MPI */

    if (num_slabs < nprocs)
        exit_on_error("Number of processes may not exceed number of slabs");

    /* maximum number of slabs per process */
    spp = (int)ceil((double)num_slabs / (double)nprocs);
    if (!me) printf("No more than %d slabs will be assigned to each process\n",
        spp);

    /* allocate list and count of slabs for each process */
    if (!(my_slabs = (int *)malloc(nprocs*spp*sizeof(int)))) {
        perror("my_slabs");
        exit(2);
    }
    if (!(count = (int *)malloc(nprocs*sizeof(int)))) {
        perror("count");
        exit(2);
    }

    /* initialize slab counts */
    for (p = 0; p < nprocs; p++) count[p] = 0;

    /* round robin assignment of slabs to processes
     * for better potential load balancing
     */
    for (i = j = p = 0; i < num_slabs; i++) {
        my_slabs[p*spp+j] = i;
        count[p]++;
        if (p == nprocs - 1)
            p = 0, j++;
        else
            p++;
    }

    /* each process works on its own list of slabs, but OpenMP threads
     * divide up the slabs on each process because of the OpenMP directive
     */
    x = 0.0;
#pragma omp parallel for reduction(+: x)
    for (i = 0; i < count[me]; i++) {
        printf("%d: slab %d being processed", me, my_slabs[me*spp+i]);
#ifdef _OPENMP
        printf(" by thread %d", omp_get_thread_num());
#endif /* _OPENMP */
        printf("\n");
        x += process_slab(my_slabs[me*spp+i]);
    }

#ifdef USE_MPI
    if (MPI_Reduce(&x, &sum, 1, MPI_DOUBLE, MPI_SUM, 0,
        MPI_COMM_WORLD) != MPI_SUCCESS)
        exit_on_error("Error in MPI_Reduce()");
#else /* USE_MPI */
    sum = x;
#endif /* USE_MPI */

    if (!me) printf("Sum is %lg\n", sum);

#ifdef USE_MPI
    printf("%d: Calling MPI_Finalize()\n", me);
    MPI_Finalize();
#endif /* USE_MPI */
    exit(0);
}

hybrid.c does nothing useful, but it does demonstrate the framework one might use for developing a hybrid code. As usual, the OpenMP parts of the code, except for the actual #pragma omp directive, are wrapped with #ifdefs. As a result, OpenMP components are compiled only if the appropriate flag is provided to the compiler. In addition, the MPI parts are wrapped so that the code may be compiled without MPI. The USE_MPI macro is set manually during compilation to enable the use of MPI. The _OPENMP macro should never be defined manually; it’s automatically set when the code is compiled with OpenMP enabled.

hybrid.c decomposes “slabs” into chunks that are assigned to each MPI process. The slabs are processed independently, so this is the level at which MPI parallelism should be employed. Each MPI process then loops over each slab in its chunk or list of slabs, processing each slab in turn. This high level loop is an excellent candidate for employing OpenMP parallelism as well. In this way, each thread in the team of threads associated with an MPI process independently processes its own slab. The code provided here does not actually process slabs of data; instead it has some loops that just waste time to simulate work for each slab.

Just inside main(), MPI is initialized if MPI was enabled during compilation by defining the USE_MPI preprocessor macro. Otherwise, nprocs is set to 1 and me is set to 0. Next, when OpenMP is enabled, the number of threads to use is set to the number of processors on the node. The number of processors on the node is obtained by the call to omp_get_num_procs(), and the number of threads is set by the call to omp_set_num_threads(). A call to omp_get_max_threads() checks that the number of threads was set correctly. When compiled with OpenMP disabled, num_threads is simply set to 1.
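One design note: calling omp_set_num_threads() this way overrides any OMP_NUM_THREADS value the user may have set in the environment. A variation that defers to the environment variable when it is present could replace that block; this is just a hedged sketch (not part of the original listing) and assumes the same declarations used in main():

#ifdef _OPENMP
    /* Honor OMP_NUM_THREADS if the user set it; otherwise default to one
     * thread per processor on the node. getenv() comes from <stdlib.h>,
     * which hybrid.c already includes. */
    if (getenv("OMP_NUM_THREADS") == NULL)
        omp_set_num_threads(omp_get_num_procs());
    num_threads = omp_get_max_threads();
#else /* _OPENMP */
    num_threads = 1;
#endif /* _OPENMP */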

A diagnostic message is printed by each process describing the configuration used for running the code. Next, the master process calls read_slab_info() to obtain the number of slabs (num_slabs) and this value is subsequently broadcast to all MPI processes by calling MPI_Bcast(). Then the number of slabs is checked to be sure that the number of MPI processes does not exceed the number of slabs to be processed.

In preparation for decomposing these slabs into chunks, the maximum number of slabs per process is determined by dividing the total number of slabs by the number of MPI processes. This value is rounded up to an integer by ceil(). Next, memory is allocated for arrays containing the list of slabs for each process (my_slabs) and the number of slabs contained in each list (count). Then the slab count for each process is initialized to 0.
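As an aside (not a change to the listing), the same ceiling can be computed in pure integer arithmetic, which avoids the double conversions and the ceil() call:

/* Integer ceiling division: equivalent to
 * (int)ceil((double)num_slabs / (double)nprocs) for positive values. */
spp = (num_slabs + nprocs - 1) / nprocs;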

In many scientific codes, slabs, columns, or blocks distributed among processes in this way take slightly different amounts of time to process. Often there is a spatial correlation between slabs and processing time; that is, slabs near each other tend to take about the same amount of time to process. To balance the load, the chunks/lists are therefore filled in round-robin fashion, which helps even out the work among processes.

The loop over slabs assigns one slab to each process (by adding one slab to the list for each process), then cycles through again until all slabs are assigned to a process. Following this assignment loop, each slab is processed in a loop that runs from 0 to count[me], the number of slabs in the list for this process. The processing loop has an OpenMP parallel for directive above it, so trips through the loop are distributed among OpenMP threads that are spawned at this point in the code.
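The flattened my_slabs[p*spp+j] indexing is easiest to see with small numbers. The standalone sketch below is not part of hybrid.c; the counts are made up. It assigns 10 slabs to 3 processes round-robin, using the same assignment loop as the listing, and prints each process's list:

#include <stdio.h>
#include <stdlib.h>

int main(void)
{
    int num_slabs = 10, nprocs = 3;
    int spp = (num_slabs + nprocs - 1) / nprocs;   /* ceiling division: 4 */
    int *my_slabs = malloc(nprocs * spp * sizeof(int));
    int *count = calloc(nprocs, sizeof(int));
    int i, j, p;

    /* Same round-robin assignment as hybrid.c */
    for (i = j = p = 0; i < num_slabs; i++) {
        my_slabs[p*spp + j] = i;
        count[p]++;
        if (p == nprocs - 1) {
            p = 0;
            j++;
        } else {
            p++;
        }
    }

    for (p = 0; p < nprocs; p++) {
        printf("process %d:", p);
        for (j = 0; j < count[p]; j++)
            printf(" %d", my_slabs[p*spp + j]);
        printf("\n");
    }
    /* prints:
     * process 0: 0 3 6 9
     * process 1: 1 4 7
     * process 2: 2 5 8
     */
    free(my_slabs);
    free(count);
    return 0;
}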

The slab number, obtained from the slab list for this process, is passed to the process_slab() routine. The slab number, MPI process rank, and OpenMP thread number are printed for diagnostic purposes for every trip through the loop. The process_slab() routine merely performs meaningless calculations before returning a double precision value.

The returned values are accumulated into the variable x on each process. Recall that the reduction clause in the omp parallel for directive gives each thread its own private copy of x; the per-thread values are then reduced (summed, in this case) into the shared x across all threads in each process when the loop ends.
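The behavior of the reduction clause is easy to verify in isolation. The minimal sketch below is independent of hybrid.c: each thread accumulates into a private copy of total that starts at the identity value 0, and the private copies are summed into the shared variable when the threads join. Compiled with or without OpenMP, it prints the same result.

#include <stdio.h>

int main(void)
{
    double total = 0.0;

    /* Each thread gets a private total initialized to 0; the private
     * copies are combined with + into the shared total at the join. */
    #pragma omp parallel for reduction(+: total)
    for (int i = 1; i <= 100; i++)
        total += (double)i;

    printf("total = %g\n", total);   /* always 5050, for any thread count */
    return 0;
}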

These values are then summed across all MPI processes by the MPI_Reduce() call when MPI is enabled. Without MPI, the sum is merely the x value, irrespective of whether OpenMP is used. The sum is printed by the first MPI process (or the single serial process), MPI is finalized, and the program ends.
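As a design note, MPI_Reduce() leaves the total only on rank 0, which is all this program needs in order to print it. If every rank required the global sum, MPI_Allreduce() would be the usual substitute; a hedged sketch of that variation (not something hybrid.c does) is:

#ifdef USE_MPI
    /* Alternative: every rank receives the global sum, not just rank 0. */
    if (MPI_Allreduce(&x, &sum, 1, MPI_DOUBLE, MPI_SUM,
        MPI_COMM_WORLD) != MPI_SUCCESS)
        exit_on_error("Error in MPI_Allreduce()");
#endif /* USE_MPI */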

Building and Running the Hybrid Code

The Portland Group compiler was used along with the MPICH implementation of MPI in the cases below. The -mp flag on the compiler line enables OpenMP, and the USE_MPI preprocessor macro, set on the compile line, causes the MPI components to be compiled into the program.

Output One contains partial results from building and running the hybrid program with both MPI and OpenMP enabled. The program is run on four dual-processor nodes. As you can see from the diagnostic output, the slabs are distributed in round-robin fashion among MPI processes, and the loop iterations are distributed among threads on each process. The sum is printed at the end. This version of the code ran in 1 minute, 6 seconds.




Output One: Building and running the hybrid code with MPI and OpenMP


[forrest@node01 hybrid]$ make hybrid
pgcc -O -mp -DUSE_MPI -I/usr/local/mpich_pgi/include -c -o hybrid.o hybrid.c
pgcc -mp -L/usr/local/mpich_pgi/lib -o hybrid hybrid.o -lm -lmpich

[forrest@node01 hybrid]$ time mpirun -nolocal -np 4 -machinefile machines hybrid
running hybrid
Process 2 of 4 running on node03 using OpenMP
with 2 threads
2: slab 2 being processed by thread 0
2: slab 42 being processed by thread 1
2: slab 6 being processed by thread 0
2: slab 46 being processed by thread 1

2: Calling MPI_Finalize()
Process 1 of 4 running on node02 using OpenMP
with 2 threads
1: slab 1 being processed by thread 0
1: slab 41 being processed by thread 1
1: slab 45 being processed by thread 1
1: slab 5 being processed by thread 0

1: Calling MPI_Finalize()
Process 3 of 4 running on node04 using OpenMP
with 2 threads
3: slab 3 being processed by thread 0
3: slab 43 being processed by thread 1
3: slab 47 being processed by thread 1
3: slab 7 being processed by thread 0

3: Calling MPI_Finalize()
Process 0 of 4 running on node01 using OpenMP
with 2 threads
No more than 20 slabs will be assigned to each process
0: slab 0 being processed by thread 0
0: slab 40 being processed by thread 1
0: slab 4 being processed by thread 0
0: slab 44 being processed by thread 1

Sum is 3.09086e+11
0: Calling MPI_Finalize()

real 1m6.847s
user 0m0.113s
sys 0m0.178s

Output Two contains the output from building and running the same code with OpenMP disabled. In this case, the -mp flag is not passed to the compiler, but the USE_MPI macro is defined. This program is again run on the same four dual-processor nodes. However, since OpenMP is not enabled, only one processor is effectively used in each node. The same sum is printed, and the program ends. With MPI only, the code ran in 1 minute, 25 seconds.




Output Two: Building and running the hybrid code with MPI only


[forrest@node01 hybrid]$ make hybrid_mpi_only
pgcc -O -DUSE_MPI -I/usr/local/mpich_pgi/include -c -o hybrid_mpi_only.o hybrid.c
pgcc -L/usr/local/mpich_pgi/lib -o hybrid_mpi_only hybrid_mpi_only.o -lm -lmpich

[forrest@node01 hybrid]$ time mpirun -nolocal -np 4 -machinefile machines hybrid_mpi_only
Process 1 of 4 running on node02
1: slab 1 being processed
1: slab 5 being processed
1: slab 9 being processed

1: Calling MPI_Finalize()
Process 2 of 4 running on node03
2: slab 2 being processed
2: slab 6 being processed
2: slab 10 being processed

2: Calling MPI_Finalize()
Process 3 of 4 running on node04
3: slab 3 being processed
3: slab 7 being processed
3: slab 11 being processed

3: Calling MPI_Finalize()
Process 0 of 4 running on node01
No more than 20 slabs will be assigned to each process
0: slab 0 being processed
0: slab 4 being processed
0: slab 8 being processed

Sum is 3.09086e+11
0: Calling MPI_Finalize()

real 1m25.025s
user 0m0.086s
sys 0m0.109s

Next, the same code is compiled and run without MPI, but with OpenMP enabled. Output Three contains the results from this OpenMP-only run. In this case, all slabs are assigned to the single process, and each slab is processed by one of the two threads running on a single dual-processor node. The sum is printed, but the time required to complete has risen to 4 minutes and 43 seconds.




Output Three: Building and running the hybrid code with OpenMP only


[forrest@node01 hybrid]$ make hybrid_omp_only
pgcc -O -mp -c -o hybrid_omp_only.o hybrid.c
pgcc -mp -o hybrid_omp_only hybrid_omp_only.o -lm

[forrest@node01 hybrid]$ time ./hybrid_omp_only
Process 0 of 1 using OpenMP with 2 threads
No more than 80 slabs will be assigned to each process
0: slab 0 being processed by thread 0
0: slab 40 being processed by thread 1
0: slab 1 being processed by thread 0
0: slab 41 being processed by thread 1
.
.
.
Sum is 3.19123e+11

real 4m43.153s
user 9m26.291s
sys 0m0.000s

Finally, the hybrid code is compiled without either MPI or OpenMP. Output Four contains the results. The slabs are assigned to the single process, and each is processed one at a time. The same answer is generated. A sample run completed in 5 minutes, 37 seconds.




Output Four: Building and running the hybrid code without MPI or OpenMP


[forrest@node01 hybrid]$ make hybrid_serial
pgcc -O -c -o hybrid_serial.o hybrid.c
pgcc -o hybrid_serial hybrid_serial.o -lm

[forrest@node01 hybrid]$ time ./hybrid_serial
Process 0 of 1
No more than 80 slabs will be assigned to each process
0: slab 0 being processed
0: slab 1 being processed
0: slab 2 being processed
.
.
.
0: slab 79 being processed
Sum is 3.09086e+11

real 5m37.036s
user 5m37.025s
sys 0m0.000s

Hybrids Are Go

As you can see from these results, adding OpenMP parallelism to the MPI parallel code reduced the total run time from 1 minute, 25 seconds to 1 minute, 6 seconds.

With only a little additional design effort, it’s fairly easy to write a hybrid MPI/OpenMP code. Moreover, with careful attention to coding, it’s often possible to develop your code so that it can be run in any combination. Go ahead and give it a try on your own cluster.



Forrest Hoffman is a computer modeling and simulation researcher at Oak Ridge National Laboratory. He can be reached at forrest@climate.ornl.gov. You can download the code used in this article from http://www.linux-mag.com/downloads/2004-04/extreme.
