diff --git a/makefiles/version.mk b/makefiles/version.mk
index df3ee5c..93a71d4 100644
--- a/makefiles/version.mk
+++ b/makefiles/version.mk
@@ -1,6 +1,6 @@
 ##### version
 NCCL_MAJOR := 2
 NCCL_MINOR := 26
-NCCL_PATCH := 2
+NCCL_PATCH := 3
 NCCL_SUFFIX :=
 PKG_REVISION := 1
diff --git a/src/debug.cc b/src/debug.cc
index 2eb8d77..e2cc4f8 100644
--- a/src/debug.cc
+++ b/src/debug.cc
@@ -195,6 +195,10 @@ static void ncclDebugInit() {
     }
   }
 
+  // Replace underscore with spaces... it is hard to put spaces in command line parameters.
+  for (int i=0; ncclDebugTimestampFormat[i] != '\0'; ++i) {
+    if (ncclDebugTimestampFormat[i]=='_') ncclDebugTimestampFormat[i] = ' ';
+  }
   // Cache pid and hostname
   getHostName(hostname, 1024, '.');
 
@@ -301,7 +305,7 @@ void ncclDebugLog(ncclDebugLogLevel level, unsigned long flags, const char *file
       snprintf(localTimestampFormat + ncclDebugTimestampSubsecondsStart,
                ncclDebugTimestampSubsecondDigits+1,
                "%0*ld", ncclDebugTimestampSubsecondDigits,
-               ts.tv_nsec / (1000000UL/ncclDebugTimestampMaxSubseconds));
+               ts.tv_nsec / (1000000000UL/ncclDebugTimestampMaxSubseconds));
       strcpy( localTimestampFormat+ncclDebugTimestampSubsecondsStart+ncclDebugTimestampSubsecondDigits,
               ncclDebugTimestampFormat+ncclDebugTimestampSubsecondsStart+ncclDebugTimestampSubsecondDigits);
     }
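Editor's note: the debug.cc change above fixes the subsecond divisor — `tv_nsec` holds nanoseconds, so the base must be 1000000000UL. A standalone, C-style illustration of the before/after arithmetic (variable names here are illustrative, not NCCL's):

```cpp
// Sketch of the corrected subsecond math. With the old 1000000UL base, 3 requested digits
// ("maxSubseconds" = 1000) divide tv_nsec by only 1000, producing values up to 999999 that
// overflow a 3-digit field; the fixed divisor keeps the result in 0..999.
#include <stdio.h>
#include <time.h>

int main(void) {
  struct timespec ts;
  clock_gettime(CLOCK_REALTIME, &ts);
  const int digits = 3;                       /* e.g. millisecond resolution */
  const unsigned long maxSubseconds = 1000;   /* 10^digits */
  long wrong = ts.tv_nsec / (1000000UL / maxSubseconds);     /* old divisor: can reach 999999 */
  long right = ts.tv_nsec / (1000000000UL / maxSubseconds);  /* fixed divisor: 0..999 */
  printf("%0*ld (buggy) vs %0*ld (fixed)\n", digits, wrong, digits, right);
  return 0;
}
```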
diff --git a/src/device/common.h b/src/device/common.h
index 2dca70d..855db73 100644
--- a/src/device/common.h
+++ b/src/device/common.h
@@ -54,6 +54,7 @@ struct ncclShmemData {
   int workSize;
   uint32_t workConsumed;
   uint64_t workCounter;
+  bool profilerEnabled;
   struct ncclShmemGroup groups[NCCL_MAX_GROUPS];
   uint64_t redOpArgs[NCCL_MAX_NVLS_ARITY+1];
 
@@ -291,6 +292,48 @@ struct RunWorkBatch {
   }
 };
 
+#define START 0
+#define STOP 1
+#define FINI 2
+
+__device__ __forceinline__ bool profilerEnabled(void) {
+  // Check if any of the workItems in the batch is profiled. If so, there is an equivalent
+  // profiler ProxyOp waiting for the counter update in the host thread. If this check was
+  // done only for the first workItem the profiler counter for other workItems in the batch
+  // could never be updated, leaving the host thread spinning forever for the counter update
+  // and causing a hang.
+  bool enabled = false;
+  for (int i = 0; i < ncclShmem.nWorks && !enabled; i++) {
+    if (ncclShmem.workType == ncclDevWorkTypeP2p)
+      enabled = ((struct ncclDevWorkP2p*)ncclShmem.workStorage)[i].profilerEnabled;
+    else
+      enabled = ((struct ncclDevWorkColl*)ncclShmem.workStorage)[i].profilerEnabled;
+  }
+  return enabled;
+}
+
+__device__ __forceinline__ void profiler(int action) {
+  if (action == START) {
+    if (threadIdx.x == 0) {
+      // increment workCounter regardless of the profiler being active or not
+      ncclShmem.channel.workCounter += ncclShmem.nWorks;
+      if(!profilerEnabled()) return;
+      ncclShmem.comm.workStarted[ncclShmem.channelId] = ncclShmem.channel.workCounter;
+    }
+  } else if (action == STOP) {
+    if (threadIdx.x == 0 && profilerEnabled()) {
+      ncclShmem.comm.workCompleted[ncclShmem.channelId] = ncclShmem.channel.workCounter;
+    }
+  } else { // FINI
+    if (threadIdx.x == 0) {
+      // store the workCounter back to vidmem regardless of the profiler being active or not
+      ((ncclDevCommAndChannels*)ncclShmem.args.comm)->channels[ncclShmem.channelId].workCounter = ncclShmem.channel.workCounter;
+      if (!profilerEnabled()) return;
+      ncclShmem.comm.workCompleted[ncclShmem.channelId] = ncclShmem.channel.workCounter;
+    }
+  }
+}
+
 template<int SpecializedFnId, typename SpecializedRunWorkBatch>
 __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* args) {
   int tid = threadIdx.x;
@@ -312,7 +355,10 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a
   }
   __syncthreads(); // publish ncclShmem.{args, channelId}
   /* set abort flag to 0 */
-  if (tid == 0) ncclShmem.aborted = 0;
+  if (tid == 0) {
+    ncclShmem.aborted = 0;
+    ncclShmem.channel.workCounter = ((ncclDevCommAndChannels*)ncclShmem.args.comm)->channels[ncclShmem.channelId].workCounter;
+  }
 
   // Use first 2 warps to load comm and channel, and remaining load work batch.
   switch (tid/WARP_SIZE) {
@@ -348,7 +394,7 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a
   }
 
   while (ncclShmem.aborted == 0) {
-    if (tid == 0) ncclShmem.comm.workStarted[ncclShmem.channelId] = (ncclShmem.channel.workCounter += ncclShmem.nWorks);
+    profiler(START);
     if (0 <= SpecializedFnId && ncclShmem.funcId == (unsigned)SpecializedFnId) {
       SpecializedRunWorkBatch().run();
     } else {
@@ -358,7 +404,7 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a
     if (ncclShmem.nextBatchIx == -1) break;
     int batchIx = ncclShmem.nextBatchIx;
     __syncthreads();
-    if (tid == 0) ncclShmem.comm.workCompleted[ncclShmem.channelId] = ncclShmem.channel.workCounter;
+    profiler(STOP);
     loadWorkBatchToShmem(tid, tn, args, batchIx);
     __syncthreads();
 
@@ -367,10 +413,7 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a
       ncclShmem.comm.workConsumed[ncclShmem.channelId] = ncclShmem.workConsumed;
     }
   }
-  if (tid == 0) {
-    ncclShmem.comm.workCompleted[ncclShmem.channelId] = ncclShmem.channel.workCounter;
-    ((ncclDevCommAndChannels*)ncclShmem.args.comm)->channels[ncclShmem.channelId].workCounter = ncclShmem.channel.workCounter;
-  }
+  profiler(FINI);
 }
 
 __global__ void ncclDevKernel_Generic(ncclDevKernelArgs4K NCCL_GRID_CONSTANT const args4K);
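Editor's note: the workStarted/workCompleted counters written by profiler(START/STOP) are polled by a host thread. A minimal host-side sketch of that protocol — illustrative names, not the actual NCCL proxy code — shows why every work item in the batch must be checked:

```cpp
// Stand-in for the host-side consumer of the per-channel counters published by profiler(START).
// If the kernel only inspected the first work item's profilerEnabled flag, a profiled item later
// in the batch would never publish its counter and this wait would spin forever -- the hang the
// comment in profilerEnabled() describes.
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>

struct ChannelCounters { std::atomic<uint64_t> workStarted{0}, workCompleted{0}; };

void waitWorkStarted(ChannelCounters& ch, uint64_t target) {
  while (ch.workStarted.load(std::memory_order_acquire) < target) std::this_thread::yield();
}

int main() {
  ChannelCounters ch;
  // Stand-in for the GPU kernel: publish "3 work items started".
  std::thread gpu([&] { ch.workStarted.store(3, std::memory_order_release); });
  waitWorkStarted(ch, 3);   // host-side profiler event can now be started
  gpu.join();
  printf("workStarted=%llu\n", (unsigned long long)ch.workStarted.load());
  return 0;
}
```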
diff --git a/src/enqueue.cc b/src/enqueue.cc
index 5e0b213..4e8a211 100644
--- a/src/enqueue.cc
+++ b/src/enqueue.cc
@@ -288,6 +288,7 @@ ncclResult_t ncclTasksRegAndEnqueue(struct ncclComm* comm) {
     devWork.oneNode = (comm->nNodes == 1);
     devWork.isOneRPN = comm->isOneRPN;
     devWork.netRegUsed = devWork.regUsed = 0;
+    devWork.profilerEnabled = ncclProfilerPluginLoaded() && (task->eActivationMask & ncclProfileKernelCh);
     if (task->regBufType & NCCL_NET_REG_BUFFER)
       devWork.netRegUsed = 1;
     if (task->regBufType & (NCCL_IPC_REG_BUFFER | NCCL_NVLS_REG_BUFFER))
@@ -445,6 +446,7 @@ ncclResult_t ncclPrepareTasks(struct ncclComm* comm, bool* algoNeedConnect, bool
       devWork.redOpArgIsPtr = task->opDev.scalarArgIsPtr;
       devWork.oneNode = (comm->nNodes == 1);
       devWork.netRegUsed = devWork.regUsed = 0;
+      devWork.profilerEnabled = ncclProfilerPluginLoaded() && (task->eActivationMask & ncclProfileKernelCh);
       if (task->regBufType & NCCL_NET_REG_BUFFER)
         devWork.netRegUsed = 1;
       if (task->regBufType & (NCCL_IPC_REG_BUFFER | NCCL_NVLS_REG_BUFFER))
@@ -557,7 +559,7 @@ static ncclResult_t scheduleCollTasksToPlan(
       proxyOp.task.coll = task;
       proxyOp.rank = comm->rank;
       proxyOp.eActivationMask = task->eActivationMask;
-      proxyOp.workCounter = ++comm->profiler.workCounter[c];
+      proxyOp.incWorkCounter = true;
       addWorkBatchToPlan(comm, plan, c, workNode->workType, task->devFuncId, plan->workBytes);
       // Set pattern to profiler to add a proxy profiler for kernel events
       NCCLCHECK(addProxyOpIfNeeded(comm, plan, &proxyOp));
@@ -681,7 +683,7 @@ static ncclResult_t scheduleCollTasksToPlan(
         proxyOp->ringAlgo->incRefCount();
       }
       proxyOp->eActivationMask = task->eActivationMask;
-      proxyOp->workCounter = ++comm->profiler.workCounter[c];
+      proxyOp->incWorkCounter = true;
       addWorkBatchToPlan(comm, plan, c, workNode->workType, task->devFuncId, plan->workBytes);
       // Coverity reports "proxyOp->connection" as being possibly uninitialized. It's hard to
       // determine if that's actually true but it's also not clear if that would be an issue.
@@ -886,6 +888,7 @@ static ncclResult_t addP2pToPlan(
   work->recvRank = recvRank;
   work->recvAddr = recvAddr;
   work->recvBytes = recvBytes==-1 ? 0 : recvBytes;
+  work->profilerEnabled = ncclProfilerPluginLoaded() && ((p2pTasks[0] ? p2pTasks[0] : p2pTasks[1])->eActivationMask & ncclProfileKernelCh);
 
   struct ncclProxyOp proxyOps[2] = {};
   int nProxyOps = selfSend ? 0 : 2;
@@ -910,6 +913,7 @@ nChannelsMax = std::max(nChannels[0], nChannels[1]);
   for (int part=0; part < nChannelsMax; part++) {
+    int incWorkCounter = -1;
     int channelId = ncclP2pChannelForPart(comm->p2pnChannels, base, part);
     plan->channelMask |= uint64_t(1)<<channelId;
+      // Increment the work counter once per fused <send,recv> pair rather than individual p2p
+      if (proxyOps[dir].nsteps && incWorkCounter < 0) {
+        proxyOps[dir].incWorkCounter = true;
+        incWorkCounter = dir;
+      }
+
       if (proxyOps[dir].nsteps != 0) {
         // Calculate the opCount after adding batch since then the batch count will
         // equal one plus the batch index this p2p settled in.
         proxyOps[dir].channelId = channelId;
         proxyOps[dir].opCount = uint64_t(comm->planner.wipPlan.channels[channelId].nWorkBatchesP2p)<<1 | 1;
-        proxyOps[dir].workCounter = comm->profiler.workCounter[channelId]+1;
         NCCLCHECK(addProxyOpIfNeeded(comm, plan, &proxyOps[dir]));
         NCCLCHECK(addProfilerProxyOpIfNeeded(comm, plan, &proxyOps[dir]));
       }
     }
-    comm->profiler.workCounter[channelId] += (proxyOps[0].nsteps || proxyOps[1].nsteps) ? 1 : 0;
   }
 
   return ncclSuccess;
@@ -1592,7 +1600,16 @@ ncclResult_t ncclLaunchFinish(struct ncclComm* comm) {
     CUDACHECK(cudaEventRecord(comm->sharedRes->scratchEvent, launchStream));
     // deviceStream waits on userStream[0]
     NCCLCHECK(ncclStrongStreamAcquiredWorkStream(planner->capturingGraph, &comm->sharedRes->deviceStream, /*concurrent=*/false, &deviceStream));
-    CUDACHECK(cudaStreamWaitEvent(deviceStream, comm->sharedRes->scratchEvent, 0));
+
+    // We know that deviceStream is strictly behind the launchStream because launchStream
+    // synced with it before kernel launch. This allows us to see deviceStream waiting
+    // on launchStream as a fast-forward. When building CUDA graphs, fast-forwards should
+    // be handled specially so as not to create graphs with a blowup in the number of edges.
+    // So we could do this:
+    //   CUDACHECK(cudaStreamWaitEvent(deviceStream, comm->sharedRes->scratchEvent, 0));
+    // But instead we do:
+    NCCLCHECK(ncclStreamAdvanceToEvent(planner->capturingGraph, deviceStream, comm->sharedRes->scratchEvent));
+
     // Each userStream[i] waits on userStream[0]
     for (struct ncclCudaStreamList* l=planner->streams->next; l != nullptr; l = l->next) {
       CUDACHECK(cudaStreamWaitEvent(l->stream, comm->sharedRes->scratchEvent, 0));
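Editor's note: in addP2pToPlan above, a fused send/recv pair counts as one kernel work item, so only one of the two proxy ops may be flagged to increment the counter. A tiny self-contained model of that selection rule (stand-in types, not the NCCL structs):

```cpp
// Illustrative model of the pairing rule: within one fused send/recv pair, only the first
// direction that actually has proxy steps is marked to bump the per-channel work counter,
// so host-side accounting advances once per fused kernel work item, not once per direction.
#include <cstdio>

struct Op { int nsteps; bool incWorkCounter; };

int main() {
  Op proxyOps[2] = {{0, false}, {4, false}};  // e.g. send needs no net steps, recv needs 4
  int marked = -1;
  for (int dir = 0; dir < 2; dir++) {
    if (proxyOps[dir].nsteps && marked < 0) { proxyOps[dir].incWorkCounter = true; marked = dir; }
  }
  printf("direction chosen to increment the counter: %d\n", marked);
  return 0;
}
```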
diff --git a/src/graph/paths.cc b/src/graph/paths.cc
index ace4476..9983712 100644
--- a/src/graph/paths.cc
+++ b/src/graph/paths.cc
@@ -367,7 +367,7 @@ ncclResult_t ncclTopoCheckMNNVL(struct ncclTopoSystem* system, struct ncclPeerIn
   if ((((long *)&fabricInfo2->clusterUuid)[0]|((long *)fabricInfo2->clusterUuid)[1]) == 0) return ncclSuccess;
   if ((memcmp(fabricInfo1->clusterUuid, fabricInfo2->clusterUuid, NVML_GPU_FABRIC_UUID_LEN) == 0) &&
       (fabricInfo1->cliqueId == fabricInfo2->cliqueId)) {
-    INFO(NCCL_NET, "MNNVL matching peer 0x%lx UUID %lx.%lx cliqueId 0x%x",
+    TRACE(NCCL_NET, "MNNVL matching peer 0x%lx UUID %lx.%lx cliqueId 0x%x",
          info2->busId, ((long *)fabricInfo2->clusterUuid)[0], ((long *)fabricInfo2->clusterUuid)[1], fabricInfo2->cliqueId);
     *ret = 1;
   }
@@ -473,7 +473,7 @@ ncclResult_t ncclTopoIsGdrAvail(struct ncclTopoSystem* system, int rank, bool *a
 NCCL_PARAM(NetForceFlush, "NET_FORCE_FLUSH", 0);
 
 // Determine whether we need to flush the GDR recv buffers
-ncclResult_t ncclTopoNeedFlush(struct ncclComm* comm, int netDev, int rank, int* flush) {
+ncclResult_t ncclTopoNeedFlush(struct ncclComm* comm, int64_t netId, int netDev, int rank, int* flush) {
   *flush = 1;
   ncclNetProperties_t props;
   NCCLCHECK(comm->ncclNet->getProperties(netDev, &props));
@@ -488,7 +488,7 @@ ncclResult_t ncclTopoNeedFlush(struct ncclComm* comm, int netDev, int rank, int*
     // flags would go through C2C. In that case, force a flush.
     int c, n;
     NCCLCHECK(ncclGetLocalCpu(system, g, &c));
-    NCCLCHECK(ncclTopoIdToIndex(system, NET, netDev, &n));
+    NCCLCHECK(ncclTopoIdToIndex(system, NET, netId, &n));
     if (gpu->paths[NET][n].type <= PATH_PXB && gpu->paths[CPU][c].type == PATH_C2C) {
       *flush = 1;
     }
diff --git a/src/include/device.h b/src/include/device.h
index 0763a57..f6ca51b 100644
--- a/src/include/device.h
+++ b/src/include/device.h
@@ -221,6 +221,7 @@ struct alignas(16) ncclDevWorkP2p {
   uint8_t sendProtoLL:1, recvProtoLL:1;
   uint8_t sendNetReg:1, recvNetReg:1;
   uint8_t sendIpcReg:1, recvIpcReg:1;
+  uint8_t profilerEnabled:1;
 };
 
 // Compute the subset of the data transfer corresponding to the given part index.
@@ -259,6 +260,7 @@ struct alignas(16) ncclDevWorkColl {
   uint32_t channelLo:8, channelHi:8;
   uint32_t nWarps:8;
   uint32_t redOpArgIsPtr:1, regUsed:1, netRegUsed:1, oneNode:1, direct:2, isOneRPN:1;
+  uint32_t profilerEnabled:1;
   uint32_t root;
   void* recvbuff;
   void* sendbuff;
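Editor's note: both new flags ride in existing bitfield words, so the device-visible work structs should keep their size. A quick standalone check of that idea with a trimmed stand-in (not the real ncclDevWorkColl layout):

```cpp
// Trimmed stand-in, illustrative only: the new 1-bit flag packs into the same uint32_t as the
// existing bits, so sizeof() -- and therefore the host/device layout of the struct -- is unchanged.
#include <cstdint>
#include <cstdio>

struct alignas(16) WithoutFlag {
  uint32_t channelLo:8, channelHi:8, nWarps:8;
  uint32_t redOpArgIsPtr:1, regUsed:1, netRegUsed:1, oneNode:1, direct:2, isOneRPN:1;
  uint32_t root;
};
struct alignas(16) WithFlag {
  uint32_t channelLo:8, channelHi:8, nWarps:8;
  uint32_t redOpArgIsPtr:1, regUsed:1, netRegUsed:1, oneNode:1, direct:2, isOneRPN:1;
  uint32_t profilerEnabled:1;   // new bit, still within the same 32-bit word
  uint32_t root;
};

int main() {
  printf("%zu %zu\n", sizeof(WithoutFlag), sizeof(WithFlag));  // expected: identical
  return 0;
}
```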
diff --git a/src/include/graph.h b/src/include/graph.h
index b779773..a06556e 100644
--- a/src/include/graph.h
+++ b/src/include/graph.h
@@ -43,7 +43,7 @@ enum ncclTopoGdrMode {
   ncclTopoGdrModeNum = 3
 };
 ncclResult_t ncclTopoCheckGdr(struct ncclTopoSystem* topo, int rank, int64_t netId, int read, enum ncclTopoGdrMode* gdrMode);
-ncclResult_t ncclTopoNeedFlush(struct ncclComm* comm, int netDev, int rank, int* flush);
+ncclResult_t ncclTopoNeedFlush(struct ncclComm* comm, int64_t netId, int netDev, int rank, int* flush);
 ncclResult_t ncclTopoIsGdrAvail(struct ncclTopoSystem* system, int rank, bool *avail);
 ncclResult_t ncclTopoCheckNet(struct ncclTopoSystem* system, int rank1, int rank2, int* net);
 int ncclPxnDisable(struct ncclComm* comm);
diff --git a/src/include/profiler.h b/src/include/profiler.h
index 8d41079..bae0501 100644
--- a/src/include/profiler.h
+++ b/src/include/profiler.h
@@ -68,6 +68,7 @@ ncclResult_t ncclProfilerRecordProxyCtrlEventState(void*eHandle, int appended, n
 // Profiler utility functions
 ncclResult_t ncclProfilerAddPidToProxyOp(struct ncclProxyOp* op);
 bool ncclProfilerNeedsProxy(struct ncclComm* comm, struct ncclProxyOp* op);
+bool ncclProfilerPluginLoaded(void);
 
 // Profiler callback for network plugin
 ncclResult_t ncclProfilerCallback(void** eHandle, int type, void* pHandle, int64_t pluginId, void* extData);
diff --git a/src/include/proxy.h b/src/include/proxy.h
index 225acb2..f90c802 100644
--- a/src/include/proxy.h
+++ b/src/include/proxy.h
@@ -88,6 +88,12 @@ struct ncclProxyOp {
     struct ncclTaskP2p* p2p;
   } task;
 
+  // Profiler work counter increment flag. Set to 'true' if the profiler work counter for this channel needs increment.
+  // Always 'true' for collective operations. Grouped p2p operations are fused into one pair in the GPU kernel,
+  // meaning the GPU profiler code increments the work counter for the pair rather than the individual p2p. For this
+  // reason, the incWorkCounter flag is used to avoid incrementing the work counter twice in the host code. This is done
+  // by setting incWorkCounter to 'true' only for one of the p2ps in the pair during enqueue.
+  bool incWorkCounter;
   int eActivationMask;
   void* taskEventHandle;
   int rank;
diff --git a/src/include/strongstream.h b/src/include/strongstream.h
index c56d5ac..393a1f0 100644
--- a/src/include/strongstream.h
+++ b/src/include/strongstream.h
@@ -102,6 +102,9 @@ ncclResult_t ncclStreamWaitStream(
   cudaStream_t a, cudaStream_t b, cudaEvent_t scratchEvent
 );
 
+// Like cudaStreamWaitEvent except `e` must be strictly ahead of everything in `s`.
+ncclResult_t ncclStreamAdvanceToEvent(struct ncclCudaGraph g, cudaStream_t s, cudaEvent_t e);
+
 // Synchronization does not need the strong stream to be acquired.
 ncclResult_t ncclStrongStreamSynchronize(struct ncclStrongStream* ss);
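Editor's note: a small illustration of the accounting described in the proxy.h comment above (stand-in types, not NCCL code) — a collective always bumps the per-channel counter, while both halves of a fused p2p pair end up stamped with the same value:

```cpp
// Stand-in model: one collective followed by a fused send/recv pair on the same channel.
#include <cstdint>
#include <cstdio>

struct OpLike { bool incWorkCounter; uint64_t workCounter; };

static void stamp(OpLike& op, uint64_t& channelCounter) {
  // Mirrors the rule above: only ops flagged incWorkCounter advance the channel counter.
  op.workCounter = op.incWorkCounter ? ++channelCounter : channelCounter;
}

int main() {
  uint64_t channelCounter = 0;
  OpLike coll{true, 0}, send{true, 0}, recv{false, 0};  // only one p2p of the pair is flagged
  stamp(coll, channelCounter);   // counter -> 1
  stamp(send, channelCounter);   // counter -> 2
  stamp(recv, channelCounter);   // stays 2: same kernel work item as the send
  printf("coll=%llu send=%llu recv=%llu\n", (unsigned long long)coll.workCounter,
         (unsigned long long)send.workCounter, (unsigned long long)recv.workCounter);
  return 0;
}
```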
diff --git a/src/misc/cudawrap.cc b/src/misc/cudawrap.cc
index e5fec1e..64a84f5 100644
--- a/src/misc/cudawrap.cc
+++ b/src/misc/cudawrap.cc
@@ -4,6 +4,7 @@
  * See LICENSE.txt for license information
  ************************************************************************/
 
+#include "alloc.h"
 #include "nccl.h"
 #include "debug.h"
 #include "param.h"
@@ -67,6 +68,36 @@ int ncclCuMemHostEnable() {
       ncclCumemHostEnable = paramValue;
     else
       ncclCumemHostEnable = (cudaDriverVersion >= 12060) ? 1 : 0;
+    if (ncclCumemHostEnable) {
+      // Verify that host allocations actually work. Docker in particular is known to disable "get_mempolicy",
+      // causing such allocations to fail (this can be fixed by invoking Docker with "--cap-add SYS_NICE").
+      int cudaDev;
+      CUdevice currentDev;
+      int cpuNumaNodeId = -1;
+      CUmemAllocationProp prop = {};
+      size_t granularity = 0;
+      size_t size;
+      CUmemGenericAllocationHandle handle;
+      CUDACHECK(cudaGetDevice(&cudaDev));
+      CUCHECK(cuDeviceGet(&currentDev, cudaDev));
+      CUCHECK(cuDeviceGetAttribute(&cpuNumaNodeId, CU_DEVICE_ATTRIBUTE_HOST_NUMA_ID, currentDev));
+      if (cpuNumaNodeId < 0) cpuNumaNodeId = 0;
+      prop.location.type = CU_MEM_LOCATION_TYPE_HOST_NUMA;
+      prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
+      prop.requestedHandleTypes = ncclCuMemHandleType;
+      prop.location.id = cpuNumaNodeId;
+      CUCHECK(cuMemGetAllocationGranularity(&granularity, &prop, CU_MEM_ALLOC_GRANULARITY_MINIMUM));
+      size = 1;
+      ALIGN_SIZE(size, granularity);
+      if (CUPFN(cuMemCreate(&handle, size, &prop, 0)) != CUDA_SUCCESS) {
+        INFO(NCCL_INIT, "cuMem host allocations do not appear to be working; falling back to a /dev/shm/ based "
+                        "implementation. This could be due to the container runtime disabling NUMA support. "
+                        "To disable this warning, set NCCL_CUMEM_HOST_ENABLE=0");
+        ncclCumemHostEnable = 0;
+      } else {
+        CUCHECK(cuMemRelease(handle));
+      }
+    }
   }
   return ncclCumemHostEnable;
error:
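Editor's note: the comment above attributes the failure to the container blocking the get_mempolicy syscall. A hedged, Linux-only way to check that directly (assumes libnuma's numaif.h; illustrative, not part of NCCL):

```cpp
// Standalone probe: under Docker's default seccomp profile get_mempolicy is typically blocked
// (EPERM), which is the condition the cuMemCreate fallback above is guarding against.
// Build with: g++ probe.cc -lnuma
#include <errno.h>
#include <numaif.h>   // get_mempolicy
#include <stdio.h>
#include <string.h>

int main(void) {
  int mode = -1;
  if (get_mempolicy(&mode, NULL, 0, NULL, 0) != 0)
    printf("get_mempolicy failed: %s (cuMem host-NUMA allocations will likely fail too)\n", strerror(errno));
  else
    printf("get_mempolicy OK, default policy mode=%d\n", mode);
  return 0;
}
```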
diff --git a/src/misc/strongstream.cc b/src/misc/strongstream.cc
index e6cce98..7d957d4 100644
--- a/src/misc/strongstream.cc
+++ b/src/misc/strongstream.cc
@@ -328,6 +328,38 @@ ncclResult_t ncclStreamWaitStream(cudaStream_t a, cudaStream_t b, cudaEvent_t sc
   return ncclSuccess;
 }
 
+ncclResult_t ncclStreamAdvanceToEvent(struct ncclCudaGraph g, cudaStream_t s, cudaEvent_t e) {
+  if (g.graphId == ULLONG_MAX) {
+    CUDACHECK(cudaStreamWaitEvent(s, e, 0));
+  } else {
+    cudaStream_t tmp;
+    CUDACHECK(cudaStreamCreateWithFlags(&tmp, cudaStreamNonBlocking));
+    CUDACHECK(cudaStreamWaitEvent(tmp, e, 0));
+
+    cudaStreamCaptureStatus status;
+    cudaGraphNode_t const* nodes;
+    size_t count = 0;
+    cudaError_t res = cudaStreamGetCaptureInfo_v2(tmp, &status, nullptr, nullptr, &nodes, &count);
+
+    #if CUDART_VERSION >= 12030
+    if (res == cudaErrorLossyQuery) { // CUDA is telling us the dependencies have edge annotations.
+      cudaGraphEdgeData const* edges;
+      CUDACHECK(cudaStreamGetCaptureInfo_v3(tmp, &status, nullptr, nullptr, &nodes, &edges, &count));
+      CUDACHECK(cudaStreamUpdateCaptureDependencies_v2(s, (cudaGraphNode_t*)nodes, edges, count, cudaStreamSetCaptureDependencies));
+    }
+    #else
+    if (false) {}
+    #endif
+    else {
+      CUDACHECK(res /* = cudaStreamGetCaptureInfo_v2(...)*/);
+      CUDACHECK(cudaStreamUpdateCaptureDependencies(s, (cudaGraphNode_t*)nodes, count, cudaStreamSetCaptureDependencies));
+    }
+
+    CUDACHECK(cudaStreamDestroy(tmp));
+  }
+  return ncclSuccess;
+}
+
 ncclResult_t ncclStrongStreamSynchronize(struct ncclStrongStream* ss) {
 #if CUDART_VERSION >= 11030
   CUDACHECK(cudaStreamWaitEvent(ss->liveStream, ss->serialEvent, 0));
diff --git a/src/plugin/profiler.cc b/src/plugin/profiler.cc
index 023a704..18b9b5c 100644
--- a/src/plugin/profiler.cc
+++ b/src/plugin/profiler.cc
@@ -536,11 +536,15 @@ exit:
 }
 
 bool ncclProfilerNeedsProxy(struct ncclComm* comm, struct ncclProxyOp* op) {
-  bool enabled = (__builtin_expect(ncclProfiler != NULL, 0) && (op->eActivationMask & ncclProfileKernelCh));
+  bool enabled = ncclProfilerPluginLoaded() && (op->eActivationMask & ncclProfileKernelCh);
   if (enabled && !comm->profiler.initialized) (void)proxyProfilerConnect(comm, op);
   return enabled;
 }
 
+bool ncclProfilerPluginLoaded(void) {
+  return (__builtin_expect(ncclProfiler != NULL, 0));
+}
+
 ncclResult_t ncclProfilerCallback(void** eHandle, int type, void* pHandle, int64_t pluginId, void* extData) {
   if (__builtin_expect(ncclProfiler != NULL, 0)) {
     struct ncclProxySubArgs* sub = (struct ncclProxySubArgs*)pHandle;
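Editor's note: a standalone toy (assumptions: CUDA 11.3+, runtime API, linked against cudart; not NCCL code) of the two primitives ncclStreamAdvanceToEvent builds on — querying a captured stream's pending dependency nodes and overwriting that set rather than adding another wait edge:

```cpp
// Query and replace a captured stream's dependency set. Re-setting the same set is a no-op here;
// the point is to show the calls used above to "fast-forward" a stream without growing the
// captured graph's edge count.
#include <cuda_runtime.h>
#include <cstdio>
#define CHECK(x) do { cudaError_t err_ = (x); \
  if (err_ != cudaSuccess) { printf("CUDA error: %s\n", cudaGetErrorString(err_)); return 1; } } while (0)

int main() {
  void* buf = nullptr;
  cudaStream_t s;
  CHECK(cudaMalloc(&buf, 1 << 20));
  CHECK(cudaStreamCreate(&s));

  CHECK(cudaStreamBeginCapture(s, cudaStreamCaptureModeGlobal));
  CHECK(cudaMemsetAsync(buf, 0, 1 << 20, s));           // one captured node; becomes s's dependency

  cudaStreamCaptureStatus status;
  const cudaGraphNode_t* deps; size_t nDeps = 0;
  CHECK(cudaStreamGetCaptureInfo_v2(s, &status, nullptr, nullptr, &deps, &nDeps));
  printf("capturing=%d pending deps=%zu\n", status == cudaStreamCaptureStatusActive, nDeps);

  CHECK(cudaStreamUpdateCaptureDependencies(s, (cudaGraphNode_t*)deps, nDeps, cudaStreamSetCaptureDependencies));

  cudaGraph_t graph;
  CHECK(cudaStreamEndCapture(s, &graph));
  CHECK(cudaGraphDestroy(graph));
  CHECK(cudaStreamDestroy(s));
  CHECK(cudaFree(buf));
  return 0;
}
```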
diff --git a/src/proxy.cc b/src/proxy.cc
index 7e8021e..c27d234 100644
--- a/src/proxy.cc
+++ b/src/proxy.cc
@@ -9,7 +9,6 @@
 #include "collectives.h"
 #include "socket.h"
 #include "shmutils.h"
-#include "profiler.h"
 #define ENABLE_TIMER 0
 #include "timer.h"
 #include "profiler.h"
@@ -533,15 +532,21 @@ static ncclResult_t ncclLocalOpAppend(struct ncclComm* comm, struct ncclProxyCon
   return ncclSuccess;
 }
 
+static void incWorkCounter(struct ncclComm* comm, struct ncclProxyOp* op) {
+  op->workCounter = (op->incWorkCounter) ? ++comm->profiler.workCounter[op->channelId] : comm->profiler.workCounter[op->channelId];
+}
+
 static ncclResult_t SaveProxyProfiler(struct ncclComm* comm, struct ncclProxyOp* op, bool* justInquire) {
   struct ncclProxyConnector* proxyConn = (op->coll == ncclFuncRecv) ? &comm->profiler.recvProxyConn[op->channelId] : &comm->profiler.sendProxyConn[op->channelId];
-  if (justInquire) *justInquire = true;
-  else {
+  if (justInquire) {
+    *justInquire = true;
+    if (!comm->planner.persistent) incWorkCounter(comm, op);
+  } else {
     op->sendbuff = (uint8_t *)comm->profiler.workStarted;
     op->recvbuff = (uint8_t *)comm->profiler.workCompleted;
-    NCCLCHECK(ncclLocalOpAppend(comm, proxyConn, op));
     // Ensure that in graph capturing the proxy workCounter is incremented to keep up with kernel workCounter
-    op->workCounter += comm->profiler.workCounter[op->channelId];
+    if (comm->planner.persistent) incWorkCounter(comm, op);
+    NCCLCHECK(ncclLocalOpAppend(comm, proxyConn, op));
   }
   return ncclSuccess;
 }
@@ -696,9 +701,8 @@ ncclResult_t ncclProxySaveOp(struct ncclComm* comm, struct ncclProxyOp* op, bool
       NCCLCHECK(SaveProxy(comm, channel, op->pattern == ncclPatternSend ? proxySend : proxyRecv, op->root, op, 1, justInquire));
     } break;
   case ncclPatternProfiler: {
-      if (ncclProfilerNeedsProxy(comm, op)) {
-        NCCLCHECK(SaveProxyProfiler(comm, op, justInquire));
-      }
+      if (ncclProfilerNeedsProxy(comm, op)) NCCLCHECK(SaveProxyProfiler(comm, op, justInquire));
+      else incWorkCounter(comm, op);
     } break;
   }
   return ncclSuccess;
diff --git a/src/transport/coll_net.cc b/src/transport/coll_net.cc
index c1ccfca..84e1f84 100644
--- a/src/transport/coll_net.cc
+++ b/src/transport/coll_net.cc
@@ -192,7 +192,7 @@ static ncclResult_t recvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph
   NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->rank, netId, 0, &req.useGdr));
   recv->conn.flags |= req.useGdr ? NCCL_DIRECT_NIC : 0;
   // Determine whether we need to flush the GDR buffer on recv or not
-  if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm, req.netDev, myInfo->rank, &req.needFlush));
+  if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm, netId, req.netDev, myInfo->rank, &req.needFlush));
 
   recv->proxyConn.tpLocalRank = comm->topParentLocalRanks[comm->localRank];
   NCCLCHECK(ncclProxyConnect(comm, TRANSPORT_COLLNET, 0, myInfo->rank, &recv->proxyConn));
diff --git a/src/transport/net.cc b/src/transport/net.cc
index 40d334f..61b15ce 100644
--- a/src/transport/net.cc
+++ b/src/transport/net.cc
@@ -250,7 +250,7 @@ static ncclResult_t recvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph
   if (!req.useGdr && connIndex == 0) comm->useGdr = 0;
 
   // Determine whether we need to flush the GDR buffer on recv or not
-  if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm, req.netDev, myInfo->rank, &req.needFlush));
+  if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm, netId, req.netDev, myInfo->rank, &req.needFlush));
 
   // We don't support PXN on receive yet
   NCCLCHECK(ncclProxyConnect(comm, TRANSPORT_NET, 0, myInfo->rank, &recv->proxyConn));
diff --git a/src/transport/net_ib.cc b/src/transport/net_ib.cc
index bfff6e5..c049531 100644
--- a/src/transport/net_ib.cc
+++ b/src/transport/net_ib.cc
@@ -1641,17 +1641,18 @@ ib_recv:
       // However, this has been confirmed to be intentional.
       // coverity[copy_paste_error]
       NCCLCHECKGOTO(wrap_ibv_set_ece(qp->qp, &remMeta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported), ret, fail);
-
-      // Query the reduced ece for this QP (matching enhancements between the requestor and the responder)
-      // Store this in our own qpInfo for returning to the requestor
-      if (meta.qpInfo[q].ece_supported)
-        NCCLCHECKGOTO(wrap_ibv_query_ece(qp->qp, &meta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported), ret, fail);
     } else {
       meta.qpInfo[q].ece_supported = 0;
     }
     NCCLCHECKGOTO(ncclIbRtrQp(qp->qp, &rCommDev->base.gidInfo, remMeta.qpInfo[q].qpn, remDevInfo, true, remMeta.tc, remMeta.sl), ret, fail);
     NCCLCHECKGOTO(ncclIbRtsQp(qp->qp), ret, fail);
+
+    // Query the reduced ece for this QP (matching enhancements between the requestor and the responder)
+    // Store this in our own qpInfo for returning to the requestor
+    if (remMeta.qpInfo[q].ece_supported && meta.qpInfo[q].ece_supported) {
+      NCCLCHECKGOTO(wrap_ibv_query_ece(qp->qp, &meta.qpInfo[q].ece, &meta.qpInfo[q].ece_supported), ret, fail);
+    }
   }
 
   rComm->flushEnabled = ((ncclIbGdrSupport() == ncclSuccess || ncclIbDmaBufSupport(lComm->dev) == ncclSuccess)