From 0524aef7a0333bc79d885e392812519087eab71f Mon Sep 17 00:00:00 2001 From: Kamil Iskra Date: Tue, 22 Apr 2025 13:50:40 -0700 Subject: [PATCH] NCCL 2.26.3-1 Minimize the performance impact of the device kernel profiling support when the profiler plugin is not loaded. Reduce the overheads of CUDA graph capturing, which increased in NCCL 2.26.2 for large graphs. Fix the exchange of enhanced connection establishment (ECE) options to address potential slowdowns on networks utilizing RoCE. Test if cuMem host allocations work and if not, disable them. Enabled by default since NCCL 2.24 if the CUDA driver version is at least 12.6, such allocations rely on NUMA support, which is by default not available under Docker. We recommend invoking Docker with "--cap-add SYS_NICE" to enable it. Fix an initialization error when running with NCCL_NET_GDR_C2C=1 on multiple MNNVL domains with non-uniform network configurations across nodes. Fix the printing of sub-seconds in the debug log when using a custom NCCL_DEBUG_TIMESTAMP_FORMAT setting. --- makefiles/version.mk | 2 +- src/debug.cc | 6 +++- src/device/common.h | 57 +++++++++++++++++++++++++++++++++----- src/enqueue.cc | 27 ++++++++++++++---- src/graph/paths.cc | 6 ++-- src/include/device.h | 2 ++ src/include/graph.h | 2 +- src/include/profiler.h | 1 + src/include/proxy.h | 6 ++++ src/include/strongstream.h | 3 ++ src/misc/cudawrap.cc | 31 +++++++++++++++++++++ src/misc/strongstream.cc | 32 +++++++++++++++++++++ src/plugin/profiler.cc | 6 +++- src/proxy.cc | 20 +++++++------ src/transport/coll_net.cc | 2 +- src/transport/net.cc | 2 +- src/transport/net_ib.cc | 11 ++++---- 17 files changed, 182 insertions(+), 34 deletions(-) diff --git a/makefiles/version.mk b/makefiles/version.mk index df3ee5c..93a71d4 100644 --- a/makefiles/version.mk +++ b/makefiles/version.mk @@ -1,6 +1,6 @@ ##### version NCCL_MAJOR := 2 NCCL_MINOR := 26 -NCCL_PATCH := 2 +NCCL_PATCH := 3 NCCL_SUFFIX := PKG_REVISION := 1 diff --git a/src/debug.cc b/src/debug.cc index 2eb8d77..e2cc4f8 100644 --- a/src/debug.cc +++ b/src/debug.cc @@ -195,6 +195,10 @@ static void ncclDebugInit() { } } + // Replace underscore with spaces... it is hard to put spaces in command line parameters. + for (int i=0; ncclDebugTimestampFormat[i] != '\0'; ++i) { + if (ncclDebugTimestampFormat[i]=='_') ncclDebugTimestampFormat[i] = ' '; + } // Cache pid and hostname getHostName(hostname, 1024, '.'); @@ -301,7 +305,7 @@ void ncclDebugLog(ncclDebugLogLevel level, unsigned long flags, const char *file snprintf(localTimestampFormat + ncclDebugTimestampSubsecondsStart, ncclDebugTimestampSubsecondDigits+1, "%0*ld", ncclDebugTimestampSubsecondDigits, - ts.tv_nsec / (1000000UL/ncclDebugTimestampMaxSubseconds)); + ts.tv_nsec / (1000000000UL/ncclDebugTimestampMaxSubseconds)); strcpy( localTimestampFormat+ncclDebugTimestampSubsecondsStart+ncclDebugTimestampSubsecondDigits, ncclDebugTimestampFormat+ncclDebugTimestampSubsecondsStart+ncclDebugTimestampSubsecondDigits); } diff --git a/src/device/common.h b/src/device/common.h index 2dca70d..855db73 100644 --- a/src/device/common.h +++ b/src/device/common.h @@ -54,6 +54,7 @@ struct ncclShmemData { int workSize; uint32_t workConsumed; uint64_t workCounter; + bool profilerEnabled; struct ncclShmemGroup groups[NCCL_MAX_GROUPS]; uint64_t redOpArgs[NCCL_MAX_NVLS_ARITY+1]; @@ -291,6 +292,48 @@ struct RunWorkBatch { } }; +#define START 0 +#define STOP 1 +#define FINI 2 + +__device__ __forceinline__ bool profilerEnabled(void) { + // Check if any of the workItems in the batch is profiled. If so, there is an equivalent + // profiler ProxyOp waiting for the counter update in the host thread. If this check was + // done only for the first workItem the profiler counter for other workItems in the batch + // could never be updated, leaving the host thread spinning forever for the counter update + // and causing a hang. + bool enabled = false; + for (int i = 0; i < ncclShmem.nWorks && !enabled; i++) { + if (ncclShmem.workType == ncclDevWorkTypeP2p) + enabled = ((struct ncclDevWorkP2p*)ncclShmem.workStorage)[i].profilerEnabled; + else + enabled = ((struct ncclDevWorkColl*)ncclShmem.workStorage)[i].profilerEnabled; + } + return enabled; +} + +__device__ __forceinline__ void profiler(int action) { + if (action == START) { + if (threadIdx.x == 0) { + // increment workCounter regardless of the profiler being active or not + ncclShmem.channel.workCounter += ncclShmem.nWorks; + if(!profilerEnabled()) return; + ncclShmem.comm.workStarted[ncclShmem.channelId] = ncclShmem.channel.workCounter; + } + } else if (action == STOP) { + if (threadIdx.x == 0 && profilerEnabled()) { + ncclShmem.comm.workCompleted[ncclShmem.channelId] = ncclShmem.channel.workCounter; + } + } else { // FINI + if (threadIdx.x == 0) { + // store the workCounter back to vidmem regardless of the profiler being active or not + ((ncclDevCommAndChannels*)ncclShmem.args.comm)->channels[ncclShmem.channelId].workCounter = ncclShmem.channel.workCounter; + if (!profilerEnabled()) return; + ncclShmem.comm.workCompleted[ncclShmem.channelId] = ncclShmem.channel.workCounter; + } + } +} + template __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* args) { int tid = threadIdx.x; @@ -312,7 +355,10 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a } __syncthreads(); // publish ncclShmem.{args, channelId} /* set abort flag to 0 */ - if (tid == 0) ncclShmem.aborted = 0; + if (tid == 0) { + ncclShmem.aborted = 0; + ncclShmem.channel.workCounter = ((ncclDevCommAndChannels*)ncclShmem.args.comm)->channels[ncclShmem.channelId].workCounter; + } // Use first 2 warps to load comm and channel, and remaining load work batch. switch (tid/WARP_SIZE) { @@ -348,7 +394,7 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a } while (ncclShmem.aborted == 0) { - if (tid == 0) ncclShmem.comm.workStarted[ncclShmem.channelId] = (ncclShmem.channel.workCounter += ncclShmem.nWorks); + profiler(START); if (0 <= SpecializedFnId && ncclShmem.funcId == (unsigned)SpecializedFnId) { SpecializedRunWorkBatch().run(); } else { @@ -358,7 +404,7 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a if (ncclShmem.nextBatchIx == -1) break; int batchIx = ncclShmem.nextBatchIx; __syncthreads(); - if (tid == 0) ncclShmem.comm.workCompleted[ncclShmem.channelId] = ncclShmem.channel.workCounter; + profiler(STOP); loadWorkBatchToShmem(tid, tn, args, batchIx); __syncthreads(); @@ -367,10 +413,7 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a ncclShmem.comm.workConsumed[ncclShmem.channelId] = ncclShmem.workConsumed; } } - if (tid == 0) { - ncclShmem.comm.workCompleted[ncclShmem.channelId] = ncclShmem.channel.workCounter; - ((ncclDevCommAndChannels*)ncclShmem.args.comm)->channels[ncclShmem.channelId].workCounter = ncclShmem.channel.workCounter; - } + profiler(FINI); } __global__ void ncclDevKernel_Generic(ncclDevKernelArgs4K NCCL_GRID_CONSTANT const args4K); diff --git a/src/enqueue.cc b/src/enqueue.cc index 5e0b213..4e8a211 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -288,6 +288,7 @@ ncclResult_t ncclTasksRegAndEnqueue(struct ncclComm* comm) { devWork.oneNode = (comm->nNodes == 1); devWork.isOneRPN = comm->isOneRPN; devWork.netRegUsed = devWork.regUsed = 0; + devWork.profilerEnabled = ncclProfilerPluginLoaded() && (task->eActivationMask & ncclProfileKernelCh); if (task->regBufType & NCCL_NET_REG_BUFFER) devWork.netRegUsed = 1; if (task->regBufType & (NCCL_IPC_REG_BUFFER | NCCL_NVLS_REG_BUFFER)) @@ -445,6 +446,7 @@ ncclResult_t ncclPrepareTasks(struct ncclComm* comm, bool* algoNeedConnect, bool devWork.redOpArgIsPtr = task->opDev.scalarArgIsPtr; devWork.oneNode = (comm->nNodes == 1); devWork.netRegUsed = devWork.regUsed = 0; + devWork.profilerEnabled = ncclProfilerPluginLoaded() && (task->eActivationMask & ncclProfileKernelCh); if (task->regBufType & NCCL_NET_REG_BUFFER) devWork.netRegUsed = 1; if (task->regBufType & (NCCL_IPC_REG_BUFFER | NCCL_NVLS_REG_BUFFER)) @@ -557,7 +559,7 @@ static ncclResult_t scheduleCollTasksToPlan( proxyOp.task.coll = task; proxyOp.rank = comm->rank; proxyOp.eActivationMask = task->eActivationMask; - proxyOp.workCounter = ++comm->profiler.workCounter[c]; + proxyOp.incWorkCounter = true; addWorkBatchToPlan(comm, plan, c, workNode->workType, task->devFuncId, plan->workBytes); // Set pattern to profiler to add a proxy profiler for kernel events NCCLCHECK(addProxyOpIfNeeded(comm, plan, &proxyOp)); @@ -681,7 +683,7 @@ static ncclResult_t scheduleCollTasksToPlan( proxyOp->ringAlgo->incRefCount(); } proxyOp->eActivationMask = task->eActivationMask; - proxyOp->workCounter = ++comm->profiler.workCounter[c]; + proxyOp->incWorkCounter = true; addWorkBatchToPlan(comm, plan, c, workNode->workType, task->devFuncId, plan->workBytes); // Coverity reports "proxyOp->connection" as being possibly uninitialized. It's hard to // determine if that's actually true but it's also not clear if that would be an issue. @@ -886,6 +888,7 @@ static ncclResult_t addP2pToPlan( work->recvRank = recvRank; work->recvAddr = recvAddr; work->recvBytes = recvBytes==-1 ? 0 : recvBytes; + work->profilerEnabled = ncclProfilerPluginLoaded() && ((p2pTasks[0] ? p2pTasks[0] : p2pTasks[1])->eActivationMask & ncclProfileKernelCh); struct ncclProxyOp proxyOps[2] = {}; int nProxyOps = selfSend ? 0 : 2; @@ -910,6 +913,7 @@ static ncclResult_t addP2pToPlan( nChannelsMax = std::max(nChannels[0], nChannels[1]); for (int part=0; part < nChannelsMax; part++) { + int incWorkCounter = -1; int channelId = ncclP2pChannelForPart(comm->p2pnChannels, base, part); plan->channelMask |= uint64_t(1)< pair rather than individual p2p + if (proxyOps[dir].nsteps && incWorkCounter < 0) { + proxyOps[dir].incWorkCounter = true; + incWorkCounter = dir; + } + if (proxyOps[dir].nsteps != 0) { // Calculate the opCount after adding batch since then the batch count will // equal one plus the batch index this p2p settled in. proxyOps[dir].channelId = channelId; proxyOps[dir].opCount = uint64_t(comm->planner.wipPlan.channels[channelId].nWorkBatchesP2p)<<1 | 1; - proxyOps[dir].workCounter = comm->profiler.workCounter[channelId]+1; NCCLCHECK(addProxyOpIfNeeded(comm, plan, &proxyOps[dir])); NCCLCHECK(addProfilerProxyOpIfNeeded(comm, plan, &proxyOps[dir])); } } - comm->profiler.workCounter[channelId] += (proxyOps[0].nsteps || proxyOps[1].nsteps) ? 1 : 0; } return ncclSuccess; @@ -1592,7 +1600,16 @@ ncclResult_t ncclLaunchFinish(struct ncclComm* comm) { CUDACHECK(cudaEventRecord(comm->sharedRes->scratchEvent, launchStream)); // deviceStream waits on userStream[0] NCCLCHECK(ncclStrongStreamAcquiredWorkStream(planner->capturingGraph, &comm->sharedRes->deviceStream, /*concurrent=*/false, &deviceStream)); - CUDACHECK(cudaStreamWaitEvent(deviceStream, comm->sharedRes->scratchEvent, 0)); + + // We know that deviceStream is strictly behind the launchStream because launchStream + // synced with it before kernel launch. This allows us to to see deviceStream waiting + // on launchStream as a fast-forward. When building CUDA graphs fast forwards should + // be handled specially so as not to create graphs with a blowup in the number of edges. + // So we could do this: + // CUDACHECK(cudaStreamWaitEvent(deviceStream, comm->sharedRes->scratchEvent, 0)); + // But instead we do: + NCCLCHECK(ncclStreamAdvanceToEvent(planner->capturingGraph, deviceStream, comm->sharedRes->scratchEvent)); + // Each userStream[i] waits on userStream[0] for (struct ncclCudaStreamList* l=planner->streams->next; l != nullptr; l = l->next) { CUDACHECK(cudaStreamWaitEvent(l->stream, comm->sharedRes->scratchEvent, 0)); diff --git a/src/graph/paths.cc b/src/graph/paths.cc index ace4476..9983712 100644 --- a/src/graph/paths.cc +++ b/src/graph/paths.cc @@ -367,7 +367,7 @@ ncclResult_t ncclTopoCheckMNNVL(struct ncclTopoSystem* system, struct ncclPeerIn if ((((long *)&fabricInfo2->clusterUuid)[0]|((long *)fabricInfo2->clusterUuid)[1]) == 0) return ncclSuccess; if ((memcmp(fabricInfo1->clusterUuid, fabricInfo2->clusterUuid, NVML_GPU_FABRIC_UUID_LEN) == 0) && (fabricInfo1->cliqueId == fabricInfo2->cliqueId)) { - INFO(NCCL_NET, "MNNVL matching peer 0x%lx UUID %lx.%lx cliqueId 0x%x", + TRACE(NCCL_NET, "MNNVL matching peer 0x%lx UUID %lx.%lx cliqueId 0x%x", info2->busId, ((long *)fabricInfo2->clusterUuid)[0], ((long *)fabricInfo2->clusterUuid)[1], fabricInfo2->cliqueId); *ret = 1; } @@ -473,7 +473,7 @@ ncclResult_t ncclTopoIsGdrAvail(struct ncclTopoSystem* system, int rank, bool *a NCCL_PARAM(NetForceFlush, "NET_FORCE_FLUSH", 0); // Determine whether we need to flush the GDR recv buffers -ncclResult_t ncclTopoNeedFlush(struct ncclComm* comm, int netDev, int rank, int* flush) { +ncclResult_t ncclTopoNeedFlush(struct ncclComm* comm, int64_t netId, int netDev, int rank, int* flush) { *flush = 1; ncclNetProperties_t props; NCCLCHECK(comm->ncclNet->getProperties(netDev, &props)); @@ -488,7 +488,7 @@ ncclResult_t ncclTopoNeedFlush(struct ncclComm* comm, int netDev, int rank, int* // flags would go through C2C. In that case, force a flush. int c, n; NCCLCHECK(ncclGetLocalCpu(system, g, &c)); - NCCLCHECK(ncclTopoIdToIndex(system, NET, netDev, &n)); + NCCLCHECK(ncclTopoIdToIndex(system, NET, netId, &n)); if (gpu->paths[NET][n].type <= PATH_PXB && gpu->paths[CPU][c].type == PATH_C2C) { *flush = 1; } diff --git a/src/include/device.h b/src/include/device.h index 0763a57..f6ca51b 100644 --- a/src/include/device.h +++ b/src/include/device.h @@ -221,6 +221,7 @@ struct alignas(16) ncclDevWorkP2p { uint8_t sendProtoLL:1, recvProtoLL:1; uint8_t sendNetReg:1, recvNetReg:1; uint8_t sendIpcReg:1, recvIpcReg:1; + uint8_t profilerEnabled:1; }; // Compute the subset of the data transfer corresponding to the given part index. @@ -259,6 +260,7 @@ struct alignas(16) ncclDevWorkColl { uint32_t channelLo:8, channelHi:8; uint32_t nWarps:8; uint32_t redOpArgIsPtr:1, regUsed:1, netRegUsed:1, oneNode:1, direct:2, isOneRPN:1; + uint32_t profilerEnabled:1; uint32_t root; void* recvbuff; void* sendbuff; diff --git a/src/include/graph.h b/src/include/graph.h index b779773..a06556e 100644 --- a/src/include/graph.h +++ b/src/include/graph.h @@ -43,7 +43,7 @@ enum ncclTopoGdrMode { ncclTopoGdrModeNum = 3 }; ncclResult_t ncclTopoCheckGdr(struct ncclTopoSystem* topo, int rank, int64_t netId, int read, enum ncclTopoGdrMode* gdrMode); -ncclResult_t ncclTopoNeedFlush(struct ncclComm* comm, int netDev, int rank, int* flush); +ncclResult_t ncclTopoNeedFlush(struct ncclComm* comm, int64_t netId, int netDev, int rank, int* flush); ncclResult_t ncclTopoIsGdrAvail(struct ncclTopoSystem* system, int rank, bool *avail); ncclResult_t ncclTopoCheckNet(struct ncclTopoSystem* system, int rank1, int rank2, int* net); int ncclPxnDisable(struct ncclComm* comm); diff --git a/src/include/profiler.h b/src/include/profiler.h index 8d41079..bae0501 100644 --- a/src/include/profiler.h +++ b/src/include/profiler.h @@ -68,6 +68,7 @@ ncclResult_t ncclProfilerRecordProxyCtrlEventState(void*eHandle, int appended, n // Profiler utility functions ncclResult_t ncclProfilerAddPidToProxyOp(struct ncclProxyOp* op); bool ncclProfilerNeedsProxy(struct ncclComm* comm, struct ncclProxyOp* op); +bool ncclProfilerPluginLoaded(void); // Profiler callback for network plugin ncclResult_t ncclProfilerCallback(void** eHandle, int type, void* pHandle, int64_t pluginId, void* extData); diff --git a/src/include/proxy.h b/src/include/proxy.h index 225acb2..f90c802 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -88,6 +88,12 @@ struct ncclProxyOp { struct ncclTaskP2p* p2p; } task; + // Profiler work counter increment flag. Set to 'true' if the profiler work counter for this channel needs increment. + // Always 'true' for collective operations. Grouped p2p operations are fused into one pair in the GPU kernel, + // meaning the GPU profiler code increments the work counter for the pair rather than the individual p2p. For this + // reason, the incWorkCounter flag is used to avoid incrementing the work counter twice in the host code. This is done + // by setting incWorkCounter to 'true' only for one of the p2ps in the pair during enqueue. + bool incWorkCounter; int eActivationMask; void* taskEventHandle; int rank; diff --git a/src/include/strongstream.h b/src/include/strongstream.h index c56d5ac..393a1f0 100644 --- a/src/include/strongstream.h +++ b/src/include/strongstream.h @@ -102,6 +102,9 @@ ncclResult_t ncclStreamWaitStream( cudaStream_t a, cudaStream_t b, cudaEvent_t scratchEvent ); +// Like cudaStreamWaitEvent except `e` must be strictly ahead of everything in `s`. +ncclResult_t ncclStreamAdvanceToEvent(struct ncclCudaGraph g, cudaStream_t s, cudaEvent_t e); + // Synchrnoization does not need the strong stream to be acquired. ncclResult_t ncclStrongStreamSynchronize(struct ncclStrongStream* ss); diff --git a/src/misc/cudawrap.cc b/src/misc/cudawrap.cc index e5fec1e..64a84f5 100644 --- a/src/misc/cudawrap.cc +++ b/src/misc/cudawrap.cc @@ -4,6 +4,7 @@ * See LICENSE.txt for license information ************************************************************************/ +#include "alloc.h" #include "nccl.h" #include "debug.h" #include "param.h" @@ -67,6 +68,36 @@ int ncclCuMemHostEnable() { ncclCumemHostEnable = paramValue; else ncclCumemHostEnable = (cudaDriverVersion >= 12060) ? 1 : 0; + if (ncclCumemHostEnable) { + // Verify that host allocations actually work. Docker in particular is known to disable "get_mempolicy", + // causing such allocations to fail (this can be fixed by invoking Docker with "--cap-add SYS_NICE"). + int cudaDev; + CUdevice currentDev; + int cpuNumaNodeId = -1; + CUmemAllocationProp prop = {}; + size_t granularity = 0; + size_t size; + CUmemGenericAllocationHandle handle; + CUDACHECK(cudaGetDevice(&cudaDev)); + CUCHECK(cuDeviceGet(¤tDev, cudaDev)); + CUCHECK(cuDeviceGetAttribute(&cpuNumaNodeId, CU_DEVICE_ATTRIBUTE_HOST_NUMA_ID, currentDev)); + if (cpuNumaNodeId < 0) cpuNumaNodeId = 0; + prop.location.type = CU_MEM_LOCATION_TYPE_HOST_NUMA; + prop.type = CU_MEM_ALLOCATION_TYPE_PINNED; + prop.requestedHandleTypes = ncclCuMemHandleType; + prop.location.id = cpuNumaNodeId; + CUCHECK(cuMemGetAllocationGranularity(&granularity, &prop, CU_MEM_ALLOC_GRANULARITY_MINIMUM)); + size = 1; + ALIGN_SIZE(size, granularity); + if (CUPFN(cuMemCreate(&handle, size, &prop, 0)) != CUDA_SUCCESS) { + INFO(NCCL_INIT, "cuMem host allocations do not appear to be working; falling back to a /dev/shm/ based " + "implementation. This could be due to the container runtime disabling NUMA support. " + "To disable this warning, set NCCL_CUMEM_HOST_ENABLE=0"); + ncclCumemHostEnable = 0; + } else { + CUCHECK(cuMemRelease(handle)); + } + } } return ncclCumemHostEnable; error: diff --git a/src/misc/strongstream.cc b/src/misc/strongstream.cc index e6cce98..7d957d4 100644 --- a/src/misc/strongstream.cc +++ b/src/misc/strongstream.cc @@ -328,6 +328,38 @@ ncclResult_t ncclStreamWaitStream(cudaStream_t a, cudaStream_t b, cudaEvent_t sc return ncclSuccess; } +ncclResult_t ncclStreamAdvanceToEvent(struct ncclCudaGraph g, cudaStream_t s, cudaEvent_t e) { + if (g.graphId == ULLONG_MAX) { + CUDACHECK(cudaStreamWaitEvent(s, e, 0)); + } else { + cudaStream_t tmp; + CUDACHECK(cudaStreamCreateWithFlags(&tmp, cudaStreamNonBlocking)); + CUDACHECK(cudaStreamWaitEvent(tmp, e, 0)); + + cudaStreamCaptureStatus status; + cudaGraphNode_t const* nodes; + size_t count = 0; + cudaError_t res = cudaStreamGetCaptureInfo_v2(tmp, &status, nullptr, nullptr, &nodes, &count); + + #if CUDART_VERSION >= 12030 + if (res == cudaErrorLossyQuery) { // CUDA is telling us the dependencies have edge annotations. + cudaGraphEdgeData const* edges; + CUDACHECK(cudaStreamGetCaptureInfo_v3(tmp, &status, nullptr, nullptr, &nodes, &edges, &count)); + CUDACHECK(cudaStreamUpdateCaptureDependencies_v2(s, (cudaGraphNode_t*)nodes, edges, count, cudaStreamSetCaptureDependencies)); + } + #else + if (false) {} + #endif + else { + CUDACHECK(res /* = cudaStreamGetCaptureInfo_v2(...)*/); + CUDACHECK(cudaStreamUpdateCaptureDependencies(s, (cudaGraphNode_t*)nodes, count, cudaStreamSetCaptureDependencies)); + } + + CUDACHECK(cudaStreamDestroy(tmp)); + } + return ncclSuccess; +} + ncclResult_t ncclStrongStreamSynchronize(struct ncclStrongStream* ss) { #if CUDART_VERSION >= 11030 CUDACHECK(cudaStreamWaitEvent(ss->liveStream, ss->serialEvent, 0)); diff --git a/src/plugin/profiler.cc b/src/plugin/profiler.cc index 023a704..18b9b5c 100644 --- a/src/plugin/profiler.cc +++ b/src/plugin/profiler.cc @@ -536,11 +536,15 @@ exit: } bool ncclProfilerNeedsProxy(struct ncclComm* comm, struct ncclProxyOp* op) { - bool enabled = (__builtin_expect(ncclProfiler != NULL, 0) && (op->eActivationMask & ncclProfileKernelCh)); + bool enabled = ncclProfilerPluginLoaded() && (op->eActivationMask & ncclProfileKernelCh); if (enabled && !comm->profiler.initialized) (void)proxyProfilerConnect(comm, op); return enabled; } +bool ncclProfilerPluginLoaded(void) { + return (__builtin_expect(ncclProfiler != NULL, 0)); +} + ncclResult_t ncclProfilerCallback(void** eHandle, int type, void* pHandle, int64_t pluginId, void* extData) { if (__builtin_expect(ncclProfiler != NULL, 0)) { struct ncclProxySubArgs* sub = (struct ncclProxySubArgs*)pHandle; diff --git a/src/proxy.cc b/src/proxy.cc index 7e8021e..c27d234 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -9,7 +9,6 @@ #include "collectives.h" #include "socket.h" #include "shmutils.h" -#include "profiler.h" #define ENABLE_TIMER 0 #include "timer.h" #include "profiler.h" @@ -533,15 +532,21 @@ static ncclResult_t ncclLocalOpAppend(struct ncclComm* comm, struct ncclProxyCon return ncclSuccess; } +static void incWorkCounter(struct ncclComm* comm, struct ncclProxyOp* op) { + op->workCounter = (op->incWorkCounter) ? ++comm->profiler.workCounter[op->channelId] : comm->profiler.workCounter[op->channelId]; +} + static ncclResult_t SaveProxyProfiler(struct ncclComm* comm, struct ncclProxyOp* op, bool* justInquire) { struct ncclProxyConnector* proxyConn = (op->coll == ncclFuncRecv) ? &comm->profiler.recvProxyConn[op->channelId] : &comm->profiler.sendProxyConn[op->channelId]; - if (justInquire) *justInquire = true; - else { + if (justInquire) { + *justInquire = true; + if (!comm->planner.persistent) incWorkCounter(comm, op); + } else { op->sendbuff = (uint8_t *)comm->profiler.workStarted; op->recvbuff = (uint8_t *)comm->profiler.workCompleted; - NCCLCHECK(ncclLocalOpAppend(comm, proxyConn, op)); // Ensure that in graph capturing the proxy workCounter is incremented to keep up with kernel workCounter - op->workCounter += comm->profiler.workCounter[op->channelId]; + if (comm->planner.persistent) incWorkCounter(comm, op); + NCCLCHECK(ncclLocalOpAppend(comm, proxyConn, op)); } return ncclSuccess; } @@ -696,9 +701,8 @@ ncclResult_t ncclProxySaveOp(struct ncclComm* comm, struct ncclProxyOp* op, bool NCCLCHECK(SaveProxy(comm, channel, op->pattern == ncclPatternSend ? proxySend : proxyRecv, op->root, op, 1, justInquire)); } break; case ncclPatternProfiler: { - if (ncclProfilerNeedsProxy(comm, op)) { - NCCLCHECK(SaveProxyProfiler(comm, op, justInquire)); - } + if (ncclProfilerNeedsProxy(comm, op)) NCCLCHECK(SaveProxyProfiler(comm, op, justInquire)); + else incWorkCounter(comm, op); } break; } return ncclSuccess; diff --git a/src/transport/coll_net.cc b/src/transport/coll_net.cc index c1ccfca..84e1f84 100644 --- a/src/transport/coll_net.cc +++ b/src/transport/coll_net.cc @@ -192,7 +192,7 @@ static ncclResult_t recvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->rank, netId, 0, &req.useGdr)); recv->conn.flags |= req.useGdr ? NCCL_DIRECT_NIC : 0; // Determine whether we need to flush the GDR buffer on recv or not - if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm, req.netDev, myInfo->rank, &req.needFlush)); + if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm, netId, req.netDev, myInfo->rank, &req.needFlush)); recv->proxyConn.tpLocalRank = comm->topParentLocalRanks[comm->localRank]; NCCLCHECK(ncclProxyConnect(comm, TRANSPORT_COLLNET, 0, myInfo->rank, &recv->proxyConn)); diff --git a/src/transport/net.cc b/src/transport/net.cc index 40d334f..61b15ce 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -250,7 +250,7 @@ static ncclResult_t recvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph if (!req.useGdr && connIndex == 0) comm->useGdr = 0; // Determine whether we need to flush the GDR buffer on recv or not - if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm, req.netDev, myInfo->rank, &req.needFlush)); + if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm, netId, req.netDev, myInfo->rank, &req.needFlush)); // We don't support PXN on receive yet NCCLCHECK(ncclProxyConnect(comm, TRANSPORT_NET, 0, myInfo->rank, &recv->proxyConn)); diff --git a/src/transport/net_ib.cc b/src/transport/net_ib.cc index bfff6e5..c049531 100644 --- a/src/transport/net_ib.cc +++ b/src/transport/net_ib.cc @@ -1641,17 +1641,18 @@ ib_recv: // However, this has been confirmed to be intentional. // coverity[copy_paste_error] NCCLCHECKGOTO(wrap_ibv_set_ece(qp->qp, &remMeta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported), ret, fail); - - // Query the reduced ece for this QP (matching enhancements between the requestor and the responder) - // Store this in our own qpInfo for returning to the requestor - if (meta.qpInfo[q].ece_supported) - NCCLCHECKGOTO(wrap_ibv_query_ece(qp->qp, &meta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported), ret, fail); } else { meta.qpInfo[q].ece_supported = 0; } NCCLCHECKGOTO(ncclIbRtrQp(qp->qp, &rCommDev->base.gidInfo, remMeta.qpInfo[q].qpn, remDevInfo, true, remMeta.tc, remMeta.sl), ret, fail); NCCLCHECKGOTO(ncclIbRtsQp(qp->qp), ret, fail); + + // Query the reduced ece for this QP (matching enhancements between the requestor and the responder) + // Store this in our own qpInfo for returning to the requestor + if (remMeta.qpInfo[q].ece_supported && meta.qpInfo[q].ece_supported) { + NCCLCHECKGOTO(wrap_ibv_query_ece(qp->qp, &meta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported), ret, fail); + } } rComm->flushEnabled = ((ncclIbGdrSupport() == ncclSuccess || ncclIbDmaBufSupport(lComm->dev) == ncclSuccess)