Fix cudaMemcpyAsync bug

The result of the first cudaMemcpyAsync was used as an input of the
second cudaMemcpyAsync with no synchronization in between, so the
second copy could consume the destination of the first copy before it
had been written. This patch fixes the bug by allocating a host-side
array that caches the device-side addresses, which removes the
dependent back-to-back copies altogether.
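
In isolation, the broken pattern looks like the sketch below
(illustrative names such as devPtrArray and value, not the NCCL code).
The second cudaMemcpyAsync receives its destination pointer by value at
enqueue time, but that pointer is only filled in when the first,
still-queued copy actually executes:

    void* addr;
    /* Enqueue a D2H copy that will eventually fill 'addr'. */
    cudaMemcpyAsync(&addr, &devPtrArray[i], sizeof(void*),
                    cudaMemcpyDeviceToHost, stream);
    /* BUG: 'addr' is read right now, while it still holds garbage;
     * the copy above has only been queued, not executed. */
    cudaMemcpyAsync(addr, &value, sizeof(value),
                    cudaMemcpyHostToDevice, stream);

With a host-side cache of the device addresses, the first copy
disappears entirely:

    hostPtrCache[i] = devAddr;  /* plain host store, recorded when the
                                 * device array was populated */
    cudaMemcpyAsync(hostPtrCache[i], &value, sizeof(value),
                    cudaMemcpyHostToDevice, stream);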

Fixes #957
Kaiming Ouyang, 2023-09-20 05:51:14 -07:00 (committed by Sylvain Jeaugey)
parent 559b70f86c · commit 4365458757
5 changed files with 21 additions and 13 deletions

src/channel.cc

@@ -42,9 +42,11 @@ ncclResult_t initChannel(struct ncclComm* comm, int channelId) {
     /* channel->devPeers is not shared, so just free it when calling commFree() */
     NCCLCHECK(ncclCudaCallocAsync(&channel->devPeers, nPeers, sharedRes->deviceStream.cudaStream));
     ncclCommPushCudaFree(comm, channel->devPeers);
+    NCCLCHECK(ncclCalloc(&channel->devPeersHostPtr, nPeers));
     for (int r = 0; r < nRanks; r++) {
       uintptr_t addr = (uintptr_t)(comm->sharedRes->devPeers[channelId] + comm->topParentRanks[r]);
       NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + r), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream));
+      channel->devPeersHostPtr[r] = (struct ncclDevChannelPeer*)addr;
     }
   }
@@ -52,6 +54,8 @@ ncclResult_t initChannel(struct ncclComm* comm, int channelId) {
   NCCLCHECK(ncclCudaCallocAsync(&channel->devRingUserRanks, nRanks, sharedRes->deviceStream.cudaStream));
   ncclCommPushCudaFree(comm, channel->devRingUserRanks);
+  /* guarantee addr has been copied into channel->devPeers */
+  NCCLCHECK(ncclStrongStreamSynchronize(&sharedRes->deviceStream));
   NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &sharedRes->deviceStream));
   return ncclSuccess;
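
(The added synchronize drains the device stream before the strong
stream is released, so every queued copy of a peer address has landed
in channel->devPeers by the time initChannel returns. In plain CUDA the
shape of this guarantee is simply:

    cudaMemcpyAsync(dst, &hostValue, sizeof(hostValue),
                    cudaMemcpyHostToDevice, stream);
    cudaStreamSynchronize(stream);  /* past this point the copy is done
                                     * and hostValue may be reused */

ncclStrongStreamSynchronize plays this role for NCCL's shared internal
streams.)
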
@@ -77,6 +81,7 @@ ncclResult_t initNvlsChannel(struct ncclComm* comm, int channelId, struct ncclCo
       uintptr_t addr = (uintptr_t)(parent->channels[channelId].nvlsDevPeers + tr);
       channel->peers[comm->nRanks + 1 + r] = parent->channels[channelId].nvlsPeers + tr;
       NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + comm->nRanks + 1 + r), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream));
+      channel->devPeersHostPtr[comm->nRanks + 1 + r] = (struct ncclDevChannelPeer*)addr;
       ncclAtomicRefCountIncrement(&parent->channels[channelId].nvlsPeers[tr].refCount);
     }
   } else {
@@ -86,10 +91,12 @@ ncclResult_t initNvlsChannel(struct ncclComm* comm, int channelId, struct ncclCo
       uintptr_t addr = (uintptr_t)(channel->nvlsDevPeers + r);
       channel->peers[comm->nRanks + 1 + r] = channel->nvlsPeers + r;
       NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + comm->nRanks + 1 + r), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream));
+      channel->devPeersHostPtr[comm->nRanks + 1 + r] = (struct ncclDevChannelPeer*)addr;
       ncclAtomicRefCountIncrement(&channel->nvlsPeers[r].refCount);
     }
   }
+  NCCLCHECK(ncclStrongStreamSynchronize(&sharedRes->deviceStream));
   NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &sharedRes->deviceStream));
   return ncclSuccess;
@@ -114,6 +121,7 @@ ncclResult_t initCollnetChannel(struct ncclComm* comm, int channelId, struct ncc
     addr = (uintptr_t)parent->channels[channelId].collnetDevPeers;
     channel->peers[comm->nRanks] = parent->channels[channelId].collnetPeers;
     NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + comm->nRanks), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream));
+    channel->devPeersHostPtr[comm->nRanks] = (struct ncclDevChannelPeer*)addr;
     ncclAtomicRefCountIncrement(&parent->channels[channelId].collnetPeers->refCount);
   } else {
     NCCLCHECK(ncclCalloc(&channel->collnetPeers, 1));
@@ -121,9 +129,11 @@ ncclResult_t initCollnetChannel(struct ncclComm* comm, int channelId, struct ncc
     addr = (uintptr_t)channel->collnetDevPeers;
     channel->peers[comm->nRanks] = channel->collnetPeers;
     NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + comm->nRanks), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream));
+    channel->devPeersHostPtr[comm->nRanks] = (struct ncclDevChannelPeer*)addr;
     ncclAtomicRefCountIncrement(&channel->collnetPeers->refCount);
   }
+  NCCLCHECK(ncclStrongStreamSynchronize(&sharedRes->deviceStream));
   NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &sharedRes->deviceStream));
   return ncclSuccess;
@@ -156,5 +166,7 @@ ncclResult_t freeChannel(struct ncclChannel* channel, int nRanks, int collnetNRa
       }
     }
   }
+  free(channel->devPeersHostPtr);
   return ncclSuccess;
 }
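
Taken together, the channel.cc changes give devPeersHostPtr an ordinary
host-array lifecycle (sketch; devAddr is an illustrative name):

    NCCLCHECK(ncclCalloc(&channel->devPeersHostPtr, nPeers)); /* initChannel */
    channel->devPeersHostPtr[r] = devAddr;  /* record each device address */
    free(channel->devPeersHostPtr);         /* freeChannel */

Only the array itself lives on the host; the pointers stored in it are
device addresses and must never be dereferenced on the CPU.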

src/include/comm.h

@@ -124,6 +124,8 @@ struct ncclSharedResources {
 struct ncclChannel {
   struct ncclChannelPeer** peers;
   struct ncclDevChannelPeer** devPeers;
+  /* devPeer pointer array used for host side access */
+  struct ncclDevChannelPeer** devPeersHostPtr;
   struct ncclRing ring;
   int* devRingUserRanks;
   struct ncclTree tree;
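
The struct now keeps two views of the same pointers: devPeers is a
device-resident array of device pointers, which the host can only reach
through cudaMemcpy, while devPeersHostPtr is a plain host array holding
the same device addresses so the host can name them without a D2H
fetch. A minimal standalone sketch of this mirror-array pattern
(illustrative names):

    struct Peer** devPeers;      /* device array of device pointers */
    struct Peer** devPeersHost;  /* host mirror of the same addresses */
    cudaMalloc((void**)&devPeers, n * sizeof(struct Peer*));
    devPeersHost = (struct Peer**)calloc(n, sizeof(struct Peer*));
    /* every device address written into devPeers[i] via cudaMemcpyAsync
     * is recorded in devPeersHost[i] at the same time */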

src/init.cc

@@ -437,7 +437,7 @@ static ncclResult_t devCommSetup(ncclComm_t comm) {
   NCCLCHECKGOTO(ncclCudaMemcpyAsync(devCommAndChans, &tmpCommAndChans, 1, comm->sharedRes->deviceStream.cudaStream), ret, fail);
 exit:
-  CUDACHECK(cudaStreamSynchronize(comm->sharedRes->deviceStream.cudaStream));
+  NCCLCHECK(ncclStrongStreamSynchronize(&comm->sharedRes->deviceStream));
   NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &comm->sharedRes->deviceStream));
   return ret;
 fail:
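
The swap here is about consistency rather than a new guarantee:
synchronization now goes through the same strong-stream wrapper whose
release follows on the next line, instead of bypassing it with a raw
CUDA call on the underlying cudaStream. The assumed discipline looks
like this (sketch; the matching acquire happens earlier in the function
and is not part of this hunk):

    NCCLCHECK(ncclStrongStreamAcquireUncaptured(&stream));
    /* ...enqueue async work on stream.cudaStream... */
    NCCLCHECK(ncclStrongStreamSynchronize(&stream));
    NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &stream));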

src/transport.cc

@@ -147,11 +147,9 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
       if (conn->connected == 0) {
         NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[i] + sendDataOffset++, 1, comm->rank, conn), ret, fail);
         if (ret == ncclSuccess) {
-          struct ncclDevChannelPeer* addr;
           conn->connected = 1;
           /* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */
-          CUDACHECKGOTO(cudaMemcpyAsync(&addr, &comm->channels[c].devPeers[sendPeer], sizeof(struct ncclDevChannelPeer*), cudaMemcpyDeviceToHost, comm->sharedRes->hostStream.cudaStream), ret, fail);
-          CUDACHECKGOTO(cudaMemcpyAsync(&addr->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail);
+          CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[sendPeer]->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail);
         } else if (ret == ncclInProgress) {
           allChannelsConnected = false;
         }
@@ -167,11 +165,9 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
       if (conn->connected == 0) {
         NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[i] + recvDataOffset++, 1, comm->rank, conn), ret, fail);
         if (ret == ncclSuccess) {
-          struct ncclDevChannelPeer* addr;
           conn->connected = 1;
           /* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */
-          CUDACHECKGOTO(cudaMemcpyAsync(&addr, &comm->channels[c].devPeers[recvPeer], sizeof(struct ncclDevChannelPeer*), cudaMemcpyDeviceToHost, comm->sharedRes->hostStream.cudaStream), ret, fail);
-          CUDACHECKGOTO(cudaMemcpyAsync(&addr->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail);
+          CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[recvPeer]->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail);
         } else if (ret == ncclInProgress) {
           allChannelsConnected = false;
         }
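
One subtlety makes the new single-copy form safe: devPeersHostPtr holds
device addresses, and forming &devPeersHostPtr[peer]->send[connIndex]
on the host is pure pointer arithmetic (base plus struct offset), so
nothing is dereferenced until cudaMemcpyAsync consumes the result as a
copy destination. Spelled out (sketch, assuming send is a flat array of
struct ncclConnInfo, as the sizeof in the copy suggests):

    struct ncclDevChannelPeer* p =
        comm->channels[c].devPeersHostPtr[sendPeer];  /* a device address */
    void* dst = (char*)p + offsetof(struct ncclDevChannelPeer, send)
                + connIndex * sizeof(struct ncclConnInfo); /* host-side math only */
    cudaMemcpyAsync(dst, &conn->conn, sizeof(struct ncclConnInfo),
                    cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream);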

src/transport/nvls.cc

@@ -359,12 +359,10 @@ ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
       peer->send[0].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
       peer->send[0].conn.flags |= NCCL_NVLS_MIN_POLL;
-      struct ncclDevChannelPeer* addr;
-      CUDACHECKGOTO(cudaMemcpyAsync(&addr, comm->channels[c].devPeers + nvlsPeer, sizeof(struct ncclDevChannelPeer*), cudaMemcpyDeviceToHost, comm->sharedRes->hostStream.cudaStream), res, cleanup);
-      CUDACHECKGOTO(cudaMemcpyAsync(&addr->send[0], &peer->send[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
-      CUDACHECKGOTO(cudaMemcpyAsync(&addr->recv[0], &peer->recv[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
-      CUDACHECKGOTO(cudaMemcpyAsync(&addr->send[1], &peer->send[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
-      CUDACHECKGOTO(cudaMemcpyAsync(&addr->recv[1], &peer->recv[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
+      CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->send[0], &peer->send[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
+      CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->recv[0], &peer->recv[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
+      CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->send[1], &peer->send[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
+      CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->recv[1], &peer->recv[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
       /*INFO(NCCL_INIT|NCCL_NVLS, "Peer %d Channel %d MC buff %p/%p UC Buff %p/%p",
         nvlsPeer, c,