diff --git a/Cargo.lock b/Cargo.lock index ee96285c6..3160f96cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1617,6 +1617,7 @@ dependencies = [ "anyhow", "insta", "jsonschema", + "prometheus", "query-engine-metadata", "schemars", "serde", diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index c1effea3d..1ff6b2500 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -11,6 +11,7 @@ workspace = true query-engine-metadata = { path = "../query-engine/metadata" } anyhow = { workspace = true } +prometheus = {workspace = true } schemars = { workspace = true, features = ["smol_str", "preserve_order"] } serde = { workspace = true } serde_json = { workspace = true, features = ["raw_value"] } diff --git a/crates/configuration/src/configuration.rs b/crates/configuration/src/configuration.rs index 70db818c5..56c5967ef 100644 --- a/crates/configuration/src/configuration.rs +++ b/crates/configuration/src/configuration.rs @@ -12,6 +12,7 @@ use crate::error::{ use crate::values::{IsolationLevel, PoolSettings}; use crate::version3; use crate::version4; +use crate::VersionTag; use schemars::{gen::SchemaSettings, schema::RootSchema}; pub fn generate_latest_schema() -> RootSchema { @@ -60,12 +61,12 @@ impl ParsedConfiguration { #[derive(Debug)] pub struct Configuration { pub metadata: metadata::Metadata, + pub configuration_version_tag: VersionTag, pub pool_settings: PoolSettings, pub connection_uri: String, pub isolation_level: IsolationLevel, pub mutations_version: Option, } - pub async fn introspect( input: ParsedConfiguration, environment: impl Environment, diff --git a/crates/configuration/src/lib.rs b/crates/configuration/src/lib.rs index c451894c7..e31e078bf 100644 --- a/crates/configuration/src/lib.rs +++ b/crates/configuration/src/lib.rs @@ -3,6 +3,8 @@ mod values; pub mod environment; pub mod error; +pub mod metrics; + mod version3; mod version4; @@ -13,6 +15,14 @@ pub use configuration::{ }; pub use values::{ConnectionUri, IsolationLevel, PoolSettings, Secret}; +pub use metrics::Metrics; + +#[derive(Debug, Copy, Clone)] +pub enum VersionTag { + Version3, + Version4, +} + #[cfg(test)] pub mod tests; diff --git a/crates/configuration/src/metrics.rs b/crates/configuration/src/metrics.rs new file mode 100644 index 000000000..10de6515e --- /dev/null +++ b/crates/configuration/src/metrics.rs @@ -0,0 +1,61 @@ +//! Metrics setup and update for our connector. + +use prometheus::{IntGauge, Registry}; + +use crate::VersionTag; + +/// The collection of configuration-related metrics exposed through the `/metrics` endpoint. +#[derive(Debug, Clone)] +pub struct Metrics { + configuration_version_3: IntGauge, + configuration_version_4: IntGauge, +} + +impl Metrics { + /// Set up counters and gauges used to produce Prometheus metrics + pub fn initialize(metrics_registry: &mut Registry) -> Result { + let configuration_version_3 = add_int_gauge_metric( + metrics_registry, + "ndc_postgres_configuration_version_3", + "Get whether configuration version 3 is used", + )?; + + let configuration_version_4 = add_int_gauge_metric( + metrics_registry, + "ndc_postgres_configuration_version_4", + "Get whether configuration version 4 is used", + )?; + + Ok(Self { + configuration_version_3, + configuration_version_4, + }) + } + + /// Set the configuration version used by this connector instance. + pub fn set_configuration_version(&self, version: VersionTag) { + match version { + VersionTag::Version3 => self.configuration_version_3.set(1), + VersionTag::Version4 => self.configuration_version_4.set(1), + } + } +} + +/// Create a new int gauge metric and register it with the provided Prometheus Registry +fn add_int_gauge_metric( + metrics_registry: &mut Registry, + metric_name: &str, + metric_description: &str, +) -> Result { + let int_gauge = IntGauge::with_opts(prometheus::Opts::new(metric_name, metric_description))?; + register_collector(metrics_registry, int_gauge) +} + +/// Register a new collector with the registry, and returns it for later use. +fn register_collector( + metrics_registry: &mut Registry, + collector: Collector, +) -> Result { + metrics_registry.register(Box::new(collector.clone()))?; + Ok(collector) +} diff --git a/crates/configuration/src/version3/mod.rs b/crates/configuration/src/version3/mod.rs index b6a3fc6b4..f98238d2a 100644 --- a/crates/configuration/src/version3/mod.rs +++ b/crates/configuration/src/version3/mod.rs @@ -23,6 +23,7 @@ use crate::error::{ MakeRuntimeConfigurationError, ParseConfigurationError, WriteParsedConfigurationError, }; use crate::values::{ConnectionUri, Secret}; +use crate::VersionTag; #[cfg(test)] mod tests; @@ -551,6 +552,7 @@ pub fn make_runtime_configuration( connection_uri, isolation_level: configuration.connection_settings.isolation_level, mutations_version: convert_mutations_version(configuration.mutations_version), + configuration_version_tag: VersionTag::Version3, }) } diff --git a/crates/configuration/src/version4/to_runtime_configuration.rs b/crates/configuration/src/version4/to_runtime_configuration.rs index af38bf273..dbe7283db 100644 --- a/crates/configuration/src/version4/to_runtime_configuration.rs +++ b/crates/configuration/src/version4/to_runtime_configuration.rs @@ -6,6 +6,7 @@ use super::ParsedConfiguration; use crate::environment::Environment; use crate::error::MakeRuntimeConfigurationError; use crate::values::{ConnectionUri, Secret}; +use crate::VersionTag; /// Convert the parsed configuration metadata to internal engine metadata /// That can be used by the connector at runtime. @@ -30,6 +31,7 @@ pub fn make_runtime_configuration( connection_uri, isolation_level: parsed_config.connection_settings.isolation_level, mutations_version: convert_mutations_version(parsed_config.mutations_version), + configuration_version_tag: VersionTag::Version4, }) } diff --git a/crates/connectors/ndc-postgres/src/connector.rs b/crates/connectors/ndc-postgres/src/connector.rs index 11c5496ce..136b5cc67 100644 --- a/crates/connectors/ndc-postgres/src/connector.rs +++ b/crates/connectors/ndc-postgres/src/connector.rs @@ -43,7 +43,7 @@ impl Connector for Postgres { _configuration: &Self::Configuration, state: &Self::State, ) -> Result<(), connector::FetchMetricsError> { - state.metrics.update_pool_metrics(&state.pool); + state.query_metrics.update_pool_metrics(&state.pool); Ok(()) } @@ -292,6 +292,7 @@ impl ConnectorSetup for PostgresSetup { &configuration.connection_uri, &configuration.pool_settings, metrics, + configuration.configuration_version_tag, ) .instrument(info_span!("Initialise state")) .await diff --git a/crates/connectors/ndc-postgres/src/mutation.rs b/crates/connectors/ndc-postgres/src/mutation.rs index fdb703b47..cc4340d41 100644 --- a/crates/connectors/ndc-postgres/src/mutation.rs +++ b/crates/connectors/ndc-postgres/src/mutation.rs @@ -30,7 +30,7 @@ pub async fn mutation( state: &state::State, request: models::MutationRequest, ) -> Result, connector::MutationError> { - let timer = state.metrics.time_mutation_total(); + let timer = state.query_metrics.time_mutation_total(); // See https://docs.rs/tracing/0.1.29/tracing/span/struct.Span.html#in-asynchronous-code let result = async move { @@ -41,7 +41,7 @@ pub async fn mutation( let plan = async { plan_mutation(configuration, state, request).map_err(|err| { - record::translation_error(&err, &state.metrics); + record::translation_error(&err, &state.query_metrics); convert::translation_error_to_mutation_error(&err) }) } @@ -50,14 +50,14 @@ pub async fn mutation( let result = async { execute_mutation(state, plan).await.map_err(|err| { - record::execution_error(&err, &state.metrics); + record::execution_error(&err, &state.query_metrics); convert::execution_error_to_mutation_error(err) }) } .instrument(info_span!("Execute mutation")) .await?; - state.metrics.record_successful_mutation(); + state.query_metrics.record_successful_mutation(); Ok(result) } .instrument(info_span!("/mutation")) @@ -75,7 +75,7 @@ fn plan_mutation( sql::execution_plan::ExecutionPlan, translation::error::Error, > { - let timer = state.metrics.time_mutation_plan(); + let timer = state.query_metrics.time_mutation_plan(); let mutations = request .operations .into_iter() @@ -101,7 +101,7 @@ async fn execute_mutation( query_engine_execution::mutation::execute( &state.pool, &state.database_info, - &state.metrics, + &state.query_metrics, plan, ) .await diff --git a/crates/connectors/ndc-postgres/src/mutation/explain.rs b/crates/connectors/ndc-postgres/src/mutation/explain.rs index 2108ee597..a5c982294 100644 --- a/crates/connectors/ndc-postgres/src/mutation/explain.rs +++ b/crates/connectors/ndc-postgres/src/mutation/explain.rs @@ -37,7 +37,7 @@ pub async fn explain( .instrument(info_span!("Plan mutation")) .await .map_err(|err| { - record::translation_error(&err, &state.metrics); + record::translation_error(&err, &state.query_metrics); convert::translation_error_to_explain_error(&err) })?; @@ -46,19 +46,19 @@ pub async fn explain( query_engine_execution::mutation::explain( &state.pool, &state.database_info, - &state.metrics, + &state.query_metrics, plan, ) .await .map_err(|err| { - record::execution_error(&err, &state.metrics); + record::execution_error(&err, &state.query_metrics); convert::execution_error_to_explain_error(err) }) } .instrument(info_span!("Explain mutation")) .await?; - state.metrics.record_successful_explain(); + state.query_metrics.record_successful_explain(); let details: BTreeMap = results .into_iter() diff --git a/crates/connectors/ndc-postgres/src/query.rs b/crates/connectors/ndc-postgres/src/query.rs index 02164cc09..ad88c7e88 100644 --- a/crates/connectors/ndc-postgres/src/query.rs +++ b/crates/connectors/ndc-postgres/src/query.rs @@ -29,7 +29,7 @@ pub async fn query( state: &state::State, query_request: models::QueryRequest, ) -> Result, connector::QueryError> { - let timer = state.metrics.time_query_total(); + let timer = state.query_metrics.time_query_total(); // See https://docs.rs/tracing/0.1.29/tracing/span/struct.Span.html#in-asynchronous-code let result = async move { @@ -40,7 +40,7 @@ pub async fn query( let plan = async { plan_query(configuration, state, query_request).map_err(|err| { - record::translation_error(&err, &state.metrics); + record::translation_error(&err, &state.query_metrics); convert::translation_error_to_query_error(&err) }) } @@ -49,14 +49,14 @@ pub async fn query( let result = async { execute_query(state, plan).await.map_err(|err| { - record::execution_error(&err, &state.metrics); + record::execution_error(&err, &state.query_metrics); convert::execution_error_to_query_error(err) }) } .instrument(info_span!("Execute query")) .await?; - state.metrics.record_successful_query(); + state.query_metrics.record_successful_query(); Ok(result) } .instrument(info_span!("/query")) @@ -71,7 +71,7 @@ fn plan_query( query_request: models::QueryRequest, ) -> Result, translation::error::Error> { - let timer = state.metrics.time_query_plan(); + let timer = state.query_metrics.time_query_plan(); let result = translation::query::translate(&configuration.metadata, query_request); timer.complete_with(result) } @@ -80,7 +80,12 @@ async fn execute_query( state: &state::State, plan: sql::execution_plan::ExecutionPlan, ) -> Result, query_engine_execution::error::Error> { - query_engine_execution::query::execute(&state.pool, &state.database_info, &state.metrics, plan) - .await - .map(JsonResponse::Serialized) + query_engine_execution::query::execute( + &state.pool, + &state.database_info, + &state.query_metrics, + plan, + ) + .await + .map(JsonResponse::Serialized) } diff --git a/crates/connectors/ndc-postgres/src/query/explain.rs b/crates/connectors/ndc-postgres/src/query/explain.rs index 20f760ce5..00c6d0062 100644 --- a/crates/connectors/ndc-postgres/src/query/explain.rs +++ b/crates/connectors/ndc-postgres/src/query/explain.rs @@ -33,7 +33,7 @@ pub async fn explain( // Compile the query. let plan = async { super::plan_query(configuration, state, query_request).map_err(|err| { - record::translation_error(&err, &state.metrics); + record::translation_error(&err, &state.query_metrics); convert::translation_error_to_explain_error(&err) }) } @@ -45,19 +45,19 @@ pub async fn explain( query_engine_execution::query::explain( &state.pool, &state.database_info, - &state.metrics, + &state.query_metrics, plan, ) .await .map_err(|err| { - record::execution_error(&err, &state.metrics); + record::execution_error(&err, &state.query_metrics); convert::execution_error_to_explain_error(err) }) } .instrument(info_span!("Explain query")) .await?; - state.metrics.record_successful_explain(); + state.query_metrics.record_successful_explain(); let details = BTreeMap::from_iter([("SQL Query".into(), query), ("Execution Plan".into(), plan)]); diff --git a/crates/connectors/ndc-postgres/src/state.rs b/crates/connectors/ndc-postgres/src/state.rs index 447626ebd..24e49b789 100644 --- a/crates/connectors/ndc-postgres/src/state.rs +++ b/crates/connectors/ndc-postgres/src/state.rs @@ -18,7 +18,8 @@ use query_engine_execution::metrics; pub struct State { pub pool: PgPool, pub database_info: DatabaseInfo, - pub metrics: metrics::Metrics, + pub query_metrics: metrics::Metrics, + pub configuration_metrics: ndc_postgres_configuration::Metrics, } /// Create a connection pool and wrap it inside a connector State. @@ -26,6 +27,7 @@ pub async fn create_state( connection_uri: &str, pool_settings: &PoolSettings, metrics_registry: &mut prometheus::Registry, + version_tag: ndc_postgres_configuration::VersionTag, ) -> Result { let connection_url: Url = connection_uri .parse() @@ -55,19 +57,27 @@ pub async fn create_state( }; let database_info = parse_database_info(&connection_url, database_version); - let metrics = async { - let metrics_inner = metrics::Metrics::initialize(metrics_registry) + let (query_metrics, configuration_metrics) = async { + let query_metrics_inner = metrics::Metrics::initialize(metrics_registry) .map_err(InitializationError::MetricsError)?; - metrics_inner.set_pool_options_metrics(pool.options()); - Ok(metrics_inner) + query_metrics_inner.set_pool_options_metrics(pool.options()); + + let configuration_metrics_inner = + ndc_postgres_configuration::Metrics::initialize(metrics_registry) + .map_err(InitializationError::MetricsError)?; + + Ok((query_metrics_inner, configuration_metrics_inner)) } .instrument(info_span!("Setup metrics")) .await?; + configuration_metrics.set_configuration_version(version_tag); + Ok(State { pool, database_info, - metrics, + query_metrics, + configuration_metrics, }) } @@ -161,7 +171,7 @@ pub enum InitializationError { #[error("unable to connect to the database: {0}")] UnableToConnect(sqlx::Error), #[error("error initializing metrics: {0}")] - MetricsError(metrics::Error), + MetricsError(prometheus::Error), } #[cfg(test)] diff --git a/crates/query-engine/execution/src/metrics.rs b/crates/query-engine/execution/src/metrics.rs index 3efba8a49..019dc60af 100644 --- a/crates/query-engine/execution/src/metrics.rs +++ b/crates/query-engine/execution/src/metrics.rs @@ -30,7 +30,7 @@ pub struct Metrics { impl Metrics { /// Set up counters and gauges used to produce Prometheus metrics - pub fn initialize(metrics_registry: &mut Registry) -> Result { + pub fn initialize(metrics_registry: &mut Registry) -> Result { let query_total = add_int_counter_metric( metrics_registry, "ndc_postgres_query_total", @@ -250,9 +250,9 @@ fn add_int_counter_metric( metrics_registry: &mut Registry, metric_name: &str, metric_description: &str, -) -> Result { - let int_counter = IntCounter::with_opts(prometheus::Opts::new(metric_name, metric_description)) - .map_err(Error)?; +) -> Result { + let int_counter = + IntCounter::with_opts(prometheus::Opts::new(metric_name, metric_description))?; register_collector(metrics_registry, int_counter) } @@ -261,9 +261,8 @@ fn add_int_gauge_metric( metrics_registry: &mut Registry, metric_name: &str, metric_description: &str, -) -> Result { - let int_gauge = IntGauge::with_opts(prometheus::Opts::new(metric_name, metric_description)) - .map_err(Error)?; +) -> Result { + let int_gauge = IntGauge::with_opts(prometheus::Opts::new(metric_name, metric_description))?; register_collector(metrics_registry, int_gauge) } @@ -272,9 +271,8 @@ fn add_gauge_metric( metrics_registry: &mut Registry, metric_name: &str, metric_description: &str, -) -> Result { - let gauge = - Gauge::with_opts(prometheus::Opts::new(metric_name, metric_description)).map_err(Error)?; +) -> Result { + let gauge = Gauge::with_opts(prometheus::Opts::new(metric_name, metric_description))?; register_collector(metrics_registry, gauge) } @@ -284,12 +282,11 @@ fn add_histogram_metric( metrics_registry: &mut prometheus::Registry, metric_name: &str, metric_description: &str, -) -> Result { +) -> Result { let histogram = Histogram::with_opts(prometheus::HistogramOpts::new( metric_name, metric_description, - )) - .map_err(Error)?; + ))?; register_collector(metrics_registry, histogram) } @@ -297,10 +294,8 @@ fn add_histogram_metric( fn register_collector( metrics_registry: &mut Registry, collector: Collector, -) -> Result { - metrics_registry - .register(Box::new(collector.clone())) - .map_err(Error)?; +) -> Result { + metrics_registry.register(Box::new(collector.clone()))?; Ok(collector) } @@ -324,19 +319,6 @@ impl Timer { } } -/// A wrapper around the internal Prometheus error type to avoid exposing more -/// than we need. -#[derive(Debug)] -pub struct Error(prometheus::Error); - -impl std::fmt::Display for Error { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} - -impl std::error::Error for Error {} - /// A collection of metrics indicating errors. #[derive(Debug, Clone)] pub struct ErrorMetrics { @@ -357,7 +339,9 @@ pub struct ErrorMetrics { impl ErrorMetrics { /// Set up counters and gauges used to produce Prometheus metrics - pub fn initialize(metrics_registry: &mut prometheus::Registry) -> Result { + pub fn initialize( + metrics_registry: &mut prometheus::Registry, + ) -> Result { let invalid_request_total = add_int_counter_metric( metrics_registry, "ndc_postgres_error_invalid_request_total_count",