Skip to content

Commit

Permalink
Introduce a "fast path" for returning already-serialized JSON.
Browse files Browse the repository at this point in the history
This introduces a new enum, `JsonResponse`, which is designed to
supplant `axum::Json`. It implements `axum::IntoResponse`.

It has two variants:

1. `Value`, which behaves the same as `axum::Json`.
2. `Serialized`, which allows the connector to provide
   already-serialized JSON in the form of a `Bytes` value.

The latter can be used to construct the JSON directly in the database
and avoid deserialization and re-serialization.

This is a breaking change, as at the very least, the connector
implementations will now need to wrap responses in
`JsonResponse::Value`. Hopefully it's not too much of a big deal.

There is a helper function, `JsonResponse::into_value`, used to
deserialize the serialized bytes in certain situations. This is because
the v2 compatibility layer and the connector used for ndc-test both
require transforming the data, and so cannot work directly with bytes.
We should not call this when running the connector normally, and it is
not available outside this crate.
  • Loading branch information
SamirTalwar committed Oct 10, 2023
1 parent 2d19614 commit bbf7751
Show file tree
Hide file tree
Showing 9 changed files with 394 additions and 209 deletions.
36 changes: 36 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions rust-connector-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@ url = "2.4.1"
uuid = "^1.3.4"
gdc_rust_types = { git = "https://github.com/hasura/gdc_rust_types", rev = "bc57c40" }
indexmap = "^1"
bytes = "1.5.0"
mime = "0.3.17"

[dev-dependencies]
axum-test-helper = "0.3.0"
29 changes: 16 additions & 13 deletions rust-connector-sdk/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ use ndc_client::models;
use serde::Serialize;
use std::error::Error;
use thiserror::Error;

use crate::json_response::JsonResponse;

pub mod example;

