Skip to content

Commit

Permalink
Add init to env constructor
Browse files Browse the repository at this point in the history
  • Loading branch information
avolkov-intel committed Jul 4, 2024
1 parent b1f2bf6 commit c960c03
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 62 deletions.
32 changes: 10 additions & 22 deletions cpp/daal/src/externals/core_threading_win_dll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,8 @@ typedef void (*_daal_run_task_group_t)(void * taskGroupPtr, daal::task * t);
typedef void (*_daal_wait_task_group_t)(void * taskGroupPtr);

typedef bool (*_daal_is_in_parallel_t)();
typedef void (*_daal_tbb_task_scheduler_free_t)(void *& globalControl);
typedef void (*_daal_tbb_task_scheduler_handle_free_t)(void *& schedulerHandle);
typedef size_t (*_setNumberOfThreads_t)(const size_t, void **);
typedef size_t (*_setSchedulerHandle_t)(void **);
// typedef void (*_daal_tbb_task_scheduler_free_t)(void *& globalControl);
typedef size_t (*_setNumberOfThreads_t)(const size_t); //, void **);
typedef void * (*_daal_threader_env_t)();

typedef void (*_daal_parallel_sort_int32_t)(int *, int *);
Expand Down Expand Up @@ -207,12 +205,10 @@ static _daal_del_task_group_t _daal_del_task_group_ptr = NULL;
static _daal_run_task_group_t _daal_run_task_group_ptr = NULL;
static _daal_wait_task_group_t _daal_wait_task_group_ptr = NULL;

static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL;
static _daal_tbb_task_scheduler_free_t _daal_tbb_task_scheduler_free_ptr = NULL;
static _daal_tbb_task_scheduler_handle_free_t _daal_tbb_task_scheduler_handle_free_ptr = NULL;
static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL;
static _setSchedulerHandle_t _setSchedulerHandle_ptr = NULL;
static _daal_threader_env_t _daal_threader_env_ptr = NULL;
static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL;
// static _daal_tbb_task_scheduler_free_t _daal_tbb_task_scheduler_free_ptr = NULL;
static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL;
static _daal_threader_env_t _daal_threader_env_ptr = NULL;

static _daal_parallel_sort_int32_t _daal_parallel_sort_int32_ptr = NULL;
static _daal_parallel_sort_uint64_t _daal_parallel_sort_uint64_ptr = NULL;
Expand Down Expand Up @@ -640,6 +636,7 @@ DAAL_EXPORT bool _daal_is_in_parallel()
return _daal_is_in_parallel_ptr();
}

/*
DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& init)
{
if (init == NULL)
Expand All @@ -660,25 +657,16 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& init)
}
return _daal_tbb_task_scheduler_free_ptr(init);
}
*/

DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& init)
{
load_daal_thr_dll();
if (_daal_tbb_task_scheduler_handle_free_ptr == NULL)
{
_daal_tbb_task_scheduler_handle_free_ptr = (_daal_tbb_task_scheduler_handle_free_t)load_daal_thr_func("_daal_tbb_task_scheduler_handle_free");
}
return _daal_tbb_task_scheduler_handle_free_ptr(init);
}

DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** init)
DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads /*, void ** init*/)
{
load_daal_thr_dll();
if (_setNumberOfThreads_ptr == NULL)
{
_setNumberOfThreads_ptr = (_setNumberOfThreads_t)load_daal_thr_func("_setNumberOfThreads");
}
return _setNumberOfThreads_ptr(numThreads, init);
return _setNumberOfThreads_ptr(numThreads /*, init*/);
}

DAAL_EXPORT size_t _setSchedulerHandle(void ** init)
Expand Down
25 changes: 15 additions & 10 deletions cpp/daal/src/services/env_detect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "src/externals/service_service.h"
#include "src/threading/threading.h"
#include "services/error_indexes.h"
#include <iostream>

