Skip to content

Commit

Permalink
impl: reservoir sample for span events (#14407)
Browse files Browse the repository at this point in the history
We can only upload 32 events per span to Cloud Trace. Some spans may
have a lot more. Use a reservoir sampler to randomly select which events
are sent.  The first and last event are always preserved because they
usually contain something more important than any middle events.
  • Loading branch information
coryan authored Jul 1, 2024
1 parent 2983243 commit 4ce1922
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 29 deletions.
16 changes: 14 additions & 2 deletions google/cloud/opentelemetry/internal/recordable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "google/cloud/internal/time_utils.h"
#include "absl/time/time.h"
#include <grpcpp/grpcpp.h>
#include <iterator>

namespace google {
namespace cloud {
Expand Down Expand Up @@ -341,12 +342,23 @@ void Recordable::AddEventImpl(
opentelemetry::nostd::string_view name,
opentelemetry::common::SystemTimestamp timestamp,
opentelemetry::common::KeyValueIterable const& attributes) {
// Accept the first N events. Drop the rest.
++timed_event_count_;
auto& events = *span_.mutable_time_events();
if (events.time_event().size() == kSpanAnnotationLimit) {
events.set_dropped_annotations_count(1 +
events.dropped_annotations_count());
return;
auto const k = generator_(timed_event_count_);
auto& collection = *events.mutable_time_event();
// Always preserve the first and last events. The rest are randomly sampled
// using https://en.wikipedia.org/wiki/Reservoir_sampling
if (k + 1 < collection.size()) {
// This is the normal reservoir sampling case. We swap the last element
// and the element to be removed.
collection.SwapElements(k + 1, collection.size() - 1);
}
// Just remove the last element, so we can insert the newest element and
// preserve the last element ever received.
collection.RemoveLast();
}

auto& event = *events.add_time_event();
Expand Down
9 changes: 8 additions & 1 deletion google/cloud/opentelemetry/internal/recordable.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <opentelemetry/common/attribute_value.h>
#include <opentelemetry/sdk/trace/recordable.h>
#include <opentelemetry/version.h>
#include <functional>
#include <utility>

namespace google {
namespace cloud {
Expand Down Expand Up @@ -98,7 +100,10 @@ void AddAttribute(
*/
class Recordable final : public opentelemetry::sdk::trace::Recordable {
public:
explicit Recordable(Project project) : project_(std::move(project)) {}
using Generator = std::function<int(int)>;

explicit Recordable(Project project, Generator generator)
: project_(std::move(project)), generator_(std::move(generator)) {}

bool valid() const { return valid_; }

Expand Down Expand Up @@ -165,6 +170,8 @@ class Recordable final : public opentelemetry::sdk::trace::Recordable {

std::string scope_name_;
std::string scope_version_;
Generator generator_;
int timed_event_count_ = 0;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
60 changes: 37 additions & 23 deletions google/cloud/opentelemetry/internal/recordable_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/opentelemetry/internal/recordable.h"
#include "google/cloud/internal/random.h"
#include "google/cloud/internal/time_utils.h"
#include "google/cloud/version.h"
#include "absl/time/clock.h"
Expand Down Expand Up @@ -181,6 +182,14 @@ Matcher<v2::Span::Link const&> Link(
attributes_matcher));
}

auto TestGenerator() {
// Initialize using the googletest seed.
auto generator = internal::DefaultPRNG(::testing::FLAGS_gtest_random_seed);
return [g = std::move(generator)](int size) mutable {
return std::uniform_int_distribution<int>(0, size - 1)(g);
};
}

TEST(SetTruncatableString, LessThanLimit) {
v2::TruncatableString proto;
SetTruncatableString(proto, "value", 1000);
Expand Down Expand Up @@ -365,7 +374,7 @@ TEST(Recordable, SetResourceCopiesResourceAttributes) {
{"vector<uint8>", MakeCompositeAttribute<std::uint8_t>(5)},
});

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetResource(resource);
auto proto = std::move(rec).as_proto();
EXPECT_THAT(
Expand Down Expand Up @@ -414,7 +423,7 @@ TEST(Recordable, AddEvent) {
auto expected_time = absl::FromChrono(now);
auto event_attributes = KVIterable({{"key1", "value1"}, {"key2", "value2"}});

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.AddEvent("test-event", now, event_attributes);
auto proto = std::move(rec).as_proto();
EXPECT_EQ(proto.time_events().dropped_annotations_count(), 0);
Expand All @@ -432,7 +441,7 @@ TEST(Recordable, TruncatesEventName) {
std::string const expected(kAnnotationDescriptionStringLimit, 'A');
auto now = std::chrono::system_clock::now();

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.AddEvent(name, now, KVIterable({}));
auto proto = std::move(rec).as_proto();
EXPECT_THAT(proto.time_events().time_event(),
Expand All @@ -442,13 +451,18 @@ TEST(Recordable, TruncatesEventName) {
TEST(Recordable, DropsNewEventAtLimit) {
auto now = std::chrono::system_clock::now();

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
for (std::size_t i = 0; i != kSpanAnnotationLimit + 10; ++i) {
rec.AddEvent("event" + std::to_string(i), now, KVIterable({}));
}
auto proto = std::move(rec).as_proto();
EXPECT_EQ(proto.time_events().dropped_annotations_count(), 10);
EXPECT_THAT(proto.time_events().time_event(), SizeIs(kSpanAnnotationLimit));
auto event_matcher = [](int i) {
return TimeEvent(_, Annotation("event" + std::to_string(i), _));
};
EXPECT_THAT(proto.time_events().time_event(),
AllOf(SizeIs(kSpanAnnotationLimit), Contains(event_matcher(0)),
Contains(event_matcher(kSpanAnnotationLimit + 9))));
}

TEST(Recordable, DropsNewEventAttributeAtLimit) {
Expand All @@ -461,7 +475,7 @@ TEST(Recordable, DropsNewEventAttributeAtLimit) {
auto event_attributes =
KVIterable(std::move(too_many_attributes), &iteration_count);

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.AddEvent("test-event", now, event_attributes);
auto proto = std::move(rec).as_proto();
EXPECT_THAT(proto.time_events().time_event(),
Expand All @@ -486,7 +500,7 @@ TEST(Recordable, AddLink) {

auto link_attributes = KVIterable({{"key1", "value1"}, {"key2", "value2"}});

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.AddLink(span_context, link_attributes);
auto proto = std::move(rec).as_proto();
EXPECT_EQ(proto.links().dropped_links_count(), 0);
Expand All @@ -499,7 +513,7 @@ TEST(Recordable, AddLink) {
}

TEST(Recordable, DropsNewLinkAtLimit) {
auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
for (std::size_t i = 0; i != kSpanLinkLimit + 1; ++i) {
rec.AddLink({false, false}, KVIterable({}));
}
Expand All @@ -517,7 +531,7 @@ TEST(Recordable, DropsNewLinkAttributeAtLimit) {
auto link_attributes =
KVIterable(std::move(too_many_attributes), &iteration_count);

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.AddLink({false, false}, link_attributes);
auto proto = std::move(rec).as_proto();
EXPECT_THAT(proto.links().link(),
Expand All @@ -543,7 +557,7 @@ TEST(Recordable, SetStatus) {
grpc::StatusCode::OK, ""},
{opentelemetry::trace::StatusCode::kError, "fail",
grpc::StatusCode::UNKNOWN, "fail"}}) {
auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetStatus(test.code, test.desc);
auto proto = std::move(rec).as_proto();
EXPECT_EQ(proto.status().code(), test.expected_code);
Expand All @@ -552,7 +566,7 @@ TEST(Recordable, SetStatus) {
}

TEST(Recordable, SetName) {
auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetName("name");
auto proto = std::move(rec).as_proto();
EXPECT_EQ(proto.display_name().value(), "name");
Expand All @@ -562,7 +576,7 @@ TEST(Recordable, SetNameTruncates) {
std::string const name(kDisplayNameStringLimit + 1, 'A');
std::string const expected(kDisplayNameStringLimit, 'A');

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetName(name);
auto proto = std::move(rec).as_proto();
EXPECT_EQ(proto.display_name().value(), expected);
Expand All @@ -586,7 +600,7 @@ TEST(Recordable, SetSpanKind) {
google::devtools::cloudtrace::v2::Span::PRODUCER},
{opentelemetry::trace::SpanKind::kConsumer,
google::devtools::cloudtrace::v2::Span::CONSUMER}}) {
auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetSpanKind(test.input);
auto proto = std::move(rec).as_proto();
EXPECT_EQ(proto.span_kind(), test.expected);
Expand All @@ -608,7 +622,7 @@ TEST(Recordable, SetIdentity) {

opentelemetry::trace::SpanContext span_context(trace_id, span_id, {}, false);

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetIdentity(span_context, parent_span_id);
auto proto = std::move(rec).as_proto();

Expand All @@ -633,23 +647,23 @@ TEST(Recordable, InvalidParentSpanIsOmitted) {
opentelemetry::trace::SpanId const invalid_parent_span_id;
EXPECT_FALSE(invalid_parent_span_id.IsValid());

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetIdentity(span_context, invalid_parent_span_id);
auto proto = std::move(rec).as_proto();

EXPECT_THAT(proto.parent_span_id(), IsEmpty());
}

TEST(Recordable, SetAttribute) {
auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetAttribute("key", "value");
auto proto = std::move(rec).as_proto();
EXPECT_THAT(proto.attributes(),
Attributes(ElementsAre(Pair("key", AttributeValue("value")))));
}

TEST(Recordable, SetAttributeRespectsLimit) {
auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
for (std::size_t i = 0; i != kSpanAttributeLimit + 1; ++i) {
rec.SetAttribute("key" + std::to_string(i), "value");
}
Expand All @@ -666,7 +680,7 @@ TEST(Recordable, SetResourceMapsMonitoredResources) {
{sc::kCloudAvailabilityZone, "us-central1-a"},
});

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetResource(resource);
auto proto = std::move(rec).as_proto();
EXPECT_THAT(
Expand All @@ -682,7 +696,7 @@ TEST(Recordable, SetStartTime) {
auto start = std::chrono::system_clock::now();
auto expected = absl::FromChrono(start);

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetStartTime(start);
auto proto = std::move(rec).as_proto();
auto actual = internal::ToAbslTime(proto.start_time());
Expand All @@ -694,7 +708,7 @@ TEST(Recordable, SetDuration) {
auto duration = std::chrono::nanoseconds(12345);
auto expected = absl::FromChrono(start) + absl::FromChrono(duration);

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetStartTime(start);
rec.SetDuration(duration);
auto proto = std::move(rec).as_proto();
Expand All @@ -707,7 +721,7 @@ TEST(Recordable, SetInstrumentationScope) {
opentelemetry::sdk::instrumentationscope::InstrumentationScope::Create(
"test-name", "test-version");

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetInstrumentationScope(*scope);
auto proto = std::move(rec).as_proto();
EXPECT_THAT(proto.attributes(),
Expand All @@ -721,7 +735,7 @@ TEST(Recordable, SetInstrumentationScopeOmitsEmptyVersion) {
opentelemetry::sdk::instrumentationscope::InstrumentationScope::Create(
"test-name");

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetInstrumentationScope(*scope);
auto proto = std::move(rec).as_proto();
EXPECT_THAT(proto.attributes(),
Expand All @@ -739,7 +753,7 @@ TEST(Recordable, ConstantAttributesOnlyOnRootSpan) {
auto resource =
opentelemetry::sdk::resource::Resource::Create({{"key", "value"}});

auto rec = Recordable(Project(kProjectId));
auto rec = Recordable(Project(kProjectId), TestGenerator());
rec.SetInstrumentationScope(*scope);
rec.SetIdentity(opentelemetry::trace::SpanContext::GetInvalid(), parent);
rec.SetResource(resource);
Expand Down
30 changes: 27 additions & 3 deletions google/cloud/opentelemetry/trace_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,48 @@
#include "google/cloud/opentelemetry/trace_exporter.h"
#include "google/cloud/trace/v2/trace_client.h"
#include "google/cloud/internal/noexcept_action.h"
#include "google/cloud/internal/random.h"
#include "google/cloud/log.h"
#include <memory>
#include <mutex>
#include <random>

namespace google {
namespace cloud {
namespace otel {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {

class ThreadSafeGenerator {
public:
ThreadSafeGenerator() : generator_(internal::MakeDefaultPRNG()) {}

auto generate(int size) {
std::lock_guard<std::mutex> lk(mu_);
return std::uniform_int_distribution<int>(0, size - 1)(generator_);
}

private:
std::mutex mu_;
internal::DefaultPRNG generator_;
};

class TraceExporter final : public opentelemetry::sdk::trace::SpanExporter {
public:
explicit TraceExporter(Project project,
std::shared_ptr<trace_v2::TraceServiceConnection> conn)
: project_(std::move(project)), client_(std::move(conn)) {}
: project_(std::move(project)),
client_(std::move(conn)),
generator_([state = std::make_shared<ThreadSafeGenerator>()](int size) {
return state->generate(size);
}) {}

std::unique_ptr<opentelemetry::sdk::trace::Recordable>
MakeRecordable() noexcept override {
auto recordable = internal::NoExceptAction<
std::unique_ptr<opentelemetry::sdk::trace::Recordable>>(
[&] { return std::make_unique<otel_internal::Recordable>(project_); });
std::unique_ptr<opentelemetry::sdk::trace::Recordable>>([&] {
return std::make_unique<otel_internal::Recordable>(project_, generator_);
});
if (recordable) return *std::move(recordable);
GCP_LOG(WARNING) << "Exception thrown while creating span.";
return nullptr;
Expand Down Expand Up @@ -76,6 +99,7 @@ class TraceExporter final : public opentelemetry::sdk::trace::SpanExporter {

Project project_;
trace_v2::TraceServiceClient client_;
otel_internal::Recordable::Generator generator_;
};

} // namespace
Expand Down

0 comments on commit 4ce1922

Please sign in to comment.