Skip to content

Commit

Permalink
chore(observability): (new style sinks) Emit EventsDropped and `Err…
Browse files Browse the repository at this point in the history
…or` internal events in the service driver (#14836)

* 'bit of a poc'

* completed: loki,vector,splunk_hec,new_relic,s3_common(dd archives, aws_s3)

* gcs_common

* datadog_logs

* datadog_metrics

* datadog_traces

* datadog_events

* aws_sqs

* amqp

* azure_common (datadog archives, azure blob)

* aws_cloudwatch_logs

* aws_kinesis_stream

* aws_kinesis_firehose + compilation errors

* cleanup

* cleanup

* don't emit Error events in sink retry logic anymore.

* Fixed issue with the new Error events not being detected by the test
framework

* fixed issue with deprecated batch sinks not emitting internal events

* clippy

* fix issue where BatchHttpService call within Service call was not emitting internal events

* feedback sg

* merging event files

* event file org for aws_kinesis PR

* Revert "event file org for aws_kinesis PR"

This reverts commit f9dac54.

* Revert "merging event files"

This reverts commit 6f06e53.

* feedback sw - from_batch

* feedback sw (usize/u64) + nasty merge issue with ComponentEventsDropped

* feedback sw- pub(crate)

* feedback sw

* feedback tz- rate limit

* feedback tz - double add

* feedback tz- optimize vector sink request metadata calc

* optimization to vector sink was incomplete

* feedback bg - use match

* feedback bg- impl Add

* feedback bg- event collection struct + MetaDescriptive return by value

* feedback bg- save

* feedback bg- refactor RequestBuilder

Co-authored-by: Jesse Szwedko <[email protected]>
  • Loading branch information
neuronull and jszwedko authored Oct 31, 2022
1 parent 30706de commit 9348d36
Show file tree
Hide file tree
Showing 103 changed files with 1,489 additions and 762 deletions.
74 changes: 74 additions & 0 deletions lib/vector-common/src/internal_event/component_events_dropped.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
use super::{Count, InternalEvent, InternalEventHandle, RegisterInternalEvent};
use metrics::{register_counter, Counter};

pub const INTENTIONAL: bool = true;
pub const UNINTENTIONAL: bool = false;

#[derive(Debug)]
pub struct ComponentEventsDropped<'a, const INTENTIONAL: bool> {
pub count: usize,
pub reason: &'a str,
}

impl<'a, const INTENTIONAL: bool> InternalEvent for ComponentEventsDropped<'a, INTENTIONAL> {
fn emit(self) {
let count = self.count;
self.register().emit(Count(count));
}

fn name(&self) -> Option<&'static str> {
Some("ComponentEventsDropped")
}
}

impl<'a, const INTENTIONAL: bool> From<&'a str> for ComponentEventsDropped<'a, INTENTIONAL> {
fn from(reason: &'a str) -> Self {
Self { count: 0, reason }
}
}

impl<'a, const INTENTIONAL: bool> RegisterInternalEvent
for ComponentEventsDropped<'a, INTENTIONAL>
{
type Handle = DroppedHandle<'a, INTENTIONAL>;
fn register(self) -> Self::Handle {
Self::Handle {
discarded_events: register_counter!(
"component_discarded_events_total",
"intentional" => if INTENTIONAL { "true" } else { "false" },
),
reason: self.reason,
}
}
}

#[derive(Clone)]
pub struct DroppedHandle<'a, const INTENDED: bool> {
discarded_events: Counter,
reason: &'a str,
}

impl<'a, const INTENDED: bool> InternalEventHandle for DroppedHandle<'a, INTENDED> {
type Data = Count;
fn emit(&self, data: Self::Data) {
let message = "Events dropped";
if INTENDED {
debug!(
message,
intentional = INTENDED,
count = data.0,
reason = self.reason,
internal_log_rate_limit = true,
);
} else {
error!(
message,
intentional = INTENDED,
count = data.0,
reason = self.reason,
internal_log_rate_limit = true,
);
}
self.discarded_events.increment(data.0 as u64);
}
}
3 changes: 3 additions & 0 deletions lib/vector-common/src/internal_event/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod bytes_received;
mod bytes_sent;
pub mod component_events_dropped;
mod events_received;
mod events_sent;
mod prelude;
Expand All @@ -9,9 +10,11 @@ pub use metrics::SharedString;

pub use bytes_received::BytesReceived;
pub use bytes_sent::BytesSent;
pub use component_events_dropped::{ComponentEventsDropped, INTENTIONAL, UNINTENTIONAL};
pub use events_received::EventsReceived;
pub use events_sent::{EventsSent, DEFAULT_OUTPUT};
pub use prelude::{error_stage, error_type};
pub use service::{CallError, PollReadyError};

