Ways to Run CPL Library

Multiple Program Multiple Data (MPMD)

Currently, the two coupled codes are compiled separately. They then link through CPL library by running both in MPI's MPMD mode,

mpiexec -n 64 ./md : -n 8 ./cfd

with all exchanges between them performed through CPL_send and CPL_recv using the single MPI_COMM_WORLD created by the MPMD job. The problem with this approach is that the two codes share a single MPI_COMM_WORLD. For new codes, MPI_COMM_WORLD must be changed throughout the source wherever communications assume the code is the only one in the world (which pretty much every code does!). Although this is not too much work, the changes must then be maintained as a patch or re-applied with each new version of the code from the downstream repository. As an example, the LAMMPS socket (or APP) currently has four different patches just from changes in the last two years (and this is an established code).

MPI_Comm_spawn -- Creating runs as we need them

Much better would be to provide a command,

cplexec -n 64 ./md : -n 8 ./cfd

which creates the two codes, each with its own MPI_COMM_WORLD. All intercomms will then be set up by CPL library and no changes are required to either code. One promising candidate would be a C++ program which calls MPI_Comm_spawn based on the command-line input supplied. This would look something like,

#include <stdio.h>
#include <stdlib.h> 
#include <string.h>
#include "mpi.h"

#define NREALMS 2


int main(int argc, char **argv)
{

    //Parse commandline and convert to exec names, universe size and processors
    int n = 0;
    int UNIVERSE_SIZE = 0;
    int nprocs[NREALMS];
    char *ptr;
    char *execnames[NREALMS];


    MPI_Comm parentcomm, intercomm[NREALMS];


    //Parse input arguments
    for (int i = 0; i < argc; ++i)
    {
        if ( strcmp("-n",argv[i]) == 0){
            int nproc = strtol(argv[i+1], &ptr, 10);
            UNIVERSE_SIZE += nproc;
            nprocs[n] = nproc;
            execnames[n] = argv[i+2];
            printf("%d, %d, %s, %s \n", nprocs[n], UNIVERSE_SIZE, argv[i+2], execnames[n]);
            n += 1;
        }
    }

    // Now spawn the programs you have parsed
    int ierr = MPI_Init( NULL, NULL);
    MPI_Comm_get_parent( &parentcomm );
    if (parentcomm == MPI_COMM_NULL)
    {

        //Loop over the parsed executables and spawn each one
        for (int i = 0; i < n; ++i){
            int errcodes[nprocs[i]];
            ierr = MPI_Comm_spawn( execnames[i], MPI_ARGV_NULL, nprocs[i], 
                                   MPI_INFO_NULL, 0, MPI_COMM_WORLD, &intercomm[i], errcodes );
        }

    } else {
        printf("ERROR -- parent comm has its own parent!?\n");
        return 1;
    }
    fflush(stdout);
    ierr = MPI_Finalize(); 
    return 0;

}

compiled using mpic++ ./code.cpp -o ./cplexec and run as above,

cplexec -n 64 ./md : -n 8 ./cfd

with sample md.exe and cfd.exe of the form,

//StackWorkers
#include <iostream>
#include <mpi.h>
#include <string.h>
#include <stdlib.h>     /* malloc, free, rand */

int main(int argc, char *argv[])
{
    int ierr = MPI_Init(NULL, NULL);
    MPI_Comm parent, comm;
    ierr = MPI_Comm_get_parent(&parent);
    int np; int rank; int nu;
    MPI_Comm_size (MPI_COMM_WORLD, &np);   // Find out number of processes
    MPI_Comm_rank (MPI_COMM_WORLD, &rank); // Find out process rank
    MPI_Comm_remote_size(parent, &nu);     // Number of processes in the parent
    comm = MPI_COMM_WORLD;

    printf("I'm the spawned %s: size %d, rank %d, parent size %d \n", argv[0], np, rank, nu );

    //Allocate and array and gather some data
    float sub_avg = float(rank);
    float *sub_avgs = NULL;
    if (rank == 0) {
      sub_avgs = (float *)malloc(sizeof(float) * np);
    }
    MPI_Gather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT, 0,
               MPI_COMM_WORLD);

    if (rank == 0) {
        for (int i = 0; i < np; ++i)
            printf("proc %d send %f \n ", i, sub_avgs[i]);
    }
    fflush(stdout);
    ierr = MPI_Finalize();
    return 0;
}

There is a major problem with all this: the process which spawns the CFD and MD child processes is then left sitting doing nothing. I don't think we can simply finalise it and be sure it won't take the children with it. One option would be to merge the parent into one of the child runs. This can be achieved by adding,

    //Merge parent with cfd processors
    MPI_Comm comm;
    MPI_Intercomm_merge(intercomm[1], 0, &comm );
    MPI_Barrier(comm);