#include "src/services/service_topo.h"
#include "src/threading/service_thread_pinner.h"
Expand Down Expand Up @@ -127,41 +128,45 @@ DAAL_EXPORT void daal::services::Environment::setDynamicLibraryThreadingTypeOnWi

DAAL_EXPORT daal::services::Environment::Environment() /* : _globalControl {}*/
{
std::cerr << "Environment constructor" << std::endl;
_env.cpuid_init_flag = false;
_env.cpuid = -1;
this->setDefaultExecutionContext(internal::CpuExecutionContext());
daal::services::Environment::initNumberOfThreads();
}

DAAL_EXPORT daal::services::Environment::Environment(const Environment & e) : daal::services::Environment::Environment() {}

DAAL_EXPORT void daal::services::Environment::initNumberOfThreads()
{
if (isInit) return;
// Initializes global oneapi::tbb::task_scheduler_handle object in oneDAL to prevent the unexpected
// destruction of the calling thread.
// When the oneapi::tbb::finalize function is called with an oneapi::tbb::task_scheduler_handle
// instance, it blocks the calling thread until the completion of all worker
// threads that were implicitly created by the library.
#if defined(TARGET_X86_64)
daal::setSchedulerHandle(&_schedulerHandle);
#endif
std::cerr << "Inside init" << std::endl;

/* if HT enabled - set _numThreads to physical cores num */
if (daal::internal::ServiceInst::serv_get_ht())
{
/* Number of cores = number of cpu packages * number of cores per cpu package */
int ncores = daal::internal::ServiceInst::serv_get_ncpus() * daal::internal::ServiceInst::serv_get_ncorespercpu();

/* Re-set number of threads if ncores is valid and different to _numThreads */
if ((ncores > 0) && (ncores < _daal_threader_get_max_threads()))

std::cerr << "Init with " << ncores << std::endl;
if (ncores > 0)
{
daal::services::Environment::setNumberOfThreads(ncores);
}
}
else
{
std::cerr << "Init with " << (_daal_threader_get_max_threads()) << std::endl;
daal::services::Environment::setNumberOfThreads(_daal_threader_get_max_threads());
}
isInit = true;
}

DAAL_EXPORT daal::services::Environment::~Environment()
{
std::cerr << "Env destructor" << std::endl;
daal::services::daal_free_buffers();
// _daal_tbb_task_scheduler_free(_globalControl);
}
Expand All @@ -178,7 +183,7 @@ void daal::services::Environment::_cpu_detect(int enable)
DAAL_EXPORT void daal::services::Environment::setNumberOfThreads(const size_t numThreads)
{
isInit = true;
daal::setNumberOfThreads(numThreads /*, &_globalControl*/);
daal::setNumberOfThreads(numThreads);
}

DAAL_EXPORT size_t daal::services::Environment::getNumberOfThreads() const
Expand Down
65 changes: 42 additions & 23 deletions cpp/daal/src/threading/threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,43 @@
#include <tbb/global_control.h>
#include <tbb/task_arena.h>
#include "services/daal_atomic_int.h"
#include <iostream>

#if defined(TBB_INTERFACE_VERSION) && TBB_INTERFACE_VERSION >= 12002
#include <tbb/task.h>
#endif

namespace daal
{
ThreaderEnvironment::ThreaderEnvironment() : _numberOfThreads(1), _taskArena(nullptr)
{
std::cout << "ThreaderEnv constructor" << std::endl;
}
ThreaderEnvironment::~ThreaderEnvironment()
{
std::cerr << "ThreaderEnv destructor" << std::endl;
if (_taskArena)
{
delete reinterpret_cast<tbb::task_arena *>(_taskArena);
_taskArena = nullptr;
}
}
void ThreaderEnvironment::setNumberOfThreads(size_t value)
{
std::cerr << "setNumberOfThreads from " << (_numberOfThreads) << " to " << value << std::endl;
if (_taskArena)
{
delete reinterpret_cast<tbb::task_arena *>(_taskArena);
_taskArena = nullptr;
}
if (value > 1)
{
_taskArena = reinterpret_cast<void *>(new tbb::task_arena(value));
}
_numberOfThreads = value;
}
} // namespace daal

