Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix future reduction with non copyable types #378

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 55 additions & 105 deletions stlab/concurrency/future.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <stlab/concurrency/config.hpp>
#include <stlab/concurrency/executor_base.hpp>
#include <stlab/concurrency/immediate_executor.hpp>
#include <stlab/concurrency/optional.hpp>
#include <stlab/concurrency/task.hpp>
#include <stlab/concurrency/traits.hpp>
Expand All @@ -35,7 +36,6 @@
#define STLAB_FUTURE_COROUTINES_SUPPORT() 1
#include <experimental/coroutine>
#include <stlab/concurrency/default_executor.hpp>
#include <stlab/concurrency/immediate_executor.hpp>
#endif
#endif

Expand Down Expand Up @@ -225,6 +225,19 @@ using reduced_t = typename reduced_<T>::type;
template <typename T>
struct reduction_helper;

template <typename R>
R&& reduce(R&& r) {
return std::forward<R>(r);
}

static inline auto reduce(future<future<void>>&& r) -> future<void>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think static does anything useful here.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, I will propose a fix in the next patch set.


template <typename R>
auto reduce(future<future<R>>&& r) -> future<R> {
return std::move(r).then(stlab::immediate_executor,
[](auto&& f) { return *std::forward<decltype(f)>(f).get_try(); });
}

/**************************************************************************************************/

template <typename T, typename = void>
Expand Down Expand Up @@ -318,7 +331,7 @@ struct shared_base<T, enable_if_copyable<T>> : std::enable_shared_from_this<shar
}
if (ready) executor(std::move(p.first));

return reduce(std::move(p.second));
return detail::reduce(std::move(p.second));
}

template <typename F>
Expand Down Expand Up @@ -355,24 +368,14 @@ struct shared_base<T, enable_if_copyable<T>> : std::enable_shared_from_this<shar
}
if (ready) executor(std::move(p.first));

return reduce(std::move(p.second));
return detail::reduce(std::move(p.second));
}

void _detach() {
std::unique_lock<std::mutex> lock(_mutex);
if (!_ready) _then.emplace_back([](auto&&) {}, [_p = this->shared_from_this()] {});
}

template <typename R>
auto reduce(R&& r) {
return std::forward<R>(r);
}

auto reduce(future<future<void>>&& r) -> future<void>;

template <typename R>
auto reduce(future<future<R>>&& r) -> future<R>;

void set_exception(std::exception_ptr error) {
_exception = std::move(error);
then_t then;
Expand Down Expand Up @@ -486,24 +489,14 @@ struct shared_base<T, enable_if_not_copyable<T>> : std::enable_shared_from_this<
}
if (ready) executor(std::move(p.first));

return reduce(std::move(p.second));
return detail::reduce(std::move(p.second));
}

void _detach() {
std::unique_lock<std::mutex> lock(_mutex);
if (!_ready) _then = then_t([](auto&&){}, [_p = this->shared_from_this()] {});
}

template <typename R>
auto reduce(R&& r) {
return std::forward<R>(r);
}

template <typename R>
auto reduce(future<future<R>>&& r) -> future<R>;

auto reduce(future<future<void>>&& r)->future<void>;

