Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor TBB layer #2841

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions cpp/daal/include/services/env_detect.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,6 @@ class DAAL_EXPORT Environment : public Base
void initNumberOfThreads();

env _env;
// Pointer to the oneapi::tbb::task_scheduler_handle class object, global for oneDAL.
// The oneapi::tbb::task_scheduler_handle and the oneapi::tbb::finalize function
// allow user to wait for completion of worker threads.
void * _schedulerHandle;
void * _globalControl;
SharedPtr<services::internal::sycl::ExecutionContextIface> _executionContext;
};
} // namespace interface1
Expand Down
59 changes: 6 additions & 53 deletions cpp/daal/src/externals/core_threading_win_dll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,7 @@ typedef void (*_daal_run_task_group_t)(void * taskGroupPtr, daal::task * t);
typedef void (*_daal_wait_task_group_t)(void * taskGroupPtr);

typedef bool (*_daal_is_in_parallel_t)();
typedef void (*_daal_tbb_task_scheduler_free_t)(void *& globalControl);
typedef void (*_daal_tbb_task_scheduler_handle_free_t)(void *& schedulerHandle);
typedef size_t (*_setNumberOfThreads_t)(const size_t, void **);
typedef size_t (*_setSchedulerHandle_t)(void **);
typedef size_t (*_setNumberOfThreads_t)(const size_t);
typedef void * (*_daal_threader_env_t)();

typedef void (*_daal_parallel_sort_int32_t)(int *, int *);
Expand Down Expand Up @@ -207,12 +204,9 @@ static _daal_del_task_group_t _daal_del_task_group_ptr = NULL;
static _daal_run_task_group_t _daal_run_task_group_ptr = NULL;
static _daal_wait_task_group_t _daal_wait_task_group_ptr = NULL;

static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL;
static _daal_tbb_task_scheduler_free_t _daal_tbb_task_scheduler_free_ptr = NULL;
static _daal_tbb_task_scheduler_handle_free_t _daal_tbb_task_scheduler_handle_free_ptr = NULL;
static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL;
static _setSchedulerHandle_t _setSchedulerHandle_ptr = NULL;
static _daal_threader_env_t _daal_threader_env_ptr = NULL;
static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL;
static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL;
static _daal_threader_env_t _daal_threader_env_ptr = NULL;

static _daal_parallel_sort_int32_t _daal_parallel_sort_int32_ptr = NULL;
static _daal_parallel_sort_uint64_t _daal_parallel_sort_uint64_ptr = NULL;
Expand Down Expand Up @@ -640,55 +634,14 @@ DAAL_EXPORT bool _daal_is_in_parallel()
return _daal_is_in_parallel_ptr();
}

DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& init)
{
if (init == NULL)
{
// If threading library was not opened, there is nothing to free,
// so we do not need to load threading library.
// Moreover, loading threading library in the Environment destructor
// results in a crush because of the use of Wintrust library after it was unloaded.
// This happens due to undefined order of static objects deinitialization
// like Environment, and dependent libraries.
return;
}

load_daal_thr_dll();
if (_daal_tbb_task_scheduler_free_ptr == NULL)
{
_daal_tbb_task_scheduler_free_ptr = (_daal_tbb_task_scheduler_free_t)load_daal_thr_func("_daal_tbb_task_scheduler_free");
}
return _daal_tbb_task_scheduler_free_ptr(init);
}

DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& init)
{
load_daal_thr_dll();
if (_daal_tbb_task_scheduler_handle_free_ptr == NULL)
{
_daal_tbb_task_scheduler_handle_free_ptr = (_daal_tbb_task_scheduler_handle_free_t)load_daal_thr_func("_daal_tbb_task_scheduler_handle_free");
}
return _daal_tbb_task_scheduler_handle_free_ptr(init);
}

DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** init)
DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads)
{
load_daal_thr_dll();
if (_setNumberOfThreads_ptr == NULL)
{
_setNumberOfThreads_ptr = (_setNumberOfThreads_t)load_daal_thr_func("_setNumberOfThreads");
}
return _setNumberOfThreads_ptr(numThreads, init);
}

DAAL_EXPORT size_t _setSchedulerHandle(void ** init)
{
load_daal_thr_dll();
if (_setSchedulerHandle_ptr == NULL)
{
_setSchedulerHandle_ptr = (_setSchedulerHandle_t)load_daal_thr_func("_setSchedulerHandle");
}
return _setSchedulerHandle_ptr(init);
return _setNumberOfThreads_ptr(numThreads);
}

DAAL_EXPORT void * _daal_threader_env()
Expand Down
30 changes: 16 additions & 14 deletions cpp/daal/src/services/env_detect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "src/externals/service_service.h"
#include "src/threading/threading.h"
#include "services/error_indexes.h"
#include <iostream>

#include "src/services/service_topo.h"
#include "src/threading/service_thread_pinner.h"
Expand Down Expand Up @@ -125,43 +126,47 @@ DAAL_EXPORT void daal::services::Environment::setDynamicLibraryThreadingTypeOnWi
initNumberOfThreads();
}

DAAL_EXPORT daal::services::Environment::Environment() : _schedulerHandle {}, _globalControl {}
DAAL_EXPORT daal::services::Environment::Environment()
{
std::cout << "Env constructor" << std::endl;
_env.cpuid_init_flag = false;
_env.cpuid = -1;
this->setDefaultExecutionContext(internal::CpuExecutionContext());
initNumberOfThreads();
}

DAAL_EXPORT daal::services::Environment::Environment(const Environment & e) : daal::services::Environment::Environment() {}

DAAL_EXPORT void daal::services::Environment::initNumberOfThreads()
{
if (isInit) return;
// Initializes global oneapi::tbb::task_scheduler_handle object in oneDAL to prevent the unexpected
// destruction of the calling thread.
// When the oneapi::tbb::finalize function is called with an oneapi::tbb::task_scheduler_handle
// instance, it blocks the calling thread until the completion of all worker
// threads that were implicitly created by the library.
#if defined(TARGET_X86_64)
daal::setSchedulerHandle(&_schedulerHandle);
#endif
std::cout << "Inside init number of threads" << std::endl;

/* if HT enabled - set _numThreads to physical cores num */
if (daal::internal::ServiceInst::serv_get_ht())
{
/* Number of cores = number of cpu packages * number of cores per cpu package */
int ncores = daal::internal::ServiceInst::serv_get_ncpus() * daal::internal::ServiceInst::serv_get_ncorespercpu();

/* Re-set number of threads if ncores is valid and different to _numThreads */
if ((ncores > 0) && (ncores < _daal_threader_get_max_threads()))

if (ncores > 0)
{
std::cout << "(ht enabled) init with " << ncores << std::endl;
daal::services::Environment::setNumberOfThreads(ncores);
}
}
else
{
std::cout << "(ht disabled) init with " << _daal_threader_get_max_threads() << std::endl;
daal::services::Environment::setNumberOfThreads(_daal_threader_get_max_threads());
}
isInit = true;
}

DAAL_EXPORT daal::services::Environment::~Environment()
{
std::cout << "Env destructor" << std::endl;
daal::services::daal_free_buffers();
}

Expand All @@ -177,10 +182,7 @@ void daal::services::Environment::_cpu_detect(int enable)
DAAL_EXPORT void daal::services::Environment::setNumberOfThreads(const size_t numThreads)
{
isInit = true;
#if defined(TARGET_X86_64)
daal::setSchedulerHandle(&_schedulerHandle);
#endif
daal::setNumberOfThreads(numThreads, &_globalControl);
daal::setNumberOfThreads(numThreads);
}

DAAL_EXPORT size_t daal::services::Environment::getNumberOfThreads() const
Expand Down
Loading
Loading