diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ac43b3c8..038b3e27 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -160,7 +160,7 @@ jobs: with: repo-token: ${{ secrets.GITHUB_TOKEN }} - name: doc - run: cargo doc --no-deps --all-features + run: cargo doc --no-deps --all-features --exclude opentelemetry-geneva-exporter env: CARGO_INCREMENTAL: '0' RUSTDOCFLAGS: -Dwarnings @@ -233,4 +233,4 @@ jobs: with: tool: cargo-workspace-lints - name: cargo workspace-lints - run: cargo workspace-lints + run: cargo workspace-lints \ No newline at end of file diff --git a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml index f5c6adf4..ce7ad91f 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml +++ b/opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml @@ -15,7 +15,8 @@ native-tls = "0.2" thiserror = "2.0" chrono = "0.4" url = "2.2" - +md5 = "0.7.0" +smallvec = "1.15.0" [features] self_signed_certs = [] # Empty by default for security @@ -30,5 +31,9 @@ wiremock = "0.6" futures = "0.3" num_cpus = "1.16" +[build-dependencies] +cc = "1.0" + + [lints] workspace = true \ No newline at end of file diff --git a/opentelemetry-exporter-geneva/geneva-uploader/build.rs b/opentelemetry-exporter-geneva/geneva-uploader/build.rs new file mode 100644 index 00000000..8836c2d9 --- /dev/null +++ b/opentelemetry-exporter-geneva/geneva-uploader/build.rs @@ -0,0 +1,19 @@ +use std::env; + +fn main() { + let bond_include = + env::var("BOND_INCLUDE_DIR").unwrap_or_else(|_| "/usr/local/include".to_string()); + let bond_lib = env::var("BOND_LIB_DIR").unwrap_or_else(|_| "/usr/local/lib".to_string()); + + cc::Build::new() + .cpp(true) + .file("src/payload_encoder/ffi/serialize_ffi.cpp") + .include("src/payload_encoder/ffi/") + .include(&bond_include) + .flag("-std=c++14") + .compile("bond_ffi"); + + println!("cargo:rustc-link-search=native={}", bond_lib); + 
println!("cargo:rustc-link-lib=bond"); + println!("cargo:rustc-link-lib=boost_system"); +} diff --git a/opentelemetry-exporter-geneva/geneva-uploader/scripts/build_bond_ci.sh b/opentelemetry-exporter-geneva/geneva-uploader/scripts/build_bond_ci.sh new file mode 100755 index 00000000..821f12e2 --- /dev/null +++ b/opentelemetry-exporter-geneva/geneva-uploader/scripts/build_bond_ci.sh @@ -0,0 +1,66 @@ +#!/bin/bash + +set -e # Exit immediately if a command exits with a non-zero status + +echo "================================================================================" +echo "IMPORTANT DISCLAIMER: Bond is deprecated. This script is intended only for CI testing and" +echo "should not be used in any other prod/non-prod environment." +echo "================================================================================" + +if [ "$GITHUB_ACTIONS" != "true" ]; then + echo "ERROR: This script should only be run in a GitHub Actions CI environment. Exiting." + exit 1 +fi + +for cmd in git cmake make; do + command -v $cmd >/dev/null 2>&1 || { echo >&2 "$cmd is required but not installed. Aborting."; exit 1; } +done + +# Allow custom install dir as first argument +INSTALL_DIR="${1:-$(pwd)/bond_build}" + +# Variables +BOND_REPO_URL="https://github.com/microsoft/bond.git" +BOND_CLONE_DIR="bond_repo" +BOND_BUILD_DIR="bond_build_temp" + +# Step 1: Clone the Bond repository if not already cloned +if [ ! -d "$BOND_CLONE_DIR" ]; then + echo "Cloning Bond repository..." + git clone --recurse-submodules "$BOND_REPO_URL" "$BOND_CLONE_DIR" +else + echo "Bond repository already cloned." +fi + +# Step 2: Create the build directory +mkdir -p "$BOND_BUILD_DIR" + +# Step 3: Build the Bond library +echo "Building Bond..." +cd "$BOND_BUILD_DIR" +cmake ../"$BOND_CLONE_DIR" -DCMAKE_INSTALL_PREFIX="$INSTALL_DIR" +make -j$(nproc) + +# Step 4: Install Bond locally +echo "Installing Bond locally in $INSTALL_DIR ..." 
+make install + +# Step 5: Display and export paths for integration +echo "Bond build and installation completed." +echo "Include directory: $INSTALL_DIR/include" +echo "Library directory: $INSTALL_DIR/lib" + +echo "" +echo "To use with your Rust build, export these variables:" +echo "export BOND_INCLUDE_DIR=\"$INSTALL_DIR/include\"" +echo "export BOND_LIB_DIR=\"$INSTALL_DIR/lib\"" +export BOND_INCLUDE_DIR="$INSTALL_DIR/include" +export BOND_LIB_DIR="$INSTALL_DIR/lib/bond" + +# Step 6: Clean up if required +# Uncomment the following lines to clean up after the build +# echo "Cleaning up temporary files..." +# cd .. +# rm -rf "$BOND_CLONE_DIR" "$BOND_BUILD_DIR" + +echo "Done." \ No newline at end of file diff --git a/opentelemetry-exporter-geneva/geneva-uploader/scripts/test_geneva_exporter.sh b/opentelemetry-exporter-geneva/geneva-uploader/scripts/test_geneva_exporter.sh new file mode 100644 index 00000000..26da351f --- /dev/null +++ b/opentelemetry-exporter-geneva/geneva-uploader/scripts/test_geneva_exporter.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +set -eu + +echo "Running tests for opentelemetry-exporter-geneva with --all-features" +cargo test -p opentelemetry-exporter-geneva --all-features --tests + +echo "Running doctests for opentelemetry-exporter-geneva with --all-features" +cargo test -p opentelemetry-exporter-geneva --all-features --doc \ No newline at end of file diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs index 86dfa582..8a853c05 100644 --- a/opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs @@ -1,5 +1,6 @@ mod config_service; pub mod ingestion_service; +mod payload_encoder; mod uploader; diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/central_blob.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/central_blob.rs new file mode 100644 index 00000000..73962116 
--- /dev/null +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/central_blob.rs @@ -0,0 +1,247 @@ +//use md5; + +use crate::payload_encoder::{EncoderRow as FFiEncoderRow, EncoderSchema as FFiEncoderSchema}; + +/// Helper to encode UTF-8 Rust str to UTF-16LE bytes +#[allow(dead_code)] +fn utf8_to_utf16le_bytes(s: &str) -> Vec { + // Each UTF-16 code unit is 2 bytes. For ASCII strings, the UTF-16 representation + // will be twice as large as UTF-8. For non-ASCII strings, UTF-16 may be more compact + // than UTF-8 in some cases, but to avoid reallocations we preallocate 2x len. + let mut buf = Vec::with_capacity(s.len() * 2); + for u in s.encode_utf16() { + buf.extend_from_slice(&u.to_le_bytes()); + } + buf +} + +/// Schema entry for central blob +#[allow(dead_code)] +pub(crate) struct CentralSchemaEntry { + pub id: u64, + pub md5: [u8; 16], + pub schema: FFiEncoderSchema, +} + +/// Event/row entry for central blob +#[allow(dead_code)] +pub(crate) struct CentralEventEntry { + pub schema_id: u64, + pub level: u8, + pub event_name: String, + pub row: FFiEncoderRow, +} + +#[allow(dead_code)] +pub(crate) struct CentralBlob { + pub version: u32, + pub format: u32, + pub metadata: String, // UTF-8, will be stored as UTF-16LE + pub schemas: Vec, + pub events: Vec, +} + +const TERMINATOR: u64 = 0xdeadc0dedeadc0de; + +/// CentralBlob Protocol Payload Structure +/// +/// The payload consists of a header, metadata, schemas, and events, each encoded in a specific format. +/// The central terminator constant used throughout is `TERMINATOR = 0xdeadc0dedeadc0de`. 
+/// +/// ## Payload Structure +/// +/// ### Header +/// - **Version**: `u32` (4 bytes) +/// - **Format**: `u32` (4 bytes) +/// +/// ### Metadata +/// - **Length**: `u32` (4 bytes, prefix for UTF-16LE encoded metadata) +/// - **Metadata**: UTF-16LE encoded string (variable length) +/// +/// ### Schemas +/// A collection of schema entries, each encoded as follows: +/// - **Entity Type**: `u16` (2 bytes, value = 0) +/// - **Schema ID**: `u64` (8 bytes, unique identifier) +/// - **MD5 Hash**: `[u8; 16]` (16 bytes, MD5 checksum of schema bytes) +/// - **Schema Length**: `u32` (4 bytes) +/// - **Schema Bytes**: `Vec` (variable length, schema serialized as bytes) +/// - **Terminator**: `u64` (8 bytes, constant `TERMINATOR`) +/// +/// ### Events +/// A collection of event entries, each encoded as follows: +/// - **Entity Type**: `u16` (2 bytes, value = 2) +/// - **Schema ID**: `u64` (8 bytes, links the event to a schema) +/// - **Level**: `u8` (1 byte, event verbosity or severity level) +/// - **Event Name Length**: `u16` (2 bytes, prefix for UTF-16LE encoded event name) +/// - **Event Name**: UTF-16LE encoded string (variable length) +/// - **Row Length**: `u32` (4 bytes, prefix for row data including `Simple Protocol` header) +/// - **Row Data**: +/// - **Simple Protocol Header**: `[0x53, 0x50, 0x01, 0x00]` (4 bytes) +/// - **Row Bytes**: `Vec` (variable length, row serialized as bytes) +/// - **Terminator**: `u64` (8 bytes, constant `TERMINATOR`) +/// + +impl CentralBlob { + #[allow(dead_code)] + pub(crate) fn to_bytes(&self) -> Vec { + // Estimate buffer size: + // - Header: 4 (version) + 4 (format) + // - Metadata: 4 (length prefix) + metadata_utf16.len() + // - Each schema: + // 2 (entity type, u16) + // + 8 (schema id, u64) + // + 16 (md5, [u8;16]) + // + 4 (schema bytes length, u32) + // + schema_bytes.len() + // + 8 (terminator, u64) + // - Each event: + // 2 (entity type, u16) + // + 8 (schema_id, u64) + // + 1 (level, u8) + // + 2 (event name length, u16) + // 
+ event_name_utf16.len() + // + 4 (row length, u32) + // + 4 (Simple Protocol header) + // + row_bytes.len() + // + 8 (terminator, u64) + let meta_utf16 = utf8_to_utf16le_bytes(&self.metadata); + let events_with_utf16 = self + .events + .iter() + .map(|e| { + let evname_utf16 = utf8_to_utf16le_bytes(&e.event_name); + (e, evname_utf16) + }) + .collect::>(); + let mut estimated_size = 8 + 4 + meta_utf16.len(); + estimated_size += self + .schemas + .iter() + .map(|s| 2 + 8 + 16 + 4 + s.schema.as_bytes().len() + 8) + .sum::(); + estimated_size += events_with_utf16 + .iter() + .map(|(e, evname_utf16)| { + let row_len = { + let row_bytes = e.row.as_bytes(); + 4 + row_bytes.len() // SP header (4), row_bytes + }; + 2 + 8 + 1 + 2 + evname_utf16.len() + row_len + 8 + }) + .sum::(); + + let mut buf = Vec::with_capacity(estimated_size); + + // HEADER + buf.extend_from_slice(&self.version.to_le_bytes()); + buf.extend_from_slice(&self.format.to_le_bytes()); + + // METADATA (len, UTF-16LE bytes) + buf.extend_from_slice(&(meta_utf16.len() as u32).to_le_bytes()); + buf.extend_from_slice(&meta_utf16); + + // SCHEMAS (type 0) + for schema in &self.schemas { + buf.extend_from_slice(&0u16.to_le_bytes()); // entity type 0 + buf.extend_from_slice(&schema.id.to_le_bytes()); + buf.extend_from_slice(&schema.md5); + let schema_bytes = schema.schema.as_bytes(); + buf.extend_from_slice(&(schema_bytes.len() as u32).to_le_bytes()); + buf.extend_from_slice(schema_bytes); + buf.extend_from_slice(&TERMINATOR.to_le_bytes()); + } + + // EVENTS (type 2) + for (event, evname_utf16) in events_with_utf16 { + buf.extend_from_slice(&2u16.to_le_bytes()); // entity type 2 + buf.extend_from_slice(&event.schema_id.to_le_bytes()); + buf.push(event.level); + + // event name (UTF-16LE, prefixed with u16 len in bytes) + buf.extend_from_slice(&(evname_utf16.len() as u16).to_le_bytes()); + buf.extend_from_slice(&evname_utf16); + + // Add the Simple Protocol header before the row data + let row_bytes = 
event.row.as_bytes(); + + // Create a new buffer with the SP header + let mut modified_row = Vec::with_capacity(row_bytes.len() + 4); + modified_row.extend_from_slice(&[0x53, 0x50, 0x01, 0x00]); // Simple Protocol header + modified_row.extend_from_slice(row_bytes); + + // row (len, bytes) + buf.extend_from_slice(&(modified_row.len() as u32).to_le_bytes()); + buf.extend_from_slice(&modified_row); + + buf.extend_from_slice(&TERMINATOR.to_le_bytes()); + } + + buf + } +} + +// Example usage/test (can be moved to examples or tests) +#[cfg(test)] +mod tests { + use super::*; + use crate::payload_encoder::{EncoderRow, EncoderSchema}; + use md5; + + //Helper to calculate MD5 hash, returns [u8;16] + fn md5_bytes(data: &[u8]) -> [u8; 16] { + md5::compute(data).0 + } + + #[test] + fn test_central_blob_creation() { + // Prepare a schema + let fields = &[ + ("foo", 16u8, 1u16), // BT_INT32 + ("bar", 9u8, 2u16), // BT_STRING + ]; + let schema_obj = EncoderSchema::from_fields(fields); + let schema_bytes = schema_obj.as_bytes().to_vec(); + let schema_md5 = md5_bytes(&schema_bytes); + let schema_id = 1234u64; + + let schema = CentralSchemaEntry { + id: schema_id, + md5: schema_md5, + schema: schema_obj, + }; + + // Prepare a row + let mut row = Vec::new(); + row.extend_from_slice(&42i32.to_le_bytes()); + let s = "hello"; + row.extend_from_slice(&(s.len() as u32).to_le_bytes()); + row.extend_from_slice(s.as_bytes()); + + let row_obj = EncoderRow::from_schema_and_row(&schema.schema, &row); + + let event = CentralEventEntry { + schema_id, + level: 0, // e.g. 
ETW verbose + event_name: "eventname".to_string(), + row: row_obj, + }; + + // Metadata + let metadata = + "namespace=testNamespace/eventVersion=Ver1v0/tenant=T/role=R/roleinstance=RI"; + + // Build blob + let blob = CentralBlob { + version: 1, + format: 42, + metadata: metadata.to_string(), + schemas: vec![schema], + events: vec![event], + }; + + let payload = blob.to_bytes(); + + // Only assert that the payload is created and non-empty + assert!(!payload.is_empty()); + } +} diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/encoder.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/encoder.rs new file mode 100644 index 00000000..49c022b2 --- /dev/null +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/encoder.rs @@ -0,0 +1,673 @@ +use crate::payload_encoder::central_blob::{CentralBlob, CentralEventEntry, CentralSchemaEntry}; +use crate::payload_encoder::{EncoderRow, EncoderSchema}; +use smallvec::SmallVec; +use std::borrow::Cow; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; + +/// Supported value types for the encoder +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub enum ValueType<'a> { + Float(f32), + Int32(i32), + String(Cow<'a, str>), + Double(f64), + WString(Cow<'a, str>), + // TODO add more types as needed +} + +impl<'a> ValueType<'a> { + /// Get the Serializer type ID for this value + /// These values map to specific data types: BT_BOOL(2), BT_FLOAT(7), BT_DOUBLE(8), + /// BT_STRING(9), BT_WSTRING(18), BT_INT32(16), BT_INT64(17) + #[allow(dead_code)] + fn value_type_id(&self) -> u8 { + match self { + ValueType::Float(_) => 7, // BT_DOUBLE + ValueType::Double(_) => 8, // BT_DOUBLE + ValueType::Int32(_) => 16, // BT_INT32 + ValueType::String(_) => 9, // BT_STRING + ValueType::WString(_) => 18, // BT_WSTRING + } + } + + /// Write the value bytes to a buffer + #[allow(dead_code)] + fn write_to_buffer(&self, buffer: &mut Vec) { + match self { + ValueType::Float(v) => { + let 
bytes = v.to_le_bytes(); + buffer.extend_from_slice(&bytes); + } + ValueType::Int32(v) => { + let bytes = v.to_le_bytes(); + buffer.extend_from_slice(&bytes) + } + ValueType::Double(v) => { + let bytes = v.to_le_bytes(); + buffer.extend_from_slice(&bytes); + } + ValueType::String(v) => { + let utf8 = v.as_bytes(); + buffer.extend_from_slice(&(utf8.len() as u32).to_le_bytes()); + buffer.extend_from_slice(utf8); + } + ValueType::WString(v) => { + // Convert UTF-8 to UTF-16 + let utf16: Vec = v.encode_utf16().collect(); + // Write length of UTF-16 string (in code units, not bytes) + buffer.extend_from_slice(&(utf16.len() as u16).to_le_bytes()); + // Write UTF-16LE bytes + for code_unit in utf16 { + buffer.extend_from_slice(&code_unit.to_le_bytes()); + } + } // Add more serialization as needed + } + } +} + +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub struct EncoderField<'a> { + name: Cow<'a, str>, + value: ValueType<'a>, +} + +impl<'a> EncoderField<'a> { + /// Create a new field with borrowed or owned values + #[allow(dead_code)] + pub(crate) fn new(name: S, value: ValueType<'a>) -> Self + where + S: Into>, + { + Self { + name: name.into(), + value, + } + } + + /// Create field with static str and float value + #[allow(dead_code)] + pub fn float(name: &'static str, value: f32) -> Self { + Self { + name: Cow::Borrowed(name), + value: ValueType::Float(value), + } + } + + /// Create field with static str and int32 value + #[allow(dead_code)] + pub fn int32(name: &'static str, value: i32) -> Self { + Self { + name: Cow::Borrowed(name), + value: ValueType::Int32(value), + } + } + + /// Create field with static str and borrowed string value + #[allow(dead_code)] + pub fn string(name: &'static str, value: &'a str) -> Self { + Self { + name: Cow::Borrowed(name), + value: ValueType::String(Cow::Borrowed(value)), + } + } + + /// Create field with static str and owned string value + #[allow(dead_code)] + pub fn string_owned(name: &'static str, value: String) -> Self { + 
Self { + name: Cow::Borrowed(name), + value: ValueType::String(Cow::Owned(value)), + } + } + + /// Create field with static str and borrowed wstring value + #[allow(dead_code)] + pub fn wstring(name: &'static str, value: &'a str) -> Self { + Self { + name: Cow::Borrowed(name), + value: ValueType::WString(Cow::Borrowed(value)), + } + } + + /// Flexible method to create float field with any string type + #[allow(dead_code)] + pub fn float_with(name: S, value: f32) -> Self + where + S: Into>, + { + Self { + name: name.into(), + value: ValueType::Float(value), + } + } + + /// Flexible method to create int32 field with any string type + #[allow(dead_code)] + pub fn int32_with(name: S, value: i32) -> Self + where + S: Into>, + { + Self { + name: name.into(), + value: ValueType::Int32(value), + } + } + + /// Flexible method to create string field with any string type + #[allow(dead_code)] + pub fn string_with(name: S, value: T) -> Self + where + S: Into>, + T: Into>, + { + Self { + name: name.into(), + value: ValueType::String(value.into()), + } + } + + /// Flexible method to create wstring field with any string type + #[allow(dead_code)] + pub fn wstring_with(name: S, value: T) -> Self + where + S: Into>, + T: Into>, + { + Self { + name: name.into(), + value: ValueType::WString(value.into()), + } + } + + /// Flexible method to create double field with any string type + #[allow(dead_code)] + pub fn double_with(name: S, value: f64) -> Self + where + S: Into>, + { + Self { + name: name.into(), + value: ValueType::Double(value), + } + } + + /// Create any field type with flexible name and value types + #[allow(dead_code)] + pub fn new_any(name: S, value: V) -> Self + where + S: Into>, + V: Into>, + { + Self { + name: name.into(), + value: value.into(), + } + } +} + +// Implement From traits for common value types +impl<'a> From for ValueType<'a> { + #[allow(dead_code)] + fn from(v: i32) -> Self { + ValueType::Int32(v) + } +} + +impl<'a> From for ValueType<'a> { + 
#[allow(dead_code)] + fn from(v: f32) -> Self { + ValueType::Float(v) + } +} + +impl<'a> From for ValueType<'a> { + #[allow(dead_code)] + fn from(v: f64) -> Self { + ValueType::Double(v) + } +} + +impl<'a> From<&'a str> for ValueType<'a> { + #[allow(dead_code)] + fn from(v: &'a str) -> Self { + ValueType::String(Cow::Borrowed(v)) + } +} + +impl<'a> From for ValueType<'a> { + #[allow(dead_code)] + fn from(v: String) -> Self { + ValueType::String(Cow::Owned(v)) + } +} + +impl<'a> From> for ValueType<'a> { + #[allow(dead_code)] + fn from(v: Cow<'a, str>) -> Self { + ValueType::String(v) + } +} + +/// Field ordering information cached per schema +#[derive(Debug)] +struct FieldOrdering { + ordered_fields: Vec<(String, u16)>, // Field name and order ID +} + +/// The main encoder struct +#[allow(dead_code)] +pub struct Encoder { + schema_cache: Arc>>, + ordering_cache: Arc>>, +} + +impl Encoder { + /// Create a new encoder instance + #[allow(dead_code)] + pub fn new() -> Self { + Encoder { + schema_cache: Arc::new(RwLock::new(HashMap::new())), + ordering_cache: Arc::new(RwLock::new(HashMap::new())), + } + } + + /// Create a encoder blob from a simple key-value map + #[allow(dead_code)] + pub fn encode<'a>( + &self, + fields: &[EncoderField<'a>], + event_name: &str, + level: u8, + metadata: &str, + ) -> Vec { + // 1. Create or retrieve schema from data + let (schema_id, schema_entry) = self.create_schema_from_fields(fields); + + // 2. 
Create a row from the fields + let mut row_buffer = Vec::with_capacity(512); + + // Get cached field ordering + let field_ordering = { + let ordering_map = self.ordering_cache.read().unwrap(); + ordering_map.get(&schema_id).unwrap().ordered_fields.clone() + }; + // Build a map for quick field lookup by name + let mut field_map = HashMap::with_capacity(fields.len()); + for field in fields { + field_map.insert(field.name.as_ref(), &field.value); + } + + // Write values in schema order + for (field_name, _) in &field_ordering { + if let Some(value) = field_map.get(field_name.as_str()) { + value.write_to_buffer(&mut row_buffer); + } + } + + let row_obj = EncoderRow::from_schema_and_row(&schema_entry.schema, &row_buffer); + + // 3. Create event entry + let event = CentralEventEntry { + schema_id, + level, + event_name: event_name.to_string(), + row: row_obj, + }; + + // 4. Create the central blob + let blob = CentralBlob { + version: 1, + format: 2, + metadata: metadata.to_string(), + schemas: vec![schema_entry], + events: vec![event], + }; + + // 5. Return the serialized blob + blob.to_bytes() + } + + /// Create or retrieve a schema for the given data + #[allow(dead_code)] + fn create_schema_from_fields<'a>( + &self, + fields: &[EncoderField<'a>], + ) -> (u64, CentralSchemaEntry) { + // Create a stable order for fields - use SmallVec to avoid allocations for small field sets + let mut field_defs = SmallVec::<[(&str, u8, u16); 16]>::with_capacity(fields.len()); + + // Prepare field definitions for schema creation (sorted by name for deterministic schema) + for (i, field) in fields.iter().enumerate() { + field_defs.push(( + field.name.as_ref(), + field.value.value_type_id(), + (i + 1) as u16, + )); + } + + // Sort by name for deterministic schema creation + field_defs.sort_by(|a, b| a.0.cmp(b.0)); + + // Calculate a hash for the schema + let schema_id = self.calculate_schema_id(&field_defs); + + // 1. 
Try schema cache + if let Some(schema) = self.schema_cache.read().unwrap().get(&schema_id).cloned() { + let schema_bytes = schema.as_bytes(); // Using as_bytes() which was in the original code + let schema_md5 = self.md5_bytes(schema_bytes); + + return ( + schema_id, + CentralSchemaEntry { + id: schema_id, + md5: schema_md5, + schema: schema.clone(), + }, + ); + } + + // 2. Create new schema and cache it (write lock) + let schema = EncoderSchema::from_fields(&field_defs); + { + self.schema_cache + .write() + .unwrap() + .insert(schema_id, schema.clone()); + } + + // 3. Create and cache field ordering + let mut ordering = FieldOrdering { + ordered_fields: Vec::with_capacity(fields.len()), + }; + for (name, _, id) in &field_defs { + ordering.ordered_fields.push((name.to_string(), *id)); + } + ordering.ordered_fields.sort_by_key(|(_, id)| *id); + { + self.ordering_cache + .write() + .unwrap() + .entry(schema_id) + .or_insert(ordering); + } + + // 4. Create schema entry + let schema_bytes = schema.as_bytes(); // Using as_bytes() which was in the original code + let schema_md5 = self.md5_bytes(schema_bytes); + + ( + schema_id, + CentralSchemaEntry { + id: schema_id, + md5: schema_md5, + schema, + }, + ) + } + + /// Helper to calculate a schema ID based on the field definitions + #[allow(dead_code)] + fn calculate_schema_id(&self, field_defs: &[(&str, u8, u16)]) -> u64 { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + + let mut hasher = DefaultHasher::new(); + for (name, type_id, field_id) in field_defs { + name.hash(&mut hasher); + type_id.hash(&mut hasher); + field_id.hash(&mut hasher); + } + + hasher.finish() + } + + /// Calculate MD5 hash of data + #[allow(dead_code)] + fn md5_bytes(&self, data: &[u8]) -> [u8; 16] { + md5::compute(data).0 + } + + #[cfg(test)] + #[allow(dead_code)] + pub fn schema_cache_size(&self) -> usize { + self.schema_cache.read().unwrap().len() + } +} + +/// Builder for creating Bond events with fluent API 
+#[allow(dead_code)] +pub struct EncoderEventBuilder<'a> { + encoder: Arc, + fields: Vec>, +} + +impl<'a> EncoderEventBuilder<'a> { + /// Create a new builder + #[allow(dead_code)] + pub(crate) fn new(encoder: Arc) -> Self { + Self { + encoder, + fields: Vec::with_capacity(16), + } + } + + /// Add a float field + #[allow(dead_code)] + pub fn add_float(mut self, name: &'static str, value: f32) -> Self { + self.fields.push(EncoderField::float(name, value)); + self + } + + /// Add an int32 field + #[allow(dead_code)] + pub fn add_int32(mut self, name: &'static str, value: i32) -> Self { + self.fields.push(EncoderField::int32(name, value)); + self + } + + /// Add a string field + #[allow(dead_code)] + pub fn add_string(mut self, name: &'static str, value: &'a str) -> Self { + self.fields.push(EncoderField::string(name, value)); + self + } + + /// Add a string field from owned String + #[allow(dead_code)] + pub fn add_string_owned(mut self, name: &'static str, value: String) -> Self { + self.fields.push(EncoderField::string_owned(name, value)); + self + } + + /// Add a wstring field + #[allow(dead_code)] + pub fn add_wstring(mut self, name: &'static str, value: &'a str) -> Self { + self.fields.push(EncoderField::wstring(name, value)); + self + } + + /// Add a custom field + #[allow(dead_code)] + pub fn add_field(mut self, field: EncoderField<'a>) -> Self { + self.fields.push(field); + self + } + + /// Build the event and return the encoded bytes + #[allow(dead_code)] + pub fn build(self, event_name: &str, level: u8, metadata: &str) -> Vec { + self.encoder + .encode(&self.fields, event_name, level, metadata) + } +} + +impl Encoder { + #[allow(dead_code)] + pub fn builder<'a>(self: &Arc) -> EncoderEventBuilder<'a> { + EncoderEventBuilder::new(self.clone()) + } +} + +mod tests { + #[allow(unused_imports)] + use super::*; + + #[test] + fn test_bond_encoder_with_fields() { + let encoder = Encoder::new(); + + let fields = [ + EncoderField::float("FloatCol", 3.1415), + 
EncoderField::int32("IntCol", 42), + EncoderField::string("StrCol", "hello"), + ]; + + let metadata = "namespace=testNamespace/eventVersion=Ver1v0"; + let payload = encoder.encode(&fields, "test_event", 1, metadata); + + // Basic validation that we got something + assert!(!payload.is_empty()); + } + + #[test] + fn test_bond_encoder_with_builder() { + let encoder = Arc::new(Encoder::new()); + + let payload = encoder + .builder() + .add_float("FloatCol", 3.1415) + .add_int32("IntCol", 42) + .add_string("StrCol", "hello") + .build( + "test_event", + 1, + "namespace=testNamespace/eventVersion=Ver1v0", + ); + + // Basic validation that we got something + assert!(!payload.is_empty()); + } + + #[test] + fn test_schema_caching() { + let mut encoder = Encoder::new(); + + // Create fields + let fields = [ + EncoderField::float("FloatCol", 3.1415), + EncoderField::int32("IntCol", 42), + EncoderField::string("StrCol", "hello"), + ]; + + // First encoding should create and cache the schema + let metadata = "namespace=testNamespace/eventVersion=Ver1v0"; + let payload1 = encoder.encode(&fields, "test_event", 1, metadata); + + // Check that we have one schema in the cache + assert_eq!(encoder.schema_cache_size(), 1); + + // Second encoding with the same fields (different values) should reuse the schema + let fields2 = [ + EncoderField::float("FloatCol", 2.7182), // Different value + EncoderField::int32("IntCol", 100), // Different value + EncoderField::string("StrCol", "world"), // Different value + ]; + + let _payload2 = encoder.encode(&fields2, "test_event", 1, metadata); + + // Schema cache should still have just one entry + assert_eq!(encoder.schema_cache_size(), 1); + + // Add a field to create a different schema + let fields3 = [ + EncoderField::float("FloatCol", 3.1415), + EncoderField::int32("IntCol", 42), + EncoderField::string("StrCol", "hello"), + EncoderField::int32("ExtraField", 99), // New field + ]; + + let _payload3 = encoder.encode(&fields3, "test_event", 1, 
metadata); + + // Schema cache should now have two entries + assert_eq!(encoder.schema_cache_size(), 2); + + // Different field order should be considered a different schema + let fields4 = [ + EncoderField::int32("IntCol", 42), // Order changed + EncoderField::string("StrCol", "hello"), // Order changed + EncoderField::float("FloatCol", 3.1415), // Order changed + ]; + + let _payload4 = encoder.encode(&fields4, "test_event", 1, metadata); + + // Field order doesn't matter for schema ID calculation (it's based on sorted field names) + // So we should still have just two schemas + assert_eq!(encoder.schema_cache_size(), 2); + + // Different field types should create a new schema + let fields5 = [ + EncoderField::float("FloatCol", 3.1415), + EncoderField::int32("IntCol", 42), + EncoderField::string("StrCol", "hello"), + EncoderField::float("ExtraField", 3.14), // Same name as in fields3 but different type + ]; + + let _payload5 = encoder.encode(&fields5, "test_event", 1, metadata); + + // Schema cache should now have three entries + assert_eq!(encoder.schema_cache_size(), 3); + } + + #[test] + fn test_ordering_cache() { + let encoder = Encoder::new(); + + // Create fields with specific order + let fields = [ + EncoderField::float("FloatCol", 3.1415), + EncoderField::int32("IntCol", 42), + EncoderField::string("StrCol", "hello"), + ]; + + // First encoding should create and cache the schema and ordering + let metadata = "namespace=testNamespace"; + let payload1 = encoder.encode(&fields, "test_event", 1, metadata); + + // Change field values but use same field structure + let fields2 = [ + EncoderField::float("FloatCol", 99.9), + EncoderField::int32("IntCol", 123), + EncoderField::string("StrCol", "world"), + EncoderField::string("StrCol", "world"), + ]; + + // Re-encode with same structure but different values + let payload2 = encoder.encode(&fields2, "test_event", 1, metadata); + + // Payloads should be different due to different values + assert_ne!(payload1, 
payload2); + + // But schema cache should still have just one entry + assert_eq!(encoder.schema_cache_size(), 1); + + // Now try with fields in different order + let fields3 = [ + EncoderField::string("StrCol", "hello"), // Changed order + EncoderField::int32("IntCol", 42), // Changed order + EncoderField::float("FloatCol", 3.1415), // Changed order + ]; + + let payload3 = encoder.encode(&fields3, "test_event", 1, metadata); + + // Since field order is normalized in schema creation, + // we should still have just one schema + assert_eq!(encoder.schema_cache_size(), 1); + + // But even more importantly, the payloads should be identical + // because the field ordering is preserved from the first encoding + assert_eq!(payload1, payload3); + } +} diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/ffi.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/ffi.rs new file mode 100644 index 00000000..bc58fa89 --- /dev/null +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/ffi.rs @@ -0,0 +1,27 @@ +use std::os::raw::c_void; + +#[repr(C)] +pub struct SchemaResult { + pub schema_ptr: *mut c_void, + pub schema_bytes: *mut c_void, + pub schema_bytes_len: usize, +} + +unsafe extern "C" { + pub fn marshal_schema_ffi( + schema_buf: *const c_void, + schema_len: usize, + out_len: *mut usize, + ) -> *mut SchemaResult; + + pub fn marshal_row_ffi( + schema_ptr: *mut c_void, // NOTE: This is the schema_ptr from SchemaResult! 
+ row_buf: *const c_void, + row_len: usize, + out_len: *mut usize, + ) -> *mut c_void; + + pub fn free_row_buf_ffi(ptr: *mut c_void); + + pub fn free_schema_buf_ffi(ptr: *mut SchemaResult); +} diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/ffi/serialize_ffi.cpp b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/ffi/serialize_ffi.cpp new file mode 100644 index 00000000..ccc83b34 --- /dev/null +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/ffi/serialize_ffi.cpp @@ -0,0 +1,234 @@ +#include "serialize_ffi.h" + +// Suppress third-party warnings: -Wignored-qualifiers, -Wdeprecated-copy (GCC/Clang) +#ifdef __GNUC__ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wignored-qualifiers" // Ignore qualifiers on cast result (internal) +#pragma GCC diagnostic ignored "-Wdeprecated-copy" // Ignore deprecated copy assignment (internal) +#endif + +// Suppress third-party warnings on MSVC: 4267 (size_t to int), 4244 (type conversion), 4996 (deprecated) +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable: 4267) // Conversion from 'size_t' to 'type', possible loss of data (internal) +#pragma warning(disable: 4244) // Conversion from 'type1' to 'type2', possible loss of data (internal) +#pragma warning(disable: 4996) // Deprecated functions or unsafe functions (internal) +#endif + +#include +#include +#include + +#ifdef __GNUC__ +#pragma GCC diagnostic pop +#endif + +#ifdef _MSC_VER +#pragma warning(pop) +#endif +#include +#include +#include +#include +#include +#include + +namespace { + +struct Field { + std::string name; + uint8_t type; + uint16_t id; +}; + +std::vector parse_schema(const uint8_t* ptr, size_t len, size_t& out_count) { + if (len < 2) throw std::runtime_error("schema too short"); + out_count = ptr[0] | (ptr[1] << 8); + std::vector fields; + ptr += 2; + len -= 2; + for (size_t i = 0; i < static_cast(out_count); ++i){ + if (len < 1) throw std::runtime_error("not enough 
data for name_len"); + uint8_t name_len = *ptr++; + len--; + if (len < size_t(name_len) + 1 + 2) throw std::runtime_error("not enough data for field"); + std::string name(reinterpret_cast(ptr), name_len); + ptr += name_len; + len -= name_len; + uint8_t type = *ptr++; + len--; + uint16_t id = ptr[0] | (ptr[1] << 8); + ptr += 2; + len -= 2; + fields.push_back({name, type, id}); + } + return fields; +} + +} // namespace + +extern "C" SchemaResult* marshal_schema_ffi(const void* schema_buf, size_t schema_len, size_t* out_len) { + try { + const uint8_t* ptr = reinterpret_cast(schema_buf); + size_t field_count; + auto fields = parse_schema(ptr, schema_len, field_count); + + bond::StructDef struct_def; + struct_def.metadata.name = "MdsContainer"; + struct_def.metadata.qualified_name = "testNamespace.MdsContainer"; + struct_def.metadata.attributes = {}; + struct_def.metadata.modifier = bond::Modifier::Optional; + + for (const auto& f : fields) { + bond::FieldDef fd; + fd.id = f.id; + fd.metadata.name = f.name; + fd.type.bonded_type = false; + fd.type.id = static_cast(f.type); + struct_def.fields.push_back(std::move(fd)); + } + + // Allocate schemaDef on heap + auto schemaDef = std::make_unique(); + schemaDef->root.id = bond::BT_STRUCT; + schemaDef->root.bonded_type = false; + schemaDef->structs.push_back(struct_def); + + // Marshal to buffer + bond::OutputBuffer buf; + bond::SimpleBinaryWriter writer(buf); + bond::Marshal(*schemaDef, writer); + + // Copy marshaled bytes + auto marshaled = buf.GetBuffer(); + void* bytes = malloc(marshaled.size()); + if (!bytes && marshaled.size()) throw std::bad_alloc(); + std::memcpy(bytes, marshaled.data(), marshaled.size()); + + // Create SchemaResult and fill fields + SchemaResult* result = new SchemaResult; + result->schema_bytes = bytes; + result->schema_bytes_len = marshaled.size(); + result->schema_ptr = schemaDef.release(); + + *out_len = result->schema_bytes_len; + return result; + } catch (...) 
{ + *out_len = 0; + return nullptr; + } + } + +extern "C" void* marshal_row_ffi(void* schema_ptr, + const void* row_buf, size_t row_len, + size_t* out_len) { + try { + bond::SchemaDef* schemaDef = static_cast(schema_ptr); + if (!schemaDef || schemaDef->structs.empty()) + throw std::runtime_error("invalid or empty schema"); + + const auto& fields = schemaDef->structs[0].fields; + const uint8_t* ptr = reinterpret_cast(row_buf); + size_t remain = row_len; + + bond::OutputBuffer buf; + bond::SimpleBinaryWriter writer(buf); + writer.WriteStructBegin(schemaDef->structs[0].metadata, false); + + for (const auto& f : fields) { + std::cout << "Processing field: " << f.metadata.name << " type " << f.type.id << std::endl; + switch (f.type.id) { + case bond::BT_DOUBLE: { + std::cout << "Field is double remain:" << remain << std::endl; + if (remain < 8) throw std::runtime_error("row too short"); + double v; + std::memcpy(&v, ptr, 8); + writer.Write(v); + ptr += 8; remain -= 8; + break; + } + case bond::BT_INT32: { + std::cout << "Field is int32 remain:" << remain << std::endl; + if (remain < 4) throw std::runtime_error("row too short"); + int32_t v; + std::memcpy(&v, ptr, 4); + writer.Write(v); + ptr += 4; remain -= 4; + break; + } + case bond::BT_FLOAT: { + std::cout << "Field is float remain:" << remain << std::endl; + if (remain < 4) throw std::runtime_error("row too short for float"); + float v; + std::memcpy(&v, ptr, 4); + writer.Write(v); + ptr += 4; remain -= 4; + break; + } + case bond::BT_STRING: { + std::cout << "Field is string remain:" << remain << std::endl; + if (remain < 4) throw std::runtime_error("row too short for string len"); + uint32_t slen = ptr[0] | (ptr[1] << 8) | (ptr[2] << 16) | (ptr[3] << 24); + ptr += 4; remain -= 4; + if (remain < slen) throw std::runtime_error("row too short for string bytes"); + std::string s(reinterpret_cast(ptr), slen); + writer.Write(s); + ptr += slen; remain -= slen; + break; + } + case bond::BT_WSTRING: { + std::cout << "Field 
is wstring remain:" << remain << std::endl; + if (remain < 2) throw std::runtime_error("row too short for wstring len"); + uint16_t slen = ptr[0] | (ptr[1] << 8); // Read length in code units + ptr += 2; remain -= 2; + + // For UTF-16, each character is 2 bytes + size_t byte_len = slen * 2; + if (remain < byte_len) throw std::runtime_error("row too short for wstring bytes"); + + // Create a u16string from the UTF-16LE encoded bytes + std::u16string ws; + ws.reserve(slen); + for (size_t i = 0; i < slen; i++) { + char16_t c = ptr[i*2] | (ptr[i*2+1] << 8); + ws.push_back(c); + } + + writer.Write(ws); + ptr += byte_len; remain -= byte_len; + break; + } + // Extend here for more Seializable types as needed + default: { + std::cout << "Unsupported type id: " << static_cast(f.type.id) << std::endl; + throw std::runtime_error("unsupported type id"); + } + } + } + writer.WriteStructEnd(); + + // Copy the Serialized buffer directly to a malloc'd buffer + auto output = buf.GetBuffer(); + *out_len = output.size(); + void* out = malloc(*out_len); + if (!out && *out_len) throw std::bad_alloc(); + std::memcpy(out, output.data(), *out_len); + return out; + } catch (...) 
{ + *out_len = 0; + return nullptr; + } +} + +extern "C" void free_row_buf_ffi(void* ptr) { + std::free(ptr); +} + + +extern "C" void free_schema_buf_ffi(SchemaResult* result) { + if (result) { + if (result->schema_bytes) free(result->schema_bytes); + delete static_cast(result->schema_ptr); + delete result; + } +} \ No newline at end of file diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/ffi/serialize_ffi.h b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/ffi/serialize_ffi.h new file mode 100644 index 00000000..4d05686a --- /dev/null +++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/ffi/serialize_ffi.h @@ -0,0 +1,41 @@ +#pragma once +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct SchemaResult { + void* schema_ptr; // Opaque pointer to SchemaDef + void* schema_bytes; // Marshaled bytes (for Rust) + size_t schema_bytes_len; +}; + +// Marshals a schema buffer describing the fields into a schema blob and a schema pointer. +// schema_buf: pointer to binary schema description +// schema_len: length of schema_buf +// out_len: output parameter to receive size of returned buffer +// Returns: malloc-allocated pointer to SchemaResult (must be freed via free_schema_buf_ffi) +SchemaResult* marshal_schema_ffi(const void* schema_buf, size_t schema_len, size_t* out_len); + +// Marshals a data row using a schema pointer and a binary row buffer. +// schema_ptr: pointer to the SchemaDef (from SchemaResult->schema_ptr) +// row_buf: pointer to binary row data (field values in schema order) +// row_len: length of row_buf +// out_len: output parameter to receive size of returned buffer +// Returns: malloc-allocated pointer to serialized row blob (must be freed via free_row_buf_ffi) +void* marshal_row_ffi(void* schema_ptr, + const void* row_buf, size_t row_len, + size_t* out_len); + +// Frees a buffer allocated by the above functions. 
+void free_row_buf_ffi(void* ptr);
+
+// Frees a SchemaResult structure and its contents.
+void free_schema_buf_ffi(SchemaResult* result);
+
+
+#ifdef __cplusplus
+}
+#endif
\ No newline at end of file
diff --git a/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/mod.rs b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/mod.rs
new file mode 100644
index 00000000..d8c21803
--- /dev/null
+++ b/opentelemetry-exporter-geneva/geneva-uploader/src/payload_encoder/mod.rs
@@ -0,0 +1,137 @@
+pub mod central_blob;
+pub mod encoder;
+mod ffi;
+
+//use smallvec::SmallVec;
+use std::slice;
+
+#[allow(dead_code)]
+pub(crate) struct EncoderSchema {
+    schema_result: *mut ffi::SchemaResult,
+    fields: Vec<(String, u8, u16)>, // (name, type, id)
+}
+
+#[allow(dead_code)]
+pub(crate) struct EncoderRow {
+    bytes: Vec<u8>,
+}
+
+impl Clone for EncoderSchema {
+    fn clone(&self) -> Self {
+        EncoderSchema::from_fields(
+            &self
+                .fields
+                .iter()
+                .map(|(n, t, i)| (n.as_str(), *t, *i))
+                .collect::<Vec<_>>(),
+        )
+    }
+}
+
+impl Drop for EncoderSchema {
+    fn drop(&mut self) {
+        unsafe {
+            if !self.schema_result.is_null() {
+                ffi::free_schema_buf_ffi(self.schema_result);
+            }
+        }
+    }
+}
+
+impl EncoderSchema {
+    #[allow(dead_code)]
+    pub(crate) fn from_fields(fields: &[(&str, u8, u16)]) -> Self {
+        let mut buf = Vec::new();
+        buf.extend_from_slice(&(fields.len() as u16).to_le_bytes());
+        for (name, typ, id) in fields {
+            // NOTE(review): name_len is written as u8 — names longer than 255
+            // bytes would silently truncate and desync the C++ parser; confirm callers.
+            buf.push(name.len() as u8);
+            buf.extend_from_slice(name.as_bytes());
+            buf.push(*typ);
+            buf.extend_from_slice(&id.to_le_bytes());
+        }
+        let mut out_len = 0usize;
+        let schema_result =
+            unsafe { ffi::marshal_schema_ffi(buf.as_ptr() as *const _, buf.len(), &mut out_len) };
+        assert!(!schema_result.is_null());
+        let fields = fields
+            .iter()
+            .map(|(name, typ, id)| (name.to_string(), *typ, *id))
+            .collect();
+        EncoderSchema {
+            schema_result,
+            fields,
+        }
+    }
+
+    #[allow(dead_code)]
+    pub(crate) fn as_bytes(&self) -> &[u8] {
+        unsafe {
+            let schema = &*self.schema_result;
+            std::slice::from_raw_parts(schema.schema_bytes as *const u8, schema.schema_bytes_len)
+        }
+    }
+}
+
+impl EncoderRow {
+    #[allow(dead_code)]
+    pub(crate) fn from_schema_and_row(schema: &EncoderSchema, row: &[u8]) -> Self {
+        let mut out_len = 0usize;
+        let ptr = unsafe {
+            let schema = &*schema.schema_result;
+            ffi::marshal_row_ffi(
+                schema.schema_ptr,
+                row.as_ptr() as *const _,
+                row.len(),
+                &mut out_len,
+            )
+        };
+        assert!(!ptr.is_null());
+        let bytes = unsafe {
+            let slice = slice::from_raw_parts(ptr as *const u8, out_len);
+            let v = slice.to_vec();
+            ffi::free_row_buf_ffi(ptr);
+            v
+        };
+        EncoderRow { bytes }
+    }
+    #[allow(dead_code)]
+    pub(crate) fn as_bytes(&self) -> &[u8] {
+        &self.bytes
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_schema_from_fields() {
+        let fields = &[
+            ("foo", 16u8, 1u16), // BT_INT32 = 16
+            ("bar", 9u8, 2u16),  // BT_STRING = 9
+        ];
+        let schema = EncoderSchema::from_fields(fields);
+        // Should be accessible
+        assert!(!schema.as_bytes().is_empty());
+    }
+
+    #[test]
+    fn test_row_from_schema_and_row() {
+        let fields = &[
+            ("foo", 16u8, 1u16), // BT_INT32 = 16
+            ("bar", 9u8, 2u16),  // BT_STRING = 9
+        ];
+        let schema = EncoderSchema::from_fields(fields);
+
+        // Compose a row: foo = 42i32; bar = "hello"
+        let mut row = Vec::new();
+        row.extend_from_slice(&42i32.to_le_bytes()); // foo
+        let s = "hello";
+        // BT_STRING lengths are read as u32 LE by serialize_ffi.cpp, so write 4 bytes here.
+        row.extend_from_slice(&(s.len() as u32).to_le_bytes()); // bar string length (u32 LE)
+        row.extend_from_slice(s.as_bytes()); // bar string bytes
+
+        let bond_row = EncoderRow::from_schema_and_row(&schema, &row);
+        assert!(!bond_row.bytes.is_empty());
+        assert!(!bond_row.as_bytes().is_empty());
+    }
+}
diff --git a/scripts/test.sh b/scripts/test.sh
index b3595094..4fca4ea5 100755
--- a/scripts/test.sh
+++ b/scripts/test.sh
@@ -3,7 +3,7 @@
 set -eu
 
 echo "Running tests for all packages in workspace with --all-features"
-cargo test --workspace --all-features --tests
+cargo test --workspace --all-features --tests --exclude opentelemetry-exporter-geneva
 
 echo "Running doctests for all packages in workspace with --all-features"
-cargo test --workspace --all-features --doc
+cargo test --workspace --all-features --doc --exclude opentelemetry-exporter-geneva