这里总结MPI集合通信中的 N->N 通信,多个进程将数据经过某种处理以后在发散到同组内的其他进程中,这些进程可以看成是其他 N->1 和 1->N 通信的组合。

MPI_Allgather

此操作将分散在各个进程中的数据进行收集操作,然后再将收集到的向量广播到同组内的所有进程中,也就是先使用MPI_Gather,然后再使用MPI_Bcast。与MPI_Gather相同,收集到的数据顺序与进程号的顺序相同。

看一下函数原型,基本上与MPI_Gather相同,但是这回不需要指定根进程了。

1
2
3
4
5
6
7
MPI_Allgather(void* send_data,
int send_count,
MPI_Datatype send_datatype,
void* recv_data,
int recv_count,
MPI_Datatype recv_datatype,
MPI_Comm communicator)

下面我写了个简单的小例子来看看效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#define SIZE 5
int main(int argc, char ** argv)
{
int sb[SIZE], rank, nproc;
MPI_Comm comm = MPI_COMM_WORLD;
char outstr[nproc*SIZE*100];
MPI_Init(&argc, &argv);
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &nproc);
// Allocate receive buffer.
int * rb = (int *)malloc(nproc*SIZE*sizeof(int));
// Fill send buffer.
for (int i = 0; i < SIZE; ++i)
{
sb[i] = SIZE*rank + i;
}
MPI_Allgather(sb, SIZE, MPI_INT, rb, SIZE, MPI_INT, comm);
sprintf(outstr, "proc: %d. ", rank);
for (int i = 0; i < SIZE*nproc; ++i)
{
sprintf(outstr, "%srb[%d] = %d, ",outstr, i, rb[i]);
}
fprintf(stderr, "%s\n", outstr);
free(rb);
MPI_Finalize();
return 0;
}

运行结果:

1
2
3
4
5
6
[zjshao@master 4-3-1]$ mpiexec -n 4 -host node01 allgather.x
proc: 0. rb[0] = 0, rb[1] = 1, rb[2] = 2, rb[3] = 3, rb[4] = 4, rb[5] = 5, rb[6] = 6, rb[7] = 7, rb[8] = 8, rb[9] = 9, rb[10] = 10, rb[11] = 11, rb[12] = 12, rb[13] = 13, rb[14] = 14, rb[15] = 15, rb[16] = 16, rb[17] = 17, rb[18] = 18, rb[19] = 19,
proc: 1. rb[0] = 0, rb[1] = 1, rb[2] = 2, rb[3] = 3, rb[4] = 4, rb[5] = 5, rb[6] = 6, rb[7] = 7, rb[8] = 8, rb[9] = 9, rb[10] = 10, rb[11] = 11, rb[12] = 12, rb[13] = 13, rb[14] = 14, rb[15] = 15, rb[16] = 16, rb[17] = 17, rb[18] = 18, rb[19] = 19,
proc: 2. rb[0] = 0, rb[1] = 1, rb[2] = 2, rb[3] = 3, rb[4] = 4, rb[5] = 5, rb[6] = 6, rb[7] = 7, rb[8] = 8, rb[9] = 9, rb[10] = 10, rb[11] = 11, rb[12] = 12, rb[13] = 13, rb[14] = 14, rb[15] = 15, rb[16] = 16, rb[17] = 17, rb[18] = 18, rb[19] = 19,
proc: 3. rb[0] = 0, rb[1] = 1, rb[2] = 2, rb[3] = 3, rb[4] = 4, rb[5] = 5, rb[6] = 6, rb[7] = 7, rb[8] = 8, rb[9] = 9, rb[10] = 10, rb[11] = 11, rb[12] = 12, rb[13] = 13, rb[14] = 14, rb[15] = 15, rb[16] = 16, rb[17] = 17, rb[18] = 18, rb[19] = 19,
[zjshao@master 4-3-1]$

这里我没有收集单个数而是一数组,所以输出会比较多。

MPI_Allreduce

其实就是将上面的收集操作替换成了规约操作,其他过程是一样的,组内所有进程都作为根执行一次规约操作,操作完毕后所有进程接收缓冲区的数据均相同。这个操作等价于先进程一次MPI_Reduce然后再执行一次MPI_Bcast

函数原型:

1
2
3
4
5
6
int MPI_Allreduce(const void *sendbuf,
void *recvbuf,
int count,
MPI_Datatype datatype,
MPI_Op op,
MPI_Comm comm)

其中count为发送进程缓冲区中发送数据的个数。

下面写了个小例子来看下效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
int main(int argc, char ** argv)
{
int rank, nproc, sb, rb;
MPI_Comm comm = MPI_COMM_WORLD;
MPI_Init(&argc, &argv);
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &nproc);
sb = rank + 1;
MPI_Allreduce(&sb, &rb, 1, MPI_INT, MPI_SUM, comm);
fprintf(stderr, "process %d: sb = %d, rb = %d\n", rank, sb, rb);
MPI_Finalize();
return 0;
}

执行结果:

1
2
3
4
5
6
[zjshao@master 4-3-2]$ mpiexec -n 4 allreduce.x
process 1: sb = 2, rb = 10
process 3: sb = 4, rb = 10
process 0: sb = 1, rb = 10
process 2: sb = 3, rb = 10
[zjshao@master 4-3-2]$

整个过程可以大致用下图来表示:

