diff --git a/src/algebra/refresh.rs b/src/algebra/refresh.rs index 7989f92..168aebe 100644 --- a/src/algebra/refresh.rs +++ b/src/algebra/refresh.rs @@ -1,10 +1,9 @@ use crate::{ algebra::{StorageExt, TriggerActor}, - domain::{Query, Refresh, StatefulActor, Trigger, Unit}, + domain::{Refresh, Trigger, Unit}, }; use actix::prelude::*; use chrono::{Duration, Utc}; -use futures::lock::Mutex; use integrationos_domain::{ algebra::MongoStore, client::secrets_client::SecretsClient, connection_oauth_definition::ConnectionOAuthDefinition, error::IntegrationOSError as Error, @@ -18,7 +17,6 @@ pub struct RefreshActor { oauths: Arc>, secrets: Arc, client: Client, - state: Arc>, } impl RefreshActor { @@ -33,7 +31,6 @@ impl RefreshActor { oauths, secrets, client, - state: StatefulActor::empty(), } } } @@ -64,7 +61,6 @@ impl Handler for RefreshActor { let client = self.client.clone(); let connections_store = self.connections.clone(); let oauths_store = self.oauths.clone(); - let state = self.state.clone(); Box::pin(async move { tracing::info!("Searching for connections to refresh"); @@ -99,14 +95,6 @@ impl Handler for RefreshActor { .collect::, _>>() { Ok(vec) => { - let vec_as_json = serde_json::to_value(&vec).map_err(|e| { - InternalError::encryption_error( - "Failed to serialize outcome", - Some(e.to_string().as_str()), - ) - })?; - StatefulActor::update(vec_as_json, state).await; - tracing::info!( "Refreshed {} connections with outcome: {:?}", vec.len(), @@ -120,13 +108,3 @@ impl Handler for RefreshActor { }) } } - -impl Handler for RefreshActor { - type Result = ResponseFuture; - - fn handle(&mut self, _: Query, _: &mut Self::Context) -> Self::Result { - let state = self.state.clone(); - - Box::pin(async move { state.lock().await.clone() }) - } -} diff --git a/src/algebra/trigger.rs b/src/algebra/trigger.rs index 1a696b7..82cd9f9 100644 --- a/src/algebra/trigger.rs +++ b/src/algebra/trigger.rs @@ -91,7 +91,7 @@ impl Handler for TriggerActor { let template = DefaultTemplate::default(); let ask = || async { - let conn_id = match &msg.connection().oauth { + let conn_oauth_id = match &msg.connection().oauth { Some(OAuth::Enabled { connection_oauth_definition_id: conn_oauth_definition_id, .. @@ -104,7 +104,7 @@ impl Handler for TriggerActor { let conn_oauth_definition = oauths .get_one(doc! { - "_id": conn_id.to_string(), + "_id": conn_oauth_id.to_string(), }) .await .map_err(|e| { @@ -115,7 +115,8 @@ impl Handler for TriggerActor { ) })? .ok_or(ApplicationError::not_found( - format!("Connection oauth definition not found: {}", conn_id).as_str(), + format!("Connection oauth definition not found: {}", conn_oauth_id) + .as_str(), None, ))?; @@ -203,7 +204,6 @@ impl Handler for TriggerActor { })?; let oauth_secret = secret.from_refresh(decoded, None, None, json); - let secret = secrets_client .create_secret( msg.connection().clone().ownership.client_id, @@ -216,7 +216,7 @@ impl Handler for TriggerActor { })?; let set = OAuth::Enabled { - connection_oauth_definition_id: *conn_id, + connection_oauth_definition_id: *conn_oauth_id, expires_at: Some( (chrono::Utc::now() + Duration::seconds(oauth_secret.expires_in as i64)) .timestamp(), @@ -248,7 +248,7 @@ impl Handler for TriggerActor { msg.connection().id ); - Ok::(*conn_id) + Ok::(msg.connection().id) }; match ask().await { diff --git a/src/domain/mod.rs b/src/domain/mod.rs index 3bb1e90..94c9631 100644 --- a/src/domain/mod.rs +++ b/src/domain/mod.rs @@ -1,11 +1,9 @@ mod outcome; -mod query; mod refresh; mod state; mod trigger; pub use outcome::*; -pub use query::*; pub use refresh::*; pub use state::*; pub use trigger::*; diff --git a/src/domain/query.rs b/src/domain/query.rs deleted file mode 100644 index bea6e81..0000000 --- a/src/domain/query.rs +++ /dev/null @@ -1,6 +0,0 @@ -use super::state::StatefulActor; -use actix::prelude::*; - -#[derive(Message, Debug, Clone)] -#[rtype(result = "StatefulActor")] -pub struct Query; diff --git a/src/lib.rs b/src/lib.rs index 1cd821d..23fc46e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -118,7 +118,6 @@ async fn run( scope(&(PREFIX.to_owned() + INTEGRATION_PREFIX)) // /v1/integration .wrap(from_fn(auth_middleware)) .service(trigger_refresh) - .service(get_state), ) .service(scope(PREFIX).service(health_check)) // /v1 .app_data(Data::new(state.clone())) diff --git a/src/service/http/private/mod.rs b/src/service/http/private/mod.rs index f6e3670..e863fd1 100644 --- a/src/service/http/private/mod.rs +++ b/src/service/http/private/mod.rs @@ -1,5 +1,3 @@ -mod refresh; mod trigger; -pub use refresh::*; pub use trigger::*; diff --git a/src/service/http/private/refresh.rs b/src/service/http/private/refresh.rs deleted file mode 100644 index 89e57ac..0000000 --- a/src/service/http/private/refresh.rs +++ /dev/null @@ -1,17 +0,0 @@ -use crate::domain::Query; -use crate::service::{AppState, ResponseType, ServerResponse}; -use actix_web::{get, web::Data, HttpResponse}; -use integrationos_domain::error::IntegrationOSError as Error; -use integrationos_domain::InternalError; - -#[tracing::instrument(skip(state))] -#[get("/get_state")] -pub async fn get_state(state: Data) -> Result { - let response = state - .refresh_actor - .send(Query) - .await - .map_err(|e| InternalError::io_err(e.to_string().as_str(), None))?; - - Ok(ServerResponse::from(ResponseType::Query, response, 200)) -}