diff --git a/makefiles/version.mk b/makefiles/version.mk index 7c9bf0f..88656d9 100644 --- a/makefiles/version.mk +++ b/makefiles/version.mk @@ -1,6 +1,6 @@ ##### version NCCL_MAJOR := 2 NCCL_MINOR := 12 -NCCL_PATCH := 10 +NCCL_PATCH := 12 NCCL_SUFFIX := PKG_REVISION := 1 diff --git a/src/bootstrap.cc b/src/bootstrap.cc index daaa8cd..4f7f48c 100644 --- a/src/bootstrap.cc +++ b/src/bootstrap.cc @@ -165,8 +165,8 @@ ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) { memcpy(id, &listenSock->addr, sizeof(union ncclSocketAddress)); pthread_t thread; pthread_create(&thread, NULL, bootstrapRoot, (void*)listenSock); - pthread_detach(thread); // will not be pthread_join()'d ncclSetThreadName(thread, "NCCL BootstrapR"); + pthread_detach(thread); // will not be pthread_join()'d return ncclSuccess; } diff --git a/src/enqueue.cc b/src/enqueue.cc index a15c370..349cb2b 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -9,6 +9,7 @@ #include "coll_net.h" #include "gdrwrap.h" #include "bootstrap.h" +#include "channel.h" #include // std::memcpy @@ -861,20 +862,14 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) { struct ncclComm* comm = info->comm; int peer = info->root; ssize_t nBytes = info->count*ncclTypeSize(info->datatype); - int p2pGroupSize = NCCL_MAX_WORK_ELEMENTS_P2P/2; - int peerNode = comm->rankToNode[peer]; - int peerIndex = comm->rankToLocalRank[peer]; - int nsteps = comm->maxLocalRanks; - int rankIndex = comm->rankToLocalRank[comm->rank]; + int channelBaseId; + NCCLCHECK(ncclChannelComputeBase(comm, peer, info->coll, &channelBaseId)); if (info->coll == ncclFuncSend) { if (peer != comm->rank) { - int step = (nsteps + peerIndex - rankIndex)%nsteps; - int delta = (comm->nNodes + peerNode - comm->node) % comm->nNodes; - if (comm->nNodes == 1) delta = (comm->nRanks + peer - comm->rank) % comm->nRanks; // Mark channels that need pre-connect for (int c=0; cp2pnChannelsPerPeer; c++) { - int shuffle = comm->nNodes > 1 ? delta+(step/p2pGroupSize) : step; - int channelId = (shuffle+comm->p2pChannels[c]) % comm->p2pnChannels; + int channelId; + NCCLCHECK(ncclChannelComputeFromBase(comm, channelBaseId, c, &channelId)); if (comm->channels[channelId].peers[peer].send[1].connected == 0) { // P2P uses only 1 connector comm->connectSend[peer] |= (1<connect = 1; @@ -885,13 +880,10 @@ static ncclResult_t ncclSaveP2p(struct ncclInfo* info) { comm->p2pSendCount++; } else { if (peer != comm->rank) { - int step = (nsteps + rankIndex - peerIndex)%nsteps; - int delta = (comm->nNodes + comm->node - peerNode) % comm->nNodes; - if (comm->nNodes == 1) delta = (comm->nRanks - peer + comm->rank) % comm->nRanks; // Mark channels that need pre-connect for (int c=0; cp2pnChannelsPerPeer; c++) { - int shuffle = comm->nNodes > 1 ? delta+(step/p2pGroupSize) : step; - int channelId = (shuffle+comm->p2pChannels[c]) % comm->p2pnChannels; + int channelId; + NCCLCHECK(ncclChannelComputeFromBase(comm, channelBaseId, c, &channelId)); if (comm->channels[channelId].peers[peer].recv[1].connected == 0) { // P2P uses only 1 connector comm->connectRecv[peer] |= (1<connect = 1; diff --git a/src/graph/paths.cc b/src/graph/paths.cc index 2bd52b0..222be70 100644 --- a/src/graph/paths.cc +++ b/src/graph/paths.cc @@ -228,6 +228,9 @@ ncclResult_t ncclGetLevel(int* level, const char* disableEnv, const char* levelE } } // Old style numbering + // levelsOldToNew to is an array with each index corresponding to the + // "old level" int, and each value mapping to the correct value defined in topo.h + // maxOldLevel is a quick check to handle out of bounds (based on the length of levelsOldToNew) if (l == -1 && str[0] >= '0' && str[0] <= '9') { int oldLevel = strtol(str, NULL, 0); const int maxOldLevel = sizeof(levelsOldToNew)/sizeof(int) - 1; @@ -521,24 +524,27 @@ ncclResult_t ncclTopoComputePaths(struct ncclTopoSystem* system, struct ncclPeer // Check whether we can access the NIC through another NVLink-connected GPU (PXN) struct ncclTopoNode* gpu = system->nodes[GPU].nodes+g; if (ncclPxnDisable() != 1 && gpu->paths[NET][n].type > PATH_PXB) { + int pxnGpu = -1; + for (int p=0; pnodes[GPU].count; p++) { if (p == g) continue; - struct ncclTopoNode* peerNode = system->nodes[GPU].nodes+p; - - // To ensure proper balancing, use only a local GPU which advertised that NIC as its preferred one. - int netDev; - NCCLCHECK(ncclTopoGetLocalNet(system, peerNode->gpu.rank, &netDev)); - // Make sure we can allocate memory on that GPU. - if (netDev != netNode->id) continue; // PXN = PCI + NVLink. - if (netNode->paths[GPU][p].type > PATH_PXB || peerNode->paths[GPU][g].type > PATH_NVL) continue; + struct ncclTopoNode* peerNode = system->nodes[GPU].nodes+p; + if (peerNode->paths[NET][n].type > PATH_PXB || peerNode->paths[GPU][g].type > PATH_NVL) continue; + pxnGpu = p; + + int netDev; + NCCLCHECK(ncclTopoGetLocalNet(system, peerNode->gpu.rank, &netDev)); + // To ensure proper balancing, use preferably a local GPU which advertised that NIC as its preferred one. + if (netDev == netNode->id) break; + } + if (pxnGpu != -1) { // We can use that GPU as relay to communicate with that NIC. // Only enabling it in the GPU->NIC direction for now to favor // receiving locally and sending remotely (consistent with net.cc) - NCCLCHECK(addInterStep(system, GPU, p, GPU, g, NET, n)); - break; + NCCLCHECK(addInterStep(system, GPU, pxnGpu, GPU, g, NET, n)); } } // Update path when we dont want to / can't use GPU Direct RDMA. diff --git a/src/graph/topo.cc b/src/graph/topo.cc index 83f125f..53e12e5 100644 --- a/src/graph/topo.cc +++ b/src/graph/topo.cc @@ -371,7 +371,7 @@ ncclResult_t ncclTopoAddGpu(struct ncclXmlNode* xmlGpu, struct ncclTopoSystem* s struct kvDict kvDictPciClass[] = { { "0x060400", PCI }, { "0x068000", NVS }, { "0x068001", CPU }, { "0x03", GPU }, { "0x02", NIC }, { NULL, PCI /* Default fallback value */ } }; struct kvDict kvDictPciGen[] = { - { "2.5 GT/s", 15 }, { "5 GT/s", 30 }, { "8 GT/s", 60 }, { "16 GT/s", 120 }, /* Kernel 5.6 and earlier */ + { "2.5 GT/s", 15 }, { "5 GT/s", 30 }, { "8 GT/s", 60 }, { "16 GT/s", 120 }, { "32 GT/s", 240 }, /* Kernel 5.6 and earlier */ { "2.5 GT/s PCIe", 15 }, { "5.0 GT/s PCIe", 30 }, { "8.0 GT/s PCIe", 60 }, { "16.0 GT/s PCIe", 120 }, { "32.0 GT/s PCIe", 240 }, { "64.0 GT/s PCIe", 480 }, { NULL, 60 /* Default fallback */ } }; // x100 Mbps per lane ncclResult_t ncclTopoAddPci(struct ncclXmlNode* xmlPci, struct ncclTopoSystem* system, struct ncclTopoNode* parent) { diff --git a/src/graph/topo.h b/src/graph/topo.h index ada1732..71c1fca 100644 --- a/src/graph/topo.h +++ b/src/graph/topo.h @@ -49,13 +49,28 @@ extern const char* topoNodeTypeStr[]; #define LINK_NET 8 extern const char* topoLinkTypeStr[]; +// Local (myself) #define PATH_LOC 0 + +// Connection traversing NVLink #define PATH_NVL 1 + +// Connection through NVLink using an intermediate GPU #define PATH_NVB 2 + +// Connection traversing at most a single PCIe bridge #define PATH_PIX 3 + +// Connection traversing multiple PCIe bridges (without traversing the PCIe Host Bridge) #define PATH_PXB 4 + +// Connection between a GPU and a NIC using an intermediate GPU. Used to enable rail-local, aggregated network send/recv operations. #define PATH_PXN 5 + +// Connection traversing PCIe as well as a PCIe Host Bridge (typically the CPU) #define PATH_PHB 6 + +// Connection traversing PCIe as well as the SMP interconnect between NUMA nodes (e.g., QPI/UPI) #define PATH_SYS 7 #define PATH_DIS 7 extern const char* topoPathTypeStr[]; diff --git a/src/group.cc b/src/group.cc index f2b9e37..5f65a58 100644 --- a/src/group.cc +++ b/src/group.cc @@ -8,6 +8,7 @@ #include "debug.h" #include "enqueue.h" #include "transport.h" +#include "channel.h" #define MAX_ASYNC_OPS 128 thread_local pthread_t ncclGroupThreads[MAX_ASYNC_OPS]; @@ -101,18 +102,22 @@ ncclResult_t ncclGroupStart() { return ncclSuccess; } -static ncclResult_t scheduleSend(struct ncclComm* comm, int peer, int channelId, size_t count, void* buff) { +static ncclResult_t scheduleSend(struct ncclComm* comm, int peer, int chunk, size_t count, void* buff) { struct ncclInfo info = { ncclFuncSend, "Send", NULL, buff, count, ncclInt8, ncclSum, peer, comm, comm->userStream, /* Args */ 1, 1 }; + int channelId; + NCCLCHECK(ncclChannelCompute(comm, peer, chunk%comm->p2pnChannelsPerPeer, ncclFuncSend, &channelId)); info.channelId = channelId; NCCLCHECK(ncclSetupP2pKernel(&info)); return ncclSuccess; } -static ncclResult_t scheduleRecv(struct ncclComm* comm, int peer, int channelId, size_t count, void* buff) { +static ncclResult_t scheduleRecv(struct ncclComm* comm, int peer, int chunk, size_t count, void* buff) { struct ncclInfo info = { ncclFuncRecv, "Recv", NULL, buff, count, ncclInt8, ncclSum, peer, comm, comm->userStream, /* Args */ 1, 1 }; + int channelId; + NCCLCHECK(ncclChannelCompute(comm, peer, chunk%comm->p2pnChannelsPerPeer, ncclFuncRecv, &channelId)); info.channelId = channelId; NCCLCHECK(ncclSetupP2pKernel(&info)); return ncclSuccess; @@ -208,7 +213,6 @@ ncclResult_t ncclGroupEnd() { int node = comm->node; int nNodes = comm->nNodes; int localRank = comm->localRank; - int p2pGroupSize = NCCL_MAX_WORK_ELEMENTS_P2P/2; // Compute how much to split operations // Natural step size matching buffer steps. @@ -266,8 +270,6 @@ sched_delta: do { // Shuffle channels with s intra-node, and delta inter-node. Inter-node, make sure // to use multiple channels to guarantee progress on all ranks from the same node. - int shuffle = comm->nNodes > 1 ? delta+(s/p2pGroupSize) : s; - int channelId = (shuffle+comm->p2pChannels[chunk%comm->p2pnChannelsPerPeer]) % comm->p2pnChannels; ssize_t recvbytes = totRecvBytes-recvOffset; ssize_t sendbytes = totSendBytes-sendOffset; if (recvbytes > recvChunkSize) { recvbytes = recvChunkSize; } else { recvRemaining = 0; } @@ -277,10 +279,10 @@ sched_delta: if (sendbytes < 0 || (sendbytes == 0 && totSendBytes != 0)) send = NULL; if (recvbytes < 0 || (recvbytes == 0 && totRecvBytes != 0)) recv = NULL; if (recv) { - NCCLCHECKGOTO(scheduleRecv(comm, recvPeer, channelId, recvbytes, ((char*)recvBuff)+recvOffset), ret, group_cleanup); + NCCLCHECKGOTO(scheduleRecv(comm, recvPeer, chunk, recvbytes, ((char*)recvBuff)+recvOffset), ret, group_cleanup); } if (send) { - NCCLCHECKGOTO(scheduleSend(comm, sendPeer, channelId, sendbytes, ((char*)sendBuff)+sendOffset), ret, group_cleanup); + NCCLCHECKGOTO(scheduleSend(comm, sendPeer, chunk, sendbytes, ((char*)sendBuff)+sendOffset), ret, group_cleanup); } recvOffset += recvChunkSize; sendOffset += sendChunkSize; diff --git a/src/include/channel.h b/src/include/channel.h index e2da325..dc1536a 100644 --- a/src/include/channel.h +++ b/src/include/channel.h @@ -10,5 +10,36 @@ ncclResult_t initChannel(struct ncclComm* comm, int channelid); ncclResult_t freeChannel(struct ncclChannel* channel, int nRanks); +static ncclResult_t ncclChannelComputeBase(struct ncclComm* comm, int peer, int coll, int*channelBase) { + int p2pGroupSize = NCCL_MAX_WORK_ELEMENTS_P2P/2; + int peerNode = comm->rankToNode[peer]; + int peerIndex = comm->rankToLocalRank[peer]; + int nsteps = comm->maxLocalRanks; + int rankIndex = comm->rankToLocalRank[comm->rank]; + int step, delta; + if (coll == ncclFuncSend) { + step = (nsteps + peerIndex - rankIndex)%nsteps; + delta = (comm->nNodes + peerNode - comm->node) % comm->nNodes; + } else if (coll == ncclFuncRecv) { + step = (nsteps + rankIndex - peerIndex)%nsteps; + delta = (comm->nNodes + comm->node - peerNode) % comm->nNodes; + } else { + return ncclInternalError; + } + *channelBase = comm->nNodes > 1 ? delta+(step/p2pGroupSize) : step; + return ncclSuccess; +} + +static ncclResult_t ncclChannelComputeFromBase(struct ncclComm* comm, int base, int channelInc, int*channelId) { + *channelId = (base+comm->p2pChannels[channelInc]) % comm->p2pnChannels; + return ncclSuccess; +} + +static ncclResult_t ncclChannelCompute(struct ncclComm* comm, int peer, int channelInc, int coll, int*channelId) { + int base; + NCCLCHECK(ncclChannelComputeBase(comm, peer, coll, &base)); + NCCLCHECK(ncclChannelComputeFromBase(comm, base, channelInc, channelId)); + return ncclSuccess; +} #endif diff --git a/src/init.cc b/src/init.cc index 29bfa01..c6b6e8f 100644 --- a/src/init.cc +++ b/src/init.cc @@ -823,18 +823,17 @@ collnet_cleanup: NCCLCHECK(ncclTopoGetNvbGpus(comm->topo, comm->rank, &nvbNpeers, &nvbPeers)); for (int r=0; rnRanks + (comm->rank-peer)) % comm->nRanks; + int channelId; for (int c=0; cp2pnChannelsPerPeer; c++) { - int channelId = (delta+comm->p2pChannels[c]) % comm->p2pnChannels; - if (comm->channels[channelId].peers[peer].recv[1].connected == 0) { // P2P uses only 1 connector - comm->connectRecv[peer] |= (1<channels[channelId].peers[peer].send[1].connected == 0) { + comm->connectSend[peer] |= (1<nRanks - (comm->rank-peer)) % comm->nRanks; for (int c=0; cp2pnChannelsPerPeer; c++) { - int channelId = (delta+comm->p2pChannels[c]) % comm->p2pnChannels; - if (comm->channels[channelId].peers[peer].send[1].connected == 0) { // P2P uses only 1 connector - comm->connectSend[peer] |= (1<channels[channelId].peers[peer].recv[1].connected == 0) { + comm->connectRecv[peer] |= (1<proxyState.listenSock->fd; pollfds[NCCL_MAX_LOCAL_RANKS].events = POLLIN; @@ -1066,13 +1064,13 @@ void* ncclProxyService(void* _args) { ncclResult_t ncclProxyInit(struct ncclComm* comm, struct ncclSocket* sock, union ncclSocketAddress* peerAddresses) { comm->proxyState.listenSock = sock; comm->proxyState.peerAddresses = peerAddresses; - ncclSetThreadName(comm->proxyState.thread, "NCCL Service %2d", comm->cudaDev); return ncclSuccess; } ncclResult_t ncclProxyCreate(struct ncclComm* comm) { // comm->proxyState.thread is pthread_join()'d by commFree() in init.cc pthread_create(&comm->proxyState.thread, NULL, ncclProxyService, comm); + ncclSetThreadName(comm->proxyState.thread, "NCCL Service %2d", comm->cudaDev); return ncclSuccess; } diff --git a/src/transport/net_ib.cc b/src/transport/net_ib.cc index 26b47be..d3d4f9a 100644 --- a/src/transport/net_ib.cc +++ b/src/transport/net_ib.cc @@ -223,8 +223,8 @@ ncclResult_t ncclIbInit(ncclDebugLogger_t logFunction) { ncclIbDevs[ncclNIbDevs].mrCache.slots = NULL; pthread_create(&ncclIbAsyncThread, NULL, ncclIbAsyncThreadMain, context); - pthread_detach(ncclIbAsyncThread); // will not be pthread_join()'d ncclSetThreadName(ncclIbAsyncThread, "NCCL IbAsync %2d", ncclNIbDevs); + pthread_detach(ncclIbAsyncThread); // will not be pthread_join()'d ncclNIbDevs++; nPorts++; } diff --git a/src/transport/p2p.cc b/src/transport/p2p.cc index e71e157..9859c87 100644 --- a/src/transport/p2p.cc +++ b/src/transport/p2p.cc @@ -127,6 +127,7 @@ ncclResult_t p2pCanConnect(int* ret, struct ncclTopoSystem* topo, struct ncclTop // Setting this to non zero causes P2P to use Reads rather than Writes NCCL_PARAM(P2pReadEnable, "P2P_READ_ENABLE", -2); +NCCL_PARAM(P2pDirectDisable, "P2P_DIRECT_DISABLE", 0); static ncclResult_t p2pGetInfo(struct ncclTopoSystem* topo, struct ncclPeerInfo* info1, struct ncclPeerInfo* info2, int* read, int* intermediateRank) { int p2p; @@ -185,7 +186,7 @@ ncclResult_t p2pSendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, st if (intermediateRank == -1) { info->rank = myInfo->rank; if (myInfo->pidHash == peerInfo->pidHash) { - send->conn.direct |= info->read ? NCCL_DIRECT_READ : NCCL_DIRECT_WRITE; + if (ncclParamP2pDirectDisable() == 0) send->conn.direct |= info->read ? NCCL_DIRECT_READ : NCCL_DIRECT_WRITE; INFO(NCCL_INIT|NCCL_P2P, "Channel %02d : %d[%lx] -> %d[%lx] via P2P/direct pointer%s", channelId, myInfo->rank, myInfo->busId, peerInfo->rank, peerInfo->busId, useReadStr); } else { @@ -230,7 +231,7 @@ ncclResult_t p2pRecvSetup(struct ncclComm* comm, struct ncclTopoGraph* graph, st if (intermediateRank == -1) { info->rank = myInfo->rank; if (myInfo->pidHash == peerInfo->pidHash) { - recv->conn.direct |= info->read ? NCCL_DIRECT_READ : NCCL_DIRECT_WRITE; + if (ncclParamP2pDirectDisable() == 0) recv->conn.direct |= info->read ? NCCL_DIRECT_READ : NCCL_DIRECT_WRITE; } else { recv->conn.direct |= info->read ? NCCL_IPC_READ : NCCL_IPC_WRITE; }