Skip to content

Commit

Permalink
updated structure of log drivers
Browse files Browse the repository at this point in the history
Signed-off-by: wasup-yash <[email protected]>
  • Loading branch information
wasup-yash committed Dec 10, 2023
1 parent b8f33cb commit 844f82b
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 25 deletions.
66 changes: 44 additions & 22 deletions conmon-rs/server/src/container_log/container_log.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{container_io::Pipe, container_log::cri_logger::CriLogger, container_log::journald::JournaldLogger};
use crate::{container_io::Pipe, container_log::cri_logger::CriLogger, container_log::journald::JournaldLogger, container_log::json_logger::JsonLogger};
use anyhow::Result;
use capnp::struct_list::Reader;
use conmon_common::conmon_capnp::conmon::log_driver::{Owned, Type};
Expand All @@ -16,42 +16,59 @@ pub struct ContainerLog {
enum LogDriver {
ContainerRuntimeInterface(CriLogger),
Journald(JournaldLogger),
Json(JsonLogger),
}

impl ContainerLog {
//Create a new default SharedContainerLog
///Create a new default SharedContainerLog
pub fn new() -> SharedContainerLog {
Arc::new(RwLock::new(Self::default()))
}

/// Create a new SharedContainerLog from an capnp owned reader.
pub fn from(reader: Reader<Owned>) -> Result<SharedContainerLog> {
let drivers = reader
.iter()
.map(|x| -> Result<_> {
match x.get_type()? {
Type::ContainerRuntimeInterface => {
Ok(LogDriver::ContainerRuntimeInterface(CriLogger::new(
Ok(Some(LogDriver::ContainerRuntimeInterface(CriLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?))
)?)))
}
Type::Json => {
Ok(Some(LogDriver::Json(JsonLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)))
}
Type::Journald => {
Ok(Some(LogDriver::Journald(JournaldLogger::new(

if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)))
}
Type::Journald => Ok(LogDriver::Journald(JournaldLogger::new(
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)),

}
})
.collect::<Result<Vec<_>>>()?;
.filter_map(Result::transpose)
.collect::<Result<Vec<_>, _>>()?;
Ok(Arc::new(RwLock::new(Self { drivers })))
}



/// Asynchronously initialize all loggers.
pub async fn init(&mut self) -> Result<()> {
join_all(
Expand All @@ -61,7 +78,7 @@ impl ContainerLog {
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => {
cri_logger.init().boxed()
}

LogDriver::Json(ref mut json_logger) => json_logger.init().boxed(),
LogDriver::Journald(ref mut journald_logger) => journald_logger.init().boxed(),
})
.collect::<Vec<_>>(),
Expand All @@ -71,6 +88,7 @@ impl ContainerLog {
.collect::<Result<Vec<_>>>()?;
Ok(())
}

/// Reopen the container logs.
pub async fn reopen(&mut self) -> Result<()> {
join_all(
Expand All @@ -80,6 +98,7 @@ impl ContainerLog {
LogDriver::ContainerRuntimeInterface(ref mut cri_logger) => {
cri_logger.reopen().boxed()
}
LogDriver::Json(ref mut json_logger) => json_logger.reopen().boxed(),
LogDriver::Journald(ref mut journald_logger) => journald_logger.reopen().boxed(),
})
.collect::<Vec<_>>(),
Expand All @@ -89,10 +108,10 @@ impl ContainerLog {
.collect::<Result<Vec<_>>>()?;
Ok(())
}

/// Write the contents of the provided reader into all loggers.
pub async fn write<T>(&mut self, pipe: Pipe, bytes: T) -> Result<()>
where
T: AsyncBufRead + Unpin + Clone,
T: AsyncBufRead + Unpin + Clone, // Using Clone to satisfy both Clone and Copy requirements
{
let futures = self
.drivers
Expand All @@ -105,20 +124,23 @@ impl ContainerLog {
) -> Result<()> {
match logger {
LogDriver::ContainerRuntimeInterface(cri_logger) => {
cri_logger.write(pipe, bytes).await
cri_logger.write(pipe, bytes.clone()).await
}
LogDriver::Journald(journald_logger) => {
journald_logger.write(pipe, bytes.clone()).await
}
LogDriver::Json(json_logger) => {
json_logger.write(pipe, bytes.clone()).await
}
LogDriver::Journald(journald_logger) => journald_logger.write(pipe, bytes).await,
}
}

box_future(x, pipe, bytes.clone())
})
.collect::<Vec<_>>();

join_all(futures)
.await
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(())
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::container_io::Pipe;
pub use crate::container_io::Pipe;
use anyhow::{Context, Result};
use getset::{CopyGetters, Getters, Setters};
use serde_json::json;
Expand Down
3 changes: 2 additions & 1 deletion conmon-rs/server/src/container_log/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod cri_logger;
pub mod container_log;
pub mod journald;
pub mod journald;
pub mod json_logger;
1 change: 0 additions & 1 deletion conmon-rs/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ mod container_log;
mod fd_socket;
mod init;
mod journal;
mod json_logger;
mod listener;
mod oom_watcher;
mod pause;
Expand Down

0 comments on commit 844f82b

Please sign in to comment.