From 88d44d777f6970bdbf6610badcbd7e25a05380f0 Mon Sep 17 00:00:00 2001 From: Sylvain Jeaugey Date: Mon, 13 Nov 2023 10:26:55 -0800 Subject: [PATCH] 2.19.4-1 Split transport connect phase into multiple steps to avoid port exhaustion when connecting alltoall at large scale. Defaults to 128 peers per round. Fix memory leaks on CUDA graph capture. Fix alltoallv crash on self-sendrecv. Make topology detection more deterministic when PCI speeds are not available (fix issue #1020). Properly close shared memory in NVLS resources. Revert proxy detach after 5 seconds. Add option to print progress during transport connect. Add option to set NCCL_DEBUG to INFO on first WARN. --- makefiles/version.mk | 2 +- src/debug.cc | 3 + src/enqueue.cc | 23 +++++- src/graph/xml.cc | 5 +- src/include/comm.h | 4 +- src/include/proxy.h | 7 +- src/include/transport.h | 2 + src/init.cc | 16 ++-- src/proxy.cc | 97 ++++------------------ src/transport.cc | 179 +++++++++++++++++++++++++--------------- src/transport/nvls.cc | 19 +++-- 11 files changed, 174 insertions(+), 183 deletions(-) diff --git a/makefiles/version.mk b/makefiles/version.mk index 5e32150..b383eeb 100644 --- a/makefiles/version.mk +++ b/makefiles/version.mk @@ -1,6 +1,6 @@ ##### version NCCL_MAJOR := 2 NCCL_MINOR := 19 -NCCL_PATCH := 3 +NCCL_PATCH := 4 NCCL_SUFFIX := PKG_REVISION := 1 diff --git a/src/debug.cc b/src/debug.cc index 21cec22..63b3e5b 100644 --- a/src/debug.cc +++ b/src/debug.cc @@ -139,6 +139,8 @@ void ncclDebugInit() { pthread_mutex_unlock(&ncclDebugLock); } +NCCL_PARAM(WarnSetDebugInfo, "WARN_ENABLE_DEBUG_INFO", 0); + /* Common logging function used by the INFO, WARN and TRACE macros * Also exported to the dynamically loadable Net transport modules so * they can share the debugging mechanisms and output files @@ -172,6 +174,7 @@ void ncclDebugLog(ncclDebugLogLevel level, unsigned long flags, const char *file if (level == NCCL_LOG_WARN) { len = snprintf(buffer, sizeof(buffer), "\n%s:%d:%d [%d] %s:%d NCCL WARN ", hostname, pid, tid, cudaDev, filefunc, line); + if (ncclParamWarnSetDebugInfo()) ncclDebugLevel = NCCL_LOG_INFO; } else if (level == NCCL_LOG_INFO) { len = snprintf(buffer, sizeof(buffer), "%s:%d:%d [%d] NCCL INFO ", hostname, pid, tid, cudaDev); } else if (level == NCCL_LOG_TRACE && flags == NCCL_CALL) { diff --git a/src/enqueue.cc b/src/enqueue.cc index dbb9865..ae56dec 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -627,13 +627,12 @@ static ncclResult_t scheduleP2pTasksToPlan( while (nChannelsMax*nRanks > comm->p2pnChannels*4 && nChannelsMax > 1) nChannelsMax /= 2; } - bool fuseOk; + bool fuseOk = false; // We can perform 8 send/recv per round per CTA. Make sure we jump between fused blocks at node boundaries. while (tasks->nTasksP2p != 0) { for (int i=0; i < tasks->p2pOrderSteps; i++) { int sendPeer = sendOrder[i]; int recvPeer = recvOrder[i]; - if ((i % (NCCL_MAX_WORK_ELEMENTS_P2P/2)) == 0) fuseOk = false; struct ncclTaskP2p* send = sendPeer != -1 ? ncclIntruQueueHead(&peers[sendPeer].sendQueue) : NULL; struct ncclTaskP2p* recv = recvPeer != -1 ? ncclIntruQueueHead(&peers[recvPeer].recvQueue) : NULL; if (sendPeer == comm->rank) { @@ -669,6 +668,7 @@ static ncclResult_t scheduleP2pTasksToPlan( if (send) sendBytes -= send->chunk*sendChunkBytesMax; do { + if ((i % (NCCL_MAX_WORK_ELEMENTS_P2P/2)) == 0) fuseOk = false; ssize_t recvChunkBytes = std::min(recvBytes, recvChunkBytesMax); // -1 preserved ssize_t sendChunkBytes = std::min(sendBytes, sendChunkBytesMax); if (recvChunkBytes != 0) { @@ -879,6 +879,14 @@ static ncclResult_t reclaimPlan(struct ncclComm* comm, struct ncclCommCallback* if (plan->persistent) { comm->persistentRefs -= 1; NCCLCHECK(ncclCudaFree(plan->workHead)); + for (int c=0; c < plan->channelUbound; c++) { + struct ncclProxyOp* q = ncclIntruQueueHead(&plan->channels[c].proxyOpQueue); + while (q != nullptr) { + struct ncclProxyOp* q1 = q->enqNext; + ncclMemoryPoolFree(&plan->memPool_ncclProxyOp, q); + q = q1; + } + } while (!ncclIntruQueueEmpty(&plan->ipcMemQueue)) { struct ncclPointerList* q = ncclIntruQueueDequeue(&plan->ipcMemQueue); CUDACHECKIGNORE(cudaIpcCloseMemHandle(q->ptr)); @@ -1093,9 +1101,16 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan ncclResult_t ncclLaunchKernelAfter_NoCuda(struct ncclComm* comm, struct ncclKernelPlan* plan) { if (!(plan->persistent || comm->persistentRefs != 0 || ncclCudaLaunchBlocking)) { - // If this isn't being captured and there aren't any CUDA graphs alive - // then we don't need to do our proxyOp pushing on the host stream. + // We are not using the host stream for proxy ops and reclaimation submission. NCCLCHECK(hostStreamPlanTask(comm, plan)); + } else { + // We are using the host stream for proxy ops and reclaimation submission. + // Only plans with proxy ops have a callback pushed by ncclLaunchPrepare. + // Since non-persistent plans also require reclaimation, we have to do it + // here. + if (!plan->persistent && !plan->hasProxyOps) { + ncclIntruQueueMpscEnqueue(&comm->callbackQueue, &plan->reclaimer); + } } return ncclSuccess; } diff --git a/src/graph/xml.cc b/src/graph/xml.cc index 47fda1f..f06f65d 100644 --- a/src/graph/xml.cc +++ b/src/graph/xml.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include "core.h" #include "nvmlwrap.h" #include "xml.h" @@ -500,11 +501,11 @@ ncclResult_t ncclTopoGetXmlFromSys(struct ncclXmlNode* pciNode, struct ncclXml* if (index == -1) { if (path) { char deviceSpeedStr[MAX_STR_LEN]; - float deviceSpeed; + float deviceSpeed = FLT_MAX; NCCLCHECK(ncclTopoGetStrFromSys(path, "max_link_speed", deviceSpeedStr)); sscanf(deviceSpeedStr, "%f GT/s", &deviceSpeed); char portSpeedStr[MAX_STR_LEN]; - float portSpeed; + float portSpeed = FLT_MAX; NCCLCHECK(ncclTopoGetStrFromSys(path, "../max_link_speed", portSpeedStr)); sscanf(portSpeedStr, "%f GT/s", &portSpeed); NCCLCHECK(xmlSetAttr(pciNode, "link_speed", portSpeed < deviceSpeed ? portSpeedStr : deviceSpeedStr)); diff --git a/src/include/comm.h b/src/include/comm.h index bc5a9c5..328ffef 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -299,7 +299,7 @@ struct ncclComm { // Flag to ask NCCL kernels to abort volatile uint32_t *abortFlag; volatile uint32_t *childAbortFlag; - volatile uint32_t *abortFlagRefCount; + uint32_t *abortFlagRefCount; // Device side of the communicator (for cudaFree's) struct ncclDevComm* devComm; // actually = &ncclDevCommAndChannels::comm @@ -342,8 +342,6 @@ struct ncclComm { int nvlsRegSupport; /* sharable NVLS resource. */ struct ncclNvlsSharedRes* nvlsResources; - struct ncclShmemCollBuff nvlsShmem; - void *nvlsShmemHandle; ssize_t channelSize; // User requested work size (bytes) for channel partitions diff --git a/src/include/proxy.h b/src/include/proxy.h index daf3885..8093c0c 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -194,7 +194,6 @@ struct ncclProxyRpcResponseHeader { }; struct ncclProxyState { - int internalRefCount; int refCount; int tpRank; int tpnRanks; @@ -209,11 +208,10 @@ struct ncclProxyState { ncclNet_t* ncclNet; ncclCollNet_t* ncclCollNet; volatile uint32_t* abortFlag; - volatile uint32_t* abortFlagRefCount; // Service thread pthread_t thread; struct ncclSocket* listenSock; - volatile int stop; + int stop; CUcontext cudaCtx; ncclResult_t asyncResult; @@ -294,6 +292,5 @@ ncclResult_t ncclProxyClientGetFdBlocking(struct ncclComm* comm, struct ncclProx ncclResult_t ncclProxyStop(struct ncclComm* comm); ncclResult_t ncclProxyShmUnlink(struct ncclComm* comm); -ncclResult_t ncclProxyDestroy(struct ncclProxyState *proxyState); -ncclResult_t ncclProxyTryDetach(struct ncclProxyState *proxyState); +ncclResult_t ncclProxyDestroy(struct ncclComm* comm); #endif diff --git a/src/include/transport.h b/src/include/transport.h index d0cd974..27529df 100644 --- a/src/include/transport.h +++ b/src/include/transport.h @@ -67,6 +67,8 @@ struct ncclNvlsSharedRes { char shareableHandle[NVLS_HANDLE_SIZE]; size_t ucGran; int nChannels; + struct ncclShmemCollBuff nvlsShmem; + void *nvlsShmemHandle; }; #endif /* CUDART_VERSION >= 12010 */ diff --git a/src/init.cc b/src/init.cc index c681f2a..e82e64e 100644 --- a/src/init.cc +++ b/src/init.cc @@ -179,13 +179,7 @@ static ncclResult_t commFree(ncclComm_t comm) { * free all intra-process communicators; therefore, we only need to focus on local * resource cleanup in commFree(). */ if (comm->proxyState && comm->proxyRefCountOld == 0 && comm->proxyState->thread) { - if (*comm->abortFlag == 0) { - /* regular thread join */ - pthread_join(comm->proxyState->thread, nullptr); - } else { - /* try to detach thread due to abort */ - ncclProxyTryDetach(comm->proxyState); - } + pthread_join(comm->proxyState->thread, nullptr); } delete[] comm->userRedOps; @@ -219,7 +213,7 @@ static ncclResult_t commFree(ncclComm_t comm) { free(comm->sharedRes->tpRankToLocalRank); NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->hostStream)); NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->deviceStream)); - NCCLCHECK(ncclProxyDestroy(comm->sharedRes->proxyState)); + NCCLCHECK(ncclProxyDestroy(comm)); free(comm->sharedRes); } } @@ -237,7 +231,7 @@ static ncclResult_t commFree(ncclComm_t comm) { if (ncclAtomicRefCountDecrement(comm->abortFlagRefCount) == 0) { NCCLCHECK(ncclCudaHostFree((void *)comm->abortFlag)); - free((void*)comm->abortFlagRefCount); + free(comm->abortFlagRefCount); } free((void*)comm->config.netName); @@ -1645,7 +1639,7 @@ exit: fail: if (comm) { if (comm->abortFlag) ncclCudaHostFree((void *)comm->abortFlag); - if (comm->abortFlagRefCount) free((void*)comm->abortFlagRefCount); + if (comm->abortFlagRefCount) free(comm->abortFlagRefCount); free(comm); } if (newcomm) *newcomm = NULL; @@ -2086,7 +2080,7 @@ fail: if (childComm) { if (comm && !comm->config.splitShare) { if (childComm->abortFlag) ncclCudaHostFree((void*)childComm->abortFlag); - if (childComm->abortFlagRefCount) free((void*)childComm->abortFlagRefCount); + if (childComm->abortFlagRefCount) free(childComm->abortFlagRefCount); } free(childComm); } diff --git a/src/proxy.cc b/src/proxy.cc index 976b1d3..db36a15 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -18,14 +18,6 @@ #include #include -#define PROGRESS_RUNNING 0 -#define PROGRESS_REQUEST_STOP 1 -#define PROGRESS_ABORT 2 -#define PROGRESS_COMPLETE 3 - -#define SERVICE_RUNNING 0 -#define SERVICE_COMPLETE 1 - enum { proxyRecv=0, proxySend=1 }; static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) { @@ -720,13 +712,13 @@ static ncclResult_t ncclProxyGetPostedOps(struct ncclProxyState* proxyState, int if (state->active == NULL) { pthread_mutex_lock(&pool->mutex); - while (pool->nextOps == -1 && state->stop == PROGRESS_RUNNING) { + while (pool->nextOps == -1 && !state->stop) { struct ncclProxyArgs profArgs; // Only used for profiling purposes ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileSleep); pthread_cond_wait(&pool->cond, &pool->mutex); ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileWakeup); } - if (state->stop != PROGRESS_RUNNING) { // We might have been woken up to stop. + if (state->stop) { // We might have been woken up to stop. pthread_mutex_unlock(&pool->mutex); return ncclSuccess; } @@ -864,7 +856,7 @@ void* ncclProxyProgress(void *proxyState_) { * frequency of calling ncclProxyGetPostedOps() and reduce the perf impact. */ int proxyOpAppendCounter = 0; struct ncclProxyArgs profArgs; // Only used for profiling purposes - while (state->stop == PROGRESS_RUNNING || (state->stop == PROGRESS_REQUEST_STOP && state->active)) { + while ((state->stop == 0 || (state->stop == 1 && state->active)) && *proxyState->abortFlag == 0) { int idle = 1; ncclResult_t ret = progressOps(proxyState, state, state->active, &idle); if (ret != ncclSuccess) { @@ -878,7 +870,7 @@ void* ncclProxyProgress(void *proxyState_) { int added = 0; proxyOpAppendCounter = 0; TIME_START(3); - if (state->stop == PROGRESS_RUNNING) + if (state->stop == 0) ret = ncclProxyGetPostedOps(proxyState, &added); if (added) { TIME_STOP(3); } else { TIME_CANCEL(3); } if (ret != ncclSuccess) { @@ -891,9 +883,6 @@ void* ncclProxyProgress(void *proxyState_) { } lastIdle = idle; } - - /* progress serive thread should be waiting for me, I need to notify it. */ - __atomic_store_n(&state->stop, PROGRESS_COMPLETE, __ATOMIC_RELEASE); return NULL; } @@ -916,11 +905,7 @@ ncclResult_t ncclProxyStart(struct ncclComm* comm) { static ncclResult_t ncclProxyProgressCreate(struct ncclProxyState* proxyState) { struct ncclProxyProgressState* state = &proxyState->progressState; if (!state->thread) { - pthread_attr_t attr; - SYSCHECK(pthread_attr_init(&attr), "pthread_attr_init"); - SYSCHECK(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED), "pthread_attr_setdetachstate"); - SYSCHECK(pthread_create(&state->thread, &attr, ncclProxyProgress, proxyState), "pthread_create"); - SYSCHECK(pthread_attr_destroy(&attr), "pthread_attr_destroy"); + pthread_create(&state->thread, NULL, ncclProxyProgress, proxyState); ncclSetThreadName(state->thread, "NCCL Progress%2d", proxyState->tpLocalnRanks); } return ncclSuccess; @@ -932,17 +917,10 @@ ncclResult_t ncclProxyProgressDestroy(struct ncclProxyState* proxyState) { // Request the proxy to stop and then wake it if (state->opsPool) { pthread_mutex_lock(&state->opsPool->mutex); - if (*proxyState->abortFlag == 0) - state->stop = PROGRESS_REQUEST_STOP; - else - state->stop = PROGRESS_ABORT; + state->stop = 1; pthread_cond_signal(&state->opsPool->cond); pthread_mutex_unlock(&state->opsPool->mutex); - /* progress thread is always detached, wait for it to exit. */ - uint64_t t0 = clockNano(); - while (__atomic_load_n(&state->stop, __ATOMIC_ACQUIRE) != PROGRESS_COMPLETE) { - if (clockNano() - t0 >= 1000) sched_yield(); - } + pthread_join(state->thread, NULL); } // Free off any memory allocated for the proxy arg pools @@ -1582,19 +1560,6 @@ void* ncclProxyService(void* _args) { ncclSocketClose(proxyState->listenSock); free(proxyState->listenSock); proxyOpsFree(proxyState); - - if (*proxyState->abortFlag) { - /* abort happened, need to notify main thread I am done. */ - __atomic_store_n(&proxyState->stop, SERVICE_COMPLETE, __ATOMIC_RELEASE); - } - - if (ncclAtomicRefCountDecrement(proxyState->abortFlagRefCount) == 0) { - ncclCudaHostFree((void *)proxyState->abortFlag); - free((void*)proxyState->abortFlagRefCount); - } - - /* proxy itself holds one internal ref count, needs to call ncclProxyDestroy */ - ncclProxyDestroy(proxyState); return NULL; } @@ -1603,8 +1568,6 @@ ncclResult_t ncclProxyInit(struct ncclComm* comm, struct ncclSocket* sock, union NCCLCHECK(ncclCalloc(&comm->sharedRes->proxyState, 1)); comm->proxyState = comm->sharedRes->proxyState; comm->proxyState->refCount = 1; - /* ref count for communicator and proxy service thread. */ - comm->proxyState->internalRefCount = 2; comm->proxyState->listenSock = sock; comm->proxyState->peerAddresses = peerAddresses; // Seed the random number generator for UDS filename generation @@ -1627,8 +1590,6 @@ ncclResult_t ncclProxyCreate(struct ncclComm* comm) { proxyState->tpLocalnRanks = comm->localRanks; proxyState->cudaDev = comm->cudaDev; proxyState->abortFlag = comm->abortFlag; - proxyState->abortFlagRefCount = comm->abortFlagRefCount; - ncclAtomicRefCountIncrement(comm->abortFlagRefCount); proxyState->p2pnChannels = comm->p2pnChannels; proxyState->p2pChunkSize = comm->p2pChunkSize; proxyState->nChannels = comm->nChannels; @@ -1686,41 +1647,15 @@ ncclResult_t ncclProxyStop(struct ncclComm* comm) { return ncclSuccess; } -ncclResult_t ncclProxyDestroy(struct ncclProxyState *proxyState) { - if (__atomic_sub_fetch(&proxyState->internalRefCount, 1, __ATOMIC_ACQ_REL) == 0) { - free(proxyState->peerAddresses); - free(proxyState->peerSocks); - free(proxyState->proxyOps); - free(proxyState->sharedDevMems); - expectedProxyResponseFree(proxyState); - free(proxyState); - } - return ncclSuccess; -} +ncclResult_t ncclProxyDestroy(struct ncclComm* comm) { + struct ncclProxyState* sharedProxyState = comm->sharedRes->proxyState; -/* detach all proxy threads in case of abort */ -ncclResult_t ncclProxyTryDetach(struct ncclProxyState *proxyState) { - if (proxyState && proxyState->thread) { - /* proxy service thread can call cudaFreeHost to free pinned host mem, but - * it can cause a hang if main thread is issuing other cuda calls. To solution - * should be allocate/free pinned host mem using cuMem* driver API, this waiting - * 5 secs is just a workaround for now. */ - bool join = false; - struct timespec start, now; - clock_gettime(CLOCK_MONOTONIC, &start); - do { - clock_gettime(CLOCK_MONOTONIC, &now); - if (__atomic_load_n(&proxyState->stop, __ATOMIC_ACQUIRE) == SERVICE_COMPLETE) { - /* proxy thread is done, join it. */ - pthread_join(proxyState->thread, NULL); - join = true; - break; - } - } while(now.tv_sec - start.tv_sec < 5); - - if (join == false) { - pthread_detach(proxyState->thread); - } - } + assert(sharedProxyState->refCount == 0); + free(sharedProxyState->peerAddresses); + free(sharedProxyState->peerSocks); + free(sharedProxyState->proxyOps); + free(sharedProxyState->sharedDevMems); + expectedProxyResponseFree(sharedProxyState); + free(sharedProxyState); return ncclSuccess; } diff --git a/src/transport.cc b/src/transport.cc index c66a81e..a465d6b 100644 --- a/src/transport.cc +++ b/src/transport.cc @@ -65,13 +65,28 @@ void dumpData(struct ncclConnect* data, int ndata) { } } +NCCL_PARAM(ConnectRoundMaxPeers, "CONNECT_ROUND_MAX_PEERS", 128); +NCCL_PARAM(ReportConnectProgress, "REPORT_CONNECT_PROGRESS", 0); +#include + ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, int connIndex, int* highestTransportType/*=NULL*/) { // Stream used during transport setup; need for P2P pre-connect + CUDA Graph ncclResult_t ret = ncclSuccess; int highestType = TRANSPORT_P2P; // track highest transport type - struct ncclConnect** data = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Store intermediate send/recvData structs for connect - struct ncclConnect** recvData = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Points to entries inside data for given recv connection within a channel - struct ncclConnect** sendData = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Points to entries inside data for given send connection within a channel + struct ncclConnect** data; // Store intermediate send/recvData structs for connect + struct ncclConnect** recvData; // Points to entries inside data for given recv connection within a channel + struct ncclConnect** sendData; // Points to entries inside data for given send connection within a channel + int done = 0; + + int maxPeers = ncclParamConnectRoundMaxPeers(); + NCCLCHECK(ncclCalloc(&data, maxPeers)); + NCCLCHECK(ncclCalloc(&recvData, maxPeers)); + NCCLCHECK(ncclCalloc(&sendData, maxPeers)); + + struct timeval timeStart, timeLast; + gettimeofday(&timeStart, NULL); + timeLast = timeStart; // struct copy + bool timeReported = false; NCCLCHECKGOTO(ncclStrongStreamAcquireUncaptured(&comm->sharedRes->hostStream), ret, fail); // First time initialization @@ -87,23 +102,24 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* // The first N entries contain recvData, connection information for recv connections // The next M entries contain sendData, connection information for send connections // It's not guaranteed that each entry of data has the same number of total or send/recv specific connections - data[i] = (ncclConnect*) malloc(sizeof(ncclConnect) * 2*MAXCHANNELS); - recvData[i] = data[i]; + int p = i-(done+1); + if (recvMask || sendMask) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS)); + recvData[p] = data[p]; int sendChannels = 0, recvChannels = 0; int type; TIME_START(0); for (int c=0; c(comm, graph, recvData[i]+recvChannels++, c, recvPeer, connIndex, &type), ret, fail); + NCCLCHECKGOTO(selectTransport<0>(comm, graph, recvData[p]+recvChannels++, c, recvPeer, connIndex, &type), ret, fail); if (type > highestType) highestType = type; } } TIME_STOP(0); TIME_START(1); - sendData[i] = recvData[i]+recvChannels; + sendData[p] = recvData[p]+recvChannels; for (int c=0; c(comm, graph, sendData[i]+sendChannels++, c, sendPeer, connIndex, &type), ret, fail); + NCCLCHECKGOTO(selectTransport<1>(comm, graph, sendData[p]+sendChannels++, c, sendPeer, connIndex, &type), ret, fail); if (type > highestType) highestType = type; } } @@ -112,70 +128,100 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* TIME_START(2); if (sendPeer == recvPeer) { if (recvChannels+sendChannels) { - NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data[i], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail); - NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data[i], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail); - sendData[i] = data[i]; - recvData[i] = data[i]+sendChannels; + NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data[p], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail); + NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data[p], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail); + sendData[p] = data[p]; + recvData[p] = data[p]+sendChannels; } } else { - if (recvChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData[i], sizeof(struct ncclConnect)*recvChannels), ret, fail); - if (sendChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData[i], sizeof(struct ncclConnect)*sendChannels), ret, fail); - if (sendChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData[i], sizeof(struct ncclConnect)*sendChannels), ret, fail); - if (recvChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData[i], sizeof(struct ncclConnect)*recvChannels), ret, fail); + if (recvChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData[p], sizeof(struct ncclConnect)*recvChannels), ret, fail); + if (sendChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData[p], sizeof(struct ncclConnect)*sendChannels), ret, fail); + if (sendChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData[p], sizeof(struct ncclConnect)*sendChannels), ret, fail); + if (recvChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData[p], sizeof(struct ncclConnect)*recvChannels), ret, fail); } TIME_STOP(2); + + if (i-done == maxPeers || i == comm->nRanks-1) { + // Loop until all channels with all ranks have been connected + bool allChannelsConnected; + allChannelsConnected = false; + while (!allChannelsConnected) { + allChannelsConnected = true; + for (int j=done+1; j<=i; j++) { + int recvPeer = (comm->rank - j + comm->nRanks) % comm->nRanks; + int sendPeer = (comm->rank + j) % comm->nRanks; + uint64_t recvMask = comm->connectRecv[recvPeer]; + uint64_t sendMask = comm->connectSend[sendPeer]; + + int p = j-(done+1); + int sendDataOffset = 0; + int recvDataOffset = 0; + for (int c=0; cchannels[c].peers[sendPeer]->send + connIndex; + // This connector hasn't completed connection yet + if (conn->connected == 0) { + NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[p] + sendDataOffset++, 1, comm->rank, conn), ret, fail); + if (ret == ncclSuccess) { + conn->connected = 1; + /* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */ + CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[sendPeer]->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); + } else if (ret == ncclInProgress) { + allChannelsConnected = false; + } + } + } + TIME_STOP(3); + + // Start with recv channels + TIME_START(4); + if (recvMask & (1UL<channels[c].peers[recvPeer]->recv + connIndex; + // This connector hasn't completed connection yet + if (conn->connected == 0) { + NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[p] + recvDataOffset++, 1, comm->rank, conn), ret, fail); + if (ret == ncclSuccess) { + conn->connected = 1; + /* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */ + CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[recvPeer]->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); + } else if (ret == ncclInProgress) { + allChannelsConnected = false; + } + } + } + TIME_STOP(4); + } + if (sendMask || recvMask) { + free(data[p]); + data[p] = NULL; + } + } + if (ncclParamReportConnectProgress() && comm->rank == 0) { + struct timeval now; + gettimeofday(&now, NULL); + if (((now.tv_sec - timeLast.tv_sec)*1.0 + (now.tv_usec-timeLast.tv_usec)*1e-6) > 1) { + float elapsed = (now.tv_sec - timeStart.tv_sec)*1.0 + (now.tv_usec-timeStart.tv_usec)*1e-6; + float remaining = elapsed*(comm->nRanks-done)/done; + printf("%sP2p connect: %g%% Elapsed %d:%02d Remaining %d:%02d ", + timeReported ? "\r" : "", done*100.0/comm->nRanks, ((int)elapsed)/60, ((int)elapsed)%60, ((int)remaining)/60, ((int)remaining)%60); + fflush(stdout); + timeReported = true; + timeLast = now; // struct copy; + } + } + } + done = i; + } } - // Loop until all channels with all ranks have been connected - bool allChannelsConnected; - allChannelsConnected = false; - while (!allChannelsConnected) { - allChannelsConnected = true; - for (int i=1; inRanks; i++) { - int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks; - int sendPeer = (comm->rank + i) % comm->nRanks; - uint64_t recvMask = comm->connectRecv[recvPeer]; - uint64_t sendMask = comm->connectSend[sendPeer]; - - int sendDataOffset = 0; - int recvDataOffset = 0; - for (int c=0; cchannels[c].peers[sendPeer]->send + connIndex; - // This connector hasn't completed connection yet - if (conn->connected == 0) { - NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[i] + sendDataOffset++, 1, comm->rank, conn), ret, fail); - if (ret == ncclSuccess) { - conn->connected = 1; - /* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */ - CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[sendPeer]->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); - } else if (ret == ncclInProgress) { - allChannelsConnected = false; - } - } - } - TIME_STOP(3); - - // Start with recv channels - TIME_START(4); - if (recvMask & (1UL<channels[c].peers[recvPeer]->recv + connIndex; - // This connector hasn't completed connection yet - if (conn->connected == 0) { - NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[i] + recvDataOffset++, 1, comm->rank, conn), ret, fail); - if (ret == ncclSuccess) { - conn->connected = 1; - /* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */ - CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[recvPeer]->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); - } else if (ret == ncclInProgress) { - allChannelsConnected = false; - } - } - } - TIME_STOP(4); - } - } + if (timeReported) { + struct timeval now; + gettimeofday(&now, NULL); + float elapsed = (now.tv_sec - timeStart.tv_sec)*1.0 + (now.tv_usec-timeStart.tv_usec)*1e-6; + printf("\rP2p connect done in %d:%02d \n", + ((int)elapsed)/60, ((int)elapsed)%60); + fflush(stdout); } /* We need to sync ranks here since some ranks might run too fast after connection setup @@ -205,7 +251,6 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* } } comm->connectRecv[recvPeer] = comm->connectSend[sendPeer] = 0UL; - free(data[i]); } free(data); diff --git a/src/transport/nvls.cc b/src/transport/nvls.cc index c9a3bbc..4dfae51 100644 --- a/src/transport/nvls.cc +++ b/src/transport/nvls.cc @@ -393,18 +393,18 @@ ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) { typeSize = sizeof(struct localRegData); if (comm->localRank == 0) { shmPath[0] = '\0'; - NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, comm->localRanks - 1, &comm->nvlsShmemHandle), res, cleanup); + NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, comm->localRanks - 1, &comm->nvlsResources->nvlsShmemHandle), res, cleanup); NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shmPath, sizeof(shmPath)), res, cleanup); } else { NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shmPath, sizeof(shmPath)), res, cleanup); - NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, -1, &comm->nvlsShmemHandle), res, cleanup); + NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, -1, &comm->nvlsResources->nvlsShmemHandle), res, cleanup); } /* need 2 pools and a shared counter for shmem-based collectives */ - comm->nvlsShmem.cnt[0] = (size_t*)nvlsShmem; - comm->nvlsShmem.ptr[0] = (void*)((char*)comm->nvlsShmem.cnt[0] + sizeof(size_t)); - comm->nvlsShmem.cnt[1] = (size_t*)((char*)comm->nvlsShmem.ptr[0] + typeSize * comm->localRanks); - comm->nvlsShmem.ptr[1] = (void*)((char*)comm->nvlsShmem.cnt[1] + sizeof(size_t)); - comm->nvlsShmem.round = 0; + comm->nvlsResources->nvlsShmem.cnt[0] = (size_t*)nvlsShmem; + comm->nvlsResources->nvlsShmem.ptr[0] = (void*)((char*)comm->nvlsResources->nvlsShmem.cnt[0] + sizeof(size_t)); + comm->nvlsResources->nvlsShmem.cnt[1] = (size_t*)((char*)comm->nvlsResources->nvlsShmem.ptr[0] + typeSize * comm->localRanks); + comm->nvlsResources->nvlsShmem.ptr[1] = (void*)((char*)comm->nvlsResources->nvlsShmem.cnt[1] + sizeof(size_t)); + comm->nvlsResources->nvlsShmem.round = 0; return res; @@ -418,6 +418,7 @@ ncclResult_t ncclNvlsFree(struct ncclComm* comm) { if (resources == NULL) return ncclSuccess; if (ncclAtomicRefCountDecrement(&resources->refCount) == 0) { + NCCLCHECK(ncclShmClose(resources->nvlsShmemHandle)); NCCLCHECK(nvlsGroupUnbind(comm, resources)); NCCLCHECK(nvlsGroupUnmapMem(comm, resources)); free(resources); @@ -476,7 +477,7 @@ ncclResult_t tryRegisterBuffer(struct ncclComm *comm, struct localRequestData *r /* get all buffer addresses */ NCCLCHECKGOTO(ncclCalloc(®Record->addrs, comm->localRanks), ret, fail); regRecord->addrs[comm->localRank] = regRecord->buff; - NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsShmem, regRecord->addrs + comm->localRank, regRecord->addrs, sizeof(uintptr_t)), ret, fail); + NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsResources->nvlsShmem, regRecord->addrs + comm->localRank, regRecord->addrs, sizeof(uintptr_t)), ret, fail); /* enqueue record */ ncclIntruQueueEnqueue(&comm->regRecordQueue, regRecord); @@ -551,7 +552,7 @@ ncclResult_t ncclNvlsLocalRegisterBuffer(struct ncclComm *comm, const void *send regRequestHead = regRequestHead->next; } - NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsShmem, regData + comm->localRank, regData, sizeof(struct localRegData)), ret, fail); + NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsResources->nvlsShmem, regData + comm->localRank, regData, sizeof(struct localRegData)), ret, fail); /* first check whether all local ranks find their registered buffer */ for (int i = 0; i < comm->localRanks; ++i) {