From 586fb31a1678ca220cdeef7f37b091de41b6ce95 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Wed, 10 Jan 2024 21:08:43 +0800 Subject: [PATCH] feat(greptimedb sink): update ingestion api for greptimedb sink (#19410) * feat: update ingestion api for greptimedb sink * fix: lint issues * chore: update greptime client * chore: update license --- Cargo.lock | 57 ++---- Cargo.toml | 2 +- LICENSE-3rdparty.csv | 2 +- scripts/integration/greptimedb/test.yaml | 2 +- src/sinks/greptimedb/request_builder.rs | 244 +++++++++++++---------- src/sinks/greptimedb/service.rs | 6 +- 6 files changed, 163 insertions(+), 150 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 007408d067d3b..e7f8fab6939b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2209,7 +2209,7 @@ dependencies = [ "futures-core", "prost 0.12.3", "prost-types 0.12.3", - "tonic 0.10.2", + "tonic", "tracing-core 0.1.32", ] @@ -2231,7 +2231,7 @@ dependencies = [ "thread_local", "tokio", "tokio-stream", - "tonic 0.10.2", + "tonic", "tracing 0.1.40", "tracing-core 0.1.32", "tracing-subscriber", @@ -3682,19 +3682,21 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?tag=0.2.1#4398d20c56d5f7939cc2960789cb1fa7dd18e6fe" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?tag=v0.4.1#4306ab645ee55b3f7f2ad3fb7acc5820f967c1aa" dependencies = [ - "prost 0.11.9", + "prost 0.12.3", "serde", "serde_json", - "tonic 0.9.2", - "tonic-build 0.9.2", + "strum", + "strum_macros", + "tonic", + "tonic-build 0.10.2", ] [[package]] name = "greptimedb-client" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptimedb-client-rust.git?rev=bc32362adf0df17a41a95bae4221d6d8f1775656#bc32362adf0df17a41a95bae4221d6d8f1775656" +source = "git+https://github.com/GreptimeTeam/greptimedb-ingester-rust.git?rev=4cb19ec47eeaf634c451d9ae438dac445a8a3dce#4cb19ec47eeaf634c451d9ae438dac445a8a3dce" dependencies = [ "dashmap", "enum_dispatch", @@ -3702,12 +3704,12 @@ dependencies = [ "futures-util", "greptime-proto", "parking_lot", - "prost 0.11.9", + "prost 0.12.3", "rand 0.8.5", "snafu", "tokio", "tokio-stream", - "tonic 0.9.2", + "tonic", "tonic-build 0.9.2", "tower", ] @@ -5985,7 +5987,7 @@ dependencies = [ "ordered-float 4.2.0", "prost 0.12.3", "prost-build 0.12.3", - "tonic 0.10.2", + "tonic", "tonic-build 0.10.2", "vector-core", "vector-lookup", @@ -9103,37 +9105,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "tonic" -version = "0.9.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a" -dependencies = [ - "async-stream", - "async-trait", - "axum", - "base64 0.21.6", - "bytes 1.5.0", - "futures-core", - "futures-util", - "h2 0.3.22", - "http 0.2.9", - "http-body", - "hyper", - "hyper-timeout", - "percent-encoding", - "pin-project", - "prost 0.11.9", - "rustls-pemfile", - "tokio", - "tokio-rustls", - "tokio-stream", - "tower", - "tower-layer", - "tower-service", - "tracing 0.1.40", -] - [[package]] name = "tonic" version = "0.10.2" @@ -9953,7 +9924,7 @@ dependencies = [ "tokio-tungstenite", "tokio-util", "toml 0.8.8", - "tonic 0.10.2", + "tonic", "tonic-build 0.10.2", "tower", "tower-http", @@ -10195,7 +10166,7 @@ dependencies = [ "tokio-test", "tokio-util", "toml 0.8.8", - "tonic 0.10.2", + "tonic", "tracing 0.1.40", "tracing-core 0.1.32", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 512528b49265a..0b0e3c0aaf134 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -229,7 +229,7 @@ hex = { version = "0.4.3", default-features = false, optional = true } sha2 = { version = "0.10.8", default-features = false, optional = true } # GreptimeDB -greptimedb-client = { git = "https://github.com/GreptimeTeam/greptimedb-client-rust.git", rev = "bc32362adf0df17a41a95bae4221d6d8f1775656", optional = true } +greptimedb-client = { git = "https://github.com/GreptimeTeam/greptimedb-ingester-rust.git", rev = "4cb19ec47eeaf634c451d9ae438dac445a8a3dce", optional = true } # External libs arc-swap = { version = "1.6", default-features = false, optional = true } diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 4842b415a3230..89e4b38c983a3 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -232,7 +232,7 @@ graphql_client,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT, graphql_client_codegen,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé graphql_query_derive,https://github.com/graphql-rust/graphql-client,Apache-2.0 OR MIT,Tom Houlé greptime-proto,https://github.com/GreptimeTeam/greptime-proto,Apache-2.0,The greptime-proto Authors -greptimedb-client,https://github.com/GreptimeTeam/greptimedb-client-rust,Apache-2.0,The greptimedb-client Authors +greptimedb-client,https://github.com/GreptimeTeam/greptimedb-ingester-rust,Apache-2.0,The greptimedb-client Authors grok,https://github.com/daschl/grok,Apache-2.0,Michael Nitschinger group,https://github.com/zkcrypto/group,MIT OR Apache-2.0,"Sean Bowe , Jack Grigg " h2,https://github.com/hyperium/h2,MIT,"Carl Lerche , Sean McArthur " diff --git a/scripts/integration/greptimedb/test.yaml b/scripts/integration/greptimedb/test.yaml index ac336a9210095..0fe648b0e7db2 100644 --- a/scripts/integration/greptimedb/test.yaml +++ b/scripts/integration/greptimedb/test.yaml @@ -11,7 +11,7 @@ runner: matrix: # Temporarily pegging to the latest known stable release # since using `latest` is failing consistently. - version: [v0.4.0] + version: [v0.4.4] # changes to these files/paths will invoke the integration test in CI # expressions are evaluated using https://github.com/micromatch/picomatch diff --git a/src/sinks/greptimedb/request_builder.rs b/src/sinks/greptimedb/request_builder.rs index 350aaaa6c9b1f..7c7b6c8189b14 100644 --- a/src/sinks/greptimedb/request_builder.rs +++ b/src/sinks/greptimedb/request_builder.rs @@ -1,6 +1,6 @@ use chrono::Utc; -use greptimedb_client::api::v1::column::*; use greptimedb_client::api::v1::*; +use greptimedb_client::helpers::values::*; use vector_lib::event::metric::{Bucket, MetricSketch, Quantile, Sample}; use vector_lib::event::{Metric, MetricValue}; use vector_lib::metrics::AgentDDSketch; @@ -11,46 +11,44 @@ pub(super) const DISTRIBUTION_QUANTILES: [f64; 5] = [0.5, 0.75, 0.90, 0.95, 0.99 pub(super) const DISTRIBUTION_STAT_FIELD_COUNT: usize = 5; pub(super) const SUMMARY_STAT_FIELD_COUNT: usize = 2; -fn f64_field(name: &str, value: f64) -> Column { - Column { +fn f64_column(name: &str) -> ColumnSchema { + ColumnSchema { column_name: name.to_owned(), - values: Some(column::Values { - f64_values: vec![value], - ..Default::default() - }), semantic_type: SemanticType::Field as i32, datatype: ColumnDataType::Float64 as i32, ..Default::default() } } -fn ts_column(name: &str, value: i64) -> Column { - Column { +fn ts_column(name: &str) -> ColumnSchema { + ColumnSchema { column_name: name.to_owned(), - values: Some(column::Values { - ts_millisecond_values: vec![value], - ..Default::default() - }), semantic_type: SemanticType::Timestamp as i32, datatype: ColumnDataType::TimestampMillisecond as i32, ..Default::default() } } -fn tag_column(name: &str, value: &str) -> Column { - Column { +fn tag_column(name: &str) -> ColumnSchema { + ColumnSchema { column_name: name.to_owned(), - values: Some(column::Values { - string_values: vec![value.to_owned()], - ..Default::default() - }), semantic_type: SemanticType::Tag as i32, datatype: ColumnDataType::String as i32, ..Default::default() } } -pub(super) fn metric_to_insert_request(metric: Metric) -> InsertRequest { +fn encode_f64_value( + name: &str, + value: f64, + schema: &mut Vec, + columns: &mut Vec, +) { + schema.push(f64_column(name)); + columns.push(f64_value(value)); +} + +pub(super) fn metric_to_insert_request(metric: Metric) -> RowInsertRequest { let ns = metric.namespace(); let metric_name = metric.name(); let table_name = if let Some(ns) = ns { @@ -58,29 +56,38 @@ pub(super) fn metric_to_insert_request(metric: Metric) -> InsertRequest { } else { metric_name.to_owned() }; - + let mut schema = Vec::new(); let mut columns = Vec::new(); + // timestamp let timestamp = metric .timestamp() .map(|t| t.timestamp_millis()) .unwrap_or_else(|| Utc::now().timestamp_millis()); - columns.push(ts_column("ts", timestamp)); + schema.push(ts_column("ts")); + columns.push(timestamp_millisecond_value(timestamp)); // tags if let Some(tags) = metric.tags() { for (key, value) in tags.iter_single() { - columns.push(tag_column(key, value)); + schema.push(tag_column(key)); + columns.push(string_value(value.to_owned())); } } // fields match metric.value() { - MetricValue::Counter { value } => columns.push(f64_field("val", *value)), - MetricValue::Gauge { value } => columns.push(f64_field("val", *value)), - MetricValue::Set { values } => columns.push(f64_field("val", values.len() as f64)), + MetricValue::Counter { value } => { + encode_f64_value("val", *value, &mut schema, &mut columns); + } + MetricValue::Gauge { value } => { + encode_f64_value("val", *value, &mut schema, &mut columns); + } + MetricValue::Set { values } => { + encode_f64_value("val", values.len() as f64, &mut schema, &mut columns); + } MetricValue::Distribution { samples, .. } => { - encode_distribution(samples, &mut columns); + encode_distribution(samples, &mut schema, &mut columns); } MetricValue::AggregatedHistogram { @@ -88,83 +95,97 @@ pub(super) fn metric_to_insert_request(metric: Metric) -> InsertRequest { count, sum, } => { - encode_histogram(buckets.as_ref(), &mut columns); - columns.push(f64_field("count", *count as f64)); - columns.push(f64_field("sum", *sum)); + encode_histogram(buckets.as_ref(), &mut schema, &mut columns); + encode_f64_value("count", *count as f64, &mut schema, &mut columns); + encode_f64_value("sum", *sum, &mut schema, &mut columns); } MetricValue::AggregatedSummary { quantiles, count, sum, } => { - encode_quantiles(quantiles.as_ref(), &mut columns); - columns.push(f64_field("count", *count as f64)); - columns.push(f64_field("sum", *sum)); + encode_quantiles(quantiles.as_ref(), &mut schema, &mut columns); + encode_f64_value("count", *count as f64, &mut schema, &mut columns); + encode_f64_value("sum", *sum, &mut schema, &mut columns); } MetricValue::Sketch { sketch } => { let MetricSketch::AgentDDSketch(sketch) = sketch; - encode_sketch(sketch, &mut columns); + encode_sketch(sketch, &mut schema, &mut columns); } } - InsertRequest { + RowInsertRequest { table_name, - columns, - row_count: 1, - ..Default::default() + rows: Some(Rows { + schema, + rows: vec![Row { values: columns }], + }), } } -fn encode_distribution(samples: &[Sample], columns: &mut Vec) { +fn encode_distribution( + samples: &[Sample], + schema: &mut Vec, + columns: &mut Vec, +) { if let Some(stats) = DistributionStatistic::from_samples(samples, &DISTRIBUTION_QUANTILES) { - columns.push(f64_field("min", stats.min)); - columns.push(f64_field("max", stats.max)); - columns.push(f64_field("avg", stats.avg)); - columns.push(f64_field("sum", stats.sum)); - columns.push(f64_field("count", stats.count as f64)); + encode_f64_value("min", stats.min, schema, columns); + encode_f64_value("max", stats.max, schema, columns); + encode_f64_value("avg", stats.avg, schema, columns); + encode_f64_value("sum", stats.sum, schema, columns); + encode_f64_value("count", stats.count as f64, schema, columns); for (quantile, value) in stats.quantiles { - columns.push(f64_field(&format!("p{:02}", quantile * 100f64), value)); + encode_f64_value( + &format!("p{:02}", quantile * 100f64), + value, + schema, + columns, + ); } } } -fn encode_histogram(buckets: &[Bucket], columns: &mut Vec) { +fn encode_histogram(buckets: &[Bucket], schema: &mut Vec, columns: &mut Vec) { for bucket in buckets { let column_name = format!("b{}", bucket.upper_limit); - columns.push(f64_field(&column_name, bucket.count as f64)); + encode_f64_value(&column_name, bucket.count as f64, schema, columns); } } -fn encode_quantiles(quantiles: &[Quantile], columns: &mut Vec) { +fn encode_quantiles( + quantiles: &[Quantile], + schema: &mut Vec, + columns: &mut Vec, +) { for quantile in quantiles { let column_name = format!("p{:02}", quantile.quantile * 100f64); - columns.push(f64_field(&column_name, quantile.value)); + encode_f64_value(&column_name, quantile.value, schema, columns); } } -fn encode_sketch(sketch: &AgentDDSketch, columns: &mut Vec) { - columns.push(f64_field("count", sketch.count() as f64)); +fn encode_sketch(sketch: &AgentDDSketch, schema: &mut Vec, columns: &mut Vec) { + encode_f64_value("count", sketch.count() as f64, schema, columns); if let Some(min) = sketch.min() { - columns.push(f64_field("min", min)); + encode_f64_value("min", min, schema, columns); } if let Some(max) = sketch.max() { - columns.push(f64_field("max", max)); + encode_f64_value("max", max, schema, columns); } if let Some(sum) = sketch.sum() { - columns.push(f64_field("sum", sum)); + encode_f64_value("sum", sum, schema, columns); } if let Some(avg) = sketch.avg() { - columns.push(f64_field("avg", avg)); + encode_f64_value("avg", avg, schema, columns); } for q in DISTRIBUTION_QUANTILES { if let Some(quantile) = sketch.quantile(q) { let column_name = format!("p{:02}", q * 100f64); - columns.push(f64_field(&column_name, quantile)); + encode_f64_value(&column_name, quantile, schema, columns); } } } @@ -177,9 +198,23 @@ mod tests { use super::*; use crate::event::metric::{MetricKind, StatisticKind}; - fn get_column(columns: &[Column], name: &str) -> f64 { - let col = columns.iter().find(|c| c.column_name == name).unwrap(); - *(col.values.as_ref().unwrap().f64_values.first().unwrap()) + fn get_column(rows: &Rows, name: &str) -> f64 { + let (col_index, _) = rows + .schema + .iter() + .enumerate() + .find(|(_, c)| c.column_name == name) + .unwrap(); + let value_data = rows.rows[0].values[col_index] + .value_data + .as_ref() + .expect("null value"); + match value_data { + value::ValueData::F64Value(v) => *v, + _ => { + unreachable!() + } + } } #[test] @@ -196,11 +231,12 @@ mod tests { let insert = metric_to_insert_request(metric); assert_eq!(insert.table_name, "ns_load1"); - assert_eq!(insert.row_count, 1); - assert_eq!(insert.columns.len(), 3); + let rows = insert.rows.expect("Empty insert request"); + assert_eq!(rows.rows.len(), 1); + assert_eq!(rows.rows[0].values.len(), 3); - let column_names = insert - .columns + let column_names = rows + .schema .iter() .map(|c| c.column_name.as_ref()) .collect::>(); @@ -208,7 +244,7 @@ mod tests { assert!(column_names.contains(&"host")); assert!(column_names.contains(&"val")); - assert_eq!(get_column(&insert.columns, "val"), 1.1); + assert_eq!(get_column(&rows, "val"), 1.1); let metric2 = Metric::new( "load1", @@ -227,9 +263,10 @@ mod tests { MetricValue::Counter { value: 1.1 }, ); let insert = metric_to_insert_request(metric); - assert_eq!(insert.columns.len(), 2); + let rows = insert.rows.expect("Empty insert request"); + assert_eq!(rows.rows[0].values.len(), 2); - assert_eq!(get_column(&insert.columns, "val"), 1.1); + assert_eq!(get_column(&rows, "val"), 1.1); } #[test] @@ -242,9 +279,10 @@ mod tests { }, ); let insert = metric_to_insert_request(metric); - assert_eq!(insert.columns.len(), 2); + let rows = insert.rows.expect("Empty insert request"); + assert_eq!(rows.rows[0].values.len(), 2); - assert_eq!(get_column(&insert.columns, "val"), 2.0); + assert_eq!(get_column(&rows, "val"), 2.0); } #[test] @@ -258,21 +296,22 @@ mod tests { }, ); let insert = metric_to_insert_request(metric); + let rows = insert.rows.expect("Empty insert request"); assert_eq!( - insert.columns.len(), + rows.rows[0].values.len(), 1 + DISTRIBUTION_STAT_FIELD_COUNT + DISTRIBUTION_QUANTILES.len() ); - assert_eq!(get_column(&insert.columns, "max"), 3.0); - assert_eq!(get_column(&insert.columns, "min"), 1.0); - assert_eq!(get_column(&insert.columns, "avg"), 2.0); - assert_eq!(get_column(&insert.columns, "sum"), 16.0); - assert_eq!(get_column(&insert.columns, "count"), 8.0); - assert_eq!(get_column(&insert.columns, "p50"), 2.0); - assert_eq!(get_column(&insert.columns, "p75"), 2.0); - assert_eq!(get_column(&insert.columns, "p90"), 3.0); - assert_eq!(get_column(&insert.columns, "p95"), 3.0); - assert_eq!(get_column(&insert.columns, "p99"), 3.0); + assert_eq!(get_column(&rows, "max"), 3.0); + assert_eq!(get_column(&rows, "min"), 1.0); + assert_eq!(get_column(&rows, "avg"), 2.0); + assert_eq!(get_column(&rows, "sum"), 16.0); + assert_eq!(get_column(&rows, "count"), 8.0); + assert_eq!(get_column(&rows, "p50"), 2.0); + assert_eq!(get_column(&rows, "p75"), 2.0); + assert_eq!(get_column(&rows, "p90"), 3.0); + assert_eq!(get_column(&rows, "p95"), 3.0); + assert_eq!(get_column(&rows, "p99"), 3.0); } #[test] @@ -289,16 +328,17 @@ mod tests { }, ); let insert = metric_to_insert_request(metric); + let rows = insert.rows.expect("Empty insert request"); assert_eq!( - insert.columns.len(), + rows.rows[0].values.len(), 1 + SUMMARY_STAT_FIELD_COUNT + buckets_len ); - assert_eq!(get_column(&insert.columns, "b1"), 1.0); - assert_eq!(get_column(&insert.columns, "b2"), 2.0); - assert_eq!(get_column(&insert.columns, "b3"), 1.0); - assert_eq!(get_column(&insert.columns, "count"), 4.0); - assert_eq!(get_column(&insert.columns, "sum"), 8.0); + assert_eq!(get_column(&rows, "b1"), 1.0); + assert_eq!(get_column(&rows, "b2"), 2.0); + assert_eq!(get_column(&rows, "b3"), 1.0); + assert_eq!(get_column(&rows, "count"), 4.0); + assert_eq!(get_column(&rows, "sum"), 8.0); } #[test] @@ -316,16 +356,17 @@ mod tests { ); let insert = metric_to_insert_request(metric); + let rows = insert.rows.expect("Empty insert request"); assert_eq!( - insert.columns.len(), + rows.rows[0].values.len(), 1 + SUMMARY_STAT_FIELD_COUNT + quantiles_len ); - assert_eq!(get_column(&insert.columns, "p01"), 1.5); - assert_eq!(get_column(&insert.columns, "p50"), 2.0); - assert_eq!(get_column(&insert.columns, "p99"), 3.0); - assert_eq!(get_column(&insert.columns, "count"), 6.0); - assert_eq!(get_column(&insert.columns, "sum"), 12.0); + assert_eq!(get_column(&rows, "p01"), 1.5); + assert_eq!(get_column(&rows, "p50"), 2.0); + assert_eq!(get_column(&rows, "p99"), 3.0); + assert_eq!(get_column(&rows, "count"), 6.0); + assert_eq!(get_column(&rows, "sum"), 12.0); } #[test] @@ -345,19 +386,20 @@ mod tests { ); let insert = metric_to_insert_request(metric); + let rows = insert.rows.expect("Empty insert request"); assert_eq!( - insert.columns.len(), + rows.rows[0].values.len(), 1 + DISTRIBUTION_QUANTILES.len() + DISTRIBUTION_STAT_FIELD_COUNT ); - assert!(get_column(&insert.columns, "p50") <= 4.0); - assert!(get_column(&insert.columns, "p95") > 8.0); - assert!(get_column(&insert.columns, "p95") <= 9.0); - assert!(get_column(&insert.columns, "p99") > 8.0); - assert!(get_column(&insert.columns, "p99") <= 9.0); - assert_eq!(get_column(&insert.columns, "count"), samples as f64); - assert_eq!(get_column(&insert.columns, "sum"), 45.0); - assert_eq!(get_column(&insert.columns, "max"), 9.0); - assert_eq!(get_column(&insert.columns, "min"), 0.0); + assert!(get_column(&rows, "p50") <= 4.0); + assert!(get_column(&rows, "p95") > 8.0); + assert!(get_column(&rows, "p95") <= 9.0); + assert!(get_column(&rows, "p99") > 8.0); + assert!(get_column(&rows, "p99") <= 9.0); + assert_eq!(get_column(&rows, "count"), samples as f64); + assert_eq!(get_column(&rows, "sum"), 45.0); + assert_eq!(get_column(&rows, "max"), 9.0); + assert_eq!(get_column(&rows, "min"), 0.0); } } diff --git a/src/sinks/greptimedb/service.rs b/src/sinks/greptimedb/service.rs index c12d25d227a05..46516c745f917 100644 --- a/src/sinks/greptimedb/service.rs +++ b/src/sinks/greptimedb/service.rs @@ -28,7 +28,7 @@ impl RetryLogic for GreptimeDBRetryLogic { #[derive(Clone)] pub(super) struct GreptimeDBRequest { - items: Vec, + items: RowInsertRequests, finalizers: EventFinalizers, metadata: RequestMetadata, } @@ -54,7 +54,7 @@ impl GreptimeDBRequest { NonZeroUsize::new(estimated_request_size).expect("request should never be zero length"); GreptimeDBRequest { - items, + items: RowInsertRequests { inserts: items }, finalizers, metadata: request_metadata_builder.with_request_size(request_size), } @@ -179,7 +179,7 @@ impl Service for GreptimeDBService { Box::pin(async move { let metadata = req.metadata; - let result = client.insert(req.items).await?; + let result = client.row_insert(req.items).await?; Ok(GreptimeDBBatchOutput { item_count: result,