From 1a9f473ce5462a4333c0583f5b6d19eb69005c0d Mon Sep 17 00:00:00 2001 From: terrakuh Date: Thu, 19 Sep 2024 16:14:39 +0000 Subject: [PATCH 1/7] Add per-operation cancellation support --- curlio/basic_request.hpp | 2 -- curlio/basic_request.inl | 18 ++++++++++ curlio/basic_response.inl | 9 +++++ curlio/detail/asio_include.hpp | 2 ++ curlio/detail/header_collector.hpp | 13 ++++++- examples/playground.cpp | 54 ++++++++++++++++++++---------- 6 files changed, 78 insertions(+), 20 deletions(-) diff --git a/curlio/basic_request.hpp b/curlio/basic_request.hpp index 6fbf16d..d6fd582 100644 --- a/curlio/basic_request.hpp +++ b/curlio/basic_request.hpp @@ -33,8 +33,6 @@ class BasicRequest { BasicRequest& operator=(const BasicRequest& copy) = delete; BasicRequest& operator=(BasicRequest&& move) = delete; - static std::shared_ptr make_request(BasicSession& session); - private: friend class BasicSession; friend class BasicResponse; diff --git a/curlio/basic_request.inl b/curlio/basic_request.inl index 203ba2e..8e91d56 100644 --- a/curlio/basic_request.inl +++ b/curlio/basic_request.inl @@ -62,6 +62,15 @@ inline auto BasicRequest::async_write_some(const auto& buffers, auto&& std::move(executor), std::bind(std::move(handler), make_error_code(Code::multiple_writes), std::size_t{ 0 })); } else { +#if CURLIO_ASIO_HAS_CANCEL + if (auto slot = boost::asio::get_associated_cancellation_slot(handler); slot.is_connected()) { + slot.assign([this](boost::asio::cancellation_type /* type */) { + _send_handler(boost::asio::error::operation_aborted, nullptr, 0); + _send_handler.reset(); + }); + } +#endif + _send_handler = [this, buffers = std::move(buffers), handler = std::move(handler), executor = std::move(executor)](detail::asio_error_code ec, char* data, std::size_t size) mutable { @@ -90,6 +99,15 @@ inline auto BasicRequest::async_abort(auto&& token) _send_handler.reset(); } +#if CURLIO_ASIO_HAS_CANCEL + if (auto slot = boost::asio::get_associated_cancellation_slot(handler); slot.is_connected()) { + slot.assign([this](boost::asio::cancellation_type /* type */) { + _send_handler(boost::asio::error::operation_aborted, nullptr, 0); + _send_handler.reset(); + }); + } +#endif + auto executor = CURLIO_ASIO_NS::get_associated_executor(handler, get_executor()); _send_handler = [this, handler = std::move(handler), executor = std::move(executor)]( diff --git a/curlio/basic_response.inl b/curlio/basic_response.inl index 6580ea3..2d04f4e 100644 --- a/curlio/basic_response.inl +++ b/curlio/basic_response.inl @@ -52,6 +52,15 @@ inline auto BasicResponse::async_read_some(const auto& buffers, auto&& std::bind(std::move(handler), make_error_code(Code::multiple_reads), std::size_t{ 0 })); } // Wait for more data. else { +#if CURLIO_ASIO_HAS_CANCEL + if (auto slot = boost::asio::get_associated_cancellation_slot(handler); slot.is_connected()) { + slot.assign([this](boost::asio::cancellation_type /* type */) { + _receive_handler(boost::asio::error::operation_aborted, nullptr, 0); + _receive_handler.reset(); + }); + } +#endif + _receive_handler = [this, buffers = std::move(buffers), executor = std::move(executor), handler = std::move(handler)](detail::asio_error_code ec, const char* data, std::size_t size) mutable { diff --git a/curlio/detail/asio_include.hpp b/curlio/detail/asio_include.hpp index e331d4a..4e930ed 100644 --- a/curlio/detail/asio_include.hpp +++ b/curlio/detail/asio_include.hpp @@ -3,9 +3,11 @@ #if defined(CURLIO_USE_STANDALONE_ASIO) # include # define CURLIO_ASIO_NS asio +# define CURLIO_ASIO_HAS_CANCEL __has_include() #else // Fall back to Boost.ASIO # include # define CURLIO_ASIO_NS boost::asio +# define CURLIO_ASIO_HAS_CANCEL __has_include() #endif namespace curlio::detail { diff --git a/curlio/detail/header_collector.hpp b/curlio/detail/header_collector.hpp index c1e93bb..757bcfc 100644 --- a/curlio/detail/header_collector.hpp +++ b/curlio/detail/header_collector.hpp @@ -75,9 +75,20 @@ class HeaderCollector { fields_type{})); } // Need to wait. else { +#if CURLIO_ASIO_HAS_CANCEL + if (auto slot = boost::asio::get_associated_cancellation_slot(handler); slot.is_connected()) { + slot.assign([this](boost::asio::cancellation_type /* type */) { + _headers_received_handler(boost::asio::error::operation_aborted); + _headers_received_handler.reset(); + }); + } +#endif + _headers_received_handler = [this, executor = std::move(executor), handler = std::move(handler)](asio_error_code ec) mutable { - _ready_to_await = false; + if (!ec) { + _ready_to_await = false; + } CURLIO_ASIO_NS::post(std::move(executor), std::bind(std::move(handler), ec, std::move(_fields))); }; } diff --git a/examples/playground.cpp b/examples/playground.cpp index c67f7c4..88bb695 100644 --- a/examples/playground.cpp +++ b/examples/playground.cpp @@ -3,31 +3,53 @@ #include #include -#include -#include -#include using namespace boost::asio; +awaitable await_headers(curlio::Response& response, unsigned int max_redirects) +{ + for (unsigned int i = 0; true; ++i) { + auto headers = co_await response.async_wait_headers(use_awaitable); + const int status = co_await response.async_get_info(use_awaitable); + if (status < 300 || status >= 400) { + std::cout << "Final headers received\n"; + co_return headers; + } else if (i == max_redirects) { + break; + } + std::cout << "Waiting for next header: " << headers.size() << " status: " << status << "\n"; + } + throw std::runtime_error{ "max redirects" }; +} + awaitable async_main() -try { - auto session = curlio::make_session(co_await this_coro::executor); - auto request = curlio::make_request(session); +{ + curlio::Session session{ co_await this_coro::executor }; - request->set_option("http://127.0.0.1:8088/"); + auto request = std::make_shared(session); + request->set_option("http://localhost:8088"); request->set_option(3); request->set_option(1); - auto response = co_await session->async_start(request, use_awaitable); + std::shared_ptr response{}; + + for (int i = 0; i < 2; ++i) { + response = co_await session.async_start(request, use_awaitable); + const auto headers = co_await await_headers(*response, 3); + const int status = co_await response->async_get_info(use_awaitable); + std::cout << "Got " << headers.size() << " headers\n"; + std::cout << "Status: " << status << "\n"; - std::ofstream file{"/workspaces/downer/backend/asd", std::ios::out|std::ios::binary}; - char data[10*1024]; - while (true) { - const std::size_t bytes_transferred = co_await response->async_read_some(buffer(data), use_awaitable); - file.write(data, bytes_transferred); + std::size_t bytes_transferred = 0; + try { + char data[10 * 1024]; + while (true) { + bytes_transferred += co_await response->async_read_some(buffer(data), use_awaitable); + } + } catch (const std::exception& e) { + } + std::cout << "Finished with " << bytes_transferred << " bytes\n"; } -} catch (const std::exception& e) { - std::cerr << "Failed with: " << e.what() << "\n"; } int main(int argc, char** argv) @@ -35,8 +57,6 @@ int main(int argc, char** argv) curl_global_init(CURL_GLOBAL_ALL); io_service service{}; - auto session = curlio::make_session(service.get_executor()); - co_spawn(service, async_main(), detached); std::cout << "Service running with cURL version: " << curl_version() << "\n"; From 11cb45fec1f85f458f5f71bcb988b06e33b1ef25 Mon Sep 17 00:00:00 2001 From: terrakuh Date: Sat, 5 Oct 2024 23:31:50 +0200 Subject: [PATCH 2/7] Fix re-registering of handles too fast --- curlio/basic_session.inl | 3 +++ 1 file changed, 3 insertions(+) diff --git a/curlio/basic_session.inl b/curlio/basic_session.inl index cd3ad47..52717f8 100644 --- a/curlio/basic_session.inl +++ b/curlio/basic_session.inl @@ -41,6 +41,9 @@ inline auto BasicSession::async_start(request_pointer request, auto&& [this, request = std::move(request)](auto handler) mutable { CURLIO_ASIO_NS::dispatch( *_strand, [this, request = std::move(request), handler = std::move(handler)]() mutable { + // If the handle was already registered but the start was too fast, we need to clean it first. + _perform(CURL_SOCKET_TIMEOUT, 0); + const auto easy_handle = request->native_handle(); // TODO error From 4bb5b9c3d30e0ea745ce0ef008a1ef20f3408f62 Mon Sep 17 00:00:00 2001 From: terrakuh Date: Sat, 5 Oct 2024 23:32:05 +0200 Subject: [PATCH 3/7] Add more error checks for CURL API functions --- curlio/basic_request.hpp | 6 +-- curlio/basic_request.inl | 31 ++++++++------- curlio/basic_response.hpp | 4 +- curlio/basic_response.inl | 49 ++++++++++++++++------- curlio/basic_session.hpp | 6 +-- curlio/basic_session.inl | 53 ++++++++++++++----------- curlio/debug.hpp | 63 ++++++++++++++++++++++++++++++ curlio/detail/header_collector.hpp | 41 +++++++++++++------ curlio/error.hpp | 22 ++++++++++- curlio/log.hpp | 25 ------------ curlio/quick/reader.hpp | 2 +- 11 files changed, 202 insertions(+), 100 deletions(-) create mode 100644 curlio/debug.hpp delete mode 100644 curlio/log.hpp diff --git a/curlio/basic_request.hpp b/curlio/basic_request.hpp index b53076c..694179b 100644 --- a/curlio/basic_request.hpp +++ b/curlio/basic_request.hpp @@ -16,15 +16,15 @@ class BasicRequest { using executor_type = Executor; using strand_type = CURLIO_ASIO_NS::strand; - BasicRequest(BasicSession& session) noexcept; - BasicRequest(const BasicRequest& copy) noexcept; + BasicRequest(BasicSession& session); + BasicRequest(const BasicRequest& copy); BasicRequest(BasicRequest&& move) = delete; ~BasicRequest(); template void set_option(detail::option_type