-
Notifications
You must be signed in to change notification settings - Fork 62
feat: [Geneva Exporter] - Add Serializer #261
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
68 commits
Select commit
Hold shift + click to select a range
ff88eb7
initial code
lalitb 803e38d
add ci
lalitb 53cf8d6
fix CI
lalitb 020b881
move script
lalitb 9165297
fix ci
lalitb bd9c5a0
install boost
lalitb 15b0a50
install haskell
lalitb 8eeb563
fix ci
lalitb c77a33f
fix
lalitb 9261c15
fix ci
lalitb 93089d7
fix
lalitb 17de005
revert ci workflow, exclude geneva
lalitb 82cb160
fix
lalitb 66804da
fix
lalitb 6043450
Add CI
lalitb 2bb2fae
fix CI
lalitb d8713c4
Merge branch 'main' into geneva-encode
lalitb b86bf47
fix CI
lalitb 69b480a
fix ci
lalitb 15f4e2b
fix ci
lalitb 186cc5b
fix ci
lalitb adfc628
fix ci
lalitb 338f82a
Fix CI
lalitb 540951b
add CO
lalitb 821292d
fix
lalitb f8913be
fix cargo
lalitb fe002bb
initial working version
lalitb b480bc8
add otlp mapper
lalitb 94a063c
add exporter - for e2e
lalitb 6541c28
Merge branch 'main' into geneva-encode
lalitb f77fb1b
Merge branch 'main' into geneva-encode
lalitb 1bc9f7c
fix CI
lalitb 5972673
fix
lalitb 1696449
fix..
lalitb 43bf4fc
fix
lalitb b1ea9fd
fix
lalitb be62be8
add direct otlp encoding
lalitb 828c2d9
fix
lalitb 11d7b9a
add TODOs
lalitb 549eab6
Merge branch 'main' into geneva-encode
lalitb 6b25a8f
Update Cargo.toml
lalitb 6303b6d
Update otlp_encoder.rs
lalitb e09eeae
Merge branch 'main' into geneva-encode
lalitb 46f3eb1
USING PURE RUST based BOND
lalitb 4c6a99b
add batching based on SCHEMA and EVENT_NAME
lalitb c8d510b
Merge branch 'main' into geneva-encode
lalitb cf9c8d5
Update ci.yml
lalitb 46e7e16
Update Cargo.toml
lalitb 595d3f2
Update Cargo.toml
lalitb c1be071
Update Cargo.toml
lalitb 3b0e4a6
Update client.rs
lalitb 4e1d1bf
Update uploader.rs
lalitb 9415e66
fix workspace
lalitb c7d1f5a
fix lint
lalitb db9b6a9
Update client.rs
lalitb fe9942d
Add tests for encoder
lalitb 425c18b
Merge branch 'geneva-encode' of github.com:lalitb/opentelemetry-rust-…
lalitb 51168bd
reorg
lalitb 2668490
fmt, println r4emove
lalitb e5ab562
clippy warning for approximate conts
lalitb 8828f2a
review comment - first commit
lalitb 5bba0fe
review comments - second commit
lalitb 772a905
Merge branch 'main' into geneva-encode
lalitb d22e802
Update opentelemetry-exporter-geneva/geneva-uploader/src/payload_enco…
lalitb cdbf6e4
Update opentelemetry-exporter-geneva/geneva-uploader/src/payload_enco…
lalitb c38f233
review comments - third commit
lalitb 4b664b0
Merge branch 'geneva-encode' of github.com:lalitb/opentelemetry-rust-…
lalitb e9ab455
Fix test with correct length of traceid
lalitb File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,5 @@ rust-version = "1.75.0" | |
|
||
[dependencies] | ||
|
||
|
||
[lints] | ||
workspace = true | ||
workspace = true |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
106 changes: 106 additions & 0 deletions
106
opentelemetry-exporter-geneva/geneva-uploader/src/client.rs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
//! High-level GenevaClient for user code. Wraps config_service and ingestion_service. | ||
|
||
use crate::config_service::client::{AuthMethod, GenevaConfigClient, GenevaConfigClientConfig}; | ||
use crate::ingestion_service::uploader::{GenevaUploader, GenevaUploaderConfig}; | ||
use crate::payload_encoder::lz4_chunked_compression::lz4_chunked_compression; | ||
use crate::payload_encoder::otlp_encoder::OtlpEncoder; | ||
use opentelemetry_proto::tonic::logs::v1::ResourceLogs; | ||
use std::sync::Arc; | ||
|
||
/// Configuration for GenevaClient (user-facing) | ||
#[derive(Clone, Debug)] | ||
pub struct GenevaClientConfig { | ||
pub endpoint: String, | ||
pub environment: String, | ||
pub account: String, | ||
pub namespace: String, | ||
pub region: String, | ||
pub config_major_version: u32, | ||
pub auth_method: AuthMethod, | ||
pub tenant: String, | ||
pub role_name: String, | ||
pub role_instance: String, | ||
// Add event name/version here if constant, or per-upload if you want them per call. | ||
} | ||
|
||
/// Main user-facing client for Geneva ingestion. | ||
#[derive(Clone)] | ||
pub struct GenevaClient { | ||
uploader: Arc<GenevaUploader>, | ||
encoder: OtlpEncoder, | ||
metadata: String, | ||
} | ||
|
||
impl GenevaClient { | ||
/// Construct a new client with minimal configuration. Fetches and caches ingestion info as needed. | ||
pub async fn new(cfg: GenevaClientConfig) -> Result<Self, String> { | ||
// Build config client config | ||
let config_client_config = GenevaConfigClientConfig { | ||
endpoint: cfg.endpoint, | ||
environment: cfg.environment.clone(), | ||
account: cfg.account, | ||
namespace: cfg.namespace.clone(), | ||
region: cfg.region, | ||
config_major_version: cfg.config_major_version, | ||
auth_method: cfg.auth_method, | ||
}; | ||
let config_client = Arc::new( | ||
GenevaConfigClient::new(config_client_config) | ||
.map_err(|e| format!("GenevaConfigClient init failed: {e}"))?, | ||
); | ||
|
||
let source_identity = format!( | ||
"Tenant={}/Role={}/RoleInstance={}", | ||
cfg.tenant, cfg.role_name, cfg.role_instance | ||
); | ||
|
||
let schema_ids = | ||
"c1ce0ecea020359624c493bbe97f9e80;0da22cabbee419e000541a5eda732eb3".to_string(); // TODO - find the actual value to be populated | ||
|
||
// Uploader config | ||
let uploader_config = GenevaUploaderConfig { | ||
namespace: cfg.namespace.clone(), | ||
source_identity, | ||
environment: cfg.environment, | ||
schema_ids, | ||
}; | ||
|
||
let uploader = GenevaUploader::from_config_client(config_client, uploader_config) | ||
.await | ||
.map_err(|e| format!("GenevaUploader init failed: {e}"))?; | ||
let metadata = format!( | ||
"namespace={}/eventVersion={}/tenant={}/role={}/roleinstance={}", | ||
cfg.namespace, | ||
"Ver1v0", // You can replace this with a cfg field if version should be dynamic | ||
cfg.tenant, | ||
cfg.role_name, | ||
cfg.role_instance, | ||
); | ||
Ok(Self { | ||
uploader: Arc::new(uploader), | ||
encoder: OtlpEncoder::new(), | ||
metadata, | ||
}) | ||
} | ||
|
||
/// Upload OTLP logs (as ResourceLogs). | ||
pub async fn upload_logs(&self, logs: &[ResourceLogs]) -> Result<(), String> { | ||
let log_iter = logs | ||
.iter() | ||
.flat_map(|resource_log| resource_log.scope_logs.iter()) | ||
.flat_map(|scope_log| scope_log.log_records.iter()); | ||
let blobs = self.encoder.encode_log_batch(log_iter, &self.metadata); | ||
for (_schema_id, event_name, encoded_blob, _row_count) in blobs { | ||
// TODO - log encoded_blob for debugging | ||
let compressed_blob = lz4_chunked_compression(&encoded_blob) | ||
.map_err(|e| format!("LZ4 compression failed: {e}"))?; | ||
// TODO - log compressed_blob for debugging | ||
let event_version = "Ver2v0"; // TODO - find the actual value to be populated | ||
self.uploader | ||
.upload(compressed_blob, &event_name, event_version) | ||
.await | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would cause sequential exports of the blobs. We could do better than that by issuing the required upload calls and await them all together instead of doing it one by one. |
||
.map_err(|e| format!("Geneva upload failed: {e}"))?; | ||
} | ||
Ok(()) | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,18 @@ | ||
mod config_service; | ||
pub mod ingestion_service; | ||
mod ingestion_service; | ||
mod payload_encoder; | ||
|
||
mod uploader; | ||
pub mod client; | ||
|
||
#[allow(unused_imports)] | ||
pub(crate) use config_service::client::{ | ||
AuthMethod, GenevaConfigClient, GenevaConfigClientConfig, GenevaConfigClientError, | ||
IngestionGatewayInfo, | ||
GenevaConfigClient, GenevaConfigClientConfig, GenevaConfigClientError, IngestionGatewayInfo, | ||
}; | ||
|
||
#[allow(unused_imports)] | ||
pub(crate) use ingestion_service::uploader::{ | ||
GenevaUploader, GenevaUploaderConfig, GenevaUploaderError, IngestionResponse, Result, | ||
}; | ||
|
||
pub use uploader::{create_uploader, GenevaUploader as Uploader}; | ||
pub use client::{GenevaClient, GenevaClientConfig}; | ||
pub use config_service::client::AuthMethod; |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see "Ver1v0" being used in
GenevaClient::new()
method. Should these two have been the same string? If yes, maybe use a const variable at both places for the time being.