Skip to content

Commit

Permalink
[coro_http][fix and update]update coro_http (#524)
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Dec 12, 2023
1 parent 30cf244 commit 13a24b8
Show file tree
Hide file tree
Showing 6 changed files with 759 additions and 37 deletions.
76 changes: 44 additions & 32 deletions include/ylt/thirdparty/cinatra/coro_http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
};

coro_http_client(asio::io_context::executor_type executor)
: socket_(std::make_shared<socket_t>(executor)),
: executor_wrapper_(executor),
timer_(&executor_wrapper_),
socket_(std::make_shared<socket_t>(executor)),
read_buf_(socket_->read_buf_),
chunked_buf_(socket_->chunked_buf_),
executor_wrapper_(executor),
timer_(&executor_wrapper_) {}
chunked_buf_(socket_->chunked_buf_) {}

coro_http_client(
coro_io::ExecutorWrapper<> *executor = coro_io::get_global_executor())
Expand Down Expand Up @@ -280,14 +280,14 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

auto [ok, u] = handle_uri(data, no_schema ? append_uri : uri);
if (!ok) {
co_return resp_data{{}, 404};
co_return resp_data{std::make_error_code(std::errc::protocol_error), 404};
}

auto future = start_timer(req_timeout_duration_, "connect timer");

data = co_await connect(u);
if (auto ec = co_await wait_timer(std::move(future)); ec) {
co_return resp_data{{}, 404};
co_return resp_data{ec, 404};
}
if (!data.net_err) {
data.status = 200;
Expand Down Expand Up @@ -644,7 +644,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
std::error_code err_code;
timer_.cancel(err_code);
auto ret = co_await std::move(future);
co_await std::move(future);
if (is_timeout_) {
co_return std::make_error_code(std::errc::timed_out);
}
Expand All @@ -659,14 +659,15 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
});
if (form_data_.empty()) {
CINATRA_LOG_WARNING << "no multipart";
co_return resp_data{{}, 404};
co_return resp_data{std::make_error_code(std::errc::invalid_argument),
404};
}

req_context<> ctx{req_content_type::multipart, "", ""};
resp_data data{};
auto [ok, u] = handle_uri(data, uri);
if (!ok) {
co_return resp_data{{}, 404};
co_return resp_data{std::make_error_code(std::errc::protocol_error), 404};
}

size_t content_len = multipart_content_len();
Expand All @@ -678,17 +679,19 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::error_code ec{};
size_t size = 0;

