Skip to content

feat: [Geneva Exporter] - Add Serializer #261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 68 commits into from
Jun 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
ff88eb7
initial code
lalitb May 14, 2025
803e38d
add ci
lalitb May 14, 2025
53cf8d6
fix CI
lalitb May 14, 2025
020b881
move script
lalitb May 14, 2025
9165297
fix ci
lalitb May 14, 2025
bd9c5a0
install boost
lalitb May 14, 2025
15b0a50
install haskell
lalitb May 14, 2025
8eeb563
fix ci
lalitb May 14, 2025
c77a33f
fix
lalitb May 14, 2025
9261c15
fix ci
lalitb May 14, 2025
93089d7
fix
lalitb May 14, 2025
17de005
revert ci workflow, exclude geneva
lalitb May 15, 2025
82cb160
fix
lalitb May 15, 2025
66804da
fix
lalitb May 15, 2025
6043450
Add CI
lalitb May 19, 2025
2bb2fae
fix CI
lalitb May 19, 2025
d8713c4
Merge branch 'main' into geneva-encode
lalitb May 19, 2025
b86bf47
fix CI
lalitb May 19, 2025
69b480a
fix ci
lalitb May 19, 2025
15f4e2b
fix ci
lalitb May 19, 2025
186cc5b
fix ci
lalitb May 19, 2025
adfc628
fix ci
lalitb May 19, 2025
338f82a
Fix CI
lalitb May 19, 2025
540951b
add CO
lalitb May 19, 2025
821292d
fix
lalitb May 19, 2025
f8913be
fix cargo
lalitb May 19, 2025
fe002bb
initial working version
lalitb May 19, 2025
b480bc8
add otlp mapper
lalitb May 19, 2025
94a063c
add exporter - for e2e
lalitb May 20, 2025
6541c28
Merge branch 'main' into geneva-encode
lalitb May 20, 2025
f77fb1b
Merge branch 'main' into geneva-encode
lalitb May 21, 2025
1bc9f7c
fix CI
lalitb May 21, 2025
5972673
fix
lalitb May 22, 2025
1696449
fix..
lalitb May 22, 2025
43bf4fc
fix
lalitb May 22, 2025
b1ea9fd
fix
lalitb May 22, 2025
be62be8
add direct otlp encoding
lalitb Jun 3, 2025
828c2d9
fix
lalitb Jun 3, 2025
11d7b9a
add TODOs
lalitb Jun 3, 2025
549eab6
Merge branch 'main' into geneva-encode
lalitb Jun 3, 2025
6b25a8f
Update Cargo.toml
lalitb Jun 3, 2025
6303b6d
Update otlp_encoder.rs
lalitb Jun 3, 2025
e09eeae
Merge branch 'main' into geneva-encode
lalitb Jun 9, 2025
46f3eb1
USING PURE RUST based BOND
lalitb Jun 10, 2025
4c6a99b
add batching based on SCHEMA and EVENT_NAME
lalitb Jun 11, 2025
c8d510b
Merge branch 'main' into geneva-encode
lalitb Jun 11, 2025
cf9c8d5
Update ci.yml
lalitb Jun 11, 2025
46e7e16
Update Cargo.toml
lalitb Jun 11, 2025
595d3f2
Update Cargo.toml
lalitb Jun 12, 2025
c1be071
Update Cargo.toml
lalitb Jun 12, 2025
3b0e4a6
Update client.rs
lalitb Jun 12, 2025
4e1d1bf
Update uploader.rs
lalitb Jun 12, 2025
9415e66
fix workspace
lalitb Jun 12, 2025
c7d1f5a
fix lint
lalitb Jun 12, 2025
db9b6a9
Update client.rs
lalitb Jun 12, 2025
fe9942d
Add tests for encoder
lalitb Jun 13, 2025
425c18b
Merge branch 'geneva-encode' of github.com:lalitb/opentelemetry-rust-…
lalitb Jun 13, 2025
51168bd
reorg
lalitb Jun 13, 2025
2668490
fmt, println r4emove
lalitb Jun 13, 2025
e5ab562
clippy warning for approximate conts
lalitb Jun 13, 2025
8828f2a
review comment - first commit
lalitb Jun 20, 2025
5bba0fe
review comments - second commit
lalitb Jun 20, 2025
772a905
Merge branch 'main' into geneva-encode
lalitb Jun 20, 2025
d22e802
Update opentelemetry-exporter-geneva/geneva-uploader/src/payload_enco…
lalitb Jun 20, 2025
cdbf6e4
Update opentelemetry-exporter-geneva/geneva-uploader/src/payload_enco…
lalitb Jun 21, 2025
c38f233
review comments - third commit
lalitb Jun 21, 2025
4b664b0
Merge branch 'geneva-encode' of github.com:lalitb/opentelemetry-rust-…
lalitb Jun 21, 2025
e9ab455
Fix test with correct length of traceid
lalitb Jun 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ rust-version = "1.75.0"

