Split transport connect phase into multiple steps to avoid port
exhaustion when connecting alltoall at large scale. Defaults to 128
peers per round.
Fix memory leaks on CUDA graph capture.
Fix alltoallv crash on self-sendrecv.
Make topology detection more deterministic when PCI speeds are not
available (fix issue #1020).
Properly close shared memory in NVLS resources.
Revert proxy detach after 5 seconds.
Add option to print progress during transport connect.
Add option to set NCCL_DEBUG to INFO on first WARN.
Sylvain Jeaugey 2023-11-13 10:26:55 -08:00
parent 0e35f5d390
commit 88d44d777f
11 changed files with 174 additions and 183 deletions

View File

@@ -1,6 +1,6 @@
 ##### version
 NCCL_MAJOR := 2
 NCCL_MINOR := 19
-NCCL_PATCH := 3
+NCCL_PATCH := 4
 NCCL_SUFFIX :=
 PKG_REVISION := 1

View File

@@ -139,6 +139,8 @@ void ncclDebugInit() {
   pthread_mutex_unlock(&ncclDebugLock);
 }
 
+NCCL_PARAM(WarnSetDebugInfo, "WARN_ENABLE_DEBUG_INFO", 0);
+
 /* Common logging function used by the INFO, WARN and TRACE macros
  * Also exported to the dynamically loadable Net transport modules so
  * they can share the debugging mechanisms and output files
@@ -172,6 +174,7 @@ void ncclDebugLog(ncclDebugLogLevel level, unsigned long flags, const char *file
   if (level == NCCL_LOG_WARN) {
     len = snprintf(buffer, sizeof(buffer), "\n%s:%d:%d [%d] %s:%d NCCL WARN ",
                    hostname, pid, tid, cudaDev, filefunc, line);
+    if (ncclParamWarnSetDebugInfo()) ncclDebugLevel = NCCL_LOG_INFO;
   } else if (level == NCCL_LOG_INFO) {
     len = snprintf(buffer, sizeof(buffer), "%s:%d:%d [%d] NCCL INFO ", hostname, pid, tid, cudaDev);
   } else if (level == NCCL_LOG_TRACE && flags == NCCL_CALL) {
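The hunk above adds the NCCL_WARN_ENABLE_DEBUG_INFO knob behind the new NCCL_PARAM line: when it is set to a non-zero value, the first WARN raises ncclDebugLevel to INFO so the messages that follow the warning are logged too. As a rough, hedged sketch only (the real NCCL_PARAM macro also handles thread-safe caching and is defined elsewhere), the generated getter behaves approximately like this:

  // Rough sketch (not the real macro expansion) of what
  // NCCL_PARAM(WarnSetDebugInfo, "WARN_ENABLE_DEBUG_INFO", 0) provides:
  // a lazily cached getter for the NCCL_WARN_ENABLE_DEBUG_INFO env var.
  #include <cstdint>
  #include <cstdlib>

  static int64_t ncclParamWarnSetDebugInfo() {
    static int64_t cache = -2;                    // -2 == "not read yet" (assumption)
    if (cache == -2) {
      const char* str = getenv("NCCL_WARN_ENABLE_DEBUG_INFO");
      cache = str ? strtoll(str, nullptr, 0) : 0; // default value 0 from the macro
    }
    return cache;
  }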

View File

@@ -627,13 +627,12 @@ static ncclResult_t scheduleP2pTasksToPlan(
     while (nChannelsMax*nRanks > comm->p2pnChannels*4 && nChannelsMax > 1) nChannelsMax /= 2;
   }
 
-  bool fuseOk;
+  bool fuseOk = false;
   // We can perform 8 send/recv per round per CTA. Make sure we jump between fused blocks at node boundaries.
   while (tasks->nTasksP2p != 0) {
     for (int i=0; i < tasks->p2pOrderSteps; i++) {
       int sendPeer = sendOrder[i];
       int recvPeer = recvOrder[i];
-      if ((i % (NCCL_MAX_WORK_ELEMENTS_P2P/2)) == 0) fuseOk = false;
       struct ncclTaskP2p* send = sendPeer != -1 ? ncclIntruQueueHead(&peers[sendPeer].sendQueue) : NULL;
       struct ncclTaskP2p* recv = recvPeer != -1 ? ncclIntruQueueHead(&peers[recvPeer].recvQueue) : NULL;
       if (sendPeer == comm->rank) {
@@ -669,6 +668,7 @@ static ncclResult_t scheduleP2pTasksToPlan(
         if (send) sendBytes -= send->chunk*sendChunkBytesMax;
 
         do {
+          if ((i % (NCCL_MAX_WORK_ELEMENTS_P2P/2)) == 0) fuseOk = false;
           ssize_t recvChunkBytes = std::min(recvBytes, recvChunkBytesMax); // -1 preserved
           ssize_t sendChunkBytes = std::min(sendBytes, sendChunkBytesMax);
           if (recvChunkBytes != 0) {
@@ -879,6 +879,14 @@ static ncclResult_t reclaimPlan(struct ncclComm* comm, struct ncclCommCallback*
   if (plan->persistent) {
     comm->persistentRefs -= 1;
     NCCLCHECK(ncclCudaFree(plan->workHead));
+    for (int c=0; c < plan->channelUbound; c++) {
+      struct ncclProxyOp* q = ncclIntruQueueHead(&plan->channels[c].proxyOpQueue);
+      while (q != nullptr) {
+        struct ncclProxyOp* q1 = q->enqNext;
+        ncclMemoryPoolFree(&plan->memPool_ncclProxyOp, q);
+        q = q1;
+      }
+    }
     while (!ncclIntruQueueEmpty(&plan->ipcMemQueue)) {
       struct ncclPointerList* q = ncclIntruQueueDequeue(&plan->ipcMemQueue);
       CUDACHECKIGNORE(cudaIpcCloseMemHandle(q->ptr));
@@ -1093,9 +1101,16 @@ ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan
 ncclResult_t ncclLaunchKernelAfter_NoCuda(struct ncclComm* comm, struct ncclKernelPlan* plan) {
   if (!(plan->persistent || comm->persistentRefs != 0 || ncclCudaLaunchBlocking)) {
-    // If this isn't being captured and there aren't any CUDA graphs alive
-    // then we don't need to do our proxyOp pushing on the host stream.
+    // We are not using the host stream for proxy ops and reclaimation submission.
     NCCLCHECK(hostStreamPlanTask(comm, plan));
+  } else {
+    // We are using the host stream for proxy ops and reclaimation submission.
+    // Only plans with proxy ops have a callback pushed by ncclLaunchPrepare.
+    // Since non-persistent plans also require reclaimation, we have to do it
+    // here.
+    if (!plan->persistent && !plan->hasProxyOps) {
+      ncclIntruQueueMpscEnqueue(&comm->callbackQueue, &plan->reclaimer);
+    }
   }
   return ncclSuccess;
 }
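The reclaimPlan() hunk is the CUDA-graph memory-leak fix from the changelog: persistent (captured) plans keep ncclProxyOp entries queued per channel, and those are now handed back to the plan's memory pool when the plan is reclaimed. Below is a minimal, simplified sketch of that drain pattern over a singly linked queue; the Op type and poolFree() helper are stand-ins, not NCCL's ncclIntruQueue/ncclMemoryPool:

  // Minimal sketch of the drain pattern added to reclaimPlan(): walk the
  // singly linked queue and return every node before dropping the queue head.
  #include <cstdio>

  struct Op { Op* enqNext; };

  static void poolFree(Op* op) { delete op; }  // real code recycles into a memory pool

  static void drainQueue(Op* head) {
    Op* q = head;
    while (q != nullptr) {
      Op* q1 = q->enqNext;   // save the link before freeing the node
      poolFree(q);
      q = q1;
    }
  }

  int main() {
    // build a tiny three-element queue and drain it
    Op* c = new Op{nullptr};
    Op* b = new Op{c};
    Op* a = new Op{b};
    drainQueue(a);
    printf("queue drained\n");
    return 0;
  }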

View File

@@ -9,6 +9,7 @@
 #include <unistd.h>
 #include <fcntl.h>
 #include <ctype.h>
+#include <float.h>
 #include "core.h"
 #include "nvmlwrap.h"
 #include "xml.h"
@@ -500,11 +501,11 @@ ncclResult_t ncclTopoGetXmlFromSys(struct ncclXmlNode* pciNode, struct ncclXml*
   if (index == -1) {
     if (path) {
       char deviceSpeedStr[MAX_STR_LEN];
-      float deviceSpeed;
+      float deviceSpeed = FLT_MAX;
       NCCLCHECK(ncclTopoGetStrFromSys(path, "max_link_speed", deviceSpeedStr));
       sscanf(deviceSpeedStr, "%f GT/s", &deviceSpeed);
       char portSpeedStr[MAX_STR_LEN];
-      float portSpeed;
+      float portSpeed = FLT_MAX;
       NCCLCHECK(ncclTopoGetStrFromSys(path, "../max_link_speed", portSpeedStr));
       sscanf(portSpeedStr, "%f GT/s", &portSpeed);
       NCCLCHECK(xmlSetAttr(pciNode, "link_speed", portSpeed < deviceSpeed ? portSpeedStr : deviceSpeedStr));
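This is the fix for issue #1020: when sysfs does not report a PCI link speed in "<float> GT/s" form, sscanf() fails without writing to its target, so the previously uninitialized floats made the portSpeed < deviceSpeed choice, and therefore the detected topology, nondeterministic. Seeding both with FLT_MAX lets whichever value did parse win. A standalone sketch, with made-up input strings:

  // Standalone sketch of why the FLT_MAX seed matters: a failed sscanf leaves
  // its target untouched, so without the initializer the comparison below
  // reads an indeterminate value. The input strings here are made up.
  #include <cfloat>
  #include <cstdio>

  int main() {
    const char* deviceSpeedStr = "unknown";    // e.g. attribute missing or unparsable
    const char* portSpeedStr   = "16.0 GT/s";  // parses fine
    float deviceSpeed = FLT_MAX, portSpeed = FLT_MAX;
    sscanf(deviceSpeedStr, "%f GT/s", &deviceSpeed); // no match: deviceSpeed stays FLT_MAX
    sscanf(portSpeedStr, "%f GT/s", &portSpeed);
    // Mirrors the xmlSetAttr() call above: the smaller (i.e. valid) speed is kept.
    printf("link_speed = %s\n", portSpeed < deviceSpeed ? portSpeedStr : deviceSpeedStr);
    return 0;
  }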

View File

@@ -299,7 +299,7 @@ struct ncclComm {
   // Flag to ask NCCL kernels to abort
   volatile uint32_t *abortFlag;
   volatile uint32_t *childAbortFlag;
-  volatile uint32_t *abortFlagRefCount;
+  uint32_t *abortFlagRefCount;
 
   // Device side of the communicator (for cudaFree's)
   struct ncclDevComm* devComm; // actually = &ncclDevCommAndChannels::comm
@@ -342,8 +342,6 @@ struct ncclComm {
   int nvlsRegSupport;
   /* sharable NVLS resource. */
   struct ncclNvlsSharedRes* nvlsResources;
-  struct ncclShmemCollBuff nvlsShmem;
-  void *nvlsShmemHandle;
 
   ssize_t channelSize; // User requested work size (bytes) for channel partitions

View File

@@ -194,7 +194,6 @@ struct ncclProxyRpcResponseHeader {
 };
 
 struct ncclProxyState {
-  int internalRefCount;
   int refCount;
   int tpRank;
   int tpnRanks;
@@ -209,11 +208,10 @@
   ncclNet_t* ncclNet;
   ncclCollNet_t* ncclCollNet;
   volatile uint32_t* abortFlag;
-  volatile uint32_t* abortFlagRefCount;
 
   // Service thread
   pthread_t thread;
   struct ncclSocket* listenSock;
-  volatile int stop;
+  int stop;
   CUcontext cudaCtx;
   ncclResult_t asyncResult;
@@ -294,6 +292,5 @@ ncclResult_t ncclProxyClientGetFdBlocking(struct ncclComm* comm, struct ncclProx
 ncclResult_t ncclProxyStop(struct ncclComm* comm);
 ncclResult_t ncclProxyShmUnlink(struct ncclComm* comm);
-ncclResult_t ncclProxyDestroy(struct ncclProxyState *proxyState);
-ncclResult_t ncclProxyTryDetach(struct ncclProxyState *proxyState);
+ncclResult_t ncclProxyDestroy(struct ncclComm* comm);
 
 #endif

View File

@@ -67,6 +67,8 @@ struct ncclNvlsSharedRes {
   char shareableHandle[NVLS_HANDLE_SIZE];
   size_t ucGran;
   int nChannels;
+  struct ncclShmemCollBuff nvlsShmem;
+  void *nvlsShmemHandle;
 };
 
 #endif /* CUDART_VERSION >= 12010 */

View File

@@ -179,13 +179,7 @@ static ncclResult_t commFree(ncclComm_t comm) {
    * free all intra-process communicators; therefore, we only need to focus on local
    * resource cleanup in commFree(). */
   if (comm->proxyState && comm->proxyRefCountOld == 0 && comm->proxyState->thread) {
-    if (*comm->abortFlag == 0) {
-      /* regular thread join */
-      pthread_join(comm->proxyState->thread, nullptr);
-    } else {
-      /* try to detach thread due to abort */
-      ncclProxyTryDetach(comm->proxyState);
-    }
+    pthread_join(comm->proxyState->thread, nullptr);
   }
 
   delete[] comm->userRedOps;
@@ -219,7 +213,7 @@
       free(comm->sharedRes->tpRankToLocalRank);
       NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->hostStream));
       NCCLCHECK(ncclStrongStreamDestruct(&comm->sharedRes->deviceStream));
-      NCCLCHECK(ncclProxyDestroy(comm->sharedRes->proxyState));
+      NCCLCHECK(ncclProxyDestroy(comm));
       free(comm->sharedRes);
     }
   }
@@ -237,7 +231,7 @@
   if (ncclAtomicRefCountDecrement(comm->abortFlagRefCount) == 0) {
     NCCLCHECK(ncclCudaHostFree((void *)comm->abortFlag));
-    free((void*)comm->abortFlagRefCount);
+    free(comm->abortFlagRefCount);
   }
 
   free((void*)comm->config.netName);
@@ -1645,7 +1639,7 @@ exit:
 fail:
   if (comm) {
     if (comm->abortFlag) ncclCudaHostFree((void *)comm->abortFlag);
-    if (comm->abortFlagRefCount) free((void*)comm->abortFlagRefCount);
+    if (comm->abortFlagRefCount) free(comm->abortFlagRefCount);
     free(comm);
   }
   if (newcomm) *newcomm = NULL;
@@ -2086,7 +2080,7 @@ fail:
   if (childComm) {
     if (comm && !comm->config.splitShare) {
       if (childComm->abortFlag) ncclCudaHostFree((void*)childComm->abortFlag);
-      if (childComm->abortFlagRefCount) free((void*)childComm->abortFlagRefCount);
+      if (childComm->abortFlagRefCount) free(childComm->abortFlagRefCount);
     }
     free(childComm);
   }
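These init.cc hunks follow from the header change: abortFlagRefCount is no longer volatile, so the (void*) casts around free() go away, and commFree() goes back to joining the proxy thread unconditionally. The abort flag itself is still shared between parent and child/split communicators and freed by whoever drops the last reference; a minimal sketch of that lifetime, where atomicRefCountDecrement() is an illustrative stand-in for NCCL's helper:

  // Minimal sketch of the shared abort-flag lifetime: two communicators share
  // one flag plus one refcount, and the last one to decrement frees both.
  #include <cstdint>
  #include <cstdio>
  #include <cstdlib>

  static uint32_t atomicRefCountDecrement(uint32_t* refs) {
    return __atomic_sub_fetch(refs, 1, __ATOMIC_ACQ_REL);
  }

  int main() {
    uint32_t* abortFlag = (uint32_t*)calloc(1, sizeof(uint32_t));          // real code: CUDA host-pinned
    uint32_t* abortFlagRefCount = (uint32_t*)calloc(1, sizeof(uint32_t));
    *abortFlagRefCount = 2;  // e.g. parent + child communicator

    for (int comm = 0; comm < 2; comm++) {
      if (atomicRefCountDecrement(abortFlagRefCount) == 0) {
        free(abortFlag);          // real code: ncclCudaHostFree()
        free(abortFlagRefCount);  // no (void*) cast needed once the field is non-volatile
        printf("last reference dropped, flag freed\n");
      }
    }
    return 0;
  }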

View File

@@ -18,14 +18,6 @@
 #include <unistd.h>
 #include <sys/time.h>
 
-#define PROGRESS_RUNNING 0
-#define PROGRESS_REQUEST_STOP 1
-#define PROGRESS_ABORT 2
-#define PROGRESS_COMPLETE 3
-
-#define SERVICE_RUNNING 0
-#define SERVICE_COMPLETE 1
-
 enum { proxyRecv=0, proxySend=1 };
 
 static bool NeedProxy(int type, int pattern, int root, struct ncclRing* ring, int nranks) {
@@ -720,13 +712,13 @@ static ncclResult_t ncclProxyGetPostedOps(struct ncclProxyState* proxyState, int
   if (state->active == NULL) {
     pthread_mutex_lock(&pool->mutex);
-    while (pool->nextOps == -1 && state->stop == PROGRESS_RUNNING) {
+    while (pool->nextOps == -1 && !state->stop) {
       struct ncclProxyArgs profArgs; // Only used for profiling purposes
       ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileSleep);
       pthread_cond_wait(&pool->cond, &pool->mutex);
       ncclProfilingRecord(&profArgs, 0, 0, ncclProxyProfileWakeup);
     }
-    if (state->stop != PROGRESS_RUNNING) { // We might have been woken up to stop.
+    if (state->stop) { // We might have been woken up to stop.
       pthread_mutex_unlock(&pool->mutex);
       return ncclSuccess;
     }
@@ -864,7 +856,7 @@ void* ncclProxyProgress(void *proxyState_) {
    * frequency of calling ncclProxyGetPostedOps() and reduce the perf impact. */
   int proxyOpAppendCounter = 0;
   struct ncclProxyArgs profArgs; // Only used for profiling purposes
-  while (state->stop == PROGRESS_RUNNING || (state->stop == PROGRESS_REQUEST_STOP && state->active)) {
+  while ((state->stop == 0 || (state->stop == 1 && state->active)) && *proxyState->abortFlag == 0) {
     int idle = 1;
     ncclResult_t ret = progressOps(proxyState, state, state->active, &idle);
     if (ret != ncclSuccess) {
@@ -878,7 +870,7 @@
       int added = 0;
       proxyOpAppendCounter = 0;
       TIME_START(3);
-      if (state->stop == PROGRESS_RUNNING)
+      if (state->stop == 0)
         ret = ncclProxyGetPostedOps(proxyState, &added);
       if (added) { TIME_STOP(3); } else { TIME_CANCEL(3); }
       if (ret != ncclSuccess) {
@@ -891,9 +883,6 @@
     }
     lastIdle = idle;
   }
 
-  /* progress serive thread should be waiting for me, I need to notify it. */
-  __atomic_store_n(&state->stop, PROGRESS_COMPLETE, __ATOMIC_RELEASE);
-
   return NULL;
 }
@@ -916,11 +905,7 @@ ncclResult_t ncclProxyStart(struct ncclComm* comm) {
 static ncclResult_t ncclProxyProgressCreate(struct ncclProxyState* proxyState) {
   struct ncclProxyProgressState* state = &proxyState->progressState;
   if (!state->thread) {
-    pthread_attr_t attr;
-    SYSCHECK(pthread_attr_init(&attr), "pthread_attr_init");
-    SYSCHECK(pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED), "pthread_attr_setdetachstate");
-    SYSCHECK(pthread_create(&state->thread, &attr, ncclProxyProgress, proxyState), "pthread_create");
-    SYSCHECK(pthread_attr_destroy(&attr), "pthread_attr_destroy");
+    pthread_create(&state->thread, NULL, ncclProxyProgress, proxyState);
     ncclSetThreadName(state->thread, "NCCL Progress%2d", proxyState->tpLocalnRanks);
   }
   return ncclSuccess;
@@ -932,17 +917,10 @@ ncclResult_t ncclProxyProgressDestroy(struct ncclProxyState* proxyState) {
   // Request the proxy to stop and then wake it
   if (state->opsPool) {
     pthread_mutex_lock(&state->opsPool->mutex);
-    if (*proxyState->abortFlag == 0)
-      state->stop = PROGRESS_REQUEST_STOP;
-    else
-      state->stop = PROGRESS_ABORT;
+    state->stop = 1;
     pthread_cond_signal(&state->opsPool->cond);
     pthread_mutex_unlock(&state->opsPool->mutex);
-    /* progress thread is always detached, wait for it to exit. */
-    uint64_t t0 = clockNano();
-    while (__atomic_load_n(&state->stop, __ATOMIC_ACQUIRE) != PROGRESS_COMPLETE) {
-      if (clockNano() - t0 >= 1000) sched_yield();
-    }
+    pthread_join(state->thread, NULL);
   }
 
   // Free off any memory allocated for the proxy arg pools
@@ -1582,19 +1560,6 @@ void* ncclProxyService(void* _args) {
   ncclSocketClose(proxyState->listenSock);
   free(proxyState->listenSock);
   proxyOpsFree(proxyState);
-
-  if (*proxyState->abortFlag) {
-    /* abort happened, need to notify main thread I am done. */
-    __atomic_store_n(&proxyState->stop, SERVICE_COMPLETE, __ATOMIC_RELEASE);
-  }
-
-  if (ncclAtomicRefCountDecrement(proxyState->abortFlagRefCount) == 0) {
-    ncclCudaHostFree((void *)proxyState->abortFlag);
-    free((void*)proxyState->abortFlagRefCount);
-  }
-
-  /* proxy itself holds one internal ref count, needs to call ncclProxyDestroy */
-  ncclProxyDestroy(proxyState);
   return NULL;
 }
@@ -1603,8 +1568,6 @@ ncclResult_t ncclProxyInit(struct ncclComm* comm, struct ncclSocket* sock, union
   NCCLCHECK(ncclCalloc(&comm->sharedRes->proxyState, 1));
   comm->proxyState = comm->sharedRes->proxyState;
   comm->proxyState->refCount = 1;
-  /* ref count for communicator and proxy service thread. */
-  comm->proxyState->internalRefCount = 2;
   comm->proxyState->listenSock = sock;
   comm->proxyState->peerAddresses = peerAddresses;
   // Seed the random number generator for UDS filename generation
@@ -1627,8 +1590,6 @@ ncclResult_t ncclProxyCreate(struct ncclComm* comm) {
   proxyState->tpLocalnRanks = comm->localRanks;
   proxyState->cudaDev = comm->cudaDev;
   proxyState->abortFlag = comm->abortFlag;
-  proxyState->abortFlagRefCount = comm->abortFlagRefCount;
-  ncclAtomicRefCountIncrement(comm->abortFlagRefCount);
   proxyState->p2pnChannels = comm->p2pnChannels;
   proxyState->p2pChunkSize = comm->p2pChunkSize;
   proxyState->nChannels = comm->nChannels;
@@ -1686,41 +1647,15 @@ ncclResult_t ncclProxyStop(struct ncclComm* comm) {
   return ncclSuccess;
 }
 
-ncclResult_t ncclProxyDestroy(struct ncclProxyState *proxyState) {
-  if (__atomic_sub_fetch(&proxyState->internalRefCount, 1, __ATOMIC_ACQ_REL) == 0) {
-    free(proxyState->peerAddresses);
-    free(proxyState->peerSocks);
-    free(proxyState->proxyOps);
-    free(proxyState->sharedDevMems);
-    expectedProxyResponseFree(proxyState);
-    free(proxyState);
-  }
-  return ncclSuccess;
-}
-
-/* detach all proxy threads in case of abort */
-ncclResult_t ncclProxyTryDetach(struct ncclProxyState *proxyState) {
-  if (proxyState && proxyState->thread) {
-    /* proxy service thread can call cudaFreeHost to free pinned host mem, but
-     * it can cause a hang if main thread is issuing other cuda calls. To solution
-     * should be allocate/free pinned host mem using cuMem* driver API, this waiting
-     * 5 secs is just a workaround for now. */
-    bool join = false;
-    struct timespec start, now;
-    clock_gettime(CLOCK_MONOTONIC, &start);
-    do {
-      clock_gettime(CLOCK_MONOTONIC, &now);
-      if (__atomic_load_n(&proxyState->stop, __ATOMIC_ACQUIRE) == SERVICE_COMPLETE) {
-        /* proxy thread is done, join it. */
-        pthread_join(proxyState->thread, NULL);
-        join = true;
-        break;
-      }
-    } while(now.tv_sec - start.tv_sec < 5);
-    if (join == false) {
-      pthread_detach(proxyState->thread);
-    }
-  }
+ncclResult_t ncclProxyDestroy(struct ncclComm* comm) {
+  struct ncclProxyState* sharedProxyState = comm->sharedRes->proxyState;
+
+  assert(sharedProxyState->refCount == 0);
+  free(sharedProxyState->peerAddresses);
+  free(sharedProxyState->peerSocks);
+  free(sharedProxyState->proxyOps);
+  free(sharedProxyState->sharedDevMems);
+  expectedProxyResponseFree(sharedProxyState);
+  free(sharedProxyState);
   return ncclSuccess;
 }
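This file carries the bulk of the "revert proxy detach after 5 seconds" item: the PROGRESS_*/SERVICE_* state machine and the detached progress thread are gone, and the code returns to a plain integer stop flag plus an unconditional pthread_join(), with aborts handled through the abortFlag check in the progress loop. A standalone sketch of that stop protocol, using a simplified stand-in for the progress state:

  // Standalone sketch of the stop protocol the revert goes back to: set the
  // flag under the ops-pool mutex, wake the progress thread, then join it.
  // The struct below is a simplified stand-in for NCCL's progress state.
  #include <pthread.h>
  #include <cstdio>

  struct Progress {
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    int stop = 0;
    pthread_t thread;
  };

  static void* progressMain(void* arg) {
    Progress* p = (Progress*)arg;
    pthread_mutex_lock(&p->mutex);
    while (!p->stop) pthread_cond_wait(&p->cond, &p->mutex);  // woken up to stop
    pthread_mutex_unlock(&p->mutex);
    return nullptr;
  }

  int main() {
    Progress p;
    pthread_create(&p.thread, nullptr, progressMain, &p);
    pthread_mutex_lock(&p.mutex);
    p.stop = 1;                       // request stop
    pthread_cond_signal(&p.cond);     // wake the sleeping progress thread
    pthread_mutex_unlock(&p.mutex);
    pthread_join(p.thread, nullptr);  // always join; no detach or 5-second timeout path
    printf("progress thread joined\n");
    return 0;
  }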

View File

@@ -65,13 +65,28 @@ void dumpData(struct ncclConnect* data, int ndata) {
   }
 }
 
+NCCL_PARAM(ConnectRoundMaxPeers, "CONNECT_ROUND_MAX_PEERS", 128);
+NCCL_PARAM(ReportConnectProgress, "REPORT_CONNECT_PROGRESS", 0);
+#include <sys/time.h>
+
 ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, int connIndex, int* highestTransportType/*=NULL*/) {
   // Stream used during transport setup; need for P2P pre-connect + CUDA Graph
   ncclResult_t ret = ncclSuccess;
   int highestType = TRANSPORT_P2P; // track highest transport type
-  struct ncclConnect** data = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Store intermediate send/recvData structs for connect
-  struct ncclConnect** recvData = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Points to entries inside data for given recv connection within a channel
-  struct ncclConnect** sendData = (ncclConnect**) malloc(sizeof(ncclConnect*) * comm->nRanks); // Points to entries inside data for given send connection within a channel
+  struct ncclConnect** data; // Store intermediate send/recvData structs for connect
+  struct ncclConnect** recvData; // Points to entries inside data for given recv connection within a channel
+  struct ncclConnect** sendData; // Points to entries inside data for given send connection within a channel
+  int done = 0;
+
+  int maxPeers = ncclParamConnectRoundMaxPeers();
+  NCCLCHECK(ncclCalloc(&data, maxPeers));
+  NCCLCHECK(ncclCalloc(&recvData, maxPeers));
+  NCCLCHECK(ncclCalloc(&sendData, maxPeers));
+
+  struct timeval timeStart, timeLast;
+  gettimeofday(&timeStart, NULL);
+  timeLast = timeStart; // struct copy
+  bool timeReported = false;
 
   NCCLCHECKGOTO(ncclStrongStreamAcquireUncaptured(&comm->sharedRes->hostStream), ret, fail);
   // First time initialization
@@ -87,23 +102,24 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
     // The first N entries contain recvData, connection information for recv connections
     // The next M entries contain sendData, connection information for send connections
     // It's not guaranteed that each entry of data has the same number of total or send/recv specific connections
-    data[i] = (ncclConnect*) malloc(sizeof(ncclConnect) * 2*MAXCHANNELS);
-    recvData[i] = data[i];
+    int p = i-(done+1);
+    if (recvMask || sendMask) NCCLCHECK(ncclCalloc(data+p, 2*MAXCHANNELS));
+    recvData[p] = data[p];
     int sendChannels = 0, recvChannels = 0;
     int type;
     TIME_START(0);
     for (int c=0; c<MAXCHANNELS; c++) {
       if (recvMask & (1UL<<c)) {
-        NCCLCHECKGOTO(selectTransport<0>(comm, graph, recvData[i]+recvChannels++, c, recvPeer, connIndex, &type), ret, fail);
+        NCCLCHECKGOTO(selectTransport<0>(comm, graph, recvData[p]+recvChannels++, c, recvPeer, connIndex, &type), ret, fail);
         if (type > highestType) highestType = type;
       }
     }
     TIME_STOP(0);
     TIME_START(1);
-    sendData[i] = recvData[i]+recvChannels;
+    sendData[p] = recvData[p]+recvChannels;
     for (int c=0; c<MAXCHANNELS; c++) {
       if (sendMask & (1UL<<c)) {
-        NCCLCHECKGOTO(selectTransport<1>(comm, graph, sendData[i]+sendChannels++, c, sendPeer, connIndex, &type), ret, fail);
+        NCCLCHECKGOTO(selectTransport<1>(comm, graph, sendData[p]+sendChannels++, c, sendPeer, connIndex, &type), ret, fail);
         if (type > highestType) highestType = type;
       }
     }
@@ -112,31 +128,32 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
     TIME_START(2);
     if (sendPeer == recvPeer) {
       if (recvChannels+sendChannels) {
-        NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data[i], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail);
-        NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data[i], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail);
-        sendData[i] = data[i];
-        recvData[i] = data[i]+sendChannels;
+        NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, data[p], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail);
+        NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, data[p], sizeof(struct ncclConnect)*(recvChannels+sendChannels)), ret, fail);
+        sendData[p] = data[p];
+        recvData[p] = data[p]+sendChannels;
       }
     } else {
-      if (recvChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData[i], sizeof(struct ncclConnect)*recvChannels), ret, fail);
-      if (sendChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData[i], sizeof(struct ncclConnect)*sendChannels), ret, fail);
-      if (sendChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData[i], sizeof(struct ncclConnect)*sendChannels), ret, fail);
-      if (recvChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData[i], sizeof(struct ncclConnect)*recvChannels), ret, fail);
+      if (recvChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, recvPeer, bootstrapTag, recvData[p], sizeof(struct ncclConnect)*recvChannels), ret, fail);
+      if (sendChannels) NCCLCHECKGOTO(bootstrapSend(comm->bootstrap, sendPeer, bootstrapTag, sendData[p], sizeof(struct ncclConnect)*sendChannels), ret, fail);
+      if (sendChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, sendPeer, bootstrapTag, sendData[p], sizeof(struct ncclConnect)*sendChannels), ret, fail);
+      if (recvChannels) NCCLCHECKGOTO(bootstrapRecv(comm->bootstrap, recvPeer, bootstrapTag, recvData[p], sizeof(struct ncclConnect)*recvChannels), ret, fail);
     }
     TIME_STOP(2);
-  }
+
+    if (i-done == maxPeers || i == comm->nRanks-1) {
       // Loop until all channels with all ranks have been connected
       bool allChannelsConnected;
       allChannelsConnected = false;
       while (!allChannelsConnected) {
         allChannelsConnected = true;
-        for (int i=1; i<comm->nRanks; i++) {
-          int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
-          int sendPeer = (comm->rank + i) % comm->nRanks;
+        for (int j=done+1; j<=i; j++) {
+          int recvPeer = (comm->rank - j + comm->nRanks) % comm->nRanks;
+          int sendPeer = (comm->rank + j) % comm->nRanks;
           uint64_t recvMask = comm->connectRecv[recvPeer];
           uint64_t sendMask = comm->connectSend[sendPeer];
+          int p = j-(done+1);
           int sendDataOffset = 0;
           int recvDataOffset = 0;
           for (int c=0; c<MAXCHANNELS; c++) {
@@ -145,7 +162,7 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
             struct ncclConnector* conn = comm->channels[c].peers[sendPeer]->send + connIndex;
             // This connector hasn't completed connection yet
             if (conn->connected == 0) {
-              NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[i] + sendDataOffset++, 1, comm->rank, conn), ret, fail);
+              NCCLCHECKGOTO(conn->transportComm->connect(comm, sendData[p] + sendDataOffset++, 1, comm->rank, conn), ret, fail);
               if (ret == ncclSuccess) {
                 conn->connected = 1;
                 /* comm->channels[c].devPeers[sendPeer]->send[connIndex] is a device memory access. */
@@ -163,7 +180,7 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
             struct ncclConnector* conn = comm->channels[c].peers[recvPeer]->recv + connIndex;
             // This connector hasn't completed connection yet
             if (conn->connected == 0) {
-              NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[i] + recvDataOffset++, 1, comm->rank, conn), ret, fail);
+              NCCLCHECKGOTO(conn->transportComm->connect(comm, recvData[p] + recvDataOffset++, 1, comm->rank, conn), ret, fail);
               if (ret == ncclSuccess) {
                 conn->connected = 1;
                 /* comm->channels[c].devPeers[recvPeer]->recv[connIndex] is a device memory access. */
@@ -175,8 +192,37 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
             }
             TIME_STOP(4);
           }
+          if (sendMask || recvMask) {
+            free(data[p]);
+            data[p] = NULL;
+          }
         }
+        if (ncclParamReportConnectProgress() && comm->rank == 0) {
+          struct timeval now;
+          gettimeofday(&now, NULL);
+          if (((now.tv_sec - timeLast.tv_sec)*1.0 + (now.tv_usec-timeLast.tv_usec)*1e-6) > 1) {
+            float elapsed = (now.tv_sec - timeStart.tv_sec)*1.0 + (now.tv_usec-timeStart.tv_usec)*1e-6;
+            float remaining = elapsed*(comm->nRanks-done)/done;
+            printf("%sP2p connect: %g%% Elapsed %d:%02d Remaining %d:%02d ",
+                   timeReported ? "\r" : "", done*100.0/comm->nRanks, ((int)elapsed)/60, ((int)elapsed)%60, ((int)remaining)/60, ((int)remaining)%60);
+            fflush(stdout);
+            timeReported = true;
+            timeLast = now; // struct copy;
+          }
+        }
       }
+      done = i;
+    }
+  }
+
+  if (timeReported) {
+    struct timeval now;
+    gettimeofday(&now, NULL);
+    float elapsed = (now.tv_sec - timeStart.tv_sec)*1.0 + (now.tv_usec-timeStart.tv_usec)*1e-6;
+    printf("\rP2p connect done in %d:%02d \n",
+           ((int)elapsed)/60, ((int)elapsed)%60);
+    fflush(stdout);
+  }
 
   /* We need to sync ranks here since some ranks might run too fast after connection setup
    * and start to destroy the connection after returning from this function; however, the
@@ -205,7 +251,6 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
       }
     }
     comm->connectRecv[recvPeer] = comm->connectSend[sendPeer] = 0UL;
-    free(data[i]);
   }
 
   free(data);
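This is the main change behind the "split transport connect phase" item: ncclTransportP2pSetup() still walks all peers, but the connect phase now runs in rounds of NCCL_CONNECT_ROUND_MAX_PEERS peers (default 128), so only one round's worth of connect buffers and in-flight connections, and therefore ports, is alive at a time; setting NCCL_REPORT_CONNECT_PROGRESS=1 additionally makes rank 0 print progress. A stripped-down sketch of the round structure, where setupPeer() and connectPeer() are hypothetical stand-ins for the per-peer work:

  // Stripped-down sketch of the new round-based structure, not the real
  // NCCL control flow: setup runs peer by peer, connect runs once per round.
  #include <cstdio>

  static void setupPeer(int peer)   { (void)peer; /* exchange connect info */ }
  static void connectPeer(int peer) { (void)peer; /* establish the connection */ }

  int main() {
    int nRanks = 1024;
    int maxPeers = 128;  // NCCL_CONNECT_ROUND_MAX_PEERS default
    int done = 0;
    for (int i = 1; i < nRanks; i++) {
      setupPeer(i);
      if (i - done == maxPeers || i == nRanks - 1) {          // close out one round
        for (int j = done + 1; j <= i; j++) connectPeer(j);   // at most maxPeers in flight
        done = i;
      }
    }
    printf("connected %d peers in rounds of %d\n", nRanks - 1, maxPeers);
    return 0;
  }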

View File

@@ -393,18 +393,18 @@ ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
   typeSize = sizeof(struct localRegData);
 
   if (comm->localRank == 0) {
     shmPath[0] = '\0';
-    NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, comm->localRanks - 1, &comm->nvlsShmemHandle), res, cleanup);
+    NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, comm->localRanks - 1, &comm->nvlsResources->nvlsShmemHandle), res, cleanup);
     NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shmPath, sizeof(shmPath)), res, cleanup);
   } else {
     NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shmPath, sizeof(shmPath)), res, cleanup);
-    NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, -1, &comm->nvlsShmemHandle), res, cleanup);
+    NCCLCHECKGOTO(ncclShmOpen(shmPath, (sizeof(size_t) + typeSize * comm->localRanks) * 2, (void**)&nvlsShmem, NULL, -1, &comm->nvlsResources->nvlsShmemHandle), res, cleanup);
   }
   /* need 2 pools and a shared counter for shmem-based collectives */
-  comm->nvlsShmem.cnt[0] = (size_t*)nvlsShmem;
-  comm->nvlsShmem.ptr[0] = (void*)((char*)comm->nvlsShmem.cnt[0] + sizeof(size_t));
-  comm->nvlsShmem.cnt[1] = (size_t*)((char*)comm->nvlsShmem.ptr[0] + typeSize * comm->localRanks);
-  comm->nvlsShmem.ptr[1] = (void*)((char*)comm->nvlsShmem.cnt[1] + sizeof(size_t));
-  comm->nvlsShmem.round = 0;
+  comm->nvlsResources->nvlsShmem.cnt[0] = (size_t*)nvlsShmem;
+  comm->nvlsResources->nvlsShmem.ptr[0] = (void*)((char*)comm->nvlsResources->nvlsShmem.cnt[0] + sizeof(size_t));
+  comm->nvlsResources->nvlsShmem.cnt[1] = (size_t*)((char*)comm->nvlsResources->nvlsShmem.ptr[0] + typeSize * comm->localRanks);
+  comm->nvlsResources->nvlsShmem.ptr[1] = (void*)((char*)comm->nvlsResources->nvlsShmem.cnt[1] + sizeof(size_t));
+  comm->nvlsResources->nvlsShmem.round = 0;
 
   return res;
@@ -418,6 +418,7 @@ ncclResult_t ncclNvlsFree(struct ncclComm* comm) {
   if (resources == NULL) return ncclSuccess;
 
   if (ncclAtomicRefCountDecrement(&resources->refCount) == 0) {
+    NCCLCHECK(ncclShmClose(resources->nvlsShmemHandle));
     NCCLCHECK(nvlsGroupUnbind(comm, resources));
     NCCLCHECK(nvlsGroupUnmapMem(comm, resources));
     free(resources);
@@ -476,7 +477,7 @@ ncclResult_t tryRegisterBuffer(struct ncclComm *comm, struct localRequestData *r
   /* get all buffer addresses */
   NCCLCHECKGOTO(ncclCalloc(&regRecord->addrs, comm->localRanks), ret, fail);
   regRecord->addrs[comm->localRank] = regRecord->buff;
-  NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsShmem, regRecord->addrs + comm->localRank, regRecord->addrs, sizeof(uintptr_t)), ret, fail);
+  NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsResources->nvlsShmem, regRecord->addrs + comm->localRank, regRecord->addrs, sizeof(uintptr_t)), ret, fail);
 
   /* enqueue record */
   ncclIntruQueueEnqueue(&comm->regRecordQueue, regRecord);
@@ -551,7 +552,7 @@ ncclResult_t ncclNvlsLocalRegisterBuffer(struct ncclComm *comm, const void *send
     regRequestHead = regRequestHead->next;
   }
-  NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsShmem, regData + comm->localRank, regData, sizeof(struct localRegData)), ret, fail);
+  NCCLCHECKGOTO(ncclShmemAllgather(comm, &comm->nvlsResources->nvlsShmem, regData + comm->localRank, regData, sizeof(struct localRegData)), ret, fail);
 
   /* first check whether all local ranks find their registered buffer */
   for (int i = 0; i < comm->localRanks; ++i) {