using namespace daal::services;

DAAL_EXPORT void * _threaded_scalable_malloc(const size_t size, const size_t alignment)
Expand All @@ -52,24 +84,14 @@ DAAL_EXPORT void _threaded_scalable_free(void * ptr)
scalable_aligned_free(ptr);
}

DAAL_EXPORT void _daal_tbb_task_arena_free(void *& taskArena)
{
// void* taskArena = daal::threader_env()->getTaskArena();
if (taskArena)
{
delete reinterpret_cast<tbb::task_arena *>(taskArena);
taskArena = nullptr;
}
}

DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl)
{
if (globalControl)
{
delete reinterpret_cast<tbb::global_control *>(globalControl);
globalControl = nullptr;
}
}
// DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl)
// {
// if (globalControl)
// {
// delete reinterpret_cast<tbb::global_control *>(globalControl);
// globalControl = nullptr;
// }
// }

DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads)
{
Expand All @@ -79,13 +101,11 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads)
{
const size_t maxNumThreads = _daal_threader_get_max_threads();
const size_t limitedNumThreads = numThreads < maxNumThreads ? numThreads : maxNumThreads;
void *& taskArena = daal::threader_env()->getTaskArena();
_daal_tbb_task_arena_free(taskArena);
taskArena = reinterpret_cast<void *>(new tbb::task_arena(limitedNumThreads));
std::cerr << "_set nthreads " << numThreads << "(max " << maxNumThreads << ")" << std::endl;
daal::threader_env()->setNumberOfThreads(limitedNumThreads);
return limitedNumThreads;
}
_daal_tbb_task_arena_free(daal::threader_env()->getTaskArena());
std::cerr << "_set nthreads 1" << std::endl;
daal::threader_env()->setNumberOfThreads(1);
return 1;
}
Expand Down Expand Up @@ -215,7 +235,6 @@ DAAL_EXPORT int64_t _daal_parallel_reduce_int32_int64(int32_t n, int64_t init, c
if (daal::threader_env()->getNumberOfThreads() > 1)
{
tbb::task_arena * taskArena = reinterpret_cast<tbb::task_arena *>(daal::threader_env()->getTaskArena());
// ?????
return taskArena->execute([&] {
return tbb::parallel_reduce(
tbb::blocked_range<int32_t>(0, n), init,
Expand Down
12 changes: 5 additions & 7 deletions cpp/daal/src/threading/threading.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ extern "C"
DAAL_EXPORT void _daal_del_task_group(void * taskGroupPtr);
DAAL_EXPORT void _daal_run_task_group(void * taskGroupPtr, daal::task * t);
DAAL_EXPORT void _daal_wait_task_group(void * taskGroupPtr);

DAAL_EXPORT void _daal_tbb_task_arena_free(void *& taskArena);
DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl);
// DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl);
DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads);

DAAL_EXPORT void * _daal_threader_env();
Expand Down Expand Up @@ -166,11 +164,11 @@ inline void threaded_scalable_free(void * ptr)
class ThreaderEnvironment
{
public:
ThreaderEnvironment() : _numberOfThreads(1 /*_daal_threader_get_max_threads()*/) {}
~ThreaderEnvironment() { _daal_tbb_task_arena_free(_taskArena); }
ThreaderEnvironment();
~ThreaderEnvironment();
size_t getNumberOfThreads() const { return _numberOfThreads; }
void setNumberOfThreads(size_t value) { _numberOfThreads = value; }
void *& getTaskArena() { return _taskArena; }
void * getTaskArena() const { return _taskArena; };
void setNumberOfThreads(size_t value);

private:
size_t _numberOfThreads;
Expand Down

0 comments on commit c960c03

Please sign in to comment.