[dependencies]


[lints]
workspace = true
workspace = true
5 changes: 4 additions & 1 deletion opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ native-tls = "0.2"
thiserror = "2.0"
chrono = "0.4"
url = "2.2"
md5 = "0.7.0"
hex = "0.4"
lz4_flex = { version = ">=0.11.0, <0.11.4", features = ["safe-encode"], default-features = false }

[features]
self_signed_certs = [] # Empty by default for security
mock_auth = [] # Disabled by default. Not to be enabled in the prod release.
default = ["self_signed_certs"] # TODO - remove this feature before release

[dev-dependencies]
Expand All @@ -34,4 +37,4 @@ num_cpus = "1.16"
lz4_flex = { version = "0.11" }

[lints]
workspace = true
workspace = true
106 changes: 106 additions & 0 deletions opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
//! High-level GenevaClient for user code. Wraps config_service and ingestion_service.

use crate::config_service::client::{AuthMethod, GenevaConfigClient, GenevaConfigClientConfig};
use crate::ingestion_service::uploader::{GenevaUploader, GenevaUploaderConfig};
use crate::payload_encoder::lz4_chunked_compression::lz4_chunked_compression;
use crate::payload_encoder::otlp_encoder::OtlpEncoder;
use opentelemetry_proto::tonic::logs::v1::ResourceLogs;
use std::sync::Arc;

/// Configuration for GenevaClient (user-facing)
#[derive(Clone, Debug)]
pub struct GenevaClientConfig {
pub endpoint: String,
pub environment: String,
pub account: String,
pub namespace: String,
pub region: String,
pub config_major_version: u32,
pub auth_method: AuthMethod,
pub tenant: String,
pub role_name: String,
pub role_instance: String,
// Add event name/version here if constant, or per-upload if you want them per call.
}

/// Main user-facing client for Geneva ingestion.
#[derive(Clone)]
pub struct GenevaClient {
uploader: Arc<GenevaUploader>,
encoder: OtlpEncoder,
metadata: String,
}

impl GenevaClient {
/// Construct a new client with minimal configuration. Fetches and caches ingestion info as needed.
pub async fn new(cfg: GenevaClientConfig) -> Result<Self, String> {
// Build config client config
let config_client_config = GenevaConfigClientConfig {
endpoint: cfg.endpoint,
environment: cfg.environment.clone(),
account: cfg.account,
namespace: cfg.namespace.clone(),
region: cfg.region,
config_major_version: cfg.config_major_version,
auth_method: cfg.auth_method,
};
let config_client = Arc::new(
GenevaConfigClient::new(config_client_config)
.map_err(|e| format!("GenevaConfigClient init failed: {e}"))?,

Check warning on line 49 in opentelemetry-exporter-geneva/geneva-uploader/src/client.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/client.rs#L36-L49

Added lines #L36 - L49 were not covered by tests
);

let source_identity = format!(
"Tenant={}/Role={}/RoleInstance={}",
cfg.tenant, cfg.role_name, cfg.role_instance
);

let schema_ids =
"c1ce0ecea020359624c493bbe97f9e80;0da22cabbee419e000541a5eda732eb3".to_string(); // TODO - find the actual value to be populated

// Uploader config
let uploader_config = GenevaUploaderConfig {
namespace: cfg.namespace.clone(),
source_identity,
environment: cfg.environment,
schema_ids,
};

Check warning on line 66 in opentelemetry-exporter-geneva/geneva-uploader/src/client.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/client.rs#L52-L66

Added lines #L52 - L66 were not covered by tests

let uploader = GenevaUploader::from_config_client(config_client, uploader_config)
.await
.map_err(|e| format!("GenevaUploader init failed: {e}"))?;
let metadata = format!(
"namespace={}/eventVersion={}/tenant={}/role={}/roleinstance={}",
cfg.namespace,
"Ver1v0", // You can replace this with a cfg field if version should be dynamic
cfg.tenant,
cfg.role_name,
cfg.role_instance,
);
Ok(Self {
uploader: Arc::new(uploader),
encoder: OtlpEncoder::new(),
metadata,
})
}

Check warning on line 84 in opentelemetry-exporter-geneva/geneva-uploader/src/client.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/client.rs#L68-L84

Added lines #L68 - L84 were not covered by tests

/// Upload OTLP logs (as ResourceLogs).
pub async fn upload_logs(&self, logs: &[ResourceLogs]) -> Result<(), String> {
let log_iter = logs
.iter()
.flat_map(|resource_log| resource_log.scope_logs.iter())
.flat_map(|scope_log| scope_log.log_records.iter());
let blobs = self.encoder.encode_log_batch(log_iter, &self.metadata);
for (_schema_id, event_name, encoded_blob, _row_count) in blobs {

Check warning on line 93 in opentelemetry-exporter-geneva/geneva-uploader/src/client.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/client.rs#L87-L93

Added lines #L87 - L93 were not covered by tests
// TODO - log encoded_blob for debugging
let compressed_blob = lz4_chunked_compression(&encoded_blob)
.map_err(|e| format!("LZ4 compression failed: {e}"))?;

Check warning on line 96 in opentelemetry-exporter-geneva/geneva-uploader/src/client.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/client.rs#L95-L96

Added lines #L95 - L96 were not covered by tests
// TODO - log compressed_blob for debugging
let event_version = "Ver2v0"; // TODO - find the actual value to be populated
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see "Ver1v0" being used in GenevaClient::new() method. Should these two have been the same string? If yes, maybe use a const variable at both places for the time being.

self.uploader
.upload(compressed_blob, &event_name, event_version)
.await
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would cause sequential exports of the blobs. We could do better than that by issuing the required upload calls and await them all together instead of doing it one by one.

.map_err(|e| format!("Geneva upload failed: {e}"))?;

Check warning on line 102 in opentelemetry-exporter-geneva/geneva-uploader/src/client.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/client.rs#L98-L102

Added lines #L98 - L102 were not covered by tests
}
Ok(())
}

Check warning on line 105 in opentelemetry-exporter-geneva/geneva-uploader/src/client.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/client.rs#L104-L105

Added lines #L104 - L105 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
/// ```
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub(crate) enum AuthMethod {
pub enum AuthMethod {
/// Certificate-based authentication
///
/// # Arguments
Expand All @@ -57,6 +57,8 @@
///
/// Note(TODO): This is not yet implemented.
ManagedIdentity,
#[cfg(feature = "mock_auth")]
MockAuth, // No authentication, used for testing purposes
}

#[derive(Debug, Error)]
Expand Down Expand Up @@ -249,6 +251,12 @@
"Managed Identity authentication is not implemented yet".into(),
));
}
#[cfg(feature = "mock_auth")]
AuthMethod::MockAuth => {
// Mock authentication for testing purposes, no actual auth needed
// Just use the default client builder
eprintln!("WARNING: Using MockAuth for GenevaConfigClient. This should only be used in tests!");
}

