diff --git a/opentelemetry-otlp/src/logs.rs b/opentelemetry-otlp/src/logs.rs index 3f21697fb0..6c936403c1 100644 --- a/opentelemetry-otlp/src/logs.rs +++ b/opentelemetry-otlp/src/logs.rs @@ -100,7 +100,7 @@ impl LogExporter { impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { async fn export<'a>( &mut self, - batch: Vec>, + batch: Vec>>, ) -> opentelemetry::logs::LogResult<()> { self.client.export(batch).await } diff --git a/opentelemetry-proto/src/transform/common.rs b/opentelemetry-proto/src/transform/common.rs index ff42479288..6e3bdfd8b5 100644 --- a/opentelemetry-proto/src/transform/common.rs +++ b/opentelemetry-proto/src/transform/common.rs @@ -108,6 +108,40 @@ pub mod tonic { } } + impl + From<( + Cow<'_, opentelemetry_sdk::InstrumentationLibrary>, + Option>, + )> for InstrumentationScope + { + fn from( + data: ( + Cow<'_, opentelemetry_sdk::InstrumentationLibrary>, + Option>, + ), + ) -> Self { + let (library, target) = data; + if let Some(t) = target { + InstrumentationScope { + name: t.to_string(), + version: String::new(), + attributes: vec![], + ..Default::default() + } + } else { + InstrumentationScope { + name: library.name.clone().into_owned(), + version: library + .version + .as_ref() + .map(ToString::to_string) + .unwrap_or_default(), + attributes: Attributes::from(library.attributes.clone()).0, + ..Default::default() + } + } + } + } /// Wrapper type for Vec<`KeyValue`> #[derive(Default, Debug)] pub struct Attributes(pub ::std::vec::Vec); diff --git a/opentelemetry-proto/src/transform/logs.rs b/opentelemetry-proto/src/transform/logs.rs index dfd845c5d8..d959194364 100644 --- a/opentelemetry-proto/src/transform/logs.rs +++ b/opentelemetry-proto/src/transform/logs.rs @@ -139,7 +139,7 @@ pub mod tonic { impl From<( - opentelemetry_sdk::export::logs::LogData, + opentelemetry_sdk::export::logs::LogData<'_>, &ResourceAttributesWithSchema, )> for ResourceLogs { @@ -164,15 +164,21 @@ pub mod tonic { .clone() .map(Into::into) .unwrap_or_default(), - scope: Some((log_data.instrumentation, log_data.record.target.clone()).into()), - log_records: vec![log_data.record.into()], + scope: Some( + ( + log_data.instrumentation.into_owned(), + log_data.record.target.clone(), + ) + .into(), + ), + log_records: vec![log_data.record.into_owned().into()], }], } } } pub fn group_logs_by_resource_and_scope( - logs: Vec, + logs: Vec>, resource: &ResourceAttributesWithSchema, ) -> Vec { // Group logs by target or instrumentation name @@ -180,14 +186,13 @@ pub mod tonic { HashMap::new(), |mut scope_map: HashMap< Cow<'static, str>, - Vec<&opentelemetry_sdk::export::logs::LogData>, + Vec<&opentelemetry_sdk::export::logs::LogData<'_>>, >, log| { - let key = log - .record - .target - .clone() - .unwrap_or_else(|| log.instrumentation.name.clone()); + let key = + log.record.target.clone().unwrap_or_else(|| { + Cow::Owned(log.instrumentation.name.clone().into_owned()) + }); scope_map.entry(key).or_default().push(log); scope_map }, @@ -197,13 +202,20 @@ pub mod tonic { .into_iter() .map(|(key, log_data)| ScopeLogs { scope: Some(InstrumentationScope::from(( - &log_data.first().unwrap().instrumentation, - Some(key), + Cow::Owned( + log_data + .first() + .unwrap() + .instrumentation + .clone() + .into_owned(), + ), + Some(key.into_owned().into()), ))), schema_url: resource.schema_url.clone().unwrap_or_default(), log_records: log_data .into_iter() - .map(|log_data| log_data.record.clone().into()) + .map(|log_data| log_data.record.clone().into_owned().into()) .collect(), }) .collect(); @@ -225,18 +237,21 @@ mod tests { use opentelemetry::logs::LogRecord as _; use opentelemetry_sdk::export::logs::LogData; use opentelemetry_sdk::{logs::LogRecord, Resource}; + use std::borrow::Cow; use std::time::SystemTime; - fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData { + fn create_test_log_data<'a>(instrumentation_name: &str, _message: &str) -> LogData<'a> { let mut logrecord = LogRecord::default(); logrecord.set_timestamp(SystemTime::now()); logrecord.set_observed_timestamp(SystemTime::now()); LogData { - instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder( - instrumentation_name.to_string(), - ) - .build(), - record: logrecord, + instrumentation: Cow::Owned( + opentelemetry_sdk::InstrumentationLibrary::builder( + instrumentation_name.to_string(), + ) + .build(), + ), + record: Cow::Owned(logrecord), } } diff --git a/opentelemetry-sdk/benches/log.rs b/opentelemetry-sdk/benches/log.rs index ba0a94cd17..07cca2c53f 100644 --- a/opentelemetry-sdk/benches/log.rs +++ b/opentelemetry-sdk/benches/log.rs @@ -27,8 +27,7 @@ use opentelemetry::trace::Tracer; use opentelemetry::trace::TracerProvider as _; use opentelemetry::Key; use opentelemetry_sdk::export::logs::LogData; -use opentelemetry_sdk::logs::LogProcessor; -use opentelemetry_sdk::logs::{Logger, LoggerProvider}; +use opentelemetry_sdk::logs::{LogProcessor, Logger, LoggerProvider}; use opentelemetry_sdk::trace; use opentelemetry_sdk::trace::{Sampler, TracerProvider}; diff --git a/opentelemetry-sdk/src/export/logs/mod.rs b/opentelemetry-sdk/src/export/logs/mod.rs index 9588339462..5d725df910 100644 --- a/opentelemetry-sdk/src/export/logs/mod.rs +++ b/opentelemetry-sdk/src/export/logs/mod.rs @@ -15,7 +15,7 @@ use std::fmt::Debug; #[async_trait] pub trait LogExporter: Send + Sync + Debug { /// Exports a batch of [`LogData`]. - async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()>; + async fn export<'a>(&mut self, batch: Vec>>) -> LogResult<()>; /// Shuts down the exporter. fn shutdown(&mut self) {} #[cfg(feature = "logs_level_enabled")] @@ -29,11 +29,11 @@ pub trait LogExporter: Send + Sync + Debug { /// `LogData` represents a single log event without resource context. #[derive(Clone, Debug)] -pub struct LogData { - /// Log record - pub record: LogRecord, +pub struct LogData<'a> { + /// Log record, which can be borrowed or owned. + pub record: Cow<'a, LogRecord>, /// Instrumentation details for the emitter who produced this `LogEvent`. - pub instrumentation: InstrumentationLibrary, + pub instrumentation: Cow<'a, InstrumentationLibrary>, } /// Describes the result of an export. diff --git a/opentelemetry-sdk/src/growable_array.rs b/opentelemetry-sdk/src/growable_array.rs new file mode 100644 index 0000000000..f174bedea2 --- /dev/null +++ b/opentelemetry-sdk/src/growable_array.rs @@ -0,0 +1,374 @@ +/// The default max capacity for the stack portion of `GrowableArray`. +const DEFAULT_MAX_INLINE_CAPACITY: usize = 10; +/// The default initial capacity for the vector portion of `GrowableArray`. +const DEFAULT_INITIAL_OVERFLOW_CAPACITY: usize = 5; + +#[derive(Debug, Clone, PartialEq)] +/// A hybrid vector that starts with a fixed-size array and grows dynamically with a vector. +/// +/// `GrowableArray` uses an internal fixed-size array (`inline`) for storing elements until it reaches +/// `MAX_INLINE_CAPACITY`. When this capacity is exceeded, additional elements are stored in a heap-allocated +/// vector (`overflow`). This structure allows for efficient use of stack memory for small numbers of elements, +/// while still supporting dynamic growth. +/// +pub(crate) struct GrowableArray< + T: Default + Clone + PartialEq, + const MAX_INLINE_CAPACITY: usize = DEFAULT_MAX_INLINE_CAPACITY, + const INITIAL_OVERFLOW_CAPACITY: usize = DEFAULT_INITIAL_OVERFLOW_CAPACITY, +> { + inline: [T; MAX_INLINE_CAPACITY], + overflow: Option>, + count: usize, +} + +impl< + T: Default + Clone + PartialEq, + const MAX_INLINE_CAPACITY: usize, + const INITIAL_OVERFLOW_CAPACITY: usize, + > Default for GrowableArray +{ + fn default() -> Self { + Self { + inline: [(); MAX_INLINE_CAPACITY].map(|_| T::default()), + overflow: None, + count: 0, + } + } +} + +impl< + T: Default + Clone + PartialEq, + const MAX_INLINE_CAPACITY: usize, + const INITIAL_OVERFLOW_CAPACITY: usize, + > GrowableArray +{ + /// Creates a new `GrowableArray` with the default initial capacity. + #[allow(dead_code)] + pub(crate) fn new() -> Self { + Self::default() + } + + /// Pushes a value into the `GrowableArray`. + /// + /// If the internal array (`inline`) has reached its capacity (`MAX_INLINE_CAPACITY`), the value is pushed + /// into the heap-allocated vector (`overflow`). Otherwise, it is stored in the array. + #[allow(dead_code)] + #[inline] + pub(crate) fn push(&mut self, value: T) { + if self.count < MAX_INLINE_CAPACITY { + self.inline[self.count] = value; + self.count += 1; + } else { + self.overflow + .get_or_insert_with(|| Vec::with_capacity(INITIAL_OVERFLOW_CAPACITY)) + .push(value); + } + } + + /// Gets a reference to the value at the specified index. + /// + /// Returns `None` if the index is out of bounds. + #[allow(dead_code)] + #[inline] + pub(crate) fn get(&self, index: usize) -> Option<&T> { + if index < self.count { + Some(&self.inline[index]) + } else if let Some(ref overflow) = self.overflow { + overflow.get(index - MAX_INLINE_CAPACITY) + } else { + None + } + } + + /// Returns the number of elements in the `GrowableArray`. + #[allow(dead_code)] + #[inline] + pub(crate) fn len(&self) -> usize { + self.count + self.overflow.as_ref().map_or(0, Vec::len) + } + + /// Returns an iterator over the elements in the `GrowableArray`. + /// + /// The iterator yields elements from the internal array (`initial`) first, followed by elements + /// from the vector (`overflow`) if present. This allows for efficient iteration over both + /// stack-allocated and heap-allocated portions. + /// + #[allow(dead_code)] + #[inline] + pub(crate) fn iter(&self) -> impl Iterator { + if self.overflow.is_none() || self.overflow.as_ref().unwrap().is_empty() { + self.inline.iter().take(self.count).chain([].iter()) // Chaining with an empty array + // so that both `if` and `else` branch return the same type + } else { + self.inline + .iter() + .take(self.count) + .chain(self.overflow.as_ref().unwrap().iter()) + } + } +} + +// Implement `IntoIterator` for `GrowableArray` +impl IntoIterator + for GrowableArray +{ + type Item = T; + type IntoIter = GrowableArrayIntoIter; + + fn into_iter(self) -> Self::IntoIter { + GrowableArrayIntoIter::::new(self) + } +} + +/// Iterator for consuming a `GrowableArray`. +/// +#[derive(Debug)] +pub(crate) struct GrowableArrayIntoIter< + T: Default + Clone + PartialEq, + const INLINE_CAPACITY: usize, +> { + iter: std::iter::Chain< + std::iter::Take>, + std::vec::IntoIter, + >, +} + +impl + GrowableArrayIntoIter +{ + fn new(source: GrowableArray) -> Self { + Self { + iter: Self::get_iterator(source), + } + } + + fn get_iterator( + source: GrowableArray, + ) -> std::iter::Chain< + std::iter::Take>, + std::vec::IntoIter, + > { + if source.overflow.is_none() || source.overflow.as_ref().unwrap().is_empty() { + source + .inline + .into_iter() + .take(source.count) + .chain(Vec::::new()) + } else { + source + .inline + .into_iter() + .take(source.count) + .chain(source.overflow.unwrap()) + } + } +} + +impl Iterator + for GrowableArrayIntoIter +{ + type Item = T; + + fn next(&mut self) -> Option { + self.iter.next() + } +} + +#[cfg(test)] +mod tests { + use crate::growable_array::{ + GrowableArray, DEFAULT_INITIAL_OVERFLOW_CAPACITY, DEFAULT_MAX_INLINE_CAPACITY, + }; + use opentelemetry::logs::AnyValue; + use opentelemetry::Key; + + type KeyValuePair = Option<(Key, AnyValue)>; + + #[test] + fn test_push_and_get() { + let mut collection = GrowableArray::::new(); + for i in 0..15 { + collection.push(i); + } + for i in 0..15 { + assert_eq!(collection.get(i), Some(&(i as i32))); + } + } + + #[test] + fn test_len() { + let mut collection = GrowableArray::::new(); + for i in 0..15 { + collection.push(i); + } + assert_eq!(collection.len(), 15); + } + + #[test] + fn test_into_iter() { + let mut collection = GrowableArray::::new(); + for i in 0..15 { + collection.push(i); + } + let mut iter = collection.into_iter(); + for i in 0..15 { + assert_eq!(iter.next(), Some(i)); + } + assert_eq!(iter.next(), None); + } + + #[test] + fn test_ref_iter() { + let mut collection = GrowableArray::::new(); + for i in 0..15 { + collection.push(i); + } + let iter = collection.iter(); + let mut count = 0; + for value in iter { + assert_eq!(*value, count); + count += 1; + } + assert_eq!(count, 15); + } + + #[test] + fn test_key_value_pair_storage_growable_array() { + let mut collection = GrowableArray::::new(); + + let key1 = Key::from("key1"); + let value1 = AnyValue::String("value1".into()); + let key2 = Key::from("key2"); + let value2 = AnyValue::Int(42); + + collection.push(Some((key1.clone(), value1.clone()))); + collection.push(Some((key2.clone(), value2.clone()))); + + assert_eq!( + collection + .get(0) + .and_then(|kv| kv.as_ref().map(|kv| (&kv.0, &kv.1))), + Some((&key1, &value1)) + ); + assert_eq!( + collection + .get(1) + .and_then(|kv| kv.as_ref().map(|kv| (&kv.0, &kv.1))), + Some((&key2, &value2)) + ); + assert_eq!(collection.len(), 2); + + // Test iterating over the key-value pairs + let mut iter = collection.into_iter(); + assert_eq!(iter.next(), Some(Some((key1, value1)))); + assert_eq!(iter.next(), Some(Some((key2, value2)))); + assert_eq!(iter.next(), None); + } + + #[test] + fn test_empty_attributes() { + let collection = GrowableArray::::new(); + assert_eq!(collection.len(), 0); + assert_eq!(collection.get(0), None); + + let mut iter = collection.into_iter(); + assert_eq!(iter.next(), None); + } + + #[test] + fn test_less_than_max_stack_capacity() { + let mut collection = GrowableArray::::new(); + for i in 0..DEFAULT_MAX_INLINE_CAPACITY - 1 { + collection.push(i as i32); + } + assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY - 1); + + for i in 0..DEFAULT_MAX_INLINE_CAPACITY - 1 { + assert_eq!(collection.get(i), Some(&(i as i32))); + } + assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY - 1), None); + assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY), None); + + let mut iter = collection.into_iter(); + for i in 0..DEFAULT_MAX_INLINE_CAPACITY - 1 { + assert_eq!(iter.next(), Some(i as i32)); + } + assert_eq!(iter.next(), None); + } + + #[test] + fn test_exactly_max_stack_capacity() { + let mut collection = GrowableArray::::new(); + for i in 0..DEFAULT_MAX_INLINE_CAPACITY { + collection.push(i as i32); + } + assert_eq!(collection.len(), DEFAULT_MAX_INLINE_CAPACITY); + + for i in 0..DEFAULT_MAX_INLINE_CAPACITY { + assert_eq!(collection.get(i), Some(&(i as i32))); + } + assert_eq!(collection.get(DEFAULT_MAX_INLINE_CAPACITY), None); + + let mut iter = collection.into_iter(); + for i in 0..DEFAULT_MAX_INLINE_CAPACITY { + assert_eq!(iter.next(), Some(i as i32)); + } + assert_eq!(iter.next(), None); + } + + #[test] + fn test_exceeds_stack_but_not_initial_vec_capacity() { + let mut collection = GrowableArray::::new(); + for i in 0..(DEFAULT_MAX_INLINE_CAPACITY + DEFAULT_INITIAL_OVERFLOW_CAPACITY - 1) { + collection.push(i as i32); + } + assert_eq!( + collection.len(), + DEFAULT_MAX_INLINE_CAPACITY + DEFAULT_INITIAL_OVERFLOW_CAPACITY - 1 + ); + + for i in 0..(DEFAULT_MAX_INLINE_CAPACITY + DEFAULT_INITIAL_OVERFLOW_CAPACITY - 1) { + assert_eq!(collection.get(i), Some(&(i as i32))); + } + assert_eq!( + collection.get(DEFAULT_MAX_INLINE_CAPACITY + DEFAULT_INITIAL_OVERFLOW_CAPACITY - 1), + None + ); + assert_eq!( + collection.get(DEFAULT_MAX_INLINE_CAPACITY + DEFAULT_INITIAL_OVERFLOW_CAPACITY), + None + ); + + let mut iter = collection.into_iter(); + for i in 0..(DEFAULT_MAX_INLINE_CAPACITY + DEFAULT_INITIAL_OVERFLOW_CAPACITY - 1) { + assert_eq!(iter.next(), Some(i as i32)); + } + assert_eq!(iter.next(), None); + } + + #[test] + fn test_exceeds_both_stack_and_initial_vec_capacities() { + let mut collection = GrowableArray::::new(); + for i in 0..(DEFAULT_MAX_INLINE_CAPACITY + DEFAULT_INITIAL_OVERFLOW_CAPACITY + 5) { + collection.push(i as i32); + } + assert_eq!( + collection.len(), + DEFAULT_MAX_INLINE_CAPACITY + DEFAULT_INITIAL_OVERFLOW_CAPACITY + 5 + ); + + for i in 0..(DEFAULT_MAX_INLINE_CAPACITY + DEFAULT_INITIAL_OVERFLOW_CAPACITY + 5) { + assert_eq!(collection.get(i), Some(&(i as i32))); + } + assert_eq!( + collection.get(DEFAULT_MAX_INLINE_CAPACITY + DEFAULT_INITIAL_OVERFLOW_CAPACITY + 5), + None + ); + + let mut iter = collection.into_iter(); + for i in 0..(DEFAULT_MAX_INLINE_CAPACITY + DEFAULT_INITIAL_OVERFLOW_CAPACITY + 5) { + assert_eq!(iter.next(), Some(i as i32)); + } + assert_eq!(iter.next(), None); + } +} diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index 852f0b8327..5ce2be9474 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -121,6 +121,7 @@ #![cfg_attr(test, deny(warnings))] pub mod export; +pub(crate) mod growable_array; mod instrumentation; #[cfg(feature = "logs")] #[cfg_attr(docsrs, doc(cfg(feature = "logs")))] diff --git a/opentelemetry-sdk/src/logs/log_emitter.rs b/opentelemetry-sdk/src/logs/log_emitter.rs index faadf473b3..74f4890f6b 100644 --- a/opentelemetry-sdk/src/logs/log_emitter.rs +++ b/opentelemetry-sdk/src/logs/log_emitter.rs @@ -40,10 +40,6 @@ pub struct LoggerProvider { /// Default logger name if empty string is provided. const DEFAULT_COMPONENT_NAME: &str = "rust.opentelemetry.io/sdk/logger"; -// According to a Go-specific study mentioned on https://go.dev/blog/slog, -// up to 5 attributes is the most common case. We have chosen 8 as the default -// capacity for attributes to avoid reallocation in common scenarios. -const PREALLOCATED_ATTRIBUTE_CAPACITY: usize = 8; impl opentelemetry::logs::LoggerProvider for LoggerProvider { type Logger = Logger; @@ -250,11 +246,7 @@ impl opentelemetry::logs::Logger for Logger { type LogRecord = LogRecord; fn create_log_record(&self) -> Self::LogRecord { - // Reserve attributes memory for perf optimization. This may change in future. - LogRecord { - attributes: Some(Vec::with_capacity(PREALLOCATED_ATTRIBUTE_CAPACITY)), - ..Default::default() - } + LogRecord::default() } /// Emit a `LogRecord`. @@ -274,8 +266,8 @@ impl opentelemetry::logs::Logger for Logger { } let mut data = LogData { - record: log_record, - instrumentation: self.instrumentation_library().clone(), + record: Cow::Borrowed(&log_record), + instrumentation: Cow::Borrowed(self.instrumentation_library()), }; for p in processors { @@ -336,7 +328,7 @@ mod tests { } impl LogProcessor for ShutdownTestLogProcessor { - fn emit(&self, _data: &mut LogData) { + fn emit(&self, _data: &mut LogData<'_>) { self.is_shutdown .lock() .map(|is_shutdown| { @@ -571,7 +563,7 @@ mod tests { } impl LogProcessor for LazyLogProcessor { - fn emit(&self, _data: &mut LogData) { + fn emit(&self, _data: &mut LogData<'_>) { // nothing to do. } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index 57bb8afe71..711a37b99d 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -55,7 +55,7 @@ pub trait LogProcessor: Send + Sync + Debug { /// /// # Parameters /// - `data`: A mutable reference to `LogData` representing the log record. - fn emit(&self, data: &mut LogData); + fn emit(&self, data: &mut LogData<'_>); /// Force the logs lying in the cache to be exported. fn force_flush(&self) -> LogResult<()>; /// Shuts down the processor. @@ -90,7 +90,7 @@ impl SimpleLogProcessor { } impl LogProcessor for SimpleLogProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { // noop after shutdown if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) { return; @@ -152,10 +152,14 @@ impl Debug for BatchLogProcessor { } impl LogProcessor for BatchLogProcessor { - fn emit(&self, data: &mut LogData) { + fn emit(&self, data: &mut LogData<'_>) { + let owned_data = LogData { + record: Cow::Owned(data.record.clone().into_owned()), + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; let result = self .message_sender - .try_send(BatchMessage::ExportLog(data.clone())); + .try_send(BatchMessage::ExportLog(owned_data)); if let Err(err) = result { global::handle_error(LogError::Other(err.into())); @@ -307,7 +311,7 @@ async fn export_with_timeout<'a, R, E>( time_out: Duration, exporter: &mut E, runtime: &R, - batch: Vec>, + batch: Vec>>, ) -> ExportResult where R: RuntimeChannel, @@ -497,7 +501,7 @@ where #[derive(Debug)] enum BatchMessage { /// Export logs, usually called when the log is emitted. - ExportLog(LogData), + ExportLog(LogData<'static>), /// Flush the current buffer to the backend, it can be triggered by /// pre configured interval or a call to `force_push` function. Flush(Option>), @@ -528,9 +532,9 @@ mod tests { Resource, }; use async_trait::async_trait; - use opentelemetry::logs::AnyValue; #[cfg(feature = "logs_level_enabled")] use opentelemetry::logs::Severity; + use opentelemetry::logs::{AnyValue, LogRecord}; use opentelemetry::logs::{Logger, LoggerProvider as _}; use opentelemetry::Key; use opentelemetry::{logs::LogResult, KeyValue}; @@ -545,7 +549,7 @@ mod tests { #[async_trait] impl LogExporter for MockLogExporter { - async fn export<'a>(&mut self, _batch: Vec>) -> LogResult<()> { + async fn export<'a>(&mut self, _batch: Vec>>) -> LogResult<()> { Ok(()) } @@ -814,20 +818,29 @@ mod tests { #[derive(Debug)] struct FirstProcessor { - pub(crate) logs: Arc>>, + pub(crate) logs: Arc>>>, } impl LogProcessor for FirstProcessor { - fn emit(&self, data: &mut LogData) { - // add attribute - data.record.attributes.get_or_insert(vec![]).push(( + fn emit(&self, data: &mut LogData<'_>) { + // Ensure the record is owned before modifying + let record = data.record.to_mut(); + // Add attribute + record.add_attribute( Key::from_static_str("processed_by"), AnyValue::String("FirstProcessor".into()), - )); - // update body - data.record.body = Some("Updated by FirstProcessor".into()); + ); + + // Update body + record.body = Some(AnyValue::String("Updated by FirstProcessor".into())); + + // Convert the modified LogData to an owned version + let owned_data = LogData { + record: Cow::Owned(record.clone()), // Since record is already owned, no need to clone deeply + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; - self.logs.lock().unwrap().push(data.clone()); //clone as the LogProcessor is storing the data. + self.logs.lock().unwrap().push(owned_data); // Clone as the LogProcessor is storing the data. } #[cfg(feature = "logs_level_enabled")] @@ -846,22 +859,26 @@ mod tests { #[derive(Debug)] struct SecondProcessor { - pub(crate) logs: Arc>>, + pub(crate) logs: Arc>>>, } impl LogProcessor for SecondProcessor { - fn emit(&self, data: &mut LogData) { - assert!(data.record.attributes.as_ref().map_or(false, |attrs| { - attrs.iter().any(|(key, value)| { - key.as_str() == "processed_by" - && value == &AnyValue::String("FirstProcessor".into()) - }) - })); + fn emit(&self, data: &mut LogData<'_>) { + assert!(data.record.attributes_contains( + &Key::from_static_str("processed_by"), + &AnyValue::String("FirstProcessor".into()) + )); assert!( data.record.body.clone().unwrap() == AnyValue::String("Updated by FirstProcessor".into()) ); - self.logs.lock().unwrap().push(data.clone()); + // Convert the modified LogData to an owned version before storing it + let record = data.record.to_mut(); + let owned_data = LogData { + record: Cow::Owned(record.clone()), // Convert the record to owned + instrumentation: Cow::Owned(data.instrumentation.clone().into_owned()), + }; + self.logs.lock().unwrap().push(owned_data); } #[cfg(feature = "logs_level_enabled")] @@ -906,19 +923,15 @@ mod tests { let first_log = &first_processor_logs.lock().unwrap()[0]; let second_log = &second_processor_logs.lock().unwrap()[0]; - assert!(first_log.record.attributes.iter().any(|attrs| { - attrs.iter().any(|(key, value)| { - key.as_str() == "processed_by" - && value == &AnyValue::String("FirstProcessor".into()) - }) - })); + assert!(first_log.record.attributes_contains( + &Key::from_static_str("processed_by"), + &AnyValue::String("FirstProcessor".into()) + )); + assert!(second_log.record.attributes_contains( + &Key::from_static_str("processed_by"), + &AnyValue::String("FirstProcessor".into()) + )); - assert!(second_log.record.attributes.iter().any(|attrs| { - attrs.iter().any(|(key, value)| { - key.as_str() == "processed_by" - && value == &AnyValue::String("FirstProcessor".into()) - }) - })); assert!( first_log.record.body.clone().unwrap() == AnyValue::String("Updated by FirstProcessor".into()) diff --git a/opentelemetry-sdk/src/logs/mod.rs b/opentelemetry-sdk/src/logs/mod.rs index 207da4255c..23063d7d06 100644 --- a/opentelemetry-sdk/src/logs/mod.rs +++ b/opentelemetry-sdk/src/logs/mod.rs @@ -1,8 +1,7 @@ //! # OpenTelemetry Log SDK - mod log_emitter; mod log_processor; -mod record; +pub(crate) mod record; pub use log_emitter::{Builder, Logger, LoggerProvider}; pub use log_processor::{ @@ -80,17 +79,12 @@ mod tests { .expect("Atleast one log is expected to be present."); assert_eq!(log.instrumentation.name, "test-logger"); assert_eq!(log.record.severity_number, Some(Severity::Error)); - let attributes: Vec<(Key, AnyValue)> = log - .record - .attributes - .clone() - .expect("Attributes are expected"); - assert_eq!(attributes.len(), 10); + assert_eq!(log.record.attributes_len(), 10); for i in 1..=10 { - assert!(log.record.attributes.clone().unwrap().contains(&( - Key::new(format!("key{}", i)), - AnyValue::String(format!("value{}", i).into()) - ))); + assert!(log.record.attributes_contains( + &Key::new(format!("key{}", i)), + &AnyValue::String(format!("value{}", i).into()) + )); } // validate Resource diff --git a/opentelemetry-sdk/src/logs/record.rs b/opentelemetry-sdk/src/logs/record.rs index 856cb7d64e..4699623e50 100644 --- a/opentelemetry-sdk/src/logs/record.rs +++ b/opentelemetry-sdk/src/logs/record.rs @@ -1,3 +1,4 @@ +use crate::growable_array::GrowableArray; use opentelemetry::{ logs::{AnyValue, Severity}, trace::{SpanContext, SpanId, TraceFlags, TraceId}, @@ -5,6 +6,14 @@ use opentelemetry::{ }; use std::{borrow::Cow, time::SystemTime}; +// According to a Go-specific study mentioned on https://go.dev/blog/slog, +// up to 5 attributes is the most common case. +const PREALLOCATED_ATTRIBUTE_CAPACITY: usize = 5; + +/// A vector of `Option<(Key, AnyValue)>` with default capacity. +pub(crate) type LogRecordAttributes = + GrowableArray, PREALLOCATED_ATTRIBUTE_CAPACITY>; + #[derive(Debug, Default, Clone, PartialEq)] #[non_exhaustive] /// LogRecord represents all data carried by a log record, and @@ -34,7 +43,7 @@ pub struct LogRecord { pub body: Option, /// Additional attributes associated with this record - pub(crate) attributes: Option>, + pub(crate) attributes: LogRecordAttributes, } impl opentelemetry::logs::LogRecord for LogRecord { @@ -89,34 +98,29 @@ impl opentelemetry::logs::LogRecord for LogRecord { K: Into, V: Into, { - if let Some(ref mut attrs) = self.attributes { - attrs.push((key.into(), value.into())); - } else { - self.attributes = Some(vec![(key.into(), value.into())]); - } + self.attributes.push(Some((key.into(), value.into()))); } } impl LogRecord { - /// Provides an iterator over the attributes in the `LogRecord`. + /// Provides an iterator over the attributes. pub fn attributes_iter(&self) -> impl Iterator { - self.attributes - .as_ref() - .map_or_else(|| [].iter(), |attrs| attrs.iter()) + self.attributes.iter().filter_map(|opt| opt.as_ref()) } #[allow(dead_code)] /// Returns the number of attributes in the `LogRecord`. pub(crate) fn attributes_len(&self) -> usize { - self.attributes.as_ref().map_or(0, |attrs| attrs.len()) + self.attributes.len() } #[allow(dead_code)] - /// Returns true if the `LogRecord` contains the specified attribute. + /// Checks if the `LogRecord` contains the specified attribute. pub(crate) fn attributes_contains(&self, key: &Key, value: &AnyValue) -> bool { - self.attributes.as_ref().map_or(false, |attrs| { - attrs.iter().any(|(k, v)| k == key && v == value) - }) + self.attributes + .iter() + .flatten() + .any(|(k, v)| k == key && v == value) } } @@ -246,7 +250,7 @@ mod tests { #[test] fn compare_log_record() { - let log_record = LogRecord { + let mut log_record = LogRecord { event_name: Some(Cow::Borrowed("test_event")), target: Some(Cow::Borrowed("foo::bar")), timestamp: Some(SystemTime::now()), @@ -254,13 +258,14 @@ mod tests { severity_text: Some(Cow::Borrowed("ERROR")), severity_number: Some(Severity::Error), body: Some(AnyValue::String("Test body".into())), - attributes: Some(vec![(Key::new("key"), AnyValue::String("value".into()))]), + attributes: LogRecordAttributes::new(), trace_context: Some(TraceContext { trace_id: TraceId::from_u128(1), span_id: SpanId::from_u64(1), trace_flags: Some(TraceFlags::default()), }), }; + log_record.add_attribute(Key::new("key"), AnyValue::String("value".into())); let log_record_cloned = log_record.clone(); diff --git a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs index 8068fafaec..e73a6e27c1 100644 --- a/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/logs/in_memory_exporter.rs @@ -39,11 +39,20 @@ use std::sync::{Arc, Mutex}; /// #[derive(Clone, Debug)] pub struct InMemoryLogsExporter { - logs: Arc>>, + logs: Arc>>, resource: Arc>, should_reset_on_shutdown: bool, } +/// `OwnedLogData` represents a single log event without resource context. +#[derive(Debug, Clone)] +pub struct OwnedLogData { + /// Log record, which can be borrowed or owned. + pub record: LogRecord, + /// Instrumentation details for the emitter who produced this `LogEvent`. + pub instrumentation: InstrumentationLibrary, +} + impl Default for InMemoryLogsExporter { fn default() -> Self { InMemoryLogsExporterBuilder::new().build() @@ -175,10 +184,14 @@ impl InMemoryLogsExporter { #[async_trait] impl LogExporter for InMemoryLogsExporter { - async fn export<'a>(&mut self, batch: Vec>) -> LogResult<()> { + async fn export<'a>(&mut self, batch: Vec>>) -> LogResult<()> { let mut logs_guard = self.logs.lock().map_err(LogError::from)?; for log in batch.into_iter() { - logs_guard.push(log.into_owned()); + let owned_log = OwnedLogData { + record: log.record.clone().into_owned(), + instrumentation: log.instrumentation.clone().into_owned(), + }; + logs_guard.push(owned_log); } Ok(()) } diff --git a/opentelemetry-stdout/src/logs/exporter.rs b/opentelemetry-stdout/src/logs/exporter.rs index dacefa3d8b..6befdf0fa7 100644 --- a/opentelemetry-stdout/src/logs/exporter.rs +++ b/opentelemetry-stdout/src/logs/exporter.rs @@ -45,7 +45,7 @@ impl fmt::Debug for LogExporter { #[async_trait] impl opentelemetry_sdk::export::logs::LogExporter for LogExporter { /// Export spans to stdout - async fn export<'a>(&mut self, batch: Vec>) -> ExportResult { + async fn export<'a>(&mut self, batch: Vec>>) -> ExportResult { if let Some(writer) = &mut self.writer { // TODO - Avoid cloning logdata if it is borrowed. let log_data = crate::logs::transform::LogData::from(( diff --git a/opentelemetry-stdout/src/logs/transform.rs b/opentelemetry-stdout/src/logs/transform.rs index 2f3199bd25..4a55a8a1ee 100644 --- a/opentelemetry-stdout/src/logs/transform.rs +++ b/opentelemetry-stdout/src/logs/transform.rs @@ -16,7 +16,7 @@ pub struct LogData { impl From<( - Vec, + Vec>, &opentelemetry_sdk::Resource, )> for LogData { @@ -31,7 +31,7 @@ impl for sdk_log in sdk_logs { let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into()); let schema_url = sdk_log.instrumentation.schema_url.clone(); - let scope: Scope = sdk_log.instrumentation.clone().into(); + let scope: Scope = sdk_log.instrumentation.clone().into_owned().into(); let resource: Resource = sdk_resource.into(); let rl = resource_logs @@ -104,7 +104,7 @@ struct LogRecord { trace_id: Option, } -impl From for LogRecord { +impl From> for LogRecord { fn from(value: opentelemetry_sdk::export::logs::LogData) -> Self { LogRecord { attributes: { @@ -142,6 +142,7 @@ impl From for LogRecord { flags: value .record .trace_context + .as_ref() .map(|c| c.trace_flags.map(|f| f.to_u8())) .unwrap_or_default(), time_unix_nano: value.record.timestamp, @@ -154,8 +155,8 @@ impl From for LogRecord { .map(|u| u as u32) .unwrap_or_default(), dropped_attributes_count: 0, - severity_text: value.record.severity_text, - body: value.record.body.map(|a| a.into()), + severity_text: value.record.severity_text.clone(), + body: value.record.body.clone().map(|a| a.into()), } } }