今天开始学习可重复(persistent)的非阻塞通信。

实际计算环境中,常在一个内部循环中重复使用相同的数据结构来传递不同的数据,这时候将数据结构与发送/接受动作绑定可以提高程序的效率。

可重复通信大致是

  1. 先使用MPI_Send_init()MPI_Recv_init()接口注册发送和接受动作,实际并不执行
  2. 在进程的其他位置通过重复通信控制函数,如MPI_Start(),来控制真正执行的时机
  3. 使用MPI_Request_free()来释放与重复非阻塞通信相关的通信对象并注销注册的重复非阻塞通信。MPI_Request_free()可在注册之后的任何地点调用,但仅当没有处在消息传递状态时,才可实际释放通信对象。

可重复的非阻塞标准通信

本篇主要总结可重复的非阻塞标准通信。


第一个例子,主要是可重复非阻塞发送不可重复的非阻塞接受匹配。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#define BUFSIZE 5
int main(int argc, char ** argv)
{
MPI_Request req;
MPI_Status stat;
int buf[BUFSIZE];
char pstr[BUFSIZE*(sizeof(int) + 8) + 50];
int tag = 123;
int dest = 0;
int rank, nproc, i, j;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
// Fill data to be sent.
for (i = 0; i < BUFSIZE; ++i)
{
buf[i] = BUFSIZE*rank + i;
}
// Create a persistent send request.
MPI_Send_init(buf, BUFSIZE, MPI_INT, dest, tag, MPI_COMM_WORLD, &req);
if (rank == 0)
{
int rbuf[BUFSIZE*nproc];
MPI_Request * rr = (MPI_Request *)malloc(nproc*sizeof(MPI_Request));
Receive data from other processes.
for (i = 0; i < nproc; ++i)
{
fprintf(stderr, "proc: %d before Irecv...\n", rank);
MPI_Irecv(rbuf + i*BUFSIZE, BUFSIZE, MPI_INT, i, tag, MPI_COMM_WORLD, rr + i);
fprintf(stderr, "proc: %d after Irecv...\n", rank);
}
// Send data.
MPI_Start(&req);
// Wait send done.
MPI_Wait(&req, &stat);
// Wait receive done.
MPI_Waitall(nproc, rr, MPI_STATUS_IGNORE);
// Info output.
for (i = 0; i < nproc; ++i)
{
sprintf(pstr, "proc: %d received messages from %d\n", rank, i);
for (j = 0; j < BUFSIZE-1; ++j)
{
sprintf(pstr, "%s rbuf[%d]=%d,", pstr, i*BUFSIZE+j, rbuf[i*BUFSIZE+j]);
}
sprintf(pstr, "%s rbuf[%d]=%d\n", pstr, i*BUFSIZE+j, rbuf[i*BUFSIZE+j]);
fprintf(stderr, "%s", pstr);
}
free(rr);
}
else
{
// If not master process.
MPI_Start(&req);
MPI_Wait(&req, &stat);
}
MPI_Request_free(&req);
MPI_Finalize();
return 0;
}

上面代码的大致过程是每个进程(包括主进程)都会向主进程(进程号为0)的进程发送数据,主进程通过不可重复的非阻塞接受MPI_Irecv()进行接受信息。
其中发送数据的进程只负责发送BUFSIZE的数组中的数据,主进程使用循环来将接收到的数据放到同一个大数组rbuf中,在等待非阻塞接受完成时,使用了MPI_Waitall()来代替通过循环方式的通信完成。
执行的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[zjshao@master 2-2-3-1]$ mpiexec -n 4 -host node01 send_init_irecv.x
proc: 0 before Irecv...
proc: 0 after Irecv...
proc: 0 before Irecv...
proc: 0 after Irecv...
proc: 0 before Irecv...
proc: 0 after Irecv...
proc: 0 before Irecv...
proc: 0 after Irecv...
proc: 0 received messages from 0
rbuf[0]=0, rbuf[1]=1, rbuf[2]=2, rbuf[3]=3, rbuf[4]=4
proc: 0 received messages from 1
rbuf[5]=5, rbuf[6]=6, rbuf[7]=7, rbuf[8]=8, rbuf[9]=9
proc: 0 received messages from 2
rbuf[10]=10, rbuf[11]=11, rbuf[12]=12, rbuf[13]=13, rbuf[14]=14
proc: 0 received messages from 3
rbuf[15]=15, rbuf[16]=16, rbuf[17]=17, rbuf[18]=18, rbuf[19]=19
[zjshao@master 2-2-3-1]$


