From 27d32ac5d93f86bfb406551fda0ea3edeafdb199 Mon Sep 17 00:00:00 2001
From: Nathan Luehr
Date: Thu, 19 Nov 2015 11:11:52 -0800
Subject: [PATCH] Fixed a race condition in reduce and broadcast.

---
 src/all_gather.cu |  2 +-
 src/broadcast.cu  | 22 +++++++++-------------
 src/reduce.cu     | 18 +++++++-----------
 3 files changed, 17 insertions(+), 25 deletions(-)

diff --git a/src/all_gather.cu b/src/all_gather.cu
index 1a44c79..a83385f 100644
--- a/src/all_gather.cu
+++ b/src/all_gather.cu
@@ -59,7 +59,7 @@
 // subchunks, we interleave the independent subchunks so that more data can be
 // transferred while the sync is in progress. This is the number of subchunks
 // that are active at the same time
-#define NUM_SUBCHUNKS 1
+#define NUM_SUBCHUNKS 2
 
 // If this is called with STEP, it means that we just finished processing the
 // data for step STEP on this GPU, which is the data required on the next GPU
diff --git a/src/broadcast.cu b/src/broadcast.cu
index e319d26..cde9c9e 100644
--- a/src/broadcast.cu
+++ b/src/broadcast.cu
@@ -180,21 +180,17 @@ __global__ void BroadcastKernel(const BroadcastKernelArgs<T> args) {
   // First wait for args.PrevPtrToThisOutput to become nullptr to ensure that
   // the previous GPU is done with a previous collective operation.
   if (tid == 0) {
-    if (ROLE != ROOT) {
-      Wait([=] {
-        return *((T * volatile *)args.PrevPtrToThisData) == nullptr; // Wait for previous processor to be done
-      });
+    Wait([=] {
+      return *((T * volatile *)args.PrevPtrToThisData) == nullptr; // Wait for previous processor to be done
+    });
 
-      *((T * volatile *)args.PrevPtrToThisData) = (T*)args.ThisData; // Tell Previous I'm starting
-    }
-    if (ROLE != END) {
-      Wait([=] {
-        return *((T * volatile *)args.ThisPtrToNextData) != nullptr; // Wait till I've been told next started
-      });
+    *((T * volatile *)args.PrevPtrToThisData) = (T*)args.ThisData; // Tell Previous I'm starting
+    Wait([=] {
+      return *((T * volatile *)args.ThisPtrToNextData) != nullptr; // Wait till I've been told next started
+    });
 
-      if (PUSHRECV)
-        nextData = *((volatile void * volatile *)args.ThisPtrToNextData); // Grab next's pointer if needed.
-    }
+    if (PUSHRECV)
+      nextData = *((volatile void * volatile *)args.ThisPtrToNextData); // Grab next's pointer if needed.
   }
   __syncthreads();
diff --git a/src/reduce.cu b/src/reduce.cu
index 959bf24..2863e2a 100644
--- a/src/reduce.cu
+++ b/src/reduce.cu
@@ -182,18 +182,14 @@ __global__ void ReduceKernel(const ReduceKernelArgs<T> args) {
   // First wait for args.PrevPtrToThisOutput to become nullptr to ensure that
   // the previous GPU is done with a previous collective operation.
   if (tid == 0) {
-    if (ROLE != BEGIN) {
-      Wait([=] {
-        return *((T * volatile *)args.PrevPtrToThisData) == nullptr; // Wait for previous processor to be done
-      });
+    Wait([=] {
+      return *((T * volatile *)args.PrevPtrToThisData) == nullptr; // Wait for previous processor to be done
+    });
 
-      *((T * volatile *)args.PrevPtrToThisData) = (T*)args.ThisData; // Tell Previous I'm starting
-    }
-    if (ROLE != END) {
-      Wait([=] {
-        return *((T * volatile *)args.ThisPtrToNextData) != nullptr; // Wait till I've been told next started
-      });
-    }
+    *((T * volatile *)args.PrevPtrToThisData) = (T*)args.ThisData; // Tell Previous I'm starting
+    Wait([=] {
+      return *((T * volatile *)args.ThisPtrToNextData) != nullptr; // Wait till I've been told next started
+    });
   }
   __syncthreads();
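
For reference, below is a minimal, self-contained CUDA sketch (not part of the patch, not NCCL code) of the pointer-mailbox handshake the modified kernels perform before moving data: one participant publishes its buffer pointer into a shared slot, and its neighbour spins on a volatile read until the pointer appears. The names HandshakeKernel, slot, and payload are illustrative stand-ins for the real PrevPtrToThisData / ThisPtrToNextData ring pointers, and two thread blocks on one GPU stand in for two GPUs in the ring.

// handshake_sketch.cu -- illustrative only, not NCCL code.
#include <cstdio>
#include <cuda_runtime.h>

#define CUDACHECK(cmd) do {                                        \
    cudaError_t e = (cmd);                                         \
    if (e != cudaSuccess) {                                        \
      printf("CUDA error %s:%d: %s\n", __FILE__, __LINE__,         \
             cudaGetErrorString(e));                               \
      return 1;                                                    \
    }                                                              \
  } while (0)

// Block 0 plays the "previous GPU", block 1 plays "this GPU". The shared
// slot is a pointer mailbox: nullptr means "not ready yet".
__global__ void HandshakeKernel(int* volatile* slot, int* payload) {
  if (blockIdx.x == 0 && threadIdx.x == 0) {
    *payload = 42;        // produce the data
    __threadfence();      // make the payload visible before publishing the pointer
    *slot = payload;      // publish: "I'm ready, here is my buffer"
  }
  if (blockIdx.x == 1 && threadIdx.x == 0) {
    // Spin until the neighbour has published its pointer. The volatile
    // qualifier forces a fresh load from global memory on every iteration,
    // the same kind of polling the Wait([=] {...}) calls in the patch do.
    int* p;
    do {
      p = *slot;
    } while (p == nullptr);
    printf("consumer read %d\n", *p);   // prints 42
  }
}

int main() {
  int** slot = nullptr;    // pointer mailbox shared by both participants
  int* payload = nullptr;  // buffer whose address gets published
  CUDACHECK(cudaMalloc((void**)&slot, sizeof(int*)));
  CUDACHECK(cudaMemset(slot, 0, sizeof(int*)));   // empty mailbox == nullptr
  CUDACHECK(cudaMalloc((void**)&payload, sizeof(int)));

  // Two one-thread blocks are co-resident on any GPU this would run on,
  // so the consumer's spin cannot starve the producer.
  HandshakeKernel<<<2, 1>>>((int* volatile*)slot, payload);
  CUDACHECK(cudaDeviceSynchronize());

  CUDACHECK(cudaFree(slot));
  CUDACHECK(cudaFree(payload));
  return 0;
}

The diff above removes the ROLE-based guards (ROOT/BEGIN/END) around this handshake, so every rank now performs the full wait-and-publish sequence instead of skipping half of it at the ends of the chain.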