diff --git a/opentelemetry-exporter-geneva/geneva-uploader-ffi/Cargo.toml b/opentelemetry-exporter-geneva/geneva-uploader-ffi/Cargo.toml index 60b957377..164f7ac11 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader-ffi/Cargo.toml +++ b/opentelemetry-exporter-geneva/geneva-uploader-ffi/Cargo.toml @@ -7,6 +7,5 @@ rust-version = "1.75.0" [dependencies] - [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml index 3866c56dc..d45cef9b9 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml +++ b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml @@ -17,10 +17,13 @@ native-tls = "0.2" thiserror = "2.0" chrono = "0.4" url = "2.2" +md5 = "0.7.0" +hex = "0.4" lz4_flex = { version = ">=0.11.0, <0.11.4", features = ["safe-encode"], default-features = false } [features] self_signed_certs = [] # Empty by default for security +mock_auth = [] # Disabled by default. Not to be enabled in the prod release. default = ["self_signed_certs"] # TODO - remove this feature before release [dev-dependencies] @@ -34,4 +37,4 @@ num_cpus = "1.16" lz4_flex = { version = "0.11" } [lints] -workspace = true \ No newline at end of file +workspace = true diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs new file mode 100644 index 000000000..fddeba987 --- /dev/null +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/client.rs @@ -0,0 +1,106 @@ +//! High-level GenevaClient for user code. Wraps config_service and ingestion_service. + +use crate::config_service::client::{AuthMethod, GenevaConfigClient, GenevaConfigClientConfig}; +use crate::ingestion_service::uploader::{GenevaUploader, GenevaUploaderConfig}; +use crate::payload_encoder::lz4_chunked_compression::lz4_chunked_compression; +use crate::payload_encoder::otlp_encoder::OtlpEncoder; +use opentelemetry_proto::tonic::logs::v1::ResourceLogs; +use std::sync::Arc; + +/// Configuration for GenevaClient (user-facing) +#[derive(Clone, Debug)] +pub struct GenevaClientConfig { + pub endpoint: String, + pub environment: String, + pub account: String, + pub namespace: String, + pub region: String, + pub config_major_version: u32, + pub auth_method: AuthMethod, + pub tenant: String, + pub role_name: String, + pub role_instance: String, + // Add event name/version here if constant, or per-upload if you want them per call. +} + +/// Main user-facing client for Geneva ingestion. +#[derive(Clone)] +pub struct GenevaClient { + uploader: Arc, + encoder: OtlpEncoder, + metadata: String, +} + +impl GenevaClient { + /// Construct a new client with minimal configuration. Fetches and caches ingestion info as needed. 
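For reference, a minimal end-to-end usage sketch of this client API (the endpoint, account, and certificate values below are placeholders, not real configuration):

    use geneva_uploader::{AuthMethod, GenevaClient, GenevaClientConfig};
    use opentelemetry_proto::tonic::logs::v1::ResourceLogs;
    use std::path::PathBuf;

    async fn upload(logs: &[ResourceLogs]) -> Result<(), String> {
        let cfg = GenevaClientConfig {
            endpoint: "https://gcs.example.invalid".into(), // placeholder endpoint
            environment: "Test".into(),
            account: "myaccount".into(),
            namespace: "myns".into(),
            region: "eastus".into(),
            config_major_version: 2,
            auth_method: AuthMethod::Certificate {
                path: PathBuf::from("/tmp/client.p12"), // placeholder certificate
                password: "password".into(),
            },
            tenant: "mytenant".into(),
            role_name: "myrole".into(),
            role_instance: "myinstance".into(),
        };
        // Fetches ingestion info once, then reuses it for uploads.
        let client = GenevaClient::new(cfg).await?;
        client.upload_logs(logs).await
    }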
+    pub async fn new(cfg: GenevaClientConfig) -> Result<Self, String> {
+        // Build config client config
+        let config_client_config = GenevaConfigClientConfig {
+            endpoint: cfg.endpoint,
+            environment: cfg.environment.clone(),
+            account: cfg.account,
+            namespace: cfg.namespace.clone(),
+            region: cfg.region,
+            config_major_version: cfg.config_major_version,
+            auth_method: cfg.auth_method,
+        };
+        let config_client = Arc::new(
+            GenevaConfigClient::new(config_client_config)
+                .map_err(|e| format!("GenevaConfigClient init failed: {e}"))?,
+        );
+
+        let source_identity = format!(
+            "Tenant={}/Role={}/RoleInstance={}",
+            cfg.tenant, cfg.role_name, cfg.role_instance
+        );
+
+        let schema_ids =
+            "c1ce0ecea020359624c493bbe97f9e80;0da22cabbee419e000541a5eda732eb3".to_string(); // TODO - find the actual value to be populated
+
+        // Uploader config
+        let uploader_config = GenevaUploaderConfig {
+            namespace: cfg.namespace.clone(),
+            source_identity,
+            environment: cfg.environment,
+            schema_ids,
+        };
+
+        let uploader = GenevaUploader::from_config_client(config_client, uploader_config)
+            .await
+            .map_err(|e| format!("GenevaUploader init failed: {e}"))?;
+        let metadata = format!(
+            "namespace={}/eventVersion={}/tenant={}/role={}/roleinstance={}",
+            cfg.namespace,
+            "Ver1v0", // You can replace this with a cfg field if version should be dynamic
+            cfg.tenant,
+            cfg.role_name,
+            cfg.role_instance,
+        );
+        Ok(Self {
+            uploader: Arc::new(uploader),
+            encoder: OtlpEncoder::new(),
+            metadata,
+        })
+    }
+
+    /// Upload OTLP logs (as ResourceLogs).
+    pub async fn upload_logs(&self, logs: &[ResourceLogs]) -> Result<(), String> {
+        let log_iter = logs
+            .iter()
+            .flat_map(|resource_log| resource_log.scope_logs.iter())
+            .flat_map(|scope_log| scope_log.log_records.iter());
+        let blobs = self.encoder.encode_log_batch(log_iter, &self.metadata);
+        for (_schema_id, event_name, encoded_blob, _row_count) in blobs {
+            // TODO - log encoded_blob for debugging
+            let compressed_blob = lz4_chunked_compression(&encoded_blob)
+                .map_err(|e| format!("LZ4 compression failed: {e}"))?;
+            // TODO - log compressed_blob for debugging
+            let event_version = "Ver2v0"; // TODO - find the actual value to be populated
+            self.uploader
+                .upload(compressed_blob, &event_name, event_version)
+                .await
+                .map_err(|e| format!("Geneva upload failed: {e}"))?;
+        }
+        Ok(())
+    }
+}
diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs
index 1d9dd147a..c1c7e04aa 100644
--- a/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs
+++ b/opentelemetry-exporter-geneva/geneva-uploader/src/config_service/client.rs
@@ -46,7 +46,7 @@ use std::sync::RwLock;
 /// ```
 #[allow(dead_code)]
 #[derive(Clone, Debug)]
-pub(crate) enum AuthMethod {
+pub enum AuthMethod {
     /// Certificate-based authentication
     ///
     /// # Arguments
@@ -57,6 +57,8 @@
     ///
     /// Note(TODO): This is not yet implemented.
     ManagedIdentity,
+    #[cfg(feature = "mock_auth")]
+    MockAuth, // No authentication, used for testing purposes
 }
 
 #[derive(Debug, Error)]
@@ -249,6 +251,12 @@ impl GenevaConfigClient {
                     "Managed Identity authentication is not implemented yet".into(),
                 ));
             }
+            #[cfg(feature = "mock_auth")]
+            AuthMethod::MockAuth => {
+                // Mock authentication for testing purposes, no actual auth needed
+                // Just use the default client builder
+                eprintln!("WARNING: Using MockAuth for GenevaConfigClient. 
This should only be used in tests!"); + } } let agent_identity = "GenevaUploader"; @@ -432,7 +440,7 @@ impl GenevaConfigClient { .send() .await .map_err(GenevaConfigClientError::Http)?; - + // Check if the response is successful let status = response.status(); let body = response.text().await?; diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs index 83d9b6f77..1f5e3b157 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/ingestion_service/uploader.rs @@ -139,7 +139,6 @@ impl GenevaUploader { // TODO - Maintain this as url-encoded in config service to avoid conversion here let encoded_monitoring_endpoint: String = byte_serialize(monitoring_endpoint.as_bytes()).collect(); - let encoded_source_identity: String = byte_serialize(self.config.source_identity.as_bytes()).collect(); @@ -195,7 +194,6 @@ impl GenevaUploader { auth_info.endpoint.trim_end_matches('/'), upload_uri ); - // Send the upload request let response = self .http_client @@ -208,13 +206,13 @@ impl GenevaUploader { .send() .await .map_err(GenevaUploaderError::Http)?; - let status = response.status(); let body = response.text().await.map_err(GenevaUploaderError::Http)?; if status == reqwest::StatusCode::ACCEPTED { let ingest_response: IngestionResponse = serde_json::from_str(&body).map_err(GenevaUploaderError::SerdeJson)?; + Ok(ingest_response) } else { Err(GenevaUploaderError::UploadFailed { diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs index 8a853c05d..091d8daf1 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs @@ -1,13 +1,12 @@ mod config_service; -pub mod ingestion_service; +mod ingestion_service; mod payload_encoder; -mod uploader; +pub mod client; #[allow(unused_imports)] pub(crate) use config_service::client::{ - AuthMethod, GenevaConfigClient, GenevaConfigClientConfig, GenevaConfigClientError, - IngestionGatewayInfo, + GenevaConfigClient, GenevaConfigClientConfig, GenevaConfigClientError, IngestionGatewayInfo, }; #[allow(unused_imports)] @@ -15,4 +14,5 @@ pub(crate) use ingestion_service::uploader::{ GenevaUploader, GenevaUploaderConfig, GenevaUploaderError, IngestionResponse, Result, }; -pub use uploader::{create_uploader, GenevaUploader as Uploader}; +pub use client::{GenevaClient, GenevaClientConfig}; +pub use config_service::client::AuthMethod; diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/bond_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/bond_encoder.rs new file mode 100644 index 000000000..283238284 --- /dev/null +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/bond_encoder.rs @@ -0,0 +1,356 @@ +// bond_encoder.rs - Pure Rust Bond encoder for dynamic OTLP schemas + +use std::borrow::Cow; +use std::io::{Result, Write}; + +/// Bond data types +#[derive(Clone, Copy, Debug)] +#[repr(u8)] +#[allow(non_camel_case_types)] // Allow C-style naming for clarity with Bond protocol +#[allow(dead_code)] // These represent all possible Bond types, not all are used yet +pub(crate) enum BondDataType { + BT_STOP = 0, + BT_STOP_BASE = 1, + BT_BOOL = 2, + BT_UINT8 = 3, + BT_UINT16 = 4, + BT_UINT32 = 5, + BT_UINT64 = 6, + BT_FLOAT = 7, + BT_DOUBLE = 8, + BT_STRING = 9, 
+    BT_STRUCT = 10,
+    BT_LIST = 11,
+    BT_SET = 12,
+    BT_MAP = 13,
+    BT_INT8 = 14,
+    BT_INT16 = 15,
+    BT_INT32 = 16,
+    BT_INT64 = 17,
+    BT_WSTRING = 18,
+    BT_UNAVAILABLE = 127,
+}
+
+/// Bond protocol writer for encoding values to buffers
+pub(crate) struct BondWriter;
+
+impl BondWriter {
+    /// Write a string value to buffer (Bond BT_STRING format)
+    pub fn write_string(buffer: &mut Vec<u8>, s: &str) {
+        let bytes = s.as_bytes();
+        //TODO - check if the length is less than 2^32-1
+        buffer.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
+        buffer.extend_from_slice(bytes);
+    }
+
+    /// Write an int32 value to buffer (Bond BT_INT32 format)
+    pub fn write_int32(buffer: &mut Vec<u8>, value: i32) {
+        buffer.extend_from_slice(&value.to_le_bytes());
+    }
+
+    /// Write a float value to buffer (Bond BT_FLOAT format)
+    pub fn write_float(buffer: &mut Vec<u8>, value: f32) {
+        buffer.extend_from_slice(&value.to_le_bytes());
+    }
+
+    /// Write a double value to buffer (Bond BT_DOUBLE format)
+    pub fn write_double(buffer: &mut Vec<u8>, value: f64) {
+        buffer.extend_from_slice(&value.to_le_bytes());
+    }
+
+    /// Write a WSTRING value to buffer (Bond BT_WSTRING format)
+    /// Character count prefix + UTF-16LE bytes
+    pub fn write_wstring(buffer: &mut Vec<u8>, s: &str) {
+        let utf16_bytes: Vec<u8> = s.encode_utf16().flat_map(|c| c.to_le_bytes()).collect();
+
+        // Character count (not byte count)
+        // TODO - check if the length is less than 2^32-1
+        // TODO - check if length is number of bytes, or number of UTF-16 code units
+        buffer.extend_from_slice(&(s.len() as u32).to_le_bytes());
+        buffer.extend_from_slice(&utf16_bytes);
+    }
+
+    /// Write a boolean as int32 (Bond doesn't have native bool in Simple Binary)
+    pub fn write_bool_as_int32(buffer: &mut Vec<u8>, value: bool) {
+        Self::write_int32(buffer, if value { 1 } else { 0 });
+    }
+}
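A quick illustration of the little-endian, length-prefixed layout these helpers produce (a sketch written against the writer above, assuming the crate-internal scope of this module):

    #[cfg(test)]
    mod bond_writer_layout_sketch {
        use super::BondWriter;

        #[test]
        fn little_endian_length_prefixed_layout() {
            // BT_STRING: 4-byte little-endian byte count, then the raw UTF-8 bytes.
            let mut buf: Vec<u8> = Vec::new();
            BondWriter::write_string(&mut buf, "hi");
            assert_eq!(buf, vec![2, 0, 0, 0, b'h', b'i']);

            // BT_INT32: 4-byte little-endian two's complement.
            buf.clear();
            BondWriter::write_int32(&mut buf, -1);
            assert_eq!(buf, vec![0xFF, 0xFF, 0xFF, 0xFF]);
        }
    }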
+/// Field definition for dynamic schemas
+#[derive(Clone, Debug)]
+pub(crate) struct FieldDef {
+    pub name: Cow<'static, str>,
+    pub field_id: u16,
+    pub type_id: u8,
+}
+
+/// Schema definition that can be built dynamically
+#[derive(Clone)]
+pub(crate) struct DynamicSchema {
+    pub struct_name: String,
+    pub qualified_name: String,
+    pub fields: Vec<FieldDef>,
+}
+
+impl DynamicSchema {
+    pub(crate) fn new(name: &str, namespace: &str, fields: Vec<FieldDef>) -> Self {
+        Self {
+            struct_name: name.to_string(),
+            qualified_name: format!("{}.{}", namespace, name),
+            fields,
+        }
+    }
+
+    /// Encode the schema to Bond format
+    pub(crate) fn encode(&self) -> Result<Vec<u8>> {
+        let mut schema_bytes = Vec::new();
+
+        // Write header
+        schema_bytes.write_all(&[0x53, 0x50])?; // 'S','P'
+        schema_bytes.write_all(&[0x01, 0x00])?; // Version 1
+        schema_bytes.write_all(&1u32.to_le_bytes())?; // num structs
+
+        // Write struct definition
+        write_bond_string(&mut schema_bytes, &self.struct_name)?;
+        write_bond_string(&mut schema_bytes, &self.qualified_name)?;
+        schema_bytes.write_all(&0u32.to_le_bytes())?; // attributes
+
+        // Modifier - 0 for Optional
+        schema_bytes.write_all(&[0u8])?;
+
+        // Default values
+        schema_bytes.write_all(&0u64.to_le_bytes())?; // default_uint
+        schema_bytes.write_all(&0i64.to_le_bytes())?; // default_int
+        schema_bytes.write_all(&0f64.to_le_bytes())?; // default_double
+        schema_bytes.write_all(&0u32.to_le_bytes())?; // default_string
+        schema_bytes.write_all(&0u32.to_le_bytes())?; // default_wstring
+        schema_bytes.write_all(&[0u8])?; // default_nothing
+
+        // Base def
+        schema_bytes.write_all(&0u32.to_le_bytes())?;
+
+        // 3 bytes of zeros before num_fields
+        schema_bytes.write_all(&[0u8; 3])?;
+
+        // Number of fields
+        schema_bytes.write_all(&(self.fields.len() as u32).to_le_bytes())?;
+
+        // Write field definitions
+        for (i, field) in self.fields.iter().enumerate() {
+            let is_last = i == self.fields.len() - 1;
+            write_field_def(&mut schema_bytes, field, is_last)?;
+        }
+
+        // Padding to align to 8 bytes
+        schema_bytes.write_all(&[0u8; 8])?;
+
+        // Root type typedef
+        schema_bytes.write_all(&[BondDataType::BT_STRUCT as u8])?;
+        schema_bytes.write_all(&0u16.to_le_bytes())?; // struct index 0
+        schema_bytes.write_all(&[0u8])?; // element
+        schema_bytes.write_all(&[0u8])?; // key
+        schema_bytes.write_all(&[0u8])?; // bonded = false
+
+        // Final padding
+        schema_bytes.write_all(&[0u8; 9])?;
+
+        Ok(schema_bytes)
+    }
+}
+
+fn write_bond_string<W: Write>(writer: &mut W, s: &str) -> Result<()> {
+    let bytes = s.as_bytes();
+    writer.write_all(&(bytes.len() as u32).to_le_bytes())?;
+    writer.write_all(bytes)?;
+    Ok(())
+}
+
+fn write_field_def<W: Write>(writer: &mut W, field: &FieldDef, is_last: bool) -> Result<()> {
+    // Field name
+    write_bond_string(writer, field.name.as_ref())?;
+
+    // Empty qualified name
+    write_bond_string(writer, "")?;
+
+    // Attributes
+    writer.write_all(&0u32.to_le_bytes())?;
+
+    // Modifier
+    writer.write_all(&[0u8])?;
+
+    // Default values (all zeros for primitives)
+    writer.write_all(&0u64.to_le_bytes())?; // default_uint
+    writer.write_all(&0i64.to_le_bytes())?; // default_int
+    writer.write_all(&0f64.to_le_bytes())?; // default_double
+    writer.write_all(&0u32.to_le_bytes())?; // default_string
+    writer.write_all(&0u32.to_le_bytes())?; // default_wstring
+    writer.write_all(&[0u8])?; // default_nothing
+
+    // Add 3 bytes of padding before field ID
+    writer.write_all(&[0u8; 3])?;
+
+    // Field ID
+    writer.write_all(&field.field_id.to_le_bytes())?;
+
+    // Type
+    writer.write_all(&[field.type_id])?;
+
+    // Additional type info (all zeros for primitives)
+    writer.write_all(&0u16.to_le_bytes())?; // struct_def
+    writer.write_all(&[0u8])?; // element
+    writer.write_all(&[0u8])?; // key
+    writer.write_all(&[0u8])?; // bonded_type
+    writer.write_all(&[0u8])?; // default_value_present
+
+    // Add 8 bytes padding after each field except the last one
+    if !is_last {
+        writer.write_all(&[0u8; 8])?;
+    }
+
+    Ok(())
+}
+
+/// Encode a payload with dynamic fields
+#[allow(dead_code)] // May be used in future
+pub(crate) fn encode_dynamic_payload<W: Write>(
+    writer: &mut W,
+    fields: &[FieldDef],
+    values: &[(&str, Vec<u8>)], // field_name -> encoded value
+) -> Result<()> {
+    // Write Simple Binary header
+    writer.write_all(&[0x53, 0x50])?; // 'S','P'
+    writer.write_all(&[0x01, 0x00])?; // Version 1
+
+    // Create a map for quick lookup
+    let value_map: std::collections::HashMap<&str, &[u8]> =
+        values.iter().map(|(k, v)| (*k, v.as_slice())).collect();
+
+    // Write values in field order
+    for field in fields {
+        if let Some(value_bytes) = value_map.get(field.name.as_ref()) {
+            writer.write_all(value_bytes)?;
+        } else {
+            // Write default value based on type
+            match field.type_id {
+                7 => writer.write_all(&0f32.to_le_bytes())?,      // float
+                8 => writer.write_all(&0f64.to_le_bytes())?,      // double
+                9 | 18 => writer.write_all(&0u32.to_le_bytes())?, // empty string
+                16 => writer.write_all(&0i32.to_le_bytes())?,     // int32
+                17 => writer.write_all(&0i64.to_le_bytes())?,     // int64
+                _ => {} // Handle other types as needed
+            }
+        }
+    }
+
+    Ok(())
+}
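A small sketch of how `encode_dynamic_payload` fills in typed defaults for absent fields (field names and values here are made up for illustration, assuming the module's test scope):

    #[cfg(test)]
    mod dynamic_payload_sketch {
        use super::*;
        use std::borrow::Cow;

        #[test]
        fn missing_field_gets_typed_default() {
            let fields = vec![
                FieldDef {
                    name: Cow::Borrowed("count"),
                    type_id: BondDataType::BT_INT32 as u8,
                    field_id: 1,
                },
                FieldDef {
                    name: Cow::Borrowed("ratio"),
                    type_id: BondDataType::BT_DOUBLE as u8,
                    field_id: 2,
                },
            ];
            let mut count_bytes = Vec::new();
            BondWriter::write_int32(&mut count_bytes, 7);

            // Only "count" is supplied; "ratio" falls back to an 8-byte 0.0 default.
            let mut out = Vec::new();
            encode_dynamic_payload(&mut out, &fields, &[("count", count_bytes)]).unwrap();
            assert_eq!(out.len(), 4 + 4 + 8); // SP header + int32 + default double
        }
    }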
+
+pub(crate) struct BondEncodedSchema {
+    schema: DynamicSchema,
+    encoded_bytes: Vec<u8>,
+}
+
+impl BondEncodedSchema {
+    pub(crate) fn from_fields(name: &str, namespace: &str, fields: Vec<FieldDef>) -> Self {
+        let schema = DynamicSchema::new(name, namespace, fields); //"OtlpLogRecord", "telemetry");
+
+        let encoded_bytes = schema.encode().expect("Schema encoding failed");
+
+        Self {
+            schema,
+            encoded_bytes,
+        }
+    }
+
+    pub(crate) fn as_bytes(&self) -> &[u8] {
+        &self.encoded_bytes
+    }
+}
+
+impl Clone for BondEncodedSchema {
+    fn clone(&self) -> Self {
+        Self {
+            schema: self.schema.clone(),
+            encoded_bytes: self.encoded_bytes.clone(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::borrow::Cow;
+
+    #[test]
+    fn test_dynamic_schema() {
+        // Create fields directly as FieldDef
+        let fields = vec![
+            FieldDef {
+                name: Cow::Borrowed("field1"),
+                type_id: BondDataType::BT_DOUBLE as u8,
+                field_id: 1,
+            },
+            FieldDef {
+                name: Cow::Borrowed("field2"),
+                type_id: BondDataType::BT_STRING as u8,
+                field_id: 2,
+            },
+            FieldDef {
+                name: Cow::Borrowed("field3"),
+                type_id: BondDataType::BT_INT32 as u8,
+                field_id: 3,
+            },
+        ];
+
+        let schema = DynamicSchema::new("TestStruct", "test.namespace", fields);
+        let encoded = schema.encode().unwrap();
+        assert!(!encoded.is_empty());
+    }
+
+    #[test]
+    fn test_pure_rust_encoder_schema() {
+        let fields = vec![
+            FieldDef {
+                name: Cow::Borrowed("timestamp"),
+                type_id: BondDataType::BT_STRING as u8,
+                field_id: 1,
+            },
+            FieldDef {
+                name: Cow::Borrowed("severity"),
+                type_id: BondDataType::BT_INT32 as u8,
+                field_id: 2,
+            },
+            FieldDef {
+                name: Cow::Borrowed("message"),
+                type_id: BondDataType::BT_STRING as u8,
+                field_id: 3,
+            },
+        ];
+
+        let schema = BondEncodedSchema::from_fields("OtlpLogRecord", "telemetry", fields);
+        let bytes = schema.as_bytes();
+        assert!(!bytes.is_empty());
+    }
+
+    #[test]
+    fn test_field_def_with_owned_strings() {
+        // Test that FieldDef works with owned strings too
+        let dynamic_field_name = format!("dynamic_{}", 123);
+        let fields = vec![
+            FieldDef {
+                name: Cow::Owned(dynamic_field_name),
+                type_id: BondDataType::BT_STRING as u8,
+                field_id: 1,
+            },
+            FieldDef {
+                name: Cow::Borrowed("static_field"),
+                type_id: BondDataType::BT_INT32 as u8,
+                field_id: 2,
+            },
+        ];
+
+        let schema = BondEncodedSchema::from_fields("TestStruct", "test.namespace", fields);
+        let bytes = schema.as_bytes();
+        assert!(!bytes.is_empty());
+    }
+}
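The encoded schema always begins with the Simple Protocol magic `0x53 0x50` ('S','P') followed by the version bytes `0x01 0x00`; a sketch that checks just that prefix (assuming the module's test scope):

    #[cfg(test)]
    mod schema_header_sketch {
        use super::*;
        use std::borrow::Cow;

        #[test]
        fn schema_starts_with_simple_protocol_magic() {
            let fields = vec![FieldDef {
                name: Cow::Borrowed("f"),
                type_id: BondDataType::BT_INT32 as u8,
                field_id: 1,
            }];
            let schema = BondEncodedSchema::from_fields("Sketch", "test.namespace", fields);
            // 'S', 'P', then version 1 in little-endian.
            assert_eq!(&schema.as_bytes()[..4], &[0x53, 0x50, 0x01, 0x00][..]);
        }
    }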
diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/central_blob.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/central_blob.rs
new file mode 100644
index 000000000..e8b7910b7
--- /dev/null
+++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/central_blob.rs
@@ -0,0 +1,246 @@
+//use md5;
+
+use crate::payload_encoder::bond_encoder::BondEncodedSchema;
+
+/// Helper to encode UTF-8 Rust str to UTF-16LE bytes
+/// TODO - consider avoiding temporary allocation, by passing a mutable buffer
+#[allow(dead_code)]
+fn utf8_to_utf16le_bytes(s: &str) -> Vec<u8> {
+    // Each UTF-16 code unit is 2 bytes. For ASCII strings, the UTF-16 representation
+    // will be twice as large as UTF-8. For non-ASCII strings, UTF-16 may be more compact
+    // than UTF-8 in some cases, but to avoid reallocations we preallocate 2x len.
+    let mut buf = Vec::with_capacity(s.len() * 2);
+    for u in s.encode_utf16() {
+        buf.extend_from_slice(&u.to_le_bytes());
+    }
+    buf
+}
+
+/// Schema entry for central blob
+#[allow(dead_code)]
+pub(crate) struct CentralSchemaEntry {
+    pub id: u64,
+    pub md5: [u8; 16],
+    pub schema: BondEncodedSchema,
+}
+
+/// Event/row entry for central blob
+#[allow(dead_code)]
+pub(crate) struct CentralEventEntry {
+    pub schema_id: u64,
+    pub level: u8,
+    pub event_name: String,
+    pub row: Vec<u8>,
+}
+
+const TERMINATOR: u64 = 0xdeadc0dedeadc0de;
+
+/// CentralBlob Protocol Payload Structure
+///
+/// The payload consists of a header, metadata, schemas, and events, each encoded in a specific format.
+/// The central terminator constant used throughout is `TERMINATOR = 0xdeadc0dedeadc0de`.
+///
+/// ## Payload Structure
+///
+/// ### Header
+/// - **Version**: `u32` (4 bytes)
+/// - **Format**: `u32` (4 bytes)
+///
+/// ### Metadata
+/// - **Length**: `u32` (4 bytes, prefix for UTF-16LE encoded metadata)
+/// - **Metadata**: UTF-16LE encoded string (variable length)
+///
+/// ### Schemas
+/// A collection of schema entries, each encoded as follows:
+/// - **Entity Type**: `u16` (2 bytes, value = 0)
+/// - **Schema ID**: `u64` (8 bytes, unique identifier)
+/// - **MD5 Hash**: `[u8; 16]` (16 bytes, MD5 checksum of schema bytes)
+/// - **Schema Length**: `u32` (4 bytes)
+/// - **Schema Bytes**: `Vec<u8>` (variable length, schema serialized as bytes)
+/// - **Terminator**: `u64` (8 bytes, constant `TERMINATOR`)
+///
+/// ### Events
+/// A collection of event entries, each encoded as follows:
+/// - **Entity Type**: `u16` (2 bytes, value = 2)
+/// - **Schema ID**: `u64` (8 bytes, links the event to a schema)
+/// - **Level**: `u8` (1 byte, event verbosity or severity level)
+/// - **Event Name Length**: `u16` (2 bytes, prefix for UTF-16LE encoded event name)
+/// - **Event Name**: UTF-16LE encoded string (variable length)
+/// - **Row Length**: `u32` (4 bytes, prefix for row data including `Simple Protocol` header)
+/// - **Row Data**:
+///   - **Simple Protocol Header**: `[0x53, 0x50, 0x01, 0x00]` (4 bytes)
+///   - **Row Bytes**: `Vec<u8>` (variable length, row serialized as bytes)
+/// - **Terminator**: `u64` (8 bytes, constant `TERMINATOR`)
+///
+#[allow(dead_code)]
+pub(crate) struct CentralBlob {
+    pub version: u32,
+    pub format: u32,
+    pub metadata: String, // UTF-8, will be stored as UTF-16LE
+    pub schemas: Vec<CentralSchemaEntry>,
+    pub events: Vec<CentralEventEntry>,
+}
+
+impl CentralBlob {
+    #[allow(dead_code)]
+    pub(crate) fn to_bytes(&self) -> Vec<u8> {
+        // Estimate buffer size:
+        // - Header: 4 (version) + 4 (format)
+        // - Metadata: 4 (length prefix) + metadata_utf16.len()
+        // - Each schema:
+        //     2 (entity type, u16)
+        //   + 8 (schema id, u64)
+        //   + 16 (md5, [u8;16])
+        //   + 4 (schema bytes length, u32)
+        //   + schema_bytes.len()
+        //   + 8 (terminator, u64)
+        // - Each event:
+        //     2 (entity type, u16)
+        //   + 8 (schema_id, u64)
+        //   + 1 (level, u8)
+        //   + 2 (event name length, u16)
+        //   + event_name_utf16.len()
+        //   + 4 (row length, u32)
+        //   + 4 (Simple Protocol header)
+        //   + row_bytes.len()
+        //   + 8 (terminator, u64)
+        let meta_utf16 = utf8_to_utf16le_bytes(&self.metadata);
+        let events_with_utf16 = self
+            .events
+            .iter()
+            .map(|e| {
+                let evname_utf16 = utf8_to_utf16le_bytes(&e.event_name);
+                (e, evname_utf16)
+            })
+            .collect::<Vec<_>>();
+        let mut estimated_size = 8 + 4 + meta_utf16.len();
+        estimated_size += self
+            .schemas
+            .iter()
+            .map(|s| 2 + 8 + 16 + 4 + s.schema.as_bytes().len() + 8)
+            .sum::<usize>();
+        estimated_size += events_with_utf16
+            .iter()
+ .map(|(e, evname_utf16)| { + let row_len = { + 4 + &e.row.len() // SP header (4), row_bytes + }; + 2 + 8 + 1 + 2 + 4 + evname_utf16.len() + row_len + 8 + }) + .sum::(); + + let mut buf = Vec::with_capacity(estimated_size); + + // HEADER + buf.extend_from_slice(&self.version.to_le_bytes()); + buf.extend_from_slice(&self.format.to_le_bytes()); + + // METADATA (len, UTF-16LE bytes) + buf.extend_from_slice(&(meta_utf16.len() as u32).to_le_bytes()); + buf.extend_from_slice(&meta_utf16); + + // SCHEMAS (type 0) + for schema in &self.schemas { + buf.extend_from_slice(&0u16.to_le_bytes()); // entity type 0 + buf.extend_from_slice(&schema.id.to_le_bytes()); + buf.extend_from_slice(&schema.md5); + let schema_bytes = schema.schema.as_bytes(); + buf.extend_from_slice(&(schema_bytes.len() as u32).to_le_bytes()); //TODO - check for overflow + buf.extend_from_slice(schema_bytes); + buf.extend_from_slice(&TERMINATOR.to_le_bytes()); + } + + // EVENTS (type 2) + for (event, evname_utf16) in events_with_utf16 { + buf.extend_from_slice(&2u16.to_le_bytes()); // entity type 2 + buf.extend_from_slice(&event.schema_id.to_le_bytes()); + buf.push(event.level); + + // event name (UTF-16LE, prefixed with u16 len in bytes) + buf.extend_from_slice(&(evname_utf16.len() as u16).to_le_bytes()); // TODO - check for overflow + buf.extend_from_slice(&evname_utf16); + + let total_len = 4 + &event.row.len(); // SP header + data + + buf.extend_from_slice(&(total_len as u32).to_le_bytes()); // TODO - check for overflow + buf.extend_from_slice(&[0x53, 0x50, 0x01, 0x00]); // Simple Protocol header + buf.extend_from_slice(&event.row); + + buf.extend_from_slice(&TERMINATOR.to_le_bytes()); + } + + buf + } +} + +// Example usage/test (can be moved to examples or tests) +#[cfg(test)] +mod tests { + use super::*; + use crate::payload_encoder::bond_encoder::{BondEncodedSchema, FieldDef}; + use std::borrow::Cow; + + //Helper to calculate MD5 hash, returns [u8;16] + fn md5_bytes(data: &[u8]) -> [u8; 16] { + md5::compute(data).0 + } + + #[test] + fn test_central_blob_creation() { + // Prepare a schema + let fields = vec![ + FieldDef { + name: Cow::Borrowed("foo"), + type_id: 16u8, // BT_INT32 + field_id: 1u16, + }, + FieldDef { + name: Cow::Borrowed("bar"), + type_id: 9u8, // BT_STRING + field_id: 2u16, + }, + ]; + let schema_obj = BondEncodedSchema::from_fields("TestStruct", "test.namespace", fields); + let schema_bytes = schema_obj.as_bytes().to_vec(); + let schema_md5 = md5_bytes(&schema_bytes); + let schema_id = 1234u64; + + let schema = CentralSchemaEntry { + id: schema_id, + md5: schema_md5, + schema: schema_obj, + }; + + // Prepare a row + let mut row = Vec::new(); + row.extend_from_slice(&42i32.to_le_bytes()); + let s = "hello"; + row.extend_from_slice(&(s.len() as u32).to_le_bytes()); + row.extend_from_slice(s.as_bytes()); + + let event = CentralEventEntry { + schema_id, + level: 0, // e.g. 
ETW verbose + event_name: "eventname".to_string(), + row, + }; + + // Metadata + let metadata = + "namespace=testNamespace/eventVersion=Ver1v0/tenant=T/role=R/roleinstance=RI"; + + // Build blob + let blob = CentralBlob { + version: 1, + format: 42, + metadata: metadata.to_string(), + schemas: vec![schema], + events: vec![event], + }; + + let payload = blob.to_bytes(); + + // Only assert that the payload is created and non-empty + assert!(!payload.is_empty()); + } +} diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/mod.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/mod.rs index 84ee9d815..a73d439fc 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/mod.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/mod.rs @@ -1 +1,222 @@ -mod lz4_chunked_compression; +pub(crate) mod bond_encoder; +pub(crate) mod central_blob; +pub(crate) mod lz4_chunked_compression; +pub(crate) mod otlp_encoder; + +#[cfg(test)] +mod tests { + use crate::payload_encoder::bond_encoder::{BondDataType, BondEncodedSchema, BondWriter}; + use crate::payload_encoder::central_blob::{ + CentralBlob, CentralEventEntry, CentralSchemaEntry, + }; + + use crate::payload_encoder::bond_encoder::FieldDef; + use std::borrow::Cow; + + fn create_payload(fields: &[(&str, u8, u16)], row_data: Vec) -> Vec { + // Convert the input tuples to FieldDef + let field_defs: Vec = fields + .iter() + .map(|(name, type_id, field_id)| FieldDef { + name: Cow::Owned(name.to_string()), + type_id: *type_id, + field_id: *field_id, + }) + .collect(); + let schema_obj = + BondEncodedSchema::from_fields("MdsContainer", "testNamespace", field_defs); + let schema_bytes = schema_obj.as_bytes(); + let schema_md5 = md5::compute(schema_bytes).0; + let schema_id = 1u64; + + let schema_entry = CentralSchemaEntry { + id: schema_id, + md5: schema_md5, + schema: schema_obj, + }; + + let event = CentralEventEntry { + schema_id, + level: 5, + event_name: "basename".to_string(), + row: row_data, + }; + + let metadata = + "namespace=testNamespace/eventVersion=Ver1v0/tenant=T/role=R/roleinstance=RI"; + + let blob = CentralBlob { + version: 1, + format: 2, + metadata: metadata.to_string(), + schemas: vec![schema_entry], + events: vec![event], + }; + + blob.to_bytes() + } + + #[test] + #[allow(clippy::approx_constant)] + fn test_single_column_double() { + // The encoded value using official C++ Bond encoder + let expected: &[u8] = &[ + 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x96, 0x00, 0x00, 0x00, 0x6e, 0x00, + 0x61, 0x00, 0x6d, 0x00, 0x65, 0x00, 0x73, 0x00, 0x70, 0x00, 0x61, 0x00, 0x63, 0x00, + 0x65, 0x00, 0x3d, 0x00, 0x74, 0x00, 0x65, 0x00, 0x73, 0x00, 0x74, 0x00, 0x4e, 0x00, + 0x61, 0x00, 0x6d, 0x00, 0x65, 0x00, 0x73, 0x00, 0x70, 0x00, 0x61, 0x00, 0x63, 0x00, + 0x65, 0x00, 0x2f, 0x00, 0x65, 0x00, 0x76, 0x00, 0x65, 0x00, 0x6e, 0x00, 0x74, 0x00, + 0x56, 0x00, 0x65, 0x00, 0x72, 0x00, 0x73, 0x00, 0x69, 0x00, 0x6f, 0x00, 0x6e, 0x00, + 0x3d, 0x00, 0x56, 0x00, 0x65, 0x00, 0x72, 0x00, 0x31, 0x00, 0x76, 0x00, 0x30, 0x00, + 0x2f, 0x00, 0x74, 0x00, 0x65, 0x00, 0x6e, 0x00, 0x61, 0x00, 0x6e, 0x00, 0x74, 0x00, + 0x3d, 0x00, 0x54, 0x00, 0x2f, 0x00, 0x72, 0x00, 0x6f, 0x00, 0x6c, 0x00, 0x65, 0x00, + 0x3d, 0x00, 0x52, 0x00, 0x2f, 0x00, 0x72, 0x00, 0x6f, 0x00, 0x6c, 0x00, 0x65, 0x00, + 0x69, 0x00, 0x6e, 0x00, 0x73, 0x00, 0x74, 0x00, 0x61, 0x00, 0x6e, 0x00, 0x63, 0x00, + 0x65, 0x00, 0x3d, 0x00, 0x52, 0x00, 0x49, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 
0x0d, 0x3a, 0xa1, 0xdc, 0xb7, 0xd9, 0x15, 0x9d, 0x5d, 0x90, + 0x87, 0xd3, 0x89, 0x20, 0x86, 0x5e, 0xc0, 0x00, 0x00, 0x00, 0x53, 0x50, 0x01, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x4d, 0x64, 0x73, 0x43, 0x6f, 0x6e, + 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x1a, 0x00, 0x00, 0x00, 0x74, 0x65, 0x73, 0x74, + 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x2e, 0x4d, 0x64, 0x73, 0x43, + 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, + 0x00, 0x08, 0x00, 0x00, 0x00, 0x46, 0x6c, 0x6f, 0x61, 0x74, 0x43, 0x6f, 0x6c, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x01, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xde, 0xc0, 0xad, 0xde, 0xde, 0xc0, 0xad, 0xde, + 0x02, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x10, 0x00, 0x62, + 0x00, 0x61, 0x00, 0x73, 0x00, 0x65, 0x00, 0x6e, 0x00, 0x61, 0x00, 0x6d, 0x00, 0x65, + 0x00, 0x0c, 0x00, 0x00, 0x00, 0x53, 0x50, 0x01, 0x00, 0x6f, 0x12, 0x83, 0xc0, 0xca, + 0x21, 0x09, 0x40, 0xde, 0xc0, 0xad, 0xde, 0xde, 0xc0, 0xad, 0xde, + ]; + let fields = &[("FloatCol", BondDataType::BT_DOUBLE as u8, 1u16)]; + let mut row_data = Vec::new(); + BondWriter::write_double(&mut row_data, 3.1415f64); + let actual = create_payload(fields, row_data); + assert_eq!(expected, actual); + } + + #[test] + #[allow(clippy::approx_constant)] + fn test_single_column_int64() { + // The encoded value using official C++ Bond encoder + let expected: &[u8] = &[ + 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x96, 0x00, 0x00, 0x00, 0x6e, 0x00, + 0x61, 0x00, 0x6d, 0x00, 0x65, 0x00, 0x73, 0x00, 0x70, 0x00, 0x61, 0x00, 0x63, 0x00, + 0x65, 0x00, 0x3d, 0x00, 0x74, 0x00, 0x65, 0x00, 0x73, 0x00, 0x74, 0x00, 0x4e, 0x00, + 0x61, 0x00, 0x6d, 0x00, 0x65, 0x00, 0x73, 0x00, 0x70, 0x00, 0x61, 0x00, 0x63, 0x00, + 0x65, 0x00, 0x2f, 0x00, 0x65, 0x00, 0x76, 0x00, 0x65, 0x00, 0x6e, 0x00, 0x74, 0x00, + 0x56, 0x00, 0x65, 0x00, 0x72, 0x00, 0x73, 0x00, 0x69, 0x00, 0x6f, 0x00, 0x6e, 0x00, + 0x3d, 0x00, 0x56, 0x00, 0x65, 0x00, 0x72, 0x00, 0x31, 0x00, 0x76, 0x00, 0x30, 0x00, + 0x2f, 0x00, 0x74, 0x00, 0x65, 0x00, 0x6e, 0x00, 0x61, 0x00, 0x6e, 0x00, 0x74, 0x00, + 0x3d, 0x00, 0x54, 0x00, 0x2f, 0x00, 0x72, 0x00, 0x6f, 0x00, 0x6c, 0x00, 0x65, 0x00, + 0x3d, 0x00, 0x52, 0x00, 0x2f, 0x00, 0x72, 0x00, 0x6f, 0x00, 0x6c, 0x00, 0x65, 0x00, + 0x69, 0x00, 0x6e, 0x00, 0x73, 0x00, 0x74, 0x00, 0x61, 0x00, 0x6e, 0x00, 0x63, 0x00, + 0x65, 0x00, 0x3d, 0x00, 0x52, 0x00, 0x49, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x7c, 0x44, 0xde, 0xc6, 0x39, 0x71, 0x7d, 0x2e, 0xed, 0x1c, + 0x04, 0xd2, 0xc8, 0x5c, 0x3d, 0xef, 0xbe, 0x00, 0x00, 0x00, 0x53, 0x50, 0x01, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x4d, 0x64, 0x73, 0x43, 0x6f, 0x6e, + 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x1a, 0x00, 0x00, 0x00, 0x74, 0x65, 0x73, 0x74, + 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x2e, 0x4d, 0x64, 0x73, 0x43, + 
0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, + 0x00, 0x06, 0x00, 0x00, 0x00, 0x49, 0x6e, 0x74, 0x43, 0x6f, 0x6c, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xde, 0xc0, 0xad, 0xde, 0xde, 0xc0, 0xad, 0xde, 0x02, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x10, 0x00, 0x62, 0x00, 0x61, + 0x00, 0x73, 0x00, 0x65, 0x00, 0x6e, 0x00, 0x61, 0x00, 0x6d, 0x00, 0x65, 0x00, 0x08, + 0x00, 0x00, 0x00, 0x53, 0x50, 0x01, 0x00, 0x64, 0x00, 0x00, 0x00, 0xde, 0xc0, 0xad, + 0xde, 0xde, 0xc0, 0xad, 0xde, + ]; + let fields = &[("IntCol", BondDataType::BT_INT32 as u8, 1u16)]; + let mut row_data = Vec::new(); + BondWriter::write_int32(&mut row_data, 100i32); + let actual = create_payload(fields, row_data); + assert_eq!(expected, actual); + } + + #[test] + #[allow(clippy::approx_constant)] + fn test_mixed_types_compressed() { + // The encoded value for mixed schema using official C++ Bond encoder + let expected: &[u8] = &[ + 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x96, 0x00, 0x00, 0x00, 0x6e, 0x00, + 0x61, 0x00, 0x6d, 0x00, 0x65, 0x00, 0x73, 0x00, 0x70, 0x00, 0x61, 0x00, 0x63, 0x00, + 0x65, 0x00, 0x3d, 0x00, 0x74, 0x00, 0x65, 0x00, 0x73, 0x00, 0x74, 0x00, 0x4e, 0x00, + 0x61, 0x00, 0x6d, 0x00, 0x65, 0x00, 0x73, 0x00, 0x70, 0x00, 0x61, 0x00, 0x63, 0x00, + 0x65, 0x00, 0x2f, 0x00, 0x65, 0x00, 0x76, 0x00, 0x65, 0x00, 0x6e, 0x00, 0x74, 0x00, + 0x56, 0x00, 0x65, 0x00, 0x72, 0x00, 0x73, 0x00, 0x69, 0x00, 0x6f, 0x00, 0x6e, 0x00, + 0x3d, 0x00, 0x56, 0x00, 0x65, 0x00, 0x72, 0x00, 0x31, 0x00, 0x76, 0x00, 0x30, 0x00, + 0x2f, 0x00, 0x74, 0x00, 0x65, 0x00, 0x6e, 0x00, 0x61, 0x00, 0x6e, 0x00, 0x74, 0x00, + 0x3d, 0x00, 0x54, 0x00, 0x2f, 0x00, 0x72, 0x00, 0x6f, 0x00, 0x6c, 0x00, 0x65, 0x00, + 0x3d, 0x00, 0x52, 0x00, 0x2f, 0x00, 0x72, 0x00, 0x6f, 0x00, 0x6c, 0x00, 0x65, 0x00, + 0x69, 0x00, 0x6e, 0x00, 0x73, 0x00, 0x74, 0x00, 0x61, 0x00, 0x6e, 0x00, 0x63, 0x00, + 0x65, 0x00, 0x3d, 0x00, 0x52, 0x00, 0x49, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xc2, 0x91, 0x6b, 0x48, 0xa6, 0xc8, 0xab, 0x6a, 0x7c, 0x47, + 0xda, 0x01, 0xc7, 0x8a, 0x84, 0x5d, 0xa2, 0x01, 0x00, 0x00, 0x53, 0x50, 0x01, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x4d, 0x64, 0x73, 0x43, 0x6f, 0x6e, + 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x1a, 0x00, 0x00, 0x00, 0x74, 0x65, 0x73, 0x74, + 0x4e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x2e, 0x4d, 0x64, 0x73, 0x43, + 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, + 0x00, 0x09, 0x00, 0x00, 0x00, 0x46, 0x6c, 0x6f, 0x61, 0x74, 0x43, 0x6f, 0x6c, 0x31, + 
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x01, 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x46, 0x6c, 0x6f, 0x61, + 0x74, 0x43, 0x6f, 0x6c, 0x32, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x08, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0x00, 0x00, + 0x00, 0x46, 0x6c, 0x6f, 0x61, 0x74, 0x43, 0x6f, 0x6c, 0x33, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, + 0x00, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x09, 0x00, 0x00, 0x00, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x43, 0x6f, + 0x6c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x04, 0x00, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xde, 0xc0, 0xad, 0xde, 0xde, 0xc0, + 0xad, 0xde, 0x02, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x10, + 0x00, 0x62, 0x00, 0x61, 0x00, 0x73, 0x00, 0x65, 0x00, 0x6e, 0x00, 0x61, 0x00, 0x6d, + 0x00, 0x65, 0x00, 0x36, 0x00, 0x00, 0x00, 0x53, 0x50, 0x01, 0x00, 0x6f, 0x12, 0x83, + 0xc0, 0xca, 0x21, 0x09, 0x40, 0xc9, 0xe5, 0x3f, 0xa4, 0xdf, 0xbe, 0x05, 0x40, 0x17, + 0xd9, 0xce, 0xf7, 0x53, 0xe3, 0xf9, 0x3f, 0x0b, 0x00, 0x00, 0x00, 0x48, 0x00, 0x65, + 0x00, 0x6c, 0x00, 0x6c, 0x00, 0x6f, 0x00, 0x20, 0x00, 0x42, 0x00, 0x6f, 0x00, 0x6e, + 0x00, 0x64, 0x00, 0x21, 0x00, 0xde, 0xc0, 0xad, 0xde, 0xde, 0xc0, 0xad, 0xde, + ]; + let fields = &[ + ("FloatCol1", BondDataType::BT_DOUBLE as u8, 1u16), + ("FloatCol2", BondDataType::BT_DOUBLE as u8, 2u16), + ("FloatCol3", BondDataType::BT_DOUBLE as u8, 3u16), + ("StringCol", BondDataType::BT_WSTRING as u8, 4u16), + ]; + + let mut row_data = Vec::new(); + BondWriter::write_double(&mut row_data, 3.1415f64); + BondWriter::write_double(&mut row_data, 2.7182f64); + BondWriter::write_double(&mut row_data, 1.6180f64); + BondWriter::write_wstring(&mut row_data, "Hello Bond!"); + + let actual = create_payload(fields, row_data); + assert_eq!(expected, actual); + } +} diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs new file mode 100644 index 000000000..cc55bd339 --- /dev/null +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/otlp_encoder.rs @@ -0,0 +1,406 @@ +use crate::payload_encoder::bond_encoder::{BondDataType, BondEncodedSchema, BondWriter, FieldDef}; +use 
crate::payload_encoder::central_blob::{CentralBlob, CentralEventEntry, CentralSchemaEntry};
+use chrono::{TimeZone, Utc};
+use opentelemetry_proto::tonic::common::v1::any_value::Value;
+use opentelemetry_proto::tonic::logs::v1::LogRecord;
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::sync::{Arc, RwLock};
+
+type SchemaCache = Arc<RwLock<HashMap<u64, (BondEncodedSchema, [u8; 16], Vec<FieldDef>)>>>;
+type BatchKey = (u64, String);
+type EncodedRow = (String, u8, Vec<u8>); // (event_name, level, row_buffer)
+type BatchValue = (CentralSchemaEntry, Vec<EncodedRow>);
+type LogBatches = HashMap<BatchKey, BatchValue>;
+
+const FIELD_ENV_NAME: &str = "env_name";
+const FIELD_ENV_VER: &str = "env_ver";
+const FIELD_TIMESTAMP: &str = "timestamp";
+const FIELD_ENV_TIME: &str = "env_time";
+const FIELD_TRACE_ID: &str = "env_dt_traceId";
+const FIELD_SPAN_ID: &str = "env_dt_spanId";
+const FIELD_TRACE_FLAGS: &str = "env_dt_traceFlags";
+const FIELD_NAME: &str = "name";
+const FIELD_SEVERITY_NUMBER: &str = "SeverityNumber";
+const FIELD_SEVERITY_TEXT: &str = "SeverityText";
+const FIELD_BODY: &str = "body";
+
+/// Encoder to write OTLP payload in bond form.
+#[derive(Clone)]
+pub struct OtlpEncoder {
+    // TODO - limit cache size or use LRU eviction, and/or add feature flag for caching
+    schema_cache: SchemaCache,
+}
+
+impl OtlpEncoder {
+    pub fn new() -> Self {
+        OtlpEncoder {
+            schema_cache: Arc::new(RwLock::new(HashMap::new())),
+        }
+    }
+
+    pub fn encode_log_batch<'a, I>(
+        &self,
+        logs: I,
+        metadata: &str,
+    ) -> Vec<(u64, String, Vec<u8>, usize)>
+    where
+        I: Iterator<Item = &'a LogRecord>,
+    {
+        use std::collections::HashMap;
+
+        let mut batches: LogBatches = HashMap::new();
+
+        for log_record in logs {
+            // 1. Get schema
+            let field_specs = self.determine_fields(log_record);
+            let schema_id = Self::calculate_schema_id(&field_specs);
+            let (schema_entry, field_info) = self.get_or_create_schema(schema_id, field_specs);
+
+            // 2. Encode row
+            let row_buffer = self.write_row_data(log_record, &field_info);
+            let event_name = if log_record.event_name.is_empty() {
+                "Log".to_string()
+            } else {
+                log_record.event_name.clone()
+            };
+            let level = log_record.severity_number as u8;
+
+            // 3. Insert into batches - Key is (schema_id, event_name)
+            batches
+                .entry((schema_id, event_name.clone()))
+                .or_insert_with(|| (schema_entry, Vec::new()))
+                .1
+                .push((event_name, level, row_buffer));
+        }
+
+        // 4. Encode blobs (one per schema AND event_name combination)
+        let mut blobs = Vec::new();
+        for ((schema_id, batch_event_name), (schema_entry, records)) in batches {
+            let events: Vec<CentralEventEntry> = records
+                .into_iter()
+                .map(|(event_name, level, row_buffer)| CentralEventEntry {
+                    schema_id,
+                    level,
+                    event_name,
+                    row: row_buffer,
+                })
+                .collect();
+            let events_len = events.len();
+
+            let blob = CentralBlob {
+                version: 1,
+                format: 2,
+                metadata: metadata.to_string(),
+                schemas: vec![schema_entry],
+                events,
+            };
+            let bytes = blob.to_bytes();
+            blobs.push((schema_id, batch_event_name, bytes, events_len));
+        }
+        blobs
+    }
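A sketch of the batching behaviour above (assuming the module's test scope): records that encode to the same schema but carry different event names come back as separate blobs, because the batch key is `(schema_id, event_name)`.

    #[cfg(test)]
    mod batching_sketch {
        use super::*;

        #[test]
        fn same_schema_different_event_names_yield_separate_blobs() {
            let encoder = OtlpEncoder::new();
            let a = LogRecord {
                event_name: "Startup".to_string(),
                severity_number: 9,
                ..Default::default()
            };
            let b = LogRecord {
                event_name: "Shutdown".to_string(),
                severity_number: 9,
                ..Default::default()
            };
            let blobs = encoder.encode_log_batch([a, b].iter(), "namespace=demo");
            // One (schema_id, event_name, bytes, row_count) tuple per event name.
            assert_eq!(blobs.len(), 2);
        }
    }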
+
+    /// Determine which fields are present in the LogRecord
+    fn determine_fields(&self, log: &LogRecord) -> Vec<FieldDef> {
+        // Pre-allocate with estimated capacity to avoid reallocations
+        let estimated_capacity = 7 + 4 + log.attributes.len();
+        let mut fields = Vec::with_capacity(estimated_capacity);
+        fields.push((Cow::Borrowed(FIELD_ENV_NAME), BondDataType::BT_STRING as u8));
+        fields.push((FIELD_ENV_VER.into(), BondDataType::BT_STRING as u8));
+        fields.push((FIELD_TIMESTAMP.into(), BondDataType::BT_STRING as u8));
+        fields.push((FIELD_ENV_TIME.into(), BondDataType::BT_STRING as u8));
+
+        // Part A extension - Conditional fields
+        if !log.trace_id.is_empty() {
+            fields.push((FIELD_TRACE_ID.into(), BondDataType::BT_STRING as u8));
+        }
+        if !log.span_id.is_empty() {
+            fields.push((FIELD_SPAN_ID.into(), BondDataType::BT_STRING as u8));
+        }
+        if log.flags != 0 {
+            fields.push((FIELD_TRACE_FLAGS.into(), BondDataType::BT_INT32 as u8));
+        }
+
+        // Part B - Core log fields
+        if !log.event_name.is_empty() {
+            fields.push((FIELD_NAME.into(), BondDataType::BT_STRING as u8));
+        }
+        fields.push((FIELD_SEVERITY_NUMBER.into(), BondDataType::BT_INT32 as u8));
+        if !log.severity_text.is_empty() {
+            fields.push((FIELD_SEVERITY_TEXT.into(), BondDataType::BT_STRING as u8));
+        }
+        if let Some(body) = &log.body {
+            if let Some(Value::StringValue(_)) = &body.value {
+                // Only included in schema when body is a string value
+                fields.push((FIELD_BODY.into(), BondDataType::BT_STRING as u8));
+            }
+            //TODO - handle other body types
+        }
+
+        // Part C - Dynamic attributes
+        for attr in &log.attributes {
+            if let Some(val) = attr.value.as_ref().and_then(|v| v.value.as_ref()) {
+                let type_id = match val {
+                    Value::StringValue(_) => BondDataType::BT_STRING as u8,
+                    Value::IntValue(_) => BondDataType::BT_INT32 as u8,
+                    Value::DoubleValue(_) => BondDataType::BT_FLOAT as u8, // TODO - using float for now
+                    Value::BoolValue(_) => BondDataType::BT_INT32 as u8, // representing bool as int
+                    _ => continue,
+                };
+                fields.push((attr.key.clone().into(), type_id));
+            }
+        }
+        fields.sort_by(|a, b| a.0.cmp(&b.0)); // Sort fields by name for consistent schema ID generation
+        fields
+            .into_iter()
+            .enumerate()
+            .map(|(i, (name, type_id))| FieldDef {
+                name,
+                type_id,
+                field_id: (i + 1) as u16,
+            })
+            .collect()
+    }
+
+    /// Calculate schema ID from field specifications
+    fn calculate_schema_id(fields: &[FieldDef]) -> u64 {
+        use std::collections::hash_map::DefaultHasher;
+        use std::hash::{Hash, Hasher};
+
+        let mut hasher = DefaultHasher::new();
+
+        for field in fields {
+            field.name.hash(&mut hasher);
+            field.type_id.hash(&mut hasher);
+        }
+
+        hasher.finish()
+    }
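For illustration, the schema id depends only on the ordered (name, type_id) pairs, so an identical field layout always hashes to the same id (a sketch assuming the module's test scope):

    #[cfg(test)]
    mod schema_id_sketch {
        use super::*;
        use std::borrow::Cow;

        #[test]
        fn identical_field_layouts_share_a_schema_id() {
            let fields = vec![
                FieldDef {
                    name: Cow::Borrowed("timestamp"),
                    type_id: BondDataType::BT_STRING as u8,
                    field_id: 1,
                },
                FieldDef {
                    name: Cow::Borrowed("SeverityNumber"),
                    type_id: BondDataType::BT_INT32 as u8,
                    field_id: 2,
                },
            ];
            let same_layout = fields.clone();
            assert_eq!(
                OtlpEncoder::calculate_schema_id(&fields),
                OtlpEncoder::calculate_schema_id(&same_layout)
            );
        }
    }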
+
+    /// Get or create schema with field ordering information
+    fn get_or_create_schema(
+        &self,
+        schema_id: u64,
+        field_info: Vec<FieldDef>,
+    ) -> (CentralSchemaEntry, Vec<FieldDef>) {
+        // Check cache first
+        if let Some((schema, schema_md5, field_info)) =
+            self.schema_cache.read().unwrap().get(&schema_id)
+        {
+            return (
+                CentralSchemaEntry {
+                    id: schema_id,
+                    md5: *schema_md5,
+                    schema: schema.clone(),
+                },
+                field_info.clone(),
+            );
+        }
+
+        let schema =
+            BondEncodedSchema::from_fields("OtlpLogRecord", "telemetry", field_info.clone()); //TODO - use actual struct name and namespace
+
+        let schema_bytes = schema.as_bytes();
+        let schema_md5 = md5::compute(schema_bytes).0;
+        // Cache the schema and field info
+        {
+            let mut cache = self.schema_cache.write().unwrap();
+            // TODO: Refactor to eliminate field_info duplication in cache
+            // The field information (name, type_id, order) is already stored in BondEncodedSchema's
+            // DynamicSchema.fields vector. We should:
+            // 1. Ensure DynamicSchema maintains fields in sorted order
+            // 2. Add a method to BondEncodedSchema to iterate fields for row encoding
+            // 3. Remove field_info from cache tuple to reduce memory usage and cloning overhead
+            // This would require updating write_row_data() to work with DynamicSchema fields directly
+            cache.insert(schema_id, (schema.clone(), schema_md5, field_info.clone()));
+        }
+
+        let schema_bytes = schema.as_bytes();
+        let schema_md5 = md5::compute(schema_bytes).0;
+
+        (
+            CentralSchemaEntry {
+                id: schema_id,
+                md5: schema_md5,
+                schema,
+            },
+            field_info,
+        )
+    }
+
+    /// Write row data directly from LogRecord
+    fn write_row_data(&self, log: &LogRecord, sorted_fields: &[FieldDef]) -> Vec<u8> {
+        let mut buffer = Vec::with_capacity(sorted_fields.len() * 50); //TODO - estimate better
+
+        for field in sorted_fields {
+            match field.name.as_ref() {
+                FIELD_ENV_NAME => BondWriter::write_string(&mut buffer, "TestEnv"), // TODO - placeholder for actual env name
+                FIELD_ENV_VER => BondWriter::write_string(&mut buffer, "4.0"), // TODO - placeholder for actual env version
+                FIELD_TIMESTAMP | FIELD_ENV_TIME => {
+                    let dt = Self::format_timestamp(log.observed_time_unix_nano);
+                    BondWriter::write_string(&mut buffer, &dt);
+                }
+                FIELD_TRACE_ID => {
+                    let hex_bytes = Self::encode_id_to_hex::<32>(&log.trace_id);
+                    let hex_str = std::str::from_utf8(&hex_bytes).unwrap();
+                    BondWriter::write_string(&mut buffer, hex_str);
+                }
+                FIELD_SPAN_ID => {
+                    let hex_bytes = Self::encode_id_to_hex::<16>(&log.span_id);
+                    let hex_str = std::str::from_utf8(&hex_bytes).unwrap();
+                    BondWriter::write_string(&mut buffer, hex_str);
+                }
+                FIELD_TRACE_FLAGS => {
+                    BondWriter::write_int32(&mut buffer, log.flags as i32);
+                }
+                FIELD_NAME => {
+                    BondWriter::write_string(&mut buffer, &log.event_name);
+                }
+                FIELD_SEVERITY_NUMBER => BondWriter::write_int32(&mut buffer, log.severity_number),
+                FIELD_SEVERITY_TEXT => {
+                    BondWriter::write_string(&mut buffer, &log.severity_text);
+                }
+                FIELD_BODY => {
+                    // TODO - handle all types of body values - For now, we only handle string values
+                    if let Some(body) = &log.body {
+                        if let Some(Value::StringValue(s)) = &body.value {
+                            BondWriter::write_string(&mut buffer, s);
+                        }
+                    }
+                }
+                _ => {
+                    // Handle dynamic attributes
+                    // TODO - optimize better - we could update determine_fields to also return a vec of bytes which has bond serialized attributes
+                    if let Some(attr) = log.attributes.iter().find(|a| a.key == field.name) {
+                        self.write_attribute_value(&mut buffer, attr, field.type_id);
+                    }
+                }
+            }
+        }
+
+        buffer
+    }
+
+    fn encode_id_to_hex<const N: usize>(id: &[u8]) -> [u8; N] {
+        let mut hex_bytes = [0u8; N];
+        hex::encode_to_slice(id, &mut hex_bytes).unwrap();
+        hex_bytes
+    }
+
+    /// Format timestamp from nanoseconds
+    fn format_timestamp(nanos: u64) -> String {
+        let secs = (nanos / 
1_000_000_000) as i64; + let nsec = (nanos % 1_000_000_000) as u32; + Utc.timestamp_opt(secs, nsec) + .single() + .unwrap_or_else(|| Utc.timestamp_opt(0, 0).single().unwrap()) + .to_rfc3339() + } + + /// Write attribute value based on its type + fn write_attribute_value( + &self, + buffer: &mut Vec, + attr: &opentelemetry_proto::tonic::common::v1::KeyValue, + expected_type: u8, + ) { + const BT_STRING: u8 = BondDataType::BT_STRING as u8; + const BT_FLOAT: u8 = BondDataType::BT_FLOAT as u8; + const BT_DOUBLE: u8 = BondDataType::BT_DOUBLE as u8; + const BT_INT32: u8 = BondDataType::BT_INT32 as u8; + const BT_WSTRING: u8 = BondDataType::BT_WSTRING as u8; + + if let Some(val) = &attr.value { + match (&val.value, expected_type) { + (Some(Value::StringValue(s)), BT_STRING) => BondWriter::write_string(buffer, s), + (Some(Value::StringValue(s)), BT_WSTRING) => BondWriter::write_wstring(buffer, s), + (Some(Value::IntValue(i)), BT_INT32) => { + // TODO - handle i64 properly, for now we cast to i32 + BondWriter::write_int32(buffer, *i as i32) + } + (Some(Value::DoubleValue(d)), BT_FLOAT) => { + // TODO - handle double values properly + BondWriter::write_float(buffer, *d as f32) + } + (Some(Value::DoubleValue(d)), BT_DOUBLE) => BondWriter::write_double(buffer, *d), + (Some(Value::BoolValue(b)), BT_INT32) => { + // TODO - represent bool as BT_BOOL + BondWriter::write_bool_as_int32(buffer, *b) + } + _ => {} // TODO - handle more types + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use opentelemetry_proto::tonic::common::v1::{AnyValue, KeyValue}; + + #[test] + fn test_encoding() { + let encoder = OtlpEncoder::new(); + + let mut log = LogRecord { + observed_time_unix_nano: 1_700_000_000_000_000_000, + event_name: "test_event".to_string(), + severity_number: 9, + severity_text: "INFO".to_string(), + ..Default::default() + }; + + // Add some attributes + log.attributes.push(KeyValue { + key: "user_id".to_string(), + value: Some(AnyValue { + value: Some(Value::StringValue("user123".to_string())), + }), + }); + + log.attributes.push(KeyValue { + key: "request_count".to_string(), + value: Some(AnyValue { + value: Some(Value::IntValue(42)), + }), + }); + + let metadata = "namespace=testNamespace/eventVersion=Ver1v0"; + let result = encoder.encode_log_batch([log].iter(), metadata); + + assert!(!result.is_empty()); + } + + #[test] + fn test_schema_caching() { + let encoder = OtlpEncoder::new(); + + let log1 = LogRecord { + observed_time_unix_nano: 1_700_000_000_000_000_000, + severity_number: 9, + ..Default::default() + }; + + let mut log2 = LogRecord { + observed_time_unix_nano: 1_700_000_001_000_000_000, + severity_number: 10, + ..Default::default() + }; + + let metadata = "namespace=test"; + + // First encoding creates schema + let _result1 = encoder.encode_log_batch([log1].iter(), metadata); + assert_eq!(encoder.schema_cache.read().unwrap().len(), 1); + + // Second encoding with same schema structure reuses schema + let _result2 = encoder.encode_log_batch([log2.clone()].iter(), metadata); + assert_eq!(encoder.schema_cache.read().unwrap().len(), 1); + + // Add trace_id to create different schema + log2.trace_id = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]; + let _result3 = encoder.encode_log_batch([log2].iter(), metadata); + assert_eq!(encoder.schema_cache.read().unwrap().len(), 2); + } +} diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/uploader.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/uploader.rs deleted file mode 100644 index cf8753841..000000000 
--- a/opentelemetry-exporter-geneva/geneva-uploader/src/uploader.rs +++ /dev/null @@ -1,23 +0,0 @@ -use opentelemetry_proto::tonic::logs::v1::ResourceLogs; -use std::sync::Arc; - -/// A basic implementation of the log uploader. -pub struct GenevaUploader; - -impl GenevaUploader { - /// Upload logs to Geneva - pub async fn upload_logs(&self, logs: Vec) -> Result<(), String> { - // TODO: Process and send logs to Geneva - for log in &logs { - println!("Processing log: {:?}", log); - } - - // Simulate successful processing - Ok(()) - } -} - -/// Helper function to create an uploader instance. -pub fn create_uploader() -> Arc { - Arc::new(GenevaUploader) -} diff --git a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/Cargo.toml b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/Cargo.toml index 951ce2f89..001efa516 100644 --- a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/Cargo.toml +++ b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/Cargo.toml @@ -6,10 +6,17 @@ license = "Apache-2.0" rust-version = "1.75.0" [dependencies] -opentelemetry_sdk = { workspace = true, default-features = false, features = ["logs"] } +opentelemetry_sdk = {workspace = true, default-features = false, features = ["logs"]} opentelemetry-proto = {workspace = true, default-features = false, features = ["logs"]} geneva-uploader = {path = "../geneva-uploader/", version = "0.1.0"} +[dev-dependencies] +opentelemetry-appender-tracing = {workspace = true} +opentelemetry_sdk = { workspace = true, features = ["logs", "trace", "experimental_logs_batch_log_processor_with_async_runtime", "experimental_async_runtime", "rt-tokio"] } +tracing = { version = "0.1", default-features = false, features = ["std"] } +tracing-core = "0.1.31" +tracing-subscriber = { version = "0.3.0", default-features = false, features = ["env-filter", "fmt", "registry", "std"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } [lints] workspace = true \ No newline at end of file diff --git a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic.rs b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic.rs new file mode 100644 index 000000000..faaf2b6e0 --- /dev/null +++ b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/examples/basic.rs @@ -0,0 +1,146 @@ +//! 
run with `$ cargo run --example basic + +use geneva_uploader::client::{GenevaClient, GenevaClientConfig}; +use geneva_uploader::AuthMethod; +use opentelemetry_appender_tracing::layer; +use opentelemetry_exporter_geneva::GenevaExporter; +use opentelemetry_sdk::logs::log_processor_with_async_runtime::BatchLogProcessor; +use opentelemetry_sdk::runtime::Tokio; +use opentelemetry_sdk::{ + logs::{BatchConfig, SdkLoggerProvider}, + Resource, +}; +use std::env; +use std::path::PathBuf; +use std::thread; +use std::time::Duration; +use tracing::{error, info, warn}; +use tracing_subscriber::{prelude::*, EnvFilter}; + +/* +export GENEVA_ENDPOINT="https://abc.azurewebsites.net" +export GENEVA_ENVIRONMENT="Test" +export GENEVA_ACCOUNT="myaccount" +export GENEVA_NAMESPACE="myns" +export GENEVA_REGION="eastus" +export GENEVA_CERT_PATH="/tmp/client.p12" +export GENEVA_CERT_PASSWORD="password" +export GENEVA_CONFIG_MAJOR_VERSION=2 +*/ + +#[tokio::main] +async fn main() { + let endpoint = env::var("GENEVA_ENDPOINT").expect("GENEVA_ENDPOINT is required"); + let environment = env::var("GENEVA_ENVIRONMENT").expect("GENEVA_ENVIRONMENT is required"); + let account = env::var("GENEVA_ACCOUNT").expect("GENEVA_ACCOUNT is required"); + let namespace = env::var("GENEVA_NAMESPACE").expect("GENEVA_NAMESPACE is required"); + let region = env::var("GENEVA_REGION").expect("GENEVA_REGION is required"); + let cert_path = + PathBuf::from(env::var("GENEVA_CERT_PATH").expect("GENEVA_CERT_PATH is required")); + let cert_password = env::var("GENEVA_CERT_PASSWORD").expect("GENEVA_CERT_PASSWORD is required"); + let config_major_version: u32 = env::var("GENEVA_CONFIG_MAJOR_VERSION") + .expect("GENEVA_CONFIG_MAJOR_VERSION is required") + .parse() + .expect("GENEVA_CONFIG_MAJOR_VERSION must be a u32"); + + let tenant = env::var("GENEVA_TENANT").unwrap_or_else(|_| "default-tenant".to_string()); + let role_name = env::var("GENEVA_ROLE_NAME").unwrap_or_else(|_| "default-role".to_string()); + let role_instance = + env::var("GENEVA_ROLE_INSTANCE").unwrap_or_else(|_| "default-instance".to_string()); + + let config = GenevaClientConfig { + endpoint, + environment, + account, + namespace, + region, + config_major_version, + auth_method: AuthMethod::Certificate { + path: cert_path, + password: cert_password, + }, + tenant, + role_name, + role_instance, + }; + + let geneva_client = GenevaClient::new(config) + .await + .expect("Failed to create GenevaClient"); + + let exporter = GenevaExporter::new(geneva_client); + let batch_processor = BatchLogProcessor::builder(exporter, Tokio) + .with_batch_config(BatchConfig::default()) + .build(); + + let provider: SdkLoggerProvider = SdkLoggerProvider::builder() + .with_resource( + Resource::builder() + .with_service_name("geneva-exporter-example") + .build(), + ) + .with_log_processor(batch_processor) + .build(); + + // To prevent a telemetry-induced-telemetry loop, OpenTelemetry's own internal + // logging is properly suppressed. However, logs emitted by external components + // (such as reqwest, tonic, etc.) are not suppressed as they do not propagate + // OpenTelemetry context. Until this issue is addressed + // (https://github.com/open-telemetry/opentelemetry-rust/issues/2877), + // filtering like this is the best way to suppress such logs. + // + // The filter levels are set as follows: + // - Allow `info` level and above by default. + // - Completely restrict logs from `hyper`, `tonic`, `h2`, and `reqwest`. 
diff --git a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/src/logs/exporter.rs b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/src/logs/exporter.rs
index 4d06b72fe..941cff4eb 100644
--- a/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/src/logs/exporter.rs
+++ b/opentelemetry-exporter-geneva/opentelemetry-exporter-geneva/src/logs/exporter.rs
@@ -1,35 +1,25 @@
 use core::fmt;
-use geneva_uploader::Uploader;
+use geneva_uploader::client::GenevaClient;
 use opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema;
 use opentelemetry_proto::transform::logs::tonic::group_logs_by_resource_and_scope;
-use opentelemetry_sdk::error::OTelSdkResult;
+use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
 use opentelemetry_sdk::logs::LogBatch;
-use std::sync::{atomic, Arc};
+use std::sync::atomic;
 
 /// An OpenTelemetry exporter that writes logs to Geneva exporter
 pub struct GenevaExporter {
     resource: ResourceAttributesWithSchema,
     _is_shutdown: atomic::AtomicBool,
-    uploader: Arc<Uploader>,
+    geneva_client: GenevaClient,
 }
 
 impl GenevaExporter {
     /// Create a new GenavaExporter
-    pub fn new(uploader: Arc<Uploader>) -> Self {
+    pub fn new(geneva_client: GenevaClient) -> Self {
         Self {
             resource: ResourceAttributesWithSchema::default(),
             _is_shutdown: atomic::AtomicBool::new(false),
-            uploader,
-        }
-    }
-}
-
-impl Default for GenevaExporter {
-    fn default() -> Self {
-        GenevaExporter {
-            resource: ResourceAttributesWithSchema::default(),
-            _is_shutdown: atomic::AtomicBool::new(false),
-            uploader: Arc::new(Uploader),
+            geneva_client,
         }
     }
 }
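The hunk above also drops the Default impl, so the exporter can no longer be created without a working client; anything that used GenevaExporter::default() now has to build a GenevaClient first and pass it in. A minimal wiring sketch, assuming the Result<_, String> convention this patch uses for client construction (build_exporter is an illustrative helper, not part of the patch):

    use geneva_uploader::client::{GenevaClient, GenevaClientConfig};
    use opentelemetry_exporter_geneva::GenevaExporter;

    // Sketch only: the client is built first (async and fallible), then moved
    // into the exporter; there is no longer a zero-config Default path.
    async fn build_exporter(cfg: GenevaClientConfig) -> Result<GenevaExporter, String> {
        let client = GenevaClient::new(cfg).await?;
        Ok(GenevaExporter::new(client))
    }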
@@ -41,13 +31,11 @@ impl fmt::Debug for GenevaExporter {
 }
 
 impl opentelemetry_sdk::logs::LogExporter for GenevaExporter {
-    /// Export logs to stdout
-    async fn export(&self, _batch: LogBatch<'_>) -> OTelSdkResult {
-        //serialize to otlp format
-        let otlp = group_logs_by_resource_and_scope(_batch, &self.resource);
-        //TODO send to Geneva using geneva-uploader
-        let _ = self.uploader.upload_logs(otlp).await;
-
+    async fn export(&self, batch: LogBatch<'_>) -> OTelSdkResult {
+        let otlp = group_logs_by_resource_and_scope(batch, &self.resource);
+        if let Err(e) = self.geneva_client.upload_logs(&otlp).await {
+            return Err(OTelSdkError::InternalFailure(e));
+        }
         Ok(())
     }