Commit

Compatibility of the fp16 optimizer with dist opt
chrishkchris committed Nov 1, 2020
1 parent 207bec2 commit c089958
Showing 2 changed files with 88 additions and 47 deletions.
7 changes: 5 additions & 2 deletions include/singa/io/communicator.h
@@ -123,8 +123,11 @@ class Communicator {

// normal synch
size_t sendBuffOffset = 0;
float *fusedSendBuff;
float *fusedRecvBuff;
void *fusedSendBuff;
void *fusedRecvBuff;
void *offsetPointer;
size_t dataSize;
ncclDataType_t ncclType;

// half synch
bool halfInitialized;
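
The header now stores the fused buffers as void * instead of float *, so one allocation can hold either fp32 or fp16 gradients; dataSize and ncclType record, at run time, how many bytes each element takes and which NCCL data type to hand to the collective. A minimal sketch of that dispatch follows (the helper name is illustrative and not part of the commit; only __half, ncclHalf, and ncclFloat come from the CUDA/NCCL headers):

    #include <cstddef>
    #include <cuda_fp16.h>  // __half
    #include <nccl.h>       // ncclDataType_t, ncclHalf, ncclFloat

    // Pick the element size and NCCL type from the tensor precision.
    inline void pickNcclType(bool isHalf, ncclDataType_t *ncclType,
                             size_t *dataSize) {
      if (isHalf) {
        *ncclType = ncclHalf;        // 16-bit floats on the wire
        *dataSize = sizeof(__half);  // 2 bytes per element
      } else {
        *ncclType = ncclFloat;       // 32-bit floats
        *dataSize = sizeof(float);   // 4 bytes per element
      }
    }
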
128 changes: 83 additions & 45 deletions src/io/communicator.cc
@@ -214,6 +214,14 @@ void Communicator::fusedSynch(vector<Tensor> &t, bool send) {

generateBlocks(t);

if (t[0].data_type() == kFloat16) {
ncclType = ncclHalf;
dataSize = sizeof(__half);
} else {
ncclType = ncclFloat;
dataSize = sizeof(float);
}

if (!send) {
// buffer the tensors
device_->Exec(
@@ -228,11 +236,17 @@ void Communicator::fusedSynch(vector<Tensor> &t, bool send) {
[this, t](Context *ctx) mutable {
// memory copy to fusedBuff
for (size_t i = 0; i < t.size(); i++) {
CUDA_CHECK(
cudaMemcpyAsync((void *)(fusedSendBuff + sendBuffOffset),
(const void *)t[i].block()->mutable_data(),
t[i].Size() * sizeof(float),
cudaMemcpyDeviceToDevice, ctx->c1));
if (t[0].data_type() == kFloat16) {
offsetPointer =
(void *)(static_cast<__half *>(fusedSendBuff) + sendBuffOffset);
} else {
offsetPointer =
(void *)(static_cast<float *>(fusedSendBuff) + sendBuffOffset);
}
CUDA_CHECK(cudaMemcpyAsync(
(void *)offsetPointer,
(const void *)t[i].block()->mutable_data(),
t[i].Size() * dataSize, cudaMemcpyDeviceToDevice, ctx->c1));
sendBuffOffset += t[i].Size();
}
},
@@ -247,28 +261,38 @@ void Communicator::fusedSynch(vector<Tensor> &t, bool send) {
CUDA_CHECK(cudaStreamWaitEvent(ctx->s, event, 0));
},
prev_blocks_, prev_blocks_, "Waiting");

device_->Exec(
[this](Context *ctx) mutable {
allReduce((int)sendBuffOffset, (void *)fusedSendBuff,
(void *)fusedRecvBuff, ncclFloat, ctx);
(void *)fusedRecvBuff, ncclType, ctx);
sendBuffOffset = 0;
},
prev_blocks_, blocks_, "Dist_s_fusedSynch_allreduce");

device_->Exec(
[this](Context *ctx) mutable {
// wait for the allreduce to complete
CUDA_CHECK(cudaEventRecord(event, ctx->s));
CUDA_CHECK(cudaStreamWaitEvent(ctx->c1, event, 0));
},
blocks_, blocks_, "Waiting");

device_->Exec(
[this, t](Context *ctx) mutable {
// copy data back to tensors after allreduce
size_t offset = 0;
for (size_t i = 0; i < t.size(); i++) {
if (t[0].data_type() == kFloat16) {
offsetPointer =
(void *)(static_cast<__half *>(fusedRecvBuff) + offset);
} else {
offsetPointer =
(void *)(static_cast<float *>(fusedRecvBuff) + offset);
}
CUDA_CHECK(cudaMemcpyAsync((void *)t[i].block()->mutable_data(),
(const void *)(fusedRecvBuff + offset),
t[i].Size() * sizeof(float),
(const void *)offsetPointer,
t[i].Size() * dataSize,
cudaMemcpyDeviceToDevice, ctx->c1));
offset += t[i].Size();
}
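
fusedSynch packs every gradient tensor into one contiguous send buffer, runs a single all-reduce over the packed elements, and scatters the reduced values back, with the pointer arithmetic done on __half * or float * depending on the tensor precision. The sketch below shows the same pack / all-reduce / unpack pattern in isolation, fp32 only, on a single stream and without error checks; it illustrates the pattern rather than SINGA's Communicator API (in the real code the copies run on stream c1 and the collective on stream s, ordered with cudaEventRecord / cudaStreamWaitEvent):

    #include <cuda_runtime.h>
    #include <nccl.h>
    #include <vector>

    // Reduce many small tensors with one NCCL call instead of one per tensor.
    void fusedAllReduce(const std::vector<float *> &grads,
                        const std::vector<size_t> &counts, void *sendBuff,
                        void *recvBuff, ncclComm_t comm, cudaStream_t stream) {
      size_t offset = 0;
      // 1. pack every tensor into the contiguous send buffer
      for (size_t i = 0; i < grads.size(); i++) {
        cudaMemcpyAsync(static_cast<float *>(sendBuff) + offset, grads[i],
                        counts[i] * sizeof(float), cudaMemcpyDeviceToDevice,
                        stream);
        offset += counts[i];
      }
      // 2. one collective over all packed elements
      ncclAllReduce(sendBuff, recvBuff, offset, ncclFloat, ncclSum, comm,
                    stream);
      // 3. scatter the reduced values back to the original tensors
      offset = 0;
      for (size_t i = 0; i < grads.size(); i++) {
        cudaMemcpyAsync(grads[i], static_cast<float *>(recvBuff) + offset,
                        counts[i] * sizeof(float), cudaMemcpyDeviceToDevice,
                        stream);
        offset += counts[i];
      }
    }
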
@@ -281,6 +305,11 @@ void Communicator::synch(Tensor &t) {
// generateBlocks(t);
device_ = t.device();

if (t.data_type() == kFloat16)
ncclType = ncclHalf;
else
ncclType = ncclFloat;

device_->Exec(
[this, t](Context *ctx) mutable {
// record the event of the default cuda stream and follow it
@@ -292,10 +321,11 @@ void Communicator::synch(Tensor &t) {
device_->Exec(
[this, t](Context *ctx) mutable {
void *addr = t.block()->mutable_data();
allReduce(t.Size(), addr, addr, ncclFloat, ctx);
allReduce(t.Size(), addr, addr, ncclType, ctx);
},
{t.block()}, {t.block()}, "Dist_s_synch_allreduce");
}


void Communicator::fusedSynchHalf(vector<Tensor> &t, bool send) {
CHECK_GT(t.size(), 0);
@@ -318,11 +348,11 @@ void Communicator::fusedSynchHalf(vector<Tensor> &t, bool send) {
size_t offset = 0;
// memory copy to fusedBuff
for (size_t i = 0; i < t.size(); i++) {
CUDA_CHECK(
cudaMemcpyAsync((void *)(fusedSendBuff + sendBuffOffset),
(const void *)t[i].block()->mutable_data(),
t[i].Size() * sizeof(float),
cudaMemcpyDeviceToDevice, ctx->c1));
CUDA_CHECK(cudaMemcpyAsync(
(void *)(static_cast<float *>(fusedSendBuff) + sendBuffOffset),
(const void *)t[i].block()->mutable_data(),
t[i].Size() * sizeof(float), cudaMemcpyDeviceToDevice,
ctx->c1));
sendBuffOffset += t[i].Size();
offset += t[i].Size();
}
@@ -332,8 +362,8 @@ void Communicator::fusedSynchHalf(vector<Tensor> &t, bool send) {
// send the tensors in the buffer
device_->Exec(
[this](Context *ctx) mutable {
cuda::float2half(sendBuffOffset, fusedSendBuff, fusedSendBuffHalf,
ctx->c1);
cuda::float2half(sendBuffOffset, static_cast<float *>(fusedSendBuff),
static_cast<__half *>(fusedSendBuffHalf), ctx->c1);
},
prev_blocks_, blocks_, "Dist_c1_fusedSynchHalf_float2half");
device_->Exec(
@@ -353,23 +383,25 @@ void Communicator::fusedSynchHalf(vector<Tensor> &t, bool send) {
[this](Context *ctx) mutable {
// wait for the allreduce to complete
CUDA_CHECK(cudaEventRecord(event, ctx->s));
CUDA_CHECK(cudaStreamWaitEvent(ctx->c2, event, 0));
},
blocks_, blocks_, "Waiting");
device_->Exec(
[this, t](Context *ctx) mutable {
cuda::half2float(sendBuffOffset, fusedRecvBuffHalf, fusedRecvBuff,
ctx->c2);
cuda::half2float(sendBuffOffset,
static_cast<__half *>(fusedRecvBuffHalf),
static_cast<float *>(fusedRecvBuff), ctx->c2);

sendBuffOffset = 0;

// copy data back to tensors after allreduce
size_t offset = 0;
for (size_t i = 0; i < t.size(); i++) {
CUDA_CHECK(cudaMemcpyAsync((void *)t[i].block()->mutable_data(),
(const void *)(fusedRecvBuff + offset),
t[i].Size() * sizeof(float),
cudaMemcpyDeviceToDevice, ctx->c2));
CUDA_CHECK(cudaMemcpyAsync(
(void *)t[i].block()->mutable_data(),
(const void *)(static_cast<float *>(fusedRecvBuff) + offset),
t[i].Size() * sizeof(float), cudaMemcpyDeviceToDevice,
ctx->c2));
offset += t[i].Size();
}
},
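
fusedSynchHalf keeps the packed buffer in fp32 but compresses it to fp16 around the collective: float2half on stream c1, the all-reduce with ncclHalf on stream s, then half2float on stream c2 before the per-tensor copy-back. Halving the payload halves the all-reduce traffic, at the cost of two conversion kernels and reduced precision during accumulation. The kernels below are a minimal stand-in for SINGA's cuda::float2half / cuda::half2float wrappers (illustrative only, not the project's kernels):

    #include <cuda_fp16.h>

    // Convert n floats to half precision, one element per thread.
    __global__ void float2half_kernel(size_t n, const float *in, __half *out) {
      size_t i = blockIdx.x * blockDim.x + threadIdx.x;
      if (i < n) out[i] = __float2half(in[i]);
    }

    // Convert n half-precision values back to float.
    __global__ void half2float_kernel(size_t n, const __half *in, float *out) {
      size_t i = blockIdx.x * blockDim.x + threadIdx.x;
      if (i < n) out[i] = __half2float(in[i]);
    }
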
@@ -421,7 +453,6 @@ void Communicator::synchHalf(Tensor &t) {
cuda::half2float(t.Size(), fusedRecvBuffHalf, addr, ctx->c2);
},
blocks_, blocks_, "Dist_c2_synchHalf_half2float");

}

void Communicator::sparsification(Tensor &t, Tensor &accumulation,
@@ -520,10 +551,10 @@ void Communicator::_fusedSparsification(vector<Tensor> &t, Tensor *accumulation,

// memory copy to fusedBuff
for (size_t i = 0; i < t.size(); i++) {
CUDA_CHECK(cudaMemcpyAsync((void *)(fusedSendBuff + offset),
(const void *)t[i].block()->mutable_data(),
t[i].Size() * sizeof(float),
cudaMemcpyDeviceToDevice, ctx->c1));
CUDA_CHECK(cudaMemcpyAsync(
(void *)(static_cast<float *>(fusedSendBuff) + offset),
(const void *)t[i].block()->mutable_data(), t[i].Size() * sizeof(float),
cudaMemcpyDeviceToDevice, ctx->c1));
offset += t[i].Size();
}

@@ -542,10 +573,10 @@ void Communicator::_fusedSparsification(vector<Tensor> &t, Tensor *accumulation,
// copy data back to tensors after allreduce
offset = 0;
for (size_t i = 0; i < t.size(); i++) {
CUDA_CHECK(cudaMemcpyAsync((void *)t[i].block()->mutable_data(),
(const void *)(fusedRecvBuff + offset),
t[i].Size() * sizeof(float),
cudaMemcpyDeviceToDevice, ctx->c2));
CUDA_CHECK(cudaMemcpyAsync(
(void *)t[i].block()->mutable_data(),
(const void *)(static_cast<float *>(fusedRecvBuff) + offset),
t[i].Size() * sizeof(float), cudaMemcpyDeviceToDevice, ctx->c2));
offset += t[i].Size();
}
}
@@ -556,22 +587,26 @@ void Communicator::valSparsAllReduce(size_t num, float *accumulation,

if (accumulation != NULL) {
// add the previous accumulation
cuda::add(num, fusedSendBuff, accumulation, fusedSendBuff, ctx->c1);
cuda::add(num, static_cast<float *>(fusedSendBuff), accumulation,
static_cast<float *>(fusedSendBuff), ctx->c1);
// backup the fusedSendBuff
CUDA_CHECK(cudaMemcpyAsync((void *)backupBuff, (const void *)fusedSendBuff,
sizeof(float) * num, cudaMemcpyDeviceToDevice,
ctx->c1));
}

// sparsification based on threshold
cuda::sparsabs(num, threshold, fusedSendBuff, fusedSendBuff, ctx->c1);
cuda::sparsabs(num, threshold, static_cast<float *>(fusedSendBuff),
static_cast<float *>(fusedSendBuff), ctx->c1);

// output the gradient accumulation
if (accumulation != NULL)
cuda::sub(num, backupBuff, fusedSendBuff, accumulation, ctx->c1);
cuda::sub(num, backupBuff, static_cast<float *>(fusedSendBuff),
accumulation, ctx->c1);

// produce the index of the sparse array
cuda::sparsindex(num, fusedSendBuff, fusedIndex, ctx->c1);
cuda::sparsindex(num, static_cast<float *>(fusedSendBuff), fusedIndex,
ctx->c1);

// remove zeros from the index to form the sparse array and get the number of non-zeros (nnz)
cuda::removezeroidx(num, fusedIndex, ctx->c1, nnz);
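
The value-threshold path keeps only entries whose magnitude exceeds the threshold, then turns the surviving positions into an index array for the sparse exchange. Assuming cuda::sparsabs zeroes the sub-threshold entries (an inference from its call site above, not taken from SINGA's kernel source), an equivalent element-wise kernel would look like this:

    #include <cstddef>

    // Zero out gradients whose magnitude is below the threshold
    // (assumed semantics of cuda::sparsabs, inferred from how it is used).
    __global__ void thresholdSparsify(size_t n, float threshold,
                                      const float *in, float *out) {
      size_t i = blockIdx.x * blockDim.x + threadIdx.x;
      if (i < n) out[i] = (fabsf(in[i]) > threshold) ? in[i] : 0.0f;
    }
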
@@ -594,7 +629,7 @@ void Communicator::valSparsAllReduce(size_t num, float *accumulation,
if (nnzAll[i] > nnzMax) nnzMax = nnzAll[i];

// remove zeros from the values to form the sparse array
cuda::removezeroval(num, fusedSendBuff, ctx->c1);
cuda::removezeroval(num, static_cast<float *>(fusedSendBuff), ctx->c1);

CUDA_CHECK(cudaMemcpyAsync((void *)(sparsSendBuff), (const void *)fusedIndex,
sizeof(int) * (*nnz), cudaMemcpyDeviceToDevice,
@@ -635,7 +670,7 @@ void Communicator::valSparsAllReduce(size_t num, float *accumulation,
sizeof(float) * nnzAll[i], cudaMemcpyDeviceToDevice, ctx->c2));
offset += (2 * nnzMax - nnzAll[i]);
CUSPARSE_CHECK(cusparseSaxpyi(cusparse_handle, nnzAll[i], &alpha, xVal,
xInd, fusedRecvBuff,
xInd, static_cast<float *>(fusedRecvBuff),
CUSPARSE_INDEX_BASE_ONE));
}
}
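
On the receive side, each peer's (index, value) pairs are scattered back into the dense fusedRecvBuff with cusparseSaxpyi, which performs y[xInd[k]] += alpha * xVal[k] for the nnz entries, here with one-based indices. A self-contained sketch of that scatter-add, assuming the legacy cuSPARSE API that still provides cusparseSaxpyi (deprecated in newer CUDA releases in favor of the generic cusparseAxpby):

    #include <cuda_runtime.h>
    #include <cusparse.h>

    // y[xInd[k] - 1] += alpha * xVal[k] for k = 0 .. nnz-1 (one-based xInd).
    void scatterAdd(cusparseHandle_t handle, int nnz, const float *xVal,
                    const int *xInd, float *y, cudaStream_t stream) {
      const float alpha = 1.0f;
      cusparseSetStream(handle, stream);  // run on the caller's stream
      cusparseSaxpyi(handle, nnz, &alpha, xVal, xInd, y,
                     CUSPARSE_INDEX_BASE_ONE);
    }
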
@@ -647,7 +682,8 @@ void Communicator::topKSparsAllReduce(size_t num, float *accumulation,
// use gradient accumulation
if (accumulation != NULL) {
// add the previous accumulation
cuda::add(num, fusedSendBuff, accumulation, fusedSendBuff, ctx->c1);
cuda::add(num, static_cast<float *>(fusedSendBuff), accumulation,
static_cast<float *>(fusedSendBuff), ctx->c1);
// backup the fusedSendBuff
CUDA_CHECK(cudaMemcpyAsync((void *)backupBuff, (const void *)fusedSendBuff,
sizeof(float) * num, cudaMemcpyDeviceToDevice,
@@ -656,7 +692,8 @@ void Communicator::topKSparsAllReduce(size_t num, float *accumulation,

// generate an index and sort the fusedSendBuff from large to small values
cuda::generateindex(num, fusedIndex, ctx->c1);
cuda::sortbykey(num, fusedSendBuff, fusedIndex, ctx->c1);
cuda::sortbykey(num, static_cast<float *>(fusedSendBuff), fusedIndex,
ctx->c1);

// determine the number of topK for communication
int nnzMax = (int)ceil(threshold * num);
@@ -666,9 +703,9 @@ void Communicator::topKSparsAllReduce(size_t num, float *accumulation,
if (accumulation != NULL) {
CUDA_CHECK(cudaMemsetAsync(accumulation, 0, num * sizeof(float), ctx->c1));
CUSPARSE_CHECK(cusparseSetStream(cusparse_handle, ctx->c1));
CUSPARSE_CHECK(cusparseSaxpyi(cusparse_handle, nnzMax, &alpha,
fusedSendBuff, fusedIndex, accumulation,
CUSPARSE_INDEX_BASE_ONE));
CUSPARSE_CHECK(cusparseSaxpyi(
cusparse_handle, nnzMax, &alpha, static_cast<float *>(fusedSendBuff),
fusedIndex, accumulation, CUSPARSE_INDEX_BASE_ONE));
cuda::sub(num, backupBuff, accumulation, accumulation, ctx->c1);
}

@@ -711,7 +748,8 @@ void Communicator::topKSparsAllReduce(size_t num, float *accumulation,
sizeof(float) * nnzMax, cudaMemcpyDeviceToDevice, ctx->c2));
offset += nnzMax;
CUSPARSE_CHECK(cusparseSaxpyi(cusparse_handle, nnzMax, &alpha, xVal, xInd,
fusedRecvBuff, CUSPARSE_INDEX_BASE_ONE));
static_cast<float *>(fusedRecvBuff),
CUSPARSE_INDEX_BASE_ONE));
}
}
} // namespace singa
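
topKSparsAllReduce selects the nnzMax = ceil(threshold * num) largest gradient entries: it attaches an index to every value, sorts the values from large to small carrying the indices along (cuda::generateindex / cuda::sortbykey in SINGA), and communicates only the leading nnzMax pairs. A Thrust equivalent of that selection step, shown as an illustration rather than the project's kernel:

    #include <cmath>
    #include <cuda_runtime.h>
    #include <thrust/device_ptr.h>
    #include <thrust/functional.h>
    #include <thrust/sequence.h>
    #include <thrust/sort.h>
    #include <thrust/system/cuda/execution_policy.h>

    // Sort gradients in descending order, permuting their indices alongside,
    // and report how many leading entries the threshold selects.
    int topKSelect(size_t num, float threshold, float *values, int *indices,
                   cudaStream_t stream) {
      thrust::device_ptr<float> vals(values);
      thrust::device_ptr<int> idx(indices);
      // one-based indices, matching the cuSPARSE calls above
      thrust::sequence(thrust::cuda::par.on(stream), idx, idx + num, 1);
      // largest values first; indices follow their values
      thrust::sort_by_key(thrust::cuda::par.on(stream), vals, vals + num, idx,
                          thrust::greater<float>());
      return static_cast<int>(std::ceil(threshold * num));
    }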
