diff --git a/changelog.d/22050-fingerprint-uncompressed-file-content.fix.md b/changelog.d/22050-fingerprint-uncompressed-file-content.fix.md new file mode 100644 index 0000000000000..a0b74ee22d0dd --- /dev/null +++ b/changelog.d/22050-fingerprint-uncompressed-file-content.fix.md @@ -0,0 +1,5 @@ +Changes the fingerprint for file sources to use uncompressed file content +as a source of truth when fingerprinting lines and checking +`ignored_header_bytes`. Previously this was using the compressed bytes. For now, only gzip compression is supported. + +authors: roykim98 diff --git a/lib/file-source/src/fingerprinter.rs b/lib/file-source/src/fingerprinter.rs index 30086da1cd479..542e67393da35 100644 --- a/lib/file-source/src/fingerprinter.rs +++ b/lib/file-source/src/fingerprinter.rs @@ -1,12 +1,14 @@ use std::{ collections::HashSet, fs::{self, metadata, File}, - io::{self, Read, Seek, SeekFrom, Write}, + io::{self, BufRead, BufReader, Read, Seek, SeekFrom, Write}, path::{Path, PathBuf}, }; use crc::Crc; +use flate2::bufread::GzDecoder; use serde::{Deserialize, Serialize}; +use vector_common::constants::GZIP_MAGIC; use crate::{metadata_ext::PortableFileExt, FileSourceInternalEvents}; @@ -69,6 +71,95 @@ impl From<u64> for FileFingerprint { } } +#[derive(Debug, Copy, Clone)] +enum SupportedCompressionAlgorithms { + Gzip, +} + +impl SupportedCompressionAlgorithms { + fn values() -> Vec<SupportedCompressionAlgorithms> { + // Enumerate these from smallest magic_header_bytes to largest + vec![SupportedCompressionAlgorithms::Gzip] + } + + fn magic_header_bytes(&self) -> &'static [u8] { + match self { + SupportedCompressionAlgorithms::Gzip => GZIP_MAGIC, + } + } +} + +trait UncompressedReader { + fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>, std::io::Error>; + fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn BufRead + 'a>, std::io::Error>; +} + +struct UncompressedReaderImpl; +impl UncompressedReader for UncompressedReaderImpl { + /// Checks a file for supported compression algorithms by searching for + /// supported magic header bytes. 
+ /// + /// If an error occurs during reading, the file handler may become unusable, + /// as the cursor position of the file may not be reset. + /// + /// # Arguments + /// - `fp`: A mutable reference to the file to check. + /// + /// # Returns + /// - `Ok(Some(algorithm))` if a supported compression algorithm is detected. + /// - `Ok(None)` if no supported compression algorithm is detected. + /// - `Err(std::io::Error)` if an I/O error occurs. + fn check(fp: &mut File) -> Result<Option<SupportedCompressionAlgorithms>, std::io::Error> { + let mut algorithm: Option<SupportedCompressionAlgorithms> = None; + for compression_algorithm in SupportedCompressionAlgorithms::values() { + // magic headers for algorithms can be of different lengths, and using a buffer too long could exceed the length of the file + // so instantiate and check the various sizes in monotonically increasing order + let magic_header_bytes = compression_algorithm.magic_header_bytes(); + + let mut magic = vec![0u8; magic_header_bytes.len()]; + + fp.seek(SeekFrom::Start(0))?; + let result = fp.read_exact(&mut magic); + + if result.is_err() { + fp.seek(SeekFrom::Start(0))?; + return Err(result.unwrap_err()); + } + + if magic == magic_header_bytes { + algorithm = Some(compression_algorithm); + break; + } + } + fp.seek(SeekFrom::Start(0))?; + Ok(algorithm) + } + + fn reader<'a>(fp: &'a mut File) -> Result<Box<dyn BufRead + 'a>, std::io::Error> { + // To support new compression algorithms, add them below + match Self::check(fp)? { + Some(SupportedCompressionAlgorithms::Gzip) => { + Ok(Box::new(BufReader::new(GzDecoder::new(BufReader::new(fp))))) + } + // No compression, or read the raw bytes + None => Ok(Box::new(BufReader::new(fp))), + } + } +} + +fn skip_first_n_bytes<R: BufRead>(reader: &mut R, n: usize) -> io::Result<()> { + // We cannot simply seek the file by n because the file may be compressed; + // to skip the first n decompressed bytes, we decompress up to n and discard the output. 
+ let mut skipped_bytes = 0; + while skipped_bytes < n { + let chunk = reader.fill_buf()?; + let bytes_to_skip = std::cmp::min(chunk.len(), n - skipped_bytes); + reader.consume(bytes_to_skip); + skipped_bytes += bytes_to_skip; + } + Ok(()) +} + impl Fingerprinter { pub fn get_fingerprint_of_file( &self, @@ -95,8 +186,10 @@ impl Fingerprinter { } => { buffer.resize(self.max_line_length, 0u8); let mut fp = fs::File::open(path)?; - fp.seek(SeekFrom::Start(ignored_header_bytes as u64))?; - let bytes_read = fingerprinter_read_until(fp, b'\n', lines, buffer)?; + let mut reader = UncompressedReaderImpl::reader(&mut fp)?; + + skip_first_n_bytes(&mut reader, ignored_header_bytes)?; + let bytes_read = fingerprinter_read_until(reader, b'\n', lines, buffer)?; let fingerprint = FINGERPRINT_CRC.checksum(&buffer[..bytes_read]); Ok(FirstLinesChecksum(fingerprint)) } @@ -281,12 +374,37 @@ fn fingerprinter_read_until( #[cfg(test)] mod test { - use std::{collections::HashSet, fs, io::Error, path::Path, time::Duration}; + use std::{ + collections::HashSet, + fs, + io::{Error, Read, Write}, + path::Path, + time::Duration, + }; - use tempfile::tempdir; + use flate2::write::GzEncoder; + use tempfile::{tempdir, TempDir}; use super::{FileSourceInternalEvents, FingerprintStrategy, Fingerprinter}; + fn gzip(data: &mut [u8]) -> Vec<u8> { + let mut buffer = vec![]; + let mut encoder = GzEncoder::new(&mut buffer, flate2::Compression::default()); + encoder.write_all(data).expect("Failed to write data"); + encoder + .finish() + .expect("Failed to finish encoding with gzip footer"); + buffer + } + + fn read_byte_content(target_dir: &TempDir, file: &str) -> Vec<u8> { + let path = target_dir.path().join(file); + let mut file = fs::File::open(path).unwrap(); + let mut content = Vec::new(); + file.read_to_end(&mut content).unwrap(); + content + } + #[test] fn test_checksum_fingerprint() { let fingerprinter = Fingerprinter { @@ -366,10 +484,21 @@ mod test { let empty = prepare_test("empty.log", b""); let 
incomplete_line = prepare_test("incomplete_line.log", b"missing newline char"); - let one_line = prepare_test("one_line.log", b"hello world\n"); + let one_line = prepare_test( + "one_line.log", + b"hello world\n", + ); let one_line_duplicate = prepare_test("one_line_duplicate.log", b"hello world\n"); + let one_line_duplicate_compressed = prepare_test( + "one_line_duplicate_compressed.log", + &gzip(&mut b"hello world\n".to_vec()), + ); let one_line_continued = prepare_test("one_line_continued.log", b"hello world\nthe next line\n"); + let one_line_continued_compressed = prepare_test( + "one_line_continued_compressed.log", + &gzip(&mut b"hello world\nthe next line\n".to_vec()), + ); let different_two_lines = prepare_test("different_two_lines.log", b"line one\nline two\n"); let exactly_max_line_length = @@ -395,8 +524,22 @@ mod test { assert!(run(&exactly_max_line_length).is_ok()); assert!(run(&exceeding_max_line_length).is_ok()); - assert_eq!(run(&one_line).unwrap(), run(&one_line_duplicate).unwrap()); - assert_eq!(run(&one_line).unwrap(), run(&one_line_continued).unwrap()); + assert_eq!( + run(&one_line).unwrap(), + run(&one_line_duplicate).unwrap() + ); + assert_eq!( + run(&one_line).unwrap(), + run(&one_line_continued).unwrap() + ); + assert_eq!( + run(&one_line).unwrap(), + run(&one_line_duplicate_compressed).unwrap() + ); + assert_eq!( + run(&one_line).unwrap(), + run(&one_line_continued_compressed).unwrap() + ); assert_ne!(run(&one_line).unwrap(), run(&different_two_lines).unwrap()); @@ -404,6 +547,16 @@ mod test { run(&exactly_max_line_length).unwrap(), run(&exceeding_max_line_length).unwrap() ); + + assert_ne!( + read_byte_content(&target_dir, "one_line_duplicate.log"), + read_byte_content(&target_dir, "one_line_duplicate_compressed.log") + ); + + assert_ne!( + read_byte_content(&target_dir, "one_line_continued.log"), + read_byte_content(&target_dir, "one_line_continued_compressed.log") + ); } 
#[test] @@ -436,6 +589,15 @@ mod test { "two_lines_continued.log", b"hello world\nfrom vector\nthe next line\n", ); + let two_lines_duplicate_compressed = prepare_test( + "two_lines_duplicate_compressed.log", + &gzip(&mut b"hello world\nfrom vector\n".to_vec()), + ); + let two_lines_continued_compressed = prepare_test( + "two_lines_continued_compressed.log", + &gzip(&mut b"hello world\nfrom vector\nthe next line\n".to_vec()), + ); + let different_three_lines = prepare_test( "different_three_lines.log", b"line one\nline two\nine three\n", @@ -453,11 +615,82 @@ mod test { assert_eq!(run(&two_lines).unwrap(), run(&two_lines_duplicate).unwrap()); assert_eq!(run(&two_lines).unwrap(), run(&two_lines_continued).unwrap()); + assert_eq!( + run(&two_lines).unwrap(), + run(&two_lines_duplicate_compressed).unwrap() + ); + assert_eq!( + run(&two_lines).unwrap(), + run(&two_lines_continued_compressed).unwrap() + ); assert_ne!( run(&two_lines).unwrap(), run(&different_three_lines).unwrap() ); + + assert_ne!( + read_byte_content(&target_dir, "two_lines_duplicate.log"), + read_byte_content(&target_dir, "two_lines_duplicate_compressed.log") + ); + assert_ne!( + read_byte_content(&target_dir, "two_lines_continued.log"), + read_byte_content(&target_dir, "two_lines_continued_compressed.log") + ); + } + + #[test] + fn test_first_two_lines_checksum_fingerprint_with_headers() { + let max_line_length = 64; + let fingerprinter = Fingerprinter { + strategy: FingerprintStrategy::FirstLinesChecksum { + ignored_header_bytes: 14, + lines: 2, + }, + max_line_length, + ignore_not_found: false, + }; + + let target_dir = tempdir().unwrap(); + let prepare_test = |file: &str, contents: &[u8]| { + let path = target_dir.path().join(file); + fs::write(&path, contents).unwrap(); + path + }; + + let two_lines = prepare_test( + "two_lines.log", + b"some-header-1\nhello world\nfrom vector\n", + ); + let two_lines_compressed_same_header = prepare_test( + "two_lines_compressed_same_header.log", + &gzip(&mut 
b"some-header-1\nhello world\nfrom vector\n".to_vec()), + ); + let two_lines_compressed_same_header_size = prepare_test( + "two_lines_compressed_same_header_size.log", + &gzip(&mut b"some-header-2\nhello world\nfrom vector\n".to_vec()), + ); + let two_lines_compressed_different_header_size = prepare_test( + "two_lines_compressed_different_header_size.log", + &gzip(&mut b"some-header-22\nhellow world\nfrom vector\n".to_vec()), + ); + + let mut buf = Vec::new(); + let mut run = move |path| fingerprinter.get_fingerprint_of_file(path, &mut buf); + + assert!(run(&two_lines).is_ok()); + assert_eq!( + run(&two_lines).unwrap(), + run(&two_lines_compressed_same_header).unwrap() + ); + assert_eq!( + run(&two_lines).unwrap(), + run(&two_lines_compressed_same_header_size).unwrap() + ); + assert_ne!( + run(&two_lines).unwrap(), + run(&two_lines_compressed_different_header_size).unwrap() + ); } #[test] @@ -517,6 +750,22 @@ mod test { .is_none()); } + #[test] + fn test_monotonic_compression_algorithms() { + // This test is necessary to handle an edge case where when assessing the magic header + // bytes of a file to determine the compression algorithm, it's possible that the length of + // the file is smaller than the size of the magic header bytes it's being assessed against. + // While this could be an indication that the file is simply too small, it could also + // just be that the compression header is a smaller one than the assessed algorithm. + // Checking this with a guarantee on the monotonically increasing order assures that this edge case doesn't happen. 
+ let algos = super::SupportedCompressionAlgorithms::values(); + let mut smallest_byte_length = 0; + for algo in algos { + let magic_header_bytes = algo.magic_header_bytes(); + assert!(smallest_byte_length <= magic_header_bytes.len()); + smallest_byte_length = magic_header_bytes.len(); + } + } #[derive(Clone)] struct NoErrors; diff --git a/src/sources/file.rs b/src/sources/file.rs index 362cae59cc07b..c5611caddaee6 100644 --- a/src/sources/file.rs +++ b/src/sources/file.rs @@ -298,6 +298,8 @@ pub enum FingerprintConfig { bytes: Option<usize>, /// The number of bytes to skip ahead (or ignore) when reading the data used for generating the checksum. + /// If the file is compressed, the number of bytes refers to the header in the uncompressed content. Only + /// gzip is supported at this time. /// /// This can be helpful if all files share a common header that should be skipped. #[serde(default = "default_ignored_header_bytes")] @@ -306,7 +308,8 @@ pub enum FingerprintConfig { /// The number of lines to read for generating the checksum. /// - /// If your files share a common header that is not always a fixed size, + /// The number of lines are determined from the uncompressed content if the file is compressed. Only + /// gzip is supported at this time. /// /// If the file has less than this amount of lines, it won’t be read at all. #[serde(default = "default_lines")] diff --git a/website/cue/reference/components/sources/base/file.cue b/website/cue/reference/components/sources/base/file.cue index e1ddfe19e3335..2c1247b099105 100644 --- a/website/cue/reference/components/sources/base/file.cue +++ b/website/cue/reference/components/sources/base/file.cue @@ -98,6 +98,8 @@ base: components: sources: file: configuration: { ignored_header_bytes: { description: """ The number of bytes to skip ahead (or ignore) when reading the data used for generating the checksum. + If the file is compressed, the number of bytes refers to the header in the uncompressed content. 
Only + gzip is supported at this time. This can be helpful if all files share a common header that should be skipped. """ @@ -112,7 +114,8 @@ base: components: sources: file: configuration: { description: """ The number of lines to read for generating the checksum. - If your files share a common header that is not always a fixed size, + The number of lines are determined from the uncompressed content if the file is compressed. Only + gzip is supported at this time. If the file has less than this amount of lines, it won’t be read at all. """ diff --git a/website/cue/reference/components/sources/file.cue b/website/cue/reference/components/sources/file.cue index 3230ed53cbd0f..3eb6917c4ebc2 100644 --- a/website/cue/reference/components/sources/file.cue +++ b/website/cue/reference/components/sources/file.cue @@ -219,7 +219,8 @@ components: sources: file: { check](\(urls.crc)) (CRC) on the first N lines of the file. This serves as a *fingerprint* that uniquely identifies the file. The number of lines, N, that are read can be set using the [`fingerprint.lines`](#fingerprint.lines) and - [`fingerprint.ignored_header_bytes`](#fingerprint.ignored_header_bytes) options. + [`fingerprint.ignored_header_bytes`](#fingerprint.ignored_header_bytes) options. Note + that for compressed files, these lines and header bytes refer to the uncompressed content. This strategy avoids the common pitfalls associated with using device and inode names since inode names can be reused across files. This enables Vector to properly