Skip to content

Commit

Permalink
updated for write to journal
Browse files Browse the repository at this point in the history
Signed-off-by: wasup-yash <[email protected]>
  • Loading branch information
wasup-yash committed Oct 26, 2023
1 parent 5a37b92 commit db759b4
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 108 deletions.
76 changes: 76 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions conmon-rs/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ tracing-opentelemetry = "0.21.0"
tracing-subscriber = "0.3.17"
tz-rs = "0.6.14"
uuid = { version = "1.5.0", features = ["v4", "fast-rng", "macro-diagnostics"] }
systemd = "0.10.0"

[build-dependencies]
shadow-rs = "0.24.1"
Expand Down
2 changes: 1 addition & 1 deletion conmon-rs/server/src/container_log/container_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ impl ContainerLog {
)?))
}
Type::Journald => Ok(LogDriver::Journald(JournaldLogger::new(
x.get_path()?,
if x.get_max_size() > 0 {
Some(x.get_max_size() as usize)
} else {
None
},
)?)),

}
})
.collect::<Result<Vec<_>>>()?;
Expand Down
160 changes: 53 additions & 107 deletions conmon-rs/server/src/container_log/journald.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,14 @@
use crate::container_io::Pipe;
use anyhow::{Context, Result};
use getset::{CopyGetters, Getters, Setters};
use std::{
marker::Unpin,
path::{Path, PathBuf},
};
use tokio::{
fs::{File, OpenOptions},
io::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
};
use anyhow::Result;
use getset::{Getters, Setters};
use std::{marker::Unpin, io::Cursor};
use crate::journal::Journal;
use tokio::io::{AsyncBufRead, AsyncBufReadExt};
use tracing::debug;
use std::io::Write;

