High Performance I/O: Parallel netCDF


As clusters grow and computational applications scale to more and more processors, input and output (I/O) frequently becomes a performance-limiting bottleneck. While hardware manufacturers continue to improve I/O bandwidth between memory and disk, the distributed memory environment of Linux clusters poses unique challenges for obtaining high performance I/O.

Parallel and cluster file systems attempt to provide the needed scalability by distributing file data on disks around the cluster. In addition, parallel I/O is accessible to applications through MPI-IO, the I/O layer specified in the MPI-2 standard. However, most model developers want a high-level interface that handles the details of collective I/O and of the underlying file system, whatever it happens to be.

Parallel file systems are finally coming of age, and MPI-IO is available in both MPICH and LAM/MPI, the two most popular MPI implementations for Linux clusters. [The Parallel Virtual File System (PVFS) and rudimentary MPI-IO use were first discussed in this column in July and August of 2002, available online at http://www.linuxmagazine.com/2002-07/extreme_01.html and http://www.linuxmagazine.com/2002-08/extreme_01.html, respectively.] Now PVFS2 is available for testing, and the ROMIO implementation of MPI-IO is available in MPICH2 (see sidebar). Other file system solutions are also available, including Lustre, the Global File System (GFS) from Red Hat, and CXFS from Silicon Graphics (SGI).

Parallel programmers will not, in general, want to program directly against any one of these file systems, since doing so ties their application to a specific file system for I/O. Instead, MPI-IO should be used to access these file systems, because the MPI-IO implementation provides native access to them anyway. Using MPI-IO makes applications portable to any kind of parallel environment, while providing at least rudimentary parallel I/O.
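As a taste of what that rudimentary parallel I/O looks like, here is a minimal MPI-IO sketch (not taken from the listings below) in which every process writes a block of doubles into one shared file at a rank-dependent offset. The file name and block size are arbitrary choices for illustration, and error checking is omitted.

#include <mpi.h>

#define BLOCK 1024   /* doubles per process; arbitrary for this sketch */

/* Each rank writes its own contiguous block into one shared file. */
void write_block(double *buf, int rank)
{
    MPI_File fh;
    MPI_Offset offset = (MPI_Offset)rank * BLOCK * sizeof(double);

    MPI_File_open(MPI_COMM_WORLD, "raw_output.dat",
                  MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);
    /* Collective write: all ranks participate, each at its own offset */
    MPI_File_write_at_all(fh, offset, buf, BLOCK, MPI_DOUBLE,
                          MPI_STATUS_IGNORE);
    MPI_File_close(&fh);
}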

In many cases, the model developer may prefer to perform I/O using an interface developed specifically for the file format in common use. These domain-specific file programming interfaces are typically written for sequential I/O, often for use only in serial applications. What’s needed is a parallel implementation of such file interfaces, preferably layered on top of MPI-IO so that they can take advantage of its collective operations and its interfaces to parallel, cluster, and global file systems.

Enter parallel netCDF and parallel HDF5, versions of the network Common Data Form and the Hierarchical Data Format, respectively. netCDF was developed by the University Corporation for Atmospheric Research (UCAR) and is maintained by its Unidata branch (http://www.unidata.ucar.edu/packages/netcdf). NetCDF is used by atmospheric scientists and climate modelers. HDF, developed by the National Center for Supercomputing Applications (NCSA) at the University of Illinois at Urbana-Champaign (http://hdf.ncsa.uiuc.edu), is commonly used by the satellite and remote sensing communities.

Both HDF and netCDF consist of application programming interfaces (APIs) and self-describing file formats containing metadata and data all in a single file. HDF files contain various objects or files within them, making HDF a more complex and potentially more powerful method for storing data. NetCDF, on the other hand, is simpler: it contains a header where metadata is stored, followed by a data section containing the array-based data frequently used by earth scientists and modelers. Both formats are binary yet portable to any computer platform. Only parallel netCDF is described here.

The Standard netCDF Interface

The netCDF interface and file format have been around for many years. NetCDF data is self-describing, since information about the data is contained in the same file as the data itself. Many conventions have emerged describing standard tags and attributes that should be contained in netCDF headers. NetCDF data is also architecture-independent, because data is represented in a standard binary format called eXternal Data Representation (XDR).

Data can be accessed directly and efficiently through the netCDF API, and data can be appended to a netCDF dataset at a later time. Normally, one process can write to a netCDF file while multiple processes may simultaneously read the same file. However, for most parallel applications this is insufficient.
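To make the interface concrete, here is a minimal sketch of reading a gridded variable back with the standard, sequential API; the file and variable names match Listing One below, and error checking is omitted for brevity.

#include <stdlib.h>
#include <netcdf.h>

/* Read the first time record of the "T" variable written by Listing One. */
double *read_temps(size_t nlat, size_t nlon)
{
    int ncid, Tid;
    size_t starts[3], counts[3];
    double *temp = (double *)malloc(nlat * nlon * sizeof(double));

    starts[0] = starts[1] = starts[2] = 0;   /* first (only) time record */
    counts[0] = 1, counts[1] = nlat, counts[2] = nlon;

    nc_open("global_data_seq.nc", NC_NOWRITE, &ncid);
    nc_inq_varid(ncid, "T", &Tid);           /* look up the variable id */
    nc_get_vara_double(ncid, Tid, starts, counts, temp);
    nc_close(ncid);
    return temp;
}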

The standard API is available for AIX, HPUX, IRIX, Linux, Mac OS, OSF1, SunOS, UNICOS, and various versions of Windows. It can be used from C, C++, FORTRAN 77, and Fortran 90, as well as other programming languages.

To use the standard netCDF API in a parallel application, one must usually ship data to a single process that performs all I/O operations. This is inefficient and cumbersome, particularly if the machine on which the process runs has insufficient memory to hold the otherwise-distributed data. In such a case, data must be brought in a block at a time and written to disk, thus slowing computational processing.
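A rough sketch of that block-at-a-time approach follows: rank 0 receives one process's slab at a time and writes it before receiving the next, so it never holds more than one remote slab in memory. It assumes the #includes, the NLON/NLAT definitions, and the ncid and Tid set up as in Listing One, with NLAT divisible by the number of processes.

/* Block-at-a-time writing: rank 0 receives and writes one slab at a time.
 * Assumes the #includes, NLON/NLAT, and the ncid/Tid from Listing One. */
void write_by_blocks(int ncid, int Tid, double *temp, int rank, int size)
{
    int r, len = NLAT / size * NLON;
    size_t starts[3], counts[3];

    starts[0] = starts[1] = starts[2] = 0;
    counts[0] = 1, counts[1] = NLAT / size, counts[2] = NLON;

    if (rank != 0) {
        MPI_Send(temp, len, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
        return;
    }
    for (r = 0; r < size; r++) {
        if (r > 0)   /* rank 0 writes its own slab first, then reuses temp */
            MPI_Recv(temp, len, MPI_DOUBLE, r, 0, MPI_COMM_WORLD,
                     MPI_STATUS_IGNORE);
        starts[1] = (size_t)(NLAT / size) * r;
        nc_put_vara_double(ncid, Tid, starts, counts, temp);
    }
}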

Listing One contains an example program called sequential.c that demonstrates how data generated in parallel is typically written to a netCDF file using the standard interface. (As usual for examples, very little error checking is done for the sake of brevity.) This program assumes that the number of MPI processes evenly divides the global grid, so that each process holds an equal share of the data.

In sequential.c, MPI is initialized using MPI_Init() just inside main(). The process rank is obtained using MPI_Comm_rank(), the number of processes involved in the computation is found using MPI_Comm_size(), and the processor name is retrieved using MPI_Get_processor_name(). Then the first MPI process creates the netCDF output file using nc_create() and defines all the dimensions, variables, and attributes to be contained in the file. The call to nc_enddef() moves the file from define mode to data mode.

Listing One: sequential.c

#include <stdio.h>
#include <stdlib.h>   /* malloc(), free(), exit() */
#include <string.h>   /* strlen() */
#include <time.h>
#include <netcdf.h>
#include <mpi.h>

#define NLON 128
#define NLAT 64

double *make_temp(int len)
{
    int i;
    double *temp;

    if (!(temp = (double *)malloc(len * sizeof(double)))) {
        perror("temp");
        exit(1);
    }
    for (i = 0; i < len; i++)
        temp[i] = 278.15 + (i * 0.001);

    return temp;
}

void handle_error(int status)
{
    if (status != NC_NOERR) {
        fprintf(stderr, "%s\n", nc_strerror(status));
        exit(1);
    }
    return;
}

int main(int argc, char** argv)
{
    int rank, size, namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int ncid, lonid, latid, timeid, Tid, Tdimids[3], len;
    size_t counts[3], starts[3];
    char *Tname = "temperature", *Tunits = "K";
    double Trange[] = { 0.0, 500.0 }, *temp, *alltemps = NULL;
    char *title = "Global surface temperature data", history[256], buf[26];
    time_t tm;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Get_processor_name(processor_name, &namelen);
    printf("Rank %d of %d started on %s\n", rank, size, processor_name);

    if (!rank) {
        /* Create the netCDF dataset and define everything */
        handle_error(nc_create("global_data_seq.nc", 0, &ncid));
        nc_def_dim(ncid, "lon", NLON, &lonid);
        nc_def_dim(ncid, "lat", NLAT, &latid);
        nc_def_dim(ncid, "time", NC_UNLIMITED, &timeid);
        Tdimids[0] = timeid, Tdimids[1] = latid, Tdimids[2] = lonid;
        nc_def_var(ncid, "T", NC_DOUBLE, 3, Tdimids, &Tid);
        nc_put_att_text(ncid, Tid, "long_name", strlen(Tname), Tname);
        nc_put_att_text(ncid, Tid, "units", strlen(Tunits), Tunits);
        nc_put_att_double(ncid, Tid, "valid_range", NC_DOUBLE, 2, Trange);
        nc_put_att_text(ncid, NC_GLOBAL, "title", strlen(title), title);
        tm = time((time_t *)NULL);
        (void)ctime_r(&tm, buf);
        if (buf[24] == '\n') buf[24] = '\0';
        sprintf(history, "Created %s", buf);
        nc_put_att_text(ncid, NC_GLOBAL, "history", strlen(history), history);
        nc_enddef(ncid);
    }

    len = NLON * NLAT / size;
    /* Read or compute temperatures */
    temp = make_temp(len);
    /* Gather temperatures to master process for writing */
    if (!rank) {
        if (!(alltemps = (double *)malloc(NLON * NLAT * sizeof(double))))
            perror("alltemps");
    }
    if (MPI_Gather(temp, len, MPI_DOUBLE, alltemps, len, MPI_DOUBLE, 0,
                   MPI_COMM_WORLD) != MPI_SUCCESS)
        fprintf(stderr, "%d: Error in MPI_Gather()\n", rank);
    if (!rank) {
        starts[0] = starts[1] = starts[2] = 0;
        counts[0] = 1, counts[1] = NLAT, counts[2] = NLON;
        handle_error(nc_put_vara_double(ncid, Tid, starts, counts, alltemps));
        nc_close(ncid);
        free(alltemps);
    }

    free(temp);
    MPI_Finalize();
    return 0;
}

At this point, the vector lengths for each process are computed and temperature vectors are constructed and filled in the call to make_temp(). Upon return from that call, the first process allocates memory to hold the temperatures for the entire globe from all processes. Next, MPI_Gather() is called to transfer the subsets of temperatures from all processes to the first process. The first process sets start and count vectors and calls nc_put_vara_double() with the alltemps buffer containing all the temperatures to store the data into the netCDF file. Then the first process closes the netCDF file by calling nc_close() and frees the temporary global temperature array alltemps. Finally, all processes free their smaller vector of temperatures before calling MPI_Finalize() and exiting.

Figure One shows the results of compiling and running sequential.c with two processes using MPICH. The ncdump program, a standard netCDF utility, is used to view the contents of the netCDF file. All the header information looks correct, and the data section contains the values we expect from the program.

Figure One: Results of running Listing One

[forrest@node01 pnetcdf]$ mpicc -O -o sequential sequential.c -lnetcdf
[forrest@node01 pnetcdf]$ time mpirun -np 2 sequential
Rank 0 of 2 started on node01.cluster.ornl.gov
Rank 1 of 2 started on node02.cluster.ornl.gov

real 0m0.706s
user 0m0.045s
sys 0m0.172s
[forrest@node01 pnetcdf]$ ncdump global_data_seq.nc | more
netcdf global_data_seq {
dimensions:
lon = 128 ;
lat = 64 ;
time = UNLIMITED ; // (1 currently)
variables:
double T(time, lat, lon) ;
T:long_name = "temperature" ;
T:units = "K" ;
T:valid_range = 0., 500. ;

// global attributes:
:title = "Global surface temperature data" ;
:history = "Created Tue Apr 13 01:32:12 2004" ;
data:

T =
278.15, 278.151, 278.152, 278.153, 278.154, 278.155, 278.156, 278.157,
278.158, 278.159, 278.16, 278.161, 278.162, 278.163, 278.164, 278.165,
278.166, 278.167, 278.168, 278.169, 278.17, 278.171, 278.172, 278.173,

282.23, 282.231, 282.232, 282.233, 282.234, 282.235, 282.236, 282.237,
282.238, 282.239, 282.24, 282.241, 282.242, 282.243, 282.244, 282.245 ;
}

A Parallel Implementation

While programs like sequential.c work well for small numbers of nodes, they do not scale well. The process performing I/O becomes a serious bottleneck, and memory limitations can make the problem worse.

To solve these problems, researchers at Argonne National Laboratory have developed a parallel implementation of the netCDF API that retains complete file compatibility with the standard (sequential) implementation (http://www-unix.mcs.anl.gov/parallel-netcdf/). Since it’s written on top of MPI-IO, this new parallel implementation of the netCDF library allows the developer to leverage the collective operations and parallel file system access (if appropriate) in the MPI-IO layer.

Just as with the serial interface, netCDF file access occurs in one of two modes: define mode for manipulating the metadata header or declaring new variables, and data mode for reading and writing array data. The parallel implementation uses many of the library calls familiar to developers; however, the prefix to these function names is changed to ncmpi_ for C and nfmpi_ for Fortran to avoid naming clashes.

In data mode, the parallel netCDF implementation has two modes of operation: collective data mode and independent data mode. In collective data mode (the default), all processes must call the same function on the same netCDF file handle at the same point in the code; however, different parameters for these calls are acceptable. In independent data mode, each process can perform different I/O operations independently. It is expected that most developers will want to use collective data mode.
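The following is a minimal sketch, not part of the article's listings, of how independent data mode is entered and left: ncmpi_begin_indep_data() and ncmpi_end_indep_data() bracket the independent operations. The argument types here follow the size_t convention used in Listing Two below; check the library documentation for the exact prototypes in your version.

/* Sketch: independent data mode.  Assumes a file created and defined with
 * the ncmpi_ calls shown in Listing Two; error checking omitted. */
#include <pnetcdf.h>

void write_indep(int ncid, int Tid, int rank,
                 size_t starts[3], size_t counts[3], double *temp)
{
    ncmpi_begin_indep_data(ncid);    /* leave collective data mode */
    if (rank == 0)
        /* each process may now issue its own, uncoordinated calls */
        ncmpi_put_vara_double(ncid, Tid, starts, counts, temp);
    ncmpi_end_indep_data(ncid);      /* return to collective data mode */
}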

Two varieties of library calls are available for storing data when in data mode: the high level data mode interface and the flexible data mode interface. The high level data mode interface more closely resembles the original netCDF functions, while the flexible data mode interface takes advantage of MPI functionality by accepting an MPI datatype that describes the data in memory. In both cases, the collective functions all end in _all.

Listing Two contains a program called collective.c that’s very much like Listing One. However, in this program, all the netCDF calls have been replaced with matching parallel netCDF calls, which begin with ncmpi_. In collective.c, all MPI processes call the parallel netCDF functions to define the dataset and store data into it. Since headers are assumed to be small, a single process actually performs the I/O operations in define mode once ncmpi_enddef() is called.

Listing Two: collective.c

#include <stdio.h>
#include <stdlib.h>   /* malloc(), free(), exit() */
#include <string.h>   /* strlen() */
#include <time.h>
#include <pnetcdf.h>
#include <mpi.h>

#define NLON 128
#define NLAT 64

double *make_temp(int len)
{
    int i;
    double *temp;

    if (!(temp = (double *)malloc(len * sizeof(double)))) {
        perror("temp");
        exit(1);
    }
    for (i = 0; i < len; i++)
        temp[i] = 278.15 + (i * 0.001);

    return temp;
}

void handle_error(int status)
{
    if (status != NC_NOERR) {
        fprintf(stderr, "%s\n", ncmpi_strerror(status));
        exit(1);
    }
    return;
}

int main(int argc, char** argv)
{
    int rank, size, namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int ncid, lonid, latid, timeid, Tid, Tdimids[3], len;
    size_t counts[3], starts[3];
    char *Tname = "temperature", *Tunits = "K";
    double Trange[] = { 0.0, 500.0 }, *temp;
    char *title = "Global surface temperature data", history[256], buf[26];
    time_t tm;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Get_processor_name(processor_name, &namelen);
    printf("Rank %d of %d started on %s\n", rank, size, processor_name);

    /* Create the netCDF dataset and define everything */
    handle_error(ncmpi_create(MPI_COMM_WORLD, "global_data_col.nc", 0,
                              MPI_INFO_NULL, &ncid));
    ncmpi_def_dim(ncid, "lon", NLON, &lonid);
    ncmpi_def_dim(ncid, "lat", NLAT, &latid);
    ncmpi_def_dim(ncid, "time", NC_UNLIMITED, &timeid);
    Tdimids[0] = timeid, Tdimids[1] = latid, Tdimids[2] = lonid;
    ncmpi_def_var(ncid, "T", NC_DOUBLE, 3, Tdimids, &Tid);
    ncmpi_put_att_text(ncid, Tid, "long_name", strlen(Tname), Tname);
    ncmpi_put_att_text(ncid, Tid, "units", strlen(Tunits), Tunits);
    ncmpi_put_att_double(ncid, Tid, "valid_range", NC_DOUBLE, 2, Trange);
    ncmpi_put_att_text(ncid, NC_GLOBAL, "title", strlen(title), title);
    tm = time((time_t *)NULL);
    (void)ctime_r(&tm, buf);
    if (buf[24] == '\n') buf[24] = '\0';
    sprintf(history, "Created %s", buf);
    ncmpi_put_att_text(ncid, NC_GLOBAL, "history", strlen(history),
                       history);
    ncmpi_enddef(ncid);

    len = NLON * NLAT / size;
    /* Read or compute temperatures */
    temp = make_temp(len);
    /* Don't gather temperatures, just do a collective write */
    starts[0] = starts[2] = 0;
    starts[1] = NLAT / size * rank;
    counts[0] = 1, counts[1] = NLAT / size, counts[2] = NLON;
    handle_error(ncmpi_put_vara_all(ncid, Tid, starts, counts, temp, len,
                                    MPI_DOUBLE));
    ncmpi_close(ncid);

    free(temp);
    MPI_Finalize();
    return 0;
}

Just as before, each process calls make_temp() to generate a subset of the global temperature data. But upon return, it isn’t necessary for a single process to allocate more memory and gather the data from all other processes. Instead, the processes figure out where their data should start and how large it is (storing these values in the starts and counts vectors). Then all processes call ncmpi_put_vara_all() to store their data into the netCDF dataset. Next, all processes call ncmpi_close() to close the file, free() to free their subset of data, and MPI_Finalize() to stop MPI.

The ncmpi_put_vara_all() call is a flexible data mode function. It is passed not only the pointer to the buffer, but also the length and its MPI datatype (MPI_DOUBLE). Similar high level data mode functions may also be used to store data.
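For comparison, assuming the library's high level data mode interface provides a double-typed collective put named ncmpi_put_vara_double_all(), the write in Listing Two could be expressed without the explicit length and MPI datatype:

/* High level data mode equivalent of the flexible call in Listing Two;
 * the element type is implied by the function name and the amount of
 * data by the counts vector. */
handle_error(ncmpi_put_vara_double_all(ncid, Tid, starts, counts, temp));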

Notice that the counts and starts vectors in collective.c are of type size_t, as they were in sequential.c. Some of the parallel netCDF documentation states that these vectors should be of type MPI_Offset (typically a 64-bit integer such as long long) to support large files. However, in the version of the library available at this writing (0.9.3), they are still of type size_t.

Figure Two contains the results of compiling and running collective.c on two nodes. To ensure that the same results were obtained, the ncdump utility was used to convert the netCDF files into ASCII. Then diff was used to compare the two ASCII files. As can be seen in Figure Two, only the dataset names and creation times differ. Otherwise, the two files are identical.

Figure Two: Results of running Listing Two

[forrest@node01 pnetcdf]$ mpicc -O \
-I/usr/local/parallel-netcdf-0.9.3/include -o collective collective.c \
-L/usr/local/parallel-netcdf-0.9.3/lib -lpnetcdf
[forrest@node01 pnetcdf]$ time mpirun -np 2 collective
Rank 0 of 2 started on node01.cluster.ornl.gov
Rank 1 of 2 started on node02.cluster.ornl.gov
real 0m0.336s
user 0m0.023s
sys 0m0.121s
[forrest@node01 pnetcdf]$ ncdump global_data_seq.nc > seq
[forrest@node01 pnetcdf]$ ncdump global_data_col.nc > col
[forrest@node01 pnetcdf]$ diff seq col
1c1
< netcdf global_data_seq {
---
> netcdf global_data_col {
14c14
< :history = "Created Tue Apr 13 01:32:12 2004" ;
---
> :history = "Created Tue Apr 13 01:34:48 2004" ;

While these example programs are too small for benchmarking performance, it's apparent that collective I/O will offer better performance for large problems. Moreover, the parallel netCDF library allows model developers to avoid allocating temporary memory and performing gather operations on big arrays just to store data to disk.