on the parent and,

    if ( strcmp("./cfd.exe",argv[0]) == 0){
        MPI_Intercomm_merge(parent, 1, &comm );
        MPI_Barrier(comm);
    }else{
        comm = MPI_COMM_WORLD;
    }

to the child. However, the merged cfd and parent still don't share an MPI_COMM_WORLD, so this won't work in general! In addition, the topology may be set up extremely badly, with the parent on an entirely different node from the remaining cfd processors with which it will commonly communicate (spawning is discouraged for efficiency reasons in general).

A Better Option -- Create both Runs and link them using MPI Ports

The use of MPI ports with connect/accept presents a better option. This allows two previously unrelated MPI jobs to link up and begin sharing an intercomm. In principle, this requires names to be posted to an ompi-server (or the MPICH equivalent). However, for scientific computing, writing to a shared file space is a common feature and one which we can safely assume is possible. In fact, many coupling schemes go no further than file write/read based coupling between codes.

So, if we open a port in one MPI program and write its name to a file, all program two needs to do is read the port name from that file, connect, and begin sending information. The code to do this,

#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
#include <fstream>

using namespace std;

int main( int argc, char *argv[] )
{
    int num_errors = 0;
    int rank, size;
    char port1[MPI_MAX_PORT_NAME];
    char port2[MPI_MAX_PORT_NAME];
    MPI_Status status;
    MPI_Comm comm1, comm2;
    int data = 0;

    //Run number (0 or 1) from the command line chooses this copy's role
    if (argc < 2){
        printf("Usage: ./a.out runno \n");
        return 1;
    }
    char *ptr;
    int runno = strtol(argv[1], &ptr, 10);
    for (int i = 0; i < argc; ++i)
        printf("inputs %d %d %s \n", i, runno, argv[i]);

    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (runno == 0)
    {
        printf("0: opening ports.\n");fflush(stdout);
        MPI_Open_port(MPI_INFO_NULL, port1);
        printf("opened port1: <%s>\n", port1);

        //Write port file
        ofstream myfile;
        myfile.open("port");
        if( !myfile )
                cout << "Opening file failed" << endl;
        myfile << port1 << endl;
        if( !myfile )
            cout << "Write failed" << endl;
        myfile.close();

        printf("Port %s written to file \n", port1); fflush(stdout);

        printf("accepting port1.\n");fflush(stdout);

        //Establish connection and send data
        MPI_Comm_accept(port1, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm1);

        printf("sending 5 \n");fflush(stdout);
        data = 5;
        MPI_Send(&data, 1, MPI_INT, 0, 0, comm1);
        MPI_Close_port(port1);
    }
    else if (runno == 1)
    {

        //Read port file
        size_t   chars_read = 0;  
        ifstream myfile;
        //Wait until file exists and is available
        myfile.open("port");
        while(!myfile){
            cout << "Opening file failed, retrying" << endl;
            usleep(30000);
            myfile.clear();
            myfile.open("port");
        }
        while( myfile && chars_read < MPI_MAX_PORT_NAME-1 ) {
            myfile >> port1[ chars_read ];    
            if( myfile ) 
                 ++chars_read; 
            
            if( port1[ chars_read - 1 ] == '\n' ) 
                 break;
        }
        port1[ chars_read ] = '\0';  //Ensure the port name is null-terminated
        printf("Reading port %s from file \n", port1); fflush(stdout);
        remove( "port" );

        //Establish connection and receive data
        MPI_Comm_connect(port1, MPI_INFO_NULL, 0, MPI_COMM_SELF, &comm1);
        MPI_Recv(&data, 1, MPI_INT, 0, 0, comm1, &status);
        printf("Received %d 1\n", data); fflush(stdout);

    }

    //Barrier on intercomm before disconnecting
    MPI_Barrier(comm1);
    MPI_Comm_disconnect(&comm1);
    MPI_Finalize();
    return 0;
}

This code should be compiled and then run as two separate copies of the executable, with run number runno specified by a command-line option,

mpiexec ./a.out 0 & mpiexec ./a.out 1

This means that CPL_init can be written to check whether MPI_COMM_WORLD contains everything and, if not, attempt to open a port between the two codes. Once open, this can be used as CPL_INTER_COMM is in the main setup, and extended to an intracomm by merging the two with MPI_Intercomm_create(CPL_REALM_COMM, comm_size - 1, CPL_WORLD_COMM ... etc

All this means that cplexec is not strictly needed; instead, both codes can simply be run using something like,

mpiexec -n 64 ./md.exe > MD_run_hist & mpiexec -n 8 ./cfd.exe > CFD_run_hist

It is this approach that CPL library uses. A cplexec function is still provided to create both jobs as Python subprocesses, check that all libraries are consistent between the two runs, kill both jobs cleanly if either fails, and attempt to ensure any errors are raised correctly.