/// Errors which occur when trying to validate connector
Expand Down Expand Up @@ -35,7 +38,7 @@ pub enum KeyOrIndex {
#[derive(Debug, Error)]
pub enum UpdateConfigurationError {
#[error("error validating configuration: {0}")]
Other(Box<dyn Error + Send + Sync>),
Other(#[from] Box<dyn Error + Send + Sync>),
}

/// Errors which occur when trying to initialize connector
Expand All @@ -45,7 +48,7 @@ pub enum UpdateConfigurationError {
#[derive(Debug, Error)]
pub enum InitializationError {
#[error("error initializing connector state: {0}")]
Other(Box<dyn Error + Send + Sync>),
Other(#[from] Box<dyn Error + Send + Sync>),
}

/// Errors which occur when trying to update metrics.
Expand All @@ -54,7 +57,7 @@ pub enum InitializationError {
#[derive(Debug, Error)]
pub enum FetchMetricsError {
#[error("error fetching metrics: {0}")]
Other(Box<dyn Error + Send + Sync>),
Other(#[from] Box<dyn Error + Send + Sync>),
}

/// Errors which occur when checking connector health.
Expand All @@ -63,7 +66,7 @@ pub enum FetchMetricsError {
#[derive(Debug, Error)]
pub enum HealthError {
#[error("error checking health status: {0}")]
Other(Box<dyn Error + Send + Sync>),
Other(#[from] Box<dyn Error + Send + Sync>),
}

/// Errors which occur when retrieving the connector schema.
Expand All @@ -72,7 +75,7 @@ pub enum HealthError {
#[derive(Debug, Error)]
pub enum SchemaError {
#[error("error retrieving the schema: {0}")]
Other(Box<dyn Error + Send + Sync>),
Other(#[from] Box<dyn Error + Send + Sync>),
}

/// Errors which occur when executing a query.
Expand All @@ -91,7 +94,7 @@ pub enum QueryError {
#[error("unsupported operation: {0}")]
UnsupportedOperation(String),
#[error("error executing query: {0}")]
Other(Box<dyn Error + Send + Sync>),
Other(#[from] Box<dyn Error + Send + Sync>),
}

/// Errors which occur when explaining a query.
Expand All @@ -110,7 +113,7 @@ pub enum ExplainError {
#[error("unsupported operation: {0}")]
UnsupportedOperation(String),
#[error("error explaining query: {0}")]
Other(Box<dyn Error + Send + Sync>),
Other(#[from] Box<dyn Error + Send + Sync>),
}

/// Errors which occur when executing a mutation.
Expand All @@ -137,7 +140,7 @@ pub enum MutationError {
#[error("mutation violates constraint: {0}")]
ConstraintNotMet(String),
#[error("error executing mutation: {0}")]
Other(Box<dyn Error + Send + Sync>),
Other(#[from] Box<dyn Error + Send + Sync>),
}

/// Connectors using this library should implement this trait.
Expand Down Expand Up @@ -238,15 +241,15 @@ pub trait Connector {
///
/// This function implements the [capabilities endpoint](https://hasura.github.io/ndc-spec/specification/capabilities.html)
/// from the NDC specification.
async fn get_capabilities() -> models::CapabilitiesResponse;
async fn get_capabilities() -> JsonResponse<models::CapabilitiesResponse>;

/// Get the connector's schema.
///
/// This function implements the [schema endpoint](https://hasura.github.io/ndc-spec/specification/schema/index.html)
/// from the NDC specification.
async fn get_schema(
configuration: &Self::Configuration,
) -> Result<models::SchemaResponse, SchemaError>;
) -> Result<JsonResponse<models::SchemaResponse>, SchemaError>;

/// Explain a query by creating an execution plan
///
Expand All @@ -256,7 +259,7 @@ pub trait Connector {
configuration: &Self::Configuration,
state: &Self::State,
request: models::QueryRequest,
) -> Result<models::ExplainResponse, ExplainError>;
) -> Result<JsonResponse<models::ExplainResponse>, ExplainError>;

/// Execute a mutation
///
Expand All @@ -266,7 +269,7 @@ pub trait Connector {
configuration: &Self::Configuration,
state: &Self::State,
request: models::MutationRequest,
) -> Result<models::MutationResponse, MutationError>;
) -> Result<JsonResponse<models::MutationResponse>, MutationError>;

/// Execute a query
///
Expand All @@ -276,5 +279,5 @@ pub trait Connector {
configuration: &Self::Configuration,
state: &Self::State,
request: models::QueryRequest,
) -> Result<models::QueryResponse, QueryError>;
) -> Result<JsonResponse<models::QueryResponse>, QueryError>;
}
18 changes: 9 additions & 9 deletions rust-connector-sdk/src/connector/example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ impl Connector for Example {
Ok(())
}

async fn get_capabilities() -> models::CapabilitiesResponse {
models::CapabilitiesResponse {
async fn get_capabilities() -> JsonResponse<models::CapabilitiesResponse> {
JsonResponse::Value(models::CapabilitiesResponse {
versions: "^0.1.0".into(),
capabilities: models::Capabilities {
explain: None,
Expand All @@ -63,48 +63,48 @@ impl Connector for Example {
relation_comparisons: None,
}),
},
}
})
}

async fn get_schema(
_configuration: &Self::Configuration,
) -> Result<models::SchemaResponse, SchemaError> {
) -> Result<JsonResponse<models::SchemaResponse>, SchemaError> {
async {
info_span!("inside tracing example");
}
.instrument(info_span!("tracing example"))
.await;

Ok(models::SchemaResponse {
Ok(JsonResponse::Value(models::SchemaResponse {
collections: vec![],
functions: vec![],
procedures: vec![],
object_types: BTreeMap::new(),
scalar_types: BTreeMap::new(),
})
}))
}

async fn explain(
_configuration: &Self::Configuration,
_state: &Self::State,
_request: models::QueryRequest,
) -> Result<models::ExplainResponse, ExplainError> {
) -> Result<JsonResponse<models::ExplainResponse>, ExplainError> {
todo!()
}

async fn mutation(
_configuration: &Self::Configuration,
_state: &Self::State,
_request: models::MutationRequest,
) -> Result<models::MutationResponse, MutationError> {
) -> Result<JsonResponse<models::MutationResponse>, MutationError> {
todo!()
}

async fn query(
_configuration: &Self::Configuration,
_state: &Self::State,
_request: models::QueryRequest,
) -> Result<models::QueryResponse, QueryError> {
) -> Result<JsonResponse<models::QueryResponse>, QueryError> {
todo!()
}
}
40 changes: 26 additions & 14 deletions rust-connector-sdk/src/default_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod v2_compat;
use crate::{
check_health,
connector::{Connector, InvalidRange, SchemaError, UpdateConfigurationError},
json_response::JsonResponse,
routes,
tracing::{init_tracing, make_span, on_response},
};
Expand Down Expand Up @@ -373,7 +374,7 @@ async fn get_metrics<C: Connector>(
routes::get_metrics::<C>(&state.configuration, &state.state, state.metrics)
}

async fn get_capabilities<C: Connector>() -> Json<CapabilitiesResponse> {
async fn get_capabilities<C: Connector>() -> JsonResponse<CapabilitiesResponse> {
routes::get_capabilities::<C>().await
}

Expand All @@ -385,28 +386,28 @@ async fn get_health<C: Connector>(

async fn get_schema<C: Connector>(
State(state): State<ServerState<C>>,
) -> Result<Json<SchemaResponse>, (StatusCode, Json<ErrorResponse>)> {
) -> Result<JsonResponse<SchemaResponse>, (StatusCode, Json<ErrorResponse>)> {
routes::get_schema::<C>(&state.configuration).await
}

async fn post_explain<C: Connector>(
State(state): State<ServerState<C>>,
request: Json<QueryRequest>,
) -> Result<Json<ExplainResponse>, (StatusCode, Json<ErrorResponse>)> {
) -> Result<JsonResponse<ExplainResponse>, (StatusCode, Json<ErrorResponse>)> {
routes::post_explain::<C>(&state.configuration, &state.state, request).await
}

async fn post_mutation<C: Connector>(
State(state): State<ServerState<C>>,
request: Json<MutationRequest>,
) -> Result<Json<MutationResponse>, (StatusCode, Json<ErrorResponse>)> {
) -> Result<JsonResponse<MutationResponse>, (StatusCode, Json<ErrorResponse>)> {
routes::post_mutation::<C>(&state.configuration, &state.state, request).await
}

async fn post_query<C: Connector>(
State(state): State<ServerState<C>>,
request: Json<QueryRequest>,
) -> Result<Json<QueryResponse>, (StatusCode, Json<ErrorResponse>)> {
) -> Result<JsonResponse<QueryResponse>, (StatusCode, Json<ErrorResponse>)> {
routes::post_query::<C>(&state.configuration, &state.state, request).await
}

Expand Down Expand Up @@ -528,12 +529,15 @@ where
Json(ValidateErrors::InvalidConfiguration { ranges }),
),
})?;
let schema = C::get_schema(&configuration).await.map_err(|e| match e {
SchemaError::Other(_) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ValidateErrors::UnableToBuildSchema),
),
})?;
let schema = C::get_schema(&configuration)
.await
.and_then(JsonResponse::into_value)
.map_err(|e| match e {
SchemaError::Other(_) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ValidateErrors::UnableToBuildSchema),
),
})?;
let resolved_config_bytes = serde_json::to_vec(&configuration).map_err(|e| {
(
StatusCode::INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -561,12 +565,17 @@ where
async fn get_capabilities(
&self,
) -> Result<ndc_client::models::CapabilitiesResponse, ndc_test::Error> {
Ok(C::get_capabilities().await)
C::get_capabilities()
.await
.into_value::<Box<dyn std::error::Error + Send + Sync>>()
.map_err(|err| ndc_test::Error::OtherError(err))
}

async fn get_schema(&self) -> Result<ndc_client::models::SchemaResponse, ndc_test::Error> {
match C::get_schema(&self.configuration).await {
Ok(response) => Ok(response),
Ok(response) => response
.into_value::<Box<dyn std::error::Error + Send + Sync>>()
.map_err(|err| ndc_test::Error::OtherError(err)),
Err(err) => Err(ndc_test::Error::OtherError(err.into())),
}
}
Expand All @@ -575,7 +584,10 @@ where
&self,
request: ndc_client::models::QueryRequest,
) -> Result<ndc_client::models::QueryResponse, ndc_test::Error> {
match C::query(&self.configuration, &self.state, request).await {
match C::query(&self.configuration, &self.state, request)
.await
.and_then(JsonResponse::into_value)
{
Ok(response) => Ok(response),
Err(err) => Err(ndc_test::Error::OtherError(err.into())),
}
Expand Down
Loading

0 comments on commit bbf7751

Please sign in to comment.