From 59663167711b70dabb52a426a9fca0d712e3cc95 Mon Sep 17 00:00:00 2001
From: Nathan Luehr
Date: Wed, 20 Jan 2016 18:19:45 -0800
Subject: [PATCH] Added support for more than 8 GPUs.

Change-Id: Iaa1841036a7bfdad6ebec99fed0adcd2bbe6ffad
Reviewed-on: http://git-master/r/935459
Reviewed-by: Cliff Woolley
Tested-by: Przemek Tredak
---
 README.md             |  2 +-
 src/all_gather.cu     | 16 ++++----
 src/all_reduce.cu     | 18 ++++-----
 src/broadcast.cu      | 18 ++++-----
 src/core.cu           | 91 ++++++++++++++++++++++++++++---------------
 src/core.h            | 28 ++++++-------
 src/reduce.cu         | 16 ++++----
 src/reduce_scatter.cu | 16 ++++----
 8 files changed, 117 insertions(+), 88 deletions(-)

diff --git a/README.md b/README.md
index e65e1ba..04f8fe1 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@ Optimized primitives for collective multi-GPU communication.
 
 ## Introduction
 
-NCCL (pronounced "Nickel") is a stand-alone library of standard collective communication routines, such as all-gather, reduce, broadcast, etc., that have been optimized to achieve high bandwidth over PCIe. NCCL supports up to eight GPUs and can be used in either single- or multi-process (e.g., MPI) applications.
+NCCL (pronounced "Nickel") is a stand-alone library of standard collective communication routines, such as all-gather, reduce, broadcast, etc., that have been optimized to achieve high bandwidth over PCIe. NCCL supports an arbitrary number of GPUs installed in a single node and can be used in either single- or multi-process (e.g., MPI) applications.
 
 ## What's inside
 
diff --git a/src/all_gather.cu b/src/all_gather.cu
index a034bb1..f0948eb 100644
--- a/src/all_gather.cu
+++ b/src/all_gather.cu
@@ -429,18 +429,18 @@ ncclResult_t ncclAllGatherWithType(const void* sendbuff, void* recvbuff,
     args.NumChunks = (args.N + args.ChunkSize - 1) / args.ChunkSize;
   }
 
-  args.ThisPtrToNextOutput = (T**)&(comm->local[nextId]->recvPtrs[0]);
-  args.PrevPtrToThisOutput = (T**)&(comm->remote[prevId]->recvPtrs[0]);
+  args.ThisPtrToNextOutput = (T**)&(comm->ptrs[nextId].local->recvPtrs[0]);
+  args.PrevPtrToThisOutput = (T**)&(comm->ptrs[prevId].remote->recvPtrs[0]);
 
   args.ThisInput = (const T*)sendbuff;
   args.ThisOutput = (volatile T*)recvbuff;
-  args.ThisBuffer = (volatile T*)comm->local[prevId]->buff;
-  args.NextBuffer = (volatile T*)comm->remote[nextId]->buff;
+  args.ThisBuffer = (volatile T*)comm->ptrs[prevId].local->buff;
+  args.NextBuffer = (volatile T*)comm->ptrs[nextId].remote->buff;
 
-  args.ThisNewDataAvailableFlag = comm->local[prevId]->flags;
-  args.NextNewDataAvailableFlag = comm->remote[nextId]->flags;
-  args.ThisChunkDoneFlag = comm->local[nextId]->flags + 1;
-  args.PrevChunkDoneFlag = comm->remote[prevId]->flags + 1;
+  args.ThisNewDataAvailableFlag = comm->ptrs[prevId].local->flags;
+  args.NextNewDataAvailableFlag = comm->ptrs[nextId].remote->flags;
+  args.ThisChunkDoneFlag = comm->ptrs[nextId].local->flags + 1;
+  args.PrevChunkDoneFlag = comm->ptrs[prevId].remote->flags + 1;
 
   if( comm->useRemoteRecv ) {
     AllGatherKernel<NUM_THREADS, UNROLL_COUNT, true, T>
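Note: the rewrite above is the same mechanical substitution that repeats in every collective below. The per-peer parallel arrays local[], remote[], and cleanup[] (all bounded by MAXPEERS = 8) become a single array of per-peer ncclNodeRef records that is sized when the communicator is allocated. A stand-in sketch of the layout change, using the names from this patch (the ncclMem body here is illustrative only, not the real core.h definition):

    /* Stand-in so the sketch compiles; the real ncclMem is defined in core.h. */
    struct ncclMem { volatile int flags[8]; volatile void* recvPtrs[1]; char buff[1]; };

    #define MAXPEERS 8                      /* the old compile-time cap */
    struct commBefore {                     /* before: parallel fixed-size arrays */
      struct ncclMem* remote[MAXPEERS];
      struct ncclMem* local[MAXPEERS];
      struct { int type; void* handle; } cleanup[MAXPEERS];
    };

    struct ncclNodeRef {                    /* after: one record per peer */
      struct ncclMem* remote;
      struct ncclMem* local;
      int remoteCleanup;
      void* cleanupHandle;
    };

    /* Access pattern, e.g. in the collectives:
     *   comm->local[prevId]->flags   becomes   comm->ptrs[prevId].local->flags
     *   comm->remote[nextId]->buff   becomes   comm->ptrs[nextId].remote->buff */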
diff --git a/src/all_reduce.cu b/src/all_reduce.cu
index 670d45c..7364da0 100644
--- a/src/all_reduce.cu
+++ b/src/all_reduce.cu
@@ -1,5 +1,5 @@
 /*************************************************************************
- * Copyright (c) 2015, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -420,19 +420,19 @@ ncclResult_t ncclAllReduceWithTypeAndFunc(const void* sendbuff, void* recvbuff,
     args.NumChunks = (args.N + args.ChunkSize - 1) / args.ChunkSize;
   }
 
-  args.ThisPtrToNextOutput = (T**)&(comm->local[nextId]->recvPtrs[0]);
-  args.PrevPtrToThisOutput = (T**)&(comm->remote[prevId]->recvPtrs[0]);
+  args.ThisPtrToNextOutput = (T**)&(comm->ptrs[nextId].local->recvPtrs[0]);
+  args.PrevPtrToThisOutput = (T**)&(comm->ptrs[prevId].remote->recvPtrs[0]);
 
   args.ThisInput = (const T*)sendbuff;
   args.ThisOutput = (volatile T*)recvbuff;
-  args.ThisBuffer = (volatile T*)comm->local[prevId]->buff;
-  args.NextBuffer = (volatile T*)comm->remote[nextId]->buff;
+  args.ThisBuffer = (volatile T*)comm->ptrs[prevId].local->buff;
+  args.NextBuffer = (volatile T*)comm->ptrs[nextId].remote->buff;
 
-  args.ThisNewDataAvailableFlag = comm->local[prevId]->flags;
-  args.NextNewDataAvailableFlag = comm->remote[nextId]->flags;
+  args.ThisNewDataAvailableFlag = comm->ptrs[prevId].local->flags;
+  args.NextNewDataAvailableFlag = comm->ptrs[nextId].remote->flags;
 
-  args.ThisChunkDoneFlag = comm->local[nextId]->flags + 1;
-  args.PrevChunkDoneFlag = comm->remote[prevId]->flags + 1;
+  args.ThisChunkDoneFlag = comm->ptrs[nextId].local->flags + 1;
+  args.PrevChunkDoneFlag = comm->ptrs[prevId].remote->flags + 1;
 
   if( comm->useRemoteRecv ) {
     AllReduceKernel<NUM_THREADS, UNROLL_COUNT, true, T>
diff --git a/src/broadcast.cu b/src/broadcast.cu
index c3e4c20..d839aee 100644
--- a/src/broadcast.cu
+++ b/src/broadcast.cu
@@ -1,5 +1,5 @@
 /*************************************************************************
- * Copyright (c) 2015, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -335,20 +335,20 @@ ncclResult_t ncclBcastWithType(void* buff, const int count, const int root,
 
   // printf("sliceSize = %i, chunkSize = %i, numChunks = %i\n", args.SliceSize, args.ChunkSize, args.NumChunks);
 
-  args.ThisPtrToNextData = (T**)&(comm->local[nextId]->recvPtrs[0]);
-  args.PrevPtrToThisData = (T**)&(comm->remote[prevId]->recvPtrs[0]);
+  args.ThisPtrToNextData = (T**)&(comm->ptrs[nextId].local->recvPtrs[0]);
+  args.PrevPtrToThisData = (T**)&(comm->ptrs[prevId].remote->recvPtrs[0]);
 
   args.ThisData = (T*)buff;
-  args.ThisBuffer = (volatile T*)comm->local[prevId]->buff;
-  args.NextBuffer = (volatile T*)comm->remote[nextId]->buff;
+  args.ThisBuffer = (volatile T*)comm->ptrs[prevId].local->buff;
+  args.NextBuffer = (volatile T*)comm->ptrs[nextId].remote->buff;
 
   // we need 2 * NUM_SUBCHUNKS flags, so use the first NUM_SUBCHUNKS flags
   // to signal the next GPU that new data is available and the following
   // NUM_SUBCHUNKS to signal the previous GPU that a chunk is finished
-  args.ThisNewDataAvailableFlag = comm->local[prevId]->flags;
-  args.NextNewDataAvailableFlag = comm->remote[nextId]->flags;
-  args.ThisChunkDoneFlag = comm->local[nextId]->flags + 1;
-  args.PrevChunkDoneFlag = comm->remote[prevId]->flags + 1;
+  args.ThisNewDataAvailableFlag = comm->ptrs[prevId].local->flags;
+  args.NextNewDataAvailableFlag = comm->ptrs[nextId].remote->flags;
+  args.ThisChunkDoneFlag = comm->ptrs[nextId].local->flags + 1;
+  args.PrevChunkDoneFlag = comm->ptrs[prevId].remote->flags + 1;
 
   if (comm->useRemoteRecv) {
     if (index == (rootId + comm->nDev - 1) % comm->nDev) {
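Note: the comment in the hunk above describes the signaling scheme these assignments wire up. Each GPU polls flags that live in its own ncclMem (written by a neighbor) and writes the mirrored flag locations in its neighbors' ncclMem; in the new ncclNodeRef, .local is this GPU's memory on a given link and .remote is the peer's. A compilable stand-in of exactly those four assignments (flag layout simplified; the real kernels stride by NUM_SUBCHUNKS within MAXFLAGS slots):

    struct ncclMem { volatile int flags[8 /* MAXFLAGS */]; };
    struct ncclNodeRef { struct ncclMem* remote; struct ncclMem* local; };

    void wireFlags(struct ncclNodeRef* ptrs, int prevId, int nextId,
                   volatile int** thisNewData, volatile int** nextNewData,
                   volatile int** thisChunkDone, volatile int** prevChunkDone) {
      *thisNewData   = ptrs[prevId].local->flags;       /* we poll; prev GPU writes */
      *nextNewData   = ptrs[nextId].remote->flags;      /* we write; next GPU polls */
      *thisChunkDone = ptrs[nextId].local->flags + 1;   /* we poll; next GPU writes */
      *prevChunkDone = ptrs[prevId].remote->flags + 1;  /* we write; prev GPU polls */
    }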
diff --git a/src/core.cu b/src/core.cu
index 95482c7..67ac7e3 100644
--- a/src/core.cu
+++ b/src/core.cu
@@ -300,11 +300,11 @@ static ncclResult_t commClearMaps(ncclComm_t comm) {
   cudaError_t cures;
 
   for(int d=0; d<comm->nDev; ++d) {
-    switch(comm->cleanup[d].type) {
+    switch(comm->ptrs[d].remoteCleanup) {
       case CLEANUP_NONE:
         break;
       case CLEANUP_CUIPC:
-        res = wrapCuIpcCloseMemHandle((CUdeviceptr)comm->cleanup[d].handle);
+        res = wrapCuIpcCloseMemHandle((CUdeviceptr)comm->ptrs[d].cleanupHandle);
         if (res != ncclSuccess) {
           WARN("rank %d failed to close IPC handle to rank %d",
             comm->userFromRing[comm->ncclId], comm->userFromRing[d]);
@@ -312,13 +312,13 @@ static ncclResult_t commClearMaps(ncclComm_t comm) {
         }
         break;
       case CLEANUP_UNMAP:
-        cures = cudaHostUnregister(comm->cleanup[d].handle);
+        cures = cudaHostUnregister(comm->ptrs[d].cleanupHandle);
         if (cures != cudaSuccess) {
           WARN("rank %d failed to unregister handle to rank %d",
             comm->userFromRing[comm->ncclId], comm->userFromRing[d]);
           retval = (retval == ncclSuccess) ? ncclUnhandledCudaError : retval;
         }
-        res = shmUnmap(comm->cleanup[d].handle, offsetof(ncclMem, buff) + comm->buffSize);
+        res = shmUnmap(comm->ptrs[d].cleanupHandle, offsetof(ncclMem, buff) + comm->buffSize);
         if (res != ncclSuccess) {
           WARN("rank %d failed to unmap handle to rank %d",
             comm->userFromRing[comm->ncclId], comm->userFromRing[d]);
@@ -326,16 +326,19 @@ static ncclResult_t commClearMaps(ncclComm_t comm) {
         }
         break;
       default:
-        WARN("Unknown cleanup type %d", comm->cleanup[d].type);
+        WARN("Unknown cleanup type %d", comm->ptrs[d].remoteCleanup);
     }
-    comm->cleanup[d].type = 0;
-    comm->cleanup[d].handle = NULL;
+    comm->ptrs[d].remoteCleanup = CLEANUP_NONE;
+    comm->ptrs[d].cleanupHandle = NULL;
   }
 
-  memset(comm->userFromRing, 0, sizeof(int)*MAXPEERS);
-  memset(comm->ringFromUser, 0, sizeof(int)*MAXPEERS);
+  if (comm->userFromRing != NULL)
+    memset(comm->userFromRing, 0, sizeof(int)*comm->nDev);
+  if (comm->ringFromUser != NULL)
+    memset(comm->ringFromUser, 0, sizeof(int)*comm->nDev);
+
   if (comm->devUserFromRing != NULL) {
-    cudaError_t err = cudaMemset(comm->devUserFromRing, 0, sizeof(int)*MAXPEERS);
+    cudaError_t err = cudaMemset(comm->devUserFromRing, 0, sizeof(int)*comm->nDev);
     if (err != cudaSuccess) {
       WARN("Faild to clear dev map: %s", cudaGetErrorString(err));
       retval = (retval == ncclSuccess) ? ncclUnhandledCudaError : retval;
@@ -387,7 +390,10 @@ static ncclResult_t commBuildMaps(ncclComm_t comm, ncclUniqueId* commId, int ran
   pid_t myPid = ranks[myId].pid;
   comm->useRemoteRecv = 1; // Assume we directly write to result ptrs.
 
-  for (int i=0; i<ndev; ++i) {
+  // j=0 is this GPU, j=1 the ring successor, j=2 the ring predecessor, etc.,
+  // so the links that decide useRemoteRecv below are visited first.
+  for (int j=0; j<ndev; ++j) {
+    int i = (j % 2 == 1) ? (myId + (j+1)/2) % ndev : (myId + ndev - j/2) % ndev;
     int iRank = ranks[i].rank;
     int iDev = ranks[i].cudaDev;
     pid_t iPid = ranks[i].pid;
@@ -401,56 +407,58 @@ static ncclResult_t commBuildMaps(ncclComm_t comm, ncclUniqueId* commId, int ran
     if (iPid == myPid && (canpeer || myDev == iDev)) {
       INFO("rank access %d -> %d via P2P device mem", rank, iRank);
-      comm->local[i] = ranks[myId].devptr;
-      comm->remote[i] = ranks[i].devptr;
-      comm->cleanup[i].type = CLEANUP_NONE;
+      comm->ptrs[i].local = ranks[myId].devptr;
+      comm->ptrs[i].remote = ranks[i].devptr;
+      comm->ptrs[i].remoteCleanup = CLEANUP_NONE;
     } else if (iPid == myPid) {
       INFO("rank access %d -> %d via zero-copy host mem", rank, iRank);
-      comm->useRemoteRecv = 0;
-      if (cudaHostGetDevicePointer(comm->local+i, ranks[myId].hostptr, 0) != cudaSuccess) {
+      if (j <= 2) {
+        comm->useRemoteRecv = 0;
+      }
+      if (cudaHostGetDevicePointer(&comm->ptrs[i].local, ranks[myId].hostptr, 0) != cudaSuccess) {
         WARN("rank %d failed to map zero copy buffer to device", rank);
         commClearMaps(comm);
         return ncclUnhandledCudaError;
       }
-      if (cudaHostGetDevicePointer(comm->remote+i, ranks[i].hostptr, 0) != cudaSuccess) {
+      if (cudaHostGetDevicePointer(&comm->ptrs[i].remote, ranks[i].hostptr, 0) != cudaSuccess) {
        WARN("rank %d failed to map %d's zero copy buffer to device", rank, iRank);
         commClearMaps(comm);
         return ncclUnhandledCudaError;
       }
-      comm->cleanup[i].type = CLEANUP_NONE;
+      comm->ptrs[i].remoteCleanup = CLEANUP_NONE;
     } else if (canpeer || myDev == iDev) {
       INFO("rank access %d -> %d via Ipc P2P device mem", rank, iRank);
       comm->useRemoteRecv = 0;
-      comm->local[i] = ranks[myId].devptr;
-      if (wrapCuIpcOpenMemHandle((CUdeviceptr*)(&comm->remote[i]),
+      comm->ptrs[i].local = ranks[myId].devptr;
+      if (wrapCuIpcOpenMemHandle((CUdeviceptr*)(&comm->ptrs[i].remote),
           ranks[i].devipc, CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS) != ncclSuccess) {
         WARN("rank %d failed to open Ipc handle to rank %d", rank, iRank);
         commClearMaps(comm);
         return ncclUnhandledCudaError;
       }
-      comm->cleanup[i].type = CLEANUP_CUIPC;
-      comm->cleanup[i].handle = comm->remote[i];
+      comm->ptrs[i].remoteCleanup = CLEANUP_CUIPC;
+      comm->ptrs[i].cleanupHandle = comm->ptrs[i].remote;
     } else {
       INFO("rank access %d -> %d via zero copy host shm", rank, iRank);
       comm->useRemoteRecv = 0;
-      if (cudaHostGetDevicePointer(comm->local+i, ranks[myId].hostptr, 0) != cudaSuccess) {
+      if (cudaHostGetDevicePointer(&comm->ptrs[i].local, ranks[myId].hostptr, 0) != cudaSuccess) {
         WARN("rank %d failed to obtain dev ptr to sysmem buffer", rank);
         commClearMaps(comm);
         return ncclUnhandledCudaError;
       }
       char rankname[1024];
       sprintf(rankname, "%s-%d", commId->internal, ranks[i].rank);
-      if (openHostMemShm(rankname, (ncclMem**)&comm->cleanup[i].handle, ranks[i].buffSize)
+      if (openHostMemShm(rankname, (ncclMem**)&comm->ptrs[i].cleanupHandle, ranks[i].buffSize)
           != ncclSuccess) {
         WARN("rank %d failed to open sysmem buffer of rank %d", rank, iRank);
         commClearMaps(comm);
         return ncclUnhandledCudaError;
       }
-      if (cudaHostGetDevicePointer(comm->remote+i, comm->cleanup[i].handle, 0) != cudaSuccess) {
+      if (cudaHostGetDevicePointer(&comm->ptrs[i].remote, comm->ptrs[i].cleanupHandle, 0) != cudaSuccess) {
         WARN("rank %d failed to obtain dev ptr for rank %d", rank, iRank);
         commClearMaps(comm);
         return ncclUnhandledCudaError;
       }
-      comm->cleanup[i].type = CLEANUP_UNMAP;
+      comm->ptrs[i].remoteCleanup = CLEANUP_UNMAP;
     }
   }
   INFO("PushToRecv algos are %s\n", (comm->useRemoteRecv) ? "enabled" : "disabled");
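Note: the ladder above picks one of four transports per peer and records in remoteCleanup what commClearMaps (first hunks of this file) must later undo. A compressed sketch of that mapping; the enum values follow the old core.h comment ("0 is raw ptr, 1 is unregister/unmap, 2 is ipc close"), and the condition names paraphrase the checks above rather than quoting the literal code:

    enum { CLEANUP_NONE = 0, CLEANUP_UNMAP = 1, CLEANUP_CUIPC = 2 };

    /* sameProcess: peer rank lives in this process; canPeer: result of the
     * cudaDeviceCanAccessPeer probe; sameDevice: both ranks share one GPU. */
    static int pickRemoteCleanup(int sameProcess, int canPeer, int sameDevice) {
      if (sameProcess && (canPeer || sameDevice))
        return CLEANUP_NONE;    /* direct P2P device pointers: nothing to undo */
      if (sameProcess)
        return CLEANUP_NONE;    /* zero-copy mapped host memory: nothing to undo */
      if (canPeer || sameDevice)
        return CLEANUP_CUIPC;   /* cuIpcOpenMemHandle -> cuIpcCloseMemHandle later */
      return CLEANUP_UNMAP;     /* shm mapping -> cudaHostUnregister + shmUnmap later */
    }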
@@ -501,10 +509,16 @@ static void commFree(ncclComm_t comm) {
   if (res != ncclSuccess)
     INFO("failed to cleanup comm maps");
 
+  if (comm->userFromRing != NULL)
+    free(comm->userFromRing);
+
   if (comm->devUserFromRing != NULL)
     if (cudaFree(comm->devUserFromRing) != cudaSuccess)
       INFO("commFree failed to free dev maps");
 
+  if (comm->ringFromUser != NULL)
+    free(comm->ringFromUser);
+
   if (comm->devMem != NULL && cudaFree(comm->devMem) != cudaSuccess)
     INFO("Failed to free devMap");
 
@@ -524,8 +538,8 @@
 }
 
 static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, const ncclUniqueId* commId, int rank) {
-  if (ndev < 1 || ndev > MAXPEERS) {
-    WARN("requested device count (%d) exceeds maximum of %d", ndev, MAXPEERS);
+  if (ndev < 1) {
+    WARN("invalid device count (%d) requested", ndev);
     return ncclUnsupportedDeviceCount;
   }
   if (rank >= ndev || rank < 0) {
@@ -533,12 +547,13 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, const ncclUniqueId*
     return ncclInvalidRank;
   }
 
-  struct ncclComm* comm = (struct ncclComm*)malloc(sizeof(struct ncclComm));
+  size_t commBytes = offsetof(ncclComm, ptrs) + ndev*sizeof(ncclNodeRef);
+  struct ncclComm* comm = (struct ncclComm*)malloc(commBytes);
   if (comm == NULL) {
     WARN("comm allocation failed");
     return ncclSystemError;
   }
-  memset(comm, 0, sizeof(struct ncclComm));
+  memset(comm, 0, commBytes);
 
   comm->nDev = ndev;
   cudaGetDevice(&comm->cudaDev);
@@ -565,12 +580,26 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, const ncclUniqueId*
     commFree(comm);
     return res;
   }
-  if (cudaMalloc(&comm->devUserFromRing, MAXPEERS*sizeof(int)) != cudaSuccess) {
+  if (cudaMalloc(&comm->devUserFromRing, ndev*sizeof(int)) != cudaSuccess) {
     WARN("rank %d failed to allocated device maps", rank);
     commFree(comm);
     return ncclCudaMallocFailed;
   }
 
+  comm->userFromRing = (int*)malloc(ndev*sizeof(int));
+  if (comm->userFromRing == NULL) {
+    WARN("rank %d failed to allocate host maps", rank);
+    commFree(comm);
+    return ncclSystemError;
+  }
+
+  comm->ringFromUser = (int*)malloc(ndev*sizeof(int));
+  if (comm->ringFromUser == NULL) {
+    WARN("rank %d failed to allocate host maps", rank);
+    commFree(comm);
+    return ncclSystemError;
+  }
+
   EventQueue* eq = &comm->events;
   for(int i=0; i<MAXQUEUE; ++i) {
     if (cudaEventCreateWithFlags(eq->isDone+i, cudaEventDisableTiming) != cudaSuccess) {
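Note: the commAlloc change above is what actually removes the 8-GPU cap: the communicator is over-allocated so the trailing ptrs array gets exactly one ncclNodeRef per device. A self-contained sketch of the sizing trick (stand-in types; the real ncclComm carries far more state):

    #include <stddef.h>
    #include <stdlib.h>
    #include <string.h>

    struct ncclNodeRef { void* remote; void* local; int remoteCleanup; void* cleanupHandle; };

    struct ncclComm {
      int nDev;
      /* ...the rest of the fixed-size fields... */
      struct ncclNodeRef ptrs[1];   /* actual allocation is larger than 1 */
    };

    static struct ncclComm* commAllocSketch(int ndev) {
      /* offsetof() measures only the fixed prefix, so the trailing array can
       * be given exactly ndev entries -- any device count >= 1 now fits. */
      size_t commBytes = offsetof(struct ncclComm, ptrs) + ndev * sizeof(struct ncclNodeRef);
      struct ncclComm* comm = malloc(commBytes);
      if (comm == NULL) return NULL;
      memset(comm, 0, commBytes);
      comm->nDev = ndev;
      return comm;
    }

    int main(void) {
      struct ncclComm* comm = commAllocSketch(16); /* would have been rejected before */
      free(comm);
      return 0;
    }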
diff --git a/src/core.h b/src/core.h
index d2cd092..88e5cf0 100644
--- a/src/core.h
+++ b/src/core.h
@@ -1,5 +1,5 @@
 /*************************************************************************
- * Copyright (c) 2015, NVIDIA CORPORATION. All rights reserved.
+ * Copyright (c) 2015-2016, NVIDIA CORPORATION. All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -33,7 +33,6 @@
 #include <cstdio>
 #include <cuda_runtime.h>
 
-#define MAXPEERS 8 // Maximum number of devices.
 #define MAXFLAGS 8
 #define MAXQUEUE 4 // Maximum number of queued collectives per communicator.
 #define DEFAULT_BUFFER_SIZE_BYTES (1UL << 21)
@@ -67,6 +66,13 @@ struct ncclMem {
   char buff[NCCL_MEM_PAD_ALIGN];
 };
 
+struct ncclNodeRef {
+  ncclMem* remote;
+  ncclMem* local;
+  int remoteCleanup;
+  void* cleanupHandle;
+};
+
 struct ncclComm {
   int nDev;    // number of devices in communicator
   int cudaDev; // cuda device index
@@ -77,29 +83,19 @@ struct ncclComm {
   ncclMem* hostMem;
   int hostMemState;
 
-  // Device-to-device communication structures to access remote or local device
-  // memory.
-  ncclMem* remote[MAXPEERS];
-  ncclMem* local[MAXPEERS];
-  struct {
-    int type;
-    void* handle;
-  } cleanup[MAXPEERS];
-  //int remoteCleanup[MAXPEERS]; // 0 is raw ptr, 1 is unregister/unmap, 2 is ipc close
-
   // Placed between calling and internal device streams.
   EventQueue events;
 
   // Maps an internal nccl index to user-specified rank order. This is necessary
   // since we need to know how the user expects data to be ordered across
   // devices.
-  int userFromRing[MAXPEERS];
+  int* userFromRing;
 
   // copy of the above stored on each device
   int* devUserFromRing;
 
   // Inverse of userFromRing. Maps user specified index to internal nccl index.
-  int ringFromUser[MAXPEERS];
+  int* ringFromUser;
 
   // Size of temp buffer in bytes.
   size_t buffSize;
@@ -108,6 +104,10 @@
   // GPUs. In single process mode this can be used as long as QPI links are
   // not present. In multi-process, we never push to a remote recvbuff.
   int useRemoteRecv;
+
+  // Device-to-device communication structures to access remote or local device
+  // memory. Actual allocation larger than 1.
+  ncclNodeRef ptrs[1];
 };
 
 typedef enum {NONE=0, WARN=1, INFO=2, ABORT=3} DebugLevel;
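Note: ncclComm ends with a one-element array rather than a C99 flexible array member because this header is compiled as C++ (CUDA), which has no standard flexible array members; ptrs[1] plus over-allocation is the portable "struct hack". For comparison only, the C99-only form would be:

    /* C99 alternative (not what the patch uses): */
    struct ncclNodeRefC { void* remote; void* local; };
    struct ncclCommC {
      int nDev;
      struct ncclNodeRefC ptrs[];  /* flexible array member: contributes no size */
    };
    /* allocation would then be:
     * malloc(sizeof(struct ncclCommC) + ndev * sizeof(struct ncclNodeRefC)); */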
diff --git a/src/reduce.cu b/src/reduce.cu
index 37acb33..3b48a03 100644
--- a/src/reduce.cu
+++ b/src/reduce.cu
@@ -324,19 +324,19 @@ ncclResult_t ncclReduceWithTypeAndFunc(const void* sendbuff, void* recvbuff,
     args.NumChunks = (args.N + args.ChunkSize - 1) / args.ChunkSize;
   }
 
-  args.ThisPtrToNextData = (T**)&(comm->local[nextId]->recvPtrs[0]);
-  args.PrevPtrToThisData = (T**)&(comm->remote[prevId]->recvPtrs[0]);
+  args.ThisPtrToNextData = (T**)&(comm->ptrs[nextId].local->recvPtrs[0]);
+  args.PrevPtrToThisData = (T**)&(comm->ptrs[prevId].remote->recvPtrs[0]);
 
   args.Output = (T*)recvbuff;
   args.ThisData = (const T*) sendbuff;
-  args.ThisBuffer = (volatile T*)comm->local[prevId]->buff;
-  args.NextBuffer = (volatile T*)comm->remote[nextId]->buff;
+  args.ThisBuffer = (volatile T*)comm->ptrs[prevId].local->buff;
+  args.NextBuffer = (volatile T*)comm->ptrs[nextId].remote->buff;
 
-  args.ThisNewDataAvailableFlag = comm->local[prevId]->flags;
-  args.NextNewDataAvailableFlag = comm->remote[nextId]->flags;
+  args.ThisNewDataAvailableFlag = comm->ptrs[prevId].local->flags;
+  args.NextNewDataAvailableFlag = comm->ptrs[nextId].remote->flags;
 
-  args.ThisChunkDoneFlag = comm->local[nextId]->flags + 1;
-  args.PrevChunkDoneFlag = comm->remote[prevId]->flags + 1;
+  args.ThisChunkDoneFlag = comm->ptrs[nextId].local->flags + 1;
+  args.PrevChunkDoneFlag = comm->ptrs[prevId].remote->flags + 1;
 
   if (index == (rootId + 1) % comm->nDev) {
     ReduceKernel<NUM_THREADS, UNROLL_COUNT, true, T>
diff --git a/src/reduce_scatter.cu b/src/reduce_scatter.cu
index 039e95f..797cfd8 100644
--- a/src/reduce_scatter.cu
+++ b/src/reduce_scatter.cu
@@ -428,21 +428,21 @@ ncclResult_t ncclReduceScatterWithTypeAndFunc(const void* sendbuff,
     args.NumChunks = (args.N + args.ChunkSize - 1) / args.ChunkSize;
   }
 
-  args.ThisPtrToNextOutput = (T**)&(comm->local[nextId]->recvPtrs[0]);
-  args.PrevPtrToThisOutput = (T**)&(comm->remote[prevId]->recvPtrs[0]);
+  args.ThisPtrToNextOutput = (T**)&(comm->ptrs[nextId].local->recvPtrs[0]);
+  args.PrevPtrToThisOutput = (T**)&(comm->ptrs[prevId].remote->recvPtrs[0]);
 
   args.ThisInput = (const T*)sendbuff;
   args.ThisOutput = (volatile T*)recvbuff;
-  args.ThisBuffer = (volatile T*)comm->local[prevId]->buff;
-  args.NextBuffer = (volatile T*)comm->remote[nextId]->buff;
+  args.ThisBuffer = (volatile T*)comm->ptrs[prevId].local->buff;
+  args.NextBuffer = (volatile T*)comm->ptrs[nextId].remote->buff;
 
   // we need 2 * NUM_SUBCHUNKS flags, so use the first NUM_SUBCHUNKS flags
   // to signal the next GPU that new data is available and the following
   // NUM_SUBCHUNKS to signal the previous GPU that a chunk is finished
-  args.ThisNewDataAvailableFlag = comm->local[prevId]->flags;
-  args.NextNewDataAvailableFlag = comm->remote[nextId]->flags;
-  args.ThisChunkDoneFlag = comm->local[nextId]->flags + 1;
-  args.PrevChunkDoneFlag = comm->remote[prevId]->flags + 1;
+  args.ThisNewDataAvailableFlag = comm->ptrs[prevId].local->flags;
+  args.NextNewDataAvailableFlag = comm->ptrs[nextId].remote->flags;
+  args.ThisChunkDoneFlag = comm->ptrs[nextId].local->flags + 1;
+  args.PrevChunkDoneFlag = comm->ptrs[prevId].remote->flags + 1;
 
   ReduceScatterKernel<NUM_THREADS, UNROLL_COUNT, T>
       <<<1, NUM_THREADS + 1, 0, stream>>>(args);
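Note: with MAXPEERS gone, the only remaining device-count check in commAlloc is ndev >= 1. Assuming the usual single-process path through the public NCCL 1.x API, a clique larger than eight GPUs now initializes like any other (illustrative example, error handling kept minimal):

    #include <stdio.h>
    #include "nccl.h"

    int main(void) {
      const int ndev = 16;              /* > 8: rejected before this patch */
      int devs[16];
      ncclComm_t comms[16];
      for (int i = 0; i < ndev; ++i) devs[i] = i;

      /* Single-process initialization across all 16 GPUs; memory use and
       * PCIe topology, not a compile-time constant, are now the limits. */
      if (ncclCommInitAll(comms, ndev, devs) != ncclSuccess) {
        fprintf(stderr, "ncclCommInitAll failed\n");
        return 1;
      }
      for (int i = 0; i < ndev; ++i)
        ncclCommDestroy(comms[i]);
      return 0;
    }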