MPI_Reduce_scatter

本操作首先对多个向量进行规约处理,然后再将规约后的向量发散到同组内的其他进程中。相当于以某个进程为根,执行一次规约操作后跟一次发散操作。

函数原型:

1
2
3
4
5
6
int MPI_Reduce_scatter(const void *sendbuf,
void *recvbuf,
const int recvcounts[],
MPI_Datatype datatype,
MPI_Op op,
MPI_Comm comm)

这里需要注意的是const int * recvcounts这个参数,他并不是个整型而是一个int指针,我么可以向其传递一个数组,数组的长度应该与进程数相同,数组中的数表示每个进程中接收得到的数据个数,下面的例子中全部为1。

来看例子吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#include "mpi.h"
#include <stdio.h>
#define SIZE 4
int main(int argc, char ** argv)
{
int rank, nproc, rb;
int sb[SIZE] = {1, 2, 3, 4};
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
int rcnt[SIZE] = {1, 1, 1, 1};
MPI_Reduce_scatter(sb, &rb, rcnt, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
fprintf(stderr, "process %d: rb = %d\n", rank, rb);
MPI_Finalize();
return 0;
}

这个程序与上面图片的过程相同,执行结果:

1
2
3
4
5
6
[zjshao@master 4-3-3]$ mpiexec -n 4 reduce_scatter.x
process 1: rb = 8
process 2: rb = 12
process 3: rb = 16
process 0: rb = 4
[zjshao@master 4-3-3]$

MPI_Alltoall

此操作将一个通信子内的所有进程都执行一次发散操作,从图上看类似矩阵转置的效果,只不过这里的0轴是进程。

函数原型:

1
2
3
4
5
6
7
int MPI_Alltoall(const void *sendbuf,
int sendcount,
MPI_Datatype sendtype,
void *recvbuf,
int recvcount,
MPI_Datatype recvtype,
MPI_Comm comm)

这里需要注意的是int sendcountint recvcount参数,这里是针对一个进程的发散操作而言的,而不是整体。
例如下面这张图中的过程,整两个参数都为1,而不是4

下面直接上代码例子吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include "mpi.h"
#include <stdio.h>
#define SIZE 4
int main(int argc, char ** argv)
{
int rank, nproc;
int sb[SIZE], rb[SIZE];
MPI_Comm comm = MPI_COMM_WORLD;
char outstr[50*SIZE + 100];
MPI_Init(&argc, &argv);
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &nproc);
// Fill send buffer.
for (int i = 0; i < SIZE; ++i)
{
sb[i] = SIZE*rank + i + 1;
}
// All to all.
MPI_Alltoall(sb, 1, MPI_INT, rb, 1, MPI_INT, comm);
sprintf(outstr, "proc %d:\n", rank);
// Output send buffer.
for (int i = 0; i < SIZE; ++i)
{
sprintf(outstr, "%ssb[%d] = %d, ", outstr, i, sb[i]);
}
sprintf(outstr, "%s\n", outstr);
// Output receive buffer.
for (int i = 0; i < SIZE; ++i)
{
sprintf(outstr, "%srb[%d] = %d, ", outstr, i, rb[i]);
}
sprintf(outstr, "%s\n", outstr);
fprintf(stderr, "%s", outstr);
MPI_Finalize();
return 0;
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[zjshao@master 4-3-4]$ mpiexec -n 4 alltoall.x
proc 0:
sb[0] = 1, sb[1] = 2, sb[2] = 3, sb[3] = 4,
rb[0] = 1, rb[1] = 5, rb[2] = 9, rb[3] = 13,
proc 1:
sb[0] = 5, sb[1] = 6, sb[2] = 7, sb[3] = 8,
rb[0] = 2, rb[1] = 6, rb[2] = 10, rb[3] = 14,
proc 2:
sb[0] = 9, sb[1] = 10, sb[2] = 11, sb[3] = 12,
rb[0] = 3, rb[1] = 7, rb[2] = 11, rb[3] = 15,
proc 3:
sb[0] = 13, sb[1] = 14, sb[2] = 15, sb[3] = 16,
rb[0] = 4, rb[1] = 8, rb[2] = 12, rb[3] = 16,
[zjshao@master 4-3-4]$

MPI_Scan

这个是个逐级执行规约操作,即进程i对进程0, 1, … i进行规约,还是看图会直观些:

其他的没有什么要注意的参数也是规约操作的类似,直接上例子,操作过程如下图:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include "mpi.h"
#include <stdio.h>
int main(int argc, char ** argv)
{
int rank, nproc, sb, rb;
MPI_Comm comm = MPI_COMM_WORLD;
MPI_Init(&argc, &argv);
MPI_Comm_rank(comm, &rank);
MPI_Comm_size(comm, &nproc);
sb = rank + 1;
MPI_Scan(&sb, &rb, 1, MPI_INT, MPI_SUM, comm);
fprintf(stderr, "proc: %d, sb = %d, rb = %d.\n", rank, sb, rb);
MPI_Finalize();
return 0;
}

结果

1
2
3
4
5
6
[zjshao@master 4-3-5]$ mpiexec -n 4 scan.x
proc: 0, sb = 1, rb = 1.
proc: 1, sb = 2, rb = 3.
proc: 2, sb = 3, rb = 6.
proc: 3, sb = 4, rb = 10.
[zjshao@master 4-3-5]$

Comments