Skip to content

Commit

Permalink
Refactor, move config outside of the main handler, add it to the main…
Browse files Browse the repository at this point in the history
… function

Also fix the examples, had to rename to camelCase
  • Loading branch information
edenreich committed Mar 19, 2024
1 parent f5fb747 commit 0189049
Show file tree
Hide file tree
Showing 10 changed files with 76 additions and 58 deletions.
1 change: 1 addition & 0 deletions .openapi-generator-ignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
src/controllers/cats.rs
35 changes: 17 additions & 18 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ fn main() {

#[derive(serde::Serialize)]
struct K8sManifest {
#[serde(rename = "apiVersion")]
api_version: String,
kind: String,
metadata: Metadata,
Expand All @@ -189,7 +190,7 @@ fn generate_manifest_from_example(name: &str, example: &openapiv3::Example) -> S
obj.remove("uuid");
}
let k8s_manifest = K8sManifest {
api_version: format!("{}s.{}/v1", name.to_lowercase(), API_GROUP),
api_version: format!("{}/v1", API_GROUP),
kind: name.to_string(),
metadata: Metadata {
name: "example".to_string(),
Expand Down Expand Up @@ -312,6 +313,8 @@ fn generate_lib_imports(scope: &mut Scope) {
scope.import("k8s_openapi::api::core::v1", "EventSource");
scope.import("k8s_openapi::api::core::v1", "ObjectReference");
scope.import("k8s_openapi::apimachinery::pkg::apis::meta::v1", "Time");
scope.import("openapi::apis::configuration", "Configuration");
scope.import("std::sync", "Arc");

scope.raw("pub mod types;");
scope.raw("pub mod controllers;");
Expand All @@ -320,23 +323,22 @@ fn generate_lib_imports(scope: &mut Scope) {
fn generate_event_capturing_function(scope: &mut Scope) {
let function: ItemFn = parse_quote! {
pub async fn watch_resource<T>(
config: Arc<Configuration>,
kubernetes_api: Api<T>,
watch_params: WatchParams,
handler: fn(WatchEvent<T>, Api<T>),
handler: fn(Arc<Configuration>, WatchEvent<T>, Api<T>),
) -> anyhow::Result<()>
where
T: Clone + core::fmt::Debug + DeserializeOwned + 'static,
{
let mut stream = kubernetes_api.watch(&watch_params, "0").await?.boxed();

loop {
while let Some(event) = stream.next().await {
match event {
Ok(event) => handler(event, kubernetes_api.clone()),
Ok(event) => handler(Arc::clone(&config), event, kubernetes_api.clone()),
Err(e) => error!("Error watching resource: {:?}", e),
}
}

sleep(Duration::from_secs(1)).await;
stream = kubernetes_api.watch(&watch_params, "0").await?.boxed();
}
Expand Down Expand Up @@ -552,6 +554,7 @@ fn generate_controller_imports(identifiers: &Identifiers) -> String {
use openapi::apis::{}_api::{};
use openapi::models::{} as {};
use openapi::apis::configuration::Configuration;
use std::sync::Arc;
\n\n",
identifiers.arg_name,
identifiers.struct_name,
Expand All @@ -578,18 +581,12 @@ fn generate_functions(identifiers: &Identifiers) -> Functions {

fn generate_main_handler(identifiers: &Identifiers) -> String {
format!(
"pub async fn {}(event: WatchEvent<{}>, kubernetes_api: Api<{}>) {{
"pub async fn {}(config: Arc<Configuration>, event: WatchEvent<{}>, kubernetes_api: Api<{}>) {{
let kind = {}::kind(&());
let kind_str = kind.to_string();
let config = &Configuration {{
base_path: \"http://localhost:8080\".to_string(),
user_agent: None,
client: reqwest::Client::new(),
..Configuration::default()
}};
match event {{
WatchEvent::Added(mut {}) => handle_added(config, kind_str, &mut {}, kubernetes_api).await,
WatchEvent::Modified(mut {}) => handle_modified(config, kind_str, &mut {}, kubernetes_api).await,
WatchEvent::Added(mut {}) => handle_added(&config, kind_str, &mut {}, kubernetes_api).await,
WatchEvent::Modified(mut {}) => handle_modified(&config, kind_str, &mut {}, kubernetes_api).await,
WatchEvent::Bookmark(bookmark) => {{
info!(\"{} Bookmark: {{:?}}\", bookmark.metadata.resource_version);
return;
Expand All @@ -616,19 +613,21 @@ fn generate_main_handler(identifiers: &Identifiers) -> String {
fn generate_function_dto(identifiers: &Identifiers) -> String {
format!(
"fn convert_to_dto({}: {}) -> {}Dto {{
let uuid = match {}.status {{
let _uuid = match {}.status {{
Some(status) => status.uuid,
None => None,
}};
{}Dto {{
uuid: uuid,
}}
// {}Dto {{
// uuid: uuid,
// }}
todo!(\"Implement the mapping for {}\")
}}",
identifiers.arg_name,
identifiers.struct_name,
identifiers.struct_name,
identifiers.arg_name,
identifiers.struct_name,
identifiers.tag_name,
)
}

Expand Down
2 changes: 1 addition & 1 deletion manifests/examples/cat.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion manifests/examples/dog.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion manifests/examples/horse.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions src/controllers/cats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,30 @@ use openapi::apis::cats_api::delete_cat_by_id;
use openapi::apis::cats_api::update_cat_by_id;
use openapi::apis::configuration::Configuration;
use openapi::models::Cat as CatDto;
use std::sync::Arc;

fn convert_to_dto(cat: Cat) -> CatDto {
let uuid = match cat.status {
Some(status) => status.uuid,
None => None,
};
CatDto { uuid: uuid }
CatDto {
uuid,
name: cat.spec.name,
breed: cat.spec.breed,
age: cat.spec.age,
}
}

pub async fn handle(event: WatchEvent<Cat>, kubernetes_api: Api<Cat>) {
pub async fn handle(config: Arc<Configuration>, event: WatchEvent<Cat>, kubernetes_api: Api<Cat>) {
let kind = Cat::kind(&());
let kind_str = kind.to_string();
let config = &Configuration {
base_path: "http://localhost:8080".to_string(),
user_agent: None,
client: reqwest::Client::new(),
..Configuration::default()
};
match event {
WatchEvent::Added(mut cat) => {
handle_added(config, kind_str, &mut cat, kubernetes_api).await
handle_added(&config, kind_str, &mut cat, kubernetes_api).await
}
WatchEvent::Modified(mut cat) => {
handle_modified(config, kind_str, &mut cat, kubernetes_api).await
handle_modified(&config, kind_str, &mut cat, kubernetes_api).await
}
WatchEvent::Bookmark(bookmark) => {
info!("cat Bookmark: {:?}", bookmark.metadata.resource_version);
Expand Down
20 changes: 9 additions & 11 deletions src/controllers/dogs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,28 @@ use openapi::apis::dogs_api::create_dog;
use openapi::apis::dogs_api::delete_dog_by_id;
use openapi::apis::dogs_api::update_dog_by_id;
use openapi::models::Dog as DogDto;
use std::sync::Arc;

fn convert_to_dto(dog: Dog) -> DogDto {
let uuid = match dog.status {
let _uuid = match dog.status {
Some(status) => status.uuid,
None => None,
};
DogDto { uuid: uuid }
// DogDto {
// uuid: uuid,
// }
todo!("Implement the mapping for dogs")
}

pub async fn handle(event: WatchEvent<Dog>, kubernetes_api: Api<Dog>) {
pub async fn handle(config: Arc<Configuration>, event: WatchEvent<Dog>, kubernetes_api: Api<Dog>) {
let kind = Dog::kind(&());
let kind_str = kind.to_string();
let config = &Configuration {
base_path: "http://localhost:8080".to_string(),
user_agent: None,
client: reqwest::Client::new(),
..Configuration::default()
};
match event {
WatchEvent::Added(mut dog) => {
handle_added(config, kind_str, &mut dog, kubernetes_api).await
handle_added(&config, kind_str, &mut dog, kubernetes_api).await
}
WatchEvent::Modified(mut dog) => {
handle_modified(config, kind_str, &mut dog, kubernetes_api).await
handle_modified(&config, kind_str, &mut dog, kubernetes_api).await
}
WatchEvent::Bookmark(bookmark) => {
info!("dog Bookmark: {:?}", bookmark.metadata.resource_version);
Expand Down
24 changes: 13 additions & 11 deletions src/controllers/horses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,32 @@ use openapi::apis::horses_api::create_horse;
use openapi::apis::horses_api::delete_horse_by_id;
use openapi::apis::horses_api::update_horse_by_id;
use openapi::models::Horse as HorseDto;
use std::sync::Arc;

fn convert_to_dto(horse: Horse) -> HorseDto {
let uuid = match horse.status {
let _uuid = match horse.status {
Some(status) => status.uuid,
None => None,
};
HorseDto { uuid: uuid }
// HorseDto {
// uuid: uuid,
// }
todo!("Implement the mapping for horses")
}

pub async fn handle(event: WatchEvent<Horse>, kubernetes_api: Api<Horse>) {
pub async fn handle(
config: Arc<Configuration>,
event: WatchEvent<Horse>,
kubernetes_api: Api<Horse>,
) {
let kind = Horse::kind(&());
let kind_str = kind.to_string();
let config = &Configuration {
base_path: "http://localhost:8080".to_string(),
user_agent: None,
client: reqwest::Client::new(),
..Configuration::default()
};
match event {
WatchEvent::Added(mut horse) => {
handle_added(config, kind_str, &mut horse, kubernetes_api).await
handle_added(&config, kind_str, &mut horse, kubernetes_api).await
}
WatchEvent::Modified(mut horse) => {
handle_modified(config, kind_str, &mut horse, kubernetes_api).await
handle_modified(&config, kind_str, &mut horse, kubernetes_api).await
}
WatchEvent::Bookmark(bookmark) => {
info!("horse Bookmark: {:?}", bookmark.metadata.resource_version);
Expand Down
7 changes: 5 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,22 @@ use kube::api::{Api, ObjectMeta, Patch, PatchParams, PostParams, WatchEvent, Wat
use kube::core::CustomResourceExt;
use kube::{Resource, ResourceExt};
use log::{debug, error, info};
use openapi::apis::configuration::Configuration;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::json;
use std::sync::Arc;
use tokio::time::{sleep, Duration};

pub mod types;

pub mod controllers;

pub async fn watch_resource<T>(
config: Arc<Configuration>,
kubernetes_api: Api<T>,
watch_params: WatchParams,
handler: fn(WatchEvent<T>, Api<T>),
handler: fn(Arc<Configuration>, WatchEvent<T>, Api<T>),
) -> anyhow::Result<()>
where
T: Clone + core::fmt::Debug + DeserializeOwned + 'static,
Expand All @@ -27,7 +30,7 @@ where
loop {
while let Some(event) = stream.next().await {
match event {
Ok(event) => handler(event, kubernetes_api.clone()),
Ok(event) => handler(Arc::clone(&config), event, kubernetes_api.clone()),
Err(e) => error!("Error watching resource: {:?}", e),
}
}
Expand Down
21 changes: 18 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use kube::{
Client,
};
use log::{error, info};
use openapi::apis::configuration::Configuration;
use std::sync::Arc;
use tokio::time::{sleep, Duration};

async fn check_any_crd_from_group(client: Client, group: &str) -> anyhow::Result<bool> {
Expand All @@ -32,31 +34,44 @@ async fn main() -> anyhow::Result<()> {
return Ok(());
}

let config = Arc::new(Configuration {
base_path: "http://localhost:8080".to_string(),
user_agent: None,
client: reqwest::Client::new(),
..Configuration::default()
});

tokio::spawn(watch_resource::<k8s_operator::types::cat::Cat>(
Arc::clone(&config),
Api::default_namespaced(client.clone()).clone(),
watch_params.clone(),
|event, kubernetes_api| {
|config, event, kubernetes_api| {
tokio::spawn(k8s_operator::controllers::cats::handle(
config,
event,
kubernetes_api,
));
},
));
tokio::spawn(watch_resource::<k8s_operator::types::dog::Dog>(
Arc::clone(&config),
Api::default_namespaced(client.clone()).clone(),
watch_params.clone(),
|event, kubernetes_api| {
|config, event, kubernetes_api| {
tokio::spawn(k8s_operator::controllers::dogs::handle(
config,
event,
kubernetes_api,
));
},
));
tokio::spawn(watch_resource::<k8s_operator::types::horse::Horse>(
Arc::clone(&config),
Api::default_namespaced(client.clone()).clone(),
watch_params.clone(),
|event, kubernetes_api| {
|config, event, kubernetes_api| {
tokio::spawn(k8s_operator::controllers::horses::handle(
config,
event,
kubernetes_api,
));
Expand Down

0 comments on commit 0189049

Please sign in to comment.