pub trait InternalEvent: Sized {
fn emit(self);
Expand Down
37 changes: 36 additions & 1 deletion lib/vector-common/src/internal_event/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use metrics::counter;

use super::{error_stage, error_type, InternalEvent};
use super::{emit, error_stage, error_type, ComponentEventsDropped, InternalEvent, UNINTENTIONAL};

#[derive(Debug)]
pub struct PollReadyError<E> {
Expand All @@ -27,3 +27,38 @@ impl<E: std::fmt::Debug> InternalEvent for PollReadyError<E> {
Some("ServicePollReadyError")
}
}

#[derive(Debug)]
pub struct CallError<E> {
pub error: E,
pub request_id: usize,
pub count: usize,
}

impl<E: std::fmt::Debug> InternalEvent for CallError<E> {
fn emit(self) {
let reason = "Service call failed. No retries or retries exhausted.";
error!(
message = reason,
error = ?self.error,
request_id = self.request_id,
error_type = error_type::REQUEST_FAILED,
stage = error_stage::SENDING,
internal_log_rate_secs = true,
);
counter!(
"component_errors_total", 1,
"error_type" => error_type::REQUEST_FAILED,
"stage" => error_stage::SENDING,
);

emit(ComponentEventsDropped::<UNINTENTIONAL> {
reason,
count: self.count,
});
}

fn name(&self) -> Option<&'static str> {
Some("ServiceCallError")
}
}
2 changes: 2 additions & 0 deletions lib/vector-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ pub use finalizer::EmptyStream;

pub mod internal_event;

pub mod request_metadata;

pub mod shutdown;

#[cfg(feature = "sensitive_string")]
Expand Down
96 changes: 96 additions & 0 deletions lib/vector-common/src/request_metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use std::ops::Add;

/// Metadata for batch requests.
#[derive(Clone, Debug, Default, Copy)]
pub struct RequestMetadata {
/// Number of events represented by this batch request.
event_count: usize,
/// Size, in bytes, of the in-memory representation of all events in this batch request.
events_byte_size: usize,
/// Size, in bytes, of the estimated JSON-encoded representation of all events in this batch request.
events_estimated_json_encoded_byte_size: usize,
/// Uncompressed size, in bytes, of the encoded events in this batch request.
request_encoded_size: usize,
/// On-the-wire size, in bytes, of the batch request itself after compression, etc.
///
/// This is akin to the bytes sent/received over the network, regardless of whether or not compression was used.
request_wire_size: usize,
}

// TODO: Make this struct the object which emits the actual internal telemetry i.e. events sent, bytes sent, etc.
impl RequestMetadata {
#[must_use]
pub fn new(
event_count: usize,
events_byte_size: usize,
request_encoded_size: usize,
request_wire_size: usize,
events_estimated_json_encoded_byte_size: usize,
) -> Self {
Self {
event_count,
events_byte_size,
events_estimated_json_encoded_byte_size,
request_encoded_size,
request_wire_size,
}
}

#[must_use]
pub const fn event_count(&self) -> usize {
self.event_count
}

#[must_use]
pub const fn events_byte_size(&self) -> usize {
self.events_byte_size
}

#[must_use]
pub const fn events_estimated_json_encoded_byte_size(&self) -> usize {
self.events_estimated_json_encoded_byte_size
}

#[must_use]
pub const fn request_encoded_size(&self) -> usize {
self.request_encoded_size
}

#[must_use]
pub const fn request_wire_size(&self) -> usize {
self.request_wire_size
}

/// Constructs a `RequestMetadata` by summation of the "batch" of `RequestMetadata` provided.
#[must_use]
pub fn from_batch<T: IntoIterator<Item = RequestMetadata>>(metadata_iter: T) -> Self {
let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, 0);

for metadata in metadata_iter {
metadata_sum = metadata_sum + &metadata;
}
metadata_sum
}
}

impl<'a> Add<&'a RequestMetadata> for RequestMetadata {
type Output = RequestMetadata;

/// Adds the other `RequestMetadata` to this one.
fn add(self, other: &'a Self::Output) -> Self::Output {
Self::Output {
event_count: self.event_count + other.event_count,
events_byte_size: self.events_byte_size + other.events_byte_size,
events_estimated_json_encoded_byte_size: self.events_estimated_json_encoded_byte_size
+ other.events_estimated_json_encoded_byte_size,
request_encoded_size: self.request_encoded_size + other.request_encoded_size,
request_wire_size: self.request_wire_size + other.request_wire_size,
}
}
}

