Skip to content

Commit 99dfc3c

Browse files
committed
Encode DD traces into provided buffer.
1 parent 5308844 commit 99dfc3c

File tree

4 files changed

+131
-114
lines changed

4 files changed

+131
-114
lines changed

opentelemetry-datadog/src/exporter/mod.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,9 @@ impl DatadogExporter {
9797
) -> Result<http::Request<Vec<u8>>, TraceError> {
9898
let traces: Vec<&[SpanData]> = group_into_traces(&mut batch);
9999
let trace_count = traces.len();
100-
let data = self.api_version.encode(
100+
let mut buffer = Vec::with_capacity(trace_count * 512);
101+
self.api_version.encode(
102+
&mut buffer,
101103
&self.model_config,
102104
traces,
103105
&self.mapping,
@@ -114,7 +116,7 @@ impl DatadogExporter {
114116
DATADOG_META_TRACER_VERSION_HEADER,
115117
env!("CARGO_PKG_VERSION"),
116118
)
117-
.body(data)
119+
.body(buffer)
118120
.map_err::<Error, _>(Into::into)?;
119121

120122
Ok(req)

opentelemetry-datadog/src/exporter/model/mod.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -149,16 +149,18 @@ impl ApiVersion {
149149
}
150150
}
151151

152-
pub(crate) fn encode(
152+
pub(crate) fn encode<W: std::io::Write>(
153153
self,
154+
writer: &mut W,
154155
model_config: &ModelConfig,
155156
traces: Vec<&[trace::SpanData]>,
156157
mapping: &Mapping,
157158
unified_tags: &UnifiedTags,
158159
resource: Option<&Resource>,
159-
) -> Result<Vec<u8>, Error> {
160+
) -> Result<(), Error> {
160161
match self {
161162
Self::Version03 => v03::encode(
163+
writer,
162164
model_config,
163165
traces,
164166
|span, config| match &mapping.service_name {
@@ -176,6 +178,7 @@ impl ApiVersion {
176178
resource,
177179
),
178180
Self::Version05 => v05::encode(
181+
writer,
179182
model_config,
180183
traces,
181184
|span, config| match &mapping.service_name {
@@ -256,13 +259,17 @@ pub(crate) mod tests {
256259
..Default::default()
257260
};
258261
let resource = Resource::new(vec![KeyValue::new("host.name", "test")]);
259-
let encoded = base64::encode(ApiVersion::Version03.encode(
262+
263+
let mut buffer = Vec::new();
264+
ApiVersion::Version03.encode(
265+
&mut buffer,
260266
&model_config,
261267
traces.iter().map(|x| &x[..]).collect(),
262268
&Mapping::empty(),
263269
&UnifiedTags::new(),
264270
Some(&resource),
265-
)?);
271+
)?;
272+
let encoded = base64::encode(buffer);
266273

267274
assert_eq!(encoded.as_str(), "kZGMpHR5cGWjd2Vip3NlcnZpY2Wsc2VydmljZV9uYW1lpG5hbWWpY29tcG9uZW\
268275
50qHJlc291cmNlqHJlc291cmNlqHRyYWNlX2lkzwAAAAAAAAAHp3NwYW5faWTPAAAAAAAAAGOpcGFyZW50X2lkzwAAAA\
@@ -286,13 +293,16 @@ pub(crate) mod tests {
286293
unified_tags.set_version(Some(String::from("test-version")));
287294
unified_tags.set_service(Some(String::from("test-service")));
288295

289-
let _encoded = base64::encode(ApiVersion::Version05.encode(
296+
let mut buffer = Vec::new();
297+
ApiVersion::Version05.encode(
298+
&mut buffer,
290299
&model_config,
291300
traces.iter().map(|x| &x[..]).collect(),
292301
&Mapping::empty(),
293302
&unified_tags,
294303
Some(&resource),
295-
)?);
304+
)?;
305+
let _encoded = base64::encode(&mut buffer);
296306

297307
// TODO: Need someone to generate the expected result or instructions to do so.
298308
// assert_eq!(encoded.as_str(), "kp6jd2VirHNlcnZpY2VfbmFtZaljb21wb25lbnSocmVzb3VyY2WpaG9zdC5uYW\

opentelemetry-datadog/src/exporter/model/v03.rs

Lines changed: 38 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,24 +5,24 @@ use opentelemetry_sdk::export::trace::SpanData;
55
use opentelemetry_sdk::Resource;
66
use std::time::SystemTime;
77

8-
pub(crate) fn encode<S, N, R>(
8+
pub(crate) fn encode<S, N, R, W: std::io::Write>(
9+
writer: &mut W,
910
model_config: &ModelConfig,
1011
traces: Vec<&[SpanData]>,
1112
get_service_name: S,
1213
get_name: N,
1314
get_resource: R,
1415
resource: Option<&Resource>,
15-
) -> Result<Vec<u8>, Error>
16+
) -> Result<(), Error>
1617
where
1718
for<'a> S: Fn(&'a SpanData, &'a ModelConfig) -> &'a str,
1819
for<'a> N: Fn(&'a SpanData, &'a ModelConfig) -> &'a str,
1920
for<'a> R: Fn(&'a SpanData, &'a ModelConfig) -> &'a str,
2021
{
21-
let mut encoded = Vec::new();
22-
rmp::encode::write_array_len(&mut encoded, traces.len() as u32)?;
22+
rmp::encode::write_array_len(writer, traces.len() as u32)?;
2323

2424
for trace in traces.into_iter() {
25-
rmp::encode::write_array_len(&mut encoded, trace.len() as u32)?;
25+
rmp::encode::write_array_len(writer, trace.len() as u32)?;
2626

2727
for span in trace {
2828
// Safe until the year 2262 when Datadog will need to change their API
@@ -42,81 +42,78 @@ where
4242
for kv in &span.attributes {
4343
if kv.key.as_str() == "span.type" {
4444
span_type_found = true;
45-
rmp::encode::write_map_len(&mut encoded, 12)?;
46-
rmp::encode::write_str(&mut encoded, "type")?;
47-
rmp::encode::write_str(&mut encoded, kv.value.as_str().as_ref())?;
45+
rmp::encode::write_map_len(writer, 12)?;
46+
rmp::encode::write_str(writer, "type")?;
47+
rmp::encode::write_str(writer, kv.value.as_str().as_ref())?;
4848
break;
4949
}
5050
}
5151

5252
if !span_type_found {
53-
rmp::encode::write_map_len(&mut encoded, 11)?;
53+
rmp::encode::write_map_len(writer, 11)?;
5454
}
5555

5656
// Datadog span name is OpenTelemetry component name - see module docs for more information
57-
rmp::encode::write_str(&mut encoded, "service")?;
58-
rmp::encode::write_str(&mut encoded, get_service_name(span, model_config))?;
57+
rmp::encode::write_str(writer, "service")?;
58+
rmp::encode::write_str(writer, get_service_name(span, model_config))?;
5959

60-
rmp::encode::write_str(&mut encoded, "name")?;
61-
rmp::encode::write_str(&mut encoded, get_name(span, model_config))?;
60+
rmp::encode::write_str(writer, "name")?;
61+
rmp::encode::write_str(writer, get_name(span, model_config))?;
6262

63-
rmp::encode::write_str(&mut encoded, "resource")?;
64-
rmp::encode::write_str(&mut encoded, get_resource(span, model_config))?;
63+
rmp::encode::write_str(writer, "resource")?;
64+
rmp::encode::write_str(writer, get_resource(span, model_config))?;
6565

66-
rmp::encode::write_str(&mut encoded, "trace_id")?;
66+
rmp::encode::write_str(writer, "trace_id")?;
6767
rmp::encode::write_u64(
68-
&mut encoded,
68+
writer,
6969
u128::from_be_bytes(span.span_context.trace_id().to_bytes()) as u64,
7070
)?;
7171

72-
rmp::encode::write_str(&mut encoded, "span_id")?;
72+
rmp::encode::write_str(writer, "span_id")?;
7373
rmp::encode::write_u64(
74-
&mut encoded,
74+
writer,
7575
u64::from_be_bytes(span.span_context.span_id().to_bytes()),
7676
)?;
7777

78-
rmp::encode::write_str(&mut encoded, "parent_id")?;
79-
rmp::encode::write_u64(
80-
&mut encoded,
81-
u64::from_be_bytes(span.parent_span_id.to_bytes()),
82-
)?;
78+
rmp::encode::write_str(writer, "parent_id")?;
79+
rmp::encode::write_u64(writer, u64::from_be_bytes(span.parent_span_id.to_bytes()))?;
8380

84-
rmp::encode::write_str(&mut encoded, "start")?;
85-
rmp::encode::write_i64(&mut encoded, start)?;
81+
rmp::encode::write_str(writer, "start")?;
82+
rmp::encode::write_i64(writer, start)?;
8683

87-
rmp::encode::write_str(&mut encoded, "duration")?;
88-
rmp::encode::write_i64(&mut encoded, duration)?;
84+
rmp::encode::write_str(writer, "duration")?;
85+
rmp::encode::write_i64(writer, duration)?;
8986

90-
rmp::encode::write_str(&mut encoded, "error")?;
87+
rmp::encode::write_str(writer, "error")?;
9188
rmp::encode::write_i32(
92-
&mut encoded,
89+
writer,
9390
match span.status {
9491
Status::Error { .. } => 1,
9592
_ => 0,
9693
},
9794
)?;
9895

99-
rmp::encode::write_str(&mut encoded, "meta")?;
96+
rmp::encode::write_str(writer, "meta")?;
10097
rmp::encode::write_map_len(
101-
&mut encoded,
98+
writer,
10299
(span.attributes.len() + resource.map(|r| r.len()).unwrap_or(0)) as u32,
103100
)?;
104101
if let Some(resource) = resource {
105102
for (key, value) in resource.iter() {
106-
rmp::encode::write_str(&mut encoded, key.as_str())?;
107-
rmp::encode::write_str(&mut encoded, value.as_str().as_ref())?;
103+
rmp::encode::write_str(writer, key.as_str())?;
104+
rmp::encode::write_str(writer, value.as_str().as_ref())?;
108105
}
109106
}
110107
for kv in span.attributes.iter() {
111-
rmp::encode::write_str(&mut encoded, kv.key.as_str())?;
112-
rmp::encode::write_str(&mut encoded, kv.value.as_str().as_ref())?;
108+
rmp::encode::write_str(writer, kv.key.as_str())?;
109+
rmp::encode::write_str(writer, kv.value.as_str().as_ref())?;
113110
}
114111

115-
rmp::encode::write_str(&mut encoded, "metrics")?;
116-
rmp::encode::write_map_len(&mut encoded, 1)?;
117-
rmp::encode::write_str(&mut encoded, SAMPLING_PRIORITY_KEY)?;
112+
rmp::encode::write_str(writer, "metrics")?;
113+
rmp::encode::write_map_len(writer, 1)?;
114+
rmp::encode::write_str(writer, SAMPLING_PRIORITY_KEY)?;
118115
rmp::encode::write_f64(
119-
&mut encoded,
116+
writer,
120117
if span.span_context.is_sampled() {
121118
1.0
122119
} else {
@@ -126,5 +123,5 @@ where
126123
}
127124
}
128125

129-
Ok(encoded)
126+
Ok(())
130127
}

0 commit comments

Comments
 (0)