2.12.10-1

Fix bug with CollNet
Fix bug with zero-bytes send/recv operations
Fix NCCL_PARAM implementation to avoid taking a lock on every call
Fix bug when setting NCCL_IB_QPS_PER_CONNECTION to more than one.
Improve error reporting for network errors.
This commit is contained in:
Sylvain Jeaugey 2022-03-30 02:25:49 -07:00
parent 44eb40da0e
commit 353e8ba446
16 changed files with 203 additions and 114 deletions

View File

@ -1,6 +1,6 @@
##### version
NCCL_MAJOR := 2
NCCL_MINOR := 12
NCCL_PATCH := 7
NCCL_PATCH := 10
NCCL_SUFFIX :=
PKG_REVISION := 1

View File

@ -10,7 +10,7 @@ include ../makefiles/version.mk
##### src files
INCEXPORTS := nccl.h nccl_net.h
LIBSRCFILES := init.cc channel.cc bootstrap.cc transport.cc enqueue.cc group.cc debug.cc proxy.cc enhcompat.cc net.cc \
misc/nvmlwrap.cc misc/ibvwrap.cc misc/gdrwrap.cc misc/utils.cc misc/argcheck.cc misc/socket.cc misc/shmutils.cc misc/profiler.cc \
misc/nvmlwrap.cc misc/ibvwrap.cc misc/gdrwrap.cc misc/utils.cc misc/argcheck.cc misc/socket.cc misc/shmutils.cc misc/profiler.cc misc/param.cc \
transport/p2p.cc transport/shm.cc transport/net.cc transport/net_socket.cc transport/net_ib.cc transport/coll_net.cc \
collectives/sendrecv.cc collectives/all_reduce.cc collectives/all_gather.cc collectives/broadcast.cc collectives/reduce.cc collectives/reduce_scatter.cc \
graph/topo.cc graph/paths.cc graph/search.cc graph/connect.cc graph/rings.cc graph/trees.cc graph/tuning.cc graph/xml.cc

View File

@ -125,6 +125,19 @@ error:
return (res != ncclSuccess) ? 0 : max;
}
// Set shared memory carveout for the nccl kernels
ncclResult_t ncclKernSetSharedMemoryCarveout(int carveOut) {
ncclResult_t res = ncclSuccess;
int numNcclKerns = sizeof(ncclKerns)/sizeof(ncclKerns[0]);
for (int i = 0; i < numNcclKerns; i++) {
CUDACHECKGOTO(cudaFuncSetAttribute(ncclKerns[i], cudaFuncAttributePreferredSharedMemoryCarveout, carveOut), res, error);
}
error:
return res;
}
/*****************************************************************************/
/* Launch system : synchronization and CUDA kernel launch */
/*****************************************************************************/
@ -705,17 +718,6 @@ static ncclResult_t ncclSetupCollKernel(struct ncclInfo* info) {
params->blockDim.x = std::max<unsigned>(params->blockDim.x, info->nThreads);
comm->enqueueInfo->maxChannels = params->gridDim.x; // params may be varied by a second graph hence we need to capture it here
// Register and exchange input and output buffers
if (comm->usingCudaGraph && // only in CUDA graph mode
comm->graphRegister == 1 && // when registration is enabled
info->algorithm == NCCL_ALGO_COLLNET && // limited to CollNet for now
comm->intraHighestTransportType == TRANSPORT_P2P && // only when all ranks can p2p each other
comm->intraRanks == 1) { // only in multi-process mode
NCCLCHECK(ncclRegBuffAndExchange(info, &eqElem->buffRegInfo));
comm->enqueueInfo->nRegBuffs += eqElem->buffRegInfo.nBuffs;
work->header.type = ncclWorkTypeRegColl;
}
// Inline the first kernel
if (params->func == NULL) {
params->func = ncclKerns[work->header.funcIndex];
@ -728,6 +730,20 @@ static ncclResult_t ncclSetupCollKernel(struct ncclInfo* info) {
}
}
// Register and exchange input and output buffers
if (comm->usingCudaGraph && // only in CUDA graph mode
comm->graphRegister == 1 && // when registration is enabled
info->algorithm == NCCL_ALGO_COLLNET && // limited to CollNet for now
comm->intraHighestTransportType == TRANSPORT_P2P && // only when all ranks can p2p each other
comm->intraRanks == 1) { // only in multi-process mode
NCCLCHECK(ncclRegBuffAndExchange(info, &eqElem->buffRegInfo));
comm->enqueueInfo->nRegBuffs += eqElem->buffRegInfo.nBuffs;
work->header.type = ncclWorkTypeRegColl;
// Disable inline argument because we need kernel to copy the entire ncclWork from workFifo
// because the registered addresses are in ncclWorkElemReg
comm->args.header.type = ncclWorkTypeUnused;
}
return ncclSuccess;
}