Check warning on line 259 in opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs#L255-L259

Added lines #L255 - L259 were not covered by tests
}

let agent_identity = "GenevaUploader";
Expand Down Expand Up @@ -432,7 +440,7 @@
.send()
.await
.map_err(GenevaConfigClientError::Http)?;

// Check if the response is successful
let status = response.status();
let body = response.text().await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ impl GenevaUploader {
// TODO - Maintain this as url-encoded in config service to avoid conversion here
let encoded_monitoring_endpoint: String =
byte_serialize(monitoring_endpoint.as_bytes()).collect();

let encoded_source_identity: String =
byte_serialize(self.config.source_identity.as_bytes()).collect();

Expand Down Expand Up @@ -195,7 +194,6 @@ impl GenevaUploader {
auth_info.endpoint.trim_end_matches('/'),
upload_uri
);

// Send the upload request
let response = self
.http_client
Expand All @@ -208,13 +206,13 @@ impl GenevaUploader {
.send()
.await
.map_err(GenevaUploaderError::Http)?;

let status = response.status();
let body = response.text().await.map_err(GenevaUploaderError::Http)?;

if status == reqwest::StatusCode::ACCEPTED {
let ingest_response: IngestionResponse =
serde_json::from_str(&body).map_err(GenevaUploaderError::SerdeJson)?;

Ok(ingest_response)
} else {
Err(GenevaUploaderError::UploadFailed {
Expand Down
10 changes: 5 additions & 5 deletions opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
mod config_service;
pub mod ingestion_service;
mod ingestion_service;
mod payload_encoder;

mod uploader;
pub mod client;

#[allow(unused_imports)]
pub(crate) use config_service::client::{
AuthMethod, GenevaConfigClient, GenevaConfigClientConfig, GenevaConfigClientError,
IngestionGatewayInfo,
GenevaConfigClient, GenevaConfigClientConfig, GenevaConfigClientError, IngestionGatewayInfo,
};

#[allow(unused_imports)]
pub(crate) use ingestion_service::uploader::{
GenevaUploader, GenevaUploaderConfig, GenevaUploaderError, IngestionResponse, Result,
};

pub use uploader::{create_uploader, GenevaUploader as Uploader};
pub use client::{GenevaClient, GenevaClientConfig};
pub use config_service::client::AuthMethod;
Loading