Skip to content

Set resource sync #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ now use `.with_resource(RESOURCE::default())` to configure Resource when using
- Fixing the OTLP HTTP/JSON exporter. [#1882](https://github.com/open-telemetry/opentelemetry-rust/pull/1882) - The exporter was broken in the
previous release.
- **Breaking** [1869](https://github.com/open-telemetry/opentelemetry-rust/pull/1869) The OTLP logs exporter now overrides the [InstrumentationScope::name](https://github.com/open-telemetry/opentelemetry-proto/blob/b3060d2104df364136d75a35779e6bd48bac449a/opentelemetry/proto/common/v1/common.proto#L73) field with the `target` from `LogRecord`, if target is populated.
- Groups batch of `LogRecord` and `Span` by their resource and instrumentation scope before exporting, for better efficiency [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873).


## v0.16.0

Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ impl LogExporter for OtlpHttpClient {
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();

let (body, content_type) = { self.build_logs_export_body(owned_batch, &self.resource)? };
let (body, content_type) = { self.build_logs_export_body(owned_batch)? };
let mut request = http::Request::builder()
.method(Method::POST)
.uri(&self.collector_endpoint)
Expand Down Expand Up @@ -57,7 +57,8 @@ impl LogExporter for OtlpHttpClient {
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> LogResult<()> {
self.resource = resource.into();
Ok(())
}
}
22 changes: 7 additions & 15 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use crate::{
use http::{HeaderName, HeaderValue, Uri};
use opentelemetry_http::HttpClient;
use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;

#[cfg(feature = "logs")]
use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
#[cfg(feature = "trace")]
use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;
#[cfg(feature = "logs")]
use opentelemetry_sdk::export::logs::LogData;
#[cfg(feature = "trace")]
Expand Down Expand Up @@ -307,16 +310,9 @@ impl OtlpHttpClient {
fn build_trace_export_body(
&self,
spans: Vec<SpanData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
) -> opentelemetry::trace::TraceResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::{
collector::trace::v1::ExportTraceServiceRequest, trace::v1::ResourceSpans,
};

let resource_spans = spans
.into_iter()
.map(|span| ResourceSpans::new(span, resource))
.collect::<Vec<_>>();
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
let resource_spans = group_spans_by_resource_and_scope(spans, &self.resource);

let req = ExportTraceServiceRequest { resource_spans };
match self.protocol {
Expand All @@ -333,13 +329,9 @@ impl OtlpHttpClient {
fn build_logs_export_body(
&self,
logs: Vec<LogData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
) -> opentelemetry::logs::LogResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
let resource_logs = logs
.into_iter()
.map(|log_event| (log_event, resource).into())
.collect::<Vec<_>>();
let resource_logs = group_logs_by_resource_and_scope(logs, &self.resource);
let req = ExportLogsServiceRequest { resource_logs };

match self.protocol {
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl SpanExporter for OtlpHttpClient {
Err(err) => return Box::pin(std::future::ready(Err(err))),
};

let (body, content_type) = match self.build_trace_export_body(batch, &self.resource) {
let (body, content_type) = match self.build_trace_export_body(batch) {
Ok(body) => body,
Err(e) => return Box::pin(std::future::ready(Err(e))),
};
Expand Down Expand Up @@ -67,7 +67,8 @@ impl SpanExporter for OtlpHttpClient {
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> ExportResult {
self.resource = resource.into();
Ok(())
}
}
21 changes: 11 additions & 10 deletions opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use opentelemetry_proto::tonic::collector::logs::v1::{
use opentelemetry_sdk::export::logs::{LogData, LogExporter};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;

use super::BoxInterceptor;

pub(crate) struct TonicLogsClient {
Expand Down Expand Up @@ -65,15 +67,13 @@ impl LogExporter for TonicLogsClient {
None => return Err(LogError::Other("exporter is already shut down".into())),
};

// TODO: Avoid cloning here.
let resource_logs = {
batch
.into_iter()
.map(|log_data_cow| (log_data_cow.into_owned()))
.map(|log_data| (log_data, &self.resource))
.map(Into::into)
.collect()
};
//TODO: avoid cloning here.
let owned_batch = batch
.into_iter()
.map(|cow_log_data| cow_log_data.into_owned()) // Converts Cow to owned LogData
.collect::<Vec<LogData>>();

let resource_logs = group_logs_by_resource_and_scope(owned_batch, &self.resource);

client
.export(Request::from_parts(
Expand All @@ -91,7 +91,8 @@ impl LogExporter for TonicLogsClient {
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> LogResult<()> {
self.resource = resource.into();
Ok(())
}
}
14 changes: 5 additions & 9 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use opentelemetry::trace::TraceError;
use opentelemetry_proto::tonic::collector::trace::v1::{
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
};
use opentelemetry_proto::tonic::trace::v1::ResourceSpans;
use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use opentelemetry_proto::transform::trace::tonic::group_spans_by_resource_and_scope;

use super::BoxInterceptor;

pub(crate) struct TonicTracesClient {
Expand Down Expand Up @@ -71,13 +72,7 @@ impl SpanExporter for TonicTracesClient {
}
};

// TODO: Avoid cloning here.
let resource_spans = {
batch
.into_iter()
.map(|log_data| ResourceSpans::new(log_data, &self.resource))
.collect()
};
let resource_spans = group_spans_by_resource_and_scope(batch, &self.resource);

Box::pin(async move {
client
Expand All @@ -97,7 +92,8 @@ impl SpanExporter for TonicTracesClient {
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> ExportResult {
self.resource = resource.into();
Ok(())
}
}
7 changes: 5 additions & 2 deletions opentelemetry-otlp/src/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ impl opentelemetry_sdk::export::logs::LogExporter for LogExporter {
self.client.export(batch).await
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.client.set_resource(resource);
fn set_resource(
&mut self,
resource: &opentelemetry_sdk::Resource,
) -> opentelemetry::logs::LogResult<()> {
self.client.set_resource(resource)
}
}

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter {
self.0.export(batch)
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.0.set_resource(resource);
fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) -> ExportResult {
self.0.set_resource(resource)
}
}
2 changes: 2 additions & 0 deletions opentelemetry-proto/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## vNext

- Bump MSRV to 1.70 [1864](https://github.com/open-telemetry/opentelemetry-rust/pull/1874)
- Group log and Span batch by their resource and instrumentation scope before exporting [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873).
- Introduced `group_logs_by_resource_and_scope()` and `group_spans_by_resource_and_scope()` methods to group logs and spans by the resource and scope respectively.

## v0.6.0

Expand Down
6 changes: 4 additions & 2 deletions opentelemetry-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ path = "tests/json_deserialize.rs"


[features]
default = []
default = ["full"]

full = ["gen-tonic", "trace", "logs", "metrics", "zpages", "with-serde"]

Expand All @@ -42,6 +42,7 @@ trace = ["opentelemetry/trace", "opentelemetry_sdk/trace"]
metrics = ["opentelemetry/metrics", "opentelemetry_sdk/metrics"]
logs = ["opentelemetry/logs", "opentelemetry_sdk/logs"]
zpages = ["trace"]
testing = ["opentelemetry/testing"]

# add ons
with-schemars = ["schemars"]
Expand All @@ -57,7 +58,8 @@ serde = { workspace = true, optional = true, features = ["serde_derive"] }
hex = { version = "0.4.3", optional = true }

[dev-dependencies]
opentelemetry = { version = "0.23", features = ["testing"], path = "../opentelemetry" }
tonic-build = { workspace = true }
prost-build = { workspace = true }
tempfile = "3.3.0"
serde_json = { workspace = true }
serde_json = { workspace = true }
126 changes: 125 additions & 1 deletion opentelemetry-proto/src/transform/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@
pub mod tonic {
use crate::{
tonic::{
common::v1::{any_value::Value, AnyValue, ArrayValue, KeyValue, KeyValueList},
common::v1::{
any_value::Value, AnyValue, ArrayValue, InstrumentationScope, KeyValue,
KeyValueList,
},
logs::v1::{LogRecord, ResourceLogs, ScopeLogs, SeverityNumber},
resource::v1::Resource,
Attributes,
},
transform::common::{to_nanos, tonic::ResourceAttributesWithSchema},
};
use opentelemetry::logs::{AnyValue as LogsAnyValue, Severity};
use std::borrow::Cow;
use std::collections::HashMap;

impl From<LogsAnyValue> for AnyValue {
fn from(value: LogsAnyValue) -> Self {
Expand Down Expand Up @@ -143,4 +148,123 @@ pub mod tonic {
}
}
}

pub fn group_logs_by_resource_and_scope(
logs: Vec<opentelemetry_sdk::export::logs::LogData>,
resource: &ResourceAttributesWithSchema,
) -> Vec<ResourceLogs> {
// Group logs by target or instrumentation name
let scope_map = logs.iter().fold(
HashMap::new(),
|mut scope_map: HashMap<
Cow<'static, str>,
Vec<&opentelemetry_sdk::export::logs::LogData>,
>,
log| {
let key = log
.record
.target
.clone()
.unwrap_or_else(|| log.instrumentation.name.clone());
scope_map.entry(key).or_default().push(log);
scope_map
},
);

let scope_logs = scope_map
.into_iter()
.map(|(key, log_data)| ScopeLogs {
scope: Some(InstrumentationScope::from((
&log_data.first().unwrap().instrumentation,
Some(key),
))),
schema_url: resource.schema_url.clone().unwrap_or_default(),
log_records: log_data
.into_iter()
.map(|log_data| log_data.record.clone().into())
.collect(),
})
.collect();

vec![ResourceLogs {
resource: Some(Resource {
attributes: resource.attributes.0.clone(),
dropped_attributes_count: 0,
}),
scope_logs,
schema_url: resource.schema_url.clone().unwrap_or_default(),
}]
}
}

#[cfg(test)]
mod tests {
use crate::transform::common::tonic::ResourceAttributesWithSchema;
use opentelemetry::logs::LogRecord as _;
use opentelemetry_sdk::export::logs::LogData;
use opentelemetry_sdk::{logs::LogRecord, Resource};
use std::time::SystemTime;

fn create_test_log_data(instrumentation_name: &str, _message: &str) -> LogData {
let mut logrecord = LogRecord::default();
logrecord.set_timestamp(SystemTime::now());
logrecord.set_observed_timestamp(SystemTime::now());
LogData {
instrumentation: opentelemetry_sdk::InstrumentationLibrary::builder(
instrumentation_name.to_string(),
)
.build(),
record: logrecord,
}
}

#[test]
fn test_group_logs_by_resource_and_scope_single_scope() {
let resource = Resource::default();
let log1 = create_test_log_data("test-lib", "Log 1");
let log2 = create_test_log_data("test-lib", "Log 2");

let logs = vec![log1, log2];
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema

let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource);

assert_eq!(grouped_logs.len(), 1);
let resource_logs = &grouped_logs[0];
assert_eq!(resource_logs.scope_logs.len(), 1);

let scope_logs = &resource_logs.scope_logs[0];
assert_eq!(scope_logs.log_records.len(), 2);
}

#[test]
fn test_group_logs_by_resource_and_scope_multiple_scopes() {
let resource = Resource::default();
let log1 = create_test_log_data("lib1", "Log 1");
let log2 = create_test_log_data("lib2", "Log 2");

let logs = vec![log1, log2];
let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
let grouped_logs =
crate::transform::logs::tonic::group_logs_by_resource_and_scope(logs, &resource);

assert_eq!(grouped_logs.len(), 1);
let resource_logs = &grouped_logs[0];
assert_eq!(resource_logs.scope_logs.len(), 2);

let scope_logs_1 = &resource_logs
.scope_logs
.iter()
.find(|scope| scope.scope.as_ref().unwrap().name == "lib1")
.unwrap();
let scope_logs_2 = &resource_logs
.scope_logs
.iter()
.find(|scope| scope.scope.as_ref().unwrap().name == "lib2")
.unwrap();

assert_eq!(scope_logs_1.log_records.len(), 1);
assert_eq!(scope_logs_2.log_records.len(), 1);
}
}
Loading
Loading