diff --git a/google/cloud/opentelemetry/internal/recordable.cc b/google/cloud/opentelemetry/internal/recordable.cc index b86d0bce0264d..ca2623dc03c70 100644 --- a/google/cloud/opentelemetry/internal/recordable.cc +++ b/google/cloud/opentelemetry/internal/recordable.cc @@ -20,6 +20,7 @@ #include "google/cloud/internal/time_utils.h" #include "absl/time/time.h" #include +#include namespace google { namespace cloud { @@ -341,12 +342,23 @@ void Recordable::AddEventImpl( opentelemetry::nostd::string_view name, opentelemetry::common::SystemTimestamp timestamp, opentelemetry::common::KeyValueIterable const& attributes) { - // Accept the first N events. Drop the rest. + ++timed_event_count_; auto& events = *span_.mutable_time_events(); if (events.time_event().size() == kSpanAnnotationLimit) { events.set_dropped_annotations_count(1 + events.dropped_annotations_count()); - return; + auto const k = generator_(timed_event_count_); + auto& collection = *events.mutable_time_event(); + // Always preserve the first and last events. The rest are randomly sampled + // using https://en.wikipedia.org/wiki/Reservoir_sampling + if (k + 1 < collection.size()) { + // This is the normal reservoir sampling case. We swap the last element + // and the element to be removed. + collection.SwapElements(k + 1, collection.size() - 1); + } + // Just remove the last element, so we can insert the newest element and + // preserve the last element ever received. + collection.RemoveLast(); } auto& event = *events.add_time_event(); diff --git a/google/cloud/opentelemetry/internal/recordable.h b/google/cloud/opentelemetry/internal/recordable.h index 1429e35a9358a..578a9d6fc9729 100644 --- a/google/cloud/opentelemetry/internal/recordable.h +++ b/google/cloud/opentelemetry/internal/recordable.h @@ -22,6 +22,8 @@ #include #include #include +#include +#include namespace google { namespace cloud { @@ -98,7 +100,10 @@ void AddAttribute( */ class Recordable final : public opentelemetry::sdk::trace::Recordable { public: - explicit Recordable(Project project) : project_(std::move(project)) {} + using Generator = std::function; + + explicit Recordable(Project project, Generator generator) + : project_(std::move(project)), generator_(std::move(generator)) {} bool valid() const { return valid_; } @@ -165,6 +170,8 @@ class Recordable final : public opentelemetry::sdk::trace::Recordable { std::string scope_name_; std::string scope_version_; + Generator generator_; + int timed_event_count_ = 0; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/opentelemetry/internal/recordable_test.cc b/google/cloud/opentelemetry/internal/recordable_test.cc index a7d4216f592ec..28ad275423af4 100644 --- a/google/cloud/opentelemetry/internal/recordable_test.cc +++ b/google/cloud/opentelemetry/internal/recordable_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/opentelemetry/internal/recordable.h" +#include "google/cloud/internal/random.h" #include "google/cloud/internal/time_utils.h" #include "google/cloud/version.h" #include "absl/time/clock.h" @@ -181,6 +182,14 @@ Matcher Link( attributes_matcher)); } +auto TestGenerator() { + // Initialize using the googletest seed. + auto generator = internal::DefaultPRNG(::testing::FLAGS_gtest_random_seed); + return [g = std::move(generator)](int size) mutable { + return std::uniform_int_distribution(0, size - 1)(g); + }; +} + TEST(SetTruncatableString, LessThanLimit) { v2::TruncatableString proto; SetTruncatableString(proto, "value", 1000); @@ -365,7 +374,7 @@ TEST(Recordable, SetResourceCopiesResourceAttributes) { {"vector", MakeCompositeAttribute(5)}, }); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetResource(resource); auto proto = std::move(rec).as_proto(); EXPECT_THAT( @@ -414,7 +423,7 @@ TEST(Recordable, AddEvent) { auto expected_time = absl::FromChrono(now); auto event_attributes = KVIterable({{"key1", "value1"}, {"key2", "value2"}}); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.AddEvent("test-event", now, event_attributes); auto proto = std::move(rec).as_proto(); EXPECT_EQ(proto.time_events().dropped_annotations_count(), 0); @@ -432,7 +441,7 @@ TEST(Recordable, TruncatesEventName) { std::string const expected(kAnnotationDescriptionStringLimit, 'A'); auto now = std::chrono::system_clock::now(); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.AddEvent(name, now, KVIterable({})); auto proto = std::move(rec).as_proto(); EXPECT_THAT(proto.time_events().time_event(), @@ -442,13 +451,18 @@ TEST(Recordable, TruncatesEventName) { TEST(Recordable, DropsNewEventAtLimit) { auto now = std::chrono::system_clock::now(); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); for (std::size_t i = 0; i != kSpanAnnotationLimit + 10; ++i) { rec.AddEvent("event" + std::to_string(i), now, KVIterable({})); } auto proto = std::move(rec).as_proto(); EXPECT_EQ(proto.time_events().dropped_annotations_count(), 10); - EXPECT_THAT(proto.time_events().time_event(), SizeIs(kSpanAnnotationLimit)); + auto event_matcher = [](int i) { + return TimeEvent(_, Annotation("event" + std::to_string(i), _)); + }; + EXPECT_THAT(proto.time_events().time_event(), + AllOf(SizeIs(kSpanAnnotationLimit), Contains(event_matcher(0)), + Contains(event_matcher(kSpanAnnotationLimit + 9)))); } TEST(Recordable, DropsNewEventAttributeAtLimit) { @@ -461,7 +475,7 @@ TEST(Recordable, DropsNewEventAttributeAtLimit) { auto event_attributes = KVIterable(std::move(too_many_attributes), &iteration_count); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.AddEvent("test-event", now, event_attributes); auto proto = std::move(rec).as_proto(); EXPECT_THAT(proto.time_events().time_event(), @@ -486,7 +500,7 @@ TEST(Recordable, AddLink) { auto link_attributes = KVIterable({{"key1", "value1"}, {"key2", "value2"}}); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.AddLink(span_context, link_attributes); auto proto = std::move(rec).as_proto(); EXPECT_EQ(proto.links().dropped_links_count(), 0); @@ -499,7 +513,7 @@ TEST(Recordable, AddLink) { } TEST(Recordable, DropsNewLinkAtLimit) { - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); for (std::size_t i = 0; i != kSpanLinkLimit + 1; ++i) { rec.AddLink({false, false}, KVIterable({})); } @@ -517,7 +531,7 @@ TEST(Recordable, DropsNewLinkAttributeAtLimit) { auto link_attributes = KVIterable(std::move(too_many_attributes), &iteration_count); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.AddLink({false, false}, link_attributes); auto proto = std::move(rec).as_proto(); EXPECT_THAT(proto.links().link(), @@ -543,7 +557,7 @@ TEST(Recordable, SetStatus) { grpc::StatusCode::OK, ""}, {opentelemetry::trace::StatusCode::kError, "fail", grpc::StatusCode::UNKNOWN, "fail"}}) { - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetStatus(test.code, test.desc); auto proto = std::move(rec).as_proto(); EXPECT_EQ(proto.status().code(), test.expected_code); @@ -552,7 +566,7 @@ TEST(Recordable, SetStatus) { } TEST(Recordable, SetName) { - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetName("name"); auto proto = std::move(rec).as_proto(); EXPECT_EQ(proto.display_name().value(), "name"); @@ -562,7 +576,7 @@ TEST(Recordable, SetNameTruncates) { std::string const name(kDisplayNameStringLimit + 1, 'A'); std::string const expected(kDisplayNameStringLimit, 'A'); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetName(name); auto proto = std::move(rec).as_proto(); EXPECT_EQ(proto.display_name().value(), expected); @@ -586,7 +600,7 @@ TEST(Recordable, SetSpanKind) { google::devtools::cloudtrace::v2::Span::PRODUCER}, {opentelemetry::trace::SpanKind::kConsumer, google::devtools::cloudtrace::v2::Span::CONSUMER}}) { - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetSpanKind(test.input); auto proto = std::move(rec).as_proto(); EXPECT_EQ(proto.span_kind(), test.expected); @@ -608,7 +622,7 @@ TEST(Recordable, SetIdentity) { opentelemetry::trace::SpanContext span_context(trace_id, span_id, {}, false); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetIdentity(span_context, parent_span_id); auto proto = std::move(rec).as_proto(); @@ -633,7 +647,7 @@ TEST(Recordable, InvalidParentSpanIsOmitted) { opentelemetry::trace::SpanId const invalid_parent_span_id; EXPECT_FALSE(invalid_parent_span_id.IsValid()); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetIdentity(span_context, invalid_parent_span_id); auto proto = std::move(rec).as_proto(); @@ -641,7 +655,7 @@ TEST(Recordable, InvalidParentSpanIsOmitted) { } TEST(Recordable, SetAttribute) { - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetAttribute("key", "value"); auto proto = std::move(rec).as_proto(); EXPECT_THAT(proto.attributes(), @@ -649,7 +663,7 @@ TEST(Recordable, SetAttribute) { } TEST(Recordable, SetAttributeRespectsLimit) { - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); for (std::size_t i = 0; i != kSpanAttributeLimit + 1; ++i) { rec.SetAttribute("key" + std::to_string(i), "value"); } @@ -666,7 +680,7 @@ TEST(Recordable, SetResourceMapsMonitoredResources) { {sc::kCloudAvailabilityZone, "us-central1-a"}, }); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetResource(resource); auto proto = std::move(rec).as_proto(); EXPECT_THAT( @@ -682,7 +696,7 @@ TEST(Recordable, SetStartTime) { auto start = std::chrono::system_clock::now(); auto expected = absl::FromChrono(start); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetStartTime(start); auto proto = std::move(rec).as_proto(); auto actual = internal::ToAbslTime(proto.start_time()); @@ -694,7 +708,7 @@ TEST(Recordable, SetDuration) { auto duration = std::chrono::nanoseconds(12345); auto expected = absl::FromChrono(start) + absl::FromChrono(duration); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetStartTime(start); rec.SetDuration(duration); auto proto = std::move(rec).as_proto(); @@ -707,7 +721,7 @@ TEST(Recordable, SetInstrumentationScope) { opentelemetry::sdk::instrumentationscope::InstrumentationScope::Create( "test-name", "test-version"); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetInstrumentationScope(*scope); auto proto = std::move(rec).as_proto(); EXPECT_THAT(proto.attributes(), @@ -721,7 +735,7 @@ TEST(Recordable, SetInstrumentationScopeOmitsEmptyVersion) { opentelemetry::sdk::instrumentationscope::InstrumentationScope::Create( "test-name"); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetInstrumentationScope(*scope); auto proto = std::move(rec).as_proto(); EXPECT_THAT(proto.attributes(), @@ -739,7 +753,7 @@ TEST(Recordable, ConstantAttributesOnlyOnRootSpan) { auto resource = opentelemetry::sdk::resource::Resource::Create({{"key", "value"}}); - auto rec = Recordable(Project(kProjectId)); + auto rec = Recordable(Project(kProjectId), TestGenerator()); rec.SetInstrumentationScope(*scope); rec.SetIdentity(opentelemetry::trace::SpanContext::GetInvalid(), parent); rec.SetResource(resource); diff --git a/google/cloud/opentelemetry/trace_exporter.cc b/google/cloud/opentelemetry/trace_exporter.cc index 7cbac45d55934..e18e6c166b97f 100644 --- a/google/cloud/opentelemetry/trace_exporter.cc +++ b/google/cloud/opentelemetry/trace_exporter.cc @@ -15,7 +15,11 @@ #include "google/cloud/opentelemetry/trace_exporter.h" #include "google/cloud/trace/v2/trace_client.h" #include "google/cloud/internal/noexcept_action.h" +#include "google/cloud/internal/random.h" #include "google/cloud/log.h" +#include +#include +#include namespace google { namespace cloud { @@ -23,17 +27,36 @@ namespace otel { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { +class ThreadSafeGenerator { + public: + ThreadSafeGenerator() : generator_(internal::MakeDefaultPRNG()) {} + + auto generate(int size) { + std::lock_guard lk(mu_); + return std::uniform_int_distribution(0, size - 1)(generator_); + } + + private: + std::mutex mu_; + internal::DefaultPRNG generator_; +}; + class TraceExporter final : public opentelemetry::sdk::trace::SpanExporter { public: explicit TraceExporter(Project project, std::shared_ptr conn) - : project_(std::move(project)), client_(std::move(conn)) {} + : project_(std::move(project)), + client_(std::move(conn)), + generator_([state = std::make_shared()](int size) { + return state->generate(size); + }) {} std::unique_ptr MakeRecordable() noexcept override { auto recordable = internal::NoExceptAction< - std::unique_ptr>( - [&] { return std::make_unique(project_); }); + std::unique_ptr>([&] { + return std::make_unique(project_, generator_); + }); if (recordable) return *std::move(recordable); GCP_LOG(WARNING) << "Exception thrown while creating span."; return nullptr; @@ -76,6 +99,7 @@ class TraceExporter final : public opentelemetry::sdk::trace::SpanExporter { Project project_; trace_v2::TraceServiceClient client_; + otel_internal::Recordable::Generator generator_; }; } // namespace