Skip to content

Commit

Permalink
Timeout after which the asynchronous (background) transfer will inter…
Browse files Browse the repository at this point in the history
…rupt data transmission during process termination (#106) (#107)

* Fixed timeout calculation
* Improved synchronization
  • Loading branch information
intuibase authored Oct 8, 2024
1 parent fa88ab1 commit 5b61306
Show file tree
Hide file tree
Showing 14 changed files with 514 additions and 237 deletions.
1 change: 1 addition & 0 deletions docs/configure.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ _Currently there are no additional `OTEL_` options waiting to be contributed ups
|---|---|---|---|
|ELASTIC_OTEL_ENABLED|true|true or false|Enables the automatic bootstrapping of instrumentation code|
|ELASTIC_OTEL_ASYNC_TRANSPORT|true| true or false | Use asynchronous (background) transfer of traces, metrics and logs. If false - brings back original OpenTelemetry SDK transfer modes|
|ELASTIC_OTEL_ASYNC_TRANSPORT_SHUTDOWN_TIMEOUT| 30s | interger numberwith time duration. Set to 0 to disable the timeout. Optional units: ms (default), s, m | Timeout after which the asynchronous (background) transfer will interrupt data transmission during process termination|
|ELASTIC_OTEL_MAX_SEND_QUEUE_SIZE|2MB| integer number with optional units: B, MB or GB | Set the maximum buffer size for asynchronous (background) transfer. It is set per worker process.|
|ELASTIC_OTEL_VERIFY_SERVER_CERT|true|true or false|Enables server certificate verification for asynchronous sending|
|ELASTIC_OTEL_LOG_FILE||Filesystem path|Log file name. You can use the %p placeholder where the process ID will appear in the file name, and %t where the timestamp will appear. Please note that the PHP process must have write permissions for the specified path.|
Expand Down
6 changes: 4 additions & 2 deletions prod/native/libcommon/code/AgentGlobals.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ class LogSinkFile;
class InstrumentedFunctionHooksStorageInterface;
namespace transport {
class CurlSender;
template <typename Sender> class HttpTransportAsync;
class HttpEndpoints;
template <typename Sender, typename Endpoints>
class HttpTransportAsync;
}

// clang-format off
Expand All @@ -61,7 +63,7 @@ class AgentGlobals {
std::shared_ptr<InstrumentedFunctionHooksStorageInterface> hooksStorage_;
std::shared_ptr<PhpSapi> sapi_;
std::unique_ptr<PeriodicTaskExecutor> periodicTaskExecutor_;
std::unique_ptr<transport::HttpTransportAsync<transport::CurlSender>> httpTransportAsync_;
std::unique_ptr<transport::HttpTransportAsync<transport::CurlSender, transport::HttpEndpoints> > httpTransportAsync_;
std::shared_ptr<SharedMemoryState> sharedMemory_;
std::shared_ptr<RequestScope> requestScope_;

Expand Down
1 change: 1 addition & 0 deletions prod/native/libcommon/code/CommonUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <charconv>
#include <chrono>
#include <cstdarg>
#include <optional>
#include <string_view>
#include <signal.h>
#include <stddef.h>
Expand Down
3 changes: 2 additions & 1 deletion prod/native/libcommon/code/ConfigurationManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ class ConfigurationManager {
BUILD_METADATA(ELASTIC_OTEL_CFG_OPT_NAME_DEBUG_DIAGNOSTICS_FILE, OptionMetadata::type::string, false),
BUILD_METADATA(ELASTIC_OTEL_CFG_OPT_NAME_VERIFY_SERVER_CERT, OptionMetadata::type::boolean, false),
BUILD_METADATA(ELASTIC_OTEL_CFG_OPT_NAME_MAX_SEND_QUEUE_SIZE, OptionMetadata::type::bytes, false),
BUILD_METADATA(ELASTIC_OTEL_CFG_OPT_NAME_ASYNC_TRANSPORT, OptionMetadata::type::boolean, false)};
BUILD_METADATA(ELASTIC_OTEL_CFG_OPT_NAME_ASYNC_TRANSPORT, OptionMetadata::type::boolean, false),
BUILD_METADATA(ELASTIC_OTEL_CFG_OPT_NAME_ASYNC_TRANSPORT_SHUTDOWN_TIMEOUT, OptionMetadata::type::duration, false)};
// clang-format on
};

Expand Down
2 changes: 2 additions & 0 deletions prod/native/libcommon/code/ConfigurationSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#define ELASTIC_OTEL_CFG_OPT_NAME_DEBUG_DIAGNOSTICS_FILE debug_diagnostic_file
#define ELASTIC_OTEL_CFG_OPT_NAME_MAX_SEND_QUEUE_SIZE max_send_queue_size
#define ELASTIC_OTEL_CFG_OPT_NAME_ASYNC_TRANSPORT async_transport
#define ELASTIC_OTEL_CFG_OPT_NAME_ASYNC_TRANSPORT_SHUTDOWN_TIMEOUT async_transport_shutdown_timeout

