diff --git a/crates/client-sdk/docs/CatResponse.md b/crates/client-sdk/docs/CatResponse.md deleted file mode 100644 index df0aa75..0000000 --- a/crates/client-sdk/docs/CatResponse.md +++ /dev/null @@ -1,14 +0,0 @@ -# CatResponse - -## Properties - -Name | Type | Description | Notes ------------- | ------------- | ------------- | ------------- -**uuid** | **String** | | -**name** | **String** | | -**breed** | **String** | | -**age** | **i32** | | - -[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) - - diff --git a/crates/client-sdk/src/models/cat_response.rs b/crates/client-sdk/src/models/cat_response.rs deleted file mode 100644 index 3550495..0000000 --- a/crates/client-sdk/src/models/cat_response.rs +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Pets API - * - * An API for managing pets - * - * The version of the OpenAPI document: 1.0.0 - * - * Generated by: https://openapi-generator.tech - */ - -use crate::models; - -#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] -pub struct CatResponse { - #[serde(rename = "uuid")] - pub uuid: String, - #[serde(rename = "name")] - pub name: String, - #[serde(rename = "breed")] - pub breed: String, - #[serde(rename = "age")] - pub age: i32, -} - -impl CatResponse { - pub fn new(uuid: String, name: String, breed: String, age: i32) -> CatResponse { - CatResponse { - uuid, - name, - breed, - age, - } - } -} - diff --git a/crates/k8s-codegen/src/main.rs b/crates/k8s-codegen/src/main.rs index 16276dd..bfac7cc 100644 --- a/crates/k8s-codegen/src/main.rs +++ b/crates/k8s-codegen/src/main.rs @@ -453,6 +453,7 @@ struct ControllerTemplate<'a> { has_create_action: bool, has_update_action: bool, has_delete_action: bool, + api_url: String, } #[derive(Template)] @@ -465,7 +466,7 @@ struct ControllerActionDeleteTemplate<'a> { } #[derive(Template)] -#[template(path = "k8s_operator_controller_action_put.jinja")] +#[template(path = "k8s_operator_controller_action_update.jinja")] struct ControllerActionPutTemplate<'a> { arg_name: String, kind_struct: String, @@ -474,7 +475,7 @@ struct ControllerActionPutTemplate<'a> { } #[derive(Template)] -#[template(path = "k8s_operator_controller_action_post.jinja")] +#[template(path = "k8s_operator_controller_action_create.jinja")] struct ControllerActionPostTemplate<'a> { arg_name: String, kind_struct: String, @@ -529,6 +530,7 @@ fn generate_controller( has_create_action, has_update_action, has_delete_action, + api_url: "http://localhost:8080".to_string(), } .render() .unwrap(); diff --git a/crates/k8s-codegen/templates/k8s_operator_controller.jinja b/crates/k8s-codegen/templates/k8s_operator_controller.jinja index 54c71d7..69303f3 100644 --- a/crates/k8s-codegen/templates/k8s_operator_controller.jinja +++ b/crates/k8s-codegen/templates/k8s_operator_controller.jinja @@ -9,7 +9,12 @@ use std::{sync::Arc, time::Duration}; use openapi::{ apis::{ - {{ tag }}_api::{create_{{ arg_name }}, delete_{{ arg_name }}_by_id, get_{{ arg_name }}_by_id, update_{{ arg_name }}_by_id}, + {{ tag }}_api::{ + create_{{ arg_name }}, + delete_{{ arg_name }}_by_id, + get_{{ arg_name }}_by_id, + update_{{ arg_name }}_by_id + }, configuration::Configuration, }, models::{{ kind_struct }} as {{ kind_struct }}Dto, @@ -28,6 +33,8 @@ use crate::{ }; const REQUEUE_AFTER_IN_SEC: u64 = 30; +const API_URL: &str = "{{ api_url }}"; +const API_USER_AGENT: &str = "k8s-operator"; fn convert_kube_type_to_dto({{ arg_name }}: {{ kind_struct }}) -> {{ kind_struct }}Dto { let uuid = match {{ arg_name }}.status { @@ -52,7 +59,6 @@ fn convert_dto_to_kube_type({{ arg_name }}: {{ kind_struct }}Dto) -> {{ kind_str struct ExtraArgs { kube_client: Api<{{ kind_struct }}>, - config: Configuration, } #[derive(Debug, Error)] @@ -69,8 +75,8 @@ enum OperatorError { // FailedToCreate{{ kind_struct }}(#[source] anyhow::Error), // #[error("Failed to get a {{ arg_name }}: {0}")] // FailedToGet{{ kind_struct }}(#[source] anyhow::Error), - // #[error("Failed to update status: {0}")] - // FailedToUpdateStatus(#[source] anyhow::Error), + #[error("Failed to update status: {0}")] + FailedToUpdateStatus(#[source] anyhow::Error), // #[error("Failed to remove finalizer: {0}")] // FailedToRemoveFinalizer(#[source] anyhow::Error), // #[error("Failed to add finalizer: {0}")] @@ -79,13 +85,12 @@ enum OperatorError { // FailedToCheckForDrift(#[source] anyhow::Error), } -pub async fn handle(kube_client: Api<{{ kind_struct }}>, config: Configuration) -> Result<()> { +pub async fn handle(kube_client: Api<{{ kind_struct }}>) -> Result<()> { info!("Starting the controller"); let controller = Controller::new(kube_client.clone(), watcher::Config::default()); let extra_args = Arc::new(ExtraArgs { kube_client: kube_client.clone(), - config, }); info!("Running the controller"); @@ -104,37 +109,28 @@ pub async fn handle(kube_client: Api<{{ kind_struct }}>, config: Configuration) } async fn reconcile({{ arg_name }}: Arc<{{ kind_struct }}>, ctx: Arc) -> Result { - let config = &ctx.config; let kube_client = ctx.kube_client.clone(); - let {{ arg_name }} = {{ arg_name }}.as_ref(); + let mut {{ arg_name }} = {{ arg_name }}.as_ref().clone(); // Add default stauts if it's missing if {{ arg_name }}.status.is_none() { - let status = {{ kind_struct }}Status { - conditions: vec![], - uuid: None, - observed_generation: Some(0), - }; - let mut {{ arg_name }}_clone = {{ arg_name }}.clone(); - {{ arg_name }}_clone.status = Some(status); - update_status(kube_client.clone(), {{ arg_name }}_clone).await?; + add_default_status(&kube_client, &mut {{ arg_name }}).await?; } + {% if has_delete_action %} // If the resource was marked for deletion, we need to delete it if {{ arg_name }}.meta().deletion_timestamp.is_some() { - if let Err(e) = handle_delete_{{ arg_name }}_by_id(config, &mut {{ arg_name }}.clone(), kube_client.clone()).await - { - error!("Failed to delete {{ arg_name }}: {:?}", e); - return Err(OperatorError::FailedToDelete{{ kind_struct }}(e)); - } - return Ok(Action::requeue(Duration::from_secs(REQUEUE_AFTER_IN_SEC))); + handle_delete(&kube_client, &mut {{ arg_name }}).await?; } + {% else %} + warn!("OpenAPI Spec doesn't have a delete operation implemented for {{ tag }} tag."); + {% endif %} // If the resource has no remote reference, meaning it's a new resource, so we need to create it // Otherwise, we need to check for drift match {{ arg_name }}.clone().status.unwrap().uuid { Some(_) => { - check_for_drift(config, kube_client.clone(), &mut {{ arg_name }}.clone()).await?; + check_for_drift(kube_client.clone(), &mut {{ arg_name }}.clone()).await?; Ok(Action::requeue(Duration::from_secs(REQUEUE_AFTER_IN_SEC))) } None => { @@ -150,19 +146,47 @@ async fn reconcile({{ arg_name }}: Arc<{{ kind_struct }}>, ctx: Arc) status.observed_generation = {{ arg_name }}.meta().generation; } update_status(kube_client.clone(), {{ arg_name }}.clone()).await?; - handle_create_{{ arg_name }}(config, {{ arg_name }}.clone(), kube_client).await?; + {% if has_create_action %} + handle_create(kube_client, &mut {{ arg_name }}.clone()).await?; + {% else %} + warn!("OpenAPI Spec doesn't have a create operation implemented for {{ tag }} tag."); + {% endif %} Ok(Action::requeue(Duration::from_secs(REQUEUE_AFTER_IN_SEC))) } } } +async fn get_client_config() -> Result { + let config = Configuration { + base_path: API_URL.to_string(), + client: reqwest::Client::new(), + user_agent: Some(API_USER_AGENT.to_string()), + bearer_access_token: Some(std::env::var("ACCESS_TOKEN").unwrap_or_default()), + ..Default::default() + }; + Ok(config) +} + +async fn add_default_status( + kube_client: &Api<{{ kind_struct }}>, + {{ arg_name }}: &mut {{ kind_struct }}, +) -> Result<(), OperatorError> { + let status = {{ kind_struct }}Status { + conditions: vec![], + uuid: None, + observed_generation: Some(0), + }; + {{ arg_name }}.status = Some(status); + update_status(kube_client.clone(), {{ arg_name }}.clone()).await.map_err(OperatorError::FailedToUpdateStatus) +} + pub async fn check_for_drift( - config: &Configuration, - kubernetes_api: Api<{{ kind_struct }}>, + kube_client: Api<{{ kind_struct }}>, {{ arg_name }}: &mut {{ kind_struct }}, ) -> Result<()> { let dto = convert_kube_type_to_dto({{ arg_name }}.clone()); let {{ resource_remote_ref }} = dto.{{ resource_remote_ref }}.clone().unwrap_or_default(); + let config = get_client_config().await?; if dto.{{ resource_remote_ref }}.is_none() { warn!("{{ kind_struct }} has no status, cannot get by id or check for drift. Skipping..."); @@ -190,7 +214,7 @@ pub async fn check_for_drift( status.conditions.push(condition); status.observed_generation = {{ arg_name }}.meta().generation; } - return update_status(kubernetes_api.clone(), {{ arg_name }}_clone).await; + return update_status(kube_client.clone(), {{ arg_name }}_clone).await; } Err(e) => { error!("Failed to update {{ kind_struct }}: {:?}", e); diff --git a/crates/k8s-codegen/templates/k8s_operator_controller_action_post.jinja b/crates/k8s-codegen/templates/k8s_operator_controller_action_create.jinja similarity index 83% rename from crates/k8s-codegen/templates/k8s_operator_controller_action_post.jinja rename to crates/k8s-codegen/templates/k8s_operator_controller_action_create.jinja index b889e88..22111b1 100644 --- a/crates/k8s-codegen/templates/k8s_operator_controller_action_post.jinja +++ b/crates/k8s-codegen/templates/k8s_operator_controller_action_create.jinja @@ -1,16 +1,16 @@ {% for controller in controllers %} {% if controller.http_method == "post" %} -pub async fn handle_{{ controller.operation_id }}( - config: &Configuration, +pub async fn handle_create( + kube_client: Api<{{ kind_struct }}>, {{ arg_name }}: &mut {{ kind_struct }}, - kubernetes_api: Api<{{ kind_struct }}>, ) -> Result<(), anyhow::Error> { let dto = convert_kube_type_to_dto({{ arg_name }}.clone()); + let config = get_client_config().await?; match {{ controller.operation_id }}(&config, dto.clone()).await { Ok(remote_{{ arg_name }}) => match remote_{{ arg_name }}.{{ resource_remote_ref }} { Some({{ resource_remote_ref }}) => { - add_finalizer({{ arg_name }}, kubernetes_api.clone()).await?; + add_finalizer({{ arg_name }}, kube_client.clone()).await?; let condition = create_condition( "Created", "AvailableCreated", @@ -24,7 +24,7 @@ pub async fn handle_{{ controller.operation_id }}( status.uuid = Some(uuid); status.observed_generation = {{ arg_name }}.meta().generation; } - return update_status(kubernetes_api.clone(), {{ arg_name }}_clone).await; + return update_status(kube_client.clone(), {{ arg_name }}_clone).await; } None => { warn!("Remote {{ arg_name }} has no {{ resource_remote_ref }}, cannot update status"); diff --git a/crates/k8s-codegen/templates/k8s_operator_controller_action_delete.jinja b/crates/k8s-codegen/templates/k8s_operator_controller_action_delete.jinja index 5486412..7747430 100644 --- a/crates/k8s-codegen/templates/k8s_operator_controller_action_delete.jinja +++ b/crates/k8s-codegen/templates/k8s_operator_controller_action_delete.jinja @@ -1,36 +1,32 @@ {% for controller in controllers %} {% if controller.http_method == "delete" %} -pub async fn handle_{{ controller.operation_id }}( - config: &Configuration, +async fn handle_{{ controller.http_method }}( + kube_client: &Api<{{ kind_struct }}>, {{ arg_name }}: &mut {{ kind_struct }}, - kubernetes_api: Api<{{ kind_struct }}>, -) -> Result<()> { - let dto = convert_kube_type_to_dto({{ arg_name }}.clone()); - let {{ resource_remote_ref }} = dto.{{ resource_remote_ref }}.clone().unwrap_or_default(); +)-> Result<(), OperatorError> { + let config = get_client_config().await?; + let {{ resource_remote_ref }} = match {{ arg_name }}.clone().status { + Some(status) => match status.clone().{{ resource_remote_ref}} { + Some({{ resource_remote_ref }}) => {{ resource_remote_ref }}, + None => { + warn!("{{ kind_struct }} has no resource reference in status, cannot delete by id. Skipping..."); + return Ok(()); + } + }, + None => { + warn!("{{ kind_struct }} has no status, cannot delete by id. Skipping..."); + return Ok(()); + } + }; - if {{ resource_remote_ref }}.is_empty() { - warn!("{{ kind_struct }} has no {{ resource_remote_ref }}, cannot {{ controller.action_summary }}. Skipping..."); - return Ok(()); + if let Err(e) = {{ controller.operation_id }}(&config, &{{ resource_remote_ref }}).await { + error!("Failed to delete {{ arg_name }}: {:?}", e); + return Err(OperatorError::FailedToDelete{{ kind_struct }}(e.into())); } - {{ controller.operation_id }}(&config, &{{ resource_remote_ref }}) - .await - .context("Failed to {{ controller.action_summary }}")?; - - remove_finalizer({{ arg_name }}, kubernetes_api.clone()).await?; - let condition = create_condition( - "Deleted", - "UnavailableDeleted", - "Deleted the resource", - "Resource has has deleted", - {{ arg_name }}.meta().generation, - ); - let mut {{ arg_name }}_clone = {{ arg_name }}.clone(); - if let Some(status) = {{ arg_name }}_clone.status.as_mut() { - status.conditions.push(condition); - status.observed_generation = {{ arg_name }}.meta().generation; - } - return update_status(kubernetes_api.clone(), {{ arg_name }}_clone).await; + remove_finalizer({{ arg_name }}, kube_client.clone()).await?; + info!("Successfully deleted {{ arg_name }}"); + Ok(()) } {% endif %} {% endfor %} diff --git a/crates/k8s-codegen/templates/k8s_operator_controller_action_put.jinja b/crates/k8s-codegen/templates/k8s_operator_controller_action_update.jinja similarity index 100% rename from crates/k8s-codegen/templates/k8s_operator_controller_action_put.jinja rename to crates/k8s-codegen/templates/k8s_operator_controller_action_update.jinja diff --git a/crates/k8s-codegen/templates/k8s_operator_main.jinja b/crates/k8s-codegen/templates/k8s_operator_main.jinja index 0e6f094..463d356 100644 --- a/crates/k8s-codegen/templates/k8s_operator_main.jinja +++ b/crates/k8s-codegen/templates/k8s_operator_main.jinja @@ -6,57 +6,9 @@ use kube::{ Client as KubeClient, CustomResourceExt, }; use log::{error, info, warn}; -use openapi::apis::configuration::Configuration; use tokio::time::timeout; use warp::Filter; -async fn watch_resource( - config: Arc, - api: Api, - handle: F, -) -> anyhow::Result<()> -where - T: ResourceExt + Clone + Debug + Send + DeserializeOwned + 'static, - F: Fn(Arc, WatchEvent, Api) -> anyhow::Result<()> + Send + Sync + 'static, -{ - loop { - let watcher_stream = api.watch(&WatchParams::default(), "0").await?; - let events = watcher_stream.boxed(); - process_events(events, config.clone(), api.clone(), &handle).await; - sleep(Duration::from_secs(5)).await; - } -} - -async fn process_events<'a, T, F>( - events: BoxStream<'a, Result, kube::Error>>, - config: Arc, - api: Api, - handle: &'a F, -) where - T: ResourceExt + Clone + Debug + Send + DeserializeOwned + 'static, - F: Fn(Arc, WatchEvent, Api) -> anyhow::Result<()> + Send + Sync + 'static, -{ - events - .for_each(|event| { - let config = config.clone(); - let api = api.clone(); - let handle = handle.clone(); - async move { - match event { - Ok(event) => { - if let Err(e) = handle(config, event, api) { - error!("Error handling event: {:?}", e); - } - } - Err(e) => { - error!("Error watching events: {:?}", e); - } - } - } - }) - .await; -} - async fn deploy_crd( kube_client: Api, crd: CustomResourceDefinition, @@ -111,17 +63,12 @@ async fn main() -> anyhow::Result<()> { info!("Starting operator..."); + let client = reqwest::Client::new(); + let access_token = std::env::var("ACCESS_TOKEN").context("ACCESS_TOKEN is not set")?; - let client = Client::try_default().await?; - let config = Arc::new(Configuration { - base_path: "http://localhost:8080".to_string(), - client: reqwest::Client::new(), - user_agent: Some("k8s-operator".to_string()), - bearer_access_token: Some(access_token), - ..Default::default() - }); - let kube_client: Api = Api::all(client); + let kube_client = KubeClient::try_default().await?; + let kube_client_api: Api = Api::all(kube_client.clone()); if std::env::var("INSTALL_CRDS") .unwrap_or_default() @@ -131,37 +78,36 @@ async fn main() -> anyhow::Result<()> { info!("INSTALL_CRDS is set to true. Deploying CRDs..."); let crds = vec![ - {% for (module, schema_name) in schemas %} - k8s_operator::types::{{ module }}::{{ schema_name }}::crd(), - {% endfor %} + k8s_operator::types::cat::Cat::crd(), ]; for crd in crds { - deploy_crd(kube_client.clone(), crd).await?; + deploy_crd(kube_client_api.clone(), crd).await?; } } let controllers = vec![ - {%- for controller in controllers.iter() %} - format!("{{ controller }}.{{ api_group }}"), - {%- endfor %} + format!("cats.example.com"), ]; for controller in controllers { - if let Err(e) = wait_for_crd(kube_client.clone(), &controller).await { + if let Err(e) = wait_for_crd(kube_client_api.clone(), &controller).await { error!("Error waiting for CRD {}: {}", &controller, e); } } - {% for controller in controllers.iter() %} - // add controllers for {{ controller}}.{{ api_group }}/{{ api_version }} here - {% endfor %} + // add controllers for cats.example.com/v1 here + let _ = k8s_operator::controllers::cats::handle(Api::namespaced(kube_client, "default")).await; + + // add controllers for dogs.example.com/v1 here + + // add controllers for horses.example.com/v1 here tokio::spawn(async { - let liveness_route = - warp::path!("healthz").map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK)); + let liveness_route = warp::path!("healthz") + .map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK)); - let readiness_route = - warp::path!("readyz").map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK)); + let readiness_route = warp::path!("readyz") + .map(|| warp::reply::with_status("OK", warp::http::StatusCode::OK)); let health_routes = liveness_route.or(readiness_route); @@ -171,7 +117,7 @@ async fn main() -> anyhow::Result<()> { tokio::signal::ctrl_c() .await .context("Failed to listen for Ctrl+C")?; - println!("Termination signal received. Shutting down."); + info!("Termination signal received. Shutting down."); Ok(()) } diff --git a/crates/k8s-operator/src/controllers/cats.rs b/crates/k8s-operator/src/controllers/cats.rs index d6f9bee..8e04ab2 100644 --- a/crates/k8s-operator/src/controllers/cats.rs +++ b/crates/k8s-operator/src/controllers/cats.rs @@ -19,6 +19,8 @@ use crate::types::cat::{Cat, CatSpec, CatStatus}; use crate::{add_finalizer, create_condition, remove_finalizer, update_status}; const REQUEUE_AFTER_IN_SEC: u64 = 30; +const API_URL: &str = "http://localhost:8080"; +const API_USER_AGENT: &str = "k8s-operator"; fn convert_kube_type_to_dto(cat: Cat) -> CatDto { let uuid = match cat.status { @@ -43,7 +45,6 @@ fn convert_dto_to_kube_type(cat: CatDto) -> CatSpec { struct ExtraArgs { kube_client: Api, - config: Configuration, } #[derive(Debug, Error)] @@ -60,8 +61,8 @@ enum OperatorError { // FailedToCreateCat(#[source] anyhow::Error), // #[error("Failed to get a cat: {0}")] // FailedToGetCat(#[source] anyhow::Error), - // #[error("Failed to update status: {0}")] - // FailedToUpdateStatus(#[source] anyhow::Error), + #[error("Failed to update status: {0}")] + FailedToUpdateStatus(#[source] anyhow::Error), // #[error("Failed to remove finalizer: {0}")] // FailedToRemoveFinalizer(#[source] anyhow::Error), // #[error("Failed to add finalizer: {0}")] @@ -70,13 +71,12 @@ enum OperatorError { // FailedToCheckForDrift(#[source] anyhow::Error), } -pub async fn handle(kube_client: Api, config: Configuration) -> Result<()> { +pub async fn handle(kube_client: Api) -> Result<()> { info!("Starting the controller"); let controller = Controller::new(kube_client.clone(), watcher::Config::default()); let extra_args = Arc::new(ExtraArgs { kube_client: kube_client.clone(), - config, }); info!("Running the controller"); @@ -95,39 +95,24 @@ pub async fn handle(kube_client: Api, config: Configuration) -> Result<()> } async fn reconcile(cat: Arc, ctx: Arc) -> Result { - let config = &ctx.config; let kube_client = ctx.kube_client.clone(); - let cat = cat.as_ref(); + let mut cat = cat.as_ref().clone(); // Add default stauts if it's missing if cat.status.is_none() { - let status = CatStatus { - conditions: vec![], - uuid: None, - observed_generation: Some(0), - }; - let mut cat_clone = cat.clone(); - cat_clone.status = Some(status); - update_status(kube_client.clone(), cat_clone).await?; + add_default_status(&kube_client, &mut cat).await?; } - let cat_status = cat.status.as_ref().unwrap(); - // If the resource was marked for deletion, we need to delete it if cat.meta().deletion_timestamp.is_some() { - if let Err(e) = handle_delete_cat_by_id(config, &mut cat.clone(), kube_client.clone()).await - { - error!("Failed to delete cat: {:?}", e); - return Err(OperatorError::FailedToDeleteCat(e)); - } - return Ok(Action::requeue(Duration::from_secs(REQUEUE_AFTER_IN_SEC))); + handle_delete(&kube_client, &mut cat).await?; } // If the resource has no remote reference, meaning it's a new resource, so we need to create it // Otherwise, we need to check for drift - match cat_status.uuid { + match cat.clone().status.unwrap().uuid { Some(_) => { - check_for_drift(config, kube_client.clone(), &mut cat.clone()).await?; + check_for_drift(kube_client.clone(), &mut cat.clone()).await?; Ok(Action::requeue(Duration::from_secs(REQUEUE_AFTER_IN_SEC))) } None => { @@ -143,32 +128,54 @@ async fn reconcile(cat: Arc, ctx: Arc) -> Result, - cat: &mut Cat, -) -> Result<()> { +async fn get_client_config() -> Result { + let config = Configuration { + base_path: API_URL.to_string(), + client: reqwest::Client::new(), + user_agent: Some(API_USER_AGENT.to_string()), + bearer_access_token: Some(std::env::var("ACCESS_TOKEN").unwrap_or_default()), + ..Default::default() + }; + Ok(config) +} + +async fn add_default_status(kube_client: &Api, cat: &mut Cat) -> Result<(), OperatorError> { + let status = CatStatus { + conditions: vec![], + uuid: None, + observed_generation: Some(0), + }; + cat.status = Some(status); + update_status(kube_client.clone(), cat.clone()) + .await + .map_err(OperatorError::FailedToUpdateStatus) +} + +pub async fn check_for_drift(kube_client: Api, cat: &mut Cat) -> Result<()> { let dto = convert_kube_type_to_dto(cat.clone()); let uuid = dto.uuid.clone().unwrap_or_default(); + let config = get_client_config().await?; if dto.uuid.is_none() { warn!("Cat has no status, cannot get by id or check for drift. Skipping..."); return Ok(()); } - match get_cat_by_id(config, &uuid).await { + match get_cat_by_id(&config, &uuid).await { Ok(dto) => { let remote_cat = convert_dto_to_kube_type(dto); if remote_cat != cat.spec { let current_cat_dto = convert_kube_type_to_dto(cat.clone()); warn!("Cat has drifted remotely, sending an update to remote..."); - match update_cat_by_id(config, &uuid, current_cat_dto).await { + match update_cat_by_id(&config, &uuid, current_cat_dto).await { Ok(_) => { info!("Cat updated successfully"); let condition = create_condition( @@ -183,7 +190,7 @@ pub async fn check_for_drift( status.conditions.push(condition); status.observed_generation = cat.meta().generation; } - return update_status(kubernetes_api.clone(), cat_clone).await; + return update_status(kube_client.clone(), cat_clone).await; } Err(e) => { error!("Failed to update Cat: {:?}", e); @@ -201,37 +208,35 @@ pub async fn check_for_drift( Ok(()) } -pub async fn handle_delete_cat_by_id( - config: &Configuration, - cat: &mut Cat, - kubernetes_api: Api, -) -> Result<()> { - let dto = convert_kube_type_to_dto(cat.clone()); - let uuid = dto.uuid.clone().unwrap_or_default(); +fn error_policy(_resource: Arc, error: &OperatorError, _ctx: Arc) -> Action { + error!("Error processing event: {:?}", error); + Action::requeue(Duration::from_secs(REQUEUE_AFTER_IN_SEC)) +} - if uuid.is_empty() { - warn!("Cat has no uuid, cannot delete a cat by id. Skipping..."); - return Ok(()); - } +async fn handle_delete(kube_client: &Api, cat: &mut Cat) -> Result<(), OperatorError> { + let config = get_client_config().await?; + let uuid = match cat.clone().status { + Some(status) => match status.clone().uuid { + Some(uuid) => uuid, + None => { + warn!("Cat has no resource reference in status, cannot delete by id. Skipping..."); + return Ok(()); + } + }, + None => { + warn!("Cat has no status, cannot delete by id. Skipping..."); + return Ok(()); + } + }; - delete_cat_by_id(config, &uuid) - .await - .context("Failed to delete a cat by id")?; - - remove_finalizer(cat, kubernetes_api.clone()).await?; - let condition = create_condition( - "Deleted", - "UnavailableDeleted", - "Deleted the resource", - "Resource has has deleted", - cat.meta().generation, - ); - let mut cat_clone = cat.clone(); - if let Some(status) = cat_clone.status.as_mut() { - status.conditions.push(condition); - status.observed_generation = cat.meta().generation; + if let Err(e) = delete_cat_by_id(&config, &uuid).await { + error!("Failed to delete cat: {:?}", e); + return Err(OperatorError::FailedToDeleteCat(e.into())); } - update_status(kubernetes_api.clone(), cat_clone).await + + remove_finalizer(cat, kube_client.clone()).await?; + info!("Successfully deleted cat"); + Ok(()) } pub async fn handle_update_cat_by_id( @@ -248,13 +253,13 @@ pub async fn handle_update_cat_by_id( } }; - update_cat_by_id(config, &uuid, dto) + update_cat_by_id(&config, &uuid, dto) .await .context("Failed to update a cat by id")?; let cat_name = cat.metadata.name.as_deref().unwrap_or_default(); kubernetes_api - .replace(cat_name, &PostParams::default(), cat) + .replace(cat_name, &PostParams::default(), &cat) .await .context("Failed to update a cat by id")?; @@ -262,48 +267,37 @@ pub async fn handle_update_cat_by_id( Ok(()) } -pub async fn handle_create_cat( - config: &Configuration, - mut cat: Cat, - kubernetes_api: Api, -) -> Result<(), anyhow::Error> { - let dto: CatDto = convert_kube_type_to_dto(cat.clone()); - - let remote_cat = create_cat(config, dto.clone()).await.map_err(|e| { - error!("Failed to create a new cat: {:?}", e); - anyhow::anyhow!("Failed to create a new cat: {:?}", e) - })?; - - let uuid = remote_cat.uuid; - if uuid.is_none() { - error!("Remote cat has no uuid, cannot update status"); - return Err(anyhow::anyhow!( - "Remote cat has no uuid, cannot update status" - )); - } - - add_finalizer(&mut cat, kubernetes_api.clone()).await?; - - let condition = create_condition( - "Created", - "AvailableCreated", - "Created the resource", - "Resource has been created", - cat.meta().generation, - ); - - let generation = cat.meta().generation; - - if let Some(status) = cat.status.as_mut() { - status.conditions.push(condition); - status.uuid = uuid; - status.observed_generation = generation; +pub async fn handle_create(kube_client: Api, cat: &mut Cat) -> Result<(), anyhow::Error> { + let dto = convert_kube_type_to_dto(cat.clone()); + let config = get_client_config().await?; + + match create_cat(&config, dto.clone()).await { + Ok(remote_cat) => match remote_cat.uuid { + Some(uuid) => { + add_finalizer(cat, kube_client.clone()).await?; + let condition = create_condition( + "Created", + "AvailableCreated", + "Created the resource", + "Resource has been created", + cat.meta().generation, + ); + let mut cat_clone = cat.clone(); + if let Some(status) = cat_clone.status.as_mut() { + status.conditions.push(condition); + status.uuid = Some(uuid); + status.observed_generation = cat.meta().generation; + } + return update_status(kube_client.clone(), cat_clone).await; + } + None => { + warn!("Remote cat has no uuid, cannot update status"); + Ok(()) + } + }, + Err(e) => { + error!("Failed to create a new cat: {:?}", e); + Err(anyhow::anyhow!("Failed to create a new cat: {:?}", e)) + } } - - update_status(kubernetes_api.clone(), cat).await -} - -fn error_policy(_resource: Arc, error: &OperatorError, _ctx: Arc) -> Action { - error!("Error processing event: {:?}", error); - Action::requeue(Duration::from_secs(REQUEUE_AFTER_IN_SEC)) } diff --git a/crates/k8s-operator/src/main.rs b/crates/k8s-operator/src/main.rs index 2c5c8fc..7a6d5f7 100644 --- a/crates/k8s-operator/src/main.rs +++ b/crates/k8s-operator/src/main.rs @@ -6,7 +6,6 @@ use kube::{ Client as KubeClient, CustomResourceExt, }; use log::{error, info, warn}; -use openapi::apis::configuration::Configuration; use tokio::time::timeout; use warp::Filter; @@ -68,14 +67,6 @@ async fn main() -> anyhow::Result<()> { let access_token = std::env::var("ACCESS_TOKEN").context("ACCESS_TOKEN is not set")?; - let config = Configuration { - base_path: "http://localhost:8080".to_string(), - client, - user_agent: Some("k8s-operator".to_string()), - bearer_access_token: Some(access_token), - ..Default::default() - }; - let kube_client = KubeClient::try_default().await?; let kube_client_api: Api = Api::all(kube_client.clone()); @@ -109,11 +100,7 @@ async fn main() -> anyhow::Result<()> { } // add controllers for cats.example.com/v1 here - let _ = k8s_operator::controllers::cats::handle( - Api::namespaced(kube_client, "default"), - config.clone(), - ) - .await; + let _ = k8s_operator::controllers::cats::handle(Api::namespaced(kube_client, "default")).await; // add controllers for dogs.example.com/v1 here diff --git a/crates/k8s-operator/src/types/catresponse.rs b/crates/k8s-operator/src/types/catresponse.rs deleted file mode 100644 index e922d02..0000000 --- a/crates/k8s-operator/src/types/catresponse.rs +++ /dev/null @@ -1,60 +0,0 @@ -use k8s_openapi::apimachinery::pkg::apis::meta::v1::Condition; -use kube::CustomResource; -use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema, PartialEq, CustomResource)] -#[kube( - group = "example.com", - version = "v1", - kind = "CatResponse", - plural = "catresponses", - derive = "PartialEq", - status = "CatResponseStatus", - namespaced, - printcolumn = r#"{"name": "Status", "type": "string", "jsonPath": ".status.conditions[0].status", "description": "The current status of the resource"}"#, - printcolumn = r#"{"name": "Reference ID", "type": "string", "jsonPath": ".status.uuid", "description": "The reference ID of the resource"}"#, - printcolumn = r#"{"name": "Age", "type": "date", "jsonPath": ".metadata.creationTimestamp", "description": "The creation time of the resource"}"# -)] -pub struct CatResponseSpec { - pub name: String, - pub breed: String, - pub age: i32, -} - -#[derive(Debug, Default, Clone, Deserialize, Serialize, JsonSchema, PartialEq)] -pub struct CatResponseStatus { - pub uuid: Option, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - #[schemars(schema_with = "conditions")] - pub conditions: Vec, - #[serde(rename = "observedGeneration")] - pub observed_generation: Option, -} - -fn conditions(_: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { - serde_json::from_value(serde_json::json!({ - "type": "array", - "x-kubernetes-list-type": "map", - "x-kubernetes-list-map-keys": ["type"], - "items": { - "type": "object", - "properties": { - "lastTransitionTime": { "format": "date-time", "type": "string" }, - "message": { "type": "string" }, - "observedGeneration": { "type": "integer", "format": "int64", "default": 0 }, - "reason": { "type": "string" }, - "status": { "type": "string" }, - "type": { "type": "string" } - }, - "required": [ - "lastTransitionTime", - "message", - "reason", - "status", - "type" - ], - }, - })) - .unwrap() -}