Skip to content

interim release to help with future binary compatibility #234

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 28, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: RcppParallel
Type: Package
Title: Parallel Programming Tools for 'Rcpp'
Version: 5.1.9.9000
Version: 5.1.10.9000
Authors@R: c(
person("Kevin", "Ushey", role = c("aut", "cre"), email = "[email protected]",
comment = c(ORCID = "0000-0003-2880-7407")),
6 changes: 6 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -9,6 +9,12 @@
implementation. In practice, this implies that RcppParallel will now only
provide a TBB backend with R (>= 4.2.0).

## RcppParallel 5.1.10

* Fixed an issue where packages linking to RcppParallel could inadverently
depend on internals of the TBB library available during compilation, even
if the package did not explicitly use TBB itself.

## RcppParallel 5.1.9

* RcppParallel no longer passes `-rpath` when building / linking on Windows.
18 changes: 9 additions & 9 deletions R/flags.R
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@

#' Compilation flags for RcppParallel
#'
#'
#' Output the compiler or linker flags required to build against RcppParallel.
#'
#'
#' These functions are typically called from `Makevars` as follows:
#'
#'
#' ```
#' PKG_LIBS += $(shell "${R_HOME}/bin/Rscript" -e "RcppParallel::LdFlags()")
#' ```
#'
#'
#' On Windows, the flags ensure that the package links with the built-in TBB
#' library. On Linux and macOS, the output is empty, because TBB is loaded
#' dynamically on load by `RcppParallel`.
#'
#'
#' \R packages using RcppParallel should also add the following to their
#' `NAMESPACE` file:
#'
#'
#' ```
#' importFrom(RcppParallel, RcppParallelLibs)
#' ```
#'
#'
#' This is necessary to ensure that \pkg{RcppParallel} (and so, TBB) is loaded
#' and available.
#'
#'
#' @name flags
#' @rdname flags
#' @aliases RcppParallelLibs LdFlags CxxFlags
#'
#'
#' @return Returns \code{NULL}, invisibly. These functions are called for
#' their side effects (writing the associated flags to stdout).
#'
38 changes: 19 additions & 19 deletions R/tbb.R
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@

#' Get the Path to a TBB Library
#'
#'
#' Retrieve the path to a TBB library. This can be useful for \R packages
#' using RcppParallel that wish to use, or re-use, the version of TBB that
#' RcppParallel has been configured to use.
#'
#'
#' @param name
#' The name of the TBB library to be resolved. Normally, this is one of
#' `tbb`, `tbbmalloc`, or `tbbmalloc_proxy`. When `NULL`, the library
#' path containing the TBB libraries is returned instead.
#'
#'
#' @export
tbbLibraryPath <- function(name = NULL) {

# library paths for different OSes
sysname <- Sys.info()[["sysname"]]

# find root for TBB install
tbbRoot <- Sys.getenv("TBB_LIB", unset = tbbRoot())
if (is.null(name))
return(tbbRoot)

# form library names
tbbLibNames <- list(
"Darwin" = paste0("lib", name, ".dylib"),
"Windows" = paste0( name, ".dll"),
"SunOS" = paste0("lib", name, ".so"),
"Linux" = paste0("lib", name, c(".so.2", ".so"))
)

# skip systems that we know not to be compatible
isCompatible <- !is_sparc() && !is.null(tbbLibNames[[sysname]])
if (!isCompatible)
return(NULL)

# find the request library (if any)
libNames <- tbbLibNames[[sysname]]
for (libName in libNames) {
@@ -49,12 +49,12 @@ tbbLibraryPath <- function(name = NULL) {
return(tbbName)

}

}

tbbCxxFlags <- function() {
if (!TBB_ENABLED)

if (!TBB_ENABLED)
return("-DRCPP_PARALLEL_USE_TBB=0")

flags <- c("-DRCPP_PARALLEL_USE_TBB=1")
@@ -66,7 +66,7 @@ tbbCxxFlags <- function() {
flags <- c(flags, "-DTBB_USE_GCC_BUILTINS")
}
}

# if TBB_INC is set, apply those library paths
tbbInc <- Sys.getenv("TBB_INC", unset = TBB_INC)
if (!file.exists(tbbInc)) {
@@ -86,10 +86,10 @@ tbbCxxFlags <- function() {
flags <- c(flags, paste0("-I", asBuildPath(tbbInc)))

}

# return flags as string
paste(flags, collapse = " ")

}

# Return the linker flags required for TBB on this platform
@@ -120,20 +120,20 @@ tbbLdFlags <- function() {
fmt <- "-L%s -l%s -l%s"
return(sprintf(fmt, asBuildPath(tbbLibraryPath()), TBB_NAME, TBB_MALLOC_NAME))
}

# nothing required on other platforms
""

}

tbbRoot <- function() {

if (nzchar(TBB_LIB))
return(TBB_LIB)

rArch <- .Platform$r_arch
parts <- c("lib", if (nzchar(rArch)) rArch)
libDir <- paste(parts, collapse = "/")
system.file(libDir, package = "RcppParallel")

}
2 changes: 1 addition & 1 deletion R/zzz.R
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@ loadTbbLibrary <- function(name) {

# load RcppParallel library if available
if (.Platform$OS.type != "windows") {
.dllInfo <<- library.dynam("RcppParallel", pkgname, libname)
.dllInfo <<- library.dynam("RcppParallel", pkgname, libname, local = FALSE)
}

}
5 changes: 4 additions & 1 deletion RcppParallel.Rproj
Original file line number Diff line number Diff line change
@@ -13,8 +13,11 @@ Encoding: UTF-8
RnwWeave: Sweave
LaTeX: pdfLaTeX

AutoAppendNewline: Yes
StripTrailingWhitespace: Yes

BuildType: Package
PackageCleanBeforeInstall: No
PackageInstallArgs: --with-keep.source --clean
PackageInstallArgs: --with-keep.source
PackageCheckArgs: --as-cran
PackageRoxygenize: rd,collate,namespace
14 changes: 7 additions & 7 deletions inst/include/RcppParallel.h
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@
#include "RcppParallel/TinyThread.h"

// Use TBB only where it's known to compile and work correctly
// (NOTE: Windows TBB is temporarily opt-in for packages for
// (NOTE: Windows TBB is temporarily opt-in for packages for
// compatibility with CRAN packages not previously configured
// to link to TBB in Makevars.win)
#ifndef RCPP_PARALLEL_USE_TBB
@@ -31,14 +31,14 @@
namespace RcppParallel {

inline void parallelFor(std::size_t begin,
std::size_t end,
std::size_t end,
Worker& worker,
std::size_t grainSize = 1,
int numThreads = -1)
{
grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, 1u);
grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, std::size_t(1));
numThreads = resolveValue("RCPP_PARALLEL_NUM_THREADS", numThreads, -1);

#if RCPP_PARALLEL_USE_TBB
if (internal::backend() == internal::BACKEND_TBB)
tbbParallelFor(begin, end, worker, grainSize, numThreads);
@@ -51,14 +51,14 @@ inline void parallelFor(std::size_t begin,

template <typename Reducer>
inline void parallelReduce(std::size_t begin,
std::size_t end,
std::size_t end,
Reducer& reducer,
std::size_t grainSize = 1,
int numThreads = -1)
{
grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, 1);
grainSize = resolveValue("RCPP_PARALLEL_GRAIN_SIZE", grainSize, std::size_t(1));
numThreads = resolveValue("RCPP_PARALLEL_NUM_THREADS", numThreads, -1);

#if RCPP_PARALLEL_USE_TBB
if (internal::backend() == internal::BACKEND_TBB)
tbbParallelReduce(begin, end, reducer, grainSize, numThreads);
40 changes: 26 additions & 14 deletions inst/include/RcppParallel/Common.h
Original file line number Diff line number Diff line change
@@ -19,51 +19,63 @@ inline int resolveValue(const char* envvar,

if (useRequestedValue)
return requestedValue;

// otherwise, try reading the default from associated envvar
// if the environment variable is unset, use the default
const char* var = getenv(envvar);
if (var == NULL)
return defaultValue;

// try to convert the string to a number
// if an error occurs during conversion, just use default
errno = 0;
char* end;
long value = strtol(var, &end, 10);

// check for conversion failure
if (end == var || *end != '\0' || errno == ERANGE)
return defaultValue;
// okay, return the parsed environment variable value

// okay, return the parsed environment variable value
return value;
}

// Tag type used for disambiguating splitting constructors
struct Split {};

// Work executed within a background thread. We implement dynamic
// dispatch using vtables so we can have a stable type to cast
// to from the void* passed to the worker thread (required because
// the tinythreads interface allows to pass only a void* to the
// thread main rather than a generic type / template)

struct Worker
{
struct Worker
{
// construct and destruct (delete virtually)
Worker() {}
virtual ~Worker() {}

// dispatch work over a range of values
virtual void operator()(std::size_t begin, std::size_t end) = 0;

// disable copying and assignment
virtual void operator()(std::size_t begin, std::size_t end) = 0;

private:
// disable copying and assignment
Worker(const Worker&);
void operator=(const Worker&);
};

// Tag type used for disambiguating splitting constructors
// Used for controlling the stack size for threads / tasks within a scope.
class ThreadStackSizeControl
{
public:
ThreadStackSizeControl();
~ThreadStackSizeControl();

private:
// COPYING: not copyable
ThreadStackSizeControl(const ThreadStackSizeControl&);
ThreadStackSizeControl& operator=(const ThreadStackSizeControl&);
};

struct Split {};

} // namespace RcppParallel

294 changes: 81 additions & 213 deletions inst/include/RcppParallel/TBB.h
Original file line number Diff line number Diff line change
@@ -44,246 +44,114 @@ class task_scheduler_init {

namespace RcppParallel {

namespace {
// This class is primarily used to implement type erasure. The goals here were:
//
// 1. Hide the tbb symbols / implementation details from client R packages.
// That is, they should get the tools they need only via RcppParallel.
//
// 2. Do this in a way that preserves binary compatibility with pre-existing
// classes that make use of parallelReduce().
//
// 3. Ensure that those packages, when re-compiled without source changes,
// can still function as expected.
//
// The downside here is that all the indirection through std::function<>
// and the requirement for RTTI is probably expensive, but I couldn't find
// a better way forward that could also preserve binary compatibility with
// existing pre-built pacakges.
//
// Hopefully, in a future release, we can do away with this wrapper, once
// packages have been rebuilt and no longer implicitly depend on TBB internals.
struct ReducerWrapper {

template <typename T>
ReducerWrapper(T* reducer)
{
self_ = reinterpret_cast<void*>(reducer);
owned_ = false;

struct TBBWorker
{
explicit TBBWorker(Worker& worker) : worker_(worker) {}

void operator()(const tbb::blocked_range<size_t>& r) const {
worker_(r.begin(), r.end());
}
work_ = [&](void* self, std::size_t begin, std::size_t end)
{
(*reinterpret_cast<T*>(self))(begin, end);
};

private:
Worker& worker_;
};
split_ = [&](void* object, Split split)
{
return new T(*reinterpret_cast<T*>(object), split);
};

template <typename Reducer>
struct TBBReducer
{
explicit TBBReducer(Reducer& reducer)
: pSplitReducer_(NULL), reducer_(reducer)
{
}

TBBReducer(TBBReducer& tbbReducer, tbb::split)
: pSplitReducer_(new Reducer(tbbReducer.reducer_, RcppParallel::Split())),
reducer_(*pSplitReducer_)
{
}

virtual ~TBBReducer() { delete pSplitReducer_; }
join_ = [&](void* self, void* other)
{
(*reinterpret_cast<T*>(self)).join(*reinterpret_cast<T*>(other));
};

void operator()(const tbb::blocked_range<size_t>& r) {
reducer_(r.begin(), r.end());
}

void join(const TBBReducer& tbbReducer) {
reducer_.join(tbbReducer.reducer_);
deleter_ = [&](void* object)
{
delete (T*) object;
};
}

private:
Reducer* pSplitReducer_;
Reducer& reducer_;
};

class TBBParallelForExecutor
{
public:

TBBParallelForExecutor(Worker& worker,
std::size_t begin,
std::size_t end,
std::size_t grainSize)
: worker_(worker),
begin_(begin),
end_(end),
grainSize_(grainSize)
{
}

void operator()() const
~ReducerWrapper()
{
TBBWorker tbbWorker(worker_);
tbb::parallel_for(
tbb::blocked_range<std::size_t>(begin_, end_, grainSize_),
tbbWorker
);
if (owned_)
{
deleter_(self_);
self_ = nullptr;
}
}

private:
Worker& worker_;
std::size_t begin_;
std::size_t end_;
std::size_t grainSize_;
};

template <typename Reducer>
class TBBParallelReduceExecutor
{
public:

TBBParallelReduceExecutor(Reducer& reducer,
std::size_t begin,
std::size_t end,
std::size_t grainSize)
: reducer_(reducer),
begin_(begin),
end_(end),
grainSize_(grainSize)
void operator()(std::size_t begin, std::size_t end) const
{
work_(self_, begin, end);
}

void operator()() const
{
TBBReducer<Reducer> tbbReducer(reducer_);
tbb::parallel_reduce(
tbb::blocked_range<std::size_t>(begin_, end_, grainSize_),
tbbReducer
);
}

private:
Reducer& reducer_;
std::size_t begin_;
std::size_t end_;
std::size_t grainSize_;
};

class TBBArenaParallelForExecutor
{
public:

TBBArenaParallelForExecutor(tbb::task_group& group,
Worker& worker,
std::size_t begin,
std::size_t end,
std::size_t grainSize)
: group_(group),
worker_(worker),
begin_(begin),
end_(end),
grainSize_(grainSize)
{
}

void operator()() const
ReducerWrapper(const ReducerWrapper& rhs, Split split)
{
TBBParallelForExecutor executor(worker_, begin_, end_, grainSize_);
group_.run_and_wait(executor);
}

private:

tbb::task_group& group_;
Worker& worker_;
std::size_t begin_;
std::size_t end_;
std::size_t grainSize_;
};
self_ = rhs.split_(rhs.self_, split);
owned_ = true;

template <typename Reducer>
class TBBArenaParallelReduceExecutor
{
public:

TBBArenaParallelReduceExecutor(tbb::task_group& group,
Reducer& reducer,
std::size_t begin,
std::size_t end,
std::size_t grainSize)
: group_(group),
reducer_(reducer),
begin_(begin),
end_(end),
grainSize_(grainSize)
{
work_ = rhs.work_;
split_ = rhs.split_;
join_ = rhs.join_;
deleter_ = rhs.deleter_;
}

void operator()() const
{
TBBParallelReduceExecutor<Reducer> executor(reducer_, begin_, end_, grainSize_);
group_.run_and_wait(executor);
}

private:

tbb::task_group& group_;
Reducer& reducer_;
std::size_t begin_;
std::size_t end_;
std::size_t grainSize_;
};

class ThreadStackSizeControl
{
public:

ThreadStackSizeControl()
: control_(nullptr)
void join(const ReducerWrapper& rhs) const
{
int stackSize = resolveValue("RCPP_PARALLEL_STACK_SIZE", 0, 0);
if (stackSize > 0)
{
control_ = new tbb::global_control(
tbb::global_control::thread_stack_size,
stackSize
);
}
}

~ThreadStackSizeControl()
{
if (control_ != nullptr)
{
delete control_;
control_ = nullptr;
}
join_(self_, rhs.self_);
}

private:

// COPYING: not copyable
ThreadStackSizeControl(const ThreadStackSizeControl&);
ThreadStackSizeControl& operator=(const ThreadStackSizeControl&);

// private members
tbb::global_control* control_;

void* self_ = nullptr;
bool owned_ = false;

std::function<void (void*, std::size_t, std::size_t)> work_;
std::function<void*(void*, Split)> split_;
std::function<void (void*, void*)> join_;
std::function<void(void*)> deleter_;
};

} // anonymous namespace

void tbbParallelFor(std::size_t begin,
std::size_t end,
Worker& worker,
std::size_t grainSize = 1,
int numThreads = -1);

inline void tbbParallelFor(std::size_t begin,
std::size_t end,
Worker& worker,
void tbbParallelReduceImpl(std::size_t begin,
std::size_t end,
ReducerWrapper& wrapper,
std::size_t grainSize = 1,
int numThreads = tbb::task_arena::automatic)
{
ThreadStackSizeControl control;

tbb::task_arena arena(numThreads);
tbb::task_group group;

TBBArenaParallelForExecutor executor(group, worker, begin, end, grainSize);
arena.execute(executor);
}
int numThreads = -1);

template <typename Reducer>
inline void tbbParallelReduce(std::size_t begin,
std::size_t end,
Reducer& reducer,
std::size_t grainSize = 1,
int numThreads = tbb::task_arena::automatic)
void tbbParallelReduce(std::size_t begin,
std::size_t end,
Reducer& reducer,
std::size_t grainSize = 1,
int numThreads = -1)
{
ThreadStackSizeControl control;

tbb::task_arena arena(numThreads);
tbb::task_group group;

TBBArenaParallelReduceExecutor<Reducer> executor(group, reducer, begin, end, grainSize);
arena.execute(executor);
ReducerWrapper wrapper(&reducer);
tbbParallelReduceImpl(begin, end, wrapper, grainSize, numThreads);
}

} // namespace RcppParallel
22 changes: 11 additions & 11 deletions inst/skeleton/vector-sum.cpp
Original file line number Diff line number Diff line change
@@ -19,37 +19,37 @@ using namespace Rcpp;
using namespace RcppParallel;

struct Sum : public Worker
{
{
// source vector
const RVector<double> input;

// accumulated value
double value;

// constructors
Sum(const NumericVector input) : input(input), value(0) {}
Sum(const Sum& sum, Split) : input(sum.input), value(0) {}

// accumulate just the element of the range I've been asked to
void operator()(std::size_t begin, std::size_t end) {
value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0);
}

// join my value with that of another Sum
void join(const Sum& rhs) {
value += rhs.value;
void join(const Sum& rhs) {
value += rhs.value;
}
};

// [[Rcpp::export]]
double parallelVectorSum(NumericVector x) {
// declare the SumBody instance

// declare the SumBody instance
Sum sum(x);

// call parallel_reduce to start the work
parallelReduce(0, x.length(), sum);

// return the computed sum
return sum.value;
}
30 changes: 15 additions & 15 deletions inst/tests/cpp/innerproduct.cpp
Original file line number Diff line number Diff line change
@@ -19,43 +19,43 @@ double innerProduct(NumericVector x, NumericVector y) {
using namespace RcppParallel;

struct InnerProduct : public Worker
{
{
// source vectors
const RVector<double> x;
const RVector<double> y;

// product that I have accumulated
double product;

// constructors
InnerProduct(const NumericVector x, const NumericVector y)
InnerProduct(const NumericVector x, const NumericVector y)
: x(x), y(y), product(0) {}
InnerProduct(const InnerProduct& innerProduct, Split)
InnerProduct(const InnerProduct& innerProduct, Split)
: x(innerProduct.x), y(innerProduct.y), product(0) {}

// process just the elements of the range I have been asked to
void operator()(std::size_t begin, std::size_t end) {
product += std::inner_product(x.begin() + begin,
x.begin() + end,
y.begin() + begin,
product += std::inner_product(x.begin() + begin,
x.begin() + end,
y.begin() + begin,
0.0);
}

// join my value with that of another InnerProduct
void join(const InnerProduct& rhs) {
product += rhs.product;
void join(const InnerProduct& rhs) {
product += rhs.product;
}
};

// [[Rcpp::export]]
double parallelInnerProduct(NumericVector x, NumericVector y) {

// declare the InnerProduct instance that takes a pointer to the vector data
InnerProduct innerProduct(x, y);

// call paralleReduce to start the work
parallelReduce(0, x.length(), innerProduct);

// return the computed product
return innerProduct.product;
}
26 changes: 13 additions & 13 deletions inst/tests/cpp/sum.cpp
Original file line number Diff line number Diff line change
@@ -3,7 +3,7 @@
* @author JJ Allaire
* @license GPL (>= 2)
*/

#include <Rcpp.h>
#include <RcppParallel.h>

@@ -12,37 +12,37 @@ using namespace RcppParallel;
using namespace Rcpp;

struct Sum : public Worker
{
{
// source vector
const RVector<double> input;

// accumulated value
double value;

// constructors
Sum(const NumericVector input) : input(input), value(0) {}
Sum(const Sum& sum, Split) : input(sum.input), value(0) {}

// accumulate just the element of the range I have been asked to
void operator()(std::size_t begin, std::size_t end) {
value += std::accumulate(input.begin() + begin, input.begin() + end, 0.0);
}

// join my value with that of another Sum
void join(const Sum& rhs) {
value += rhs.value;
}
void join(const Sum& rhs) {
value += rhs.value;
}
};

// [[Rcpp::export]]
double parallelVectorSum(NumericVector x) {
// declare the SumBody instance

// declare the SumBody instance
Sum sum(x);

// call parallel_reduce to start the work
parallelReduce(0, x.length(), sum);

// return the computed sum
return sum.value;
}
240 changes: 240 additions & 0 deletions src/tbb.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@

#if RCPP_PARALLEL_USE_TBB

#include <RcppParallel/Common.h>
#include <RcppParallel/TBB.h>

namespace RcppParallel {

tbb::global_control* s_globalControl = nullptr;

// TBB Tools ----

struct TBBWorker
{
explicit TBBWorker(Worker& worker) : worker_(worker) {}

void operator()(const tbb::blocked_range<size_t>& r) const {
worker_(r.begin(), r.end());
}

private:
Worker& worker_;
};

ThreadStackSizeControl::ThreadStackSizeControl()
{
int stackSize = resolveValue("RCPP_PARALLEL_STACK_SIZE", 0, 0);
if (stackSize > 0)
{
s_globalControl = new tbb::global_control(
tbb::global_control::thread_stack_size,
stackSize
);
}
}

ThreadStackSizeControl::~ThreadStackSizeControl()
{
if (s_globalControl != nullptr)
{
delete s_globalControl;
s_globalControl = nullptr;
}
}


// TBB Parallel For ----

class TBBParallelForExecutor
{
public:

TBBParallelForExecutor(Worker& worker,
std::size_t begin,
std::size_t end,
std::size_t grainSize)
: worker_(worker),
begin_(begin),
end_(end),
grainSize_(grainSize)
{
}

void operator()() const
{
TBBWorker tbbWorker(worker_);
tbb::parallel_for(
tbb::blocked_range<std::size_t>(begin_, end_, grainSize_),
tbbWorker
);
}

private:
Worker& worker_;
std::size_t begin_;
std::size_t end_;
std::size_t grainSize_;
};

class TBBArenaParallelForExecutor
{
public:

TBBArenaParallelForExecutor(tbb::task_group& group,
Worker& worker,
std::size_t begin,
std::size_t end,
std::size_t grainSize)
: group_(group),
worker_(worker),
begin_(begin),
end_(end),
grainSize_(grainSize)
{
}

void operator()() const
{
TBBParallelForExecutor executor(worker_, begin_, end_, grainSize_);
group_.run_and_wait(executor);
}

private:

tbb::task_group& group_;
Worker& worker_;
std::size_t begin_;
std::size_t end_;
std::size_t grainSize_;
};

void tbbParallelFor(std::size_t begin,
std::size_t end,
Worker& worker,
std::size_t grainSize,
int numThreads)
{
ThreadStackSizeControl control;

tbb::task_group group;
TBBArenaParallelForExecutor executor(group, worker, begin, end, grainSize);

tbb::task_arena arena(numThreads);
arena.execute(executor);
}


// TBB Parallel Reduce ----

struct TBBReducer
{
explicit TBBReducer(ReducerWrapper& reducer)
: pSplitReducer_(NULL), reducer_(reducer)
{
}

TBBReducer(TBBReducer& tbbReducer, tbb::split)
: pSplitReducer_(new ReducerWrapper(tbbReducer.reducer_, RcppParallel::Split())),
reducer_(*pSplitReducer_)
{
}

virtual ~TBBReducer() { delete pSplitReducer_; }

void operator()(const tbb::blocked_range<size_t>& r)
{
reducer_(r.begin(), r.end());
}

void join(const TBBReducer& tbbReducer)
{
reducer_.join(tbbReducer.reducer_);
}

private:
ReducerWrapper* pSplitReducer_;
ReducerWrapper& reducer_;
};

class TBBParallelReduceExecutor
{
public:

TBBParallelReduceExecutor(ReducerWrapper& reducer,
std::size_t begin,
std::size_t end,
std::size_t grainSize)
: reducer_(reducer),
begin_(begin),
end_(end),
grainSize_(grainSize)
{
}

void operator()() const
{
TBBReducer tbbReducer(reducer_);
tbb::parallel_reduce(
tbb::blocked_range<std::size_t>(begin_, end_, grainSize_),
tbbReducer
);
}

private:
ReducerWrapper& reducer_;
std::size_t begin_;
std::size_t end_;
std::size_t grainSize_;
};

class TBBArenaParallelReduceExecutor
{
public:

TBBArenaParallelReduceExecutor(tbb::task_group& group,
ReducerWrapper& reducer,
std::size_t begin,
std::size_t end,
std::size_t grainSize)
: group_(group),
reducer_(reducer),
begin_(begin),
end_(end),
grainSize_(grainSize)
{
}

void operator()() const
{
TBBParallelReduceExecutor executor(reducer_, begin_, end_, grainSize_);
group_.run_and_wait(executor);
}

private:

tbb::task_group& group_;
ReducerWrapper& reducer_;
std::size_t begin_;
std::size_t end_;
std::size_t grainSize_;
};

void tbbParallelReduceImpl(std::size_t begin,
std::size_t end,
ReducerWrapper& reducer,
std::size_t grainSize,
int numThreads)
{
ThreadStackSizeControl control;

tbb::task_group group;
TBBArenaParallelReduceExecutor executor(group, reducer, begin, end, grainSize);

tbb::task_arena arena(numThreads);
arena.execute(executor);
}

} // end namespace RcppParallel

#endif /* RCPP_PARALLEL_USE_TBB */