diff --git a/lib/file-source/src/file_watcher/mod.rs b/lib/file-source/src/file_watcher/mod.rs index 7ac50173ea2b7e..7fbe9565ca1e4b 100644 --- a/lib/file-source/src/file_watcher/mod.rs +++ b/lib/file-source/src/file_watcher/mod.rs @@ -39,6 +39,7 @@ pub struct FileWatcher { findable: bool, reader: Box, file_position: FilePosition, + last_line_offset: u64, devno: u64, inode: u64, is_dead: bool, @@ -142,6 +143,7 @@ impl FileWatcher { findable: true, reader, file_position, + last_line_offset: file_position, devno, inode: ino, is_dead: false, @@ -208,10 +210,12 @@ impl FileWatcher { /// a new file handler as needed, transparently to the caller. pub(super) fn read_line(&mut self) -> io::Result> { self.track_read_attempt(); + self.track_last_line_offset(); + let line_offset = self.last_line_offset; let reader = &mut self.reader; let file_position = &mut self.file_position; - let initial_position = *file_position; + match read_until_with_max_size( reader, file_position, @@ -222,7 +226,7 @@ impl FileWatcher { Ok(Some(_)) => { self.track_read_success(); Ok(Some(RawLine { - offset: initial_position, + offset: line_offset, bytes: self.buf.split().freeze(), })) } @@ -239,7 +243,7 @@ impl FileWatcher { Ok(None) } else { Ok(Some(RawLine { - offset: initial_position, + offset: line_offset, bytes: buf, })) } @@ -267,6 +271,13 @@ impl FileWatcher { self.last_read_success = Instant::now(); } + #[inline] + fn track_last_line_offset(&mut self) { + if self.buf.is_empty() { + self.last_line_offset = self.file_position + } + } + #[inline] pub fn last_read_success(&self) -> Instant { self.last_read_success