Skip to content

Commit

Permalink
Delete and Create works nicely now
Browse files Browse the repository at this point in the history
Still need to test if drift works as expected with the API
  • Loading branch information
edenreich committed Jun 1, 2024
1 parent 8d1f450 commit 425a0de
Show file tree
Hide file tree
Showing 11 changed files with 204 additions and 364 deletions.
14 changes: 0 additions & 14 deletions crates/client-sdk/docs/CatResponse.md

This file was deleted.

35 changes: 0 additions & 35 deletions crates/client-sdk/src/models/cat_response.rs

This file was deleted.

6 changes: 4 additions & 2 deletions crates/k8s-codegen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ struct ControllerTemplate<'a> {
has_create_action: bool,
has_update_action: bool,
has_delete_action: bool,
api_url: String,
}

#[derive(Template)]
Expand All @@ -465,7 +466,7 @@ struct ControllerActionDeleteTemplate<'a> {
}

#[derive(Template)]
#[template(path = "k8s_operator_controller_action_put.jinja")]
#[template(path = "k8s_operator_controller_action_update.jinja")]
struct ControllerActionPutTemplate<'a> {
arg_name: String,
kind_struct: String,
Expand All @@ -474,7 +475,7 @@ struct ControllerActionPutTemplate<'a> {
}

#[derive(Template)]
#[template(path = "k8s_operator_controller_action_post.jinja")]
#[template(path = "k8s_operator_controller_action_create.jinja")]
struct ControllerActionPostTemplate<'a> {
arg_name: String,
kind_struct: String,
Expand Down Expand Up @@ -529,6 +530,7 @@ fn generate_controller(
has_create_action,
has_update_action,
has_delete_action,
api_url: "http://localhost:8080".to_string(),
}
.render()
.unwrap();
Expand Down
78 changes: 51 additions & 27 deletions crates/k8s-codegen/templates/k8s_operator_controller.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ use std::{sync::Arc, time::Duration};