#[derive(Debug, CopyGetters, Getters, Setters)]
#[derive(Debug, Getters, Setters)]
pub struct JournaldLogger {
#[getset(get)]
path: PathBuf,

#[getset(set)]
file: Option<BufWriter<File>>,

#[getset(get_copy)]
max_log_size: Option<usize>,

Expand All @@ -27,43 +17,36 @@ pub struct JournaldLogger {
}

impl JournaldLogger {
const ERR_UNINITIALIZED: &'static str = "logger not initialized";

pub fn new<T: AsRef<Path>>(path: T, max_log_size: Option<usize>) -> Result<JournaldLogger> {
pub fn new(max_log_size: Option<usize>) -> Result<Self> {
Ok(Self {
path: path.as_ref().into(),
file: None,
max_log_size,
bytes_written: 0,
})
}

pub async fn init(&mut self) -> Result<()> {
debug!("Initializing Journald logger in path {}", self.path().display());
self.set_file(Self::open(self.path()).await?.into());
debug!("Initializing Journald logger");
Ok(())
}

pub async fn write<T>(&mut self, pipe: Pipe, bytes: T) -> Result<()>
pub async fn write<T>(&mut self, pipe: Pipe, mut bytes: T) -> Result<()>
where
T: AsyncBufRead + Unpin,
{
let mut reader = BufReader::new(bytes);
let mut line_buf = Vec::new();

while reader.read_until(b'\n', &mut line_buf).await? > 0 {
let mut line_buf = String::new();
while bytes.read_line(&mut line_buf).await? > 0 {
let log_entry = format!(
"{} [{}] {}",
format!("{:?}", std::time::SystemTime::now()),
match pipe {
Pipe::StdOut => "stdout",
Pipe::StdErr => "stderr",
},
String::from_utf8_lossy(&line_buf).trim()
line_buf.trim()
);

let bytes = log_entry.as_bytes();
self.bytes_written += bytes.len();
let bytes_len = log_entry.len();
self.bytes_written += bytes_len;

if let Some(max_size) = self.max_log_size {
if self.bytes_written > max_size {
Expand All @@ -72,102 +55,65 @@ impl JournaldLogger {
}
}

let file = self.file.as_mut().context(Self::ERR_UNINITIALIZED)?;
file.write_all(bytes).await?;
file.write_all(b"\n").await?;
self.flush().await?;
Journal.write_all(log_entry.as_bytes())?;
Journal.flush()?;
line_buf.clear();
}

Ok(())
}

pub async fn reopen(&mut self) -> Result<()> {
debug!("Reopen Journald log {}", self.path().display());
self.file
.as_mut()
.context(Self::ERR_UNINITIALIZED)?
.get_ref()
.sync_all()
.await?;
self.init().await
debug!("Reopen Journald log");
// Implement logic for reopening if necessary
Ok(())
}

pub async fn flush(&mut self) -> Result<()> {
self.file
.as_mut()
.context(Self::ERR_UNINITIALIZED)?
.flush()
.await
.context("flush file writer")
}

async fn open<T: AsRef<Path>>(path: T) -> Result<BufWriter<File>> {
Ok(BufWriter::new(
OpenOptions::new()
.create(true)
.read(true)
.truncate(true)
.write(true)
.open(&path)
.await
.context(format!("open log file path '{}'", path.as_ref().display()))?,
))
// Implement logic for flushing if necessary
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
use tokio::io::AsyncReadExt;

#[tokio::test]
async fn test_journald_logger_new() {
let logger = JournaldLogger::new("/tmp/journald_test.log", Some(1000)).unwrap();
assert_eq!(logger.path().to_str().unwrap(), "/tmp/journald_test.log");
assert_eq!(logger.max_log_size().unwrap(), 1000);
}

#[tokio::test]
async fn test_journald_logger_init() {
let mut logger = JournaldLogger::new("/tmp/journald_test_init.log", Some(1000)).unwrap();
logger.init().await.unwrap();
assert!(logger.file.is_some());
}

#[tokio::test]
async fn test_journald_logger_write() {
let mut logger = JournaldLogger::new("/tmp/journald_test_write.log", Some(1000)).unwrap();
logger.init().await.unwrap();

let cursor = Cursor::new(b"Test log message\n".to_vec());
logger.write(Pipe::StdOut, cursor).await.unwrap();

let mut file = File::open("/tmp/journald_test_write.log").await.unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).await.unwrap();

assert!(contents.contains("Test log message"));
}
#[tokio::test]
async fn test_journald_logger_new() {
let logger = JournaldLogger::new(Some(1000)).unwrap();
assert_eq!(logger.max_log_size.unwrap(), 1000);
}

#[tokio::test]
async fn test_journald_logger_reopen() {
let mut logger = JournaldLogger::new("/tmp/journald_test_reopen.log", Some(1000)).unwrap();
logger.init().await.unwrap();
#[tokio::test]
async fn test_journald_logger_init() {
let mut logger = JournaldLogger::new(Some(1000)).unwrap();
assert!(logger.init().await.is_ok());
}

let cursor = Cursor::new(b"Test log message before reopen\n".to_vec());
logger.write(Pipe::StdOut, cursor).await.unwrap();
#[tokio::test]
async fn test_journald_logger_write() {
let mut logger = JournaldLogger::new(Some(1000)).unwrap();
logger.init().await.unwrap();

logger.reopen().await.unwrap();
let cursor = Cursor::new(b"Test log message\n".to_vec());
assert!(logger.write(Pipe::StdOut, cursor).await.is_ok());

let cursor = Cursor::new(b"Test log message after reopen\n".to_vec());
logger.write(Pipe::StdOut, cursor).await.unwrap();
// Verifying the actual log message in Journald might require additional setup or permissions.
}

let mut file = File::open("/tmp/journald_test_reopen.log").await.unwrap();
let mut contents = String::new();
file.read_to_string(&mut contents).await.unwrap();
#[tokio::test]
async fn test_journald_logger_reopen() {
let mut logger = JournaldLogger::new(Some(1000)).unwrap();
logger.init().await.unwrap();

assert!(contents.contains("Test log message after reopen"));
assert!(!contents.contains("Test log message before reopen"));
}
let cursor = Cursor::new(b"Test log message before reopen\n".to_vec());
assert!(logger.write(Pipe::StdOut, cursor).await.is_ok());

assert!(logger.reopen().await.is_ok());

let cursor = Cursor::new(b"Test log message after reopen\n".to_vec());
assert!(logger.write(Pipe::StdOut, cursor).await.is_ok());

// As with the write test, verifying the actual log messages in Journald might require additional setup or permissions.
}

0 comments on commit db759b4

Please sign in to comment.