Add support for inter-node communication using sockets and InfiniBand/RoCE. Improve latency. Add support for aggregation. Improve LL/regular tuning. Remove tests as those are now at github.com/nvidia/nccl-tests .
188 lines
7.0 KiB
Plaintext
188 lines
7.0 KiB
Plaintext
/*************************************************************************
|
|
* Copyright (c) 2016-2018, NVIDIA CORPORATION. All rights reserved.
|
|
*
|
|
* See LICENSE.txt for license information
|
|
************************************************************************/
|
|
|
|
#include "core.h"
|
|
#include "common_coll.h"
|
|
|
|
extern struct ncclTransport p2pTransport;
|
|
extern struct ncclTransport shmTransport;
|
|
extern struct ncclTransport netTransport;
|
|
|
|
struct ncclTransport ncclTransports[NTRANSPORTS] = {
|
|
p2pTransport,
|
|
shmTransport,
|
|
netTransport,
|
|
};
|
|
|
|
static void FifoPullArgs(struct transportProxyInfo* info, struct ncclProxyArgs *args) {
|
|
struct ncclProxyArgs *fifoArgs = info->argsFifo + (info->argsFifoHead % TRANSPORT_PROXY_FIFO_SIZE);
|
|
pthread_mutex_lock(&info->mutex);
|
|
while (fifoArgs->active == 0)
|
|
pthread_cond_wait(&info->cond, &info->mutex);
|
|
__sync_synchronize();
|
|
memcpy(args, fifoArgs, sizeof(struct ncclProxyArgs));
|
|
__sync_synchronize();
|
|
fifoArgs->active = 0;
|
|
pthread_cond_signal(&info->cond);
|
|
pthread_mutex_unlock(&info->mutex);
|
|
info->argsFifoHead++;
|
|
}
|
|
|
|
static struct ncclProxyArgs* FifoGetNextArgs(struct transportProxyInfo* info) {
|
|
if (info == NULL) return NULL;
|
|
struct ncclProxyArgs* fifoArgs = info->argsFifo + (info->argsFifoTail % TRANSPORT_PROXY_FIFO_SIZE);
|
|
pthread_mutex_lock(&info->mutex);
|
|
while (fifoArgs->active == 1)
|
|
pthread_cond_wait(&info->cond, &info->mutex);
|
|
pthread_mutex_unlock(&info->mutex);
|
|
info->argsFifoTail++;
|
|
return fifoArgs;
|
|
}
|
|
|
|
static void FifoPushArgs(struct transportProxyInfo* info) {
|
|
if (info == NULL) return;
|
|
|
|
struct ncclProxyArgs* fifoArgs = info->argsFifo + ((info->argsFifoTail-1) % TRANSPORT_PROXY_FIFO_SIZE);
|
|
if (fifoArgs->active == 0) return;
|
|
|
|
pthread_mutex_lock(&info->mutex);
|
|
pthread_cond_signal(&info->cond);
|
|
pthread_mutex_unlock(&info->mutex);
|
|
}
|
|
|
|
static void WaitProxyReady(struct transportProxyInfo* info) {
|
|
pthread_mutex_lock(&info->mutex);
|
|
while (info->proxyReady == 0)
|
|
pthread_cond_wait(&info->cond, &info->mutex);
|
|
pthread_mutex_unlock(&info->mutex);
|
|
}
|
|
|
|
static void SetProxyReady(struct transportProxyInfo* info) {
|
|
pthread_mutex_lock(&info->mutex);
|
|
info->proxyReady = 1;
|
|
pthread_cond_signal(&info->cond);
|
|
pthread_mutex_unlock(&info->mutex);
|
|
}
|
|
|
|
static void StopProxy(struct transportProxyInfo* info) {
|
|
struct ncclProxyArgs* fifoArgs = FifoGetNextArgs(info);
|
|
fifoArgs->active = -1;
|
|
FifoPushArgs(info);
|
|
}
|
|
|
|
#define RECV 0
|
|
#define SEND 1
|
|
|
|
static bool NeedProxy(int type, int pattern, struct ncclRing* ring, int nranks) {
|
|
enum proxyMode mode = proxyPatternMode(pattern);
|
|
if (mode == proxyRing) return true;
|
|
|
|
/* In chains, one rank does not need a proxy. Let's figure out which one it is */
|
|
int root = proxyPatternRoot(pattern);
|
|
// Which index in the reorganized rings should we compare root against */
|
|
const int myrank = 0, nextrank = 1, prevrank = nranks-1;
|
|
int index = mode == proxyFrom ?
|
|
/* no recv / no send if root = */
|
|
/* bcast */ (type == RECV ? myrank : nextrank ):
|
|
/* reduce */ (type == RECV ? prevrank : myrank );
|
|
int rank = ring->userRanks[index];
|
|
return (root != rank);
|
|
}
|
|
|
|
static void SaveProxy(struct ncclConnector* connector, struct ncclProxyArgs* args, int needProxy) {
|
|
struct transportProxyInfo* info = connector->proxyInfo;
|
|
if (info == NULL) return;
|
|
struct ncclProxyArgs* fifoArgs = FifoGetNextArgs(info);
|
|
args->needProxy = needProxy;
|
|
__sync_synchronize();
|
|
memcpy(fifoArgs, args, sizeof(struct ncclProxyArgs));
|
|
__sync_synchronize();
|
|
fifoArgs->active = 1;
|
|
}
|
|
|
|
ncclResult_t transportSaveProxies(int substeps, int subchunks, int nstepsPerRound, int nblocksPerRound, size_t nbytes, int pattern, struct ncclComm* comm) {
|
|
int llMode, nrings, nthreads;
|
|
ncclGetCollResource(comm, nbytes, &nrings, &nthreads, &llMode);
|
|
nbytes = llMode ? nbytes * 2 : nbytes;
|
|
substeps = llMode ? 1 : substeps;
|
|
subchunks = llMode ? NCCL_LL_CHUNKS : subchunks;
|
|
int buffSize = llMode ? NCCL_LL_BUFF_SIZE : comm->rings[0].buffSize;
|
|
|
|
int nrounds = (int)(DIVUP(nbytes, ((size_t)nrings * nblocksPerRound * (buffSize/subchunks)))); // Fixed 32-bit overflow
|
|
int nsteps = nstepsPerRound * nrounds * substeps;
|
|
TRACE(NET,"opCount %lx substeps %d subchunks %d nrounds %d nsteps %d comm %p", comm->opCount, subchunks, subchunks, nrounds, nsteps, comm);
|
|
TRACE(NET,"opCount %lx nbytes %zi nrings %d buffSize %d pattern %d comm %p", comm->opCount, nbytes, nrings, buffSize, pattern, comm);
|
|
for (int r=0; r<nrings; r++) {
|
|
struct ncclRing* ring = comm->rings+((comm->myParams->gridDim.x+r)%comm->nRings);
|
|
struct ncclProxyArgs args = { ring, substeps*subchunks, nsteps, comm->opCount, llMode, 0 };
|
|
SaveProxy(&ring->recv, &args, NeedProxy(RECV, pattern, ring, comm->nRanks));
|
|
SaveProxy(&ring->send, &args, NeedProxy(SEND, pattern, ring, comm->nRanks));
|
|
}
|
|
return ncclSuccess;
|
|
}
|
|
|
|
ncclResult_t transportStartProxies(ncclComm* comm) {
|
|
for (int r=0; r<comm->nRings; r++) {
|
|
FifoPushArgs(comm->rings[r].send.proxyInfo);
|
|
FifoPushArgs(comm->rings[r].recv.proxyInfo);
|
|
}
|
|
pthread_yield(); // Let other threads run
|
|
return ncclSuccess;
|
|
}
|
|
|
|
void* persistentThread(void *opaqueInfo) {
|
|
struct transportProxyInfo* info = (struct transportProxyInfo*)opaqueInfo;
|
|
// We need to initialize the context before launching any NCCL cuda kernel,
|
|
// otherwise we would create it during the first cudaMemcpyAsync inside the
|
|
// proxy function and that would cause a deadlock
|
|
cudaSetDevice(info->comm->cudaDev);
|
|
// Signal the main thread the context is created and it can proceed.
|
|
SetProxyReady(info);
|
|
while (1) {
|
|
struct ncclProxyArgs args;
|
|
FifoPullArgs(info, &args);
|
|
if (args.active == -1) {
|
|
// Main thread asked to stop
|
|
return NULL;
|
|
}
|
|
ncclResult_t res = info->func(&args);
|
|
if (res != ncclSuccess) {
|
|
WARN("%s:%d -> %d [Proxy thread error]", __FILE__, __LINE__, res);
|
|
}
|
|
}
|
|
}
|
|
|
|
ncclResult_t transportCreateProxy(int type, struct ncclRing* ring, struct ncclComm* comm) {
|
|
struct ncclConnector* connector = (type == RECV) ? &ring->recv : &ring->send;
|
|
threadFunc_t proxyfunc = (threadFunc_t) ((type == RECV) ? connector->transport->recv.proxy : connector->transport->send.proxy);
|
|
if (proxyfunc) {
|
|
TRACE(NET,"type %d ring %p proxyfunc %p comm %p", type, ring, proxyfunc, comm);
|
|
struct transportProxyInfo* info;
|
|
NCCLCHECK(ncclCalloc(&info, 1));
|
|
connector->proxyInfo = info;
|
|
info->comm = comm;
|
|
info->cond = PTHREAD_COND_INITIALIZER;
|
|
info->mutex = PTHREAD_MUTEX_INITIALIZER;
|
|
info->func = proxyfunc;
|
|
info->argsFifoHead = info->argsFifoTail = 0;
|
|
info->proxyReady = 0;
|
|
pthread_create(&connector->proxyInfo->thread, NULL, persistentThread, info);
|
|
// Wait for thread to initialize its CUDA context.
|
|
WaitProxyReady(info);
|
|
}
|
|
return ncclSuccess;
|
|
}
|
|
|
|
ncclResult_t transportDestroyProxy(struct ncclConnector* connector) {
|
|
if (connector->proxyInfo) {
|
|
StopProxy(connector->proxyInfo);
|
|
pthread_join(connector->proxyInfo->thread, NULL);
|
|
free(connector->proxyInfo);
|
|
connector->proxyInfo = NULL;
|
|
}
|
|
return ncclSuccess;
|
|
}
|