Skip to content

feat: [Geneva Exporter] - [WIP] Add Serializer #261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ jobs:
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: doc
run: cargo doc --no-deps --all-features
run: cargo doc --no-deps --all-features --exclude opentelemetry-geneva-exporter
env:
CARGO_INCREMENTAL: '0'
RUSTDOCFLAGS: -Dwarnings
Expand Down Expand Up @@ -233,4 +233,4 @@ jobs:
with:
tool: cargo-workspace-lints
- name: cargo workspace-lints
run: cargo workspace-lints
run: cargo workspace-lints
7 changes: 6 additions & 1 deletion opentelemetry-exporter-geneva/geneva-uploader/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ native-tls = "0.2"
thiserror = "2.0"
chrono = "0.4"
url = "2.2"

md5 = "0.7.0"
smallvec = "1.15.0"

[features]
self_signed_certs = [] # Empty by default for security
Expand All @@ -30,5 +31,9 @@ wiremock = "0.6"
futures = "0.3"
num_cpus = "1.16"

[build-dependencies]
cc = "1.0"


[lints]
workspace = true
19 changes: 19 additions & 0 deletions opentelemetry-exporter-geneva/geneva-uploader/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::env;

/// Build script: compiles the C++ Bond FFI shim and links it against Bond.
///
/// Reads two optional environment variables:
/// - `BOND_INCLUDE_DIR`: directory containing the Bond headers
///   (defaults to `/usr/local/include`).
/// - `BOND_LIB_DIR`: directory containing the Bond libraries
///   (defaults to `/usr/local/lib`).
fn main() {
    // Emitting any `rerun-if-*` directive disables Cargo's default behaviour of
    // re-running the script when any file in the package changes, so every input
    // this script depends on is listed explicitly.
    println!("cargo:rerun-if-changed=build.rs");
    println!("cargo:rerun-if-changed=src/payload_encoder/ffi");
    println!("cargo:rerun-if-env-changed=BOND_INCLUDE_DIR");
    println!("cargo:rerun-if-env-changed=BOND_LIB_DIR");

    let bond_include =
        env::var("BOND_INCLUDE_DIR").unwrap_or_else(|_| "/usr/local/include".to_string());
    let bond_lib = env::var("BOND_LIB_DIR").unwrap_or_else(|_| "/usr/local/lib".to_string());

    // Compile the C++ shim into a static library named `bond_ffi`.
    cc::Build::new()
        .cpp(true)
        .file("src/payload_encoder/ffi/serialize_ffi.cpp")
        .include("src/payload_encoder/ffi/")
        .include(&bond_include)
        .flag("-std=c++14")
        .compile("bond_ffi");

    // Libraries the shim itself depends on.
    println!("cargo:rustc-link-search=native={}", bond_lib);
    println!("cargo:rustc-link-lib=bond");
    println!("cargo:rustc-link-lib=boost_system");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/bin/bash
#
# Clones, builds, and installs Microsoft Bond into a local prefix so that the
# Rust build can locate its headers and libraries through BOND_INCLUDE_DIR and
# BOND_LIB_DIR.
#
# Usage: <this script> [INSTALL_DIR]
#   INSTALL_DIR  Install prefix. Defaults to "$(pwd)/bond_build".
#
# Refuses to run outside GitHub Actions (GITHUB_ACTIONS must be "true").

set -e # Exit immediately if a command exits with a non-zero status

echo "================================================================================"
echo "IMPORTANT DISCLAIMER: Bond is deprecated. This script is intended only for CI testing and"
echo "should not be used in any other prod/non-prod environment."
echo "================================================================================"

if [ "$GITHUB_ACTIONS" != "true" ]; then
    echo "ERROR: This script should only be run in a GitHub Actions CI environment. Exiting."
    exit 1
fi

for cmd in git cmake make; do
    command -v "$cmd" >/dev/null 2>&1 || { echo >&2 "$cmd is required but not installed. Aborting."; exit 1; }
done

# Allow custom install dir as first argument
INSTALL_DIR="${1:-$(pwd)/bond_build}"

# Variables
BOND_REPO_URL="https://github.com/microsoft/bond.git"
BOND_CLONE_DIR="bond_repo"
BOND_BUILD_DIR="bond_build_temp"

# Single source of truth for the paths that are both printed and exported below,
# so the instructions shown to the user can never drift from what is exported.
BOND_INCLUDE_DIR="$INSTALL_DIR/include"
BOND_LIB_DIR="$INSTALL_DIR/lib/bond"

# Step 1: Clone the Bond repository if not already cloned
if [ ! -d "$BOND_CLONE_DIR" ]; then
    echo "Cloning Bond repository..."
    git clone --recurse-submodules "$BOND_REPO_URL" "$BOND_CLONE_DIR"
else
    echo "Bond repository already cloned."
fi

# Step 2: Create the build directory
mkdir -p "$BOND_BUILD_DIR"

# Step 3: Build the Bond library
echo "Building Bond..."
cd "$BOND_BUILD_DIR"
cmake ../"$BOND_CLONE_DIR" -DCMAKE_INSTALL_PREFIX="$INSTALL_DIR"
make -j"$(nproc)"

# Step 4: Install Bond locally
echo "Installing Bond locally in $INSTALL_DIR ..."
make install

# Step 5: Display and export paths for integration
echo "Bond build and installation completed."
echo "Include directory: $BOND_INCLUDE_DIR"
echo "Library directory: $BOND_LIB_DIR"

echo ""
echo "To use with your Rust build, export these variables:"
echo "export BOND_INCLUDE_DIR=\"$BOND_INCLUDE_DIR\""
echo "export BOND_LIB_DIR=\"$BOND_LIB_DIR\""
export BOND_INCLUDE_DIR
export BOND_LIB_DIR

# `export` only affects this process and its children. When the runner provides
# GITHUB_ENV, also record the paths there so later workflow steps can see them.
if [ -n "${GITHUB_ENV:-}" ]; then
    {
        echo "BOND_INCLUDE_DIR=$BOND_INCLUDE_DIR"
        echo "BOND_LIB_DIR=$BOND_LIB_DIR"
    } >> "$GITHUB_ENV"
fi

# Step 6: Clean up if required
# Uncomment the following lines to clean up after the build
# echo "Cleaning up temporary files..."
# cd ..
# rm -rf "$BOND_CLONE_DIR" "$BOND_BUILD_DIR"

echo "Done."
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash
#
# Runs the regular test targets and then the doctests for the Geneva exporter
# package, both with every feature enabled.

set -eu

readonly PACKAGE="opentelemetry-exporter-geneva"

# run_suite LABEL TARGET_FLAG
# Announces the run, then invokes `cargo test` restricted to TARGET_FLAG.
run_suite() {
    echo "Running $1 for ${PACKAGE} with --all-features"
    cargo test -p "${PACKAGE}" --all-features "$2"
}

run_suite "tests" --tests

run_suite "doctests" --doc
1 change: 1 addition & 0 deletions opentelemetry-exporter-geneva/geneva-uploader/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod config_service;
pub mod ingestion_service;
mod payload_encoder;

mod uploader;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
//use md5;

use crate::payload_encoder::{EncoderRow as FFiEncoderRow, EncoderSchema as FFiEncoderSchema};

/// Re-encodes a Rust (UTF-8) string slice as a flat vector of UTF-16LE bytes.
///
/// Every UTF-16 code unit produced by `str::encode_utf16` is appended as two
/// bytes, least-significant byte first.
#[allow(dead_code)]
fn utf8_to_utf16le_bytes(s: &str) -> Vec<u8> {
    // Reserve two output bytes per input byte up front. ASCII text needs exactly
    // that much; other text may need less, but this avoids regrowing the vector.
    let mut encoded = Vec::with_capacity(s.len() * 2);
    encoded.extend(s.encode_utf16().flat_map(u16::to_le_bytes));
    encoded
}

/// Schema entry for central blob
///
/// `CentralBlob::to_bytes` writes one of these as an entity of type 0:
/// the id, the MD5 bytes, a length-prefixed copy of the schema bytes, and
/// finally the terminator.
#[allow(dead_code)]
pub(crate) struct CentralSchemaEntry {
/// Schema identifier, written as 8 little-endian bytes.
pub id: u64,
/// 16 bytes written verbatim after the id; documented in the payload layout
/// as the MD5 checksum of the schema bytes (supplied by the caller, not
/// computed here).
pub md5: [u8; 16],
/// Encoded schema; its `as_bytes()` output is what gets written.
pub schema: FFiEncoderSchema,
}

/// Event/row entry for central blob
///
/// `CentralBlob::to_bytes` writes one of these as an entity of type 2:
/// schema id, level, length-prefixed UTF-16LE event name, length-prefixed
/// row data (preceded by the Simple Protocol header), and the terminator.
#[allow(dead_code)]
pub(crate) struct CentralEventEntry {
/// Identifier of the schema this event refers to, written as 8
/// little-endian bytes.
pub schema_id: u64,
/// Single byte written after the schema id; described in the payload layout
/// as the event verbosity or severity level.
pub level: u8,
/// Event name as UTF-8; converted to UTF-16LE when serialized.
pub event_name: String,
/// Encoded row; its `as_bytes()` output is what gets written.
pub row: FFiEncoderRow,
}

/// In-memory form of a CentralBlob payload: header fields, metadata string,
/// and the schema and event entries that `to_bytes` serializes in that order.
#[allow(dead_code)]
pub(crate) struct CentralBlob {
/// Header field, written first as 4 little-endian bytes.
pub version: u32,
/// Header field, written second as 4 little-endian bytes.
pub format: u32,
pub metadata: String, // UTF-8, will be stored as UTF-16LE
/// Schema entries, serialized before any event.
pub schemas: Vec<CentralSchemaEntry>,
/// Event entries, serialized after all schemas.
pub events: Vec<CentralEventEntry>,
}

/// Marker appended (as 8 little-endian bytes) after every schema entry and
/// every event entry in the serialized blob.
const TERMINATOR: u64 = 0xdeadc0dedeadc0de;

/// CentralBlob Protocol Payload Structure
///
/// The payload consists of a header, metadata, schemas, and events, each encoded in a specific format.
/// The central terminator constant used throughout is `TERMINATOR = 0xdeadc0dedeadc0de`.
///
/// ## Payload Structure
///
/// ### Header
/// - **Version**: `u32` (4 bytes)
/// - **Format**: `u32` (4 bytes)
///
/// ### Metadata
/// - **Length**: `u32` (4 bytes, prefix for UTF-16LE encoded metadata)
/// - **Metadata**: UTF-16LE encoded string (variable length)
///
/// ### Schemas
/// A collection of schema entries, each encoded as follows:
/// - **Entity Type**: `u16` (2 bytes, value = 0)
/// - **Schema ID**: `u64` (8 bytes, unique identifier)
/// - **MD5 Hash**: `[u8; 16]` (16 bytes, MD5 checksum of schema bytes)
/// - **Schema Length**: `u32` (4 bytes)
/// - **Schema Bytes**: `Vec<u8>` (variable length, schema serialized as bytes)
/// - **Terminator**: `u64` (8 bytes, constant `TERMINATOR`)
///
/// ### Events
/// A collection of event entries, each encoded as follows:
/// - **Entity Type**: `u16` (2 bytes, value = 2)
/// - **Schema ID**: `u64` (8 bytes, links the event to a schema)
/// - **Level**: `u8` (1 byte, event verbosity or severity level)
/// - **Event Name Length**: `u16` (2 bytes, prefix for UTF-16LE encoded event name)
/// - **Event Name**: UTF-16LE encoded string (variable length)
/// - **Row Length**: `u32` (4 bytes, prefix for row data including `Simple Protocol` header)
/// - **Row Data**:
/// - **Simple Protocol Header**: `[0x53, 0x50, 0x01, 0x00]` (4 bytes)
/// - **Row Bytes**: `Vec<u8>` (variable length, row serialized as bytes)
/// - **Terminator**: `u64` (8 bytes, constant `TERMINATOR`)
///

impl CentralBlob {
    /// Serializes the blob into its binary payload.
    ///
    /// All integers are written little-endian, in this order:
    /// 1. Header: `version` (u32), `format` (u32).
    /// 2. Metadata: byte length (u32) followed by the UTF-16LE bytes.
    /// 3. One record per schema: entity type `0` (u16), id (u64), md5 (16 bytes),
    ///    schema byte length (u32), schema bytes, `TERMINATOR` (u64).
    /// 4. One record per event: entity type `2` (u16), schema id (u64), level (u8),
    ///    event-name byte length (u16), UTF-16LE event name, row length (u32,
    ///    counting the 4-byte Simple Protocol header), Simple Protocol header,
    ///    row bytes, `TERMINATOR` (u64).
    ///
    /// Length prefixes are produced with narrowing `as` casts, so metadata or
    /// schema/row data longer than `u32::MAX` bytes, or an event name whose
    /// UTF-16LE form exceeds `u16::MAX` bytes, would get a truncated prefix.
    #[allow(dead_code)]
    pub(crate) fn to_bytes(&self) -> Vec<u8> {
        // Simple Protocol header written in front of every row.
        const SP_HEADER: [u8; 4] = [0x53, 0x50, 0x01, 0x00];
        // Fixed bytes per schema record:
        // entity type (2) + id (8) + md5 (16) + length prefix (4) + terminator (8).
        const SCHEMA_FIXED: usize = 2 + 8 + 16 + 4 + 8;
        // Fixed bytes per event record:
        // entity type (2) + schema id (8) + level (1) + name length prefix (2)
        // + row length prefix (4) + Simple Protocol header (4) + terminator (8).
        const EVENT_FIXED: usize = 2 + 8 + 1 + 2 + 4 + 4 + 8;

        // Encode every string once; the results are needed both for sizing the
        // buffer and for writing it.
        let meta_utf16 = utf8_to_utf16le_bytes(&self.metadata);
        let events_with_utf16 = self
            .events
            .iter()
            .map(|e| (e, utf8_to_utf16le_bytes(&e.event_name)))
            .collect::<Vec<_>>();

        // Exact output size, so the buffer is allocated once and never regrown.
        let schemas_size = self
            .schemas
            .iter()
            .map(|s| SCHEMA_FIXED + s.schema.as_bytes().len())
            .sum::<usize>();
        let events_size = events_with_utf16
            .iter()
            .map(|(e, evname_utf16)| EVENT_FIXED + evname_utf16.len() + e.row.as_bytes().len())
            .sum::<usize>();
        // 8 = version + format, 4 = metadata length prefix.
        let total_size = 8 + 4 + meta_utf16.len() + schemas_size + events_size;

        let mut buf = Vec::with_capacity(total_size);

        // HEADER
        buf.extend_from_slice(&self.version.to_le_bytes());
        buf.extend_from_slice(&self.format.to_le_bytes());

        // METADATA (len, UTF-16LE bytes)
        buf.extend_from_slice(&(meta_utf16.len() as u32).to_le_bytes());
        buf.extend_from_slice(&meta_utf16);

        // SCHEMAS (type 0)
        for schema in &self.schemas {
            buf.extend_from_slice(&0u16.to_le_bytes()); // entity type 0
            buf.extend_from_slice(&schema.id.to_le_bytes());
            buf.extend_from_slice(&schema.md5);
            let schema_bytes = schema.schema.as_bytes();
            buf.extend_from_slice(&(schema_bytes.len() as u32).to_le_bytes());
            buf.extend_from_slice(schema_bytes);
            buf.extend_from_slice(&TERMINATOR.to_le_bytes());
        }

        // EVENTS (type 2)
        for (event, evname_utf16) in events_with_utf16 {
            buf.extend_from_slice(&2u16.to_le_bytes()); // entity type 2
            buf.extend_from_slice(&event.schema_id.to_le_bytes());
            buf.push(event.level);

            // event name (UTF-16LE, prefixed with u16 len in bytes)
            buf.extend_from_slice(&(evname_utf16.len() as u16).to_le_bytes());
            buf.extend_from_slice(&evname_utf16);

            // row (len, Simple Protocol header, bytes); the length prefix covers
            // the header as well as the row bytes. Written straight into `buf`
            // rather than through a temporary per-event vector.
            let row_bytes = event.row.as_bytes();
            buf.extend_from_slice(&((SP_HEADER.len() + row_bytes.len()) as u32).to_le_bytes());
            buf.extend_from_slice(&SP_HEADER);
            buf.extend_from_slice(row_bytes);

            buf.extend_from_slice(&TERMINATOR.to_le_bytes());
        }

        buf
    }
}

// Example usage/test (can be moved to examples or tests)
#[cfg(test)]
mod tests {
    use super::*;
    use crate::payload_encoder::{EncoderRow, EncoderSchema};

    /// Helper to calculate MD5 hash, returns [u8;16]
    fn md5_bytes(data: &[u8]) -> [u8; 16] {
        md5::compute(data).0
    }

    /// Builds a blob with one schema and one event and checks the parts of the
    /// serialized layout that do not depend on how the encoder produces its
    /// schema and row bytes.
    #[test]
    fn test_central_blob_creation() {
        // Prepare a schema
        let fields = &[
            ("foo", 16u8, 1u16), // BT_INT32
            ("bar", 9u8, 2u16),  // BT_STRING
        ];
        let schema_obj = EncoderSchema::from_fields(fields);
        let schema_bytes = schema_obj.as_bytes().to_vec();
        let schema_md5 = md5_bytes(&schema_bytes);
        let schema_id = 1234u64;

        let schema = CentralSchemaEntry {
            id: schema_id,
            md5: schema_md5,
            schema: schema_obj,
        };

        // Prepare a row
        let mut row = Vec::new();
        row.extend_from_slice(&42i32.to_le_bytes());
        let s = "hello";
        row.extend_from_slice(&(s.len() as u32).to_le_bytes());
        row.extend_from_slice(s.as_bytes());

        let row_obj = EncoderRow::from_schema_and_row(&schema.schema, &row);

        let event = CentralEventEntry {
            schema_id,
            level: 0, // e.g. ETW verbose
            event_name: "eventname".to_string(),
            row: row_obj,
        };

        // Metadata
        let metadata =
            "namespace=testNamespace/eventVersion=Ver1v0/tenant=T/role=R/roleinstance=RI";

        // Build blob
        let blob = CentralBlob {
            version: 1,
            format: 42,
            metadata: metadata.to_string(),
            schemas: vec![schema],
            events: vec![event],
        };

        let payload = blob.to_bytes();

        assert!(!payload.is_empty());

        // Header: version then format, both little-endian u32.
        assert_eq!(&payload[0..4], &1u32.to_le_bytes()[..]);
        assert_eq!(&payload[4..8], &42u32.to_le_bytes()[..]);

        // Metadata: u32 byte length followed by the UTF-16LE bytes.
        let meta_utf16 = utf8_to_utf16le_bytes(metadata);
        let meta_end = 12 + meta_utf16.len();
        assert_eq!(&payload[8..12], &(meta_utf16.len() as u32).to_le_bytes()[..]);
        assert_eq!(&payload[12..meta_end], &meta_utf16[..]);

        // First schema record: entity type 0, id, md5, length-prefixed bytes.
        assert_eq!(&payload[meta_end..meta_end + 2], &0u16.to_le_bytes()[..]);
        assert_eq!(
            &payload[meta_end + 2..meta_end + 10],
            &schema_id.to_le_bytes()[..]
        );
        assert_eq!(&payload[meta_end + 10..meta_end + 26], &schema_md5[..]);
        assert_eq!(
            &payload[meta_end + 26..meta_end + 30],
            &(schema_bytes.len() as u32).to_le_bytes()[..]
        );

        // The last record written is the event, which ends with the terminator.
        assert!(payload.ends_with(&TERMINATOR.to_le_bytes()));
    }
}
Loading
Loading