Skip to content

Commit

Permalink
Create a topK table and do sampling based on the number of requests. (#…
Browse files Browse the repository at this point in the history
…14)

* Create a topK table and do sampling based on the number of requests.

Signed-off-by: thomas.ebner <[email protected]>
  • Loading branch information
samohte authored Feb 7, 2024
1 parent 56678a4 commit 7eec5d2
Show file tree
Hide file tree
Showing 23 changed files with 1,415 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// [#extension: envoy.tracers.opentelemetry.samplers.dynatrace]

message DynatraceSamplerConfig {
string tenant_id = 1;
string tenant = 1;

string cluster_id = 2;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ SamplingResult AlwaysOnSampler::shouldSample(const absl::optional<SpanContext> p
OptRef<const Tracing::TraceContext> /*trace_context*/,
const std::vector<SpanContext>& /*links*/) {
SamplingResult result;
result.decision = Decision::RECORD_AND_SAMPLE;
result.decision = Decision::RecordAndSample;
if (parent_context.has_value()) {
result.tracestate = parent_context.value().tracestate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ envoy_cc_library(
name = "dynatrace_sampler_lib",
srcs = [
"dynatrace_sampler.cc",
"sampler_config.cc",
"sampler_config_fetcher.cc",
"sampling_controller.cc",
],
hdrs = [
"dynatrace_sampler.h",
"sampler_config.h",
"sampler_config_fetcher.h",
"sampling_controller.h",
"stream_summary.h",
"tenant_id.h",
],
deps = [
"//source/common/config:datasource_lib",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "config.h"
#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/config.h"

#include <memory>

#include "envoy/extensions/tracers/opentelemetry/samplers/v3/dynatrace_sampler.pb.validate.h"

Expand All @@ -16,11 +18,14 @@ DynatraceSamplerFactory::createSampler(const Protobuf::Message& config,
Server::Configuration::TracerFactoryContext& context) {
auto mptr = Envoy::Config::Utility::translateAnyToFactoryConfig(
dynamic_cast<const ProtobufWkt::Any&>(config), context.messageValidationVisitor(), *this);
return std::make_shared<DynatraceSampler>(
MessageUtil::downcastAndValidate<
const envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig&>(
*mptr, context.messageValidationVisitor()),
context);

const auto& proto_config = MessageUtil::downcastAndValidate<
const envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig&>(
*mptr, context.messageValidationVisitor());

SamplerConfigFetcherPtr cf = std::make_unique<SamplerConfigFetcherImpl>(
context, proto_config.http_uri(), proto_config.token());
return std::make_shared<DynatraceSampler>(proto_config, context, std::move(cf));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace OpenTelemetry {
class DynatraceSamplerFactory : public SamplerFactory {
public:
/**
* @brief Create a Sampler which samples every span
* @brief Create a Dynatrace sampler
*
* @param context
* @return SamplerSharedPtr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,67 +4,139 @@
#include <sstream>
#include <string>

#include "source/common/common/hash.h"
#include "source/common/config/datasource.h"
#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/tenant_id.h"
#include "source/extensions/tracers/opentelemetry/samplers/sampler.h"
#include "source/extensions/tracers/opentelemetry/span_context.h"
#include "source/extensions/tracers/opentelemetry/trace_state.h"

#include "absl/strings/str_cat.h"

namespace Envoy {
namespace Extensions {
namespace Tracers {
namespace OpenTelemetry {

static const char* SAMPLING_EXTRAPOLATION_SPAN_ATTRIBUTE_NAME =
"sampling_extrapolation_set_in_sampler";
namespace {

constexpr std::chrono::minutes SAMPLING_UPDATE_TIMER_DURATION{1};
const char* SAMPLING_EXTRAPOLATION_SPAN_ATTRIBUTE_NAME = "sampling_extrapolation_set_in_sampler";

class DynatraceTag {
public:
static DynatraceTag createInvalid() { return {false, false, 0, 0}; }

static DynatraceTag create(bool ignored, uint32_t sampling_exponent, uint32_t path_info) {
return {true, ignored, sampling_exponent, path_info};
}

static DynatraceTag create(const std::string& value) {
std::vector<absl::string_view> tracestate_components =
absl::StrSplit(value, ';', absl::AllowEmpty());
if (tracestate_components.size() < 8) {
return createInvalid();
}

if (tracestate_components[0] != "fw4") {
return createInvalid();
}
bool ignored = tracestate_components[5] == "1";
uint32_t sampling_exponent;
uint32_t path_info;
if (absl::SimpleAtoi(tracestate_components[6], &sampling_exponent) &&
absl::SimpleHexAtoi(tracestate_components[7], &path_info)) {
return {true, ignored, sampling_exponent, path_info};
}
return createInvalid();
}

std::string asString() const {
std::string ret = absl::StrCat("fw4;0;0;0;0;", ignored_ ? "1" : "0", ";", sampling_exponent_,
";", absl::Hex(path_info_));
return ret;
}

bool isValid() const { return valid_; };
bool isIgnored() const { return ignored_; };
int getSamplingExponent() const { return sampling_exponent_; };
uint32_t getPathInfo() const { return path_info_; };

private:
DynatraceTag(bool valid, bool ignored, uint32_t sampling_exponent, uint32_t path_info)
: valid_(valid), ignored_(ignored), sampling_exponent_(sampling_exponent),
path_info_(path_info) {}

bool valid_;
bool ignored_;
uint32_t sampling_exponent_;
uint32_t path_info_;
};

} // namespace

DynatraceSampler::DynatraceSampler(
const envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig& config,
Server::Configuration::TracerFactoryContext& context)
: tenant_id_(config.tenant_id()), cluster_id_(config.cluster_id()),
dt_tracestate_key_(absl::StrCat(absl::string_view(config.tenant_id()), "-",
Server::Configuration::TracerFactoryContext& context,
SamplerConfigFetcherPtr sampler_config_fetcher)
: dt_tracestate_key_(absl::StrCat(calculateTenantId(config.tenant()), "-",
absl::string_view(config.cluster_id()), "@dt")),
sampler_config_fetcher_(context, config.http_uri(), config.token()), counter_(0) {}
sampling_controller_(std::move(sampler_config_fetcher)) {

timer_ = context.serverFactoryContext().mainThreadDispatcher().createTimer([this]() -> void {
sampling_controller_.update();
timer_->enableTimer(SAMPLING_UPDATE_TIMER_DURATION);
});
timer_->enableTimer(SAMPLING_UPDATE_TIMER_DURATION);
}

SamplingResult DynatraceSampler::shouldSample(const absl::optional<SpanContext> parent_context,
const std::string& /*trace_id*/,
const std::string& trace_id,
const std::string& /*name*/, OTelSpanKind /*kind*/,
OptRef<const Tracing::TraceContext> /*trace_context*/,
OptRef<const Tracing::TraceContext> trace_context,
const std::vector<SpanContext>& /*links*/) {

SamplingResult result;
std::map<std::string, std::string> att;

// trace_context->path() returns path and query. query part is removed in getSamplingKey()
const std::string sampling_key =
trace_context.has_value()
? sampling_controller_.getSamplingKey(trace_context->path(), trace_context->method())
: "";

// add it to stream summary containing the number of requests
sampling_controller_.offer(sampling_key);

auto trace_state =
TraceState::fromHeader(parent_context.has_value() ? parent_context->tracestate() : "");

std::string trace_state_value;

if (trace_state->get(dt_tracestate_key_, trace_state_value)) {
// we found a DT trace decision in tracestate header
if (FW4Tag fw4_tag = FW4Tag::create(trace_state_value); fw4_tag.isValid()) {
result.decision = fw4_tag.isIgnored() ? Decision::Drop : Decision::RecordAndSample;
if (DynatraceTag dynatrace_tag = DynatraceTag::create(trace_state_value);
dynatrace_tag.isValid()) {
result.decision = dynatrace_tag.isIgnored() ? Decision::Drop : Decision::RecordAndSample;
// TODO: change attribute name and value in scope of OA-26680
att[SAMPLING_EXTRAPOLATION_SPAN_ATTRIBUTE_NAME] =
std::to_string(fw4_tag.getSamplingExponent());
std::to_string(dynatrace_tag.getSamplingExponent());
result.tracestate = parent_context->tracestate();
}
} else { // make a sampling decision
// this is just a demo, we sample every second request here
uint32_t current_counter = ++counter_;
bool sample;
int sampling_exponent;
if (current_counter % 2) {
sample = true;
sampling_exponent = 1;
} else {
sample = false;
sampling_exponent = 0;
}

} else {
// do a decision based on the calculated exponent
// at the moment we use a hash of the trace_id as random number
const auto hash = MurmurHash::murmurHash2(trace_id);
const auto sampling_state = sampling_controller_.getSamplingState(sampling_key);
const bool sample = sampling_state.shouldSample(hash);
const auto sampling_exponent = sampling_state.getExponent();
// TODO: change attribute name and value in scope of OA-26680
att[SAMPLING_EXTRAPOLATION_SPAN_ATTRIBUTE_NAME] = std::to_string(sampling_exponent);

result.decision = sample ? Decision::RecordAndSample : Decision::Drop;
// create new forward tag and add it to tracestate
FW4Tag new_tag = FW4Tag::create(!sample, sampling_exponent);
DynatraceTag new_tag =
DynatraceTag::create(!sample, sampling_exponent, static_cast<uint8_t>(hash));
trace_state = trace_state->set(dt_tracestate_key_, new_tag.asString());
result.tracestate = trace_state->toHeader();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,66 +1,33 @@
#pragma once

#include <memory>

#include "envoy/extensions/tracers/opentelemetry/samplers/v3/dynatrace_sampler.pb.h"
#include "envoy/server/factory_context.h"

#include "source/common/common/logger.h"
#include "source/common/config/datasource.h"
#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.h"
#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampling_controller.h"
#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/stream_summary.h"
#include "source/extensions/tracers/opentelemetry/samplers/sampler.h"

#include "absl/synchronization/mutex.h"

namespace Envoy {
namespace Extensions {
namespace Tracers {
namespace OpenTelemetry {

class FW4Tag {
public:
static FW4Tag createInvalid() { return {false, false, 0}; }

static FW4Tag create(bool ignored, int sampling_exponent) {
return {true, ignored, sampling_exponent};
}

static FW4Tag create(const std::string& value) {
std::vector<absl::string_view> tracestate_components =
absl::StrSplit(value, ';', absl::AllowEmpty());
if (tracestate_components.size() < 7) {
return createInvalid();
}

if (tracestate_components[0] != "fw4") {
return createInvalid();
}
bool ignored = tracestate_components[5] == "1";
int sampling_exponent = std::stoi(std::string(tracestate_components[6]));
return {true, ignored, sampling_exponent};
}

std::string asString() const {
return absl::StrCat("fw4;0;0;0;0;", ignored_ ? "1" : "0", ";", sampling_exponent_, ";0");
}

bool isValid() const { return valid_; };
bool isIgnored() const { return ignored_; };
int getSamplingExponent() const { return sampling_exponent_; };

private:
FW4Tag(bool valid, bool ignored, int sampling_exponent)
: valid_(valid), ignored_(ignored), sampling_exponent_(sampling_exponent) {}

bool valid_;
bool ignored_;
int sampling_exponent_;
};

/**
* @brief A Dynatrace specific sampler *
* @brief A Dynatrace specific sampler
*/
class DynatraceSampler : public Sampler, Logger::Loggable<Logger::Id::tracing> {
public:
DynatraceSampler(
const envoy::extensions::tracers::opentelemetry::samplers::v3::DynatraceSamplerConfig& config,
Server::Configuration::TracerFactoryContext& context);
Server::Configuration::TracerFactoryContext& context,
SamplerConfigFetcherPtr sampler_config_fetcher);

SamplingResult shouldSample(const absl::optional<SpanContext> parent_context,
const std::string& trace_id, const std::string& name,
Expand All @@ -71,11 +38,9 @@ class DynatraceSampler : public Sampler, Logger::Loggable<Logger::Id::tracing> {
std::string getDescription() const override;

private:
std::string tenant_id_;
std::string cluster_id_;
std::string dt_tracestate_key_;
SamplerConfigFetcher sampler_config_fetcher_;
std::atomic<uint32_t> counter_; // request counter for dummy sampling
Event::TimerPtr timer_;
SamplingController sampling_controller_;
};

} // namespace OpenTelemetry
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#include <utility>

#include "source/common/json/json_loader.h"
#include "source/extensions/tracers/opentelemetry/samplers/dynatrace/sampler_config_fetcher.h"

namespace Envoy {
namespace Extensions {
namespace Tracers {
namespace OpenTelemetry {

void SamplerConfig::parse(const std::string& json) {
const auto result = Envoy::Json::Factory::loadFromStringNoThrow(json);
if (result.ok()) {
const auto& obj = result.value();
if (obj->hasObject("rootSpansPerMinute")) {
const auto value = obj->getInteger("rootSpansPerMinute", ROOT_SPANS_PER_MINUTE_DEFAULT);
root_spans_per_minute_.store(value);
return;
}
}
// didn't get a value, reset to default
root_spans_per_minute_.store(ROOT_SPANS_PER_MINUTE_DEFAULT);
}

} // namespace OpenTelemetry
} // namespace Tracers
} // namespace Extensions
} // namespace Envoy
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
#include <atomic>
#include <cstdint>
#include <string>
#include <utility>

#include "source/common/json/json_loader.h"

namespace Envoy {
namespace Extensions {
Expand All @@ -14,25 +11,14 @@ namespace OpenTelemetry {

class SamplerConfig {
public:
static constexpr uint64_t ROOT_SPANS_PER_MINUTE_DEFAULT = 1000;
static constexpr uint32_t ROOT_SPANS_PER_MINUTE_DEFAULT = 1000;

void parse(const std::string& json) {
root_spans_per_minute_.store(ROOT_SPANS_PER_MINUTE_DEFAULT); // reset to default
auto result = Envoy::Json::Factory::loadFromStringNoThrow(json);
if (result.ok()) {
auto obj = result.value();
if (obj->hasObject("rootSpansPerMinute")) {
auto value = obj->getInteger("rootSpansPerMinute", ROOT_SPANS_PER_MINUTE_DEFAULT);
root_spans_per_minute_.store(value);
}
(void)obj;
}
}
void parse(const std::string& json);

uint64_t getRootSpansPerMinute() const { return root_spans_per_minute_.load(); }
uint32_t getRootSpansPerMinute() const { return root_spans_per_minute_.load(); }

private:
std::atomic<uint64_t> root_spans_per_minute_ = ROOT_SPANS_PER_MINUTE_DEFAULT;
std::atomic<uint32_t> root_spans_per_minute_{ROOT_SPANS_PER_MINUTE_DEFAULT};
};

} // namespace OpenTelemetry
Expand Down
Loading

0 comments on commit 7eec5d2

Please sign in to comment.