Skip to content

Commit

Permalink
Revert "[Submodule] Remove deprecated USE_TBB option and TBB submodule (
Browse files Browse the repository at this point in the history
pytorch#127051)"

This reverts commit 699db79.

Reverted pytorch#127051 on behalf of https://github.com/PaliC due to This PR needs to be synced using the import button as there is a bug in our diff train ([comment](pytorch#127051 (comment)))
  • Loading branch information
pytorchmergebot committed May 30, 2024
1 parent 1abcac9 commit 67739d8
Show file tree
Hide file tree
Showing 34 changed files with 863 additions and 19 deletions.
5 changes: 4 additions & 1 deletion .ci/pytorch/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ if [[ "$BUILD_ENVIRONMENT" == *cuda11* ]]; then
fi
fi

if [[ ${BUILD_ENVIRONMENT} == *"parallelnative"* ]]; then
if [[ ${BUILD_ENVIRONMENT} == *"paralleltbb"* ]]; then
export ATEN_THREADING=TBB
export USE_TBB=1
elif [[ ${BUILD_ENVIRONMENT} == *"parallelnative"* ]]; then
export ATEN_THREADING=NATIVE
fi

Expand Down
18 changes: 18 additions & 0 deletions .ci/pytorch/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ test_aten() {
${SUDO} ln -sf "$TORCH_LIB_DIR"/libmkldnn* "$TEST_BASE_DIR"
${SUDO} ln -sf "$TORCH_LIB_DIR"/libnccl* "$TEST_BASE_DIR"
${SUDO} ln -sf "$TORCH_LIB_DIR"/libtorch* "$TEST_BASE_DIR"
${SUDO} ln -sf "$TORCH_LIB_DIR"/libtbb* "$TEST_BASE_DIR"

ls "$TEST_BASE_DIR"
aten/tools/run_tests.sh "$TEST_BASE_DIR"
Expand All @@ -800,6 +801,21 @@ test_without_numpy() {
popd
}

# PyTorch C++ extensions include torch/extension.h, which (via all.h and
# utils.h) transitively includes Parallel.h, so an extension can call e.g.
# parallel_for(). However, Parallel.h contains only declarations; the
# definitions are conditionally included at the end of Parallel.h depending
# on the threading backend selected in Config.h.
# Fixes were attempted in #39612 and #39881 by including Config.h into
# Parallel.h. But when PyTorch is built with TBB, the generated Config.h
# sets AT_PARALLEL_NATIVE_TBB=1 (see #39612 and #39881), which means that
# including torch/extension.h transitively includes Parallel.h, which in
# turn includes tbb.h -- a header not available on the system!
# Work around this by copying the vendored TBB headers into /usr/include.
if [[ "${BUILD_ENVIRONMENT}" == *tbb* ]]; then
sudo mkdir -p /usr/include/tbb
sudo cp -r "$PWD"/third_party/tbb/include/tbb/* /usr/include/tbb
fi

test_libtorch() {
local SHARD="$1"

Expand All @@ -813,6 +829,7 @@ test_libtorch() {
ln -sf "$TORCH_LIB_DIR"/libc10* "$TORCH_BIN_DIR"
ln -sf "$TORCH_LIB_DIR"/libshm* "$TORCH_BIN_DIR"
ln -sf "$TORCH_LIB_DIR"/libtorch* "$TORCH_BIN_DIR"
ln -sf "$TORCH_LIB_DIR"/libtbb* "$TORCH_BIN_DIR"
ln -sf "$TORCH_LIB_DIR"/libnvfuser* "$TORCH_BIN_DIR"

export CPP_TESTS_DIR="${TORCH_BIN_DIR}"
Expand Down Expand Up @@ -949,6 +966,7 @@ test_rpc() {
# test reporting process to function as expected.
ln -sf "$TORCH_LIB_DIR"/libtorch* "$TORCH_BIN_DIR"
ln -sf "$TORCH_LIB_DIR"/libc10* "$TORCH_BIN_DIR"
ln -sf "$TORCH_LIB_DIR"/libtbb* "$TORCH_BIN_DIR"

CPP_TESTS_DIR="${TORCH_BIN_DIR}" python test/run_test.py --cpp --verbose -i cpp/test_cpp_rpc
}
Expand Down
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@
ignore = dirty
path = third_party/foxi
url = https://github.com/houseroad/foxi.git
[submodule "third_party/tbb"]
path = third_party/tbb
url = https://github.com/01org/tbb
branch = tbb_2018
[submodule "android/libs/fbjni"]
ignore = dirty
path = android/libs/fbjni
Expand Down
14 changes: 5 additions & 9 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ filegroup(
data = [":generate-code"],
)

exports_files(
srcs = ["aten/src/ATen/cpu/tbb/extra/version_string.ver.in"],
)

# ATen
filegroup(
name = "aten_base_cpp",
Expand Down Expand Up @@ -271,6 +275,7 @@ header_template_rule(
"@AT_BUILD_WITH_LAPACK@": "1",
"@AT_PARALLEL_OPENMP@": "0",
"@AT_PARALLEL_NATIVE@": "1",
"@AT_PARALLEL_NATIVE_TBB@": "0",
"@AT_BLAS_F2C@": "0",
"@AT_BLAS_USE_CBLAS_DOT@": "1",
},
Expand Down Expand Up @@ -354,9 +359,6 @@ cc_library(
":aten_src_ATen_config",
] + generated_cpu_cpp + aten_ufunc_generated_cpu_sources("aten/src/ATen/{}"),
copts = ATEN_COPTS,
linkopts = [
"-ldl",
],
data = if_cuda(
[":libcaffe2_nvrtc.so"],
[],
Expand Down Expand Up @@ -770,9 +772,6 @@ cc_library(
],
)) + torch_sources,
copts = TORCH_COPTS,
linkopts = [
"-lrt",
],
defines = [
"CAFFE2_NIGHTLY_VERSION=20200115",
],
Expand All @@ -792,9 +791,6 @@ cc_library(
cc_library(
name = "shm",
srcs = glob(["torch/lib/libshm/*.cpp"]),
linkopts = [
"-lrt",
],
deps = [
":torch",
],
Expand Down
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ cmake_dependent_option(
cmake_dependent_option(
USE_TENSORPIPE "Use TensorPipe. Only available if USE_DISTRIBUTED is on." ON
"USE_DISTRIBUTED" OFF)
option(USE_TBB "Use TBB (Deprecated)" OFF)
cmake_dependent_option(
USE_SYSTEM_TBB "Use system-provided Intel TBB." OFF "USE_TBB" OFF)
option(ONNX_ML "Enable traditional ONNX ML API." ON)
option(HAVE_SOVERSION "Whether to add SOVERSION to the shared objects" OFF)
option(BUILD_LIBTORCH_CPU_WITH_DEBUG
Expand Down Expand Up @@ -480,6 +483,9 @@ if(USE_SYSTEM_LIBS)
if(USE_NCCL)
set(USE_SYSTEM_NCCL ON)
endif()
if(USE_TBB)
set(USE_SYSTEM_TBB ON)
endif()
endif()

# Used when building Caffe2 through setup.py
Expand Down
10 changes: 10 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,16 @@ new_local_repository(
path = "third_party/opentelemetry-cpp",
)

new_patched_local_repository(
name = "tbb",
build_file = "//third_party:tbb.BUILD",
patch_strip = 1,
patches = [
"@//third_party:tbb.patch",
],
path = "third_party/tbb",
)

new_local_repository(
name = "tensorpipe",
build_file = "//third_party:tensorpipe.BUILD",
Expand Down
10 changes: 10 additions & 0 deletions aten/src/ATen/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,16 @@ endif()

list(APPEND ATen_CPU_INCLUDE ${CMAKE_CURRENT_SOURCE_DIR}/..)

if(USE_TBB)
if(USE_SYSTEM_TBB)
message("ATen is compiled with system-provided Intel TBB.")
else()
message("ATen is compiled with Intel TBB (${TBB_ROOT_DIR}).")
endif()
list(APPEND ATen_CPU_INCLUDE ${TBB_INCLUDE_DIR})
list(APPEND ATen_CPU_DEPENDENCY_LIBS TBB::tbb)
endif()

if(BLAS_FOUND)
if($ENV{TH_BINARY_BUILD})
message(STATUS "TH_BINARY_BUILD detected. Enabling special linkage.")
Expand Down
1 change: 1 addition & 0 deletions aten/src/ATen/Config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@
#define AT_BUILD_WITH_LAPACK() @AT_BUILD_WITH_LAPACK@
#define AT_PARALLEL_OPENMP @AT_PARALLEL_OPENMP@
#define AT_PARALLEL_NATIVE @AT_PARALLEL_NATIVE@
#define AT_PARALLEL_NATIVE_TBB @AT_PARALLEL_NATIVE_TBB@
#define AT_BLAS_F2C() @AT_BLAS_F2C@
#define AT_BLAS_USE_CBLAS_DOT() @AT_BLAS_USE_CBLAS_DOT@
2 changes: 2 additions & 0 deletions aten/src/ATen/Parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ TORCH_API int intraop_default_num_threads();
#include <ATen/ParallelOpenMP.h> // IWYU pragma: keep
#elif AT_PARALLEL_NATIVE
#include <ATen/ParallelNative.h> // IWYU pragma: keep
#elif AT_PARALLEL_NATIVE_TBB
#include <ATen/ParallelNativeTBB.h> // IWYU pragma: keep
#endif

#include <ATen/Parallel-inl.h> // IWYU pragma: keep
2 changes: 2 additions & 0 deletions aten/src/ATen/ParallelCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ std::string get_parallel_info() {
ss << "OpenMP";
#elif AT_PARALLEL_NATIVE
ss << "native thread pool";
#elif AT_PARALLEL_NATIVE_TBB
ss << "native thread pool and TBB";
#endif
#ifdef C10_MOBILE
ss << " [mobile]";
Expand Down
115 changes: 115 additions & 0 deletions aten/src/ATen/ParallelNativeTBB.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#include <ATen/Config.h>
#if AT_PARALLEL_NATIVE_TBB
#include <ATen/Parallel.h>
#include <ATen/ParallelFuture.h>
#include <ATen/PTThreadPool.h>

#include <atomic>
#include <mutex>

#include <tbb/tbb.h>
#define TBB_PREVIEW_GLOBAL_CONTROL 1
#include <tbb/global_control.h>

#ifdef _OPENMP
#include <omp.h>
#endif

#if AT_MKL_ENABLED()
#include <mkl.h>
#endif

namespace at {

namespace {
static thread_local tbb::task_group tg_;
thread_local int this_thread_id{0};

std::mutex global_thread_mutex_;
std::shared_ptr<tbb::global_control> global_thread_limit_ = nullptr;
std::atomic<int> num_intraop_threads_{-1};

// Installs a process-wide TBB concurrency limit of `nthreads` and records
// the requested value for later queries. Access to the shared
// tbb::global_control instance is serialized by global_thread_mutex_.
void _internal_set_num_threads(int nthreads) {
  TORCH_INTERNAL_ASSERT(nthreads > 0);
  std::lock_guard<std::mutex> guard(global_thread_mutex_);
  // This is an antipattern and we shouldn't be constraining the number of
  // threads in library code.
  // TODO: Think of a smarter way to leverage tbb::thread_arena to limit the
  // number of slots instead of the number of threads.
  global_thread_limit_ = std::make_shared<tbb::global_control>(
      tbb::global_control::max_allowed_parallelism, nthreads);
  num_intraop_threads_.store(nthreads);
}
}

// One-time initialization of the TBB threading backend. OpenMP and MKL are
// pinned to a single thread because TBB owns intra-op parallelism in this
// configuration; the TBB limit is then set to the previously requested
// thread count, or to the default when none was requested.
void init_num_threads() {
#ifdef _OPENMP
  omp_set_num_threads(1);
#endif

#if AT_MKL_ENABLED()
  mkl_set_num_threads(1);
#endif

  const int requested = num_intraop_threads_.load();
  // A negative sentinel means set_num_threads() was never called.
  _internal_set_num_threads(
      requested < 0 ? intraop_default_num_threads() : requested);
}

// Sets the number of intra-op threads used by TBB.
// Throws (TORCH_CHECK) if `nthreads` is not strictly positive.
void set_num_threads(int nthreads) {
  TORCH_CHECK(nthreads > 0);

  _internal_set_num_threads(nthreads);
}

// Returns the current process-wide TBB parallelism limit, lazily
// initializing the threading backend on first use.
int get_num_threads() {
  at::internal::lazy_init_num_threads();
  return tbb::global_control::active_value(
      tbb::global_control::max_allowed_parallelism);
}

// Returns the calling thread's intra-op thread id (thread-local; 0 unless
// internal::set_thread_num() has been called on this thread).
int get_thread_num() {
  return this_thread_id;
}

namespace internal {
// Records `id` in thread-local storage so get_thread_num() can report the
// calling thread's position within the intra-op pool.
void set_thread_num(int id) {
  this_thread_id = id;
}
}

// True when the calling thread is executing inside a TBB task arena
// (current_thread_index() is negative outside of any arena).
bool in_parallel_region() {
  return tbb::this_task_arena::current_thread_index() >= 0;
}

// Schedules `func` on the shared TBB task group when the intra-op pool has
// more than one thread; otherwise degrades to a synchronous call.
void intraop_launch(std::function<void()> func) {
  if (get_num_threads() <= 1) {
    func();
    return;
  }
  tg_.run(func);
}

// Schedules `func` on the shared TBB task group (or runs it inline when the
// pool has a single thread) and returns a Future that is marked completed
// once `func` has run. The returned Future carries no value (NoneType).
c10::intrusive_ptr<c10::ivalue::Future> intraop_launch_future(
    std::function<void()> func) {
  auto future = c10::make_intrusive<c10::ivalue::Future>(NoneType::get());
  if (get_num_threads() > 1) {
    // Move the sink parameter into the closure (init-capture) instead of
    // copying it; std::function copies may heap-allocate.
    tg_.run(
        [func = std::move(func), future]() {
          func();
          future->markCompleted();
        }
    );
  } else {
    func();
    future->markCompleted();
  }
  return future;
}

} // namespace at
#endif
52 changes: 52 additions & 0 deletions aten/src/ATen/ParallelNativeTBB.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include <atomic>
#include <cstddef>
#include <exception>

#include <c10/util/Exception.h>

#ifdef _WIN32
#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#endif
#include <tbb/tbb.h>

#define INTRA_OP_PARALLEL

namespace at::internal {

// Splits [begin, end) into per-thread chunks of at least `grain_size`
// iterations and runs `f(chunk_begin, chunk_end)` on each chunk via
// tbb::parallel_for. The first exception thrown by any chunk is captured
// and rethrown on the calling thread; subsequent exceptions are dropped.
template <typename F>
inline void invoke_parallel(
    const int64_t begin,
    const int64_t end,
    const int64_t grain_size,
    const F& f) {
  // One chunk per worker thread, but never smaller than grain_size.
  const int64_t per_thread = divup((end - begin), get_num_threads());
  const int64_t chunk = std::max(grain_size, per_thread);

  std::atomic_flag first_error = ATOMIC_FLAG_INIT;
  std::exception_ptr captured;
  tbb::parallel_for(
      tbb::blocked_range<int64_t>(begin, end, chunk),
      [&captured, &first_error, f](const tbb::blocked_range<int64_t>& range) {
        try {
          // Expose the arena-local thread index through get_thread_num().
          internal::ThreadIdGuard tid_guard(
              tbb::this_task_arena::current_thread_index());
          f(range.begin(), range.end());
        } catch (...) {
          // Only the first failing chunk records its exception.
          if (!first_error.test_and_set()) {
            captured = std::current_exception();
          }
        }
      },
      tbb::static_partitioner{});
  if (captured) {
    std::rethrow_exception(captured);
  }
}

} // namespace at::internal
2 changes: 1 addition & 1 deletion aten/src/ATen/ParallelThreadPoolNative.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include <ATen/Config.h>
#if AT_PARALLEL_OPENMP || AT_PARALLEL_NATIVE
#if AT_PARALLEL_OPENMP || AT_PARALLEL_NATIVE || AT_PARALLEL_NATIVE_TBB
#include <ATen/Parallel.h>
#include <ATen/PTThreadPool.h>
#include <ATen/ThreadLocalState.h>
Expand Down
Loading

0 comments on commit 67739d8

Please sign in to comment.