Merge branch 'fix-workspacesize_per_gpu_with_optimizer_state-aleliu' into 'v3.5-integration'

[ready for merge] fix workspacesize_per_gpu_with_optimizer_state

See merge request dl/hugectr/hugectr!694
zehuanw committed Mar 1, 2022
2 parents 4e7c47b + 64b328d commit c8e5d75
Showing 107 changed files with 274 additions and 542 deletions.
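
The commit title and the initialize_max_vocabulary_size_per_gpu() declaration added to HugeCTR/include/pybind/model.hpp below suggest that the per-GPU vocabulary budget (max_vocabulary_size_per_gpu) is now derived from the user-facing workspace_size_per_gpu setting with optimizer state taken into account. The helper below is a minimal sketch of that arithmetic, not HugeCTR's implementation; the per-optimizer state counts and all names are assumptions made for illustration.

#include <cstddef>

// Hypothetical helper (illustration only): how many extra per-weight state copies a
// given optimizer keeps alongside each embedding vector. These counts are assumed.
enum class Opt { SGD, MomentumSGD, AdaGrad, Adam };

inline std::size_t extra_state_copies(Opt o) {
  switch (o) {
    case Opt::SGD:         return 0;  // no per-weight optimizer state
    case Opt::MomentumSGD: return 1;  // momentum buffer
    case Opt::AdaGrad:     return 1;  // gradient accumulator
    case Opt::Adam:        return 2;  // first and second moments (m, v)
  }
  return 0;
}

// Each vocabulary entry costs one embedding vector plus the optimizer's extra copies,
// so the vocabulary that fits into a fixed workspace shrinks accordingly.
inline std::size_t vocab_budget_per_gpu(std::size_t workspace_size_per_gpu_in_mb,
                                        std::size_t embedding_vec_size, Opt o) {
  const std::size_t bytes_per_entry =
      (1 + extra_state_copies(o)) * embedding_vec_size * sizeof(float);
  return workspace_size_per_gpu_in_mb * 1024ull * 1024ull / bytes_per_entry;
}

Under these assumptions, a 1000 MB workspace with embedding_vec_size = 128 and Adam yields roughly 1000 * 1024 * 1024 / (3 * 128 * 4) ≈ 682,666 entries per GPU.
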
@@ -141,24 +141,6 @@ class DistributedSlotSparseEmbeddingHash : public IEmbedding {
const std::vector<std::shared_ptr<HashTable<TypeHashKey, size_t>>> &hash_tables) const;

public:
/**
* The constructor of DistributedSlotSparseEmbeddingHash.
* @param row_offsets_tensors row offsets of the input tensor (refer to the row offset vector in
* sparse matrix CSR format).
* @param hash_key_tensors hash keys of the input tensor (refer to the value vector in sparse
* matrix CSR format).
* @param embedding_params embedding params for initialization.
* @param resource_manager the GPU resource group.
*/
DistributedSlotSparseEmbeddingHash(const Tensors2<TypeHashKey> &train_row_offsets_tensors,
const Tensors2<TypeHashKey> &train_value_tensors,
const std::vector<std::shared_ptr<size_t>> &train_nnz_array,
const Tensors2<TypeHashKey> &evaluate_row_offsets_tensors,
const Tensors2<TypeHashKey> &evaluate_value_tensors,
const std::vector<std::shared_ptr<size_t>> &evaluate_nnz_array,
const SparseEmbeddingHashParams &embedding_params,
const std::shared_ptr<ResourceManager> &resource_manager);

DistributedSlotSparseEmbeddingHash(const SparseTensors<TypeHashKey> &train_keys,
const SparseTensors<TypeHashKey> &evaluate_keys,
const SparseEmbeddingHashParams &embedding_params,
@@ -407,14 +389,8 @@ class DistributedSlotSparseEmbeddingHash : public IEmbedding {
for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count(); id++) {
context.set_device(embedding_data_.get_local_gpu(id).get_device_id());
size_t count = hash_tables_[id]->get_size(embedding_data_.get_local_gpu(id).get_stream());
if (count > max_vocabulary_size_per_gpu_) {
std::ostringstream os;
os << "Runtime vocabulary size (" << count << ") exceeds max_vocabulary_size_per_gpu ("
<< max_vocabulary_size_per_gpu_ << ") on GPU "
<< embedding_data_.get_local_gpu(id).get_device_id()
<< ", new feature insertion failed.\n";
HCTR_OWN_THROW(Error_t::OutOfBound, os.str());
}
HCTR_CHECK_HINT(count <= max_vocabulary_size_per_gpu_,
                "Runtime vocabulary size %lu exceeds max_vocabulary_size_per_gpu %lu on GPU %lu. "
                "New feature insertion failed. Please adjust workspace_size_per_gpu according to "
                "QAList.md#24: How to set workspace_size_per_gpu_in_mb and slot_size_array.",
                count, max_vocabulary_size_per_gpu_,
                embedding_data_.get_local_gpu(id).get_device_id());

}
}
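
This hunk (and the matching one in LocalizedSlotSparseEmbeddingHash below) replaces the ostringstream-plus-HCTR_OWN_THROW pattern with a single printf-style HCTR_CHECK_HINT call. The macro's definition is not part of this excerpt; the mock below is an assumption inferred from the call site, showing only the predicate-plus-format-string shape. It requires at least one format argument, and note that %zu, not %lu, is the portable specifier for size_t values.

#include <cstdio>
#include <cstdlib>

// Mock stand-in (assumption, not HugeCTR's macro): evaluate the predicate and fail
// fast with a formatted hint when it does not hold.
#define CHECK_HINT_MOCK(expr, fmt, ...)                                   \
  do {                                                                    \
    if (!(expr)) {                                                        \
      std::fprintf(stderr, "Check failed: %s\n  " fmt "\n", #expr,        \
                   __VA_ARGS__);                                          \
      std::abort();                                                       \
    }                                                                     \
  } while (0)

// Example call mirroring the shape used in the diff:
//   CHECK_HINT_MOCK(count <= budget, "vocabulary size %zu exceeds budget %zu", count, budget);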

@@ -204,24 +204,6 @@ class LocalizedSlotSparseEmbeddingHash : public IEmbedding {
const std::vector<std::shared_ptr<HashTable<TypeHashKey, size_t>>> &hash_tables) const;

public:
/**
* The constructor of LocalizedSlotSparseEmbeddingHash.
* @param row_offsets_tensors row offsets of the input tensor (refer to the row offset vector in
* sparse matrix CSR format).
* @param hash_key_tensors hash keys of the input tensor (refer to the value vector in sparse
* matrix CSR format).
* @param embedding_params embedding params for initialization.
* @param resource_manager the GPU resource group.
*/
LocalizedSlotSparseEmbeddingHash(const Tensors2<TypeHashKey> &train_row_offsets_tensors,
const Tensors2<TypeHashKey> &train_value_tensors,
const std::vector<std::shared_ptr<size_t>> &train_nnz_array,
const Tensors2<TypeHashKey> &evaluate_row_offsets_tensors,
const Tensors2<TypeHashKey> &evaluate_value_tensors,
const std::vector<std::shared_ptr<size_t>> &evaluate_nnz_array,
const SparseEmbeddingHashParams &embedding_params,
const std::shared_ptr<ResourceManager> &resource_manager);

LocalizedSlotSparseEmbeddingHash(const SparseTensors<TypeHashKey> &train_keys,
const SparseTensors<TypeHashKey> &evaluate_keys,
const SparseEmbeddingHashParams &embedding_params,
@@ -575,14 +557,8 @@ class LocalizedSlotSparseEmbeddingHash : public IEmbedding {
for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count(); id++) {
context.set_device(embedding_data_.get_local_gpu(id).get_device_id());
size_t count = hash_tables_[id]->get_size(embedding_data_.get_local_gpu(id).get_stream());
if (count > max_vocabulary_size_per_gpu_) {
std::ostringstream os;
os << "Runtime vocabulary size (" << count << ") exceeds max_vocabulary_size_per_gpu ("
<< max_vocabulary_size_per_gpu_ << ") on GPU "
<< embedding_data_.get_local_gpu(id).get_device_id()
<< ", new feature insertion failed.\n";
HCTR_OWN_THROW(Error_t::OutOfBound, os.str());
}
HCTR_CHECK_HINT(count <= max_vocabulary_size_per_gpu_,
                "Runtime vocabulary size %lu exceeds max_vocabulary_size_per_gpu %lu on GPU %lu. "
                "New feature insertion failed. Please adjust workspace_size_per_gpu according to "
                "QAList.md#24: How to set workspace_size_per_gpu_in_mb and slot_size_array.",
                count, max_vocabulary_size_per_gpu_,
                embedding_data_.get_local_gpu(id).get_device_id());

}
}
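
When this check fires, the message directs users to workspace_size_per_gpu. Inverting the conversion sketched after the commit summary gives a rough way to respond: size the workspace so the budget covers the runtime vocabulary the check reported. The helper and numbers below are illustrative assumptions, not HugeCTR output.

#include <cstddef>

// Hypothetical sizing helper (illustration only): smallest workspace, in MB, whose
// per-GPU vocabulary budget covers a target number of entries.
inline std::size_t workspace_mb_for(std::size_t target_vocab_per_gpu,
                                    std::size_t embedding_vec_size,
                                    std::size_t extra_state_copies) {
  const std::size_t bytes_per_entry =
      (1 + extra_state_copies) * embedding_vec_size * sizeof(float);
  const std::size_t bytes = target_vocab_per_gpu * bytes_per_entry;
  return (bytes + 1024ull * 1024ull - 1) / (1024ull * 1024ull);  // round up to whole MB
}

For example, covering 1,000,000 entries per GPU with embedding_vec_size = 128 and two assumed extra state copies (Adam) needs about 1,000,000 * 3 * 128 * 4 bytes, i.e. roughly 1465 MB under these assumptions.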

2 changes: 2 additions & 0 deletions HugeCTR/include/pybind/model.hpp
@@ -182,6 +182,8 @@ struct SparseEmbedding {
std::vector<size_t>& slot_size_array,
std::shared_ptr<OptParamsPy>& embedding_opt_params,
const HybridEmbeddingParam& hybrid_embedding_param);

void initialize_max_vocabulary_size_per_gpu();
};
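
The SparseEmbedding constructor above also takes slot_size_array, the other knob named in the new error message. For a localized embedding the per-GPU vocabulary follows from how slots land on GPUs; the sketch below estimates the worst-case per-GPU total under the assumption, not confirmed by this excerpt, that slots are assigned round-robin by slot id.

#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical estimate (assumption: slots assigned to GPUs round-robin by slot id)
// of the largest per-GPU vocabulary implied by slot_size_array.
inline std::size_t worst_case_vocab_per_gpu(const std::vector<std::size_t>& slot_size_array,
                                            std::size_t gpu_count) {
  std::vector<std::size_t> per_gpu(gpu_count, 0);
  for (std::size_t slot = 0; slot < slot_size_array.size(); ++slot) {
    per_gpu[slot % gpu_count] += slot_size_array[slot];
  }
  return *std::max_element(per_gpu.begin(), per_gpu.end());
}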

struct EmbeddingTrainingCacheParams {
125 changes: 0 additions & 125 deletions HugeCTR/src/embeddings/distributed_slot_sparse_embedding_hash.cu
@@ -150,131 +150,6 @@ void DistributedSlotSparseEmbeddingHash<TypeHashKey, TypeEmbeddingComp>::filter_
}
}

template <typename TypeHashKey, typename TypeEmbeddingComp>
DistributedSlotSparseEmbeddingHash<TypeHashKey, TypeEmbeddingComp>::
DistributedSlotSparseEmbeddingHash(
const Tensors2<TypeHashKey> &train_row_offsets_tensors,
const Tensors2<TypeHashKey> &train_value_tensors,
const std::vector<std::shared_ptr<size_t>> &train_nnz_array,
const Tensors2<TypeHashKey> &evaluate_row_offsets_tensors,
const Tensors2<TypeHashKey> &evaluate_value_tensors,
const std::vector<std::shared_ptr<size_t>> &evaluate_nnz_array,
const SparseEmbeddingHashParams &embedding_params,
const std::shared_ptr<ResourceManager> &resource_manager)
: embedding_data_(train_row_offsets_tensors, train_value_tensors, train_nnz_array,
evaluate_row_offsets_tensors, evaluate_value_tensors, evaluate_nnz_array,
Embedding_t::DistributedSlotSparseEmbeddingHash, embedding_params,
resource_manager) {
embedding_data_.embedding_params_.is_data_parallel =
false; // this ctor is only used for embedding plugin
try {
// CAUTION: we cannot know in advance how many <key,value> pairs land on each GPU, because the
// key distribution is computed by (key % gpu_count). To avoid allocating the total hash table
// size on every GPU while still getting better performance from a partially filled hash table,
// users need to set the param "load_factor" (load_factor < 1).
max_vocabulary_size_per_gpu_ = embedding_data_.embedding_params_.max_vocabulary_size_per_gpu;
max_vocabulary_size_ = max_vocabulary_size_per_gpu_ *
embedding_data_.get_resource_manager().get_global_gpu_count();

HCTR_LOG_S(INFO, ROOT) << "max_vocabulary_size_per_gpu_=" << max_vocabulary_size_per_gpu_
<< std::endl;
CudaDeviceContext context;
for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count(); id++) {
context.set_device(embedding_data_.get_local_gpu(id).get_device_id());

// new GeneralBuffer objects
const std::shared_ptr<GeneralBuffer2<CudaAllocator>> &buf = embedding_data_.get_buffer(id);
embedding_optimizers_.emplace_back(max_vocabulary_size_per_gpu_,
embedding_data_.embedding_params_, buf);

// new hash table value vectors
{
Tensor2<float> tensor;
buf->reserve(
{max_vocabulary_size_per_gpu_, embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
hash_table_value_tensors_.push_back(tensor);
}

// new hash table value_index that get() from HashTable
{
Tensor2<size_t> tensor;
buf->reserve({1, embedding_data_.embedding_params_.get_universal_batch_size() *
embedding_data_.embedding_params_.max_feature_num},
&tensor);
hash_value_index_tensors_.push_back(tensor);
}

// new embedding features reduced by hash table values(results of forward)
{
Tensor2<TypeEmbeddingComp> tensor;
buf->reserve({embedding_data_.embedding_params_.get_universal_batch_size() *
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
embedding_feature_tensors_.push_back(tensor);
}

// new wgrad used by backward
{
Tensor2<TypeEmbeddingComp> tensor;
buf->reserve({embedding_data_.embedding_params_.get_batch_size(true) *
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
wgrad_tensors_.push_back(tensor);
}

// new temp tensors used by update_params
{
Tensor2<TypeHashKey> tensor;
buf->reserve({1, embedding_data_.embedding_params_.get_universal_batch_size() *
embedding_data_.embedding_params_.slot_num +
1},
&tensor);
row_offset_allreduce_tensors_.push_back(tensor);
}
{
Tensor2<TypeEmbeddingComp> tensor;
buf->reserve({embedding_data_.embedding_params_.get_universal_batch_size() *
embedding_data_.embedding_params_.slot_num,
embedding_data_.embedding_params_.embedding_vec_size},
&tensor);
utest_forward_temp_tensors_.push_back(tensor);
}
// init GeneralBuffers to do real allocation
#ifndef NDEBUG
HCTR_LOG_S(DEBUG, WORLD) << " max_feature_num_:"
<< embedding_data_.embedding_params_.max_feature_num << std::endl;
#endif
}

hash_tables_.resize(embedding_data_.get_resource_manager().get_local_gpu_count());
#pragma omp parallel num_threads(embedding_data_.get_resource_manager().get_local_gpu_count())
{
size_t id = omp_get_thread_num();
CudaDeviceContext context(embedding_data_.get_local_gpu(id).get_device_id());
// construct HashTable object: used to store hash table <key, value_index>
hash_tables_[id].reset(new NvHashTable(max_vocabulary_size_per_gpu_));
embedding_data_.get_buffer(id)->allocate();
}

for (size_t id = 0; id < embedding_data_.get_resource_manager().get_local_gpu_count(); id++) {
context.set_device(embedding_data_.get_local_gpu(id).get_device_id());
embedding_optimizers_[id].initialize(embedding_data_.get_local_gpu(id));

} // end of for(int id = 0; id < embedding_data_.get_local_gpu_count(); id++)

functors_.sync_all_gpus(embedding_data_.get_resource_manager());

} catch (const std::runtime_error &rt_err) {
HCTR_LOG_S(ERROR, WORLD) << rt_err.what() << std::endl;
throw;
}

return;
}

template <typename TypeHashKey, typename TypeEmbeddingComp>
DistributedSlotSparseEmbeddingHash<TypeHashKey, TypeEmbeddingComp>::
DistributedSlotSparseEmbeddingHash(const SparseTensors<TypeHashKey> &train_keys,