From 0b94d71258be109f16da573ad79a75027dcc6442 Mon Sep 17 00:00:00 2001 From: "Alex E." <36134278+chusitoo@users.noreply.github.com> Date: Wed, 18 Dec 2024 11:48:05 -0500 Subject: [PATCH] [EXPORTER] Optimize OTLP HTTP compression (#3178) --- exporters/otlp/src/otlp_http_client.cc | 2 +- ext/src/http/client/curl/http_client_curl.cc | 128 ++++++++++++++---- .../http/client/curl/http_operation_curl.cc | 4 +- ext/test/http/curl_http_test.cc | 127 ++++++++++++++++- 4 files changed, 232 insertions(+), 29 deletions(-) diff --git a/exporters/otlp/src/otlp_http_client.cc b/exporters/otlp/src/otlp_http_client.cc index 506295ad7f..a2620756cc 100644 --- a/exporters/otlp/src/otlp_http_client.cc +++ b/exporters/otlp/src/otlp_http_client.cc @@ -890,7 +890,7 @@ OtlpHttpClient::createSession( // Parse uri and store it to cache if (http_uri_.empty()) { - auto parse_url = opentelemetry::ext::http::common::UrlParser(std::string(options_.url)); + const auto parse_url = opentelemetry::ext::http::common::UrlParser(options_.url); if (!parse_url.success_) { std::string error_message = "[OTLP HTTP Client] Export failed, invalid url: " + options_.url; diff --git a/ext/src/http/client/curl/http_client_curl.cc b/ext/src/http/client/curl/http_client_curl.cc index e6292f54f2..a2b73ecb2b 100644 --- a/ext/src/http/client/curl/http_client_curl.cc +++ b/ext/src/http/client/curl/http_client_curl.cc @@ -3,14 +3,17 @@ #include #include +#include +#include #include #include -#include #include +#include #include #include #include #include +#include #include #include #include @@ -57,11 +60,85 @@ nostd::shared_ptr HttpCurlGlobalInitializer::GetInsta return shared_initializer; } +#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW +// Original source: +// https://stackoverflow.com/questions/12398377/is-it-possible-to-have-zlib-read-from-and-write-to-the-same-memory-buffer/12412863#12412863 +int deflateInPlace(z_stream *strm, unsigned char *buf, uint32_t len, uint32_t *max_len) +{ + // must be large enough to hold zlib or gzip header (if any) and one more byte -- 11 works for the + // worst case here, but if gzip encoding is used and a deflateSetHeader() call is inserted in this + // code after the deflateReset(), then the 11 needs to be increased to accommodate the resulting + // gzip header size plus one + std::array temp{}; + + // kick start the process with a temporary output buffer -- this allows deflate to consume a large + // chunk of input data in order to make room for output data there + strm->next_in = buf; + strm->avail_in = len; + if (*max_len < len) + { + *max_len = len; + } + strm->next_out = temp.data(); + strm->avail_out = (std::min)(static_cast(temp.size()), *max_len); + auto ret = deflate(strm, Z_FINISH); + if (ret == Z_STREAM_ERROR) + { + return ret; + } + + // if we can, copy the temporary output data to the consumed portion of the input buffer, and then + // continue to write up to the start of the consumed input for as long as possible + auto have = strm->next_out - temp.data(); // number of bytes in temp[] + if (have <= static_cast(strm->avail_in ? len - strm->avail_in : *max_len)) + { + std::memcpy(buf, temp.data(), have); + strm->next_out = buf + have; + have = 0; + while (ret == Z_OK) + { + strm->avail_out = + strm->avail_in ? strm->next_in - strm->next_out : (buf + *max_len) - strm->next_out; + ret = deflate(strm, Z_FINISH); + } + if (ret != Z_BUF_ERROR || strm->avail_in == 0) + { + *max_len = strm->next_out - buf; + return ret == Z_STREAM_END ? Z_OK : ret; + } + } + + // the output caught up with the input due to insufficiently compressible data -- copy the + // remaining input data into an allocated buffer and complete the compression from there to the + // now empty input buffer (this will only occur for long incompressible streams, more than ~20 MB + // for the default deflate memLevel of 8, or when *max_len is too small and less than the length + // of the header plus one byte) + auto hold = static_cast>( + strm->zalloc(strm->opaque, strm->avail_in, 1)); // allocated buffer to hold input data + if (hold == Z_NULL) + { + return Z_MEM_ERROR; + } + std::memcpy(hold, strm->next_in, strm->avail_in); + strm->next_in = hold; + if (have) + { + std::memcpy(buf, temp.data(), have); + strm->next_out = buf + have; + } + strm->avail_out = (buf + *max_len) - strm->next_out; + ret = deflate(strm, Z_FINISH); + strm->zfree(strm->opaque, hold); + *max_len = strm->next_out - buf; + return ret == Z_OK ? Z_BUF_ERROR : (ret == Z_STREAM_END ? Z_OK : ret); +} +#endif // ENABLE_OTLP_COMPRESSION_PREVIEW + void Session::SendRequest( std::shared_ptr callback) noexcept { is_session_active_.store(true, std::memory_order_release); - std::string url = host_ + std::string(http_request_->uri_); + const auto &url = host_ + http_request_->uri_; auto callback_ptr = callback.get(); bool reuse_connection = false; @@ -76,44 +153,47 @@ void Session::SendRequest( if (http_request_->compression_ == opentelemetry::ext::http::client::Compression::kGzip) { #ifdef ENABLE_OTLP_COMPRESSION_PREVIEW - http_request_->AddHeader("Content-Encoding", "gzip"); - - opentelemetry::ext::http::client::Body compressed_body(http_request_->body_.size()); - z_stream zs; - zs.zalloc = Z_NULL; - zs.zfree = Z_NULL; - zs.opaque = Z_NULL; - zs.avail_in = static_cast(http_request_->body_.size()); - zs.next_in = http_request_->body_.data(); - zs.avail_out = static_cast(compressed_body.size()); - zs.next_out = compressed_body.data(); + z_stream zs{}; + zs.zalloc = Z_NULL; + zs.zfree = Z_NULL; + zs.opaque = Z_NULL; // ZLIB: Have to maually specify 16 bits for the Gzip headers - const int window_bits = 15 + 16; + static constexpr int kWindowBits = MAX_WBITS + 16; + static constexpr int kMemLevel = MAX_MEM_LEVEL; - int stream = - deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits, 8, Z_DEFAULT_STRATEGY); + auto stream = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, kWindowBits, kMemLevel, + Z_DEFAULT_STRATEGY); if (stream == Z_OK) { - deflate(&zs, Z_FINISH); - deflateEnd(&zs); - compressed_body.resize(zs.total_out); - http_request_->SetBody(compressed_body); + auto size = static_cast(http_request_->body_.size()); + auto max_size = size; + stream = deflateInPlace(&zs, http_request_->body_.data(), size, &max_size); + + if (stream == Z_OK) + { + http_request_->AddHeader("Content-Encoding", "gzip"); + http_request_->body_.resize(max_size); + } } - else + + if (stream != Z_OK) { if (callback) { - callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed, ""); + callback->OnEvent(opentelemetry::ext::http::client::SessionState::CreateFailed, + zs.msg ? zs.msg : ""); } is_session_active_.store(false, std::memory_order_release); } + + deflateEnd(&zs); #else OTEL_INTERNAL_LOG_ERROR( "[HTTP Client Curl] Set WITH_OTLP_HTTP_COMPRESSION=ON to use gzip compression with the " "OTLP HTTP Exporter"); -#endif +#endif // ENABLE_OTLP_COMPRESSION_PREVIEW } curl_operation_.reset(new HttpOperation( @@ -226,7 +306,7 @@ HttpClient::~HttpClient() std::shared_ptr HttpClient::CreateSession( nostd::string_view url) noexcept { - auto parsedUrl = common::UrlParser(std::string(url)); + const auto parsedUrl = common::UrlParser(std::string(url)); if (!parsedUrl.success_) { return std::make_shared(*this); diff --git a/ext/src/http/client/curl/http_operation_curl.cc b/ext/src/http/client/curl/http_operation_curl.cc index b80624d069..c27cff1b41 100644 --- a/ext/src/http/client/curl/http_operation_curl.cc +++ b/ext/src/http/client/curl/http_operation_curl.cc @@ -301,9 +301,7 @@ HttpOperation::HttpOperation(opentelemetry::ext::http::client::Method method, { for (auto &kv : this->request_headers_) { - std::string header = std::string(kv.first); - header += ": "; - header += std::string(kv.second); + const auto header = std::string(kv.first).append(": ").append(kv.second); curl_resource_.headers_chunk = curl_slist_append(curl_resource_.headers_chunk, header.c_str()); } diff --git a/ext/test/http/curl_http_test.cc b/ext/test/http/curl_http_test.cc index 497c5bb529..f1fbe495b7 100644 --- a/ext/test/http/curl_http_test.cc +++ b/ext/test/http/curl_http_test.cc @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -558,7 +559,6 @@ TEST_F(BasicCurlHttpTests, ElegantQuitQuick) ASSERT_TRUE(handler->is_called_); ASSERT_TRUE(handler->got_response_); } - TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore) { { @@ -581,3 +581,128 @@ TEST_F(BasicCurlHttpTests, BackgroundThreadWaitMore) ASSERT_TRUE(http_client.MaybeSpawnBackgroundThread()); } } + +#ifdef ENABLE_OTLP_COMPRESSION_PREVIEW +struct GzipEventHandler : public CustomEventHandler +{ + ~GzipEventHandler() override = default; + + void OnResponse(http_client::Response & /* response */) noexcept override {} + + void OnEvent(http_client::SessionState state, nostd::string_view reason) noexcept override + { + is_called_ = true; + state_ = state; + reason_ = std::string{reason}; + } + + bool is_called_ = false; + http_client::SessionState state_ = static_cast(-1); + std::string reason_; +}; + +TEST_F(BasicCurlHttpTests, GzipCompressibleData) +{ + received_requests_.clear(); + auto session_manager = http_client::HttpClientFactory::Create(); + EXPECT_TRUE(session_manager != nullptr); + + auto session = session_manager->CreateSession("http://127.0.0.1:19000"); + auto request = session->CreateRequest(); + request->SetUri("post/"); + request->SetMethod(http_client::Method::Post); + + const auto original_size = 500UL; + http_client::Body body(original_size); + std::iota(body.begin(), body.end(), 0); + request->SetBody(body); + request->AddHeader("Content-Type", "text/plain"); + request->SetCompression(opentelemetry::ext::http::client::Compression::kGzip); + auto handler = std::make_shared(); + session->SendRequest(handler); + ASSERT_TRUE(waitForRequests(30, 1)); + session->FinishSession(); + ASSERT_TRUE(handler->is_called_); + ASSERT_EQ(handler->state_, http_client::SessionState::Response); + ASSERT_TRUE(handler->reason_.empty()); + + auto http_request = + dynamic_cast(request.get()); + ASSERT_TRUE(http_request != nullptr); + ASSERT_LT(http_request->body_.size(), original_size); + + session_manager->CancelAllSessions(); + session_manager->FinishAllSessions(); +} + +TEST_F(BasicCurlHttpTests, GzipIncompressibleData) +{ + received_requests_.clear(); + auto session_manager = http_client::HttpClientFactory::Create(); + EXPECT_TRUE(session_manager != nullptr); + + auto session = session_manager->CreateSession("http://127.0.0.1:19000"); + auto request = session->CreateRequest(); + request->SetUri("post/"); + request->SetMethod(http_client::Method::Post); + + // Random data generated using code snippet below. + // const auto original_size = 500UL; + // http_client::Body body(original_size); + // std::random_device rd; + // std::mt19937 gen(rd()); + // std::uniform_int_distribution<> uid(1, 255); + // std::generate(body.begin(), body.end(), [&]() { return uid(gen); }); + + // The input values are fixed to make the test repeatable in the event that some distributions + // might yield results that are, in fact, compressible. + http_client::Body body = { + 140, 198, 12, 56, 165, 185, 173, 20, 13, 83, 127, 223, 77, 38, 224, 43, 236, 10, 178, + 75, 169, 157, 136, 199, 74, 30, 148, 195, 51, 30, 225, 21, 121, 219, 7, 155, 198, 121, + 205, 102, 80, 38, 132, 202, 45, 229, 206, 90, 150, 202, 53, 221, 54, 37, 172, 90, 238, + 248, 191, 240, 109, 227, 248, 41, 251, 121, 35, 226, 107, 122, 15, 242, 203, 45, 64, 195, + 186, 23, 1, 158, 61, 196, 182, 26, 201, 47, 211, 241, 251, 209, 255, 170, 181, 192, 89, + 133, 176, 60, 178, 97, 168, 223, 152, 9, 118, 98, 169, 240, 170, 15, 13, 161, 24, 57, + 123, 117, 230, 30, 244, 117, 238, 255, 198, 232, 95, 148, 37, 61, 67, 103, 31, 240, 52, + 21, 145, 175, 201, 86, 19, 61, 228, 76, 131, 185, 111, 149, 203, 143, 16, 142, 95, 173, + 42, 106, 39, 203, 116, 235, 20, 162, 112, 173, 112, 70, 126, 191, 210, 219, 90, 145, 126, + 118, 43, 241, 101, 66, 175, 179, 5, 233, 208, 164, 180, 83, 214, 194, 173, 29, 179, 149, + 75, 202, 17, 152, 139, 130, 94, 247, 142, 249, 159, 224, 205, 131, 93, 82, 186, 226, 210, + 84, 17, 212, 155, 61, 226, 103, 152, 37, 3, 193, 216, 219, 203, 101, 99, 33, 59, 38, + 106, 62, 232, 127, 44, 125, 90, 169, 148, 238, 34, 106, 12, 221, 90, 173, 67, 122, 232, + 161, 89, 198, 43, 241, 195, 248, 219, 35, 47, 200, 11, 227, 168, 246, 243, 103, 38, 17, + 203, 237, 203, 158, 204, 89, 231, 19, 24, 25, 199, 160, 233, 43, 117, 144, 196, 117, 152, + 42, 121, 189, 217, 202, 221, 250, 157, 237, 47, 29, 64, 32, 10, 32, 243, 28, 114, 158, + 228, 102, 36, 191, 139, 217, 161, 162, 186, 19, 141, 212, 49, 1, 239, 153, 107, 249, 31, + 235, 138, 73, 80, 58, 152, 15, 149, 50, 42, 84, 75, 95, 82, 56, 86, 143, 45, 214, + 11, 184, 164, 181, 249, 74, 184, 26, 207, 165, 162, 240, 154, 90, 56, 175, 72, 4, 166, + 188, 78, 232, 87, 243, 50, 59, 62, 175, 213, 210, 182, 31, 123, 91, 118, 98, 249, 23, + 170, 240, 228, 236, 121, 87, 132, 129, 250, 41, 227, 204, 250, 147, 145, 109, 149, 210, 21, + 174, 165, 127, 234, 64, 211, 52, 93, 126, 117, 231, 216, 210, 15, 16, 2, 167, 215, 178, + 104, 245, 119, 211, 235, 120, 135, 202, 117, 150, 101, 94, 201, 136, 179, 205, 167, 212, 236, + 7, 178, 132, 228, 65, 230, 90, 171, 109, 31, 83, 31, 210, 123, 136, 76, 186, 81, 205, + 63, 35, 21, 121, 152, 22, 242, 199, 106, 217, 199, 211, 206, 165, 88, 77, 112, 108, 193, + 122, 8, 193, 74, 91, 50, 6, 156, 185, 165, 15, 92, 116, 3, 18, 244, 165, 191, 2, + 183, 9, 164, 116, 75, 127}; + const auto original_size = body.size(); + + request->SetBody(body); + request->AddHeader("Content-Type", "text/plain"); + request->SetCompression(opentelemetry::ext::http::client::Compression::kGzip); + auto handler = std::make_shared(); + session->SendRequest(handler); + ASSERT_TRUE(waitForRequests(30, 1)); + session->FinishSession(); + ASSERT_TRUE(handler->is_called_); + ASSERT_EQ(handler->state_, http_client::SessionState::Response); + ASSERT_TRUE(handler->reason_.empty()); + + auto http_request = + dynamic_cast(request.get()); + ASSERT_TRUE(http_request != nullptr); + ASSERT_EQ(http_request->body_.size(), original_size); + + session_manager->CancelAllSessions(); + session_manager->FinishAllSessions(); +} +#endif // ENABLE_OTLP_COMPRESSION_PREVIEW