From 844f82bdefe83067bbd296939d8332bee854048a Mon Sep 17 00:00:00 2001 From: wasup-yash Date: Mon, 11 Dec 2023 02:10:14 +0530 Subject: [PATCH] updated structure of log drivers Signed-off-by: wasup-yash --- .../server/src/container_log/container_log.rs | 66 ++++++++++++------- .../src/{ => container_log}/json_logger.rs | 2 +- conmon-rs/server/src/container_log/mod.rs | 3 +- conmon-rs/server/src/lib.rs | 1 - 4 files changed, 47 insertions(+), 25 deletions(-) rename conmon-rs/server/src/{ => container_log}/json_logger.rs (99%) diff --git a/conmon-rs/server/src/container_log/container_log.rs b/conmon-rs/server/src/container_log/container_log.rs index d9bcf8847e..0c71bc1cd6 100644 --- a/conmon-rs/server/src/container_log/container_log.rs +++ b/conmon-rs/server/src/container_log/container_log.rs @@ -1,4 +1,4 @@ -use crate::{container_io::Pipe, container_log::cri_logger::CriLogger, container_log::journald::JournaldLogger}; +use crate::{container_io::Pipe, container_log::cri_logger::CriLogger, container_log::journald::JournaldLogger, container_log::json_logger::JsonLogger}; use anyhow::Result; use capnp::struct_list::Reader; use conmon_common::conmon_capnp::conmon::log_driver::{Owned, Type}; @@ -16,42 +16,59 @@ pub struct ContainerLog { enum LogDriver { ContainerRuntimeInterface(CriLogger), Journald(JournaldLogger), + Json(JsonLogger), } impl ContainerLog { - //Create a new default SharedContainerLog + ///Create a new default SharedContainerLog pub fn new() -> SharedContainerLog { Arc::new(RwLock::new(Self::default())) } - + /// Create a new SharedContainerLog from an capnp owned reader. pub fn from(reader: Reader) -> Result { let drivers = reader .iter() .map(|x| -> Result<_> { match x.get_type()? { Type::ContainerRuntimeInterface => { - Ok(LogDriver::ContainerRuntimeInterface(CriLogger::new( + Ok(Some(LogDriver::ContainerRuntimeInterface(CriLogger::new( x.get_path()?, if x.get_max_size() > 0 { Some(x.get_max_size() as usize) } else { None }, - )?)) + )?))) + } + Type::Json => { + Ok(Some(LogDriver::Json(JsonLogger::new( + x.get_path()?, + if x.get_max_size() > 0 { + Some(x.get_max_size() as usize) + } else { + None + }, + )?))) + } + Type::Journald => { + Ok(Some(LogDriver::Journald(JournaldLogger::new( + + if x.get_max_size() > 0 { + Some(x.get_max_size() as usize) + } else { + None + }, + )?))) } - Type::Journald => Ok(LogDriver::Journald(JournaldLogger::new( - if x.get_max_size() > 0 { - Some(x.get_max_size() as usize) - } else { - None - }, - )?)), - } }) - .collect::>>()?; + .filter_map(Result::transpose) + .collect::, _>>()?; Ok(Arc::new(RwLock::new(Self { drivers }))) } + + + /// Asynchronously initialize all loggers. pub async fn init(&mut self) -> Result<()> { join_all( @@ -61,7 +78,7 @@ impl ContainerLog { LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => { cri_logger.init().boxed() } - + LogDriver::Json(ref mut json_logger) => json_logger.init().boxed(), LogDriver::Journald(ref mut journald_logger) => journald_logger.init().boxed(), }) .collect::>(), @@ -71,6 +88,7 @@ impl ContainerLog { .collect::>>()?; Ok(()) } + /// Reopen the container logs. pub async fn reopen(&mut self) -> Result<()> { join_all( @@ -80,6 +98,7 @@ impl ContainerLog { LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => { cri_logger.reopen().boxed() } + LogDriver::Json(ref mut json_logger) => json_logger.reopen().boxed(), LogDriver::Journald(ref mut journald_logger) => journald_logger.reopen().boxed(), }) .collect::>(), @@ -89,10 +108,10 @@ impl ContainerLog { .collect::>>()?; Ok(()) } - + /// Write the contents of the provided reader into all loggers. pub async fn write(&mut self, pipe: Pipe, bytes: T) -> Result<()> where - T: AsyncBufRead + Unpin + Clone, + T: AsyncBufRead + Unpin + Clone, // Using Clone to satisfy both Clone and Copy requirements { let futures = self .drivers @@ -105,20 +124,23 @@ impl ContainerLog { ) -> Result<()> { match logger { LogDriver::ContainerRuntimeInterface(cri_logger) => { - cri_logger.write(pipe, bytes).await + cri_logger.write(pipe, bytes.clone()).await + } + LogDriver::Journald(journald_logger) => { + journald_logger.write(pipe, bytes.clone()).await + } + LogDriver::Json(json_logger) => { + json_logger.write(pipe, bytes.clone()).await } - LogDriver::Journald(journald_logger) => journald_logger.write(pipe, bytes).await, } } - box_future(x, pipe, bytes.clone()) }) .collect::>(); - join_all(futures) .await .into_iter() .collect::>>()?; Ok(()) - } + } } \ No newline at end of file diff --git a/conmon-rs/server/src/json_logger.rs b/conmon-rs/server/src/container_log/json_logger.rs similarity index 99% rename from conmon-rs/server/src/json_logger.rs rename to conmon-rs/server/src/container_log/json_logger.rs index 211f1f83cd..d7c28f3841 100644 --- a/conmon-rs/server/src/json_logger.rs +++ b/conmon-rs/server/src/container_log/json_logger.rs @@ -1,4 +1,4 @@ -use crate::container_io::Pipe; +pub use crate::container_io::Pipe; use anyhow::{Context, Result}; use getset::{CopyGetters, Getters, Setters}; use serde_json::json; diff --git a/conmon-rs/server/src/container_log/mod.rs b/conmon-rs/server/src/container_log/mod.rs index 08eff8f3a2..293f4e3d9f 100644 --- a/conmon-rs/server/src/container_log/mod.rs +++ b/conmon-rs/server/src/container_log/mod.rs @@ -1,3 +1,4 @@ pub mod cri_logger; pub mod container_log; -pub mod journald; \ No newline at end of file +pub mod journald; +pub mod json_logger; \ No newline at end of file diff --git a/conmon-rs/server/src/lib.rs b/conmon-rs/server/src/lib.rs index 591b43db74..19b5781491 100644 --- a/conmon-rs/server/src/lib.rs +++ b/conmon-rs/server/src/lib.rs @@ -17,7 +17,6 @@ mod container_log; mod fd_socket; mod init; mod journal; -mod json_logger; mod listener; mod oom_watcher; mod pause;