auto future = start_timer(req_timeout_duration_, "connect timer");
if (socket_->has_closed_) {
auto future = start_timer(req_timeout_duration_, "connect timer");

data = co_await connect(u);
if (ec = co_await wait_timer(std::move(future)); ec) {
co_return resp_data{{}, 404};
}
if (data.net_err) {
co_return data;
data = co_await connect(u);
if (ec = co_await wait_timer(std::move(future)); ec) {
co_return resp_data{ec, 404};
}
if (data.net_err) {
co_return data;
}
}

future = start_timer(req_timeout_duration_, "upload timer");
auto future = start_timer(req_timeout_duration_, "upload timer");
std::tie(ec, size) = co_await async_write(asio::buffer(header_str));
#ifdef INJECT_FOR_HTTP_CLIENT_TEST
if (inject_write_failed == ClientInjectAction::write_failed) {
Expand Down Expand Up @@ -733,7 +736,8 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::string uri, std::string name, std::string filename) {
if (!add_file_part(std::move(name), std::move(filename))) {
CINATRA_LOG_WARNING << "open file failed or duplicate test names";
co_return resp_data{{}, 404};
co_return resp_data{std::make_error_code(std::errc::invalid_argument),
404};
}
co_return co_await async_upload_multipart(std::move(uri));
}
Expand All @@ -752,6 +756,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {

req_context<> ctx{};
if (range.empty()) {
add_header("Transfer-Encoding", "chunked");
ctx = {req_content_type::none, "", "", std::move(file)};
}
else {
Expand Down Expand Up @@ -831,7 +836,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
resp_data data{};
auto [ok, u] = handle_uri(data, uri);
if (!ok) {
co_return resp_data{{}, 404};
co_return resp_data{std::make_error_code(std::errc::protocol_error), 404};
}

constexpr bool is_stream_file = is_stream_ptr_v<Source>;
Expand Down Expand Up @@ -862,17 +867,19 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
std::error_code ec{};
size_t size = 0;

auto future = start_timer(req_timeout_duration_, "connect timer");
if (socket_->has_closed_) {
auto future = start_timer(req_timeout_duration_, "connect timer");

data = co_await connect(u);
if (ec = co_await wait_timer(std::move(future)); ec) {
co_return resp_data{{}, 404};
}
if (data.net_err) {
co_return data;
data = co_await connect(u);
if (ec = co_await wait_timer(std::move(future)); ec) {
co_return resp_data{ec, 404};
}
if (data.net_err) {
co_return data;
}
}

future = start_timer(req_timeout_duration_, "upload timer");
auto future = start_timer(req_timeout_duration_, "upload timer");
std::tie(ec, size) = co_await async_write(asio::buffer(header_str));
if (ec) {
co_return resp_data{ec, 404};
Expand All @@ -895,12 +902,17 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
}
else if constexpr (std::is_same_v<Source, std::string> ||
std::is_same_v<Source, std::string_view>) {
coro_io::coro_file coro_file(source, coro_io::flags::read_only);
while (!coro_file.eof()) {
coro_io::coro_file file{};
bool ok = co_await file.async_open(source, coro_io::flags::read_only);
if (!ok) {
co_return resp_data{
std::make_error_code(std::errc::bad_file_descriptor), 404};
}
while (!file.eof()) {
auto [rd_ec, rd_size] =
co_await coro_file.async_read(file_data.data(), file_data.size());
co_await file.async_read(file_data.data(), file_data.size());
auto bufs = cinatra::to_chunked_buffers<asio::const_buffer>(
file_data.data(), rd_size, chunk_size_str, coro_file.eof());
file_data.data(), rd_size, chunk_size_str, file.eof());
if (std::tie(ec, size) = co_await async_write(bufs); ec) {
co_return resp_data{ec, 404};
}
Expand Down Expand Up @@ -1321,7 +1333,7 @@ class coro_http_client : public std::enable_shared_from_this<coro_http_client> {
co_return data;
}

bool is_ranges = parser_.is_ranges();
bool is_ranges = parser_.is_resp_ranges();
if (is_ranges) {
is_keep_alive = true;
}
Expand Down
41 changes: 41 additions & 0 deletions include/ylt/thirdparty/cinatra/coro_http_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ class coro_http_connection
#endif

async_simple::coro::Lazy<void> start() {
#ifdef CINATRA_ENABLE_SSL
bool has_shake = false;
#endif
while (true) {
#ifdef CINATRA_ENABLE_SSL
if (use_ssl_ && !has_shake) {
Expand Down Expand Up @@ -220,6 +222,45 @@ class coro_http_connection
co_return true;
}

async_simple::coro::Lazy<bool> write_data(std::string_view message) {
std::vector<asio::const_buffer> buffers;
buffers.push_back(asio::buffer(message));
auto [ec, _] = co_await async_write(buffers);
if (ec) {
CINATRA_LOG_ERROR << "async_write error: " << ec.message();
close();
co_return false;
}

if (!keep_alive_) {
// now in io thread, so can close socket immediately.
close();
}

co_return true;
}

async_simple::coro::Lazy<bool> write_chunked_data(std::string_view buf,
bool eof) {
std::string chunk_size_str = "";
std::vector<asio::const_buffer> buffers =
to_chunked_buffers<asio::const_buffer>(buf.data(), buf.length(),
chunk_size_str, eof);
auto [ec, _] = co_await async_write(std::move(buffers));
if (ec) {
CINATRA_LOG_ERROR << "async_write error: " << ec.message();
close();
co_return false;
}

if (!keep_alive_) {
// now in io thread, so can close socket immediately.
close();
}

co_return true;
}

bool sync_reply() { return async_simple::coro::syncAwait(reply()); }

async_simple::coro::Lazy<bool> begin_chunked() {
Expand Down
4 changes: 4 additions & 0 deletions include/ylt/thirdparty/cinatra/coro_http_request.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class coro_http_request {
return is_chunk;
}

bool is_resp_ranges() { return parser_.is_resp_ranges(); }

bool is_req_ranges() { return parser_.is_req_ranges(); }

content_type get_content_type() {
static content_type thread_local content_type = get_content_type_impl();
return content_type;
Expand Down
Loading

0 comments on commit 13a24b8

Please sign in to comment.