/// Objects implementing this trait have metadata that describes the request.
pub trait MetaDescriptive {
/// Returns the `RequestMetadata` associated with this object.
fn get_metadata(&self) -> RequestMetadata;
}
51 changes: 40 additions & 11 deletions lib/vector-core/src/stream/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use futures::{poll, FutureExt, Stream, StreamExt, TryFutureExt};
use tokio::{pin, select};
use tower::Service;
use tracing::Instrument;
use vector_common::internal_event::{service, BytesSent, CountByteSize};
use vector_common::internal_event::{BytesSent, CallError, CountByteSize, PollReadyError};
use vector_common::request_metadata::{MetaDescriptive, RequestMetadata};

use super::FuturesUnorderedCount;
use crate::{
Expand Down Expand Up @@ -53,7 +54,7 @@ impl<St, Svc> Driver<St, Svc> {
impl<St, Svc> Driver<St, Svc>
where
St: Stream,
St::Item: Finalizable,
St::Item: Finalizable + MetaDescriptive,
Svc: Service<St::Item>,
Svc::Error: fmt::Debug + 'static,
Svc::Future: Send + 'static,
Expand Down Expand Up @@ -122,7 +123,7 @@ where
let svc = match maybe_ready {
Poll::Ready(Ok(())) => &mut service,
Poll::Ready(Err(error)) => {
emit(service::PollReadyError{ error });
emit(PollReadyError{ error });
return Err(())
}
Poll::Pending => {
Expand All @@ -142,9 +143,11 @@ where
);
let finalizers = req.take_finalizers();

let metadata = req.get_metadata();

let fut = svc.call(req)
.err_into()
.map(move |result| Self::handle_response(result, request_id, finalizers))
.map(move |result| Self::handle_response(result, request_id, finalizers, &metadata))
.instrument(info_span!("request", request_id).or_current());

in_flight.push(fut);
Expand All @@ -167,11 +170,11 @@ where
result: Result<Svc::Response, Svc::Error>,
request_id: usize,
finalizers: EventFinalizers,
metadata: &RequestMetadata,
) {
match result {
Err(error) => {
// `Error` and `EventsDropped` internal events are emitted in the sink retry logic.
error!(message = "Service call failed.", ?error, request_id);
Self::emit_call_error(Some(error), request_id, metadata.event_count());
finalizers.update_status(EventStatus::Rejected);
}
Ok(response) => {
Expand All @@ -190,11 +193,26 @@ where
byte_size: cbs.1,
output: None,
});

// This condition occurs specifically when the `HttpBatchService::call()` is called *within* the `Service::call()`
} else if response.event_status() == EventStatus::Rejected {
Self::emit_call_error(None, request_id, metadata.event_count());
finalizers.update_status(EventStatus::Rejected);
}
}
};
drop(finalizers); // suppress "argument not consumed" warning
}

/// Emit the `Error` and `EventsDropped` internal events.
/// This scenario occurs after retries have been attempted.
fn emit_call_error(error: Option<Svc::Error>, request_id: usize, count: usize) {
emit(CallError {
error,
request_id,
count,
});
}
}

#[cfg(test)]
Expand All @@ -216,17 +234,18 @@ mod tests {
};
use tokio_util::sync::PollSemaphore;
use tower::Service;
use vector_common::finalization::{
BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, Finalizable,
use vector_common::{
finalization::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, Finalizable},
request_metadata::RequestMetadata,
};
use vector_common::internal_event::CountByteSize;
use vector_common::{internal_event::CountByteSize, request_metadata::MetaDescriptive};

use super::{Driver, DriverResponse};

type Counter = Arc<AtomicUsize>;

#[derive(Debug)]
struct DelayRequest(usize, EventFinalizers);
struct DelayRequest(usize, EventFinalizers, RequestMetadata);

impl DelayRequest {
fn new(value: usize, counter: &Counter) -> Self {
Expand All @@ -236,7 +255,11 @@ mod tests {
receiver.await;
counter.fetch_add(value, Ordering::Relaxed);
});
Self(value, EventFinalizers::new(EventFinalizer::new(batch)))
Self(
value,
EventFinalizers::new(EventFinalizer::new(batch)),
RequestMetadata::default(),
)
}
}

Expand All @@ -246,6 +269,12 @@ mod tests {
}
}

impl MetaDescriptive for DelayRequest {
fn get_metadata(&self) -> RequestMetadata {
self.2
}
}

struct DelayResponse;

impl DriverResponse for DelayResponse {
Expand Down
9 changes: 4 additions & 5 deletions src/internal_events/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,11 @@ pub mod source {

#[cfg(feature = "sinks-amqp")]
pub mod sink {
use crate::{
emit,
internal_events::{ComponentEventsDropped, UNINTENTIONAL},
};
use crate::emit;
use metrics::counter;
use vector_common::internal_event::{error_stage, error_type};
use vector_common::internal_event::{
error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL,
};
use vector_core::internal_event::InternalEvent;

#[derive(Debug)]
Expand Down
Loading

0 comments on commit 9348d36

Please sign in to comment.