这篇文章上次修改于 1527 天前,可能其部分内容已经发生变化,如有疑问可询问作者。

主从编程模型

上文中介绍了一种通过rank值进行任务划分的并行编程模式。但是这种方式在某些场景下可能无法使用,或者计算资源不能充分利用。

场景一:任务可能在运行时增加的,提前无法确定总的任务量,这种场景下我们简单的直接将任务平均分配到每个进程。

场景二:每个任务的计算量不同,无法提前确定每个任务的计算时间,平均分配到每个节点,这种场景下如果我们简单的将任务平均分配到每个进程,会导致某些节点可能很快运行结束,而有些节点还剩余大量待处理任务。导致计算资源的浪费。

解决方案:这时我们可以通过将进程分为master, slaver,由master进程负责任务分配,slaver节点在单个任务处理完成后向master节点请求新任务,通过这种方式,可以在任务未完成之前每个节点均在进行计算,提高资源利用率。

实现方式

程序流程:

  1. 将rank值为0的进程作为master进程,由它进行任务的分配。
  2. 其他任务向master进程获取任务信息,进行计算。
  3. 当所有任务分配完毕后,slaver进程请求新任务时master会返回结束信息。

在实现这种流程时,一个主要的接口就是消息传递接口,MPI给我们提供了MPI_SendMPI_Recv 接口可以使用。

先看下接口定义

int MPI_Send( void *buf, int count, MPI_Datatype datatype, int dest,
                     int tag, MPI_Comm comm );
    buf: 发送数据的起始地址
    count: 发送数据的数量
    datatype: 发送数据的类型
    dest: 目标进程rank
    tag: 消息标识
    comm: 通信域
int MPI_Recv( void *buf, int count, MPI_Datatype datatype, int source,
                     int tag, MPI_Comm comm, MPI_Status *status )
    buf: 接收数据的内存地址
    count: 接收数量
    status: 接收状态
    datatype: 接收数据的类型
    source: 消息来源进程rank
    tag:    消息标识
    comm:    通信域

有了这两个接口,配合我们前面了解的几个接口,即可实现上述流程。

同样以0-99的任务进行举例说明,代码实现思路如下:

int rank;
int numprocs;
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
int wk[2] = {0, 0}; // work_id , exit_signal 
if(rank == 0) {
    //master 进程
    int current_work_id = 0; 
    int process_id; //slaver rank
    MPI_Status status;
    //对任务数量可变的情况,可以替换为队列不为空
    while(current_work_id <= 99) { 
        // 接收slaver请求
        MPI_Recv( &process_id, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, MPI_Status, &status ); 
        wk[0] = current_work_id;
        wd[1] = 0;
        MPI_Send( wk, 2, MPI_INT, process_id,MPI_ANY_TAG, MPI_COMM_WORLD ); // 向slaver发送任务id
        current_work_id += 1;
    }
    // 所有任务处理后,向所有slaver发送结束信号
    for(int i = 1;i < numprocs;i++) {
        wd[1] = 1;
        MPI_Send( wk, 2, MPI_INT, process_id,MPI_ANY_TAG, MPI_COMM_WORLD );
    }
} else {
    while(1) {
        //向master 进程请求新的任务
        MPI_Send( &process_id, 1, MPI_INT, 0,MPI_ANY_TAG, MPI_COMM_WORLD ); 
        //等待master响应任务分配情况
        MPI_Recv( wk, 2, MPI_INT, 0,MPI_ANY_TAG, MPI_COMM_WORLD, &status );
        if(wk[1] == 1) { 
            //如果收到exit信号
            break;
        } else { 
            //正常执行处理
            process(wk[0]);
        }
    }
}
MPI_Barrier(MPI_COMM_WORLD); // 等待所有任务直接完毕。