void set_exception(std::exception_ptr error) {
_exception = std::move(error);
then_t then;
Expand All @@ -520,8 +513,6 @@ struct shared_base<T, enable_if_not_copyable<T>> : std::enable_shared_from_this<

bool is_ready() const { return _ready; }

auto get_try() -> stlab::optional<T> { return get_try_r(true); }

auto get_try_r(bool) -> stlab::optional<T> {
bool ready = false;
{
Expand Down Expand Up @@ -598,16 +589,6 @@ struct shared_base<void> : std::enable_shared_from_this<shared_base<void>> {
return recover(std::forward<E>(executor), std::forward<F>(f));
}

template <typename R>
auto reduce(R&& r) {
return std::forward<R>(r);
}

auto reduce(future<future<void>>&& r) -> future<void>;

template <typename R>
auto reduce(future<future<R>>&& r) -> future<R>;

void set_exception(std::exception_ptr error) {
_exception = std::move(error);
then_t then;
Expand Down Expand Up @@ -1099,7 +1080,6 @@ class STLAB_NODISCARD() future<T, enable_if_not_copyable<T>> {

bool is_ready() const& { return _p && _p->is_ready(); }

auto get_try() const& { return _p->get_try(); }

auto get_try() && { return _p->get_try_r(unique_usage(_p)); }

Expand Down Expand Up @@ -1705,8 +1685,8 @@ auto when_any(E executor, F&& f, std::pair<I, I> range) {
/**************************************************************************************************/

template <typename E, typename F, typename... Args>
auto async(E executor, F&& f, Args&&... args)
-> future<detail::result_t<std::decay_t<F>, std::decay_t<Args>...>> {
auto async(E executor, F&& f, Args&&... args) -> future<
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's discuss this as a breaking change on the (public) stlab slack - (let me know if you need an invite). Personally, I think I'm fine with it as I doubt there are any (or many) cases this would break and likely the break is minor but we might consider alternate names or using namespace versioning.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this need to be discussed. I think need an invite, I can't sign in into stlab.slack.com.

detail::reduced_t<std::decay_t<detail::result_t<std::decay_t<F>, std::decay_t<Args>...>>>> {
using result_type = detail::result_t<std::decay_t<F>, std::decay_t<Args>...>;

auto p = package<result_type()>(
Expand All @@ -1719,7 +1699,7 @@ auto async(E executor, F&& f, Args&&... args)

executor(std::move(p.first));

return std::move(p.second);
return detail::reduce(std::move(p.second));
}

/**************************************************************************************************/
Expand All @@ -1739,6 +1719,10 @@ struct reduction_helper<future<void>> {
future<void> value;
};

static inline auto reduce(future<future<void>>&& r) -> future<void> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove static and move this definition above - if this is circular we can use a tagged dispatch in a common - reduce to resolve or another technique.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, I will propose a fix in the next patch set.

return std::move(r).then(stlab::immediate_executor, [](auto) {});
}

/**************************************************************************************************/

template <typename T>
Expand Down Expand Up @@ -1766,31 +1750,32 @@ struct value_<T, enable_if_copyable<T>> {
sb._result = f(std::forward<Args>(args)...);
sb._reduction_helper.value =
(*sb._result)
.recover([_p = sb.shared_from_this()](future<R> f) {
if (f.exception()) {
_p->_exception = std::move(f).exception();
proceed(*_p);
throw future_error(future_error_codes::reduction_failed);
}
return *f.get_try();
})
.then([_p = sb.shared_from_this()](auto) { proceed(*_p); });
.recover(stlab::immediate_executor,
[_p = sb.shared_from_this()](future<R> f) {
if (f.exception()) {
_p->_exception = std::move(f).exception();
proceed(*_p);
throw future_error(future_error_codes::reduction_failed);
}
})
.then(stlab::immediate_executor, [_p = sb.shared_from_this()]() { proceed(*_p); });
}

template <typename F, typename... Args>
static void set(shared_base<future<void>>& sb, F& f, Args&&... args) {
sb._result = f(std::forward<Args>(args)...);
sb._reduction_helper.value =
(*sb._result)
.recover([_p = sb.shared_from_this()](future<void> f) {
if (f.exception()) {
_p->_exception = std::move(f).exception();
value_::proceed(*_p);
throw future_error(future_error_codes::reduction_failed);
}
return;
})
.then([_p = sb.shared_from_this()]() { proceed(*_p); });
.recover(stlab::immediate_executor,
[_p = sb.shared_from_this()](future<void> f) {
if (f.exception()) {
_p->_exception = std::move(f).exception();
value_::proceed(*_p);
throw future_error(future_error_codes::reduction_failed);
}
return;
})
.then(stlab::immediate_executor, [_p = sb.shared_from_this()]() { proceed(*_p); });
}
};

Expand Down Expand Up @@ -1818,15 +1803,17 @@ struct value_<T, enable_if_not_copyable<T>> {
sb._result = f(std::forward<Args>(args)...);
sb._reduction_helper.value =
std::move(*sb._result)
.recover([_p = sb.shared_from_this()](future<R> f) {
if (auto ex = std::move(f).exception()) {
_p->_exception = ex;
proceed(*_p);
throw future_error(future_error_codes::reduction_failed);
}
return *f.get_try();
})
.then([_p = sb.shared_from_this()](auto) { proceed(*_p); });
.recover(stlab::immediate_executor,
[_p = sb.shared_from_this()](future<R> f) {
if (auto ex = std::move(f).exception()) {
_p->_exception = ex;
proceed(*_p);
throw future_error(future_error_codes::reduction_failed);
}
// We could move out the data to put it back in place in the
// next 'then' call or just leave it in place and do nothing.
})
.then(stlab::immediate_executor, [_p = sb.shared_from_this()]() { proceed(*_p); });
}
};

Expand Down Expand Up @@ -1888,44 +1875,7 @@ auto shared_base<void>::recover(E&& executor, F&& f)
}
if (ready) executor(std::move(p.first));

return reduce(std::move(p.second));
}

/**************************************************************************************************/

template <typename T>
auto shared_base<T, enable_if_copyable<T>>::reduce(future<future<void>>&& r) -> future<void> {
return std::move(r).then([](auto) {});
}

template <typename T>
template <typename R>
auto shared_base<T, enable_if_copyable<T>>::reduce(future<future<R>>&& r) -> future<R> {
return std::move(r).then([](auto&& f) { return *std::forward<decltype(f)>(f).get_try(); });
}

/**************************************************************************************************/

template <typename T>
auto shared_base<T, enable_if_not_copyable<T>>::reduce(future<future<void>>&& r) -> future<void> {
return std::move(r).then([](auto){});
}

template <typename T>
template <typename R>
auto shared_base<T, enable_if_not_copyable<T>>::reduce(future<future<R>>&& r) -> future<R> {
return std::move(r).then([](auto&& f) { return *std::forward<decltype(f)>(f).get_try(); });
}

/**************************************************************************************************/

inline auto shared_base<void>::reduce(future<future<void>>&& r) -> future<void> {
return std::move(r).then([](auto) {});
}

template <typename R>
auto shared_base<void>::reduce(future<future<R>>&& r) -> future<R> {
return std::move(r).then([](auto&& f) { return *std::forward<decltype(f)>(f).get_try(); });
return detail::reduce(std::move(p.second));
}

/**************************************************************************************************/
Expand Down
4 changes: 2 additions & 2 deletions test/future_recover_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ BOOST_AUTO_TEST_CASE(future_recover_move_only_with_broken_promise) {
return std::move(p.second).recover([&check](auto f) {
check = true;
try {
return *std::move(f.get_try());
return *std::move(f).get_try();
} catch (const exception&) {
throw;
}
Expand All @@ -1179,7 +1179,7 @@ BOOST_AUTO_TEST_CASE(future_recover_move_only_with_broken_promise) {
return std::move(p.second) ^ [&check](auto f) {
check = true;
try {
return *std::move(f.get_try());
return *std::move(f).get_try();
} catch (const exception&) {
throw;
}
Expand Down
12 changes: 5 additions & 7 deletions test/future_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,10 @@ struct test_fixture {

template <typename F>
auto wait_until_future_r_completed(F& f) {
auto result = f.get_try();
while (!result) {
while (!f.is_ready()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
result = f.get_try();
}
return result;
return std::move(f).get_try();
}

void check_valid_future() {}
Expand All @@ -135,7 +133,7 @@ struct test_fixture {

template <typename E, typename F>
static void check_failure(F& f, const char* message) {
BOOST_REQUIRE_EXCEPTION(f.get_try(), E, ([_m = message](const auto& e) {
BOOST_REQUIRE_EXCEPTION(std::move(f).get_try(), E, ([_m = message](const auto& e) {
return std::string(_m) == std::string(e.what());
}));
}
Expand All @@ -156,15 +154,15 @@ struct test_fixture {
private:
template <typename F>
void wait_until_future_is_ready(F& f) {
while (!f.get_try()) {
while (!f.is_ready()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}

template <typename E, typename F>
void wait_until_this_future_fails(F& f) {
try {
while (!f.get_try()) {
while (!f.is_ready()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
} catch (const E&) {
Expand Down
Loading