Skip to content

Commit

Permalink
switch to protobuf crate for datadog_agent source
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz committed Jan 8, 2024
1 parent 26a17e3 commit 41bdefd
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 183 deletions.
33 changes: 33 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ paste = "1.0.14"
percent-encoding = { version = "2.3.1", default-features = false }
postgres-openssl = { version = "0.5.0", default-features = false, features = ["runtime"], optional = true }
pulsar = { version = "6.1.0", default-features = false, features = ["tokio-runtime", "auth-oauth2", "flate2", "lz4", "snap", "zstd"], optional = true }
protobuf = { version = "3.3.0", default-features = false, features = ["with-bytes"], optional = true }
rand = { version = "0.8.5", default-features = false, features = ["small_rng"] }
rand_distr = { version = "0.4.3", default-features = false }
rdkafka = { version = "0.35.0", default-features = false, features = ["tokio", "libz", "ssl", "zstd"], optional = true }
Expand Down Expand Up @@ -339,6 +340,7 @@ nix = { version = "0.26.2", default-features = false, features = ["socket", "sig
[build-dependencies]
prost-build = { version = "0.12", default-features = false, optional = true }
tonic-build = { version = "0.10", default-features = false, features = ["transport", "prost"], optional = true }
protobuf-codegen = { version = "3.3.0", default-features = false, optional = true }
# update 'openssl_version' in website/config.toml whenever <major.minor> version changes
openssl-src = { version = "300", default-features = false, features = ["force-engine", "legacy"] }

Expand Down Expand Up @@ -454,6 +456,7 @@ aws-core = [

# Anything that requires Protocol Buffers.
protobuf-build = ["dep:tonic-build", "dep:prost-build"]
dd-agent-source-protobuf-build = ["dep:protobuf-codegen"]

gcp = ["dep:base64", "dep:goauth", "dep:smpl_jwt"]

Expand Down Expand Up @@ -516,7 +519,7 @@ sources-aws_ecs_metrics = ["sources-utils-http-client"]
sources-aws_kinesis_firehose = ["dep:base64", "dep:infer"]
sources-aws_s3 = ["aws-core", "dep:aws-sdk-sqs", "dep:aws-sdk-s3", "dep:semver", "dep:async-compression", "sources-aws_sqs", "tokio-util/io"]
sources-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
sources-datadog_agent = ["sources-utils-http-error", "protobuf-build"]
sources-datadog_agent = ["sources-utils-http-error", "dep:protobuf", "protobuf-build", "dd-agent-source-protobuf-build"]
sources-demo_logs = ["dep:fakedata"]
sources-dnstap = ["dep:base64", "dep:hickory-proto", "dep:dnsmsg-parser", "protobuf-build"]
sources-docker_logs = ["docker"]
Expand Down
31 changes: 31 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,37 @@ fn main() {
.unwrap();
}

#[cfg(feature = "dd-agent-source-protobuf-build")]
{
// We're doing Datadog Agent-specific codegen here for Protocol Buffers handling in the
// `datadog_agent` source as `protobuf` supports using `bytes` for both byte buffers _and_
// strings, which we utilize to attempt to do more zero-copy deserialization and this
// improve our memory utilization for what are otherwise string-heavy (and thus
// allocation-heavy) payloads.
println!("cargo:rerun-if-changed=proto/dd_trace.proto");
println!("cargo:rerun-if-changed=proto/ddsketch_full.proto");
println!("cargo:rerun-if-changed=proto/dd_metric.proto");

let codegen_customize = protobuf_codegen::Customize::default()
.tokio_bytes(true)
.tokio_bytes_for_string(true)
.generate_getter(true)
.gen_mod_rs(true)
.lite_runtime(true);

protobuf_codegen::Codegen::new()
.protoc()
.includes(&["proto"])

Check failure on line 180 in build.rs

View workflow job for this annotation

GitHub Actions / Checks

the borrowed expression implements the required traits
.inputs(&[

Check failure on line 181 in build.rs

View workflow job for this annotation

GitHub Actions / Checks

the borrowed expression implements the required traits
"proto/ddsketch_full.proto",
"proto/dd_metric.proto",
"proto/dd_trace.proto",
])
.cargo_out_dir("dd-agent-protos")
.customize(codegen_customize)
.run_from_script();
}

// We keep track of which environment variables we slurp in, and then emit stanzas at the end to
// inform Cargo when it needs to rerun this build script. This allows us to avoid rerunning it
// every single time unless something _actually_ changes.
Expand Down
44 changes: 27 additions & 17 deletions src/sources/datadog_agent/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{num::NonZeroU32, sync::Arc};
use std::{num::NonZeroU32, ops::Deref as _, sync::Arc};

use bytes::Bytes;
use chrono::{TimeZone, Utc};
use http::StatusCode;
use prost::Message;
use protobuf::{Chars, Message as _};
use serde::{Deserialize, Serialize};
use warp::{filters::BoxedFilter, path, path::FullPath, reply::Response, Filter};

Expand All @@ -25,8 +25,9 @@ use crate::{
schema,
sources::{
datadog_agent::{
ddmetric_proto::{metric_payload, Metadata, MetricPayload, SketchPayload},
handle_request, ApiKeyQueryParams, DatadogAgentSource,
handle_request,
proto::metrics::{metric_payload, Metadata, MetricPayload, SketchPayload},
ApiKeyQueryParams, DatadogAgentSource,
},
util::{extract_tag_key_and_value, ErrorMessage},
},
Expand Down Expand Up @@ -256,7 +257,7 @@ pub(crate) fn decode_ddseries_v2(
frame: Bytes,
api_key: &Option<Arc<str>>,
) -> crate::Result<Vec<Event>> {
let payload = MetricPayload::decode(frame)?;
let payload = MetricPayload::parse_from_tokio_bytes(&frame)?;
let decoded_metrics: Vec<Event> = payload
.series
.into_iter()
Expand Down Expand Up @@ -296,21 +297,21 @@ pub(crate) fn decode_ddseries_v2(
serie.resources.into_iter().for_each(|r| {
// As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L180-L189
// the hostname can be found in MetricSeries::resources and that is the only value stored there.
if r.r#type.eq("host") {
if r.type_().eq("host") {
log_schema()
.host_key()
.and_then(|key| tags.replace(key.to_string(), r.name));
} else {
// But to avoid losing information if this situation changes, any other resource type/name will be saved in the tags map
tags.replace(format!("resource.{}", r.r#type), r.name);
tags.replace(format!("resource.{}", r.type_()), r.name);
}
});
(!serie.source_type_name.is_empty())
.then(|| tags.replace("source_type_name", serie.source_type_name));
// As per https://github.com/DataDog/datadog-agent/blob/a62ac9fb13e1e5060b89e731b8355b2b20a07c5b/pkg/serializer/internal/metrics/iterable_series.go#L224
// serie.unit is omitted
match metric_payload::MetricType::try_from(serie.r#type) {
Ok(metric_payload::MetricType::Count) => serie
match serie.type_.enum_value() {
Ok(metric_payload::MetricType::COUNT) => serie
.points
.iter()
.map(|dd_point| {
Expand All @@ -331,7 +332,7 @@ pub(crate) fn decode_ddseries_v2(
.with_namespace(namespace)
})
.collect::<Vec<_>>(),
Ok(metric_payload::MetricType::Gauge) => serie
Ok(metric_payload::MetricType::GAUGE) => serie
.points
.iter()
.map(|dd_point| {
Expand All @@ -353,7 +354,7 @@ pub(crate) fn decode_ddseries_v2(
.with_interval_ms(non_rate_interval)
})
.collect::<Vec<_>>(),
Ok(metric_payload::MetricType::Rate) => serie
Ok(metric_payload::MetricType::RATE) => serie
.points
.iter()
.map(|dd_point| {
Expand All @@ -380,8 +381,8 @@ pub(crate) fn decode_ddseries_v2(
.with_namespace(namespace)
})
.collect::<Vec<_>>(),
Ok(metric_payload::MetricType::Unspecified) | Err(_) => {
warn!("Unspecified metric type ({}).", serie.r#type);
Ok(metric_payload::MetricType::UNSPECIFIED) | Err(_) => {
warn!("Unspecified metric type ({}).", serie.type_.value());
Vec::new()
}
}
Expand Down Expand Up @@ -433,16 +434,25 @@ fn decode_datadog_series_v1(
Ok(decoded_metrics)
}

fn into_metric_tags(tags: Vec<String>) -> MetricTags {
tags.iter().map(extract_tag_key_and_value).collect()
fn into_metric_tags(tags: Vec<Chars>) -> MetricTags {
// TODO: inefficient conversion (in `extract_tag_key_and_value`) to `String` when `Bytes` would
// suffice. address once we have solution for getting original `Bytes` from `Chars`~`
tags.iter()
.map(|tag| extract_tag_key_and_value(tag.deref()))
.collect()
}

fn into_vector_metric(
dd_metric: DatadogSeriesMetric,
api_key: Option<Arc<str>>,
schema_definition: &Arc<schema::Definition>,
) -> Vec<Event> {
let mut tags = into_metric_tags(dd_metric.tags.unwrap_or_default());
let mut tags: MetricTags = dd_metric
.tags
.unwrap_or_default()
.into_iter()
.map(extract_tag_key_and_value)
.collect();

if let Some(key) = log_schema().host_key() {
dd_metric
Expand Down Expand Up @@ -549,7 +559,7 @@ pub(crate) fn decode_ddsketch(
frame: Bytes,
api_key: &Option<Arc<str>>,
) -> crate::Result<Vec<Event>> {
let payload = SketchPayload::decode(frame)?;
let payload = SketchPayload::parse_from_tokio_bytes(&frame)?;
// payload.metadata is always empty for payload coming from dd agents
Ok(payload
.sketches
Expand Down
11 changes: 1 addition & 10 deletions src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,9 @@ mod tests;

pub mod logs;
pub mod metrics;
pub(crate) mod proto;
pub mod traces;

#[allow(warnings, clippy::pedantic, clippy::nursery)]
pub(crate) mod ddmetric_proto {
include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
}

#[allow(warnings)]
pub(crate) mod ddtrace_proto {
include!(concat!(env!("OUT_DIR"), "/dd_trace.rs"));
}

use std::convert::Infallible;
use std::time::Duration;
use std::{fmt::Debug, io::Read, net::SocketAddr, sync::Arc};
Expand Down
12 changes: 12 additions & 0 deletions src/sources/datadog_agent/proto.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
mod include {
include!(concat!(env!("OUT_DIR"), "/dd-agent-protos/mod.rs"));
}

pub mod metrics {
pub use super::include::dd_metric::*;
pub use super::include::ddsketch_full::*;
}

pub mod traces {
pub use super::include::dd_trace::*;
}
Loading

0 comments on commit 41bdefd

Please sign in to comment.