nccl/src/transport.cu
Sylvain Jeaugey f93fe9bfd9 2.3.5-5
Add support for inter-node communication using sockets and InfiniBand/RoCE.
Improve latency.
Add support for aggregation.
Improve LL/regular tuning.
Remove tests as those are now at github.com/nvidia/nccl-tests .
2018-09-25 14:12:01 -07:00

188 lines
7.0 KiB
Plaintext

/*************************************************************************
* Copyright (c) 2016-2018, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "core.h"
#include "common_coll.h"
extern struct ncclTransport p2pTransport;
extern struct ncclTransport shmTransport;
extern struct ncclTransport netTransport;
struct ncclTransport ncclTransports[NTRANSPORTS] = {
p2pTransport,
shmTransport,
netTransport,
};
static void FifoPullArgs(struct transportProxyInfo* info, struct ncclProxyArgs *args) {
struct ncclProxyArgs *fifoArgs = info->argsFifo + (info->argsFifoHead % TRANSPORT_PROXY_FIFO_SIZE);
pthread_mutex_lock(&info->mutex);
while (fifoArgs->active == 0)
pthread_cond_wait(&info->cond, &info->mutex);
__sync_synchronize();
memcpy(args, fifoArgs, sizeof(struct ncclProxyArgs));
__sync_synchronize();
fifoArgs->active = 0;
pthread_cond_signal(&info->cond);
pthread_mutex_unlock(&info->mutex);
info->argsFifoHead++;
}
static struct ncclProxyArgs* FifoGetNextArgs(struct transportProxyInfo* info) {
if (info == NULL) return NULL;
struct ncclProxyArgs* fifoArgs = info->argsFifo + (info->argsFifoTail % TRANSPORT_PROXY_FIFO_SIZE);
pthread_mutex_lock(&info->mutex);
while (fifoArgs->active == 1)
pthread_cond_wait(&info->cond, &info->mutex);
pthread_mutex_unlock(&info->mutex);
info->argsFifoTail++;
return fifoArgs;
}
static void FifoPushArgs(struct transportProxyInfo* info) {
if (info == NULL) return;
struct ncclProxyArgs* fifoArgs = info->argsFifo + ((info->argsFifoTail-1) % TRANSPORT_PROXY_FIFO_SIZE);
if (fifoArgs->active == 0) return;
pthread_mutex_lock(&info->mutex);
pthread_cond_signal(&info->cond);
pthread_mutex_unlock(&info->mutex);
}
static void WaitProxyReady(struct transportProxyInfo* info) {
pthread_mutex_lock(&info->mutex);
while (info->proxyReady == 0)
pthread_cond_wait(&info->cond, &info->mutex);
pthread_mutex_unlock(&info->mutex);
}
static void SetProxyReady(struct transportProxyInfo* info) {
pthread_mutex_lock(&info->mutex);
info->proxyReady = 1;
pthread_cond_signal(&info->cond);
pthread_mutex_unlock(&info->mutex);
}
static void StopProxy(struct transportProxyInfo* info) {
struct ncclProxyArgs* fifoArgs = FifoGetNextArgs(info);
fifoArgs->active = -1;
FifoPushArgs(info);
}
#define RECV 0
#define SEND 1
static bool NeedProxy(int type, int pattern, struct ncclRing* ring, int nranks) {
enum proxyMode mode = proxyPatternMode(pattern);
if (mode == proxyRing) return true;
/* In chains, one rank does not need a proxy. Let's figure out which one it is */
int root = proxyPatternRoot(pattern);
// Which index in the reorganized rings should we compare root against */
const int myrank = 0, nextrank = 1, prevrank = nranks-1;
int index = mode == proxyFrom ?
/* no recv / no send if root = */
/* bcast */ (type == RECV ? myrank : nextrank ):
/* reduce */ (type == RECV ? prevrank : myrank );
int rank = ring->userRanks[index];
return (root != rank);
}
static void SaveProxy(struct ncclConnector* connector, struct ncclProxyArgs* args, int needProxy) {
struct transportProxyInfo* info = connector->proxyInfo;
if (info == NULL) return;
struct ncclProxyArgs* fifoArgs = FifoGetNextArgs(info);
args->needProxy = needProxy;
__sync_synchronize();
memcpy(fifoArgs, args, sizeof(struct ncclProxyArgs));
__sync_synchronize();
fifoArgs->active = 1;
}
ncclResult_t transportSaveProxies(int substeps, int subchunks, int nstepsPerRound, int nblocksPerRound, size_t nbytes, int pattern, struct ncclComm* comm) {
int llMode, nrings, nthreads;
ncclGetCollResource(comm, nbytes, &nrings, &nthreads, &llMode);
nbytes = llMode ? nbytes * 2 : nbytes;
substeps = llMode ? 1 : substeps;
subchunks = llMode ? NCCL_LL_CHUNKS : subchunks;
int buffSize = llMode ? NCCL_LL_BUFF_SIZE : comm->rings[0].buffSize;
int nrounds = (int)(DIVUP(nbytes, ((size_t)nrings * nblocksPerRound * (buffSize/subchunks)))); // Fixed 32-bit overflow
int nsteps = nstepsPerRound * nrounds * substeps;
TRACE(NET,"opCount %lx substeps %d subchunks %d nrounds %d nsteps %d comm %p", comm->opCount, subchunks, subchunks, nrounds, nsteps, comm);
TRACE(NET,"opCount %lx nbytes %zi nrings %d buffSize %d pattern %d comm %p", comm->opCount, nbytes, nrings, buffSize, pattern, comm);
for (int r=0; r<nrings; r++) {
struct ncclRing* ring = comm->rings+((comm->myParams->gridDim.x+r)%comm->nRings);
struct ncclProxyArgs args = { ring, substeps*subchunks, nsteps, comm->opCount, llMode, 0 };
SaveProxy(&ring->recv, &args, NeedProxy(RECV, pattern, ring, comm->nRanks));
SaveProxy(&ring->send, &args, NeedProxy(SEND, pattern, ring, comm->nRanks));
}
return ncclSuccess;
}
ncclResult_t transportStartProxies(ncclComm* comm) {
for (int r=0; r<comm->nRings; r++) {
FifoPushArgs(comm->rings[r].send.proxyInfo);
FifoPushArgs(comm->rings[r].recv.proxyInfo);
}
pthread_yield(); // Let other threads run
return ncclSuccess;
}
void* persistentThread(void *opaqueInfo) {
struct transportProxyInfo* info = (struct transportProxyInfo*)opaqueInfo;
// We need to initialize the context before launching any NCCL cuda kernel,
// otherwise we would create it during the first cudaMemcpyAsync inside the
// proxy function and that would cause a deadlock
cudaSetDevice(info->comm->cudaDev);
// Signal the main thread the context is created and it can proceed.
SetProxyReady(info);
while (1) {
struct ncclProxyArgs args;
FifoPullArgs(info, &args);
if (args.active == -1) {
// Main thread asked to stop
return NULL;
}
ncclResult_t res = info->func(&args);
if (res != ncclSuccess) {
WARN("%s:%d -> %d [Proxy thread error]", __FILE__, __LINE__, res);
}
}
}
ncclResult_t transportCreateProxy(int type, struct ncclRing* ring, struct ncclComm* comm) {
struct ncclConnector* connector = (type == RECV) ? &ring->recv : &ring->send;
threadFunc_t proxyfunc = (threadFunc_t) ((type == RECV) ? connector->transport->recv.proxy : connector->transport->send.proxy);
if (proxyfunc) {
TRACE(NET,"type %d ring %p proxyfunc %p comm %p", type, ring, proxyfunc, comm);
struct transportProxyInfo* info;
NCCLCHECK(ncclCalloc(&info, 1));
connector->proxyInfo = info;
info->comm = comm;
info->cond = PTHREAD_COND_INITIALIZER;
info->mutex = PTHREAD_MUTEX_INITIALIZER;
info->func = proxyfunc;
info->argsFifoHead = info->argsFifoTail = 0;
info->proxyReady = 0;
pthread_create(&connector->proxyInfo->thread, NULL, persistentThread, info);
// Wait for thread to initialize its CUDA context.
WaitProxyReady(info);
}
return ncclSuccess;
}
ncclResult_t transportDestroyProxy(struct ncclConnector* connector) {
if (connector->proxyInfo) {
StopProxy(connector->proxyInfo);
pthread_join(connector->proxyInfo->thread, NULL);
free(connector->proxyInfo);
connector->proxyInfo = NULL;
}
return ncclSuccess;
}