Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implementing secret retrieval from connections-api #206

Merged
merged 8 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion integrationos-api/src/domain/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ pub struct ConnectionsConfig {
pub jwt_secret: String,
#[envconfig(from = "EMIT_URL", default = "http://localhost:3001")]
pub emit_url: String,
#[envconfig(from = "EMITTER_ENABLED", default = "false")]
pub emitter_enabled: bool,
#[envconfig(from = "CONNECTIONS_URL", default = "http://localhost:3005")]
/// Same as self url, but this may vary in a k8s environment hence it's a separate config
pub connections_url: String,
/// Burst size limit
#[envconfig(from = "API_VERSION", default = "v1")]
pub api_version: String,
Expand All @@ -81,6 +86,8 @@ pub struct ConnectionsConfig {
default = "integrationos-database"
)]
pub database_connection_docker_image: String,
#[envconfig(from = "NAMESPACE", default = "development")]
pub namespace: String,
#[envconfig(from = "DATABASE_CONNECTION_PROBE_TIMEOUT_SECS", default = "10")]
pub database_connection_probe_timeout_secs: u64,
#[envconfig(from = "K8S_MODE", default = "logger")]
Expand Down Expand Up @@ -147,7 +154,13 @@ impl Display for ConnectionsConfig {
writeln!(f, "{}", self.db_config)?;
writeln!(f, "{}", self.cache_config)?;
writeln!(f, "RATE_LIMIT_ENABLED: {}", self.rate_limit_enabled)?;
writeln!(f, "ENVIRONMENT: {}", self.environment)
writeln!(f, "ENVIRONMENT: {}", self.environment)?;
writeln!(
f,
"DATABASE_CONNECTION_DOCKER_IMAGE: {}",
self.database_connection_docker_image
)?;
writeln!(f, "NAMESPACE: {}", self.namespace)
}
}

Expand Down
78 changes: 15 additions & 63 deletions integrationos-api/src/helper/k8s_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,6 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::fmt::Debug;
use std::{collections::BTreeMap, fmt::Display};

#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum NamespaceScope {
Development,
Production,
}

impl TryFrom<&str> for NamespaceScope {
type Error = IntegrationOSError;

fn try_from(value: &str) -> Result<Self, Self::Error> {
match value {
"development-db-conns" => Ok(NamespaceScope::Development),
"production-db-conns" => Ok(NamespaceScope::Production),
_ => Err(InternalError::invalid_argument(
&format!("Invalid namespace scope: {}", value),
None,
)),
}
}
}

impl AsRef<str> for NamespaceScope {
fn as_ref(&self) -> &str {
match self {
NamespaceScope::Development => "development-db-conns",
NamespaceScope::Production => "production-db-conns",
}
}
}

impl Display for NamespaceScope {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.as_ref())
}
}

#[async_trait]
pub trait K8sDriver: Send + Sync {
async fn create_service(
Expand All @@ -68,7 +31,7 @@ pub trait K8sDriver: Send + Sync {
) -> Result<Deployment, IntegrationOSError>;
async fn delete_all(
&self,
namespace: NamespaceScope,
namespace: String,
name: ServiceName,
) -> Result<Unit, IntegrationOSError>;
async fn coordinator(
Expand Down Expand Up @@ -110,7 +73,7 @@ impl K8sDriver for K8sDriverImpl {

async fn delete_all(
&self,
namespace: NamespaceScope,
namespace: String,
name: ServiceName,
) -> Result<Unit, IntegrationOSError> {
delete_all_impl(self.client.clone(), namespace, name).await
Expand Down Expand Up @@ -138,11 +101,7 @@ impl K8sDriver for K8sDriverLogger {
&self,
params: ServiceSpecParams,
) -> Result<Service, IntegrationOSError> {
tracing::info!(
"Creating k8s resource {} in namespace {}",
params.name,
params.namespace
);
tracing::info!("Creating k8s service resource with params {:#?}", params);
Ok(Service::default())
}

Expand All @@ -154,11 +113,7 @@ impl K8sDriver for K8sDriverLogger {
&self,
params: DeploymentSpecParams,
) -> Result<Deployment, IntegrationOSError> {
tracing::info!(
"Creating k8s resource {} in namespace {}",
params.name,
params.namespace
);
tracing::info!("Creating k8s deployment resource with params {:#?}", params);
Ok(Deployment::default())
}

Expand All @@ -169,13 +124,13 @@ impl K8sDriver for K8sDriverLogger {
/// - `namespace` - Namespace the existing deployment resides in
async fn delete_all(
&self,
namespace: NamespaceScope,
namespace: String,
name: ServiceName,
) -> Result<Unit, IntegrationOSError> {
tracing::info!(
"Deleting k8s resource {} in namespace {}",
name.as_ref(),
namespace.as_ref()
namespace,
);
Ok(())
}
Expand All @@ -192,14 +147,11 @@ impl K8sDriver for K8sDriverLogger {
/// resources.
async fn coordinator(
&self,
_service: ServiceSpecParams,
service: ServiceSpecParams,
deployment: DeploymentSpecParams,
) -> Result<Unit, IntegrationOSError> {
tracing::info!(
"Creating k8s resource {} in namespace {}",
deployment.name,
deployment.namespace
);
self.create_deployment(deployment).await?;
self.create_service(service).await?;
Ok(())
}
}
Expand All @@ -215,7 +167,7 @@ pub struct ServiceSpecParams {
/// Annotations to apply to the service. Has to match with the deployment metadata
pub name: ServiceName,
/// Namespace the service should reside in
pub namespace: NamespaceScope,
pub namespace: String,
}

async fn create_service_impl(
Expand All @@ -226,7 +178,7 @@ async fn create_service_impl(
metadata: ObjectMeta {
name: Some(params.name.as_ref().to_string()),
labels: Some(params.labels.clone()),
namespace: Some(params.namespace.as_ref().to_owned()),
namespace: Some(params.namespace.clone()),
..Default::default()
},
spec: Some(ServiceSpec {
Expand All @@ -238,7 +190,7 @@ async fn create_service_impl(
..Default::default()
};

let service_api: Api<Service> = Api::namespaced(client, params.namespace.as_ref());
let service_api: Api<Service> = Api::namespaced(client, &params.namespace);
service_api
.create(&PostParams::default(), &service)
.await
Expand All @@ -252,7 +204,7 @@ pub struct DeploymentSpecParams {
/// Labels to apply to the deployment
pub labels: BTreeMap<String, String>,
/// Namespace the deployment should reside in
pub namespace: NamespaceScope,
pub namespace: String,
/// Image to use for the deployment
pub image: String,
/// Environment variables to apply
Expand All @@ -271,7 +223,7 @@ async fn create_deployment_impl(
let deployment: Deployment = Deployment {
metadata: ObjectMeta {
name: Some(params.name.as_ref().to_string()),
namespace: Some(params.namespace.as_ref().to_owned()),
namespace: Some(params.namespace.clone()),
labels: Some(params.labels.clone()),
..ObjectMeta::default()
},
Expand Down Expand Up @@ -328,7 +280,7 @@ where

pub async fn delete_all_impl(
client: Client,
namespace: NamespaceScope,
namespace: String,
name: ServiceName,
) -> Result<Unit, IntegrationOSError> {
delete_resource_impl::<Service>(client.clone(), name.as_ref(), namespace.as_ref()).await?;
Expand Down
Loading
Loading