diff --git a/makefiles/common.mk b/makefiles/common.mk
index 1a1c2b6..0c0d04a 100644
--- a/makefiles/common.mk
+++ b/makefiles/common.mk
@@ -31,13 +31,17 @@ CUDA8_GENCODE = -gencode=arch=compute_35,code=sm_35 \
                 -gencode=arch=compute_61,code=sm_61
 CUDA9_GENCODE = -gencode=arch=compute_70,code=sm_70
 CUDA11_GENCODE = -gencode=arch=compute_80,code=sm_80
+CUDA11_8_GENCODE = -gencode=arch=compute_90,code=sm_90

 CUDA8_PTX = -gencode=arch=compute_61,code=compute_61
 CUDA9_PTX = -gencode=arch=compute_70,code=compute_70
 CUDA11_PTX = -gencode=arch=compute_80,code=compute_80
+CUDA11_8_PTX = -gencode=arch=compute_90,code=compute_90

-# Include Ampere support if we're using CUDA11 or above
-ifeq ($(shell test "0$(CUDA_MAJOR)" -ge 11; echo $$?),0)
+ifeq ($(shell test "0$(CUDA_MAJOR)" -eq 11 -a "0$(CUDA_MINOR)" -ge 8 -o "0$(CUDA_MAJOR)" -gt 11; echo $$?),0)
+# Include Hopper support if we're using CUDA11.8 or above
+  NVCC_GENCODE ?= $(CUDA8_GENCODE) $(CUDA9_GENCODE) $(CUDA11_GENCODE) $(CUDA11_8_GENCODE) $(CUDA11_8_PTX)
+else ifeq ($(shell test "0$(CUDA_MAJOR)" -ge 11; echo $$?),0)
   NVCC_GENCODE ?= $(CUDA8_GENCODE) $(CUDA9_GENCODE) $(CUDA11_GENCODE) $(CUDA11_PTX)
 # Include Volta support if we're using CUDA9 or above
 else ifeq ($(shell test "0$(CUDA_MAJOR)" -ge 9; echo $$?),0)
@@ -45,7 +49,7 @@ else ifeq ($(shell test "0$(CUDA_MAJOR)" -ge 9; echo $$?),0)
 else
   NVCC_GENCODE ?= $(CUDA8_GENCODE) $(CUDA8_PTX)
 endif
-#$(info NVCC_GENCODE is ${NVCC_GENCODE})
+$(info NVCC_GENCODE is ${NVCC_GENCODE})

 CXXFLAGS := -DCUDA_MAJOR=$(CUDA_MAJOR) -DCUDA_MINOR=$(CUDA_MINOR) -fPIC -fvisibility=hidden \
             -Wall -Wno-unused-function -Wno-sign-compare -std=c++11 -Wvla \
diff --git a/makefiles/version.mk b/makefiles/version.mk
index 55fa6cc..977d763 100644
--- a/makefiles/version.mk
+++ b/makefiles/version.mk
@@ -1,6 +1,6 @@
 ##### version
 NCCL_MAJOR := 2
-NCCL_MINOR := 14
-NCCL_PATCH := 3
+NCCL_MINOR := 15
+NCCL_PATCH := 1
 NCCL_SUFFIX :=
 PKG_REVISION := 1
diff --git a/src/channel.cc b/src/channel.cc
index 9587008..c1254f1 100644
--- a/src/channel.cc
+++ b/src/channel.cc
@@ -27,7 +27,7 @@ ncclResult_t initChannel(struct ncclComm* comm, int channelId) {

   NCCLCHECK(ncclCudaCallocAsync(&channel->devRingUserRanks, nRanks, comm->deviceStream.stream));
   ncclCommPushCudaFree(comm, channel->devRingUserRanks);
-  NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNull(), &comm->deviceStream));
+  NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &comm->deviceStream));

   for (int r=0; r < nRanks+1; ++r) {
     for (int b=0; b < NCCL_MAX_CONNS; b++) {
diff --git a/src/enqueue.cc b/src/enqueue.cc
index 7c6d835..0db55bf 100644
--- a/src/enqueue.cc
+++ b/src/enqueue.cc
@@ -940,14 +940,33 @@ ncclResult_t ncclLaunchPrepare(struct ncclComm* comm) {
     struct ncclKernelPlan* planHead = ncclIntruQueueHead(&comm->planQueue);
     comm->unlaunchedPlansHead = planHead;

+    // Semantically we want these dependencies for the kernels launched:
+    //   1. Launch host task on hostStream.
+    //   2. Launch kernel, depends on all of {deviceStream, hostStream, userStream[i]...}
+    //   3. {deviceStream, userStream[i]...} depend on kernel.
+    // We achieve this by:
+    //   1. userStream[0] waits on deviceStream
+    //   2. deviceStream waits on each of userStream[1...]
+    //   3. host task launch on hostStream
+    //   4. userStream[0] waits on hostStream
+    //   5. kernel launch on userStream[0]
+    //   6. deviceStream waits on userStream[0]
+    //   7. userStream[1...] each waits on deviceStream
+    // The two-level fan-in fan-out is because ncclStrongStreamWaitStream() requires
+    // at least one of the two streams to be strong-stream.
+    cudaStream_t launchStream = tasks->streams->stream;
     NCCLCHECKGOTO(ncclStrongStreamAcquire(tasks->capturingGraph, &comm->deviceStream), result, failure);

-    // Create dependency for nccl device work on user streams.
-    for (struct ncclCudaStreamList* l=tasks->streams; l != nullptr; l = l->next) {
+    // Create dependency for device stream on user streams. First from extra user
+    // streams to deviceStream. Then deviceStream to first user stream.
+    for (struct ncclCudaStreamList* l=tasks->streams->next; l != nullptr; l = l->next) {
       NCCLCHECKGOTO(ncclStrongStreamWaitStream(tasks->capturingGraph, &comm->deviceStream, l->stream), result, failure);
     }
+    NCCLCHECKGOTO(ncclStrongStreamWaitStream(tasks->capturingGraph, launchStream, &comm->deviceStream), result, failure);

     if (persistent || comm->persistentRefs != 0) {
+      // We have to launch host tasks to push proxy args. We are careful to only
+      // do this if necessary since host tasks impose a high performance cost in CUDA.
       bool acquired = false;
       for (struct ncclKernelPlan* plan=planHead; plan != nullptr; plan = plan->next) {
         if (plan->hasProxyOps) {
@@ -959,6 +978,8 @@ ncclResult_t ncclLaunchPrepare(struct ncclComm* comm) {
         }
       }
       if (acquired) {
+        // Make to-be-launched kernels dependent on just-launched host stream tasks.
+        NCCLCHECKGOTO(ncclStrongStreamWaitStream(tasks->capturingGraph, launchStream, &comm->hostStream), result, failure);
         NCCLCHECKGOTO(ncclStrongStreamRelease(tasks->capturingGraph, &comm->hostStream), result, failure);
       }
     }
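The fan-in/fan-out described in the comment above can be reduced to plain CUDA event waits. The sketch below is illustrative only — it omits the graph-capture handling that `ncclStrongStream` adds, and the helper names are invented for the example.

```cpp
// Illustrative sketch of the two-level fan-in used before the kernel launch.
// Not NCCL code; names are local to this example.
#include <cuda_runtime.h>

// Make `consumer` wait for everything currently enqueued on `producer`.
static void waitOn(cudaStream_t consumer, cudaStream_t producer, cudaEvent_t scratch) {
  cudaEventRecord(scratch, producer);
  cudaStreamWaitEvent(consumer, scratch, 0);
}

// Fan-in: deviceStream gathers userStream[1..n-1], then userStream[0] (the
// launch stream) waits on deviceStream. The fan-out after the kernel is the
// mirror image, so user streams never have to wait on each other directly.
void fanInBeforeLaunch(cudaStream_t deviceStream, cudaStream_t* userStreams, int n, cudaEvent_t scratch) {
  for (int i = 1; i < n; i++) waitOn(deviceStream, userStreams[i], scratch);
  waitOn(userStreams[0], deviceStream, scratch);
}
```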
@@ -984,14 +1005,67 @@ ncclResult_t ncclLaunchKernelBefore_NoUncapturedCuda(struct ncclComm* comm, stru
   return ncclSuccess;
 }

+#if CUDART_VERSION >= 11080
+#define NCCL_MAX_CGA_CLUSTER_SIZE 8
+NCCL_PARAM(CGAClusterSize, "CGA_CLUSTER_SIZE", 0);
+#endif
+
 ncclResult_t ncclLaunchKernel(struct ncclComm* comm, struct ncclKernelPlan* plan) {
   struct ncclTasks* tasks = &comm->tasks;
+  void *fn = plan->kernelFn;
+  cudaStream_t launchStream = tasks->streams->stream;
   dim3 grid = {(unsigned)plan->channelCount, 1, 1};
   dim3 block = {(unsigned)plan->threadPerBlock, 1, 1};
   void *args[3] = {&comm->devComm, &plan->channelMask, &plan->workHead};
-  NCCLCHECK(ncclStrongStreamLaunchKernel(
-    tasks->capturingGraph, &comm->deviceStream, plan->kernelFn, grid, block, args, 0
-  ));
+
+  #if CUDART_VERSION >= 11080
+  int driverVersion;
+  NCCLCHECK(ncclCudaDriverVersion(&driverVersion));
+
+  unsigned int clusterSize = 0;
+  clusterSize = ncclParamCGAClusterSize();
+  if (clusterSize > NCCL_MAX_CGA_CLUSTER_SIZE) {
+    static bool warned = false;
+    if (warned == false) {
+      WARN("NCCL_CGA_CLUSTER_SIZE value %d is too big. Limiting value to %d.",
+           clusterSize, NCCL_MAX_CGA_CLUSTER_SIZE);
+      warned = true;
+    }
+    clusterSize = NCCL_MAX_CGA_CLUSTER_SIZE;
+  }
+
+  if (clusterSize && driverVersion >= 11080) {
+    cudaLaunchConfig_t launchConfig = {0};
+    cudaLaunchAttribute launchAttrs[2];
+    /* Cooperative Group Array (CGA)
+     * On sm90 and later we have an extra level of hierarchy where we
+     * can group together several blocks within the Grid, called
+     * Thread Block Clusters.
+     * Clusters enable multiple thread blocks running concurrently
+     * across multiple SMs to synchronize and collaboratively fetch
+     * and exchange data. A cluster of blocks is guaranteed to be
+     * concurrently scheduled onto a group of SMs.
+     * The maximum value is 8 and it must be divisible into the grid dimensions
+     */
+    // Grid dimension must be divisible by clusterSize
+    if (grid.x % clusterSize) clusterSize = 1;
+    launchAttrs[0].id = cudaLaunchAttributeClusterDimension;
+    launchAttrs[0].val.clusterDim = {clusterSize, 1, 1};
+    launchAttrs[1].id = cudaLaunchAttributeClusterSchedulingPolicyPreference;
+    launchAttrs[1].val.clusterSchedulingPolicyPreference = cudaClusterSchedulingPolicySpread;
+
+    launchConfig.gridDim = grid;
+    launchConfig.blockDim = block;
+    launchConfig.attrs = launchAttrs;
+    launchConfig.numAttrs = sizeof(launchAttrs)/sizeof(launchAttrs[0]);
+    launchConfig.stream = launchStream;
+
+    CUDACHECK(cudaLaunchKernelExC(&launchConfig, fn, args));
+    return ncclSuccess;
+  }
+  #endif
+  // Standard kernel launch
+  CUDACHECK(cudaLaunchKernel(fn, grid, block, args, 0, launchStream));
   return ncclSuccess;
 }
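For reference, here is a minimal stand-alone sketch of the thread-block-cluster launch path used above (CUDA 11.8+, effective on sm_90). The helper name and arguments are placeholders, not NCCL code.

```cpp
// Minimal cluster launch with cudaLaunchKernelExC. `fn` must point to a
// __global__ function compiled into the binary; names are illustrative.
#include <cuda_runtime.h>

cudaError_t launchWithCluster(const void* fn, void** args, dim3 grid, dim3 block,
                              unsigned clusterSize, cudaStream_t stream) {
  cudaLaunchConfig_t config = {};
  config.gridDim = grid;
  config.blockDim = block;
  config.stream = stream;

  cudaLaunchAttribute attr;
  attr.id = cudaLaunchAttributeClusterDimension;
  attr.val.clusterDim = {clusterSize, 1, 1};   // clusterSize must divide grid.x
  config.attrs = &attr;
  config.numAttrs = 1;

  return cudaLaunchKernelExC(&config, fn, args);
}
```

Without the cluster attribute, `cudaLaunchKernelExC` behaves like an ordinary `cudaLaunchKernel`, which is why the code above only sets it when a non-zero cluster size is requested and the driver is new enough.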
@@ -1017,17 +1091,21 @@ ncclResult_t ncclLaunchFinish(struct ncclComm* comm) {
     // Reset queue to empty without destroying plans since those will be sent
     // back to us for reclaiming via callbackQueue.
     ncclIntruQueueConstruct(&comm->planQueue);
-    // Close strong stream "transaction" encompassing cuda launches
-    NCCLCHECKGOTO(ncclStrongStreamRelease(tasks->capturingGraph, &comm->deviceStream), result, resume1);
+    cudaStream_t launchStream = tasks->streams->stream; // First user stream gets launch
+    // Create dependency for deviceStream on launchStream.
+    NCCLCHECKGOTO(ncclStrongStreamWaitStream(tasks->capturingGraph, &comm->deviceStream, launchStream), result, resume1);
   resume1:
-    // Create dependency for user streams on nccl device work.
-    struct ncclCudaStreamList* sl = tasks->streams;
-    tasks->streams = nullptr; // reset streams to empty
+    // Create dependency for other user streams (skip launch stream).
+    struct ncclCudaStreamList* sl = tasks->streams->next;
+    tasks->streams = nullptr; // Reset comm->tasks.streams to empty.
     while (sl != nullptr) {
       NCCLCHECKGOTO(ncclStrongStreamWaitStream(tasks->capturingGraph, sl->stream, &comm->deviceStream), result, resume2);
     resume2:
       sl = sl->next;
     }
+    // Release device stream as acquired in ncclLaunchPrepare()
+    NCCLCHECKGOTO(ncclStrongStreamRelease(tasks->capturingGraph, &comm->deviceStream), result, resume3);
+    resume3:;
   }
   return result;
 }
@@ -1364,12 +1442,12 @@ static ncclResult_t taskAppend(struct ncclComm* comm, struct ncclInfo const* inf
       NCCLCHECK(ncclChannelComputeFromBase(comm, channelBaseId, c, &channelId));
       if (isSendNotRecv) {
         if (comm->channels[channelId].peers[peer].send[1].connected == 0) { // P2P uses only 1 connector
-          comm->connectSend[peer] |= (1<<channelId);
+          comm->connectSend[peer] |= (1UL<<channelId);
           ncclGroupCommPreconnect(comm);
         }
       } else {
         if (comm->channels[channelId].peers[peer].recv[1].connected == 0) { // P2P uses only 1 connector
-          comm->connectRecv[peer] |= (1<<channelId);
+          comm->connectRecv[peer] |= (1UL<<channelId);
           ncclGroupCommPreconnect(comm);
         }
       }
@@ ... @@
       if (l->stream == info->stream) break; // Already seen stream.
+      l = l->next;
     }
   }
   return ncclSuccess;
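The `connectSend`/`connectRecv` changes above (and the matching `uint64_t` declarations later in `comm.h`) widen the per-peer channel masks from 32 to 64 bits. The snippet below is an illustrative aside, not NCCL code, showing why the `1UL` literal matters once channel indices can reach 32 or more (LP64 Linux assumed, where `unsigned long` is 64-bit).

```cpp
// Why (1UL<<channelId) instead of (1<<channelId): shifting the int literal 1
// by 32 or more is undefined behavior and cannot set the high bits of a
// 64-bit mask. Illustrative only.
#include <cstdint>
#include <cassert>

int main() {
  int channelId = 40;
  uint64_t mask = 0;
  mask |= (1UL << channelId);              // ok: 64-bit shift on LP64
  assert(mask == (uint64_t(1) << 40));
  // mask |= (1 << channelId);             // wrong: int shift by 40 is UB
  return 0;
}
```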
diff --git a/src/graph/paths.cc b/src/graph/paths.cc
index f8918b1..01f1582 100644
--- a/src/graph/paths.cc
+++ b/src/graph/paths.cc
@@ -399,6 +399,19 @@ ncclResult_t ncclTopoCheckGdr(struct ncclTopoSystem* system, int64_t busId, int
   return ncclSuccess;
 }

+// Set to 0 to disable the flush on Hopper when using GDR
+NCCL_PARAM(NetForceFlush, "NET_FORCE_FLUSH", 1);
+
+// Determine whether we need to flush the GDR recv buffers
+ncclResult_t ncclTopoNeedFlush(struct ncclTopoSystem* system, int64_t busId, int* flush) {
+  int g;
+  NCCLCHECK(ncclTopoIdToIndex(system, GPU, busId, &g));
+  struct ncclTopoNode* gpu = system->nodes[GPU].nodes+g;
+  // Flush is required on Ampere and earlier
+  *flush = gpu->gpu.cudaCompCap < 90 ? 1 : ncclParamNetForceFlush();
+  return ncclSuccess;
+}
+
 NCCL_PARAM(NetDisableIntra, "NET_DISABLE_INTRA", 0);

 // Check whether going through the network would be faster than going through P2P/SHM.
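`ncclTopoNeedFlush()` above keys the decision off the GPU's compute capability stored in NCCL's topology graph. Outside of NCCL the same check can be made directly with the CUDA runtime; the helper below is a hypothetical illustration, not part of NCCL's API.

```cpp
// Hypothetical stand-alone version of the flush decision: pre-Hopper (< SM90)
// always flushes; on Hopper the forceFlush knob (cf. NCCL_NET_FORCE_FLUSH,
// default 1) decides. Names are local to this example.
#include <cuda_runtime.h>

int needsGdrFlush(int cudaDev, int forceFlush) {
  int major = 0, minor = 0;
  cudaDeviceGetAttribute(&major, cudaDevAttrComputeCapabilityMajor, cudaDev);
  cudaDeviceGetAttribute(&minor, cudaDevAttrComputeCapabilityMinor, cudaDev);
  int compCap = major * 10 + minor;
  return compCap < 90 ? 1 : forceFlush;
}
```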
diff --git a/src/graph/search.cc b/src/graph/search.cc
index 27a8e43..eb0b7dd 100644
--- a/src/graph/search.cc
+++ b/src/graph/search.cc
@@ -727,7 +727,7 @@ ncclResult_t ncclTopoGetXmlFromGraphs(int ngraphs, struct ncclTopoGraph** graphs
 }

 float speedArrayIntra[] = { 44.0, 30.0, 22.0, 18.0, 15.0, 12.0, 10.0, 9.0, 7.0, 6.0, 5.0, 4.0, 3.0 };
-float speedArrayInter[] = { 48.0, 30.0, 24.0, 22.0, 18.0, 15.0, 12.0, 10.0, 9.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.4, 1.2, 0.24, 0.12 };
+float speedArrayInter[] = { 48.0, 30.0, 28.0, 24.0, 22.0, 18.0, 15.0, 12.0, 10.0, 9.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.4, 1.2, 0.24, 0.12 };
 #define NSPEEDSINTRA (sizeof(speedArrayIntra)/sizeof(float))
 #define NSPEEDSINTER (sizeof(speedArrayInter)/sizeof(float))
diff --git a/src/graph/tuning.cc b/src/graph/tuning.cc
index bc5e969..07a2104 100644
--- a/src/graph/tuning.cc
+++ b/src/graph/tuning.cc
@@ -72,10 +72,24 @@ static float hwLat [3][NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] =
   /* CollNetDirect (Simple)*/ { 0, 0, 10.7 },
   /* CollNetChain (Simple)*/ { 0, 0, 10.7 } } };

+/* Array indexes used below */
+#define VOLTA_COMPCAP_IDX 0
+#define AMPERE_COMPCAP_IDX 1
+#define HOPPER_COMPCAP_IDX 2
+
 // LL128 max BW per channel
 static const double ll128MaxBwPerCh = 20.0;
-static const double llMaxBws[2][3] = { /* Volta-N1/Intel-N2/Intel-N4) */ {39.0, 39.0, 20.4}, /* Ampere-N1/AMD-N2/AMD-N4) */ {87.7, 22.5 /*avg of ring & tree*/, 19.0} };
-static const double perChMaxTreeBws[2][3] = { /* Volta (N1/N2/N4) */ {26.5, 18.5, 10.0}, /* Ampere (N1/N2/N4) */ {24.0, 23.6, 17.8} };
+static const double llMaxBws[3][3] = {
+  /* Volta-N1/Intel-N2/Intel-N4) */ {39.0, 39.0, 20.4},
+  /* Ampere-N1/AMD-N2/AMD-N4) */ {87.7, 22.5 /*avg of ring & tree*/, 19.0},
+  /* Hopper-N1/AMD-N2/AMD-N4) */ {87.7, 22.5 /*avg of ring & tree*/, 19.0}
+};
+
+static const double perChMaxTreeBws[3][3] = {
+  /* Volta (N1/N2/N4) */ {26.5, 18.5, 10.0},
+  /* Ampere (N1/N2/N4) */ {24.0, 23.6, 17.8},
+  /* Hopper (N1/N2/N4) */ {24.0, 23.6, 17.8},
+};

 ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCompCap, struct ncclTopoGraph* treeGraph, struct ncclTopoGraph* ringGraph, struct ncclTopoGraph* collNetGraph) {
   int simpleDefaultThreads = (ringGraph->bwIntra*ringGraph->nChannels <= PCI_BW) ? 256 : NCCL_SIMPLE_MAX_NTHREADS;
@@ -94,14 +108,14 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom
   int nRanks = comm->nRanks;
   if (nRanks <= 1) return ncclSuccess;

-  int compCap80 = minCompCap == 80 && maxCompCap == 80 ? 1 : 0;
+  int compCapIndex = (minCompCap == 80 && maxCompCap == 80) ? AMPERE_COMPCAP_IDX : ((minCompCap == 90 && maxCompCap == 90) ? HOPPER_COMPCAP_IDX : VOLTA_COMPCAP_IDX);
   int cpuArch, cpuVendor, cpuModel;
   NCCLCHECK(ncclTopoCpuType(comm->topo, &cpuArch, &cpuVendor, &cpuModel));
   int index2 = nNodes <= 2 ? nNodes-1 : 2;
   // LL: for single node, we look at GPU type; for multi-node, we look at CPU type
-  int index1 = nNodes == 1 ? compCap80 : cpuVendor == NCCL_TOPO_CPU_VENDOR_AMD ? 1 : 0;
+  int index1 = nNodes == 1 ? compCapIndex : cpuVendor == NCCL_TOPO_CPU_VENDOR_AMD ? 1 : 0;
   double llMaxBw = llMaxBws[index1][index2];
-  double perChMaxTreeBw = perChMaxTreeBws[compCap80][index2];
+  double perChMaxTreeBw = perChMaxTreeBws[compCapIndex][index2];
   // De-penalize Tree/Simple latency on Power systems to favor Tree than Ring
   if (cpuArch == NCCL_TOPO_CPU_ARCH_POWER) hwLat[NCCL_HW_PCI][NCCL_ALGO_TREE][NCCL_PROTO_SIMPLE] = hwLat[NCCL_HW_PCI][NCCL_ALGO_RING][NCCL_PROTO_SIMPLE];
   float ppn = (float)nRanks / nNodes; // if ppn < 2, then we are sending/receiving at the same GPU through the NIC, apply some bw discount
@@ -128,7 +142,7 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom
       float busBw = graphs[a]->nChannels * bw;

       // Various model refinements
-      if (compCap80) busBw = std::min(busBw, 235.0f);
+      if (compCapIndex == AMPERE_COMPCAP_IDX) busBw = std::min(busBw, 235.0f);
       if (a == NCCL_ALGO_RING && p == NCCL_PROTO_LL) { busBw = std::min(llMaxBw, busBw * ((nNodes > 1 || coll == ncclFuncAllReduce || coll == ncclFuncReduce) ? 1.0/4.0 : 1.0/3.0)); }
       if (a == NCCL_ALGO_RING && p == NCCL_PROTO_LL128) busBw = std::min(busBw * (ppn < 2 ? 0.7 : 0.92 /*120.0/128.0*/), ll128MaxBwPerCh*graphs[a]->nChannels);
       if (a == NCCL_ALGO_TREE) busBw = std::min(busBw*.92, graphs[a]->nChannels*perChMaxTreeBw);
@@ -136,13 +150,13 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom
       if (a == NCCL_ALGO_TREE && p == NCCL_PROTO_LL128) busBw = std::min(busBw * (nNodes == 1 ? 7.0/9.0 : 120.0/128.0), ll128MaxBwPerCh*graphs[a]->nChannels);
       if (a == NCCL_ALGO_COLLNET_DIRECT && p != NCCL_PROTO_SIMPLE) busBw = 0;  // Not used
       if (a == NCCL_ALGO_COLLNET_CHAIN && p != NCCL_PROTO_SIMPLE) busBw = 0;  // Not used
-      if (a == NCCL_ALGO_COLLNET_DIRECT && p == NCCL_PROTO_SIMPLE) {
+      if (a == NCCL_ALGO_COLLNET_DIRECT && p == NCCL_PROTO_SIMPLE) {
         // Collnet+Direct requires all GPUs to have a local NIC to work at full speed
         float factor = ppn / (1.0*graphs[a]->nChannels); // GPU/NIC ratio
-        factor -= (factor-1)/2;
+        factor -= (factor-1)/2;
         busBw /= factor;
-      }
-      if (a == NCCL_ALGO_COLLNET_CHAIN && p == NCCL_PROTO_SIMPLE) busBw *= .75;
+      }
+      if (a == NCCL_ALGO_COLLNET_CHAIN && p == NCCL_PROTO_SIMPLE) busBw *= .75;

       // Convert bus BW to algorithm BW
       float ratio = (a != NCCL_ALGO_RING) ? .5 : (1.0 * nRanks) / nsteps;
@@ -212,9 +226,9 @@ ncclResult_t ncclTopoTuneModel(struct ncclComm* comm, int minCompCap, int maxCom
   for (int c=0; c<NCCL_NUM_FUNCTIONS; c++) for (int a=0; a<NCCL_NUM_ALGORITHMS; a++) for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
     int pEnable = protoEnable[p];
     if (pEnable == 2 && p == NCCL_PROTO_LL128) {
       pEnable = (graphs[a]->typeInter <= PATH_PXB) && graphs[a]->typeIntra <= PATH_NVL &&
-        ((minCompCap == 70 && maxCompCap == 70) || (minCompCap == 80 && maxCompCap == 80)) ? 1 : 0;
+        ((minCompCap == 70 && maxCompCap == 70) || (minCompCap == 80 && maxCompCap == 80) || (minCompCap == 90 && maxCompCap == 90)) ? 1 : 0;
     }
     if (pEnable == 0) comm->bandwidths[c][a][p] = 0;
     // Only disable algo for Allreduce since others only have one
diff --git a/src/graph/xml.cc b/src/graph/xml.cc
index 838a7f5..316d20f 100644
--- a/src/graph/xml.cc
+++ b/src/graph/xml.cc
@@ -628,7 +628,7 @@ ncclResult_t ncclTopoGetXmlFromGpu(struct ncclXmlNode* pciNode, nvmlDevice_t nvm
     NCCLCHECK(xmlGetSub(gpuNode, "nvlink", &nvlNode));
     if (nvlNode == NULL) {
       // NVML NVLink detection
-      int maxNvLinks = (sm < 60) ? 0 : (sm < 70) ? 4 : (sm < 80) ? 6 : 12;
+      int maxNvLinks = (sm < 60) ? 0 : (sm < 70) ? 4 : (sm < 80) ? 6 : (sm < 90) ? 12 : 18;
       if (maxNvLinks > 0 && nvmlDev == NULL) {
         WARN("No NVML device handle. Skipping nvlink detection.");
@@ -641,8 +641,21 @@ ncclResult_t ncclTopoGetXmlFromGpu(struct ncclXmlNode* pciNode, nvmlDevice_t nvm
         if ((ncclNvmlDeviceGetNvLinkCapability(nvmlDev, l, NVML_NVLINK_CAP_P2P_SUPPORTED, &canP2P) != ncclSuccess) || !canP2P) continue;

         // Make sure the Nvlink is up. The previous call should have trained the link.
-        nvmlEnableState_t isActive;
-        if ((ncclNvmlDeviceGetNvLinkState(nvmlDev, l, &isActive) != ncclSuccess) || (isActive != NVML_FEATURE_ENABLED)) continue;
+        nvmlEnableState_t isActive = NVML_FEATURE_DISABLED;
+#if CUDART_VERSION >= 11080
+        if (sm >= 90) {
+          nvmlFieldValue_t fv;
+          fv.fieldId = NVML_FI_DEV_NVLINK_GET_STATE;
+          fv.scopeId = l;
+          // fv.value will contain NV_FEATURE_ENABLED or NV_FEATURE_DISABLED
+          if ((ncclNvmlDeviceGetFieldValues(nvmlDev, 1, &fv) == ncclSuccess) && (fv.nvmlReturn == NVML_SUCCESS))
+            isActive = (nvmlEnableState_t) fv.value.uiVal;
+        } else /* FALLTHRU to GetNvLinkState if before SM90 */
+#endif
+        {
+          (void) ncclNvmlDeviceGetNvLinkState(nvmlDev, l, &isActive);
+        }
+        if (isActive != NVML_FEATURE_ENABLED) continue;

         // Try to figure out what's on the other side of the NVLink
         nvmlPciInfo_t remoteProc;
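The xml.cc change above switches NVLink state detection on SM90 to `nvmlDeviceGetFieldValues()`. A stand-alone query against the regular NVML API looks roughly like the sketch below; it assumes a driver/NVML recent enough to define `NVML_FI_DEV_NVLINK_GET_STATE`, checks link 0 only, and trims error handling.

```cpp
// Stand-alone sketch of the field-value NVLink state query. Illustrative only.
#include <nvml.h>
#include <cstdio>

int main() {
  if (nvmlInit_v2() != NVML_SUCCESS) return 1;
  nvmlDevice_t dev;
  if (nvmlDeviceGetHandleByIndex_v2(0, &dev) == NVML_SUCCESS) {
    nvmlFieldValue_t fv = {};
    fv.fieldId = NVML_FI_DEV_NVLINK_GET_STATE;
    fv.scopeId = 0;  // NVLink link index
    if (nvmlDeviceGetFieldValues(dev, 1, &fv) == NVML_SUCCESS && fv.nvmlReturn == NVML_SUCCESS) {
      printf("NVLink 0: %s\n", fv.value.uiVal == NVML_FEATURE_ENABLED ? "up" : "down");
    }
  }
  nvmlShutdown();
  return 0;
}
```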
diff --git a/src/group.cc b/src/group.cc
index 590068d..d246f28 100644
--- a/src/group.cc
+++ b/src/group.cc
@@ -211,8 +211,8 @@ static void groupCleanup(struct ncclComm** groupCommHeadPtr, struct ncclComm** g
       for (int i = 0; i < comm->nRanks; i++) {
         comm->tasks.peers[i].sendSeen = false;
         comm->tasks.peers[i].recvSeen = false;
-        comm->connectSend[i] = 0;
-        comm->connectRecv[i] = 0;
+        comm->connectSend[i] = 0UL;
+        comm->connectRecv[i] = 0UL;
       }
       comm->unlaunchedPlansHead = nullptr;
       // Reclaim abandoned kernel plan memory. Note ncclWork structs were already
diff --git a/src/include/comm.h b/src/include/comm.h
index 2adce32..16e95b3 100644
--- a/src/include/comm.h
+++ b/src/include/comm.h
@@ -168,8 +168,8 @@ struct ncclComm {
   ncclCollNet_t* ncclCollNet;
   void* bootstrap;
   // Bitmasks for ncclTransportP2pSetup
-  uint32_t* connectSend;
-  uint32_t* connectRecv;
+  uint64_t* connectSend;
+  uint64_t* connectRecv;

   int rank;    // my rank in the communicator
   int nRanks;  // number of GPUs in communicator
diff --git a/src/include/cudawrap.h b/src/include/cudawrap.h
index 2bd3b4d..0fd5945 100644
--- a/src/include/cudawrap.h
+++ b/src/include/cudawrap.h
@@ -8,6 +8,8 @@
 #define NCCL_CUDAWRAP_H_

 #include <cuda.h>
+#include <cuda_runtime.h>
+#include "checks.h"

 #if CUDART_VERSION >= 11030
 #include <cudaTypedefs.h>
@@ -83,6 +85,18 @@ DECLARE_CUDA_PFN_EXTERN(cuDriverGetVersion, 2020);
 DECLARE_CUDA_PFN_EXTERN(cuGetProcAddress, 11030);
 #endif

-ncclResult_t cudaLibraryInit(void);
+ncclResult_t ncclCudaLibraryInit(void);
+
+extern int ncclCudaDriverVersionCache;
+
+inline ncclResult_t ncclCudaDriverVersion(int* driver) {
+  int version = __atomic_load_n(&ncclCudaDriverVersionCache, __ATOMIC_RELAXED);
+  if (version == -1) {
+    CUDACHECK(cudaDriverGetVersion(&version));
+    __atomic_store_n(&ncclCudaDriverVersionCache, version, __ATOMIC_RELAXED);
+  }
+  *driver = version;
+  return ncclSuccess;
+}

 #endif
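The new `ncclCudaDriverVersion()` above caches the driver version behind a relaxed atomic so only the first caller pays for the CUDA call. The same idiom in isolation (illustrative only, using the GCC/Clang builtins the header itself uses):

```cpp
// Relaxed ordering is enough here: racing threads simply recompute and store
// the same value. Not NCCL code; names are local to this example.
#include <cuda_runtime.h>

static int gDriverVersionCache = -1;

int cachedDriverVersion() {
  int v = __atomic_load_n(&gDriverVersionCache, __ATOMIC_RELAXED);
  if (v == -1) {
    cudaDriverGetVersion(&v);
    __atomic_store_n(&gDriverVersionCache, v, __ATOMIC_RELAXED);
  }
  return v;
}
```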
diff --git a/src/include/graph.h b/src/include/graph.h
index 63b05b1..26c1e76 100644
--- a/src/include/graph.h
+++ b/src/include/graph.h
@@ -33,6 +33,7 @@ ncclResult_t ncclTopoGetNvbGpus(struct ncclTopoSystem* system, int rank, int* nr
 ncclResult_t ncclTopoGetNetDev(struct ncclComm* comm, int rank, struct ncclTopoGraph* graph, int channelId, int peerRank, int* net, int* proxyRank);
 ncclResult_t ncclTopoCheckP2p(struct ncclTopoSystem* system, int64_t id1, int64_t id2, int* p2p, int *read, int* intermediateRank);
 ncclResult_t ncclTopoCheckGdr(struct ncclTopoSystem* topo, int64_t busId, int netDev, int read, int* useGdr);
+ncclResult_t ncclTopoNeedFlush(struct ncclTopoSystem* system, int64_t busId, int* flush);
 ncclResult_t ncclTopoCheckNet(struct ncclTopoSystem* system, int64_t id1, int64_t id2, int* net);
 int ncclPxnDisable(struct ncclComm* comm);
 ncclResult_t ncclTopoGetPxnRanks(struct ncclComm* comm, int** intermediateRanks, int* nranks);
diff --git a/src/include/nvmlwrap.h b/src/include/nvmlwrap.h
index 29731dd..fa1f5cf 100644
--- a/src/include/nvmlwrap.h
+++ b/src/include/nvmlwrap.h
@@ -107,6 +107,75 @@ typedef enum nvmlGpuP2PCapsIndex_enum
   NVML_P2P_CAPS_INDEX_UNKNOWN
 } nvmlGpuP2PCapsIndex_t;

+/**
+ * Represents the type for sample value returned
+ */
+typedef enum nvmlValueType_enum
+{
+    NVML_VALUE_TYPE_DOUBLE = 0,
+    NVML_VALUE_TYPE_UNSIGNED_INT = 1,
+    NVML_VALUE_TYPE_UNSIGNED_LONG = 2,
+    NVML_VALUE_TYPE_UNSIGNED_LONG_LONG = 3,
+    NVML_VALUE_TYPE_SIGNED_LONG_LONG = 4,
+
+    // Keep this last
+    NVML_VALUE_TYPE_COUNT
+}nvmlValueType_t;
+
+/**
+ * Union to represent different types of Value
+ */
+typedef union nvmlValue_st
+{
+    double dVal;               //!< If the value is double
+    unsigned int uiVal;        //!< If the value is unsigned int
+    unsigned long ulVal;       //!< If the value is unsigned long
+    unsigned long long ullVal; //!< If the value is unsigned long long
+    signed long long sllVal;   //!< If the value is signed long long
+}nvmlValue_t;
+
+/**
+ * Field Identifiers.
+ *
+ * All Identifiers pertain to a device. Each ID is only used once and is guaranteed never to change.
+ */
+
+/* NVLink Speed */
+#define NVML_FI_DEV_NVLINK_SPEED_MBPS_COMMON 90 //!< Common NVLink Speed in MBps for active links
+#define NVML_FI_DEV_NVLINK_LINK_COUNT        91 //!< Number of NVLinks present on the device
+
+/**
+ * Remote device NVLink ID
+ *
+ * Link ID needs to be specified in the scopeId field in nvmlFieldValue_t.
+ */
+#define NVML_FI_DEV_NVLINK_REMOTE_NVLINK_ID 146 //!< Remote device NVLink ID
+
+/**
+ * NVSwitch: connected NVLink count
+ */
+#define NVML_FI_DEV_NVSWITCH_CONNECTED_LINK_COUNT 147 //!< Number of NVLinks connected to NVSwitch
+
+#define NVML_FI_DEV_NVLINK_GET_SPEED   164
+#define NVML_FI_DEV_NVLINK_GET_STATE   165
+#define NVML_FI_DEV_NVLINK_GET_VERSION 166
+#define NVML_FI_MAX 167 //!< One greater than the largest field ID defined above
+
+/**
+ * Information for a Field Value Sample
+ */
+typedef struct nvmlFieldValue_st
+{
+    unsigned int fieldId;      //!< ID of the NVML field to retrieve. This must be set before any call that uses this struct. See the constants starting with NVML_FI_ above.
+    unsigned int scopeId;      //!< Scope ID can represent data used by NVML depending on fieldId's context. For example, for NVLink throughput counter data, scopeId can represent linkId.
+    long long timestamp;       //!< CPU Timestamp of this value in microseconds since 1970
+    long long latencyUsec;     //!< How long this field value took to update (in usec) within NVML. This may be averaged across several fields that are serviced by the same driver call.
+    nvmlValueType_t valueType; //!< Type of the value stored in value
+    nvmlReturn_t nvmlReturn;   //!< Return code for retrieving this value. This must be checked before looking at value, as value is undefined if nvmlReturn != NVML_SUCCESS
+    nvmlValue_t value;         //!< Value for this field. This is only valid if nvmlReturn == NVML_SUCCESS
+} nvmlFieldValue_t;
+
 /* End of nvml.h */

 #endif // NCCL_NVML_DIRECT
@@ -135,4 +204,6 @@ ncclResult_t ncclNvmlDeviceGetNvLinkRemotePciInfo(nvmlDevice_t device, unsigned
 ncclResult_t ncclNvmlDeviceGetNvLinkCapability(nvmlDevice_t device, unsigned int link, nvmlNvLinkCapability_t capability, unsigned int *capResult);
 ncclResult_t ncclNvmlDeviceGetCudaComputeCapability(nvmlDevice_t device, int* major, int* minor);
 ncclResult_t ncclNvmlDeviceGetP2PStatus(nvmlDevice_t device1, nvmlDevice_t device2, nvmlGpuP2PCapsIndex_t p2pIndex, nvmlGpuP2PStatus_t* p2pStatus);
+ncclResult_t ncclNvmlDeviceGetFieldValues(nvmlDevice_t device, int valuesCount, nvmlFieldValue_t *values);
+
 #endif // End include guard
diff --git a/src/include/strongstream.h b/src/include/strongstream.h
index b72f77c..74df610 100644
--- a/src/include/strongstream.h
+++ b/src/include/strongstream.h
@@ -22,7 +22,7 @@ struct ncclCudaGraph {
 #endif
 };

-inline struct ncclCudaGraph ncclCudaGraphNull() {
+inline struct ncclCudaGraph ncclCudaGraphNone() {
   struct ncclCudaGraph tmp;
 #if CUDART_VERSION >= 11030
   tmp.graph = nullptr;
@@ -50,7 +50,6 @@ inline bool ncclCudaGraphSame(struct ncclCudaGraph a, struct ncclCudaGraph b) {
 ncclResult_t ncclCudaGetCapturingGraph(struct ncclCudaGraph* graph, cudaStream_t stream);
 ncclResult_t ncclCudaGraphAddDestructor(struct ncclCudaGraph graph, cudaHostFn_t fn, void* arg);

-
 /* ncclStrongStream: An abstraction over CUDA streams that do not lose their
  * identity while being captured. Regular streams have the deficiency that the
  * captured form of a stream in one graph launch has no relation to the
@@ -88,7 +87,7 @@ ncclResult_t ncclStrongStreamAcquire(
 // Acquire-fence the strong stream assuming no graph is capturing. This permits
 // the caller to enqueue directly to the `ss->stream` member using native CUDA
 // calls. Strong stream must be released via:
-//   ncclStrongStreamRelease(ncclCudaGraphNull(), graphRefs, ss);
+//   ncclStrongStreamRelease(ncclCudaGraphNone(), ss);
 ncclResult_t ncclStrongStreamAcquireUncaptured(struct ncclStrongStream* ss);

 // Release-fence of the strong stream.
diff --git a/src/init.cc b/src/init.cc
index 8023c49..397946e 100644
--- a/src/init.cc
+++ b/src/init.cc
@@ -416,7 +416,7 @@ static ncclResult_t devCommSetup(ncclComm_t comm) {
   NCCLCHECK(ncclCudaMemcpyAsync(devCommAndChans, &tmpCommAndChans, 1, comm->deviceStream.stream));
   CUDACHECK(cudaStreamSynchronize(comm->deviceStream.stream));
-  NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNull(), &comm->deviceStream));
+  NCCLCHECK(ncclStrongStreamRelease(ncclCudaGraphNone(), &comm->deviceStream));

   return ncclSuccess;
 }
@@ -955,13 +955,13 @@ collnet_cleanup:
     for (int c=0; c<comm->p2pnChannelsPerPeer; c++) {
       NCCLCHECK(ncclChannelCompute(comm, peer, c, ncclFuncSend, &channelId));
       if (comm->channels[channelId].peers[peer].send[1].connected == 0) {
-        comm->connectSend[peer] |= (1<<channelId);
+        comm->connectSend[peer] |= (1UL<<channelId);
       }
     }
     for (int c=0; c<comm->p2pnChannelsPerPeer; c++) {
       NCCLCHECK(ncclChannelCompute(comm, peer, c, ncclFuncRecv, &channelId));
       if (comm->channels[channelId].peers[peer].recv[1].connected == 0) {
-        comm->connectRecv[peer] |= (1<<channelId);
+        comm->connectRecv[peer] |= (1UL<<channelId);
       }
     }
@@ ... @@
   internalConfigPtr->blocking = blockingEnv;
-  (void) cudaLibraryInit();
+  (void)ncclCudaLibraryInit();
   CUDACHECKGOTO(cudaGetDevice(&cudaDev), ret, exit);
   NCCLCHECKGOTO(ncclCommInitRankDev(newcomm, nranks, commId, myrank, cudaDev, internalConfigPtr), ret, fail);
diff --git a/src/misc/cudawrap.cc b/src/misc/cudawrap.cc
index 52663b5..b1786f4 100644
--- a/src/misc/cudawrap.cc
+++ b/src/misc/cudawrap.cc
@@ -38,7 +38,7 @@ DECLARE_CUDA_PFN(cuGetProcAddress, 11030);
 #define CUDA_DRIVER_MIN_VERSION 11030

 static void *cudaLib;
-static int cudaDriverVersion;
+int ncclCudaDriverVersionCache = -1;

 #if CUDART_VERSION >= 11030
 /*
@@ -107,16 +107,17 @@ static void initOnceFunc() {
     goto error;
   }

-  res = pfn_cuDriverGetVersion(&cudaDriverVersion);
+  int driverVersion;
+  res = pfn_cuDriverGetVersion(&driverVersion);
   if (res != 0) {
     WARN("cuDriverGetVersion failed with %d", res);
     goto error;
   }

-  INFO(NCCL_INIT, "cudaDriverVersion %d", cudaDriverVersion);
+  INFO(NCCL_INIT, "cudaDriverVersion %d", driverVersion);

-  if (cudaDriverVersion < CUDA_DRIVER_MIN_VERSION) {
-    // WARN("CUDA Driver version found is %d. Minimum requirement is %d", cudaDriverVersion, CUDA_DRIVER_MIN_VERSION);
+  if (driverVersion < CUDA_DRIVER_MIN_VERSION) {
+    // WARN("CUDA Driver version found is %d. Minimum requirement is %d", driverVersion, CUDA_DRIVER_MIN_VERSION);
     // Silently ignore version check mismatch for backwards compatibility
     goto error;
   }
@@ -148,7 +149,7 @@ error:
   return;
 }

-ncclResult_t cudaLibraryInit() {
+ncclResult_t ncclCudaLibraryInit() {
   pthread_once(&initOnceControl, initOnceFunc);
   return initResult;
 }
diff --git a/src/misc/nvmlwrap.cc b/src/misc/nvmlwrap.cc
index 5db7c6b..2de993a 100644
--- a/src/misc/nvmlwrap.cc
+++ b/src/misc/nvmlwrap.cc
@@ -38,6 +38,7 @@ namespace {
   NCCL_NVML_FN(nvmlDeviceGetNvLinkCapability, nvmlReturn_t, (nvmlDevice_t device, unsigned int link, nvmlNvLinkCapability_t capability, unsigned int *capResult))
   NCCL_NVML_FN(nvmlDeviceGetCudaComputeCapability, nvmlReturn_t, (nvmlDevice_t device, int* major, int* minor))
   NCCL_NVML_FN(nvmlDeviceGetP2PStatus, nvmlReturn_t, (nvmlDevice_t device1, nvmlDevice_t device2, nvmlGpuP2PCapsIndex_t p2pIndex, nvmlGpuP2PStatus_t* p2pStatus))
+  NCCL_NVML_FN(nvmlDeviceGetFieldValues, nvmlReturn_t, (nvmlDevice_t device, int valuesCount, nvmlFieldValue_t *values))

   std::mutex lock; // NVML has had some thread safety bugs
   bool initialized = false;
@@ -80,7 +81,8 @@ ncclResult_t ncclNvmlEnsureInitialized() {
       {(void**)&pfn_nvmlDeviceGetNvLinkRemotePciInfo, "nvmlDeviceGetNvLinkRemotePciInfo"},
       {(void**)&pfn_nvmlDeviceGetNvLinkCapability, "nvmlDeviceGetNvLinkCapability"},
       {(void**)&pfn_nvmlDeviceGetCudaComputeCapability, "nvmlDeviceGetCudaComputeCapability"},
-      {(void**)&pfn_nvmlDeviceGetP2PStatus, "nvmlDeviceGetP2PStatus"}
+      {(void**)&pfn_nvmlDeviceGetP2PStatus, "nvmlDeviceGetP2PStatus"},
+      {(void**)&pfn_nvmlDeviceGetFieldValues, "nvmlDeviceGetFieldValues"}
     };
     for(Symbol sym: symbols) {
       *sym.ppfn = dlsym(libhandle, sym.name);
@@ -260,3 +262,10 @@ ncclResult_t ncclNvmlDeviceGetP2PStatus(
   }
   return ncclSuccess;
 }
+
+ncclResult_t ncclNvmlDeviceGetFieldValues(nvmlDevice_t device, int valuesCount, nvmlFieldValue_t *values) {
+  NCCLCHECK(ncclNvmlEnsureInitialized());
+  std::lock_guard<std::mutex> locked(lock);
+  NVMLTRY(nvmlDeviceGetFieldValues, device, valuesCount, values);
+  return ncclSuccess;
+}
diff --git a/src/misc/strongstream.cc b/src/misc/strongstream.cc
index 4933799..0524223 100644
--- a/src/misc/strongstream.cc
+++ b/src/misc/strongstream.cc
@@ -5,6 +5,7 @@
  ************************************************************************/

 #include "strongstream.h"
+#include "cudawrap.h"
 #include "checks.h"
 #include "param.h"

@@ -14,10 +15,8 @@ ncclResult_t ncclCudaGetCapturingGraph(
     struct ncclCudaGraph* graph, cudaStream_t stream
   ) {
 #if CUDART_VERSION >= 11030
-  thread_local int driver = -1;
-  if (driver == -1) {
-    CUDACHECK(cudaDriverGetVersion(&driver));
-  }
+  int driver;
+  NCCLCHECK(ncclCudaDriverVersion(&driver));
   if (driver < 11030) {
     cudaStreamCaptureStatus status;
     unsigned long long gid;
@@ -192,11 +191,11 @@ ncclResult_t ncclStrongStreamWaitStream(
       CUDACHECK(cudaEventRecord(b->event, b->stream));
     }
     CUDACHECK(cudaStreamWaitEvent(a->stream, b->event, 0));
-    a->eventIsLagging = 1;
   } else {
     cudaGraphNode_t pair[2] = {a->node, b->node};
     CUDACHECK(cudaGraphAddEmptyNode(&a->node, graph.graph, pair, 2));
   }
+  a->eventIsLagging = 1;
 #else
   CUDACHECK(cudaEventRecord(b->event, b->stream));
   CUDACHECK(cudaStreamWaitEvent(a->stream, b->event, 0));
@@ -232,9 +231,8 @@ ncclResult_t ncclStrongStreamWaitStream(
     }
     cudaGraphNode_t pair[2] = {a->node, tie};
     CUDACHECK(cudaGraphAddEmptyNode(&a->node, graph.graph, pair, 2));
+    a->eventIsLagging = 1;
   }
-  // a->eventIsLagging doesn't change since we are just updating the
-  // dependencies of a->node.
 }
 #else
 CUDACHECK(cudaEventRecord(a->event, b));
diff --git a/src/proxy.cc b/src/proxy.cc
index 1a2f361..696f57f 100644
--- a/src/proxy.cc
+++ b/src/proxy.cc
@@ -1055,8 +1055,8 @@ void* ncclProxyService(void* _args) {
   int asyncOpCount = 0;
   while ((stop == 0 || (stop == 1 && npeers > 0)) && *comm->abortFlag == 0) {
     /* never let proxy service thread blocks in poll, or it cannot receive abortFlag. */
-    if (int error = poll(pollfds, NCCL_MAX_LOCAL_RANKS+1, asyncOpCount ? 0 : 500) < 0) {
-      WARN("[Proxy Service] Poll failed with error %d", error);
+    if (poll(pollfds, NCCL_MAX_LOCAL_RANKS+1, asyncOpCount ? 0 : 500) < 0) {
+      WARN("[Proxy Service] Poll failed: %s\n", strerror(errno));
       return NULL;
     }
     if (pollfds[NCCL_MAX_LOCAL_RANKS].revents) {
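The proxy.cc fix above also sidesteps a C operator-precedence pitfall: in the old code, `int error = poll(...) < 0` binds as `int error = (poll(...) < 0)`, so the reported "error" was always 1 rather than anything meaningful, while the real reason lives in `errno`. A corrected pattern in isolation (illustrative, not the NCCL function itself):

```cpp
// Keep poll()'s return value and report errno on failure. Illustrative only.
#include <poll.h>
#include <cerrno>
#include <cstring>
#include <cstdio>

int waitForEvents(struct pollfd* fds, nfds_t n, int timeoutMs) {
  int ready = poll(fds, n, timeoutMs);
  if (ready < 0) {
    fprintf(stderr, "poll failed: %s\n", strerror(errno));
    return -1;
  }
  return ready;  // number of descriptors with events, 0 on timeout
}
```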
diff --git a/src/transport.cc b/src/transport.cc
index 7ebaf27..b3ca90d 100644
--- a/src/transport.cc
+++ b/src/transport.cc
@@ -42,7 +42,7 @@ static ncclResult_t selectTransport(struct ncclComm* comm, struct ncclTopoGraph*
 ncclResult_t ncclTransportP2pConnect(struct ncclComm* comm, int channelId, int nrecv, int* peerRecv, int nsend, int* peerSend, int connIndex) {
   TRACE(NCCL_INIT, "nsend %d nrecv %d", nsend, nrecv);
   struct ncclChannel* channel = &comm->channels[channelId];
-  uint32_t mask = 1 << channelId;
+  uint64_t mask = 1UL << channel->id;
   for (int i=0; i<nrecv; i++) {
     int peer = peerRecv[i];
     if (peer == -1 || peer >= comm->nRanks || peer == comm->rank || channel->peers[peer].recv[connIndex].connected) continue;
@@ -77,15 +77,15 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
     int bootstrapTag = (i<<8) + (graph ? graph->id+1 : 0);
     int recvPeer = (comm->rank - i + comm->nRanks) % comm->nRanks;
     int sendPeer = (comm->rank + i) % comm->nRanks;
-    uint32_t recvMask = comm->connectRecv[recvPeer];
-    uint32_t sendMask = comm->connectSend[sendPeer];
+    uint64_t recvMask = comm->connectRecv[recvPeer];
+    uint64_t sendMask = comm->connectSend[sendPeer];

     struct ncclConnect* recvData = data;
     int sendChannels = 0, recvChannels = 0;
     int type;
     TIME_START(0);
     for (int c=0; c<MAXCHANNELS; c++) {
-      if (recvMask & (1<<c)) {
+      if (recvMask & (1UL<<c)) {
         NCCLCHECK(selectTransport<0>(comm, graph, recvData+recvChannels++, c, recvPeer, connIndex, &type));
         if (type > highestType) highestType = type;
       }
@@ -94,7 +94,7 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
     TIME_START(1);
     struct ncclConnect* sendData = recvData+recvChannels;
     for (int c=0; c<MAXCHANNELS; c++) {
-      if (sendMask & (1<<c)) {
+      if (sendMask & (1UL<<c)) {
         NCCLCHECK(selectTransport<1>(comm, graph, sendData+sendChannels++, c, sendPeer, connIndex, &type));
         if (type > highestType) highestType = type;
       }
@@ -119,7 +119,7 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
     TIME_START(3);
     for (int c=0; c<MAXCHANNELS; c++) {
-      if (sendMask & (1<<c)) {
+      if (sendMask & (1UL<<c)) {
         struct ncclConnector* conn = comm->channels[c].peers[sendPeer].send + connIndex;
         NCCLCHECK(conn->transportComm->connect(comm, sendData++, 1, comm->rank, conn));
         conn->connected = 1;
@@ -129,7 +129,7 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
     TIME_STOP(3);
     TIME_START(4);
     for (int c=0; c<MAXCHANNELS; c++) {
-      if (recvMask & (1<<c)) {
+      if (recvMask & (1UL<<c)) {
         struct ncclConnector* conn = comm->channels[c].peers[recvPeer].recv + connIndex;
         NCCLCHECK(conn->transportComm->connect(comm, recvData++, 1, comm->rank, conn));
         conn->connected = 1;
@@ -137,7 +137,7 @@ ncclResult_t ncclTransportP2pSetup(struct ncclComm* comm, struct ncclTopoGraph*
       }
     }
     TIME_STOP(4);
-    comm->connectRecv[recvPeer] = comm->connectSend[sendPeer] = 0;
+    comm->connectRecv[recvPeer] = comm->connectSend[sendPeer] = 0UL;
   }
   CUDACHECK(cudaStreamSynchronize(transportSetupStream));
   CUDACHECK(cudaStreamDestroy(transportSetupStream));
diff --git a/src/transport/coll_net.cc b/src/transport/coll_net.cc
index 432511c..de10f2f 100644
--- a/src/transport/coll_net.cc
+++ b/src/transport/coll_net.cc
@@ -121,6 +121,7 @@ struct recvResources {
   int netDev;
   int useGdr;
   int useDmaBuf;
+  int needFlush;
   uint64_t* gdcSync;
   uint64_t* gdcFlush;
   void* gdrDesc;
@@ -139,6 +140,7 @@ static ncclResult_t canConnect(int* ret, struct ncclTopoSystem* topo, struct ncc

 struct setupReq {
   int netDev;
   int useGdr;
+  int needFlush;
 };

@@ -151,6 +153,8 @@ static ncclResult_t sendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph
   NCCLCHECK(ncclTopoGetNetDev(comm, myInfo->rank, graph, channelId, -1, &req.netDev, &proxyRank));
   NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, req.netDev, 1, &req.useGdr));
   send->conn.direct |= req.useGdr ? NCCL_DIRECT_NIC : 0;
+  // Determine whether we need to flush the GDR buffer on recv or not
+  if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm->topo, myInfo->busId, &req.needFlush));

   NCCLCHECK(ncclTopoGetLocalRank(comm->topo, myInfo->rank, &send->proxyConn.localRank));
   NCCLCHECK(ncclProxyConnect(comm, TRANSPORT_COLLNET, 1, myInfo->rank, &send->proxyConn));
@@ -392,6 +396,7 @@ static ncclResult_t recvProxySetup(struct ncclProxyConnection* connection, struc
   resources->netDev = req->netDev;
   resources->useGdr = req->useGdr;
+  resources->needFlush = req->needFlush;
   ncclNetProperties_t props;
   NCCLCHECK(collNetGetProperties(comm, req->netDev, &props));
   /* DMA-BUF support */
@@ -754,7 +759,7 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg
           TRACE(NCCL_NET, "recvProxy [%d/%d/%d] received, size %d", sub->received, group, buffSlot, totalSize);
           sub->received += args->sliceSteps;
           sub->requests[buffSlot] = NULL;
-          if (1 && reqFifo[group][buffSlot].size > 0 && resources->useGdr) {
+          if (reqFifo[group][buffSlot].size > 0 && resources->useGdr && resources->needFlush) {
             // GDRCOPY support
             if (resources->gdcFlush) {
 #if defined (__x86_64__)
diff --git a/src/transport/net.cc b/src/transport/net.cc
index 12390c0..a3a1579 100644
--- a/src/transport/net.cc
+++ b/src/transport/net.cc
@@ -118,6 +118,7 @@ struct recvResources {
   int netDev;
   int useGdr;
   int useDmaBuf;
+  int needFlush;
   int maxRecvs;
   uint64_t* gdcSync;
   uint64_t* gdcFlush;
@@ -152,6 +153,7 @@ struct setupReq {
   int shared;
   int netDev;
   int useGdr;
+  int needFlush;
   int channelId;
   int connIndex;
 };
@@ -205,6 +207,9 @@ static ncclResult_t recvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph
   NCCLCHECK(ncclTopoGetNetDev(comm, myInfo->rank, graph, channelId, myInfo->rank, &req.netDev, &proxyRank));
   NCCLCHECK(ncclTopoCheckGdr(comm->topo, myInfo->busId, req.netDev, 0, &req.useGdr));
+  // Determine whether we need to flush the GDR buffer on recv or not
+  if (req.useGdr) NCCLCHECK(ncclTopoNeedFlush(comm->topo, myInfo->busId, &req.needFlush));
+
   // We don't support PXN on receive yet
   NCCLCHECK(ncclProxyConnect(comm, TRANSPORT_NET, 0, myInfo->rank, &recv->proxyConn));
@@ -470,6 +475,7 @@ static ncclResult_t recvProxySetup(struct ncclProxyConnection* connection, struc
   resources->netDev = req->netDev;
   resources->shared = connection->shared = req->shared;
   resources->useGdr = req->useGdr;
+  resources->needFlush = req->needFlush;
   resources->channelId = req->channelId;
   resources->connIndex = req->connIndex;
   ncclNetProperties_t props;
@@ -1033,7 +1039,7 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg
        for (int i=0; i<NCCL_PROXY_MAX_SUBS; i++) sizes[i] = 0;
        NCCLCHECK(comm->ncclNet->test(subGroup->requests[step%NCCL_STEPS], &done, sizes));
        if (done) {
-         int useGdr = 0;
+         int needFlush = 0;
          int totalSize = 0;
          for (int i=0; i<subGroup->groupSize; i++) {
@@ -1042,11 +1048,11 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg
            for (uint64_t step=sub->received-args->sliceSteps; step<sub->received; step++) ncclProfilingRecord(args, s+i, step, ncclProxyProfileRecvFlushWait);
            if (step < sub->nsteps) {
              struct recvResources* resources = (struct recvResources*) (sub->connection->transportResources);
-             if (resources->useGdr) useGdr = 1;
+             if (resources->useGdr) needFlush |= resources->needFlush;
            }
          }
          subGroup->requests[step%NCCL_STEPS] = NULL;
-         if (totalSize > 0 && p == NCCL_PROTO_SIMPLE && useGdr) {
+         if (totalSize > 0 && p == NCCL_PROTO_SIMPLE && needFlush) {
            // GDRCOPY support
            struct recvResources* resources = (struct recvResources*) (subGroup->connection->transportResources);
            if (resources->gdcFlush) {