Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using mutable self for update #14

Merged
merged 2 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/src/endpoints/common_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ impl RequestExt for CreateRequest {
Some(record)
}

fn update(&self, record: &mut Self::Output) {
fn update(&self, mut record: Self::Output) -> Self::Output {
record.name = self.name.clone();
record.record_metadata.version = self.version.clone();
record.fields = self.fields.clone();
record.category = self.category.clone();
record.sample = self.sample.clone();
record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
5 changes: 3 additions & 2 deletions api/src/endpoints/connection_definition.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{
create, delete, read, update, ApiResult, CachedRequest, HookExt, ReadResponse, RequestExt, Unit,
create, delete, read, update, ApiResult, CachedRequest, HookExt, ReadResponse, RequestExt,
};
use crate::{
internal_server_error, not_found,
Expand Down Expand Up @@ -319,7 +319,7 @@ impl RequestExt for CreateRequest {
Some(record)
}

fn update(&self, record: &mut Self::Output) -> Unit {
fn update(&self, mut record: Self::Output) -> Self::Output {
record.name = self.name.clone();
record.frontend.spec.description = self.description.clone();
record.frontend.spec.category = self.category.clone();
Expand All @@ -328,6 +328,7 @@ impl RequestExt for CreateRequest {
record.test_connection = self.test_connection;
record.platform = self.platform.clone();
record.record_metadata.active = self.active;
record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
6 changes: 4 additions & 2 deletions api/src/endpoints/connection_model_definition.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{create, delete, read, update, HookExt, RequestExt, Unit};
use super::{create, delete, read, update, HookExt, RequestExt};
use crate::{
api_payloads::ErrorResponse,
internal_server_error, not_found,
Expand Down Expand Up @@ -348,7 +348,7 @@ impl RequestExt for CreateRequest {
Some(record)
}

fn update(&self, record: &mut Self::Output) -> Unit {
fn update(&self, mut record: Self::Output) -> Self::Output {
let key = format!(
"api::{}::{}::{}::{}::{}::{}",
self.connection_platform,
Expand Down Expand Up @@ -383,6 +383,8 @@ impl RequestExt for CreateRequest {
record.mapping = self.mapping.clone();
record.extractor_config = self.extractor_config.clone();
record.record_metadata.version = self.version.clone();

record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
3 changes: 2 additions & 1 deletion api/src/endpoints/connection_model_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl RequestExt for CreateRequest {
})
}

fn update(&self, record: &mut Self::Output) {
fn update(&self, mut record: Self::Output) -> Self::Output {
record.platform_id = self.platform_id;
record.platform_page_id = self.platform_page_id;
record.connection_platform = self.connection_platform.clone();
Expand All @@ -123,6 +123,7 @@ impl RequestExt for CreateRequest {
record.sample = self.sample.clone();
record.paths = self.paths.clone();
record.mapping = self.mapping.clone();
record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
6 changes: 4 additions & 2 deletions api/src/endpoints/connection_oauth_definition.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{create, delete, read, update, CachedRequest, HookExt, ReadResponse, RequestExt, Unit};
use super::{create, delete, read, update, CachedRequest, HookExt, ReadResponse, RequestExt};
use crate::server::{AppState, AppStores};
use axum::{
routing::{patch, post},
Expand Down Expand Up @@ -125,7 +125,7 @@ impl RequestExt for CreateRequest {
})
}

fn update(&self, record: &mut Self::Output) -> Unit {
fn update(&self, mut record: Self::Output) -> Self::Output {
record.connection_platform = self.connection_platform.clone();
record.configuration = OAuthApiConfig {
init: self.init.configuration.clone(),
Expand Down Expand Up @@ -180,6 +180,8 @@ impl RequestExt for CreateRequest {
};
record.record_metadata.updated_at = Utc::now().timestamp_millis();
record.record_metadata.updated = true;

record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
11 changes: 5 additions & 6 deletions api/src/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ pub type InMemoryCache<T> = Arc<Cache<Option<BTreeMap<String, String>>, Arc<T>>>

pub trait RequestExt: Sized {
type Output: Serialize + DeserializeOwned + Unpin + Sync + Send + 'static;

/// Generate `Self::Output` of the request based on the given payload.
///
/// @param self
Expand All @@ -69,10 +68,10 @@ pub trait RequestExt: Sized {
None
}

/// Update the output of the request based on the input.
fn update(&self, _: &mut Self::Output) -> Unit {}
fn update(&self, output: Self::Output) -> Self::Output {
output
}

/// Get the store for the request.
fn get_store(stores: AppStores) -> MongoStore<Self::Output>;
}

Expand Down Expand Up @@ -285,7 +284,7 @@ where

let store = T::get_store(state.app_stores.clone());

let Some(mut record) = (match store.get_one(query.filter).await {
let Some(record) = (match store.get_one(query.filter).await {
Ok(ret) => ret,
Err(e) => {
error!("Error getting record in store: {e}");
Expand All @@ -295,7 +294,7 @@ where
return Err(not_found!("Record"));
};

req.update(&mut record);
let record = req.update(record);

let bson = bson::to_bson_with_options(
&record,
Expand Down
4 changes: 3 additions & 1 deletion api/src/endpoints/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl RequestExt for CreatePipelineRequest {
})
}

fn update(&self, record: &mut Self::Output) {
fn update(&self, mut record: Self::Output) -> Self::Output {
let CreatePipelineRequest {
name,
key,
Expand All @@ -83,6 +83,8 @@ impl RequestExt for CreatePipelineRequest {
record.signature = signature.clone();
record.config = Some(config.clone());
record.record_metadata.mark_updated(&record.ownership.id);

record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
4 changes: 3 additions & 1 deletion api/src/endpoints/platform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ impl RequestExt for CreateRequest {
})
}

fn update(&self, record: &mut Self::Output) {
fn update(&self, mut record: Self::Output) -> Self::Output {
record.connection_definition_id = self.connection_definition_id;
record.name = self.name.clone();
record.url = self.url.clone();
record.platform_version = self.version.clone();
record.ownership = self.ownership.clone();
record.analyzed = self.analyzed;

record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
4 changes: 3 additions & 1 deletion api/src/endpoints/platform_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl RequestExt for CreateRequest {
})
}

fn update(&self, record: &mut Self::Output) {
fn update(&self, mut record: Self::Output) -> Self::Output {
record.platform_id = self.platform_id;
record.connection_definition_id = self.connection_definition_id;
record.r#type = self.r#type.clone();
Expand All @@ -127,6 +127,8 @@ impl RequestExt for CreateRequest {
record.content = self.content.clone();
record.ownership = self.ownership.clone();
record.analyzed = self.analyzed;

record
}

fn get_store(stores: AppStores) -> MongoStore<Self::Output> {
Expand Down
6 changes: 1 addition & 5 deletions api/src/routes/authenticated.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
endpoints::{
common_model, connection_definition,
connection_model_definition::{self, test_connection_model_definition},
connection_model_definition::{self},
connection_model_schema, connection_oauth_definition, openapi, platform, platform_page,
},
middleware::jwt_auth::{self, JwtState},
Expand All @@ -13,10 +13,6 @@ use tower_http::trace::TraceLayer;

pub async fn get_router(state: &Arc<AppState>) -> Router<Arc<AppState>> {
let routes = Router::new()
.route(
"/connection-model-definitions/test/:id",
post(test_connection_model_definition),
)
.nest(
"/connection-definitions",
connection_definition::get_router(),
Expand Down
7 changes: 6 additions & 1 deletion api/src/routes/public.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
endpoints::{
connection_definition, connection_model_schema, connection_oauth_definition,
connection_definition, connection_model_definition::test_connection_model_definition,
connection_model_schema, connection_oauth_definition,
event_access::create_event_access_for_new_user, openapi, read_cached,
},
middleware::jwt_auth::{self, JwtState},
Expand All @@ -24,6 +25,10 @@ pub fn get_router(state: &Arc<AppState>) -> Router<Arc<AppState>> {
jwt_auth::jwt_auth,
)),
)
.route(
"/connection-model-definitions/test/:id",
post(test_connection_model_definition),
)
.route(
"/connection-definitions",
get(read_cached::<connection_definition::CreateRequest, ConnectionDefinition>),
Expand Down
15 changes: 5 additions & 10 deletions api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
config::Config,
endpoints::{
connection_oauth_definition::FrontendOauthConnectionDefinition, openapi::OpenAPIData,
ReadResponse,
InMemoryCache, ReadResponse,
},
metrics::Metric,
routes,
Expand All @@ -27,7 +27,7 @@ use integrationos_domain::{
use moka::future::Cache;
use mongodb::{options::UpdateOptions, Client, Database};
use segment::{AutoBatcher, Batcher, HttpClient};
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};
use tokio::{net::TcpListener, sync::mpsc::Sender, time::timeout, try_join};
use tracing::{error, info, trace, warn};

Expand Down Expand Up @@ -63,14 +63,9 @@ pub struct AppState {
pub openapi_data: OpenAPIData,
pub http_client: reqwest::Client,
pub connections_cache: Cache<(Arc<str>, HeaderValue), Arc<Connection>>,
pub connection_definitions_cache:
Arc<Cache<Option<BTreeMap<String, String>>, Arc<ReadResponse<ConnectionDefinition>>>>,
pub connection_oauth_definitions_cache: Arc<
Cache<
Option<BTreeMap<String, String>>,
Arc<ReadResponse<FrontendOauthConnectionDefinition>>,
>,
>,
pub connection_definitions_cache: InMemoryCache<ReadResponse<ConnectionDefinition>>,
pub connection_oauth_definitions_cache:
InMemoryCache<ReadResponse<FrontendOauthConnectionDefinition>>,
pub secrets_client: Arc<dyn CryptoExt + Sync + Send>,
pub extractor_caller: UnifiedDestination,
pub event_tx: Sender<Event>,
Expand Down
Loading