View File

@ -274,8 +274,8 @@ sched_delta:
if (sendbytes > sendChunkSize) { sendbytes = sendChunkSize; } else { sendRemaining = 0; }
// 0-bytes send/recv are considered as syncs. Make sure we only add syncs when requested
// (total size == 0), otherwise set size to -1.
if (sendbytes <= 0 && totSendBytes != 0) send = NULL;
if (recvbytes <= 0 && totRecvBytes != 0) recv = NULL;
if (sendbytes < 0 || (sendbytes == 0 && totSendBytes != 0)) send = NULL;
if (recvbytes < 0 || (recvbytes == 0 && totRecvBytes != 0)) recv = NULL;
if (recv) {
NCCLCHECKGOTO(scheduleRecv(comm, recvPeer, channelId, recvbytes, ((char*)recvBuff)+recvOffset), ret, group_cleanup);
}

View File

@ -15,6 +15,7 @@
#define NCCL_AGG_CHANNEL_SIZE (1LL << 21) /* 2 MiB, ideal per-channel size to fully utilize bandwidth */
size_t ncclKernMaxLocalSize();
ncclResult_t ncclKernSetSharedMemoryCarveout(int carveOut);
ncclResult_t ncclEnqueueCheck(struct ncclInfo* info);
ncclResult_t ncclCpuBarrierIn(struct ncclComm* comm, int* isLast);
ncclResult_t ncclCpuBarrierLast(struct ncclComm* comm);

View File

@ -7,77 +7,23 @@
#ifndef NCCL_PARAM_H_
#define NCCL_PARAM_H_
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pwd.h>
#include <stdint.h>
static const char* userHomeDir() {
struct passwd *pwUser = getpwuid(getuid());
return pwUser == NULL ? NULL : pwUser->pw_dir;
}
const char* userHomeDir();
void setEnvFile(const char* fileName);
void initEnv();
static void setEnvFile(const char* fileName) {
FILE * file = fopen(fileName, "r");
if (file == NULL) return;
void ncclLoadParam(char const* env, int64_t deftVal, int64_t uninitialized, int64_t* cache);
char *line = NULL;
char envVar[1024];
char envValue[1024];
size_t n = 0;
ssize_t read;
while ((read = getline(&line, &n, file)) != -1) {
if (line[read-1] == '\n') line[read-1] = '\0';
int s=0; // Env Var Size
while (line[s] != '\0' && line[s] != '=') s++;
if (line[s] == '\0') continue;
strncpy(envVar, line, std::min(1023,s));
envVar[s] = '\0';
s++;
strncpy(envValue, line+s, 1023);
envValue[1023]='\0';
setenv(envVar, envValue, 0);
//printf("%s : %s->%s\n", fileName, envVar, envValue);
#define NCCL_PARAM(name, env, deftVal) \
int64_t ncclParam##name() { \
constexpr int64_t uninitialized = INT64_MIN; \
static_assert(deftVal != uninitialized, "default value cannot be the uninitialized value."); \
static int64_t cache = uninitialized; \
if (__builtin_expect(__atomic_load_n(&cache, __ATOMIC_RELAXED) == uninitialized, false)) { \
ncclLoadParam("NCCL_" env, deftVal, uninitialized, &cache); \
} \
return cache; \
}
if (line) free(line);
fclose(file);
}
static void initEnv() {
char confFilePath[1024];
const char * userDir = userHomeDir();
if (userDir) {
sprintf(confFilePath, "%s/.nccl.conf", userDir);
setEnvFile(confFilePath);
}
sprintf(confFilePath, "/etc/nccl.conf");
setEnvFile(confFilePath);
}
#define NCCL_PARAM(name, env, default_value) \
pthread_mutex_t ncclParamMutex##name = PTHREAD_MUTEX_INITIALIZER; \
int64_t ncclParam##name() { \
static_assert(default_value != -1LL, "default value cannot be -1"); \
static int64_t value = -1LL; \
pthread_mutex_lock(&ncclParamMutex##name); \
if (value == -1LL) { \
value = default_value; \
char* str = getenv("NCCL_" env); \
if (str && strlen(str) > 0) { \
errno = 0; \
int64_t v = strtoll(str, NULL, 0); \
if (errno) { \
INFO(NCCL_ALL,"Invalid value %s for %s, using default %lu.", str, "NCCL_" env, value); \
} else { \
value = v; \
INFO(NCCL_ALL,"%s set by environment to %lu.", "NCCL_" env, value); \
} \
} \
} \
pthread_mutex_unlock(&ncclParamMutex##name); \
return value; \
}
#endif

View File

@ -44,7 +44,7 @@ struct ncclSocket {
enum ncclSocketState state;
};
const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf);
const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf, const int numericHostForm = 1);
ncclResult_t ncclGetSocketAddrFromString(union ncclSocketAddress* ua, const char* ip_port_pair);
int ncclFindInterfaceMatchSubnet(char* ifNames, union ncclSocketAddress* localAddrs, union ncclSocketAddress* remoteAddr, int ifNameMaxSize, int maxIfs);
int ncclFindInterfaces(char* ifNames, union ncclSocketAddress *ifAddrs, int ifNameMaxSize, int maxIfs);

