diff --git a/makefiles/version.mk b/makefiles/version.mk index 5e32150..b383eeb 100644 --- a/makefiles/version.mk +++ b/makefiles/version.mk @@ -1,6 +1,6 @@ ##### version NCCL_MAJOR := 2 NCCL_MINOR := 19 -NCCL_PATCH := 3 +NCCL_PATCH := 4 NCCL_SUFFIX := PKG_REVISION := 1 diff --git a/src/debug.cc b/src/debug.cc index 21cec22..63b3e5b 100644 --- a/src/debug.cc +++ b/src/debug.cc @@ -139,6 +139,8 @@ void ncclDebugInit() { pthread_mutex_unlock(&ncclDebugLock); } +NCCL_PARAM(WarnSetDebugInfo, "WARN_ENABLE_DEBUG_INFO", 0); + /* Common logging function used by the INFO, WARN and TRACE macros * Also exported to the dynamically loadable Net transport modules so * they can share the debugging mechanisms and output files @@ -172,6 +174,7 @@ void ncclDebugLog(ncclDebugLogLevel level, unsigned long flags, const char *file if (level == NCCL_LOG_WARN) { len = snprintf(buffer, sizeof(buffer), "\n%s:%d:%d [%d] %s:%d NCCL WARN ", hostname, pid, tid, cudaDev, filefunc, line); + if (ncclParamWarnSetDebugInfo()) ncclDebugLevel = NCCL_LOG_INFO; } else if (level == NCCL_LOG_INFO) { len = snprintf(buffer, sizeof(buffer), "%s:%d:%d [%d] NCCL INFO ", hostname, pid, tid, cudaDev); } else if (level == NCCL_LOG_TRACE && flags == NCCL_CALL) { diff --git a/src/enqueue.cc b/src/enqueue.cc index dbb9865..ae56dec 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -627,13 +627,12 @@ static ncclResult_t scheduleP2pTasksToPlan( while (nChannelsMax*nRanks > comm->p2pnChannels*4 && nChannelsMax > 1) nChannelsMax /= 2; } - bool fuseOk; + bool fuseOk = false; // We can perform 8 send/recv per round per CTA. Make sure we jump between fused blocks at node boundaries. while (tasks->nTasksP2p != 0) { for (int i=0; i < tasks->p2pOrderSteps; i++) { int sendPeer = sendOrder[i]; int recvPeer = recvOrder[i]; - if ((i % (NCCL_MAX_WORK_ELEMENTS_P2P/2)) == 0) fuseOk = false; struct ncclTaskP2p* send = sendPeer != -1 ? ncclIntruQueueHead(&peers[sendPeer].sendQueue) : NULL; struct ncclTaskP2p* recv = recvPeer != -1 ? ncclIntruQueueHead(&peers[recvPeer].recvQueue) : NULL; if (sendPeer == comm->rank) { @@ -669,6 +668,7 @@ static ncclResult_t scheduleP2pTasksToPlan( if (send) sendBytes -= send->chunk*sendChunkBytesMax; do { + if ((i % (NCCL_MAX_WORK_ELEMENTS_P2P/2)) == 0) fuseOk = false; ssize_t recvChunkBytes = std::min(recvBytes, recvChunkBytesMax); // -1 preserved ssize_t sendChunkBytes = std::min(sendBytes, sendChunkBytesMax); if (recvChunkBytes != 0) { @@ -879,6 +879,14 @@ static ncclResult_t reclaimPlan(struct ncclComm* comm, struct ncclCommCallback* if (plan->persistent) { comm->persistentRefs -= 1; NCCLCHECK(ncclCudaFree(plan->workHead)); + for (int c=0; c < plan->channelUbound; c++) { + struct ncclProxyOp* q = ncclIntruQueueHead(&plan->channels[c].proxyOpQueue); + while (q != nullptr) { + struct ncclProxyOp* q1 = q->enqNext; + ncclMemoryPoolFree(&plan->memPool_ncclProxyOp, q); + q = q1; + } + } while (!ncclIntruQueueEmpty(&plan->ipcMemQueue)) { struct ncclPointerList* q = ncclIntruQueueDequeue(&plan->ipcMemQueue); CUDACHECKIGNORE(cudaIpcCloseMemHandle(q->ptr)); @@ -1093,9 +1101,16 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan ncclResult_t ncclLaunchKernelAfter_NoCuda(struct ncclComm* comm, struct ncclKernelPlan* plan) { if (!(plan->persistent || comm->persistentRefs != 0 || ncclCudaLaunchBlocking)) { - // If this isn't being captured and there aren't any CUDA graphs alive - // then we don't need to do our proxyOp pushing on the host stream. + // We are not using the host stream for proxy ops and reclaimation submission. NCCLCHECK(hostStreamPlanTask(comm, plan)); + } else { + // We are using the host stream for proxy ops and reclaimation submission. + // Only plans with proxy ops have a callback pushed by ncclLaunchPrepare. + // Since non-persistent plans also require reclaimation, we have to do it + // here. + if (!plan->persistent && !plan->hasProxyOps) { + ncclIntruQueueMpscEnqueue(&comm->callbackQueue, &plan->reclaimer); + } } return ncclSuccess; } diff --git a/src/graph/xml.cc b/src/graph/xml.cc index 47fda1f..f06f65d 100644 --- a/src/graph/xml.cc +++ b/src/graph/xml.cc @@ -9,6 +9,7 @@ #include #include #include +#include #include "core.h" #include "nvmlwrap.h" #include "xml.h" @@ -500,11 +501,11 @@ ncclResult_t ncclTopoGetXmlFromSys(struct ncclXmlNode* pciNode, struct ncclXml* if (index == -1) { if (path) { char deviceSpeedStr[MAX_STR_LEN]; - float deviceSpeed; + float deviceSpeed = FLT_MAX; NCCLCHECK(ncclTopoGetStrFromSys(path, "max_link_speed", deviceSpeedStr)); sscanf(deviceSpeedStr, "%f GT/s", &deviceSpeed); char portSpeedStr[MAX_STR_LEN]; - float portSpeed; + float portSpeed = FLT_MAX; NCCLCHECK(ncclTopoGetStrFromSys(path, "../max_link_speed", portSpeedStr)); sscanf(portSpeedStr, "%f GT/s", &portSpeed); NCCLCHECK(xmlSetAttr(pciNode, "link_speed", portSpeed < deviceSpeed ? portSpeedStr : deviceSpeedStr)); diff --git a/src/include/comm.h b/src/include/comm.h index bc5a9c5..328ffef 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -299,7 +299,7 @@ struct ncclComm { // Flag to ask NCCL kernels to abort volatile uint32_t *abortFlag; volatile uint32_t *childAbortFlag; - volatile uint32_t *abortFlagRefCount; + uint32_t *abortFlagRefCount; // Device side of the communicator (for cudaFree's) struct ncclDevComm* devComm; // actually = &ncclDevCommAndChannels::comm @@ -342,8 +342,6 @@ struct ncclComm { int nvlsRegSupport; /* sharable NVLS resource. */ struct ncclNvlsSharedRes* nvlsResources; - struct ncclShmemCollBuff nvlsShmem; - void *nvlsShmemHandle; ssize_t channelSize; // User requested work size (bytes) for channel partitions diff --git a/src/include/proxy.h b/src/include/proxy.h index daf3885..8093c0c 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -194,7 +194,6 @@ struct ncclProxyRpcResponseHeader { }; struct ncclProxyState { - int internalRefCount; int refCount; int tpRank; int tpnRanks; @@ -209,11 +208,10 @@ struct ncclProxyState { ncclNet_t* ncclNet; ncclCollNet_t* ncclCollNet; volatile uint32_t* abortFlag; - volatile uint32_t* abortFlagRefCount; // Service thread pthread_t thread; struct ncclSocket* listenSock; - volatile int stop; + int stop; CUcontext cudaCtx; ncclResult_t asyncResult; @@ -294,6 +292,5 @@ ncclResult_t ncclProxyClientGetFdBlocking(struct ncclComm* comm, struct ncclProx ncclResult_t ncclProxyStop(struct ncclComm* comm); ncclResult_t ncclProxyShmUnlink(struct ncclComm* comm); -ncclResult_t ncclProxyDestroy(struct ncclProxyState *proxyState); -ncclResult_t ncclProxyTryDetach(struct ncclProxyState *proxyState); +ncclResult_t ncclProxyDestroy(struct ncclComm* comm); #endif diff --git a/src/include/transport.h b/src/include/transport.h index d0cd974..27529df 100644 --- a/src/include/transport.h +++ b/src/include/transport.h @@ -67,6 +67,8 @@ struct ncclNvlsSharedRes { char shareableHandle[NVLS_HANDLE_SIZE]; size_t ucGran; int nChannels; + struct ncclShmemCollBuff nvlsShmem; + void *nvlsShmemHandle; }; #endif /* CUDART_VERSION >= 12010 */ diff --git a/src/init.cc b/src/init.cc index c681f2a..e82e64e 100644 --- a/src/init.cc +++ b/src/init.cc @@ -179,13 +179,7 @@ static ncclResult_t commFree(ncclComm_t comm) { * free all intra-process communicators; therefore, we only need to focus on local * resource cleanup in commFree(). */ if (comm->proxyState && comm->proxyRefCountOld == 0 && comm->proxyState->thread) { - if (*comm->abortFlag == 0) { - /* regular thread join */ - pthread_join(comm->proxyState->thread, nullptr); - } else { - /* try to detach thread due to abort */ - ncclProxyTryDetach(comm->proxyState); - } + pthread_join(comm->proxyState->thread, nullptr); } delete[] comm->userRedOps; @@ -219,7 +213,7 @@ static ncclResult_t commFree(ncclComm_t comm) { free(comm->sharedRes->tpRankToLocalRank); NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->hostStream)); NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->deviceStream)); - NCCLCHECK(ncclProxyDestroy(comm->sharedRes->proxyState)); + NCCLCHECK(ncclProxyDestroy(comm)); free(comm->sharedRes); } } @@ -237,7 +231,7 @@ static ncclResult_t commFree(ncclComm_t comm) { if (ncclAtomicRefCountDecrement(comm->abortFlagRefCount) == 0) { NCCLCHECK(ncclCudaHostFree((void *)comm->abortFlag)); - free((void*)comm->abortFlagRefCount); + free(comm->abortFlagRefCount); } free((void*)comm->config.netName); @@ -1645,7 +1639,7 @@ exit: fail: if (comm) { if (comm->abortFlag) ncclCudaHostFree((void *)comm->abortFlag); - if (comm->abortFlagRefCount) free((void*)comm->abortFlagRefCount); + if (comm->abortFlagRefCount) free(comm->abortFlagRefCount); free(comm); } if (newcomm) *newcomm = NULL; @@ -2086,7 +2080,7 @@ fail: if (childComm) { if (comm && !comm->config.splitShare) { if (childComm->abortFlag) ncclCudaHostFree((void*)childComm->abortFlag); - if (childComm->abortFlagRefCount) free((void*)childComm->abortFlagRefCount); + if (childComm->abortFlagRefCount) free(childComm->abortFlagRefCount); } free(childComm); } diff --git a/src/proxy.cc b/src/proxy.cc index 976b1d3..db36a15 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -18,14 +18,6 @@ #include #include -#define PROGRESS_RUNNING 0 -#define PROGRESS_REQUEST_STOP 1 -#define PROGRESS_ABORT 2 -#define PROGRESS_COMPLETE 3 - -#define SERVICE_RUNNING 0 -#define SERVICE_COMPLETE 1 - enum { proxyRecv=0, proxySend=1 }; static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) { @@ -720,13 +712,13 @@ static ncclResult_t ncclProxyGetPostedOps(struct ncclProxyState* proxyState, int if (state->active == NULL) { pthread_mutex_lock(&pool->mutex); - while (pool->nextOps == -1 && state->stop == PROGRESS_RUNNING) { + while (pool->nextOps == -1 && !state->stop) { struct ncclProxyArgs profArgs; // Only used for profiling purposes ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileSleep); pthread_cond_wait(&pool->cond, &pool->mutex); ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileWakeup); } - if (state->stop != PROGRESS_RUNNING) { // We might have been woken up to stop. + if (state->stop) { // We might have been woken up to stop. pthread_mutex_unlock(&pool->mutex); return ncclSuccess; } @@ -864,7 +856,7 @@ void* ncclProxyProgress(void *proxyState_) { * frequency of calling ncclProxyGetPostedOps() and reduce the perf impact. */ int proxyOpAppendCounter = 0; struct ncclProxyArgs profArgs; // Only used for profiling purposes - while (state->stop == PROGRESS_RUNNING || (state->stop == PROGRESS_REQUEST_STOP && state->active)) { + while ((state->stop == 0 || (state->stop == 1 && state->active)) && *proxyState->abortFlag == 0) { int idle = 1; ncclResult_t ret = progressOps(proxyState, state, state->active, &idle); if (ret != ncclSuccess) { @@ -878,7 +870,7 @@ void* ncclProxyProgress(void *proxyState_) { int added = 0; proxyOpAppendCounter = 0; TIME_START(3); - if (state->stop == PROGRESS_RUNNING) + if (state->stop == 0) ret = ncclProxyGetPostedOps(proxyState, &added); if (added) { TIME_STOP(3); } else { TIME_CANCEL(3); } if (ret != ncclSuccess) { @@ -891,9 +883,6 @@ void* ncclProxyProgress(void *proxyState_) { } lastIdle = idle; } - - /* progress serive thread should be waiting for me, I need to notify it. */ - __atomic_store_n(&state->stop, PROGRESS_COMPLETE, __ATOMIC_RELEASE); return NULL; } @@ -916,11 +905,7 @@ ncclResult_t ncclProxyStart(struct ncclComm* comm) { static ncclResult_t ncclProxyProgressCreate(struct ncclProxyState* proxyState) { struct ncclProxyProgressState* state = &proxyState->progressState; if (!state->thread) { - pthread_attr_t attr; - SYSCHECK(pthread_attr_init(&attr), "pthread_attr_init"); - SYSCHECK(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED), "pthread_attr_setdetachstate"); - SYSCHECK(pthread_create(&state->thread, &attr, ncclProxyProgress, proxyState), "pthread_create"); - SYSCHECK(pthread_attr_destroy(&attr), "pthread_attr_destroy"); + pthread_create(&state->thread, NULL, ncclProxyProgress, proxyState); ncclSetThreadName(state->thread, "NCCL Progress%2d", proxyState->tpLocalnRanks); } return ncclSuccess; @@ -932,17 +917,10 @@ ncclResult_t ncclProxyProgressDestroy(struct ncclProxyState* proxyState) { // Request the proxy to stop and then wake it if (state->opsPool) { pthread_mutex_lock(&state->opsPool->mutex); - if (*proxyState->abortFlag == 0) - state->stop = PROGRESS_REQUEST_STOP; - else - state->stop = PROGRESS_ABORT; + state->stop = 1; pthread_cond_signal(&state->opsPool->cond); pthread_mutex_unlock(&state->opsPool->mutex); - /* progress thread is always detached, wait for it to exit. */ - uint64_t t0 = clockNano(); - while (__atomic_load_n(&state->stop, __ATOMIC_ACQUIRE) != PROGRESS_COMPLETE) { - if (clockNano() - t0 >= 1000) sched_yield(); - } + pthread_join(state->thread, NULL); } // Free off any memory allocated for the proxy arg pools @@ -1582,19 +1560,6 @@ void* ncclProxyService(void* _args) { ncclSocketClose(proxyState->listenSock); free(proxyState->listenSock); proxyOpsFree(proxyState); - - if (*proxyState->abortFlag) { - /* abort happened, need to notify main thread I am done. */ - __atomic_store_n(&proxyState->stop, SERVICE_COMPLETE, __ATOMIC_RELEASE); - } - - if (ncclAtomicRefCountDecrement(proxyState->abortFlagRefCount) == 0) { - ncclCudaHostFree((void *)proxyState->abortFlag); - free((void*)proxyState->abortFlagRefCount); - } - - /* proxy itself holds one internal ref count, needs to call ncclProxyDestroy */ - ncclProxyDestroy(proxyState); return NULL; } @@ -1603,8 +1568,6 @@ ncclResult_t ncclProxyInit(struct ncclComm* comm, struct ncclSocket* sock, union NCCLCHECK(ncclCalloc(&comm->sharedRes->proxyState, 1)); comm->proxyState = comm->sharedRes->proxyState; comm->proxyState->refCount = 1; - /* ref count for communicator and proxy service thread. */ - comm->proxyState->internalRefCount = 2; comm->proxyState->listenSock = sock; comm->proxyState->peerAddresses = peerAddresses; // Seed the random number generator for UDS filename generation @@ -1627,8 +1590,6 @@ ncclResult_t ncclProxyCreate(struct ncclComm* comm) { proxyState->tpLocalnRanks = comm->localRanks; proxyState->cudaDev = comm->cudaDev; proxyState->abortFlag = comm->abortFlag; - proxyState->abortFlagRefCount = comm->abortFlagRefCount; - ncclAtomicRefCountIncrement(comm->abortFlagRefCount); proxyState->p2pnChannels = comm->p2pnChannels; proxyState->p2pChunkSize = comm->p2pChunkSize; proxyState->nChannels = comm->nChannels; @@ -1686,41 +1647,15 @@ ncclResult_t ncclProxyStop(struct ncclComm* comm) { return ncclSuccess; } -ncclResult_t ncclProxyDestroy(struct ncclProxyState *proxyState) { - if (__atomic_sub_fetch(&proxyState->internalRefCount, 1, __ATOMIC_ACQ_REL) == 0) { - free(proxyState->peerAddresses); - free(proxyState->peerSocks); - free(proxyState->proxyOps); - free(proxyState->sharedDevMems); - expectedProxyResponseFree(proxyState); - free(proxyState); - } - return ncclSuccess; -} +ncclResult_t ncclProxyDestroy(struct ncclComm* comm) { + struct ncclProxyState* sharedProxyState = comm->sharedRes->proxyState; -/* detach all proxy threads in case of abort */ -ncclResult_t ncclProxyTryDetach(struct ncclProxyState *proxyState) { - if (proxyState && proxyState->thread) { - /* proxy service thread can call cudaFreeHost to free pinned host mem, but - * it can cause a hang if main thread is issuing other cuda calls. To solution - * should be allocate/free pinned host mem using cuMem* driver API, this waiting - * 5 secs is just a workaround for now. */ - bool join = false; - struct timespec start, now; - clock_gettime(CLOCK_MONOTONIC, &start); - do { - clock_gettime(CLOCK_MONOTONIC, &now); - if (__atomic_load_n(&proxyState->stop, __ATOMIC_ACQUIRE) == SERVICE_COMPLETE) { - /* proxy thread is done, join it. */ - pthread_join(proxyState->thread, NULL); - join = true; - break; - } - } while(now.tv_sec - start.tv_sec < 5); - - if (join == false) { - pthread_detach(proxyState->thread); - } - } + assert(sharedProxyState->refCount == 0); + free(sharedProxyState->peerAddresses); + free(sharedProxyState->peerSocks); + free(sharedProxyState->proxyOps); + free(sharedProxyState->sharedDevMems); + expectedProxyResponseFree(sharedProxyState); + free(sharedProxyState); return ncclSuccess; } diff --git a/src/transport.cc b/src/transport.cc index c66a81e..a465d6b 100644 --- a/src/transport.cc +++ b/src/transport.cc @@ -65,13 +65,28 @@ void dumpData(struct ncclConnect* data, int ndata) { } } +NCCL_PARAM(ConnectRoundMaxPeers, "CONNECT_ROUND_MAX_PEERS", 128); +NCCL_PARAM(ReportConnectProgress, "REPORT_CONNECT_PROGRESS", 0); +#include + ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, int connIndex, int* highestTransportType/*=NULL*/) { // Stream used during transport setup; need for P2P pre-connect + CUDA Graph ncclResult_t ret = ncclSuccess; int highestType = TRANSPORT_P2P; // track highest transport type - struct ncclConnect** data = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Store intermediate send/recvData structs for connect - struct ncclConnect** recvData = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Points to entries inside data for given recv connection within a channel - struct ncclConnect** sendData = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Points to entries inside data for given send connection within a channel + struct ncclConnect** data; // Store intermediate send/recvData structs for connect + struct ncclConnect** recvData; // Points to entries inside data for given recv connection within a channel + struct ncclConnect** sendData; // Points to entries inside data for given send connection within a channel + int done = 0; + + int maxPeers = ncclParamConnectRoundMaxPeers(); + NCCLCHECK(ncclCalloc(&data, maxPeers)); + NCCLCHECK(ncclCalloc(&recvData, maxPeers)); + NCCLCHECK(ncclCalloc(&sendData, maxPeers)); + + struct timeval timeStart, timeLast; + gettimeofday(&timeStart, NULL); + timeLast = timeStart; // struct copy + bool timeReported = false; NCCLCHECKGOTO(ncclStrongStreamAcquireUncaptured(&comm->sharedRes->hostStream), ret, fail); // First time initialization @@ -87,23 +102,24 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* // The first N entries contain recvData, connection information for recv connections // The next M entries contain sendData, connection information for send connections // It's not guaranteed that each entry of data has the same number of total or send/recv specific connections - data[i] = (ncclConnect*) malloc(sizeof(ncclConnect) * 2*MAXCHANNELS); - recvData[i] = data[i]; + int p = i-(done+1); + if (recvMask || sendMask) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS)); + recvData[p] = data[p]; int sendChannels = 0, recvChannels = 0; int type; TIME_START(0); for (int c=0; c(comm, graph, recvData[i]+recvChannels++, c, recvPeer, connIndex, &type), ret, fail); + NCCLCHECKGOTO(selectTransport<0>(comm, graph, recvData[p]+recvChannels++, c, recvPeer, connIndex, &type), ret, fail); if (type > highestType) highestType = type; } } TIME_STOP(0); TIME_START(1); - sendData[i] = recvData[i]+recvChannels; + sendData[p] = recvData[p]+recvChannels; for (int c=0; c(comm, graph, sendData[i]+sendChannels++, c, sendPeer, connIndex, &type), ret, fail); + NCCLCHECKGOTO(selectTransport<1>(comm, graph, sendData[p]+sendChannels++, c, sendPeer, connIndex, &type), ret, fail); if (type > highestType) highestType = type; } } @@ -112,70 +128,100 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* TIME_START(2); if (sendPeer == recvPeer) { if (recvChannels+sendChannels) { - NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data[i], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail); - NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data[i], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail); - sendData[i] = data[i]; - recvData[i] = data[i]+sendChannels; + NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data[p], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail); + NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data[p], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail); + sendData[p] = data[p]; + recvData[p] = data[p]+sendChannels; } } else { - if (recvChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData[i], sizeof(struct ncclConnect)*recvChannels), ret, fail); - if (sendChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData[i], sizeof(struct ncclConnect)*sendChannels), ret, fail); - if (sendChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData[i], sizeof(struct ncclConnect)*sendChannels), ret, fail); - if (recvChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData[i], sizeof(struct ncclConnect)*recvChannels), ret, fail); + if (recvChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData[p], sizeof(struct ncclConnect)*recvChannels), ret, fail); + if (sendChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData[p], sizeof(struct ncclConnect)*sendChannels), ret, fail); + if (sendChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData[p], sizeof(struct ncclConnect)*sendChannels), ret, fail); + if (recvChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData[p], sizeof(struct ncclConnect)*recvChannels), ret, fail); } TIME_STOP(2); + + if (i-done == maxPeers || i == comm->nRanks-1) { + // Loop until all channels with all ranks have been connected + bool allChannelsConnected; + allChannelsConnected = false; + while (!allChannelsConnected) { + allChannelsConnected = true; + for (int j=done+1; j<=i; j++) { + int recvPeer = (comm->rank - j + comm->nRanks) % comm->nRanks; + int sendPeer = (comm->rank + j) % comm->nRanks; + uint64_t recvMask = comm->connectRecv[recvPeer]; + uint64_t sendMask = comm->connectSend[sendPeer]; + + int p = j-(done+1); + int sendDataOffset = 0; + int recvDataOffset = 0; + for (int c=0; cchannels[c].peers[sendPeer]->send + connIndex; + // This connector hasn't completed connection yet + if (conn->connected == 0) { + NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[p] + sendDataOffset++, 1, comm->rank, conn), ret, fail); + if (ret == ncclSuccess) { + conn->connected = 1; + /* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */ + CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[sendPeer]->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); + } else if (ret == ncclInProgress) { + allChannelsConnected = false; + } + } + } + TIME_STOP(3); + + // Start with recv channels + TIME_START(4); + if (recvMask & (1UL<channels[c].peers[recvPeer]->recv + connIndex; + // This connector hasn't completed connection yet + if (conn->connected == 0) { + NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[p] + recvDataOffset++, 1, comm->rank, conn), ret, fail); + if (ret == ncclSuccess) { + conn->connected = 1; + /* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */ + CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[recvPeer]->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); + } else if (ret == ncclInProgress) { + allChannelsConnected = false; + } + } + } + TIME_STOP(4); + } + if (sendMask || recvMask) { + free(data[p]); + data[p] = NULL; + } + } + if (ncclParamReportConnectProgress() && comm->rank == 0) { + struct timeval now; + gettimeofday(&now, NULL); + if (((now.tv_sec - timeLast.tv_sec)*1.0 + (now.tv_usec-timeLast.tv_usec)*1e-6) > 1) { + float elapsed = (now.tv_sec - timeStart.tv_sec)*1.0 + (now.tv_usec-timeStart.tv_usec)*1e-6; + float remaining = elapsed*(comm->nRanks-done)/done; + printf("%sP2p connect: %g%% Elapsed %d:%02d Remaining %d:%02d ", + timeReported ? "\r" : "", done*100.0/comm->nRanks, ((int)elapsed)/60, ((int)elapsed)%60, ((int)remaining)/60, ((int)remaining)%60); + fflush(stdout); + timeReported = true; + timeLast = now; // struct copy; + } + } + } + done = i; + } } - // Loop until all channels with all ranks have been connected - bool allChannelsConnected; - allChannelsConnected = false; - while (!allChannelsConnected) { - allChannelsConnected = true; - for (int i=1; inRanks; i++) { - int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks; - int sendPeer = (comm->rank + i) % comm->nRanks; - uint64_t recvMask = comm->connectRecv[recvPeer]; - uint64_t sendMask = comm->connectSend[sendPeer]; - - int sendDataOffset = 0; - int recvDataOffset = 0; - for (int c=0; cchannels[c].peers[sendPeer]->send + connIndex; - // This connector hasn't completed connection yet - if (conn->connected == 0) { - NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[i] + sendDataOffset++, 1, comm->rank, conn), ret, fail); - if (ret == ncclSuccess) { - conn->connected = 1; - /* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */ - CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[sendPeer]->send[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); - } else if (ret == ncclInProgress) { - allChannelsConnected = false; - } - } - } - TIME_STOP(3); - - // Start with recv channels - TIME_START(4); - if (recvMask & (1UL<channels[c].peers[recvPeer]->recv + connIndex; - // This connector hasn't completed connection yet - if (conn->connected == 0) { - NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[i] + recvDataOffset++, 1, comm->rank, conn), ret, fail); - if (ret == ncclSuccess) { - conn->connected = 1; - /* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */ - CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[recvPeer]->recv[connIndex], &conn->conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), ret, fail); - } else if (ret == ncclInProgress) { - allChannelsConnected = false; - } - } - } - TIME_STOP(4); - } - } + if (timeReported) { + struct timeval now; + gettimeofday(&now, NULL); + float elapsed = (now.tv_sec - timeStart.tv_sec)*1.0 + (now.tv_usec-timeStart.tv_usec)*1e-6; + printf("\rP2p connect done in %d:%02d \n", + ((int)elapsed)/60, ((int)elapsed)%60); + fflush(stdout); } /* We need to sync ranks here since some ranks might run too fast after connection setup @@ -205,7 +251,6 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* } } comm->connectRecv[recvPeer] = comm->connectSend[sendPeer] = 0UL; - free(data[i]); } free(data); diff --git a/src/transport/nvls.cc b/src/transport/nvls.cc index c9a3bbc..4dfae51 100644 --- a/src/transport/nvls.cc +++ b/src/transport/nvls.cc @@ -393,18 +393,18 @@ ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) { typeSize = sizeof(struct localRegData); if (comm->localRank == 0) { shmPath[0] = '\0'; - NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, comm->localRanks - 1, &comm->nvlsShmemHandle), res, cleanup); + NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, comm->localRanks - 1, &comm->nvlsResources->nvlsShmemHandle), res, cleanup); NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shmPath, sizeof(shmPath)), res, cleanup); } else { NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shmPath, sizeof(shmPath)), res, cleanup); - NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, -1, &comm->nvlsShmemHandle), res, cleanup); + NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, -1, &comm->nvlsResources->nvlsShmemHandle), res, cleanup); } /* need 2 pools and a shared counter for shmem-based collectives */ - comm->nvlsShmem.cnt[0] = (size_t*)nvlsShmem; - comm->nvlsShmem.ptr[0] = (void*)((char*)comm->nvlsShmem.cnt[0] + sizeof(size_t)); - comm->nvlsShmem.cnt[1] = (size_t*)((char*)comm->nvlsShmem.ptr[0] + typeSize * comm->localRanks); - comm->nvlsShmem.ptr[1] = (void*)((char*)comm->nvlsShmem.cnt[1] + sizeof(size_t)); - comm->nvlsShmem.round = 0; + comm->nvlsResources->nvlsShmem.cnt[0] = (size_t*)nvlsShmem; + comm->nvlsResources->nvlsShmem.ptr[0] = (void*)((char*)comm->nvlsResources->nvlsShmem.cnt[0] + sizeof(size_t)); + comm->nvlsResources->nvlsShmem.cnt[1] = (size_t*)((char*)comm->nvlsResources->nvlsShmem.ptr[0] + typeSize * comm->localRanks); + comm->nvlsResources->nvlsShmem.ptr[1] = (void*)((char*)comm->nvlsResources->nvlsShmem.cnt[1] + sizeof(size_t)); + comm->nvlsResources->nvlsShmem.round = 0; return res; @@ -418,6 +418,7 @@ ncclResult_t ncclNvlsFree(struct ncclComm* comm) { if (resources == NULL) return ncclSuccess; if (ncclAtomicRefCountDecrement(&resources->refCount) == 0) { + NCCLCHECK(ncclShmClose(resources->nvlsShmemHandle)); NCCLCHECK(nvlsGroupUnbind(comm, resources)); NCCLCHECK(nvlsGroupUnmapMem(comm, resources)); free(resources); @@ -476,7 +477,7 @@ ncclResult_t tryRegisterBuffer(struct ncclComm *comm, struct localRequestData *r /* get all buffer addresses */ NCCLCHECKGOTO(ncclCalloc(®Record->addrs, comm->localRanks), ret, fail); regRecord->addrs[comm->localRank] = regRecord->buff; - NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsShmem, regRecord->addrs + comm->localRank, regRecord->addrs, sizeof(uintptr_t)), ret, fail); + NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsResources->nvlsShmem, regRecord->addrs + comm->localRank, regRecord->addrs, sizeof(uintptr_t)), ret, fail); /* enqueue record */ ncclIntruQueueEnqueue(&comm->regRecordQueue, regRecord); @@ -551,7 +552,7 @@ ncclResult_t ncclNvlsLocalRegisterBuffer(struct ncclComm *comm, const void *send regRequestHead = regRequestHead->next; } - NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsShmem, regData + comm->localRank, regData, sizeof(struct localRegData)), ret, fail); + NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsResources->nvlsShmem, regData + comm->localRank, regData, sizeof(struct localRegData)), ret, fail); /* first check whether all local ranks find their registered buffer */ for (int i = 0; i < comm->localRanks; ++i) {