Skip to content

Commit

Permalink
Move state into its own module.
Browse files Browse the repository at this point in the history
It doesn't need to be coupled with configuration.
  • Loading branch information
SamirTalwar committed Oct 22, 2023
1 parent 7eb46fb commit 05f081c
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 82 deletions.
5 changes: 3 additions & 2 deletions crates/connectors/ndc-citus/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use ndc_postgres::explain;
use ndc_postgres::health;
use ndc_postgres::query;
use ndc_postgres::schema;
use ndc_postgres::state;

use tracing::{info_span, Instrument};

Expand All @@ -36,7 +37,7 @@ impl connector::Connector for Citus {
/// The type of validated configuration
type Configuration = Arc<configuration::Configuration>;
/// The type of unserializable state
type State = Arc<configuration::State>;
type State = Arc<state::State>;

fn make_empty_configuration() -> Self::RawConfiguration {
configuration::RawConfiguration::empty()
Expand Down Expand Up @@ -73,7 +74,7 @@ impl connector::Connector for Citus {
configuration: &Self::Configuration,
metrics: &mut prometheus::Registry,
) -> Result<Self::State, connector::InitializationError> {
configuration::create_state(configuration, metrics)
state::create_state(configuration, metrics)
.instrument(info_span!("Initialise state"))
.await
.map(Arc::new)
Expand Down
5 changes: 3 additions & 2 deletions crates/connectors/ndc-cockroach/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use ndc_postgres::explain;
use ndc_postgres::health;
use ndc_postgres::query;
use ndc_postgres::schema;
use ndc_postgres::state;

use tracing::info_span;
use tracing::Instrument;
Expand All @@ -37,7 +38,7 @@ impl connector::Connector for Cockroach {
/// The type of validated configuration
type Configuration = Arc<configuration::Configuration>;
/// The type of unserializable state
type State = Arc<configuration::State>;
type State = Arc<state::State>;

fn make_empty_configuration() -> Self::RawConfiguration {
configuration::RawConfiguration::empty()
Expand Down Expand Up @@ -74,7 +75,7 @@ impl connector::Connector for Cockroach {
configuration: &Self::Configuration,
metrics: &mut prometheus::Registry,
) -> Result<Self::State, connector::InitializationError> {
configuration::create_state(configuration, metrics)
state::create_state(configuration, metrics)
.instrument(info_span!("Initialise state"))
.await
.map(Arc::new)
Expand Down
73 changes: 2 additions & 71 deletions crates/connectors/ndc-postgres/src/configuration.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,8 @@
//! Internal Configuration and state for our connector.

use sqlx::postgres::{PgPool, PgPoolOptions};

use thiserror::Error;

use query_engine_execution::metrics;
use query_engine_metadata::metadata;
//! Configuration for the connector.

mod version1;

use tracing::{info_span, Instrument};
use query_engine_metadata::metadata;

pub use version1::{
configure, occurring_scalar_types, validate_raw_configuration, Configuration, ConnectionUri,
Expand Down Expand Up @@ -40,65 +33,3 @@ impl<'a> version1::Configuration {
}
}
}

/// State for our connector.
#[derive(Debug)]
pub struct State {
pub pool: PgPool,
pub metrics: metrics::Metrics,
}

/// Create a connection pool and wrap it inside a connector State.
pub async fn create_state(
configuration: &Configuration,
metrics_registry: &mut prometheus::Registry,
) -> Result<State, InitializationError> {
let pool = create_pool(configuration)
.instrument(info_span!("Create connection pool"))
.await?;

let metrics = async {
let metrics_inner = metrics::Metrics::initialize(metrics_registry)
.map_err(InitializationError::MetricsError)?;
metrics_inner.set_pool_options_metrics(pool.options());
Ok(metrics_inner)
}
.instrument(info_span!("Setup metrics"))
.await?;

Ok(State { pool, metrics })
}

/// Create a connection pool with default settings.
/// - <https://docs.rs/sqlx/latest/sqlx/pool/struct.PoolOptions.html>
async fn create_pool(configuration: &Configuration) -> Result<PgPool, InitializationError> {
let ConnectionUri::Uri(ResolvedSecret(uri)) = &configuration.config.connection_uri;

let pool_settings = &configuration.config.pool_settings;

PgPoolOptions::new()
.max_connections(pool_settings.max_connections)
.acquire_timeout(std::time::Duration::from_secs(pool_settings.pool_timeout))
.idle_timeout(
pool_settings
.idle_timeout
.map(std::time::Duration::from_secs),
)
.max_lifetime(
pool_settings
.connection_lifetime
.map(std::time::Duration::from_secs),
)
.connect(uri)
.await
.map_err(InitializationError::UnableToCreatePool)
}

/// State initialization error.
#[derive(Debug, Error)]
pub enum InitializationError {
#[error("unable to initialize connection pool: {0}")]
UnableToCreatePool(sqlx::Error),
#[error("error initializing metrics: {0}")]
MetricsError(metrics::Error),
}
12 changes: 9 additions & 3 deletions crates/connectors/ndc-postgres/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ use ndc_sdk::connector;
use ndc_sdk::json_response::JsonResponse;
use ndc_sdk::models;

use super::{capabilities, configuration, explain, health, query, schema};
use super::capabilities;
use super::configuration;
use super::explain;
use super::health;
use super::query;
use super::schema;
use super::state;

const CONFIGURATION_QUERY: &str = include_str!("configuration.sql");

Expand All @@ -28,7 +34,7 @@ impl connector::Connector for Postgres {
/// The type of validated configuration
type Configuration = Arc<configuration::Configuration>;
/// The type of unserializable state
type State = Arc<configuration::State>;
type State = Arc<state::State>;

fn make_empty_configuration() -> Self::RawConfiguration {
configuration::RawConfiguration::empty()
Expand Down Expand Up @@ -65,7 +71,7 @@ impl connector::Connector for Postgres {
configuration: &Self::Configuration,
metrics: &mut prometheus::Registry,
) -> Result<Self::State, connector::InitializationError> {
configuration::create_state(configuration, metrics)
state::create_state(configuration, metrics)
.instrument(info_span!("Initialise state"))
.await
.map(Arc::new)
Expand Down
3 changes: 2 additions & 1 deletion crates/connectors/ndc-postgres/src/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ use query_engine_execution::execution;
use query_engine_translation::translation;

use super::configuration;
use super::state;

/// Explain a query by creating an execution plan
///
/// This function implements the [explain endpoint](https://hasura.github.io/ndc-spec/specification/explain.html)
/// from the NDC specification.
pub async fn explain<'a>(
configuration: &configuration::RuntimeConfiguration<'a>,
state: &configuration::State,
state: &state::State,
query_request: models::QueryRequest,
) -> Result<models::ExplainResponse, connector::ExplainError> {
async move {
Expand Down
1 change: 1 addition & 0 deletions crates/connectors/ndc-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ pub mod explain;
pub mod health;
pub mod query;
pub mod schema;
pub mod state;
7 changes: 4 additions & 3 deletions crates/connectors/ndc-postgres/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@ use query_engine_sql::sql;
use query_engine_translation::translation;

use super::configuration;
use super::state;

/// Execute a query
///
/// This function implements the [query endpoint](https://hasura.github.io/ndc-spec/specification/queries/index.html)
/// from the NDC specification.
pub async fn query<'a>(
configuration: &configuration::RuntimeConfiguration<'a>,
state: &configuration::State,
state: &state::State,
query_request: models::QueryRequest,
) -> Result<JsonResponse<models::QueryResponse>, connector::QueryError> {
let timer = state.metrics.time_query_total();
Expand Down Expand Up @@ -51,7 +52,7 @@ pub async fn query<'a>(

fn plan_query(
configuration: &configuration::RuntimeConfiguration,
state: &configuration::State,
state: &state::State,
query_request: models::QueryRequest,
) -> Result<sql::execution_plan::ExecutionPlan, connector::QueryError> {
let timer = state.metrics.time_query_plan();
Expand All @@ -69,7 +70,7 @@ fn plan_query(
}

async fn execute_query(
state: &configuration::State,
state: &state::State,
plan: sql::execution_plan::ExecutionPlan,
) -> Result<JsonResponse<models::QueryResponse>, connector::QueryError> {
execution::execute(&state.pool, &state.metrics, plan)
Expand Down
72 changes: 72 additions & 0 deletions crates/connectors/ndc-postgres/src/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//! Transient state used by the connector.
//!
//! This is initialized on startup.

use query_engine_execution::metrics;
use sqlx::postgres::{PgPool, PgPoolOptions};
use thiserror::Error;
use tracing::{info_span, Instrument};

use crate::configuration::{Configuration, ConnectionUri, ResolvedSecret};

/// State for our connector.
#[derive(Debug)]
pub struct State {
pub pool: PgPool,
pub metrics: metrics::Metrics,
}

/// Create a connection pool and wrap it inside a connector State.
pub async fn create_state(
configuration: &Configuration,
metrics_registry: &mut prometheus::Registry,
) -> Result<State, InitializationError> {
let pool = create_pool(configuration)
.instrument(info_span!("Create connection pool"))
.await?;

let metrics = async {
let metrics_inner = metrics::Metrics::initialize(metrics_registry)
.map_err(InitializationError::MetricsError)?;
metrics_inner.set_pool_options_metrics(pool.options());
Ok(metrics_inner)
}
.instrument(info_span!("Setup metrics"))
.await?;

Ok(State { pool, metrics })
}

/// Create a connection pool with default settings.
/// - <https://docs.rs/sqlx/latest/sqlx/pool/struct.PoolOptions.html>
async fn create_pool(configuration: &Configuration) -> Result<PgPool, InitializationError> {
let ConnectionUri::Uri(ResolvedSecret(uri)) = &configuration.config.connection_uri;

let pool_settings = &configuration.config.pool_settings;

PgPoolOptions::new()
.max_connections(pool_settings.max_connections)
.acquire_timeout(std::time::Duration::from_secs(pool_settings.pool_timeout))
.idle_timeout(
pool_settings
.idle_timeout
.map(std::time::Duration::from_secs),
)
.max_lifetime(
pool_settings
.connection_lifetime
.map(std::time::Duration::from_secs),
)
.connect(uri)
.await
.map_err(InitializationError::UnableToCreatePool)
}

/// State initialization error.
#[derive(Debug, Error)]
pub enum InitializationError {
#[error("unable to initialize connection pool: {0}")]
UnableToCreatePool(sqlx::Error),
#[error("error initializing metrics: {0}")]
MetricsError(metrics::Error),
}

0 comments on commit 05f081c

Please sign in to comment.