View File

@ -59,7 +59,8 @@ ncclResult_t initGdrCopy() {
return ncclSuccess;
}
NCCL_PARAM(CollNetEnable, "COLLNET_ENABLE", 0);
NCCL_PARAM(L1SharedMemoryCarveout, "L1_SHARED_MEMORY_CARVEOUT", 0);
pthread_mutex_t initLock = PTHREAD_MUTEX_INITIALIZER;
static bool initialized = false;
@ -71,6 +72,8 @@ static ncclResult_t ncclInit() {
initEnv();
initGdrCopy();
maxLocalSizeBytes = ncclKernMaxLocalSize();
int carveout = ncclParamL1SharedMemoryCarveout();
if (carveout) ncclKernSetSharedMemoryCarveout(carveout);
NCCLCHECK(ncclNetInit());
INFO(NCCL_INIT, "Using network %s", ncclNetName());
initialized = true;
@ -529,7 +532,16 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
}
// Determine local CollNet support before all-gather
if (ncclParamCollNetEnable() == 1 && collNetSupport() == 1 && collNetGraph.nChannels > 0) comm->collNetSupport = 1;
if (collNetSupport()) {
char *collNetEnable = getenv("NCCL_COLLNET_ENABLE");
if (collNetEnable != NULL) {
INFO(NCCL_ALL, "NCCL_COLLNET_ENABLE set by environment to %s.", collNetEnable);
if (strcmp(collNetEnable, "1") == 0) {
comm->collNetSupport = 1;
}
}
}
if (comm->collNetSupport == 1 && collNetGraph.nChannels <= 0) comm->collNetSupport = 0;
// AllGather3 - begin
struct ncclGraphInfo {
@ -832,7 +844,6 @@ collnet_cleanup:
// Connect to local net proxy
struct ncclProxyConnector proxyConn;
NCCLCHECK(ncclTopoGetLocalRank(comm->topo, comm->rank, &proxyConn.localRank));
NCCLCHECK(ncclProxyConnect(comm, TRANSPORT_NET, 1, comm->rank, &proxyConn));
NCCLCHECK(ncclProxyCall(&proxyConn, ncclProxyMsgSharedInit, &comm->p2pnChannels, sizeof(int), NULL, 0));

View File

@ -11,7 +11,7 @@ static ncclResult_t CudaPtrCheck(const void* pointer, struct ncclComm* comm, con
cudaPointerAttributes attr;
cudaError_t err = cudaPointerGetAttributes(&attr, pointer);
if (err != cudaSuccess || attr.devicePointer == NULL) {
WARN("%s : %s is not a valid pointer", opname, ptrname);
WARN("%s : %s %p is not a valid pointer", opname, ptrname, pointer);
return ncclInvalidArgument;
}
#if CUDART_VERSION >= 10000
@ -63,7 +63,8 @@ ncclResult_t ArgsCheck(struct ncclInfo* info) {
}
if (info->comm->checkPointers) {
if ((info->coll == ncclFuncSend || info->coll == ncclFuncRecv) && info->count > 0) {
if ((info->coll == ncclFuncSend || info->coll == ncclFuncRecv)) {
if (info->count >0)
NCCLCHECK(CudaPtrCheck(info->recvbuff, info->comm, "buff", info->opName));
} else {
// Check CUDA device pointers

81
src/misc/param.cc Normal file
View File

@ -0,0 +1,81 @@
/*************************************************************************
* Copyright (c) 2019-2022, NVIDIA CORPORATION. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "param.h"
#include "debug.h"
#include <algorithm>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include <pwd.h>
const char* userHomeDir() {
struct passwd *pwUser = getpwuid(getuid());
return pwUser == NULL ? NULL : pwUser->pw_dir;
}
void setEnvFile(const char* fileName) {
FILE * file = fopen(fileName, "r");
if (file == NULL) return;
char *line = NULL;
char envVar[1024];
char envValue[1024];
size_t n = 0;
ssize_t read;
while ((read = getline(&line, &n, file)) != -1) {
if (line[read-1] == '\n') line[read-1] = '\0';
int s=0; // Env Var Size
while (line[s] != '\0' && line[s] != '=') s++;
if (line[s] == '\0') continue;
strncpy(envVar, line, std::min(1023,s));
envVar[s] = '\0';
s++;
strncpy(envValue, line+s, 1023);
envValue[1023]='\0';
setenv(envVar, envValue, 0);
//printf("%s : %s->%s\n", fileName, envVar, envValue);
}
if (line) free(line);
fclose(file);
}
void initEnv() {
char confFilePath[1024];
const char * userDir = userHomeDir();
if (userDir) {
sprintf(confFilePath, "%s/.nccl.conf", userDir);
setEnvFile(confFilePath);
}
sprintf(confFilePath, "/etc/nccl.conf");
setEnvFile(confFilePath);
}
void ncclLoadParam(char const* env, int64_t deftVal, int64_t uninitialized, int64_t* cache) {
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&mutex);
if (__atomic_load_n(cache, __ATOMIC_RELAXED) == uninitialized) {
char* str = getenv(env);
int64_t value = deftVal;
if (str && strlen(str) > 0) {
errno = 0;
value = strtoll(str, nullptr, 0);
if (errno) {
value = deftVal;
INFO(NCCL_ALL,"Invalid value %s for %s, using default %lld.", str, env, (long long)deftVal);
} else {
INFO(NCCL_ALL,"%s set by environment to %lld.", env, (long long)value);
}
}
__atomic_store_n(cache, value, __ATOMIC_RELAXED);
}
pthread_mutex_unlock(&mutex);
}

View File

@ -16,12 +16,16 @@
*
* Output: "IPv4/IPv6 address<port>"
*/
const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf) {
const char *ncclSocketToString(union ncclSocketAddress *addr, char *buf, const int numericHostForm /*= 1*/) {
if (buf == NULL || addr == NULL) return NULL;
struct sockaddr *saddr = &addr->sa;
if (saddr->sa_family != AF_INET && saddr->sa_family != AF_INET6) { buf[0]='\0'; return buf; }
char host[NI_MAXHOST], service[NI_MAXSERV];
(void) getnameinfo(saddr, sizeof(union ncclSocketAddress), host, NI_MAXHOST, service, NI_MAXSERV, NI_NUMERICHOST|NI_NUMERICSERV);
/* NI_NUMERICHOST: If set, then the numeric form of the hostname is returned.
* (When not set, this will still happen in case the node's name cannot be determined.)
*/
int flag = NI_NUMERICSERV | (numericHostForm ? NI_NUMERICHOST : 0);
(void) getnameinfo(saddr, sizeof(union ncclSocketAddress), host, NI_MAXHOST, service, NI_MAXSERV, flag);
sprintf(buf, "%s<%s>", host, service);
return buf;
}
@ -516,7 +520,7 @@ ncclResult_t ncclSocketProgress(int op, struct ncclSocket* sock, void* ptr, int
NCCLCHECK(ncclSocketProgressOpt(op, sock, ptr, size, offset, 0, &closed));
if (closed) {
char line[SOCKET_NAME_MAXLEN+1];
WARN("Net : Connection closed by remote peer %s", ncclSocketToString(&sock->addr, line));
WARN("Net : Connection closed by remote peer %s", ncclSocketToString(&sock->addr, line, 0));
return ncclSystemError;
}
return ncclSuccess;

View File

@ -806,6 +806,7 @@ ncclResult_t ncclProxyCall(struct ncclProxyConnector* proxyConn, int type, void*
return ncclSuccess;
error:
WARN("Proxy Call to rank %d failed (%s)", proxyConn->comm->localRankToRank[proxyConn->localRank], ncclProxyMsgTypeStr[type]);
sock->fd = -1;
return ret;
}
@ -870,8 +871,6 @@ ncclResult_t ncclProxyShmUnlink(struct ncclComm* comm) {
static ncclResult_t proxyConnInit(struct ncclProxyLocalPeer* peer, struct ncclProxyConnectionPool* connectionPool, struct ncclComm* comm) {
struct ncclSocket* sock = &peer->sock;
char buf[SOCKET_NAME_MAXLEN+1];
buf[SOCKET_NAME_MAXLEN] = '\0';
int id;
struct ncclProxyConnection* connection;
NCCLCHECK(ncclProxyNewConnection(connectionPool, &id));
@ -889,8 +888,7 @@ static ncclResult_t proxyConnInit(struct ncclProxyLocalPeer* peer, struct ncclPr
struct ncclProxyProgressState* state = &comm->proxyState.progressState;
NCCLCHECK(ncclSocketSend(sock, state->opsPoolShmSuffix, sizeof("XXXXXX")-1));
}
buf[SOCKET_NAME_MAXLEN] = '\0';
INFO(NCCL_NET, "New proxy %s connection %d from %s, transport %d", connection->send ? "send":"recv", id, ncclSocketToString(&sock->addr, buf), connection->transport);
INFO(NCCL_NET, "New proxy %s connection %d from local rank %d, transport %d", connection->send ? "send":"recv", id, connection->localRank, connection->transport);
return ncclSuccess;
}

View File

@ -217,6 +217,9 @@ static ncclResult_t sendConnect(struct ncclComm* comm, struct ncclConnect* conne
struct connectMap* map;
NCCLCHECK(ncclProxyCall(&send->proxyConn, ncclProxyMsgConnect, &args, sizeof(struct collNetConnectArgs), &map, sizeof(struct connectMap*)));
// If collnet connect failed, propagate error to fallback on regular p2p
if (map == NULL) return ncclSystemError;
//NCCLCHECK(collNetDumpMap(map));
struct ncclSendMem *sendMem = (struct ncclSendMem*) NCCL_NET_MAP_GET_POINTER(map, gpu, sendMem);
@ -240,6 +243,9 @@ static ncclResult_t recvConnect(struct ncclComm* comm, struct ncclConnect* conne
struct connectMap* map;
NCCLCHECK(ncclProxyCall(&recv->proxyConn, ncclProxyMsgConnect, &args, sizeof(struct collNetConnectArgs), &map, sizeof(struct connectMap*)));
// If collnet connect failed, propagate error to fallback on regular p2p
if (map == NULL) return ncclSystemError;
//NCCLCHECK(collNetDumpMap(map));
struct ncclSendMem *sendMem = (struct ncclSendMem*) NCCL_NET_MAP_GET_POINTER(map, gpu, sendMem);
@ -309,12 +315,15 @@ static ncclResult_t sharedConnect(struct ncclComm* comm, int netDev, struct nccl
resources->collNetListenComms[netDev],
resources->collNetComms+netDev);
free(handlePtrs);
NCCLCHECK(ret);
if (ret == ncclSuccess) {
// Close listen comm
NCCLCHECK(collNetCloseListen(resources->collNetListenComms[netDev]));
} else {
resources->collNetListenComms[netDev] = NULL;
}
}
*collNetComm = resources->collNetComms[netDev];
resources->commRefCount[netDev]++;
if (*collNetComm) resources->commRefCount[netDev]++;
return ncclSuccess;
}
@ -400,6 +409,13 @@ static ncclResult_t sendProxyConnect(struct ncclProxyConnection* connection, str
resources->recvMhandles[p] = info->mhandles[p];
NCCLCHECK(sharedConnect(comm, resources->netDev, args->connectInfos, args->nranks, args->rank, &resources->collNetComm));
// Collnet connect is allowed to fail. Gracefully handle that case by returning NULL to the caller.
if (respSize != sizeof(struct connectMap*)) { WARN("sendProxyConnect: respSize is %d != %ld\n", respSize, sizeof(void*)); return ncclInternalError; }
if (resources->collNetComm == NULL) {
*((struct connectMap**)respBuff) = NULL;
return ncclSuccess;
}
connection->proxyAppendPtr = comm->proxyState.progressState.collNet.proxyAppend+2*resources->netDev;
struct connectMap* map = &resources->map;
@ -435,7 +451,6 @@ static ncclResult_t sendProxyConnect(struct ncclProxyConnection* connection, str
resources->useGdr ? NCCL_PTR_CUDA : NCCL_PTR_HOST,
&resources->sendMhandles[NCCL_PROTO_SIMPLE]));
if (respSize != sizeof(struct connectMap*)) { WARN("sendProxyConnect: respSize is %d != %ld\n", respSize, sizeof(void*)); return ncclInternalError; }
*((struct connectMap**)respBuff) = &resources->map;
return ncclSuccess;
}
@ -449,6 +464,13 @@ static ncclResult_t recvProxyConnect(struct ncclProxyConnection* connection, str
resources->collNetRank = args->rank;
NCCLCHECK(sharedConnect(comm, resources->netDev, args->connectInfos, args->nranks, args->rank, &resources->collNetComm));
// Collnet connect is allowed to fail. Gracefully handle that case by returning NULL to the caller.
if (respSize != sizeof(struct connectMap*)) { WARN("sendProxyConnect: respSize is %d != %ld\n", respSize, sizeof(void*)); return ncclInternalError; }
if (resources->collNetComm == NULL) {
*((struct connectMap**)respBuff) = NULL;
return ncclSuccess;
}
connection->proxyAppendPtr = comm->proxyState.progressState.collNet.proxyAppend+2*resources->netDev+1;
struct connectMap* map = &resources->map;
@ -743,7 +765,7 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg
int offset;
NCCLCHECK(sharedBuffersGet(comm, 1, sharedBuffSlot, startChannel, &offset));
volatile int* offsFifo = (volatile int*)resources->recvMem->offsFifo;
offsFifo[buffSlot] = offset;
offsFifo[buffSlot] = offset + (s%COLLNET_GROUP_NSUBS)*args->chunkSize;
__sync_synchronize();
volatile uint64_t* recvTail = resources->gdcSync ? resources->gdcSync : &resources->recvMem->tail;
*recvTail = sub->base + sub->flushed;

View File

@ -130,7 +130,7 @@ struct recvResources {
uint64_t llLastCleaning;
};
NCCL_PARAM(NetDisableIntra, "NET_DISABLE_INTRA", -2);
NCCL_PARAM(NetDisableIntra, "NET_DISABLE_INTRA", 0);
/* Determine if two peers can communicate with NET */
static ncclResult_t canConnect(int* ret, struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* info1, struct ncclPeerInfo* info2) {

View File

@ -43,7 +43,7 @@ struct ncclIbMrCache {
};
static int ncclNIbDevs = -1;
struct ncclIbDev {
struct alignas(64) ncclIbDev {
pthread_mutex_t lock;
int device;
uint64_t guid;
@ -841,7 +841,7 @@ ncclResult_t ncclIbRegMr(void* comm, void* data, int size, int type, void** mhan
else {
NCCLCHECKGOTO(wrap_ibv_reg_mr(&mr, verbs->pd, (void*)addr, pages*pageSize, flags), res, returning);
}
TRACE(NCCL_INIT,"regAddr %llx size %lld rkey %x", (unsigned long long)addr, (long long)pages*PageSize, mr->rkey);
TRACE(NCCL_INIT,"regAddr %llx size %lld rkey %x", (unsigned long long)addr, (long long)pages*pageSize, mr->rkey);
cache->population += 1;
cache->slots[slot].addr = addr;
cache->slots[slot].pages = pages;
@ -940,9 +940,11 @@ ncclResult_t ncclIbMultiSend(struct ncclIbSendComm* comm, int slot) {
lastWr->next = NULL;
lastWr->send_flags = IBV_SEND_SIGNALED;
// Multi-QP: make sure IB writes are multiples of 128B so that LL and LL128 protocols still work
const int align = 128;
for (int q=0; q<comm->nqps; q++) {
for (int r=0; r<nreqs; r++) {
int chunkSize = std::max(8, DIVUP(reqs[r]->send.size, comm->nqps));
int chunkSize = DIVUP(DIVUP(reqs[r]->send.size, comm->nqps), align) * align;
int length = std::min(reqs[r]->send.size-reqs[r]->send.offset, chunkSize);
if (length <= 0) {
comm->wrs[r].sg_list = NULL;
@ -957,7 +959,7 @@ ncclResult_t ncclIbMultiSend(struct ncclIbSendComm* comm, int slot) {
NCCLCHECK(wrap_ibv_post_send(comm->qps[q], comm->wrs, &bad_wr));
for (int r=0; r<nreqs; r++) {
int chunkSize = std::max(8, DIVUP(reqs[r]->send.size, comm->nqps));
int chunkSize = DIVUP(DIVUP(reqs[r]->send.size, comm->nqps), align) * align;
reqs[r]->send.offset += chunkSize;
comm->sges[r].addr += chunkSize;
comm->wrs[r].wr.rdma.remote_addr += chunkSize;
@ -991,11 +993,16 @@ ncclResult_t ncclIbIsend(void* sendComm, void* data, int size, int tag, void* mh
if (reqs[r] != NULL || slots[r].tag != tag) continue;
// Sanity checks to catch user collective call count/size mismatches
// plus any potential programming errors
if (size > slots[r].size || slots[r].size < 0 || slots[r].addr == 0 || slots[r].rkey == 0) {
if (size > slots[r].size) {
char line[SOCKET_NAME_MAXLEN+1];
WARN("NET/IB : req %d/%d tag %x peer %s collective mismatch error local size %d remote %d addr %lx rkey %x",
r, nreqs, tag, ncclSocketToString(&comm->sock.addr, line), size, slots[r].size, slots[r].addr, slots[r].rkey);
WARN("NET/IB : req %d/%d tag %x peer %s collective mismatch error, local size %d remote size %d",
r, nreqs, tag, ncclSocketToString(&comm->sock.addr, line), size, slots[r].size);
return ncclInvalidUsage;
} // plus any potential programming errors
else if (slots[r].size < 0 || slots[r].addr == 0 || slots[r].rkey == 0) {
char line[SOCKET_NAME_MAXLEN+1];
WARN("NET/IB : req %d/%d tag %x peer %s posted incorrect receive info: size %d addr %lx rkey %x",
r, nreqs, tag, ncclSocketToString(&comm->sock.addr, line), slots[r].size, slots[r].addr, slots[r].rkey);
return ncclInternalError;
}
struct ncclIbRequest* req;

View File

@ -500,8 +500,10 @@ ncclResult_t ncclSocketTest(void* request, int* done, int* size) {
// Check size is less or equal to the size provided by the user
if (r->op == NCCL_SOCKET_RECV && data > r->size) {
char line[SOCKET_NAME_MAXLEN+1];
WARN("NET/Socket : peer %s message truncated : receiving %d bytes instead of %d", ncclSocketToString(&r->ctrlSock->addr, line), data, r->size);
return ncclInternalError;
WARN("NET/Socket : peer %s message truncated : receiving %d bytes instead of %d. If you believe your socket network is in healthy state, \
there may be a mismatch in collective sizes or environment settings (e.g. NCCL_PROTO, NCCL_ALGO) between ranks",
ncclSocketToString(&r->ctrlSock->addr, line), data, r->size);
return ncclInvalidUsage;
}
r->size = data;
r->offset = 0;