Skip to content

Commit

Permalink
Using mutable self for update (#14)
Browse files Browse the repository at this point in the history
* Using mutable self for update

* Moving connection model definition test to proper place
  • Loading branch information
sagojez authored May 6, 2024
1 parent 9fc0b09 commit 0f6b9cf
Show file tree
Hide file tree
Showing 12 changed files with 41 additions and 33 deletions.
3 changes: 2 additions & 1 deletion api/src/endpoints/common_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ impl RequestExt for CreateRequest {
Some(record)
}

fn update(&self, record: &mut Self::Output) {
fn update(&self, mut record: Self::Output) -> Self::Output {
record.name = self.name.clone();
record.record_metadata.version = self.version.clone();
record.fields = self.fields.clone();
record.category = self.category.clone();
record.sample = self.sample.clone();
record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
5 changes: 3 additions & 2 deletions api/src/endpoints/connection_definition.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{
create, delete, read, update, ApiResult, CachedRequest, HookExt, ReadResponse, RequestExt, Unit,
create, delete, read, update, ApiResult, CachedRequest, HookExt, ReadResponse, RequestExt,
};
use crate::{
internal_server_error, not_found,
Expand Down Expand Up @@ -319,7 +319,7 @@ impl RequestExt for CreateRequest {
Some(record)
}

fn update(&self, record: &mut Self::Output) -> Unit {
fn update(&self, mut record: Self::Output) -> Self::Output {
record.name = self.name.clone();
record.frontend.spec.description = self.description.clone();
record.frontend.spec.category = self.category.clone();
Expand All @@ -328,6 +328,7 @@ impl RequestExt for CreateRequest {
record.test_connection = self.test_connection;
record.platform = self.platform.clone();
record.record_metadata.active = self.active;
record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
6 changes: 4 additions & 2 deletions api/src/endpoints/connection_model_definition.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{create, delete, read, update, HookExt, RequestExt, Unit};
use super::{create, delete, read, update, HookExt, RequestExt};
use crate::{
api_payloads::ErrorResponse,
internal_server_error, not_found,
Expand Down Expand Up @@ -348,7 +348,7 @@ impl RequestExt for CreateRequest {
Some(record)
}

fn update(&self, record: &mut Self::Output) -> Unit {
fn update(&self, mut record: Self::Output) -> Self::Output {
let key = format!(
"api::{}::{}::{}::{}::{}::{}",
self.connection_platform,
Expand Down Expand Up @@ -383,6 +383,8 @@ impl RequestExt for CreateRequest {
record.mapping = self.mapping.clone();
record.extractor_config = self.extractor_config.clone();
record.record_metadata.version = self.version.clone();

record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
3 changes: 2 additions & 1 deletion api/src/endpoints/connection_model_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl RequestExt for CreateRequest {
})
}

fn update(&self, record: &mut Self::Output) {
fn update(&self, mut record: Self::Output) -> Self::Output {
record.platform_id = self.platform_id;
record.platform_page_id = self.platform_page_id;
record.connection_platform = self.connection_platform.clone();
Expand All @@ -123,6 +123,7 @@ impl RequestExt for CreateRequest {
record.sample = self.sample.clone();
record.paths = self.paths.clone();
record.mapping = self.mapping.clone();
record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
6 changes: 4 additions & 2 deletions api/src/endpoints/connection_oauth_definition.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{create, delete, read, update, CachedRequest, HookExt, ReadResponse, RequestExt, Unit};
use super::{create, delete, read, update, CachedRequest, HookExt, ReadResponse, RequestExt};
use crate::server::{AppState, AppStores};
use axum::{
routing::{patch, post},
Expand Down Expand Up @@ -125,7 +125,7 @@ impl RequestExt for CreateRequest {
})
}

fn update(&self, record: &mut Self::Output) -> Unit {
fn update(&self, mut record: Self::Output) -> Self::Output {
record.connection_platform = self.connection_platform.clone();
record.configuration = OAuthApiConfig {
init: self.init.configuration.clone(),
Expand Down Expand Up @@ -180,6 +180,8 @@ impl RequestExt for CreateRequest {
};
record.record_metadata.updated_at = Utc::now().timestamp_millis();
record.record_metadata.updated = true;

record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
11 changes: 5 additions & 6 deletions api/src/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ pub type InMemoryCache<T> = Arc<Cache<Option<BTreeMap<String, String>>, Arc<T>>>

pub trait RequestExt: Sized {
type Output: Serialize + DeserializeOwned + Unpin + Sync + Send + 'static;

/// Generate `Self::Output` of the request based on the given payload.
///
/// @param self
Expand All @@ -69,10 +68,10 @@ pub trait RequestExt: Sized {
None
}

/// Update the output of the request based on the input.
fn update(&self, _: &mut Self::Output) -> Unit {}
fn update(&self, output: Self::Output) -> Self::Output {
output
}

/// Get the store for the request.
fn get_store(stores: AppStores) -> MongoStore<Self::Output>;
}

Expand Down Expand Up @@ -285,7 +284,7 @@ where

let store = T::get_store(state.app_stores.clone());

let Some(mut record) = (match store.get_one(query.filter).await {
let Some(record) = (match store.get_one(query.filter).await {
Ok(ret) => ret,
Err(e) => {
error!("Error getting record in store: {e}");
Expand All @@ -295,7 +294,7 @@ where
return Err(not_found!("Record"));
};

req.update(&mut record);
let record = req.update(record);

let bson = bson::to_bson_with_options(
&record,
Expand Down
4 changes: 3 additions & 1 deletion api/src/endpoints/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl RequestExt for CreatePipelineRequest {
})
}

fn update(&self, record: &mut Self::Output) {
fn update(&self, mut record: Self::Output) -> Self::Output {
let CreatePipelineRequest {
name,
key,
Expand All @@ -83,6 +83,8 @@ impl RequestExt for CreatePipelineRequest {
record.signature = signature.clone();
record.config = Some(config.clone());
record.record_metadata.mark_updated(&record.ownership.id);

record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
4 changes: 3 additions & 1 deletion api/src/endpoints/platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ impl RequestExt for CreateRequest {
})
}

fn update(&self, record: &mut Self::Output) {
fn update(&self, mut record: Self::Output) -> Self::Output {
record.connection_definition_id = self.connection_definition_id;
record.name = self.name.clone();
record.url = self.url.clone();
record.platform_version = self.version.clone();
record.ownership = self.ownership.clone();
record.analyzed = self.analyzed;

record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
4 changes: 3 additions & 1 deletion api/src/endpoints/platform_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl RequestExt for CreateRequest {
})
}

fn update(&self, record: &mut Self::Output) {
fn update(&self, mut record: Self::Output) -> Self::Output {
record.platform_id = self.platform_id;
record.connection_definition_id = self.connection_definition_id;
record.r#type = self.r#type.clone();
Expand All @@ -127,6 +127,8 @@ impl RequestExt for CreateRequest {
record.content = self.content.clone();
record.ownership = self.ownership.clone();
record.analyzed = self.analyzed;

record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
6 changes: 1 addition & 5 deletions api/src/routes/authenticated.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
endpoints::{
common_model, connection_definition,
connection_model_definition::{self, test_connection_model_definition},
connection_model_definition::{self},
connection_model_schema, connection_oauth_definition, openapi, platform, platform_page,
},
middleware::jwt_auth::{self, JwtState},
Expand All @@ -13,10 +13,6 @@ use tower_http::trace::TraceLayer;

pub async fn get_router(state: &Arc<AppState>) -> Router<Arc<AppState>> {
let routes = Router::new()
.route(
"/connection-model-definitions/test/:id",
post(test_connection_model_definition),
)
.nest(
"/connection-definitions",
connection_definition::get_router(),
Expand Down
7 changes: 6 additions & 1 deletion api/src/routes/public.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
endpoints::{
connection_definition, connection_model_schema, connection_oauth_definition,
connection_definition, connection_model_definition::test_connection_model_definition,
connection_model_schema, connection_oauth_definition,
event_access::create_event_access_for_new_user, openapi, read_cached,
},
middleware::jwt_auth::{self, JwtState},
Expand All @@ -24,6 +25,10 @@ pub fn get_router(state: &Arc<AppState>) -> Router<Arc<AppState>> {
jwt_auth::jwt_auth,
)),
)
.route(
"/connection-model-definitions/test/:id",
post(test_connection_model_definition),
)
.route(
"/connection-definitions",
get(read_cached::<connection_definition::CreateRequest, ConnectionDefinition>),
Expand Down
15 changes: 5 additions & 10 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
config::Config,
endpoints::{
connection_oauth_definition::FrontendOauthConnectionDefinition, openapi::OpenAPIData,
ReadResponse,
InMemoryCache, ReadResponse,
},
metrics::Metric,
routes,
Expand All @@ -27,7 +27,7 @@ use integrationos_domain::{
use moka::future::Cache;
use mongodb::{options::UpdateOptions, Client, Database};
use segment::{AutoBatcher, Batcher, HttpClient};
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};
use tokio::{net::TcpListener, sync::mpsc::Sender, time::timeout, try_join};
use tracing::{error, info, trace, warn};

Expand Down Expand Up @@ -63,14 +63,9 @@ pub struct AppState {
pub openapi_data: OpenAPIData,
pub http_client: reqwest::Client,
pub connections_cache: Cache<(Arc<str>, HeaderValue), Arc<Connection>>,
pub connection_definitions_cache:
Arc<Cache<Option<BTreeMap<String, String>>, Arc<ReadResponse<ConnectionDefinition>>>>,
pub connection_oauth_definitions_cache: Arc<
Cache<
Option<BTreeMap<String, String>>,
Arc<ReadResponse<FrontendOauthConnectionDefinition>>,
>,
>,
pub connection_definitions_cache: InMemoryCache<ReadResponse<ConnectionDefinition>>,
pub connection_oauth_definitions_cache:
InMemoryCache<ReadResponse<FrontendOauthConnectionDefinition>>,
pub secrets_client: Arc<dyn CryptoExt + Sync + Send>,
pub extractor_caller: UnifiedDestination,
pub event_tx: Sender<Event>,
Expand Down

0 comments on commit 0f6b9cf

Please sign in to comment.