From 8e6f1205fe22389b659642c937ecf8c78fa1133a Mon Sep 17 00:00:00 2001 From: terrakuh Date: Fri, 10 Jun 2022 11:25:23 +0000 Subject: [PATCH] Fix synchronization --- CHANGELOG.md | 16 ++++++- CMakeLists.txt | 2 +- README.md | 13 ++--- curlio/detail/header_collector.hpp | 19 +++++--- curlio/error.hpp | 26 +++++----- curlio/quick/reader.hpp | 77 +++++++++++------------------- curlio/request.hpp | 14 ++++-- curlio/response.hpp | 48 +++++++++++++------ curlio/session.hpp | 9 ++-- examples/playground.cpp | 58 ++++++++++------------ examples/simple.cpp | 48 +++++++++++++++++++ 11 files changed, 195 insertions(+), 135 deletions(-) create mode 100644 examples/simple.cpp diff --git a/CHANGELOG.md b/CHANGELOG.md index 10fa682..9714723 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ # Change Log +## [v0.3.3] +### Added +- Convenience function to await all headers +- Simple example + +### Changed +- Use system_error of Boost instead of `std::system_error` + +### Fixed +- Getting stuck in multi threading mode +- Completion handlers with ambiguous signature +- Header collector synchronization + ## [v0.3.2] ### Fixed - Post completion handlers for execution instead of direct invocation @@ -30,7 +43,8 @@ ### Fixed - Cookie share support -[Unreleased]: https://github.com/terrakuh/curlio/compare/v0.3.2..dev +[Unreleased]: https://github.com/terrakuh/curlio/compare/v0.3.3..dev +[v0.3.3]: https://github.com/terrakuh/curlio/compare/v0.3.2..v0.3.3 [v0.3.2]: https://github.com/terrakuh/curlio/compare/v0.3.1..v0.3.2 [v0.3.1]: https://github.com/terrakuh/curlio/compare/v0.3..v0.3.1 [v0.3]: https://github.com/terrakuh/curlio/compare/v0.2..v0.3 diff --git a/CMakeLists.txt b/CMakeLists.txt index ab1dadc..7e4a42a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ cmake_minimum_required(VERSION 3.16) project( curlio - VERSION 0.3.2 + VERSION 0.3.3 DESCRIPTION "The simple glue for cURL and Boost.ASIO" HOMEPAGE_URL "https://github.com/terrakuh/curlio" LANGUAGES CXX diff --git a/README.md b/README.md index 9b0ad61..2c16d9f 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,6 @@ # cURLio -Simple header-only **Boost.ASIO** wrapper. + +Simple C++ 14 header-only **Boost.ASIO** wrapper. ## Example @@ -10,14 +11,8 @@ req.set_url("https://example.com"); curl_easy_setopt(req.native_handle(), CURLOPT_USERAGENT, "curl/7.80.0"); auto resp = session.start(req); -while (true) { - char buf[4096]; - auto [ec, n] = co_await resp.async_read_some(buffer(buf), use_nothrow_awaitable); - if (ec == error::eof) { - break; - } - std::cout.write(buf, n); -} +co_await resp.async_await_last_headers(use_awaitable); +const std::string content = co_await curlio::quick::async_read_all(resp, use_awaitable); co_await resp.async_await_completion(use_awaitable); co_return; ``` diff --git a/curlio/detail/header_collector.hpp b/curlio/detail/header_collector.hpp index 96cf306..ba3b247 100644 --- a/curlio/detail/header_collector.hpp +++ b/curlio/detail/header_collector.hpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include @@ -40,16 +41,19 @@ class Header_collector { public: typedef std::map Fields; + Header_collector(boost::asio::any_io_executor executor) : _executor{ std::move(executor) } {} Fields& fields() noexcept { return _fields; } const Fields& fields() const noexcept { return _fields; } void hook(CURL* handle) noexcept; void unhook(CURL* handle) noexcept; void finish(); + boost::asio::any_io_executor get_executor() { return _executor; } /// Waits until the header received flag is set. Clears it before completion. template auto async_await_headers(Token&& token); private: + boost::asio::any_io_executor _executor; Fields _fields; std::uint8_t _last_clear = 0; std::uint8_t _headers_received = 0; @@ -84,21 +88,22 @@ template inline auto Header_collector::async_await_headers(Token&& token) { return boost::asio::async_initiate( - [this](auto handler) { + [this](auto&& handler) { + auto executor = boost::asio::get_associated_executor(handler, _executor); + // already received if (_ready_to_await) { _ready_to_await = false; - std::move(handler)(boost::system::error_code{}); + boost::asio::post(executor, std::bind(std::move(handler), boost::system::error_code{})); } else if (_headers_received_handler) { - std::move(handler)(Code::multiple_headers_awaitings); + boost::asio::post(executor, + std::bind(std::move(handler), make_error_code(Code::multiple_headers_awaitings))); } // need to wait else { - _headers_received_handler = [this, + _headers_received_handler = [this, executor, handler = std::move(handler)](boost::system::error_code ec) mutable { _ready_to_await = false; - auto executor = boost::asio::get_associated_executor(handler); - boost::asio::post(executor, - [handler = std::move(handler), ec]() mutable { std::move(handler)(ec); }); + boost::asio::post(executor, std::bind(std::move(handler), ec)); }; } }, diff --git a/curlio/error.hpp b/curlio/error.hpp index 7a2e89e..e38ed80 100644 --- a/curlio/error.hpp +++ b/curlio/error.hpp @@ -1,6 +1,6 @@ #pragma once -#include +#include #include namespace curlio { @@ -23,14 +23,14 @@ enum class Condition { usage, }; -std::error_condition make_error_condition(Condition condition) noexcept; +boost::system::error_condition make_error_condition(Condition condition) noexcept; -inline const std::error_category& code_category() noexcept +inline const boost::system::error_category& code_category() noexcept { - static class : public std::error_category { + static class : public boost::system::error_category { public: const char* name() const noexcept override { return "curlio"; } - std::error_condition default_error_condition(int code) const noexcept override + boost::system::error_condition default_error_condition(int code) const noexcept override { if (code == 0) { return make_error_condition(Condition::success); @@ -60,9 +60,9 @@ inline const std::error_category& code_category() noexcept return category; } -inline const std::error_category& condition_category() noexcept +inline const boost::system::error_category& condition_category() noexcept { - static class : public std::error_category { + static class : public boost::system::error_category { public: const char* name() const noexcept override { return "curlio"; } std::string message(int condition) const override @@ -77,24 +77,24 @@ inline const std::error_category& condition_category() noexcept return category; } -inline std::error_code make_error_code(Code code) noexcept +inline boost::system::error_code make_error_code(Code code) noexcept { return { static_cast(code), code_category() }; } -inline std::error_condition make_error_condition(Condition condition) noexcept +inline boost::system::error_condition make_error_condition(Condition condition) noexcept { return { static_cast(condition), condition_category() }; } } // namespace curlio -namespace std { +namespace boost::system { template<> -struct is_error_code_enum : true_type {}; +struct is_error_code_enum : std::true_type {}; template<> -struct is_error_condition_enum : true_type {}; +struct is_error_condition_enum : std::true_type {}; -} // namespace std +} // namespace boost::system diff --git a/curlio/quick/reader.hpp b/curlio/quick/reader.hpp index e7e0733..280c1f8 100644 --- a/curlio/quick/reader.hpp +++ b/curlio/quick/reader.hpp @@ -4,38 +4,10 @@ #include "../response.hpp" #include -#include #include namespace curlio::quick { -/** - * Reads the content from the given stream into a string.The handler signature is - * `void(boost::system::error_code, std::string)`. - * - * @param stream This object must live as long as this operation is running. - * @tparam Size_step The maximum allowed of read size at once. - */ -template -inline auto async_read_all(Async_read_stream& stream, Token&& token) -{ - return boost::asio::async_compose( - [&stream, old_size = std::size_t{ 0 }, str = std::string{}](auto& self, boost::system::error_code ec = {}, - std::size_t bytes_read = 0) mutable { - str.resize(old_size + bytes_read); - if (ec == boost::asio::error::eof) { - self.complete(boost::system::error_code{}, std::move(str)); - } else if (ec) { - self.complete(ec, std::move(str)); - } else { - old_size = str.size(); - str.resize(old_size + Size_step); - stream.async_read_some(boost::asio::buffer(str.data() + old_size, Size_step), std::move(self)); - } - }, - token, stream); -} - /** * Reads the content from the given stream into a string.The handler signature is * `void(boost::system::error_code, std::string)`. @@ -46,32 +18,37 @@ inline auto async_read_all(Async_read_stream& stream, Token&& token) template inline auto async_read_all(Response& response, Token&& token) { - if (!response.is_active()) { - throw boost::system::system_error{ Code::request_not_active }; - } + return boost::asio::async_compose( + [&response, last_buffer_size = std::size_t{ 0 }, has_limit = false, buffer = std::string{}]( + auto& self, boost::system::error_code ec = {}, std::size_t bytes_read = 0) mutable { + if (!ec && bytes_read == 0 && !response.is_active()) { + ec = Code::request_not_active; + } - return boost::asio::async_initiate( - [&response](auto handler) { - const auto length = response.content_length(); - CURLIO_DEBUG("Content length for reading " << length); - if (length < 0) { - async_read_all<512>(response, std::move(handler)); + last_buffer_size += bytes_read; + if (ec) { + buffer.resize(last_buffer_size); + self.complete(ec == boost::asio::error::eof ? boost::system::error_code{} : ec, std::move(buffer)); } else { - auto str = std::make_unique(); - str->resize(static_cast(length)); - auto buffer = boost::asio::buffer(*str); - auto executor = boost::asio::get_associated_executor(handler, response.get_executor()); - boost::asio::async_read(response, buffer, - boost::asio::bind_executor( - executor, [handler = std::move(handler), str = std::move(str)]( - boost::system::error_code ec, std::size_t bytes_read) mutable { - CURLIO_DEBUG("Done so" << bytes_read); - str->resize(bytes_read); - std::move(handler)(ec, std::move(*str)); - })); + if (bytes_read == 0) { + const auto length = response.content_length(); + CURLIO_DEBUG("Content length for reading " << length); + if (length >= 0) { + has_limit = true; + buffer.resize(static_cast(length)); + } + } + + if (buffer.size() - last_buffer_size < 4096) { + buffer.resize(last_buffer_size + 4096); + } + + response.async_read_some( + boost::asio::buffer(buffer.data() + last_buffer_size, buffer.size() - last_buffer_size), + std::move(self)); } }, - token); + token, response.get_executor()); } } // namespace curlio::quick diff --git a/curlio/request.hpp b/curlio/request.hpp index a353bb8..95af77e 100644 --- a/curlio/request.hpp +++ b/curlio/request.hpp @@ -81,7 +81,7 @@ inline void Request::set_url(const char* url) { _lazy_init(); if (curl_easy_setopt(native_handle(), CURLOPT_URL, url) != CURLE_OK) { - throw std::system_error{ Code::bad_url }; + throw boost::system::system_error{ Code::bad_url }; } } @@ -130,9 +130,9 @@ inline auto Request::async_write_some(const Const_buffer_sequence& buffers, Toke // can immediately finish if (ptr == nullptr || ptr->status & detail::finished) { - boost::asio::post(executor, std::bind(std::move(handler), boost::asio::error::eof, 0)); + boost::asio::post(executor, std::bind(std::move(handler), make_error_code(boost::asio::error::eof), 0)); } else if (_send_handler) { - boost::asio::post(executor, std::bind(std::move(handler), Code::multiple_writes, 0)); + boost::asio::post(executor, std::bind(std::move(handler), make_error_code(Code::multiple_writes), 0)); } else { // set write handler when cURL calls the write callback _send_handler = [this, buffers, handler = std::move(handler), @@ -144,8 +144,12 @@ inline auto Request::async_write_some(const Const_buffer_sequence& buffers, Toke }; // TODO check for errors - ptr->pause_mask &= ~CURLPAUSE_SEND; - curl_easy_pause(ptr->handle, ptr->pause_mask); + boost::asio::dispatch(ptr->executor, [ptr] { + if ((ptr->pause_mask & CURLPAUSE_SEND) == CURLPAUSE_SEND) { + ptr->pause_mask &= ~CURLPAUSE_SEND; + curl_easy_pause(ptr->handle, ptr->pause_mask); + } + }); } }, token); diff --git a/curlio/response.hpp b/curlio/response.hpp index edd6a1f..6df8b03 100644 --- a/curlio/response.hpp +++ b/curlio/response.hpp @@ -41,17 +41,19 @@ class Response { CURL* native_handle() noexcept; detail::Header_collector::Fields& header_fields() noexcept { return _header_collector.fields(); } template - auto async_await_headers(Token&& token) + auto async_await_next_headers(Token&& token) { return _header_collector.async_await_headers(std::forward(token)); } + template + auto async_await_last_headers(Token&& token); /// Waits until the response is complete. Data must be read before this function. template auto async_await_completion(Token&& token); template auto async_read_some(const Mutable_buffer_sequence& buffers, Token&& token); /// Getting the executor of an invalid response is undefined behavior. - boost::asio::any_io_executor get_executor() noexcept { return _data.lock()->executor; } + boost::asio::any_io_executor get_executor() noexcept { return _header_collector.get_executor(); } Response& operator=(Response&& move) noexcept; private: @@ -116,7 +118,7 @@ inline long Response::response_code() long response = 0; const auto code = curl_easy_getinfo(native_handle(), CURLINFO_RESPONSE_CODE, &response); if (code != CURLE_OK) { - throw std::system_error{ Code::no_response_code, curl_easy_strerror(code) }; + throw boost::system::system_error{ Code::no_response_code, curl_easy_strerror(code) }; } return response; } @@ -127,6 +129,21 @@ inline CURL* Response::native_handle() noexcept return ptr == nullptr ? nullptr : ptr->handle; } +template +inline auto Response::async_await_last_headers(Token&& token) +{ + return boost::asio::async_compose( + [this, started = false](auto& self, boost::system::error_code ec = {}) mutable { + if (ec || (started && !is_redirect())) { + self.complete(ec); + } else { + started = true; + async_await_next_headers(std::move(self)); + } + }, + token, get_executor()); +} + template inline auto Response::async_await_completion(Token&& token) { @@ -138,7 +155,8 @@ inline auto Response::async_await_completion(Token&& token) if (ptr == nullptr || ptr->status & detail::finished) { boost::asio::post(executor, std::bind(std::move(handler), boost::system::error_code{})); } else if (_finish_handler) { - boost::asio::post(executor, std::bind(std::move(handler), Code::multiple_completion_awaitings)); + boost::asio::post( + executor, std::bind(std::move(handler), make_error_code(Code::multiple_completion_awaitings))); } else { _finish_handler = [handler = std::move(handler), executor]() mutable { boost::asio::post(executor, std::bind(std::move(handler), boost::system::error_code{})); @@ -156,7 +174,6 @@ inline auto Response::async_read_some(const Mutable_buffer_sequence& buffers, To [this, buffers](auto&& handler) { const auto ptr = _data.lock(); auto executor = boost::asio::get_associated_executor(handler, ptr->executor); - // executor = ptr->executor; // can immediately finish if (_input_buffer.size() > 0) { @@ -164,13 +181,13 @@ inline auto Response::async_read_some(const Mutable_buffer_sequence& buffers, To _input_buffer.consume(copied); boost::asio::post(executor, std::bind(std::move(handler), boost::system::error_code{}, copied)); } else if (_receive_handler) { - boost::asio::post(executor, std::bind(std::move(handler), Code::multiple_reads, 0)); + boost::asio::post(executor, std::bind(std::move(handler), make_error_code(Code::multiple_reads), 0)); } else if (ptr == nullptr || ptr->status & detail::finished) { - boost::asio::post(executor, std::bind(std::move(handler), boost::asio::error::eof, 0)); + boost::asio::post(executor, + std::bind(std::move(handler), make_error_code(boost::asio::error::eof), 0)); } else { // set write handler when cURL calls the write callback - // TODO figure out why it works with this executor for large downloads with multiple threads - _receive_handler = [this, buffers, executor = ptr->executor, + _receive_handler = [this, buffers, executor, handler = std::move(handler)](boost::system::error_code ec) mutable { std::size_t copied = 0; // copy data and finish @@ -183,10 +200,12 @@ inline auto Response::async_read_some(const Mutable_buffer_sequence& buffers, To }; // TODO check for errors - if (ptr->pause_mask & CURLPAUSE_RECV) { - ptr->pause_mask &= ~CURLPAUSE_RECV; - curl_easy_pause(ptr->handle, ptr->pause_mask); - } + boost::asio::dispatch(ptr->executor, [ptr] { + if ((ptr->pause_mask & CURLPAUSE_RECV) == CURLPAUSE_RECV) { + ptr->pause_mask &= ~CURLPAUSE_RECV; + curl_easy_pause(ptr->handle, ptr->pause_mask); + } + }); } }, token); @@ -199,7 +218,8 @@ inline Response& Response::operator=(Response&& move) noexcept return *this; } -inline Response::Response(const std::shared_ptr& data) noexcept : _data{ data } +inline Response::Response(const std::shared_ptr& data) noexcept + : _data{ data }, _header_collector{ data->executor } { if (data != nullptr) { curl_easy_setopt(data->handle, CURLOPT_WRITEFUNCTION, &Response::_receive_callback); diff --git a/curlio/session.hpp b/curlio/session.hpp index 8defb4e..d74ac67 100644 --- a/curlio/session.hpp +++ b/curlio/session.hpp @@ -36,6 +36,7 @@ class Session { bool watch_write = false; }; + /// Everything related to the multi handle is run synchronized. boost::asio::strand _strand; boost::asio::steady_timer _timer; CURLM* _multi_handle = nullptr; @@ -89,7 +90,7 @@ inline Session::~Session() noexcept inline Response Session::start(Request& request) { if (!request.is_valid() || request.is_active()) { - throw std::system_error{ Code::request_in_use }; + throw boost::system::system_error{ Code::request_in_use }; } CURLIO_DEBUG("Starting request " << &request); @@ -112,9 +113,11 @@ inline Response Session::start(Request& request) Response response{ data }; _active_requests.insert({ data.get(), std::move(data) }); - curl_multi_add_handle(_multi_handle, handle); // kickstart - _multi_timer_callback(_multi_handle, 0, this); + boost::asio::post(_strand, [this, handle] { + curl_multi_add_handle(_multi_handle, handle); + _multi_timer_callback(_multi_handle, 0, this); + }); return response; } diff --git a/examples/playground.cpp b/examples/playground.cpp index a2ceeda..d56aea0 100644 --- a/examples/playground.cpp +++ b/examples/playground.cpp @@ -4,6 +4,7 @@ #include #include #include +#include using namespace boost::asio; @@ -27,43 +28,34 @@ int main(int argc, char** argv) [&]() -> awaitable { try { curlio::Session session{ service.get_executor() }; - session.set_cookie_file("/tmp/cookme"); - for (int i = 0; i < 2; ++i) { - curlio::Request req{}; - req.set_url("https://example.com"); - // curl_easy_setopt(req.native_handle(), CURLOPT_VERBOSE, 1L); - curl_easy_setopt(req.native_handle(), CURLOPT_FOLLOWLOCATION, 1L); - // curl_easy_setopt(req.native_handle(), CURLOPT_USERAGENT, "curl/7.80.0"); - // curl_easy_setopt(req.native_handle(), CURLOPT_COOKIEFILE, "/tmp/cookme"); - // curl_easy_setopt(req.native_handle(), CURLOPT_COOKIEJAR, "/tmp/cookme"); + curlio::Request req{}; + req.set_url("http://localhost:8083/kartoffel.bin"); + auto resp = session.start(req); - auto resp = session.start(req); + // steady_timer timer{service}; + // timer.expires_after(std::chrono::minutes{3}); + // co_await timer.async_wait(use_awaitable); + // std::cout << "Done with artifical timeout\n"; - // steady_timer timer{service}; - // timer.expires_after(std::chrono::minutes{3}); - // co_await timer.async_wait(use_awaitable); - // std::cout << "Done with artifical timeout\n"; + do { + co_await resp.async_await_next_headers(use_awaitable); + std::cout << "=======RECEIVED HEADER======\n"; + } while (resp.is_redirect()); + std::cout << "Final headers received\n"; - do { - co_await resp.async_await_headers(use_awaitable); - std::cout << "=======RECEIVED HEADER======\n"; - } while (resp.is_redirect()); - std::cout << "Final headers received\n"; + // co_await curlio::quick::async_read_all(resp, use_awaitable); + // std::cout << "\nRead all data\n"; - co_await curlio::quick::async_read_all(resp, use_awaitable); - std::cout << "\nRead all data\n"; - - // while (true) { - // char buf[4096]; - // auto [ec, n] = co_await req.async_read_some(buffer(buf), use_nothrow_awaitable); - // if (ec == error::eof) { - // break; - // } - // // std::cout.write(buf, n); - // } - co_await resp.async_await_completion(use_awaitable); - break; + while (true) { + char buf[4096]; + auto [ec, n] = co_await resp.async_read_some(buffer(buf), use_nothrow_awaitable); + if (ec == error::eof) { + break; + } + // std::cout.write(buf, n); } + std::cout << "Done reading\n"; + co_await resp.async_await_completion(use_awaitable); } catch (const std::exception& e) { std::cerr << "Exception: " << e.what() << "\n"; } @@ -72,6 +64,8 @@ int main(int argc, char** argv) detached); std::cout << "Service running with cURL version: " << curl_version() << "\n"; + std::thread t{ [&] { service.run(); } }; service.run(); + t.join(); curl_global_cleanup(); } diff --git a/examples/simple.cpp b/examples/simple.cpp new file mode 100644 index 0000000..d97ad49 --- /dev/null +++ b/examples/simple.cpp @@ -0,0 +1,48 @@ +#include +#include +#include +#include + +using namespace boost::asio; + +int main(int argc, char** argv) +{ + curl_global_init(CURL_GLOBAL_ALL); + std::string s; + io_service service; + + co_spawn( + service, + [&]() -> awaitable { + curlio::Session session{ service.get_executor() }; + curlio::Request req{}; + req.set_url("http://example.com"); + auto resp = session.start(req); + + co_await resp.async_await_last_headers(use_awaitable); + std::cout << "Headers received\n"; + + const std::string content = co_await curlio::quick::async_read_all(resp, use_awaitable); + std::cout << content << "\n"; + std::cout << "Done reading " << content.length() << " bytes\n"; + + // // or manually + // char buf[4096]; + // while (true) { + // boost::system::error_code ec; + // const std::size_t read = + // co_await resp.async_read_some(buffer(buf), redirect_error(use_awaitable, ec)); + // if (ec == error::eof) { + // break; + // } + // std::cout.write(buf, read); + // } + + co_await resp.async_await_completion(use_awaitable); + co_return; + }, + detached); + + service.run(); + curl_global_cleanup(); +}