namespace elasticapm::php {

Expand All @@ -52,6 +53,7 @@ struct ConfigurationSnapshot {
bool ELASTIC_OTEL_CFG_OPT_NAME_VERIFY_SERVER_CERT = true;
std::size_t ELASTIC_OTEL_CFG_OPT_NAME_MAX_SEND_QUEUE_SIZE = 2 * 1024 * 1204;
bool ELASTIC_OTEL_CFG_OPT_NAME_ASYNC_TRANSPORT = true;
std::chrono::milliseconds ELASTIC_OTEL_CFG_OPT_NAME_ASYNC_TRANSPORT_SHUTDOWN_TIMEOUT = std::chrono::seconds(30);

uint64_t revision = 0;
};
Expand Down
2 changes: 1 addition & 1 deletion prod/native/libcommon/code/RequestScope.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class RequestScope {
}

bool handleError(int type, std::string_view errorFilename, uint32_t errorLineno, std::string_view message) {
ELOG_DEBUG(log_, "RequestScope::handleError type: %d fn: %s:%d msg: %s\n", type, errorFilename.data(), errorLineno, message.data());
ELOG_DEBUG(log_, "RequestScope::handleError type: %d fn: %s:%d msg: %s", type, errorFilename.data(), errorLineno, message.data());

bridge_->callPHPSideErrorHandler(type, errorFilename, errorLineno, message);

Expand Down
7 changes: 6 additions & 1 deletion prod/native/libcommon/code/transport/CurlSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ static int CurlDebugFunc(CURL *handle, curl_infotype type, char *data, size_t si
return 0;
}

static size_t CurlWriteFunc(char *data, size_t size, size_t nmemb, void *clientp) {
return size * nmemb;
}

CurlSender::CurlSender(std::shared_ptr<LoggerInterface> logger, std::chrono::milliseconds timeout, bool verifyCert) : log_(std::move(logger)) {

handle_ = curl_easy_init();
Expand All @@ -49,6 +53,7 @@ CurlSender::CurlSender(std::shared_ptr<LoggerInterface> logger, std::chrono::mil

curl_easy_setopt(handle_, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(handle_, CURLOPT_FORBID_REUSE, 0L);
curl_easy_setopt(handle_, CURLOPT_WRITEFUNCTION, CurlWriteFunc);

if (log_ && log_->doesMeetsLevelCondition(LogLevel::logLevel_trace)) {
curl_easy_setopt(handle_, CURLOPT_DEBUGFUNCTION, CurlDebugFunc);
Expand All @@ -61,9 +66,9 @@ int16_t CurlSender::sendPayload(std::string const &endpointUrl, struct curl_slis
curl_easy_setopt(handle_, CURLOPT_URL, endpointUrl.c_str());
curl_easy_setopt(handle_, CURLOPT_HTTPHEADER, headers);

curl_easy_setopt(handle_, CURLOPT_POST, 1L);
curl_easy_setopt(handle_, CURLOPT_POSTFIELDS, payload.data());
curl_easy_setopt(handle_, CURLOPT_POSTFIELDSIZE, payload.size());
curl_easy_setopt(handle_, CURLOPT_POST, 1L);

CURLcode res = curl_easy_perform(handle_);
if (res != CURLE_OK) {
Expand Down
6 changes: 3 additions & 3 deletions prod/native/libcommon/code/transport/CurlSender.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@

#include <boost/noncopyable.hpp>

using namespace std::literals;

namespace elasticapm::php::transport {

class CurlSender : public boost::noncopyable {
class CurlSender {
public:
CurlSender(std::shared_ptr<LoggerInterface> logger, std::chrono::milliseconds timeout, bool verifyCert);

CurlSender(CurlSender &&) = delete;
CurlSender &operator=(CurlSender &&) = delete;
CurlSender(CurlSender const &) = delete;
CurlSender &operator=(CurlSender const &) = delete;

~CurlSender() {
if (handle_) {
Expand Down
107 changes: 107 additions & 0 deletions prod/native/libcommon/code/transport/HttpEndpoint.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include "CommonUtils.h"

#include <chrono>
#include <memory>
#include <string>
#include <string_view>
#include <curl/curl.h>

namespace elasticapm::php::transport {

using namespace std::literals;

class HttpEndpoint {
public:
using enpointHeaders_t = std::vector<std::pair<std::string_view, std::string_view>>;
using connectionId_t = std::size_t;

HttpEndpoint(HttpEndpoint &&) = delete;
HttpEndpoint &operator=(HttpEndpoint &&) = delete;
HttpEndpoint(HttpEndpoint const &) = delete;
HttpEndpoint &operator=(HttpEndpoint const &) = delete;

HttpEndpoint(std::string endpoint, std::string_view contentType, enpointHeaders_t const &headers, std::size_t maxRetries, std::chrono::milliseconds retryDelay) : endpoint_(std::move(endpoint)), maxRetries_(maxRetries), retryDelay_(retryDelay) {
auto connectionDetails = utils::getConnectionDetailsFromURL(endpoint_);
if (!connectionDetails) {
std::string msg = "Unable to parse connection details from endpoint: "s;
msg.append(endpoint_);
throw std::runtime_error(msg);
}
connectionId_ = std::hash<std::string>{}(connectionDetails.value());

fillCurlHeaders(contentType, headers);
}

~HttpEndpoint() {
if (curlHeaders_) {
curl_slist_free_all(curlHeaders_);
curlHeaders_ = nullptr;
}
}

std::string const &getEndpoint() const {
return endpoint_;
}

struct curl_slist *getHeaders() {
return curlHeaders_;
}

connectionId_t getConnectionId() const {
return connectionId_;
}

std::size_t getMaxRetries() const {
return maxRetries_;
}

std::chrono::milliseconds getRetryDelay() const {
return retryDelay_;
}

private:
void fillCurlHeaders(std::string_view contentType, enpointHeaders_t const &headers) {
if (!contentType.empty()) {
std::string cType = "Content-Type: "s;
cType.append(contentType);
curlHeaders_ = curl_slist_append(curlHeaders_, cType.c_str());
}

for (auto const &hdr : headers) {
std::string header;
header.append(hdr.first);
header.append(": "sv);
header.append(hdr.second);
curlHeaders_ = curl_slist_append(curlHeaders_, header.c_str());
}
}

std::string endpoint_;
std::size_t maxRetries_ = 1;
std::chrono::milliseconds retryDelay_ = 0ms;
connectionId_t connectionId_;
struct curl_slist *curlHeaders_ = nullptr;
};

} // namespace elasticapm::php::transport
84 changes: 84 additions & 0 deletions prod/native/libcommon/code/transport/HttpEndpoints.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include "CommonUtils.h"
#include "CurlSender.h"
#include "HttpEndpoint.h"
#include "LoggerInterface.h"

#include <chrono>
#include <memory>
#include <string>
#include <string_view>
#include <unordered_map>

namespace elasticapm::php::transport {

using namespace std::literals;

class HttpEndpoints {
public:
using endpointUrlHash_t = std::size_t;

HttpEndpoints(std::shared_ptr<LoggerInterface> log) : log_(log) {
}

bool add(std::string endpointUrl, size_t endpointHash, bool verifyServerCertificate, std::string contentType, HttpEndpoint::enpointHeaders_t const &endpointHeaders, std::chrono::milliseconds timeout, std::size_t maxRetries, std::chrono::milliseconds retryDelay) {
std::lock_guard<std::mutex> lock(mutex_);
auto result = endpoints_.try_emplace(endpointHash, std::move(endpointUrl), std::move(contentType), endpointHeaders, maxRetries, retryDelay);
if (connections_.try_emplace(result.first->second.getConnectionId(), log_, timeout, verifyServerCertificate).second) { // CurlSender
ELOG_DEBUG(log_, "HttpEndpoints::add endpointUrl '%s' enpointHash: %X initialize new connectionId: %X", result.first->second.getEndpoint().c_str(), endpointHash, result.first->second.getConnectionId());
return true;
}
return false;
}

std::tuple<std::string, curl_slist *, HttpEndpoint::connectionId_t, CurlSender &, std::size_t, std::chrono::milliseconds> getConnection(size_t endpointHash) {
std::lock_guard<std::mutex> lock(mutex_);
auto const &endpoint = endpoints_.find(endpointHash);
if (endpoint == std::end(endpoints_)) {
std::stringstream stream;
stream << "HttpEnpoints missing enpointHash:" << std::hex << endpointHash;
throw std::runtime_error(stream.str());
}

auto const &connection = connections_.find(endpoint->second.getConnectionId());
if (connection == std::end(connections_)) {
std::stringstream stream;
stream << "HttpEndpoints enpointHash:" << std::hex << endpointHash << " missing connectionId " << std::hex << endpoint->second.getConnectionId();
throw std::runtime_error(stream.str());
}

auto &conn = connection->second;
auto maxRetries = std::max(static_cast<std::size_t>(1), static_cast<std::size_t>(endpoint->second.getMaxRetries()));
auto retryDelay = endpoint->second.getRetryDelay();

return {endpoint->second.getEndpoint(), endpoint->second.getHeaders(), endpoint->second.getConnectionId(), conn, maxRetries, retryDelay};
}

protected:
std::shared_ptr<LoggerInterface> log_;
std::mutex mutex_;
std::unordered_map<endpointUrlHash_t, HttpEndpoint> endpoints_;
std::unordered_map<HttpEndpoint::connectionId_t, CurlSender> connections_;
};

} // namespace elasticapm::php::transport
Loading

0 comments on commit 5b61306

Please sign in to comment.