Skip to content

Commit

Permalink
Functor as reducer for TeamThreadRange, ThreadVectorRange and TeamVec…
Browse files Browse the repository at this point in the history
…torRange in HostThreadTeam
  • Loading branch information
ldh4 committed Mar 30, 2024
1 parent 68c6684 commit 338240d
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 49 deletions.
134 changes: 85 additions & 49 deletions core/src/impl/Kokkos_HostThreadTeam.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -542,50 +542,54 @@ class HostThreadTeamMember {
team_reduce(reducer, reducer.reference());
}

template <typename ReducerType>
KOKKOS_INLINE_FUNCTION std::enable_if_t<is_reducer<ReducerType>::value>
team_reduce(ReducerType const& reducer,
typename ReducerType::value_type contribution) const noexcept {
KOKKOS_IF_ON_HOST((
if (1 < m_data.m_team_size) {
using value_type = typename ReducerType::value_type;

if (0 != m_data.m_team_rank) {
// Non-root copies to their local buffer:
/*reducer.copy( (value_type*) m_data.team_reduce_local()
, reducer.data() );*/
*((value_type*)m_data.team_reduce_local()) = contribution;
}
template <typename ReducerType, typename ValueType>
KOKKOS_INLINE_FUNCTION // std::enable_if_t<is_reducer<ReducerType>::value>
void
team_reduce(ReducerType const& reducer, ValueType& contribution) const
noexcept {
KOKKOS_IF_ON_HOST((if (1 < m_data.m_team_size) {
using value_type = ValueType;

// Root does not overwrite shared memory until all threads arrive
// and copy to their local buffer.
if (0 != m_data.m_team_rank) {
// Non-root copies to their local buffer:
/*reducer.copy( (value_type*) m_data.team_reduce_local()
, reducer.data() );*/
*((value_type*)m_data.team_reduce_local()) = contribution;
}

if (m_data.team_rendezvous()) {
// All threads have entered 'team_rendezvous'
// only this thread returned from 'team_rendezvous'
// with a return value of 'true'
//
// This thread sums contributed values
for (int i = 1; i < m_data.m_team_size; ++i) {
value_type* const src =
(value_type*)m_data.team_member(i)->team_reduce_local();
// Root does not overwrite shared memory until all threads arrive
// and copy to their local buffer.

reducer.join(contribution, *src);
}
if (m_data.team_rendezvous()) {
// All threads have entered 'team_rendezvous'
// only this thread returned from 'team_rendezvous'
// with a return value of 'true'
//
// This thread sums contributed values
for (int i = 1; i < m_data.m_team_size; ++i) {
value_type* const src =
(value_type*)m_data.team_member(i)->team_reduce_local();

// Copy result to root member's buffer:
// reducer.copy( (value_type*) m_data.team_reduce() , reducer.data()
// );
*((value_type*)m_data.team_reduce()) = contribution;
reducer.reference() = contribution;
m_data.team_rendezvous_release();
// This thread released all other threads from 'team_rendezvous'
// with a return value of 'false'
} else {
// Copy from root member's buffer:
reducer.reference() = *((value_type*)m_data.team_reduce());
}
} else { reducer.reference() = contribution; }))
reducer.join(contribution, *src);
}

// Copy result to root member's buffer:
// reducer.copy( (value_type*) m_data.team_reduce() , reducer.data()
// );
*((value_type*)m_data.team_reduce()) = contribution;

m_data.team_rendezvous_release();
// This thread released all other threads from 'team_rendezvous'
// with a return value of 'false'
} else {
// Copy from root member's buffer:
contribution = *((value_type*)m_data.team_reduce());
}
}))

if constexpr (is_reducer_v<ReducerType>) {
reducer.reference() = contribution;
}

KOKKOS_IF_ON_DEVICE(((void)reducer; (void)contribution;
Kokkos::abort("HostThreadTeamMember team_reduce\n");))
Expand Down Expand Up @@ -786,17 +790,37 @@ KOKKOS_INLINE_FUNCTION
parallel_reduce(Impl::TeamThreadRangeBoundariesStruct<iType, Member> const&
loop_boundaries,
Closure const& closure, ValueType& result) {
ValueType val;
Sum<ValueType> reducer(val);
reducer.init(val);
using functor_analysis_type = typename Impl::FunctorAnalysis<
Impl::FunctorPatternInterface::REDUCE,
TeamPolicy<Kokkos::DefaultHostExecutionSpace>, Closure, ValueType>;

constexpr bool is_reducer_closure =
functor_analysis_type::has_join_member_function &&
functor_analysis_type::has_init_member_function;

using ReducerSelector =
typename Kokkos::Impl::if_c<is_reducer_closure, Closure,
Sum<ValueType>>::type;

auto run_closure = [&](ValueType& value) {
for (iType i = loop_boundaries.start; i < loop_boundaries.end;
i += loop_boundaries.increment) {
closure(i, value);
}
};

for (iType i = loop_boundaries.start; i < loop_boundaries.end;
i += loop_boundaries.increment) {
closure(i, reducer.reference());
ValueType val{};
if constexpr (is_reducer_closure) {
closure.init(val);
run_closure(val);
loop_boundaries.thread.team_reduce(closure, val);
} else {
ReducerSelector reducer(val);
reducer.init(val);
run_closure(val);
loop_boundaries.thread.team_reduce(reducer, val);
}

loop_boundaries.thread.team_reduce(reducer);
result = reducer.reference();
result = val;
}

/*template< typename iType, class Space
Expand Down Expand Up @@ -840,7 +864,19 @@ KOKKOS_INLINE_FUNCTION
parallel_reduce(const Impl::ThreadVectorRangeBoundariesStruct<
iType, Member>& loop_boundaries,
const Lambda& lambda, ValueType& result) {
using wrapped_reducer_type = typename Impl::FunctorAnalysis<
Impl::FunctorPatternInterface::REDUCE,
TeamPolicy<Kokkos::DefaultHostExecutionSpace>, Lambda,
ValueType>::Reducer;

constexpr bool is_reducer_closure =
wrapped_reducer_type::has_join_member_function() &&
wrapped_reducer_type::has_init_member_function();

result = ValueType();

if constexpr (is_reducer_closure) lambda.init(result);

for (iType i = loop_boundaries.start; i < loop_boundaries.end;
i += loop_boundaries.increment) {
lambda(i, result);
Expand Down
129 changes: 129 additions & 0 deletions core/unit_test/TestTeam.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1753,4 +1753,133 @@ struct TestRepeatedTeamReduce {

} // namespace Test

namespace Test {

struct SimpleTestValue {
int value;
};

struct SimpleFunctorReducer {
using value_type = SimpleTestValue;

KOKKOS_INLINE_FUNCTION
void init(value_type &init) const { init.value = 1; }

KOKKOS_INLINE_FUNCTION
void join(value_type &dst, value_type const &src) const {
dst.value *= src.value;
}

KOKKOS_INLINE_FUNCTION
void final(value_type &) const {}

KOKKOS_INLINE_FUNCTION
void operator()(const int i, value_type &update) const {
update.value *= (i + 1);
}
};

struct SimpleReducer {
using reducer = SimpleReducer;
using value_type = SimpleTestValue;

KOKKOS_INLINE_FUNCTION
SimpleReducer(value_type &val) : local(val) {}

KOKKOS_INLINE_FUNCTION
void init(value_type &init) const { init.value = 1; }

KOKKOS_INLINE_FUNCTION
void join(value_type &dst, value_type const &src) const {
dst.value *= src.value;
}

KOKKOS_INLINE_FUNCTION
void final(value_type &) const {}

KOKKOS_INLINE_FUNCTION
value_type &reference() const { return local; }

value_type &local;
};

namespace {

template <typename ExecSpace>
class TestTeamFunctorReducer {
public:
using execution_space = ExecSpace;
using team_policy_type = Kokkos::TeamPolicy<execution_space>;
using member_type = typename team_policy_type::member_type;
using value_type = SimpleTestValue;
using functor_type = SimpleFunctorReducer;
using reducer_type = SimpleReducer;
using index_type = int;

void run_test_team_thread() {
auto policy = KOKKOS_LAMBDA(member_type const &member, index_type count) {
return Kokkos::TeamThreadRange(member, count);
};
run_test_team_policies(policy);
};

void run_test_thread_vector() {
auto policy = KOKKOS_LAMBDA(member_type const &member, index_type count) {
return Kokkos::ThreadVectorRange(member, count);
};
run_test_team_policies(policy);
};

void run_test_team_vector() {
auto policy = KOKKOS_LAMBDA(member_type const &member, index_type count) {
return Kokkos::TeamVectorRange(member, count);
};
run_test_team_policies(policy);
};

template <typename Policy>
void run_test_team_policies(Policy &policy) {
constexpr index_type league_size = 3;
constexpr index_type test_count = 8;

Kokkos::View<int[league_size], execution_space> test_result("result");
Kokkos::View<int[league_size], execution_space> expected_result("expected");

Kokkos::parallel_for(
team_policy_type(league_size, Kokkos::AUTO),
KOKKOS_LAMBDA(member_type const &team) {
const int league = team.league_rank();
value_type result{};

Kokkos::parallel_reduce(policy(team, test_count), functor_type{},
result);
test_result(league) = result.value;

value_type expected{};
reducer_type reducer(expected);
Kokkos::parallel_reduce(
policy(team, test_count),
[&](const int i, value_type &update) { update.value *= (i + 1); },
reducer);

Kokkos::single(Kokkos::PerTeam(team),
[=]() { expected_result(league) = expected.value; });
});
Kokkos::fence();

auto test = Kokkos::create_mirror_view_and_copy(
Kokkos::DefaultHostExecutionSpace{}, test_result);
auto check = Kokkos::create_mirror_view_and_copy(
Kokkos::DefaultHostExecutionSpace{}, expected_result);

for (unsigned i = 0; i < test.extent(0); ++i) {
ASSERT_EQ(test(i), check(i));
}
}
};

} // namespace

} // namespace Test

/*--------------------------------------------------------------------------*/
6 changes: 6 additions & 0 deletions core/unit_test/TestTeamReductionScan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,5 +120,11 @@ TEST(TEST_CATEGORY, repeated_team_reduce) {
TestRepeatedTeamReduce<TEST_EXECSPACE>();
}

TEST(TEST_CATEGORY, nested_team_reduce_functor_as_reducer) {
TestTeamFunctorReducer<TEST_EXECSPACE>().run_test_team_thread();
TestTeamFunctorReducer<TEST_EXECSPACE>().run_test_thread_vector();
TestTeamFunctorReducer<TEST_EXECSPACE>().run_test_team_vector();
}

} // namespace Test
#endif

0 comments on commit 338240d

Please sign in to comment.