use openapi::{
apis::{
{{ tag }}_api::{create_{{ arg_name }}, delete_{{ arg_name }}_by_id, get_{{ arg_name }}_by_id, update_{{ arg_name }}_by_id},
{{ tag }}_api::{
create_{{ arg_name }},
delete_{{ arg_name }}_by_id,
get_{{ arg_name }}_by_id,
update_{{ arg_name }}_by_id
},
configuration::Configuration,
},
models::{{ kind_struct }} as {{ kind_struct }}Dto,
Expand All @@ -28,6 +33,8 @@ use crate::{
};

const REQUEUE_AFTER_IN_SEC: u64 = 30;
const API_URL: &str = "{{ api_url }}";
const API_USER_AGENT: &str = "k8s-operator";

fn convert_kube_type_to_dto({{ arg_name }}: {{ kind_struct }}) -> {{ kind_struct }}Dto {
let uuid = match {{ arg_name }}.status {
Expand All @@ -52,7 +59,6 @@ fn convert_dto_to_kube_type({{ arg_name }}: {{ kind_struct }}Dto) -> {{ kind_str

struct ExtraArgs {
kube_client: Api<{{ kind_struct }}>,
config: Configuration,
}

#[derive(Debug, Error)]
Expand All @@ -69,8 +75,8 @@ enum OperatorError {
// FailedToCreate{{ kind_struct }}(#[source] anyhow::Error),
// #[error("Failed to get a {{ arg_name }}: {0}")]
// FailedToGet{{ kind_struct }}(#[source] anyhow::Error),
// #[error("Failed to update status: {0}")]
// FailedToUpdateStatus(#[source] anyhow::Error),
#[error("Failed to update status: {0}")]
FailedToUpdateStatus(#[source] anyhow::Error),
// #[error("Failed to remove finalizer: {0}")]
// FailedToRemoveFinalizer(#[source] anyhow::Error),
// #[error("Failed to add finalizer: {0}")]
Expand All @@ -79,13 +85,12 @@ enum OperatorError {
// FailedToCheckForDrift(#[source] anyhow::Error),
}

pub async fn handle(kube_client: Api<{{ kind_struct }}>, config: Configuration) -> Result<()> {
pub async fn handle(kube_client: Api<{{ kind_struct }}>) -> Result<()> {
info!("Starting the controller");
let controller = Controller::new(kube_client.clone(), watcher::Config::default());

let extra_args = Arc::new(ExtraArgs {
kube_client: kube_client.clone(),
config,
});

info!("Running the controller");
Expand All @@ -104,37 +109,28 @@ pub async fn handle(kube_client: Api<{{ kind_struct }}>, config: Configuration)
}

async fn reconcile({{ arg_name }}: Arc<{{ kind_struct }}>, ctx: Arc<ExtraArgs>) -> Result<Action, OperatorError> {
let config = &ctx.config;
let kube_client = ctx.kube_client.clone();
let {{ arg_name }} = {{ arg_name }}.as_ref();
let mut {{ arg_name }} = {{ arg_name }}.as_ref().clone();

// Add default stauts if it's missing
if {{ arg_name }}.status.is_none() {
let status = {{ kind_struct }}Status {
conditions: vec![],
uuid: None,
observed_generation: Some(0),
};
let mut {{ arg_name }}_clone = {{ arg_name }}.clone();
{{ arg_name }}_clone.status = Some(status);
update_status(kube_client.clone(), {{ arg_name }}_clone).await?;
add_default_status(&kube_client, &mut {{ arg_name }}).await?;
}

{% if has_delete_action %}
// If the resource was marked for deletion, we need to delete it
if {{ arg_name }}.meta().deletion_timestamp.is_some() {
if let Err(e) = handle_delete_{{ arg_name }}_by_id(config, &mut {{ arg_name }}.clone(), kube_client.clone()).await
{
error!("Failed to delete {{ arg_name }}: {:?}", e);
return Err(OperatorError::FailedToDelete{{ kind_struct }}(e));
}
return Ok(Action::requeue(Duration::from_secs(REQUEUE_AFTER_IN_SEC)));
handle_delete(&kube_client, &mut {{ arg_name }}).await?;
}
{% else %}
warn!("OpenAPI Spec doesn't have a delete operation implemented for {{ tag }} tag.");
{% endif %}

// If the resource has no remote reference, meaning it's a new resource, so we need to create it
// Otherwise, we need to check for drift
match {{ arg_name }}.clone().status.unwrap().uuid {
Some(_) => {
check_for_drift(config, kube_client.clone(), &mut {{ arg_name }}.clone()).await?;
check_for_drift(kube_client.clone(), &mut {{ arg_name }}.clone()).await?;
Ok(Action::requeue(Duration::from_secs(REQUEUE_AFTER_IN_SEC)))
}
None => {
Expand All @@ -150,19 +146,47 @@ async fn reconcile({{ arg_name }}: Arc<{{ kind_struct }}>, ctx: Arc<ExtraArgs>)
status.observed_generation = {{ arg_name }}.meta().generation;
}
update_status(kube_client.clone(), {{ arg_name }}.clone()).await?;
handle_create_{{ arg_name }}(config, {{ arg_name }}.clone(), kube_client).await?;
{% if has_create_action %}
handle_create(kube_client, &mut {{ arg_name }}.clone()).await?;
{% else %}
warn!("OpenAPI Spec doesn't have a create operation implemented for {{ tag }} tag.");
{% endif %}
Ok(Action::requeue(Duration::from_secs(REQUEUE_AFTER_IN_SEC)))
}
}
}

async fn get_client_config() -> Result<Configuration> {
let config = Configuration {
base_path: API_URL.to_string(),
client: reqwest::Client::new(),
user_agent: Some(API_USER_AGENT.to_string()),
bearer_access_token: Some(std::env::var("ACCESS_TOKEN").unwrap_or_default()),
..Default::default()
};
Ok(config)
}

async fn add_default_status(
kube_client: &Api<{{ kind_struct }}>,
{{ arg_name }}: &mut {{ kind_struct }},
) -> Result<(), OperatorError> {
let status = {{ kind_struct }}Status {
conditions: vec![],
uuid: None,
observed_generation: Some(0),
};
{{ arg_name }}.status = Some(status);
update_status(kube_client.clone(), {{ arg_name }}.clone()).await.map_err(OperatorError::FailedToUpdateStatus)
}

pub async fn check_for_drift(
config: &Configuration,
kubernetes_api: Api<{{ kind_struct }}>,
kube_client: Api<{{ kind_struct }}>,
{{ arg_name }}: &mut {{ kind_struct }},
) -> Result<()> {
let dto = convert_kube_type_to_dto({{ arg_name }}.clone());
let {{ resource_remote_ref }} = dto.{{ resource_remote_ref }}.clone().unwrap_or_default();
let config = get_client_config().await?;

if dto.{{ resource_remote_ref }}.is_none() {
warn!("{{ kind_struct }} has no status, cannot get by id or check for drift. Skipping...");
Expand Down Expand Up @@ -190,7 +214,7 @@ pub async fn check_for_drift(
status.conditions.push(condition);
status.observed_generation = {{ arg_name }}.meta().generation;
}
return update_status(kubernetes_api.clone(), {{ arg_name }}_clone).await;
return update_status(kube_client.clone(), {{ arg_name }}_clone).await;
}
Err(e) => {
error!("Failed to update {{ kind_struct }}: {:?}", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
{% for controller in controllers %}
{% if controller.http_method == "post" %}
pub async fn handle_{{ controller.operation_id }}(
config: &Configuration,
pub async fn handle_create(
kube_client: Api<{{ kind_struct }}>,
{{ arg_name }}: &mut {{ kind_struct }},
kubernetes_api: Api<{{ kind_struct }}>,
) -> Result<(), anyhow::Error> {
let dto = convert_kube_type_to_dto({{ arg_name }}.clone());
let config = get_client_config().await?;

match {{ controller.operation_id }}(&config, dto.clone()).await {
Ok(remote_{{ arg_name }}) => match remote_{{ arg_name }}.{{ resource_remote_ref }} {
Some({{ resource_remote_ref }}) => {
add_finalizer({{ arg_name }}, kubernetes_api.clone()).await?;
add_finalizer({{ arg_name }}, kube_client.clone()).await?;
let condition = create_condition(
"Created",
"AvailableCreated",
Expand All @@ -24,7 +24,7 @@ pub async fn handle_{{ controller.operation_id }}(
status.uuid = Some(uuid);
status.observed_generation = {{ arg_name }}.meta().generation;
}
return update_status(kubernetes_api.clone(), {{ arg_name }}_clone).await;
return update_status(kube_client.clone(), {{ arg_name }}_clone).await;
}
None => {
warn!("Remote {{ arg_name }} has no {{ resource_remote_ref }}, cannot update status");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,36 +1,32 @@
{% for controller in controllers %}
{% if controller.http_method == "delete" %}
pub async fn handle_{{ controller.operation_id }}(
config: &Configuration,
async fn handle_{{ controller.http_method }}(
kube_client: &Api<{{ kind_struct }}>,
{{ arg_name }}: &mut {{ kind_struct }},
kubernetes_api: Api<{{ kind_struct }}>,
) -> Result<()> {
let dto = convert_kube_type_to_dto({{ arg_name }}.clone());
let {{ resource_remote_ref }} = dto.{{ resource_remote_ref }}.clone().unwrap_or_default();
)-> Result<(), OperatorError> {
let config = get_client_config().await?;
let {{ resource_remote_ref }} = match {{ arg_name }}.clone().status {
Some(status) => match status.clone().{{ resource_remote_ref}} {
Some({{ resource_remote_ref }}) => {{ resource_remote_ref }},
None => {
warn!("{{ kind_struct }} has no resource reference in status, cannot delete by id. Skipping...");
return Ok(());
}
},
None => {
warn!("{{ kind_struct }} has no status, cannot delete by id. Skipping...");
return Ok(());
}
};

if {{ resource_remote_ref }}.is_empty() {
warn!("{{ kind_struct }} has no {{ resource_remote_ref }}, cannot {{ controller.action_summary }}. Skipping...");
return Ok(());
if let Err(e) = {{ controller.operation_id }}(&config, &{{ resource_remote_ref }}).await {
error!("Failed to delete {{ arg_name }}: {:?}", e);
return Err(OperatorError::FailedToDelete{{ kind_struct }}(e.into()));
}

{{ controller.operation_id }}(&config, &{{ resource_remote_ref }})
.await
.context("Failed to {{ controller.action_summary }}")?;

remove_finalizer({{ arg_name }}, kubernetes_api.clone()).await?;
let condition = create_condition(
"Deleted",
"UnavailableDeleted",
"Deleted the resource",
"Resource has has deleted",
{{ arg_name }}.meta().generation,
);
let mut {{ arg_name }}_clone = {{ arg_name }}.clone();
if let Some(status) = {{ arg_name }}_clone.status.as_mut() {
status.conditions.push(condition);
status.observed_generation = {{ arg_name }}.meta().generation;
}
return update_status(kubernetes_api.clone(), {{ arg_name }}_clone).await;
remove_finalizer({{ arg_name }}, kube_client.clone()).await?;
info!("Successfully deleted {{ arg_name }}");
Ok(())
}
{% endif %}
{% endfor %}
Loading

0 comments on commit 425a0de

Please sign in to comment.