可重复的阻塞接收MPI_Recv_init()与不可重复的非阻塞发送MPI_Isend()相匹配
所谓重复和不重复与是否阻塞没有关系,而是看是否重复利用同一个数据结构来发送和接收不同的数据。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
int main(int argc, char ** argv)
{
...
if (rank == 0)
{
MPI_Request rreq;
int rbuf[BUFSIZE];
// Create a persistent receive request.
MPI_Recv_init(rbuf, BUFSIZE, MPI_INT,
MPI_ANY_SOURCE, tag,
MPI_COMM_WORLD, &rreq);
// Send data to master process.
fprintf(stderr, "proc: %d before Isend...\n", rank);
MPI_Isend(buf, BUFSIZE, MPI_INT, 0, tag, MPI_COMM_WORLD, &sreq);
fprintf(stderr, "proc: %d after Isend...\n", rank);
// Start data receiving.
for (i = 0; i < nproc; ++i)
{
MPI_Start(&rreq);
MPI_Wait(&rreq, &stat);
sprintf(pstr, "proc: %d received messages from: %d\n",
rank, stat.MPI_SOURCE);
for (j = 0; j < BUFSIZE-1; ++j)
{
sprintf(pstr, "%s rbuf[%d]=%d,", pstr, j, rbuf[j]);
}
sprintf(pstr, "%s rbuf[%d]=%d\n", pstr, j, rbuf[j]);
fprintf(stderr, "%s", pstr);
}
MPI_Request_free(&rreq);
}
else
{
// Send data to master process.
fprintf(stderr, "proc: %d before Isend...\n", rank);
MPI_Isend(buf, BUFSIZE, MPI_INT, 0, tag, MPI_COMM_WORLD, &sreq);
fprintf(stderr, "proc: %d after Isend...\n", rank);
}
MPI_Wait(&sreq, &stat);
MPI_Finalize();
return 0;
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[zjshao@master 2-2-3-1]$ mpiexec -n 4 -host node01 recv_init_isend.x
proc: 1 before Isend...
proc: 2 before Isend...
proc: 2 after Isend...
proc: 3 before Isend...
proc: 3 after Isend...
proc: 0 before Isend...
proc: 1 after Isend...
proc: 0 after Isend...
proc: 0 received messages from: 0
rbuf[0]=0, rbuf[1]=1, rbuf[2]=2, rbuf[3]=3, rbuf[4]=4
proc: 0 received messages from: 2
rbuf[0]=10, rbuf[1]=11, rbuf[2]=12, rbuf[3]=13, rbuf[4]=14
proc: 0 received messages from: 3
rbuf[0]=15, rbuf[1]=16, rbuf[2]=17, rbuf[3]=18, rbuf[4]=19
proc: 0 received messages from: 1
rbuf[0]=5, rbuf[1]=6, rbuf[2]=7, rbuf[3]=8, rbuf[4]=9
[zjshao@master 2-2-3-1]$

可见,使用可重复的非阻塞接收,接收的顺序并不确定,哪个进程的信息先到达先接收哪个进程的信息。后接收的信息会覆盖先接收的数据
实际上可重复的非阻塞发送可以与任何接收动作相匹配,可重复的非阻塞可与任何发送动作相匹配。


下面的代码是将可重复的阻塞接收动作MPI_Recv()可重复的非阻塞发送动作MPI_Send_init()相匹配。
主进程不再向自己发送数据,而是其他的进程负责向主进程发送数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#define BUFSIZE 5
#define RECEIVER 0
int main(int argc, char ** argv)
{
MPI_Request sreq;
MPI_Status stat;
int buf[BUFSIZE];
char pstr[BUFSIZE*(sizeof(int) + 8) + 50];
int tag = 123;
int dest = 0;
int rank, nproc, i, j;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
// Fill data to be sent.
for (i = 0; i < BUFSIZE; ++i)
{
buf[i] = BUFSIZE*rank + i;
}
if (rank != RECEIVER)
{
MPI_Send_init(buf, BUFSIZE, MPI_INT, RECEIVER, tag, MPI_COMM_WORLD, &sreq);
}
if (rank == RECEIVER)
{
int rbuf[BUFSIZE];
for (i = 0; i < nproc-1; ++i)
{
fprintf(stderr, "proc: %d before MPI_Recv()...\n", rank);
MPI_Recv(rbuf, BUFSIZE, MPI_INT, MPI_ANY_SOURCE, tag, MPI_COMM_WORLD, &stat);
fprintf(stderr, "proc: %d after MPI_Recv()...\n", rank);
sprintf(pstr, "proc: %d received message from proc %d...\n", rank, stat.MPI_SOURCE);
for (j = 0; j < BUFSIZE-1; ++j)
{
sprintf(pstr, "%s rbuf[%d]=%d,", pstr, j, rbuf[j]);
}
sprintf(pstr, "%s rbuf[%d]=%d\n", pstr, j, rbuf[j]);
fprintf(stderr, "%s", pstr);
}
}
else
{
MPI_Start(&sreq);
fprintf(stderr, "proc: %d send started...\n", rank);
MPI_Wait(&sreq, &stat);
fprintf(stderr, "proc: %d send finished...\n", rank);
MPI_Request_free(&sreq);
}
MPI_Finalize();
return 0;
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
[zjshao@master 2-2-3-1]$ mpiexec -n 4 -host node01 send_init_recv.x
proc: 0 before MPI_Recv()...
proc: 1 send started...
proc: 1 send finished...
proc: 3 send started...
proc: 3 send finished...
proc: 0 after MPI_Recv()...
proc: 0 received message from proc 2...
rbuf[0]=10, rbuf[1]=11, rbuf[2]=12, rbuf[3]=13, rbuf[4]=14
proc: 0 before MPI_Recv()...
proc: 0 after MPI_Recv()...
proc: 0 received message from proc 3...
rbuf[0]=15, rbuf[1]=16, rbuf[2]=17, rbuf[3]=18, rbuf[4]=19
proc: 0 before MPI_Recv()...
proc: 0 after MPI_Recv()...
proc: 2 send started...
proc: 2 send finished...
proc: 0 received message from proc 1...
rbuf[0]=5, rbuf[1]=6, rbuf[2]=7, rbuf[3]=8, rbuf[4]=9
[zjshao@master 2-2-3-1]$

可见可以正常运行,但是如果我把BUFSIZE增大会有死锁的危险,例如:

1
2
3
...
#define BUFSIZE 5000000
...

再次编译运行:

1
2
3
4
5
6
7
8
9
10
[zjshao@master 2-2-3-1]$ mpiexec -n 4 -host node01 test.x
proc: 1 send started...
proc: 2 send started...
proc: 0 before MPI_Recv()...
proc: 3 send started...
proc: 1 send finished...
proc: 0 after MPI_Recv()...
[mpiexec@master.cluster] Sending Ctrl-C to processes as requested
[mpiexec@master.cluster] Press Ctrl-C again to force abort
[zjshao@master 2-2-3-1]$

为什么会出现上面死锁的情况?
原因是我们使用了阻塞的接收操作,而且进程0以此药从其他进程接收,因此需借助MPI环境提供的缓冲机制进行数据接收,而MPI提供的默认缓冲区大小是有限制的,当超过这个之后,发送端就要与接收动作同步,也就是说顺序执行,接收动作需按顺序从进程1,2…接收数据,但是如果当进程2的数据先于进程1的数据到达缓冲区,接收数据就无法完成接收,造成死锁。


可重复的非阻塞发送MPI_Send_init()和不可重复的非阻塞发送MPI_Recv_init()相匹配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#define BUFSIZE 5
#define RECEIVER 0
int main(int argc, char ** argv)
{
MPI_Request sreq;
MPI_Status sstat;
int buf[BUFSIZE];
char pstr[BUFSIZE*(sizeof(int) + 8) + 50];
int tag = 123;
int dest = 0;
int rank, nproc, i, j;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nproc);
// Fill data to be sent.
for (i = 0; i < BUFSIZE; ++i)
{
buf[i] = BUFSIZE*rank + i;
}
if (rank == RECEIVER)
{
int rbuf[nproc-1][BUFSIZE];
MPI_Request * rreq = (MPI_Request *)malloc((nproc-1)*sizeof(MPI_Request));
MPI_Status * rstat = (MPI_Status *)malloc((nproc-1)*sizeof(MPI_Status));
}
// For receiver processor.
int rbuf[nproc-1][BUFSIZE];
MPI_Request * rreq = (MPI_Request *)malloc((nproc-1)*sizeof(MPI_Request));
MPI_Status * rstat = (MPI_Status *)malloc((nproc-1)*sizeof(MPI_Status));
// Create persistent send and receive request.
if (rank != RECEIVER)
{
MPI_Send_init(buf, BUFSIZE, MPI_INT, RECEIVER, tag, MPI_COMM_WORLD, &sreq);
}
else
{
for (i = 0; i < nproc-1; ++i)
{
MPI_Recv_init(rbuf[i], BUFSIZE, MPI_INT, i+1, tag, MPI_COMM_WORLD, rreq+i);
}
}
if (rank != RECEIVER)
{
fprintf(stderr, "proc: %d before start...\n", rank);
MPI_Start(&sreq);
fprintf(stderr, "proc: %d after start...\n", rank);
fprintf(stderr, "proc: %d before wait...\n", rank);
MPI_Wait(&sreq, &sstat);
fprintf(stderr, "proc: %d after wait...\n", rank);
MPI_Request_free(&sreq);
}
else
{
// Start all receive request.
fprintf(stderr, "proc: %d before start all receive...\n", rank);
MPI_Startall(nproc-1, rreq);
fprintf(stderr, "proc: %d after start all receive...\n", rank);
// Wait all.
fprintf(stderr, "proc: %d before wait all...\n", rank);
MPI_Waitall(nproc-1, rreq, rstat);
fprintf(stderr, "proc: %d after wait all...\n", rank);
// Info output.
for (i = 0; i < nproc-1; ++i)
{
sprintf(pstr, "proc: %d received %d'th message from process %d\n",
rank, i, rstat[i].MPI_SOURCE);
for (j = 0; j < BUFSIZE-1; ++j)
{
sprintf(pstr, "%s rbuf[%d][%d]=%d,", pstr, i, j, rbuf[i][j]);
}
sprintf(pstr, "%s rbuf[%d][%d]=%d\n", pstr, i, j, rbuf[i][j]);
fprintf(stderr, "%s", pstr);
}
// Free request objects.
for (i = 0; i < nproc-1; ++i)
{
MPI_Request_free(rreq + i);
}
}
free(rreq);
free(rstat);
MPI_Finalize();
return 0;
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[zjshao@master 2-2-3-1]$ mpiexec -n 4 -host node01 send_init_recv_init.x
proc: 0 before start all receive...
proc: 0 after start all receive...
proc: 0 before wait all...
proc: 1 before start...
proc: 1 after start...
proc: 1 before wait...
proc: 1 after wait...
proc: 2 before start...
proc: 2 after start...
proc: 2 before wait...
proc: 2 after wait...
proc: 3 before start...
proc: 3 after start...
proc: 3 before wait...
proc: 3 after wait...
proc: 0 after wait all...
proc: 0 received 0'th message from process 1
rbuf[0][0]=5, rbuf[0][1]=6, rbuf[0][2]=7, rbuf[0][3]=8, rbuf[0][4]=9
proc: 0 received 1'th message from process 2
rbuf[1][0]=10, rbuf[1][1]=11, rbuf[1][2]=12, rbuf[1][3]=13, rbuf[1][4]=14
proc: 0 received 2'th message from process 3
rbuf[2][0]=15, rbuf[2][1]=16, rbuf[2][2]=17, rbuf[2][3]=18, rbuf[2][4]=19
[zjshao@master 2-2-3-1]$

如果还是增加传输数据的大小,仍有死锁的危险,原因仍是阻塞传输使用MPI环境默认缓冲区导致。

Comments