Added support for more than 8 GPUs.

Change-Id: Iaa1841036a7bfdad6ebec99fed0adcd2bbe6ffad
Reviewed-on: http://git-master/r/935459
Reviewed-by: Cliff Woolley <jwoolley@nvidia.com>
Tested-by: Przemek Tredak <ptredak@nvidia.com>
Author: Nathan Luehr, 2016-01-20 18:19:45 -08:00 (committed by Przemek Tredak)
Parent: 130ee246e2
Commit: 5966316771
8 changed files with 117 additions and 88 deletions

View File

@@ -4,7 +4,7 @@ Optimized primitives for collective multi-GPU communication.
## Introduction
-NCCL (pronounced "Nickel") is a stand-alone library of standard collective communication routines, such as all-gather, reduce, broadcast, etc., that have been optimized to achieve high bandwidth over PCIe. NCCL supports up to eight GPUs and can be used in either single- or multi-process (e.g., MPI) applications.
+NCCL (pronounced "Nickel") is a stand-alone library of standard collective communication routines, such as all-gather, reduce, broadcast, etc., that have been optimized to achieve high bandwidth over PCIe. NCCL supports an arbitrary number of GPUs installed in a single node and can be used in either single- or multi-process (e.g., MPI) applications.
## What's inside
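For readers skimming the diff, here is a minimal single-process sketch of the usage pattern the README describes: one communicator per GPU created with ncclCommInitAll, then one collective call per device on its own stream. This example is not part of the commit; the buffer size, device list, and in-place all-reduce are illustrative assumptions.

```c
// Minimal single-process NCCL sketch (illustrative, not from this commit).
#include <cuda_runtime.h>
#include <nccl.h>
#include <stdio.h>
#include <stdlib.h>

int main(void) {
  int ndev = 0;
  cudaGetDeviceCount(&ndev);

  int* devs = (int*)malloc(ndev * sizeof(int));
  for (int i = 0; i < ndev; ++i) devs[i] = i;            // use devices 0..ndev-1

  ncclComm_t* comms = (ncclComm_t*)malloc(ndev * sizeof(ncclComm_t));
  if (ncclCommInitAll(comms, ndev, devs) != ncclSuccess) {
    fprintf(stderr, "ncclCommInitAll failed\n");
    return 1;
  }

  const int count = 1 << 20;                             // illustrative element count
  float** bufs = (float**)malloc(ndev * sizeof(float*));
  cudaStream_t* streams = (cudaStream_t*)malloc(ndev * sizeof(cudaStream_t));
  for (int i = 0; i < ndev; ++i) {
    cudaSetDevice(devs[i]);
    cudaMalloc((void**)&bufs[i], count * sizeof(float));
    cudaMemset(bufs[i], 0, count * sizeof(float));
    cudaStreamCreate(&streams[i]);
  }

  // Issue the all-reduce for every device before synchronizing any of them.
  for (int i = 0; i < ndev; ++i) {
    cudaSetDevice(devs[i]);
    ncclAllReduce(bufs[i], bufs[i], count, ncclFloat, ncclSum, comms[i], streams[i]);
  }

  for (int i = 0; i < ndev; ++i) {
    cudaSetDevice(devs[i]);
    cudaStreamSynchronize(streams[i]);
    ncclCommDestroy(comms[i]);
  }
  return 0;
}
```

With this commit, ndev is no longer capped at 8; the per-peer state is sized at runtime inside the communicator.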

View File

@@ -429,18 +429,18 @@ ncclResult_t ncclAllGatherWithType(const void* sendbuff, void* recvbuff,
args.NumChunks = (args.N + args.ChunkSize - 1) / args.ChunkSize;
}
-args.ThisPtrToNextOutput = (T**)&(comm->local[nextId]->recvPtrs[0]);
-args.PrevPtrToThisOutput = (T**)&(comm->remote[prevId]->recvPtrs[0]);
+args.ThisPtrToNextOutput = (T**)&(comm->ptrs[nextId].local->recvPtrs[0]);
+args.PrevPtrToThisOutput = (T**)&(comm->ptrs[prevId].remote->recvPtrs[0]);
args.ThisInput = (const T*)sendbuff;
args.ThisOutput = (volatile T*)recvbuff;
-args.ThisBuffer = (volatile T*)comm->local[prevId]->buff;
-args.NextBuffer = (volatile T*)comm->remote[nextId]->buff;
+args.ThisBuffer = (volatile T*)comm->ptrs[prevId].local->buff;
+args.NextBuffer = (volatile T*)comm->ptrs[nextId].remote->buff;
-args.ThisNewDataAvailableFlag = comm->local[prevId]->flags;
-args.NextNewDataAvailableFlag = comm->remote[nextId]->flags;
-args.ThisChunkDoneFlag = comm->local[nextId]->flags + 1;
-args.PrevChunkDoneFlag = comm->remote[prevId]->flags + 1;
+args.ThisNewDataAvailableFlag = comm->ptrs[prevId].local->flags;
+args.NextNewDataAvailableFlag = comm->ptrs[nextId].remote->flags;
+args.ThisChunkDoneFlag = comm->ptrs[nextId].local->flags + 1;
+args.PrevChunkDoneFlag = comm->ptrs[prevId].remote->flags + 1;
if( comm->useRemoteRecv ) {
AllGatherKernel<NUM_THREADS, UNROLL_COUNT, true, T>
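The same substitution repeats in all_reduce.cu, broadcast.cu, reduce.cu, and reduce_scatter.cu below: every `comm->local[x]` / `comm->remote[x]` pair becomes `comm->ptrs[x].local` / `comm->ptrs[x].remote`. The sketch below is editorial, not part of the diff; it spells out the ring indexing the argument setup appears to rely on, and the prevId/nextId derivation is an assumption based on the usual NCCL ring layout.

```c
// Illustrative helper: how the ring neighbors referenced as prevId/nextId
// are conventionally derived from a rank's position in a ring of nDev GPUs.
static inline void ringNeighbors(int index, int nDev, int* prevId, int* nextId) {
  *prevId = (index + nDev - 1) % nDev;  // rank whose data we consume
  *nextId = (index + 1) % nDev;         // rank whose buffer we write into
}

// Reading of the new accessors in the hunk above:
//   comm->ptrs[prevId].local   -> our own staging buffer/flags that the previous
//                                 rank fills (we poll these locally)
//   comm->ptrs[nextId].remote  -> the next rank's staging buffer/flags that we
//                                 push data and "data available" flags into
```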

View File

@@ -1,5 +1,5 @@
/*************************************************************************
-* Copyright (c) 2015, NVIDIA CORPORATION. All rights reserved.
+* Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@@ -420,19 +420,19 @@ ncclResult_t ncclAllReduceWithTypeAndFunc(const void* sendbuff, void* recvbuff,
args.NumChunks = (args.N + args.ChunkSize - 1) / args.ChunkSize;
}
-args.ThisPtrToNextOutput = (T**)&(comm->local[nextId]->recvPtrs[0]);
-args.PrevPtrToThisOutput = (T**)&(comm->remote[prevId]->recvPtrs[0]);
+args.ThisPtrToNextOutput = (T**)&(comm->ptrs[nextId].local->recvPtrs[0]);
+args.PrevPtrToThisOutput = (T**)&(comm->ptrs[prevId].remote->recvPtrs[0]);
args.ThisInput = (const T*)sendbuff;
args.ThisOutput = (volatile T*)recvbuff;
-args.ThisBuffer = (volatile T*)comm->local[prevId]->buff;
-args.NextBuffer = (volatile T*)comm->remote[nextId]->buff;
+args.ThisBuffer = (volatile T*)comm->ptrs[prevId].local->buff;
+args.NextBuffer = (volatile T*)comm->ptrs[nextId].remote->buff;
-args.ThisNewDataAvailableFlag = comm->local[prevId]->flags;
-args.NextNewDataAvailableFlag = comm->remote[nextId]->flags;
+args.ThisNewDataAvailableFlag = comm->ptrs[prevId].local->flags;
+args.NextNewDataAvailableFlag = comm->ptrs[nextId].remote->flags;
-args.ThisChunkDoneFlag = comm->local[nextId]->flags + 1;
-args.PrevChunkDoneFlag = comm->remote[prevId]->flags + 1;
+args.ThisChunkDoneFlag = comm->ptrs[nextId].local->flags + 1;
+args.PrevChunkDoneFlag = comm->ptrs[prevId].remote->flags + 1;
if( comm->useRemoteRecv ) {
AllReduceKernel<NUM_THREADS, UNROLL_COUNT, FUNC, true, T>

View File

@@ -1,5 +1,5 @@
/*************************************************************************
-* Copyright (c) 2015, NVIDIA CORPORATION. All rights reserved.
+* Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@@ -335,20 +335,20 @@ ncclResult_t ncclBcastWithType(void* buff, const int count, const int root,
// printf("sliceSize = %i, chunkSize = %i, numChunks = %i\n", args.SliceSize, args.ChunkSize, args.NumChunks);
-args.ThisPtrToNextData = (T**)&(comm->local[nextId]->recvPtrs[0]);
-args.PrevPtrToThisData = (T**)&(comm->remote[prevId]->recvPtrs[0]);
+args.ThisPtrToNextData = (T**)&(comm->ptrs[nextId].local->recvPtrs[0]);
+args.PrevPtrToThisData = (T**)&(comm->ptrs[prevId].remote->recvPtrs[0]);
args.ThisData = (T*)buff;
-args.ThisBuffer = (volatile T*)comm->local[prevId]->buff;
-args.NextBuffer = (volatile T*)comm->remote[nextId]->buff;
+args.ThisBuffer = (volatile T*)comm->ptrs[prevId].local->buff;
+args.NextBuffer = (volatile T*)comm->ptrs[nextId].remote->buff;
// we need 2 * NUM_SUBCHUNKS flags, so use the first NUM_SUBCHUNKS flags
// to signal the next GPU that new data is available and the following
// NUM_SUBCHUNKS to signal the previous GPU that a chunk is finished
-args.ThisNewDataAvailableFlag = comm->local[prevId]->flags;
-args.NextNewDataAvailableFlag = comm->remote[nextId]->flags;
-args.ThisChunkDoneFlag = comm->local[nextId]->flags + 1;
-args.PrevChunkDoneFlag = comm->remote[prevId]->flags + 1;
+args.ThisNewDataAvailableFlag = comm->ptrs[prevId].local->flags;
+args.NextNewDataAvailableFlag = comm->ptrs[nextId].remote->flags;
+args.ThisChunkDoneFlag = comm->ptrs[nextId].local->flags + 1;
+args.PrevChunkDoneFlag = comm->ptrs[prevId].remote->flags + 1;
if (comm->useRemoteRecv) {
if (index == (rootId + comm->nDev - 1) % comm->nDev) {

View File

@@ -300,11 +300,11 @@ static ncclResult_t commClearMaps(ncclComm_t comm) {
cudaError_t cures;
for(int d=0; d<comm->nDev; ++d) {
-switch(comm->cleanup[d].type) {
+switch(comm->ptrs[d].remoteCleanup) {
case CLEANUP_NONE:
break;
case CLEANUP_CUIPC:
-res = wrapCuIpcCloseMemHandle((CUdeviceptr)comm->cleanup[d].handle);
+res = wrapCuIpcCloseMemHandle((CUdeviceptr)comm->ptrs[d].cleanupHandle);
if (res != ncclSuccess) {
WARN("rank %d failed to close IPC handle to rank %d",
comm->userFromRing[comm->ncclId], comm->userFromRing[d]);
@@ -312,13 +312,13 @@ static ncclResult_t commClearMaps(ncclComm_t comm) {
}
break;
case CLEANUP_UNMAP:
-cures = cudaHostUnregister(comm->cleanup[d].handle);
+cures = cudaHostUnregister(comm->ptrs[d].cleanupHandle);
if (cures != cudaSuccess) {
WARN("rank %d failed to unregister handle to rank %d",
comm->userFromRing[comm->ncclId], comm->userFromRing[d]);
retval = (retval == ncclSuccess) ? ncclUnhandledCudaError : retval;
}
-res = shmUnmap(comm->cleanup[d].handle, offsetof(ncclMem, buff) + comm->buffSize);
+res = shmUnmap(comm->ptrs[d].cleanupHandle, offsetof(ncclMem, buff) + comm->buffSize);
if (res != ncclSuccess) {
WARN("rank %d failed to unmap handle to rank %d",
comm->userFromRing[comm->ncclId], comm->userFromRing[d]);
@@ -326,16 +326,19 @@ static ncclResult_t commClearMaps(ncclComm_t comm) {
}
break;
default:
-WARN("Unknown cleanup type %d", comm->cleanup[d].type);
+WARN("Unknown cleanup type %d", comm->ptrs[d].remoteCleanup);
}
-comm->cleanup[d].type = 0;
-comm->cleanup[d].handle = NULL;
+comm->ptrs[d].remoteCleanup = CLEANUP_NONE;
+comm->ptrs[d].cleanupHandle = NULL;
}
-memset(comm->userFromRing, 0, sizeof(int)*MAXPEERS);
-memset(comm->ringFromUser, 0, sizeof(int)*MAXPEERS);
+if (comm->userFromRing != NULL)
+memset(comm->userFromRing, 0, sizeof(int)*comm->nDev);
+if (comm->ringFromUser != NULL)
+memset(comm->ringFromUser, 0, sizeof(int)*comm->nDev);
if (comm->devUserFromRing != NULL) {
-cudaError_t err = cudaMemset(comm->devUserFromRing, 0, sizeof(int)*MAXPEERS);
+cudaError_t err = cudaMemset(comm->devUserFromRing, 0, sizeof(int)*comm->nDev);
if (err != cudaSuccess) {
WARN("Faild to clear dev map: %s", cudaGetErrorString(err));
retval = (retval == ncclSuccess) ? ncclUnhandledCudaError : retval;
@@ -387,7 +390,10 @@ static ncclResult_t commBuildMaps(ncclComm_t comm, ncclUniqueId* commId, int ran
pid_t myPid = ranks[myId].pid;
comm->useRemoteRecv = 1; // Assume we directly write to result ptrs.
-for (int i=0; i<ndev; ++i) {
+// The order that we link with peers must ensure that
+// P2P slots are used for high-priority links first.
+for (int j=0; j<ndev; ++j) {
+int i = (myId - 1 + ndev + j) % ndev;
int iRank = ranks[i].rank;
int iDev = ranks[i].cudaDev;
pid_t iPid = ranks[i].pid;
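The rewritten loop above no longer walks peers 0..ndev-1; it starts at the previous ring neighbor so the highest-priority links claim P2P slots first. A standalone illustration of the resulting order (values chosen for the example, not taken from the diff):

```c
#include <stdio.h>

// Prints the peer visitation order used by the new commBuildMaps loop.
// j = 0 lands on the previous ring neighbor, j = 1 on the rank itself,
// j = 2 on the next neighbor, then the remaining peers in ring order.
int main(void) {
  int ndev = 4, myId = 2;                  // example values
  for (int j = 0; j < ndev; ++j) {
    int i = (myId - 1 + ndev + j) % ndev;
    printf("j=%d -> peer %d\n", j, i);     // prints peers 1, 2, 3, 0
  }
  return 0;
}
```

This ordering is also what the `j <= 2` guard in the next hunk keys off: a zero-copy host-memory fallback only disables useRemoteRecv when it happens on one of those first three slots.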
@@ -414,57 +420,59 @@ static ncclResult_t commBuildMaps(ncclComm_t comm, ncclUniqueId* commId, int ran
if (iPid == myPid && (canpeer || myDev == iDev)) {
INFO("rank access %d -> %d via P2P device mem", rank, iRank);
-comm->local[i] = ranks[myId].devptr;
-comm->remote[i] = ranks[i].devptr;
-comm->cleanup[i].type = CLEANUP_NONE;
+comm->ptrs[i].local = ranks[myId].devptr;
+comm->ptrs[i].remote = ranks[i].devptr;
+comm->ptrs[i].remoteCleanup = CLEANUP_NONE;
} else if (iPid == myPid) {
INFO("rank access %d -> %d via zero-copy host mem", rank, iRank);
-comm->useRemoteRecv = 0;
-if (cudaHostGetDevicePointer(comm->local+i, ranks[myId].hostptr, 0) != cudaSuccess) {
+if (j <= 2) {
+comm->useRemoteRecv = 0;
+}
+if (cudaHostGetDevicePointer(&comm->ptrs[i].local, ranks[myId].hostptr, 0) != cudaSuccess) {
WARN("rank %d failed to map zero copy buffer to device", rank);
commClearMaps(comm);
return ncclUnhandledCudaError;
}
-if (cudaHostGetDevicePointer(comm->remote+i, ranks[i].hostptr, 0) != cudaSuccess) {
+if (cudaHostGetDevicePointer(&comm->ptrs[i].remote, ranks[i].hostptr, 0) != cudaSuccess) {
WARN("rank %d failed to map %d's zero copy buffer to device", rank, iRank);
commClearMaps(comm);
return ncclUnhandledCudaError;
}
-comm->cleanup[i].type = CLEANUP_NONE;
+comm->ptrs[i].remoteCleanup = CLEANUP_NONE;
} else if (canpeer || myDev == iDev) {
INFO("rank access %d -> %d via Ipc P2P device mem", rank, iRank);
comm->useRemoteRecv = 0;
-comm->local[i] = ranks[myId].devptr;
-if (wrapCuIpcOpenMemHandle((CUdeviceptr*)(&comm->remote[i]),
+comm->ptrs[i].local = ranks[myId].devptr;
+if (wrapCuIpcOpenMemHandle((CUdeviceptr*)(&comm->ptrs[i].remote),
ranks[i].devipc, CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS) != ncclSuccess) {
WARN("rank %d failed to open Ipc handle to rank %d", rank, iRank);
commClearMaps(comm);
return ncclUnhandledCudaError;
}
-comm->cleanup[i].type = CLEANUP_CUIPC;
-comm->cleanup[i].handle = comm->remote[i];
+comm->ptrs[i].remoteCleanup = CLEANUP_CUIPC;
+comm->ptrs[i].cleanupHandle = comm->ptrs[i].remote;
} else {
INFO("rank access %d -> %d via zero copy host shm", rank, iRank);
comm->useRemoteRecv = 0;
-if (cudaHostGetDevicePointer(comm->local+i, ranks[myId].hostptr, 0) != cudaSuccess) {
+if (cudaHostGetDevicePointer(&comm->ptrs[i].local, ranks[myId].hostptr, 0) != cudaSuccess) {
WARN("rank %d failed to obtain dev ptr to sysmem buffer", rank);
commClearMaps(comm);
return ncclUnhandledCudaError;
}
char rankname[1024];
sprintf(rankname, "%s-%d", commId->internal, ranks[i].rank);
-if (openHostMemShm(rankname, (ncclMem**)&comm->cleanup[i].handle, ranks[i].buffSize)
+if (openHostMemShm(rankname, (ncclMem**)&comm->ptrs[i].cleanupHandle, ranks[i].buffSize)
!= ncclSuccess) {
WARN("rank %d failed to open sysmem buffer of rank %d", rank, iRank);
commClearMaps(comm);
return ncclUnhandledCudaError;
}
-if (cudaHostGetDevicePointer(comm->remote+i, comm->cleanup[i].handle, 0) != cudaSuccess) {
+if (cudaHostGetDevicePointer(&comm->ptrs[i].remote, comm->ptrs[i].cleanupHandle, 0) != cudaSuccess) {
WARN("rank %d failed to obtain dev ptr for rank %d", rank, iRank);
commClearMaps(comm);
return ncclUnhandledCudaError;
}
-comm->cleanup[i].type = CLEANUP_UNMAP;
+comm->ptrs[i].remoteCleanup = CLEANUP_UNMAP;
}
}
INFO("PushToRecv algos are %s\n", (comm->useRemoteRecv) ? "enabled" : "disabled");
@@ -501,10 +509,16 @@ static void commFree(ncclComm_t comm) {
if (res != ncclSuccess)
INFO("failed to cleanup comm maps");
+if (comm->userFromRing != NULL)
+free(comm->userFromRing);
if (comm->devUserFromRing != NULL)
if (cudaFree(comm->devUserFromRing) != cudaSuccess)
INFO("commFree failed to free dev maps");
+if (comm->ringFromUser != NULL)
+free(comm->ringFromUser);
if (comm->devMem != NULL && cudaFree(comm->devMem) != cudaSuccess)
INFO("Failed to free devMap");
@@ -524,8 +538,8 @@ static void commFree(ncclComm_t comm) {
}
static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, const ncclUniqueId* commId, int rank) {
-if (ndev < 1 || ndev > MAXPEERS) {
-WARN("requested device count (%d) exceeds maximum of %d", ndev, MAXPEERS);
+if (ndev < 1) {
+WARN("invalid device count (%d) requested", ndev);
return ncclUnsupportedDeviceCount;
}
if (rank >= ndev || rank < 0) {
@@ -533,12 +547,13 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, const ncclUniqueId*
return ncclInvalidRank;
}
-struct ncclComm* comm = (struct ncclComm*)malloc(sizeof(struct ncclComm));
+size_t commBytes = offsetof(ncclComm, ptrs) + ndev*sizeof(ncclNodeRef);
+struct ncclComm* comm = (struct ncclComm*)malloc(commBytes);
if (comm == NULL) {
WARN("comm allocation failed");
return ncclSystemError;
}
-memset(comm, 0, sizeof(struct ncclComm));
+memset(comm, 0, commBytes);
comm->nDev = ndev;
cudaGetDevice(&comm->cudaDev);
@@ -565,12 +580,26 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, const ncclUniqueId*
commFree(comm);
return res;
}
-if (cudaMalloc(&comm->devUserFromRing, MAXPEERS*sizeof(int)) != cudaSuccess) {
+if (cudaMalloc(&comm->devUserFromRing, ndev*sizeof(int)) != cudaSuccess) {
WARN("rank %d failed to allocated device maps", rank);
commFree(comm);
return ncclCudaMallocFailed;
}
+comm->userFromRing = (int*)malloc(ndev*sizeof(int));
+if (comm->userFromRing == NULL) {
+WARN("rank %d failed to allocate host maps", rank);
+commFree(comm);
+return ncclSystemError;
+}
+comm->ringFromUser = (int*)malloc(ndev*sizeof(int));
+if (comm->ringFromUser == NULL) {
+WARN("rank %d failed to allocate host maps", rank);
+commFree(comm);
+return ncclSystemError;
+}
EventQueue* eq = &comm->events;
for(int i=0; i<MAXQUEUE; ++i) {
if (cudaEventCreateWithFlags(eq->isDone+i, cudaEventDisableTiming) != cudaSuccess) {

View File

@@ -1,5 +1,5 @@
/*************************************************************************
-* Copyright (c) 2015, NVIDIA CORPORATION. All rights reserved.
+* Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
@@ -33,7 +33,6 @@
#include <cstdio>
#include <cuda_runtime.h>
-#define MAXPEERS 8 // Maximum number of devices.
#define MAXFLAGS 8
#define MAXQUEUE 4 // Maximum number of queued collectives per communicator.
#define DEFAULT_BUFFER_SIZE_BYTES (1UL << 21)
@@ -67,6 +66,13 @@ struct ncclMem {
char buff[NCCL_MEM_PAD_ALIGN];
};
+struct ncclNodeRef {
+ncclMem* remote;
+ncclMem* local;
+int remoteCleanup;
+void* cleanupHandle;
+};
struct ncclComm {
int nDev; // number of devices in communicator
int cudaDev; // cuda device index
@@ -77,29 +83,19 @@ struct ncclComm {
ncclMem* hostMem;
int hostMemState;
-// Device-to-device communication structures to access remote or local device
-// memory.
-ncclMem* remote[MAXPEERS];
-ncclMem* local[MAXPEERS];
-struct {
-int type;
-void* handle;
-} cleanup[MAXPEERS];
-//int remoteCleanup[MAXPEERS]; // 0 is raw ptr, 1 is unregister/unmap, 2 is ipc close
// Placed between calling and internal device streams.
EventQueue events;
// Maps an internal nccl index to user-specified rank order. This is necessary
// since we need to know how the user expects data to be ordered across
// devices.
-int userFromRing[MAXPEERS];
+int* userFromRing;
// copy of the above stored on each device
int* devUserFromRing;
// Inverse of userFromRing. Maps user specified index to internal nccl index.
-int ringFromUser[MAXPEERS];
+int* ringFromUser;
// Size of temp buffer in bytes.
size_t buffSize;
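To make the two maps in the comments above concrete, here is an invented example for a three-device communicator (values are illustrative only, not taken from the diff):

```c
// Invented example for nDev = 3.
int userFromRing[3] = { 2, 0, 1 };  // ring slot 0 is the user's rank 2, slot 1 is rank 0, slot 2 is rank 1
int ringFromUser[3] = { 1, 2, 0 };  // user rank 0 sits at ring slot 1, rank 1 at slot 2, rank 2 at slot 0
// Invariant: ringFromUser[userFromRing[r]] == r for every ring slot r.
```

After this commit both arrays are heap-allocated with ndev entries instead of being fixed MAXPEERS-sized members.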
@@ -108,6 +104,10 @@ struct ncclComm {
// GPUs. In single process mode this can be used as long as QPI links are
// not present. In multi-process, we never push to a remote recvbuff.
int useRemoteRecv;
+// Device-to-device communication structures to access remote or local device
+// memory. Actual allocation larger than 1.
+ncclNodeRef ptrs[1];
};
typedef enum {NONE=0, WARN=1, INFO=2, ABORT=3} DebugLevel;
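The new trailing member `ncclNodeRef ptrs[1]` works together with the `commBytes` computation in commAlloc above: the communicator is over-allocated so the one-element array actually provides ndev entries (the classic C "struct hack"). A self-contained sketch of the pattern, using simplified stand-in types rather than the real NCCL definitions:

```c
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

// Simplified stand-ins; the real fields live in ncclNodeRef / ncclComm.
typedef struct { void* remote; void* local; int remoteCleanup; void* cleanupHandle; } NodeRef;
typedef struct {
  int nDev;
  /* ... other fixed-size fields ... */
  NodeRef ptrs[1];                 // declared with one slot, allocated with nDev slots
} Comm;

static Comm* commAllocSketch(int ndev) {
  // Same sizing idea as the commit: bytes up to ptrs, plus ndev trailing entries.
  size_t bytes = offsetof(Comm, ptrs) + (size_t)ndev * sizeof(NodeRef);
  Comm* c = (Comm*)malloc(bytes);
  if (c == NULL) return NULL;
  memset(c, 0, bytes);
  c->nDev = ndev;
  return c;                        // c->ptrs[0] .. c->ptrs[ndev-1] are now usable
}
```

Sizing the trailing array at runtime is what lets the commit delete MAXPEERS: the peer count is no longer fixed at compile time.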

View File

@@ -324,19 +324,19 @@ ncclResult_t ncclReduceWithTypeAndFunc(const void* sendbuff, void* recvbuff,
args.NumChunks = (args.N + args.ChunkSize - 1) / args.ChunkSize;
}
-args.ThisPtrToNextData = (T**)&(comm->local[nextId]->recvPtrs[0]);
-args.PrevPtrToThisData = (T**)&(comm->remote[prevId]->recvPtrs[0]);
+args.ThisPtrToNextData = (T**)&(comm->ptrs[nextId].local->recvPtrs[0]);
+args.PrevPtrToThisData = (T**)&(comm->ptrs[prevId].remote->recvPtrs[0]);
args.Output = (T*)recvbuff;
args.ThisData = (const T*) sendbuff;
-args.ThisBuffer = (volatile T*)comm->local[prevId]->buff;
-args.NextBuffer = (volatile T*)comm->remote[nextId]->buff;
+args.ThisBuffer = (volatile T*)comm->ptrs[prevId].local->buff;
+args.NextBuffer = (volatile T*)comm->ptrs[nextId].remote->buff;
-args.ThisNewDataAvailableFlag = comm->local[prevId]->flags;
-args.NextNewDataAvailableFlag = comm->remote[nextId]->flags;
+args.ThisNewDataAvailableFlag = comm->ptrs[prevId].local->flags;
+args.NextNewDataAvailableFlag = comm->ptrs[nextId].remote->flags;
-args.ThisChunkDoneFlag = comm->local[nextId]->flags + 1;
-args.PrevChunkDoneFlag = comm->remote[prevId]->flags + 1;
+args.ThisChunkDoneFlag = comm->ptrs[nextId].local->flags + 1;
+args.PrevChunkDoneFlag = comm->ptrs[prevId].remote->flags + 1;
if (index == (rootId + 1) % comm->nDev) {
ReduceKernel<NUM_THREADS, UNROLL_COUNT, FUNC, BEGIN, T>

View File

@@ -428,21 +428,21 @@ ncclResult_t ncclReduceScatterWithTypeAndFunc(const void* sendbuff,
args.NumChunks = (args.N + args.ChunkSize - 1) / args.ChunkSize;
}
-args.ThisPtrToNextOutput = (T**)&(comm->local[nextId]->recvPtrs[0]);
-args.PrevPtrToThisOutput = (T**)&(comm->remote[prevId]->recvPtrs[0]);
+args.ThisPtrToNextOutput = (T**)&(comm->ptrs[nextId].local->recvPtrs[0]);
+args.PrevPtrToThisOutput = (T**)&(comm->ptrs[prevId].remote->recvPtrs[0]);
args.ThisInput = (const T*)sendbuff;
args.ThisOutput = (volatile T*)recvbuff;
-args.ThisBuffer = (volatile T*)comm->local[prevId]->buff;
-args.NextBuffer = (volatile T*)comm->remote[nextId]->buff;
+args.ThisBuffer = (volatile T*)comm->ptrs[prevId].local->buff;
+args.NextBuffer = (volatile T*)comm->ptrs[nextId].remote->buff;
// we need 2 * NUM_SUBCHUNKS flags, so use the first NUM_SUBCHUNKS flags
// to signal the next GPU that new data is available and the following
// NUM_SUBCHUNKS to signal the previous GPU that a chunk is finished
-args.ThisNewDataAvailableFlag = comm->local[prevId]->flags;
-args.NextNewDataAvailableFlag = comm->remote[nextId]->flags;
-args.ThisChunkDoneFlag = comm->local[nextId]->flags + 1;
-args.PrevChunkDoneFlag = comm->remote[prevId]->flags + 1;
+args.ThisNewDataAvailableFlag = comm->ptrs[prevId].local->flags;
+args.NextNewDataAvailableFlag = comm->ptrs[nextId].remote->flags;
+args.ThisChunkDoneFlag = comm->ptrs[nextId].local->flags + 1;
+args.PrevChunkDoneFlag = comm->ptrs[prevId].remote->flags + 1;
ReduceScatterKernel<NUM_THREADS, UNROLL_COUNT, FUNC, T>
<<<1, NUM_THREADS + 1, 0, stream>>>(args);