Skip to content

Commit

Permalink
feat(file sink): supports input based on encoding type (#21726)
Browse files Browse the repository at this point in the history
* feat(file sink): supports input based on encoding type

* PR review comments

* PR comments

* Add changelog

* Cargo clippy
  • Loading branch information
nionata authored Dec 2, 2024
1 parent daa9f24 commit dd8ed5d
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The file sink now supports any input event type that the configured encoding supports. It previously only supported log events.

authors: nionata
158 changes: 142 additions & 16 deletions src/sinks/file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use vector_lib::{

use crate::{
codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
event::{Event, EventStatus, Finalizable},
expiring_hash_map::ExpiringHashMap,
internal_events::{
Expand Down Expand Up @@ -194,7 +194,7 @@ impl SinkConfig for FileSinkConfig {
}

fn input(&self) -> Input {
    // Accept whatever event types the configured encoding can serialize
    // (previously this was masked with `DataType::Log`, limiting the sink
    // to log events only).
    Input::new(self.encoding.config().1.input_type())
}

fn acknowledgements(&self) -> &AcknowledgementsConfig {
Expand Down Expand Up @@ -443,17 +443,23 @@ impl StreamSink<Event> for FileSink {
mod tests {
use std::convert::TryInto;

use chrono::{SubsecRound, Utc};
use futures::{stream, SinkExt};
use similar_asserts::assert_eq;
use vector_lib::{event::LogEvent, sink::VectorSink};
use vector_lib::{
codecs::JsonSerializerConfig,
event::{LogEvent, TraceEvent},
sink::VectorSink,
};

use super::*;
use crate::{
config::log_schema,
test_util::{
components::{assert_sink_compliance, FILE_SINK_TAGS},
lines_from_file, lines_from_gzip_file, lines_from_zstd_file, random_events_with_stream,
random_lines_with_stream, temp_dir, temp_file, trace_init,
random_lines_with_stream, random_metrics_with_stream,
random_metrics_with_stream_timestamp, temp_dir, temp_file, trace_init,
},
};

Expand All @@ -463,7 +469,7 @@ mod tests {
}

#[tokio::test]
async fn single_partition() {
async fn log_single_partition() {
let template = temp_file();

let config = FileSinkConfig {
Expand All @@ -480,7 +486,7 @@ mod tests {

let (input, _events) = random_lines_with_stream(100, 64, None);

run_assert_log_sink(config, input.clone()).await;
run_assert_log_sink(&config, input.clone()).await;

let output = lines_from_file(template);
for (input, output) in input.into_iter().zip(output) {
Expand All @@ -489,7 +495,7 @@ mod tests {
}

#[tokio::test]
async fn single_partition_gzip() {
async fn log_single_partition_gzip() {
let template = temp_file();

let config = FileSinkConfig {
Expand All @@ -506,7 +512,7 @@ mod tests {

let (input, _) = random_lines_with_stream(100, 64, None);

run_assert_log_sink(config, input.clone()).await;
run_assert_log_sink(&config, input.clone()).await;

let output = lines_from_gzip_file(template);
for (input, output) in input.into_iter().zip(output) {
Expand All @@ -515,7 +521,7 @@ mod tests {
}

#[tokio::test]
async fn single_partition_zstd() {
async fn log_single_partition_zstd() {
let template = temp_file();

let config = FileSinkConfig {
Expand All @@ -532,7 +538,7 @@ mod tests {

let (input, _) = random_lines_with_stream(100, 64, None);

run_assert_log_sink(config, input.clone()).await;
run_assert_log_sink(&config, input.clone()).await;

let output = lines_from_zstd_file(template);
for (input, output) in input.into_iter().zip(output) {
Expand All @@ -541,7 +547,7 @@ mod tests {
}

#[tokio::test]
async fn many_partitions() {
async fn log_many_partitions() {
let directory = temp_dir();

let mut template = directory.to_string_lossy().to_string();
Expand Down Expand Up @@ -579,7 +585,7 @@ mod tests {
input[7].as_mut_log().insert("date", "2019-29-07");
input[7].as_mut_log().insert("level", "error");

run_assert_sink(config, input.clone().into_iter()).await;
run_assert_sink(&config, input.clone().into_iter()).await;

let output = [
lines_from_file(directory.join("warnings-2019-26-07.log")),
Expand Down Expand Up @@ -626,7 +632,7 @@ mod tests {
}

#[tokio::test]
async fn reopening() {
async fn log_reopening() {
trace_init();

let template = temp_file();
Expand Down Expand Up @@ -683,17 +689,137 @@ mod tests {
sink_handle.await.unwrap();
}

async fn run_assert_log_sink(config: FileSinkConfig, events: Vec<String>) {
#[tokio::test]
async fn metric_single_partition() {
    // A static path template means every metric is written to one file.
    let out_path = temp_file();

    let config = FileSinkConfig {
        path: out_path.clone().try_into().unwrap(),
        idle_timeout: default_idle_timeout(),
        encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
        compression: Compression::None,
        acknowledgements: Default::default(),
        timezone: Default::default(),
        internal_metrics: FileInternalMetricsConfig {
            include_file_tag: true,
        },
    };

    let (input, _events) = random_metrics_with_stream(100, None, None);

    run_assert_sink(&config, input.clone().into_iter()).await;

    // Each encoded line should mention the name of the metric it came from.
    let written = lines_from_file(out_path);
    for (event, line) in input.into_iter().zip(written) {
        assert!(line.contains(event.as_metric().name()));
    }
}

#[tokio::test]
async fn metric_many_partitions() {
    // Verifies timestamp-based partitioning: the path template is rendered
    // per-event at second resolution, and the generated metrics are one
    // second apart, so each metric should land in its own file.
    let directory = temp_dir();

    // strftime-style pattern with second precision; reused below to
    // reconstruct the expected file name for each metric.
    let format = "%Y-%m-%d-%H-%M-%S";
    let mut template = directory.to_string_lossy().to_string();
    template.push_str(&format!("/{}.log", format));

    let config = FileSinkConfig {
        path: template.try_into().unwrap(),
        idle_timeout: default_idle_timeout(),
        encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
        compression: Compression::None,
        acknowledgements: Default::default(),
        timezone: Default::default(),
        internal_metrics: FileInternalMetricsConfig {
            include_file_tag: true,
        },
    };

    let metric_count = 3;
    // Truncate to 3 sub-second digits so the base timestamp is stable when
    // formatted; spacing of one second matches the template's resolution.
    let timestamp = Utc::now().trunc_subsecs(3);
    let timestamp_offset = Duration::from_secs(1);

    let (input, _events) = random_metrics_with_stream_timestamp(
        metric_count,
        None,
        None,
        timestamp,
        timestamp_offset,
    );

    run_assert_sink(&config, input.clone().into_iter()).await;

    // Recompute each metric's file name with the same timestamp arithmetic
    // the generator used (base + offset * index), then read the file back.
    let output = (0..metric_count).map(|index| {
        let expected_timestamp = timestamp + (timestamp_offset * index as u32);
        let expected_filename =
            directory.join(format!("{}.log", expected_timestamp.format(format)));

        lines_from_file(expected_filename)
    });
    for (input, output) in input.iter().zip(output) {
        // The format will partition by second and metrics are a second apart.
        assert_eq!(
            output.len(),
            1,
            "Expected the output file to contain one metric"
        );
        let output = &output[0];

        let metric_name = input.as_metric().name();
        assert!(output.contains(metric_name));
    }
}

#[tokio::test]
async fn trace_single_partition() {
    // A static path template means all trace events go to one file,
    // JSON-encoded.
    let out_path = temp_file();

    let config = FileSinkConfig {
        path: out_path.clone().try_into().unwrap(),
        idle_timeout: default_idle_timeout(),
        encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
        compression: Compression::None,
        acknowledgements: Default::default(),
        timezone: Default::default(),
        internal_metrics: FileInternalMetricsConfig {
            include_file_tag: true,
        },
    };

    let (input, _events) = random_lines_with_stream(100, 64, None);

    run_assert_trace_sink(&config, input.clone()).await;

    // Every generated line should appear somewhere in its encoded trace.
    let written = lines_from_file(out_path);
    for (line, encoded) in input.iter().zip(written) {
        assert!(encoded.contains(line));
    }
}

/// Wraps each raw line in a log event and runs the sink compliance check.
async fn run_assert_log_sink(config: &FileSinkConfig, events: Vec<String>) {
    let log_events = events
        .into_iter()
        .map(|line| Event::Log(LogEvent::from(line)));
    run_assert_sink(config, log_events).await;
}

async fn run_assert_sink(config: FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
/// Wraps each raw line in a trace event (via an intermediate log event)
/// and runs the sink compliance check.
async fn run_assert_trace_sink(config: &FileSinkConfig, events: Vec<String>) {
    let trace_events = events
        .into_iter()
        .map(|line| Event::Trace(TraceEvent::from(LogEvent::from(line))));
    run_assert_sink(config, trace_events).await;
}

async fn run_assert_sink(config: &FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
assert_sink_compliance(&FILE_SINK_TAGS, async move {
let sink = FileSink::new(&config, SinkContext::default()).unwrap();
let sink = FileSink::new(config, SinkContext::default()).unwrap();
VectorSink::from_event_streamsink(sink)
.run(Box::pin(stream::iter(events.map(Into::into))))
.await
Expand Down
32 changes: 29 additions & 3 deletions src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{
task::{ready, Context, Poll},
};

use chrono::{SubsecRound, Utc};
use chrono::{DateTime, SubsecRound, Utc};
use flate2::read::MultiGzDecoder;
use futures::{stream, task::noop_waker_ref, FutureExt, SinkExt, Stream, StreamExt, TryStreamExt};
use openssl::ssl::{SslConnector, SslFiletype, SslMethod, SslVerifyMode};
Expand Down Expand Up @@ -286,10 +286,36 @@ pub fn random_metrics_with_stream(
batch: Option<BatchNotifier>,
tags: Option<MetricTags>,
) -> (Vec<Event>, impl Stream<Item = EventArray>) {
let timestamp = Utc::now().trunc_subsecs(3);
random_metrics_with_stream_timestamp(
count,
batch,
tags,
Utc::now().trunc_subsecs(3),
std::time::Duration::from_secs(2),
)
}

/// Generates event metrics with the provided tags and timestamp.
///
/// # Parameters
/// - `count`: the number of metrics to generate
/// - `batch`: the batch notifier to use with the stream
/// - `tags`: the tags to apply to each metric event
/// - `timestamp`: the timestamp to use for each metric event
/// - `timestamp_offset`: the offset from the `timestamp` to use for each additional metric
///
/// # Returns
/// A tuple of the generated metric events and the stream of the generated events
pub fn random_metrics_with_stream_timestamp(
count: usize,
batch: Option<BatchNotifier>,
tags: Option<MetricTags>,
timestamp: DateTime<Utc>,
timestamp_offset: std::time::Duration,
) -> (Vec<Event>, impl Stream<Item = EventArray>) {
let events: Vec<_> = (0..count)
.map(|index| {
let ts = timestamp + (std::time::Duration::from_secs(2) * index as u32);
let ts = timestamp + (timestamp_offset * index as u32);
Event::Metric(
Metric::new(
format!("counter_{}", thread_rng().gen::<u32>()),
Expand Down

0 comments on commit dd8ed5d

Please sign in to comment.