Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implementing secret retrieval from connections-api #206

Merged
merged 8 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions integrationos-api/src/domain/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ pub struct ConnectionsConfig {
pub jwt_secret: String,
#[envconfig(from = "EMIT_URL", default = "http://localhost:3001")]
pub emit_url: String,
#[envconfig(from = "EMITTER_ENABLED", default = "false")]
pub emitter_enabled: bool,
#[envconfig(from = "CONNECTIONS_URL", default = "http://localhost:3005")]
/// Same as self url, but this may vary in a k8s environment hence it's a separate config
pub connections_url: String,
/// Burst size limit
#[envconfig(from = "API_VERSION", default = "v1")]
pub api_version: String,
Expand Down
21 changes: 5 additions & 16 deletions integrationos-api/src/helper/k8s_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,7 @@ impl K8sDriver for K8sDriverLogger {
&self,
params: ServiceSpecParams,
) -> Result<Service, IntegrationOSError> {
tracing::info!(
"Creating k8s resource {} in namespace {}",
params.name,
params.namespace
);
tracing::info!("Creating k8s service resource with params {:#?}", params);
Ok(Service::default())
}

Expand All @@ -154,11 +150,7 @@ impl K8sDriver for K8sDriverLogger {
&self,
params: DeploymentSpecParams,
) -> Result<Deployment, IntegrationOSError> {
tracing::info!(
"Creating k8s resource {} in namespace {}",
params.name,
params.namespace
);
tracing::info!("Creating k8s deployment resource with params {:#?}", params);
Ok(Deployment::default())
}

Expand Down Expand Up @@ -192,14 +184,11 @@ impl K8sDriver for K8sDriverLogger {
/// resources.
async fn coordinator(
&self,
_service: ServiceSpecParams,
service: ServiceSpecParams,
deployment: DeploymentSpecParams,
) -> Result<Unit, IntegrationOSError> {
tracing::info!(
"Creating k8s resource {} in namespace {}",
deployment.name,
deployment.namespace
);
self.create_deployment(deployment).await?;
self.create_service(service).await?;
Ok(())
}
}
Expand Down
117 changes: 67 additions & 50 deletions integrationos-api/src/logic/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ use axum::{
Extension, Json, Router,
};
use chrono::Utc;
use envconfig::Envconfig;
use http::HeaderMap;
use integrationos_domain::{
algebra::MongoStore,
connection_definition::{ConnectionDefinition, ConnectionDefinitionType},
database::DatabaseConnectionConfig,
database::{DatabasePodConfig, PostgresConfig},
database_secret::DatabaseConnectionSecret,
domain::connection::SanitizedConnection,
environment::Environment,
event_access::EventAccess,
Expand All @@ -29,7 +31,7 @@ use integrationos_domain::{
InternalError, Throughput,
};
use k8s_openapi::{
api::core::v1::{ContainerPort, EnvVar, ServicePort},
api::core::v1::{ContainerPort, EnvVar, EnvVarSource, SecretKeySelector, ServicePort},
apimachinery::pkg::util::intstr::IntOrString,
};
use mongodb::bson::doc;
Expand All @@ -56,6 +58,9 @@ pub fn get_router() -> Router<Arc<AppState>> {
const APP_LABEL: &str = "app";
const DATABASE_TYPE_LABEL: &str = "database-type";

const JWT_SECRET_REF_KEY: &str = "jwt-secret";
const JWT_SECRET_REF_NAME: &str = "database-secrets";

#[derive(Debug, Clone, PartialEq, Deserialize, Serialize, Validate)]
#[serde(rename_all = "camelCase")]
pub struct CreateConnectionPayload {
Expand All @@ -68,16 +73,6 @@ pub struct CreateConnectionPayload {
pub name: Option<String>,
}

#[derive(Clone, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub struct DatabaseConnectionSecret {
#[serde(flatten)]
pub value: DatabaseConnectionConfig,
pub namespace: String,
pub service_name: String,
pub connection_id: Id,
}

async fn test_connection(
state: &AppState,
connection_config: &ConnectionDefinition,
Expand Down Expand Up @@ -271,14 +266,9 @@ pub async fn create_connection(

let connection_id = Id::new(IdPrefix::Connection, Utc::now());

let (secret_value, service, deployment) = generate_k8s_specs_and_secret(
&connection_id,
&state,
&connection_config,
&payload,
&auth_form_data,
)
.await?;
let (secret_value, service, deployment) =
generate_k8s_specs_and_secret(&connection_id, &state, &connection_config, &auth_form_data)
.await?;

if let (Some(service), Some(deployment)) = (service.clone(), deployment.clone()) {
state.k8s_client.coordinator(service, deployment).await?;
Expand Down Expand Up @@ -378,9 +368,7 @@ async fn generate_k8s_specs_and_secret(
connection_id: &Id,
state: &AppState,
connection_config: &ConnectionDefinition,
payload: &CreateConnectionPayload,
auth_form_data: &Value,
// emit_url: &str,
) -> Result<
(
Value,
Expand All @@ -391,23 +379,6 @@ async fn generate_k8s_specs_and_secret(
> {
Ok(match connection_config.to_connection_type() {
integrationos_domain::ConnectionType::DatabaseSql {} => {
// Override for security reasons
let auth_form: HashMap<String, String> = payload
.auth_form_data
.clone()
.into_iter()
.chain(vec![
("WORKER_THREADS".into(), "1".into()),
("INTERNAL_SERVER_ADDRESS".into(), "0.0.0.0:5005".into()),
("CONNECTION_ID".into(), connection_id.to_string()),
("EMIT_URL".into(), state.config.emit_url.clone()),
(
"DATABASE_CONNECTION_TYPE".into(),
connection_config.platform.clone(),
),
])
.collect();

let service_name = ServiceName::from_id(*connection_id)?;

let namespace = match state.config.environment {
Expand All @@ -422,14 +393,44 @@ async fn generate_k8s_specs_and_secret(
connection_config.platform.clone(),
);

let database_connection_config =
DatabaseConnectionConfig::default().merge_unknown(auth_form)?;
let payload: HashMap<String, String> = serde_json::from_value(auth_form_data.clone())
.map_err(|e| {
error!("Error serializing auth form data for connection: {:?}", e);

ApplicationError::bad_request(&format!("Invalid auth form data: {:?}", e), None)
})?;

let database_pod_config = DatabasePodConfig {
worker_threads: Some(1),
address: "0.0.0.0:5000".parse().map_err(|_| {
InternalError::serialize_error("Unable to convert address to SocketAddr", None)
})?,
environment: state.config.environment,
emit_url: state.config.emit_url.clone(),
connections_url: state.config.connections_url.clone(),
database_connection_type: connection_config.platform.parse().map_err(|_| {
InternalError::serialize_error(
"Unable to convert database_connection_type to DatabaseConnectionType",
None,
)
})?,
connection_id: connection_id.to_string(),
emitter_enabled: state.config.emitter_enabled,
jwt_secret: None,
};

let secret = DatabaseConnectionSecret {
value: database_connection_config,
service_name: service_name.to_string(),
namespace: namespace.to_string(),
connection_id: *connection_id,
postgres_config: PostgresConfig::init_from_hashmap(&payload).map_err(|e| {
error!("Error initializing postgres config for connection: {:?}", e);

InternalError::serialize_error(
&format!("Unable to initialize postgres config: {:?}", e),
None,
)
})?,
};

let service = ServiceSpecParams {
Expand All @@ -452,18 +453,34 @@ async fn generate_k8s_specs_and_secret(
namespace,
image: state.config.database_connection_docker_image.clone(),
env: {
secret
.value
.as_hashmap()
.iter()
.fold(vec![], |mut env, (key, value)| {
env.push(EnvVar {
let mut env = database_pod_config.as_hashmap().iter().fold(
vec![],
|mut vars, (key, value)| {
vars.push(EnvVar {
name: key.to_string(),
value: Some(value.to_string()),
..Default::default()
});
env
})

vars
},
);

// JWT_SECRET
env.push(EnvVar {
name: "JWT_SECRET".to_string(),
value_from: Some(EnvVarSource {
secret_key_ref: Some(SecretKeySelector {
key: JWT_SECRET_REF_KEY.to_string(),
name: JWT_SECRET_REF_NAME.to_owned(),
optional: Some(false),
}),
..Default::default()
}),
..Default::default()
});

env
},
ports: vec![ContainerPort {
container_port: 5005,
Expand Down
4 changes: 2 additions & 2 deletions integrationos-api/src/logic/event_callback.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use super::connection::DatabaseConnectionSecret;
use crate::{
helper::{NamespaceScope, ServiceName},
server::AppState,
Expand All @@ -10,7 +9,8 @@ use axum::{
};
use bson::doc;
use integrationos_domain::{
emitted_events::ConnectionLostReason, ApplicationError, Connection, Id, IntegrationOSError,
database_secret::DatabaseConnectionSecret, emitted_events::ConnectionLostReason,
ApplicationError, Connection, Id, IntegrationOSError,
};
use std::sync::Arc;

Expand Down
22 changes: 21 additions & 1 deletion integrationos-api/src/logic/secrets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use axum::{
Extension, Json, Router,
};
use bson::doc;
use integrationos_domain::{event_access::EventAccess, secret::Secret, IntegrationOSError};
use integrationos_domain::{
event_access::EventAccess, secret::Secret, ApplicationError, Id, IntegrationOSError,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::Arc;
Expand Down Expand Up @@ -47,3 +49,21 @@ async fn get_secret(
.await?,
))
}

pub async fn get_admin_secret(
state: State<Arc<AppState>>,
Path(connection_id): Path<Id>,
) -> Result<Json<Secret>, IntegrationOSError> {
let (secret_id, owner) = state
.app_stores
.connection
.get_one_by_id(&connection_id.to_string())
.await?
.map(|c| (c.secrets_service_id, c.ownership.id))
.ok_or(ApplicationError::not_found(
&format!("connection with id {} not found", connection_id),
None,
))?;

Ok(Json(state.secrets_client.get(&secret_id, &owner).await?))
}
2 changes: 1 addition & 1 deletion integrationos-api/src/router/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub fn get_router(state: &Arc<AppState>) -> Router<Arc<AppState>> {
get(read::<common_enum::GetRequest, CommonEnum>),
),
)
.nest("/schemas", schema_generator::get_router())
.route(
"/connection-data",
get(read::<GetPublicConnectionDetailsRequest, PublicConnectionDetails>),
Expand All @@ -52,7 +53,6 @@ pub fn get_router(state: &Arc<AppState>) -> Router<Arc<AppState>> {
"/connection-definitions",
get(read::<connection_definition::CreateRequest, ConnectionDefinition>),
)
.nest("/schemas", schema_generator::get_router())
.route(
"/connection-oauth-definition-schema",
get(read::<
Expand Down
23 changes: 12 additions & 11 deletions integrationos-api/src/router/secured_jwt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use crate::{
common_enum, common_model, connection_definition,
connection_model_definition::{self},
connection_model_schema, connection_oauth_definition, event_callback, openapi, platform,
platform_page,
platform_page, secrets,
},
middleware::jwt_auth::{self, JwtState},
server::AppState,
};
use axum::{
middleware::{from_fn, from_fn_with_state},
routing::post,
routing::{get, post},
Router,
};
use integrationos_domain::telemetry::log_request_middleware;
Expand All @@ -23,24 +23,25 @@ pub async fn get_router(state: &Arc<AppState>) -> Router<Arc<AppState>> {
"/connection-definitions",
connection_definition::get_router(),
)
.nest(
"/connection-oauth-definitions",
connection_oauth_definition::get_router(),
)
.nest(
"/connection-model-definitions",
connection_model_definition::get_router(),
)
.route("/openapi", post(openapi::refresh_openapi))
.nest(
"/connection-model-schemas",
connection_model_schema::get_router(),
)
.nest("/platforms", platform::get_router())
.nest("/platform-pages", platform_page::get_router())
.nest("/common-models", common_model::get_router())
.nest(
"/connection-oauth-definitions",
connection_oauth_definition::get_router(),
)
.nest("/common-enums", common_enum::get_router())
.nest("/event-callbacks", event_callback::get_router());
.nest("/common-models", common_model::get_router())
.nest("/event-callbacks", event_callback::get_router())
.nest("/platform-pages", platform_page::get_router())
.nest("/platforms", platform::get_router())
.route("/admin/connection/:id", get(secrets::get_admin_secret))
.route("/openapi", post(openapi::refresh_openapi));

routes
.layer(from_fn_with_state(
Expand Down
8 changes: 4 additions & 4 deletions integrationos-api/src/router/secured_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,18 @@ use tracing::warn;

pub async fn get_router(state: &Arc<AppState>) -> Router<Arc<AppState>> {
let routes = Router::new()
.layer(TraceLayer::new_for_http())
.nest("/connections", connection::get_router())
.nest("/vault/connections", vault_connection::get_router())
.nest("/event-access", event_access::get_router())
.nest("/events", events::get_router())
.nest("/metrics", metrics::get_router())
.nest("/oauth", oauth::get_router())
.nest("/passthrough", passthrough::get_router())
.nest("/pipelines", pipeline::get_router())
.nest("/secrets", secrets::get_router())
.nest("/transactions", transactions::get_router())
.nest("/unified", unified::get_router())
.nest("/vault/connections", vault_connection::get_router())
.route(
"/connection-model-definitions/test/:id",
post(test_connection_model_definition),
Expand All @@ -52,9 +54,7 @@ pub async fn get_router(state: &Arc<AppState>) -> Router<Arc<AppState>> {
PublicGetConnectionModelSchema,
PublicConnectionModelSchema,
>),
)
.layer(TraceLayer::new_for_http())
.nest("/metrics", metrics::get_router());
);

let routes = match RateLimiter::from_state(state.clone()).await {
Ok(rate_limiter) => routes.layer(axum::middleware::from_fn_with_state(
Expand Down
Loading
Loading