From 4365458757e4107ecbf629b2fd6e0e19a5d237c2 Mon Sep 17 00:00:00 2001 From: Kaiming Ouyang Date: Wed, 20 Sep 2023 05:51:14 -0700 Subject: [PATCH] Fix cudaMemcpyAsync bug We are trying to use the copy result of first cudaMemcpyAsync in the second cudaMemcpyAsync without sync in between. This patch fixes it by allocating a CPU side array to cache device side addr so that we can avoid this consecutive cuda mem copy. Fixes #957 --- src/channel.cc | 12 ++++++++++++ src/include/comm.h | 2 ++ src/init.cc | 2 +- src/transport.cc | 8 ++------ src/transport/nvls.cc | 10 ++++------ 5 files changed, 21 insertions(+), 13 deletions(-) diff --git a/src/channel.cc b/src/channel.cc index 3edcc2f..245dfd5 100644 --- a/src/channel.cc +++ b/src/channel.cc @@ -42,9 +42,11 @@ ncclResult_t initChannel(struct ncclComm* comm, int channelId) { /* channel->devPeers is not shared, so just free it when calling commFree() */ NCCLCHECK(ncclCudaCallocAsync(&channel->devPeers, nPeers, sharedRes->deviceStream.cudaStream)); ncclCommPushCudaFree(comm, channel->devPeers); + NCCLCHECK(ncclCalloc(&channel->devPeersHostPtr, nPeers)); for (int r = 0; r < nRanks; r++) { uintptr_t addr = (uintptr_t)(comm->sharedRes->devPeers[channelId] + comm->topParentRanks[r]); NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + r), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream)); + channel->devPeersHostPtr[r] = (struct ncclDevChannelPeer*)addr; } } @@ -52,6 +54,8 @@ ncclResult_t initChannel(struct ncclComm* comm, int channelId) { NCCLCHECK(ncclCudaCallocAsync(&channel->devRingUserRanks, nRanks, sharedRes->deviceStream.cudaStream)); ncclCommPushCudaFree(comm, channel->devRingUserRanks); + /* guarantee addr has been copied into channel->devPeers */ + NCCLCHECK(ncclStrongStreamSynchronize(&sharedRes->deviceStream)); NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &sharedRes->deviceStream)); return ncclSuccess; @@ -77,6 +81,7 @@ ncclResult_t initNvlsChannel(struct ncclComm* comm, int channelId, struct ncclCo uintptr_t addr = (uintptr_t)(parent->channels[channelId].nvlsDevPeers + tr); channel->peers[comm->nRanks + 1 + r] = parent->channels[channelId].nvlsPeers + tr; NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + comm->nRanks + 1 + r), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream)); + channel->devPeersHostPtr[comm->nRanks + 1 + r] = (struct ncclDevChannelPeer*)addr; ncclAtomicRefCountIncrement(&parent->channels[channelId].nvlsPeers[tr].refCount); } } else { @@ -86,10 +91,12 @@ ncclResult_t initNvlsChannel(struct ncclComm* comm, int channelId, struct ncclCo uintptr_t addr = (uintptr_t)(channel->nvlsDevPeers + r); channel->peers[comm->nRanks + 1 + r] = channel->nvlsPeers + r; NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + comm->nRanks + 1 + r), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream)); + channel->devPeersHostPtr[comm->nRanks + 1 + r] = (struct ncclDevChannelPeer*)addr; ncclAtomicRefCountIncrement(&channel->nvlsPeers[r].refCount); } } + NCCLCHECK(ncclStrongStreamSynchronize(&sharedRes->deviceStream)); NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &sharedRes->deviceStream)); return ncclSuccess; @@ -114,6 +121,7 @@ ncclResult_t initCollnetChannel(struct ncclComm* comm, int channelId, struct ncc addr = (uintptr_t)parent->channels[channelId].collnetDevPeers; channel->peers[comm->nRanks] = parent->channels[channelId].collnetPeers; NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + comm->nRanks), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream)); + channel->devPeersHostPtr[comm->nRanks] = (struct ncclDevChannelPeer*)addr; ncclAtomicRefCountIncrement(&parent->channels[channelId].collnetPeers->refCount); } else { NCCLCHECK(ncclCalloc(&channel->collnetPeers, 1)); @@ -121,9 +129,11 @@ ncclResult_t initCollnetChannel(struct ncclComm* comm, int channelId, struct ncc addr = (uintptr_t)channel->collnetDevPeers; channel->peers[comm->nRanks] = channel->collnetPeers; NCCLCHECK(ncclCudaMemcpyAsync((uintptr_t*)(channel->devPeers + comm->nRanks), (uintptr_t*)&addr, 1, sharedRes->deviceStream.cudaStream)); + channel->devPeersHostPtr[comm->nRanks] = (struct ncclDevChannelPeer*)addr; ncclAtomicRefCountIncrement(&channel->collnetPeers->refCount); } + NCCLCHECK(ncclStrongStreamSynchronize(&sharedRes->deviceStream)); NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &sharedRes->deviceStream)); return ncclSuccess; @@ -156,5 +166,7 @@ ncclResult_t freeChannel(struct ncclChannel* channel, int nRanks, int collnetNRa } } } + + free(channel->devPeersHostPtr); return ncclSuccess; } diff --git a/src/include/comm.h b/src/include/comm.h index e79bf54..8986f93 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -124,6 +124,8 @@ struct ncclSharedResources { struct ncclChannel { struct ncclChannelPeer** peers; struct ncclDevChannelPeer** devPeers; + /* devPeer pointer array used for host side access */ + struct ncclDevChannelPeer** devPeersHostPtr; struct ncclRing ring; int* devRingUserRanks; struct ncclTree tree; diff --git a/src/init.cc b/src/init.cc index 1ea1d7e..309ce10 100644 --- a/src/init.cc +++ b/src/init.cc @@ -437,7 +437,7 @@ static ncclResult_t devCommSetup(ncclComm_t comm) { NCCLCHECKGOTO(ncclCudaMemcpyAsync(devCommAndChans, &tmpCommAndChans, 1, comm->sharedRes->deviceStream.cudaStream), ret, fail); exit: - CUDACHECK(cudaStreamSynchronize(comm->sharedRes->deviceStream.cudaStream)); + NCCLCHECK(ncclStrongStreamSynchronize(&comm->sharedRes->deviceStream)); NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &comm->sharedRes->deviceStream)); return ret; fail: diff --git a/src/transport.cc b/src/transport.cc index f4b8a2a..9817beb 100644 --- a/src/transport.cc +++ b/src/transport.cc @@ -147,11 +147,9 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* if (conn->connected == 0) { NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[i] + sendDataOffset++, 1, comm->rank, conn), ret, fail); if (ret == ncclSuccess) { - struct ncclDevChannelPeer* addr; conn->connected = 1; /* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */ - CUDACHECKGOTO(cudaMemcpyAsync(&addr, &comm->channels[c].devPeers[sendPeer], sizeof(struct ncclDevChannelPeer*), cudaMemcpyDeviceToHost, comm->sharedRes->hostStream.cudaStream), ret, fail); - CUDACHECKGOTO(cudaMemcpyAsync(&addr->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); + CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[sendPeer]->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); } else if (ret == ncclInProgress) { allChannelsConnected = false; } @@ -167,11 +165,9 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* if (conn->connected == 0) { NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[i] + recvDataOffset++, 1, comm->rank, conn), ret, fail); if (ret == ncclSuccess) { - struct ncclDevChannelPeer* addr; conn->connected = 1; /* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */ - CUDACHECKGOTO(cudaMemcpyAsync(&addr, &comm->channels[c].devPeers[recvPeer], sizeof(struct ncclDevChannelPeer*), cudaMemcpyDeviceToHost, comm->sharedRes->hostStream.cudaStream), ret, fail); - CUDACHECKGOTO(cudaMemcpyAsync(&addr->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); + CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[recvPeer]->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); } else if (ret == ncclInProgress) { allChannelsConnected = false; } diff --git a/src/transport/nvls.cc b/src/transport/nvls.cc index 633cb04..07be99d 100644 --- a/src/transport/nvls.cc +++ b/src/transport/nvls.cc @@ -359,12 +359,10 @@ ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) { peer->send[0].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2); peer->send[0].conn.flags |= NCCL_NVLS_MIN_POLL; - struct ncclDevChannelPeer* addr; - CUDACHECKGOTO(cudaMemcpyAsync(&addr, comm->channels[c].devPeers + nvlsPeer, sizeof(struct ncclDevChannelPeer*), cudaMemcpyDeviceToHost, comm->sharedRes->hostStream.cudaStream), res, cleanup); - CUDACHECKGOTO(cudaMemcpyAsync(&addr->send[0], &peer->send[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup); - CUDACHECKGOTO(cudaMemcpyAsync(&addr->recv[0], &peer->recv[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup); - CUDACHECKGOTO(cudaMemcpyAsync(&addr->send[1], &peer->send[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup); - CUDACHECKGOTO(cudaMemcpyAsync(&addr->recv[1], &peer->recv[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup); + CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->send[0], &peer->send[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup); + CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->recv[0], &peer->recv[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup); + CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->send[1], &peer->send[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup); + CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->recv[1], &peer->recv[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup); /*INFO(NCCL_INIT|NCCL_NVLS, "Peer %d Channel %d MC buff %p/%p UC Buff %p/%p", nvlsPeer, c,