Skip to content

Commit

Permalink
chore(docs): revamp the documentation for end-to-end acknowledgements (
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz authored Oct 31, 2022
1 parent a5d7cea commit 30706de
Show file tree
Hide file tree
Showing 92 changed files with 1,499 additions and 389 deletions.
7 changes: 6 additions & 1 deletion lib/vector-core/src/config/global_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ pub struct GlobalOptions {
#[serde(skip_serializing_if = "crate::serde::skip_serializing_if_default")]
pub proxy: ProxyConfig,

#[configurable(derived)]
/// Controls how acknowledgements are handled for all sinks by default.
///
/// See [End-to-end Acknowledgements][e2e_acks] for more information on how Vector handles event
/// acknowledgement.
///
/// [e2e_acks]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/
#[serde(
default,
deserialize_with = "bool_or_struct",
Expand Down
75 changes: 73 additions & 2 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,82 @@ impl Output {
}
}

/// Configuration of acknowledgement behavior.
/// Source-specific end-to-end acknowledgements configuration.
///
/// This type exists solely to provide a source-specific description of the `acknowledgements`
/// setting, as it is deprecated, and we still need to maintain a way to expose it in the
/// documentation before it's removed while also making sure people know it shouldn't be used.
#[configurable_component]
#[configurable(title = "Controls how acknowledgements are handled by this source.")]
#[configurable(
description = "This setting is **deprecated** in favor of enabling `acknowledgements` at the [global][global_acks] or sink level. \
Enabling or disabling acknowledgements at the source level has **no effect** on acknowledgement behavior.
See [End-to-end Acknowledgements][e2e_acks] for more information on how Vector handles event acknowledgement.
[global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
[e2e_acks]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SourceAcknowledgementsConfig {
/// Whether or not end-to-end acknowledgements are enabled for this source.
enabled: Option<bool>,
}

impl SourceAcknowledgementsConfig {
pub const DEFAULT: Self = Self { enabled: None };

#[must_use]
pub fn merge_default(&self, other: &Self) -> Self {
let enabled = self.enabled.or(other.enabled);
Self { enabled }
}

pub fn enabled(&self) -> bool {
self.enabled.unwrap_or(false)
}
}

impl From<Option<bool>> for SourceAcknowledgementsConfig {
fn from(enabled: Option<bool>) -> Self {
Self { enabled }
}
}

impl From<bool> for SourceAcknowledgementsConfig {
fn from(enabled: bool) -> Self {
Some(enabled).into()
}
}

impl From<SourceAcknowledgementsConfig> for AcknowledgementsConfig {
fn from(config: SourceAcknowledgementsConfig) -> Self {
Self {
enabled: config.enabled,
}
}
}

/// End-to-end acknowledgements configuration.
#[configurable_component]
#[configurable(title = "Controls how acknowledgements are handled for this sink.")]
#[configurable(
description = "See [End-to-end Acknowledgements][e2e_acks] for more information on how Vector handles event acknowledgement.
[e2e_acks]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct AcknowledgementsConfig {
/// Enables end-to-end acknowledgements.
/// Whether or not end-to-end acknowledgements are enabled.
///
/// When enabled for a sink, any source connected to that sink, where the source supports
/// end-to-end acknowledgements as well, will wait for events to be acknowledged by the sink
/// before acknowledging them at the source.
///
/// Enabling or disabling acknowledgements at the sink level takes precedence over any global
/// [`acknowledgements`][global_acks] configuration.
///
/// [global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
enabled: Option<bool>,
}

Expand Down
4 changes: 3 additions & 1 deletion src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use std::{
use indexmap::IndexMap;
pub use vector_config::component::{GenerateConfig, SinkDescription, TransformDescription};
use vector_config::configurable_component;
pub use vector_core::config::{AcknowledgementsConfig, DataType, GlobalOptions, Input, Output};
pub use vector_core::config::{
AcknowledgementsConfig, DataType, GlobalOptions, Input, Output, SourceAcknowledgementsConfig,
};

use crate::{conditions, event::Metric, secrets::SecretBackends, serde::OneOrMany};

Expand Down
7 changes: 5 additions & 2 deletions src/config/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use async_trait::async_trait;
use enum_dispatch::enum_dispatch;
use vector_config::{configurable_component, NamedComponent};
use vector_core::{
config::{AcknowledgementsConfig, GlobalOptions, LogNamespace, Output},
config::{
AcknowledgementsConfig, GlobalOptions, LogNamespace, Output, SourceAcknowledgementsConfig,
},
source::Source,
};

Expand Down Expand Up @@ -140,7 +142,8 @@ impl SourceContext {
}
}

pub fn do_acknowledgements(&self, config: &AcknowledgementsConfig) -> bool {
pub fn do_acknowledgements(&self, config: SourceAcknowledgementsConfig) -> bool {
let config = AcknowledgementsConfig::from(config);
if config.enabled() {
warn!(
message = "Enabling `acknowledgements` on sources themselves is deprecated in favor of enabling them in the sink configuration, and will be removed in a future version.",
Expand Down
6 changes: 3 additions & 3 deletions src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tokio_util::codec::FramedRead;
use vector_common::{finalizer::UnorderedFinalizer, internal_event::EventsReceived};
use vector_config::configurable_component;
use vector_core::{
config::{AcknowledgementsConfig, LogNamespace},
config::{LogNamespace, SourceAcknowledgementsConfig},
event::Event,
ByteSizeOf,
};
Expand Down Expand Up @@ -88,7 +88,7 @@ pub struct AmqpSourceConfig {

#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
pub(crate) acknowledgements: AcknowledgementsConfig,
pub(crate) acknowledgements: SourceAcknowledgementsConfig,
}

fn default_queue() -> String {
Expand Down Expand Up @@ -123,7 +123,7 @@ impl AmqpSourceConfig {
impl SourceConfig for AmqpSourceConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let log_namespace = cx.log_namespace(self.log_namespace);
let acknowledgements = cx.do_acknowledgements(&self.acknowledgements);
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

amqp_source(self, cx.shutdown, cx.out, log_namespace, acknowledgements).await
}
Expand Down
6 changes: 3 additions & 3 deletions src/sources/aws_kinesis_firehose/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use warp::Filter;
use crate::{
codecs::DecodingConfig,
config::{
AcknowledgementsConfig, GenerateConfig, Output, Resource, SourceConfig, SourceContext,
GenerateConfig, Output, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
},
serde::{bool_or_struct, default_decoding, default_framing_message_based},
tls::{MaybeTlsSettings, TlsEnableableConfig},
Expand Down Expand Up @@ -58,7 +58,7 @@ pub struct AwsKinesisFirehoseConfig {

#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: AcknowledgementsConfig,
acknowledgements: SourceAcknowledgementsConfig,
}

/// Compression scheme for records in a Firehose message.
Expand Down Expand Up @@ -103,7 +103,7 @@ impl SourceConfig for AwsKinesisFirehoseConfig {
LogNamespace::Legacy,
)
.build();
let acknowledgements = cx.do_acknowledgements(&self.acknowledgements);
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

let svc = filters::firehose(
self.access_key.as_ref().map(|k| k.inner().to_owned()),
Expand Down
6 changes: 4 additions & 2 deletions src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use crate::common::sqs::SqsClientBuilder;
use crate::tls::TlsConfig;
use crate::{
aws::auth::AwsAuthentication,
config::{AcknowledgementsConfig, DataType, Output, ProxyConfig, SourceConfig, SourceContext},
config::{
DataType, Output, ProxyConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
},
line_agg,
serde::bool_or_struct,
};
Expand Down Expand Up @@ -101,7 +103,7 @@ pub struct AwsS3Config {

#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: AcknowledgementsConfig,
acknowledgements: SourceAcknowledgementsConfig,

#[configurable(derived)]
tls_options: Option<TlsConfig>,
Expand Down
6 changes: 3 additions & 3 deletions src/sources/aws_s3/sqs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use vector_core::ByteSizeOf;

use crate::tls::TlsConfig;
use crate::{
config::{log_schema, AcknowledgementsConfig, SourceContext},
config::{log_schema, SourceAcknowledgementsConfig, SourceContext},
event::{BatchNotifier, BatchStatus, LogEvent},
internal_events::{
EventsReceived, SqsMessageDeleteBatchError, SqsMessageDeletePartialError,
Expand Down Expand Up @@ -223,9 +223,9 @@ impl Ingestor {
pub(super) async fn run(
self,
cx: SourceContext,
acknowledgements: AcknowledgementsConfig,
acknowledgements: SourceAcknowledgementsConfig,
) -> Result<(), ()> {
let acknowledgements = cx.do_acknowledgements(&acknowledgements);
let acknowledgements = cx.do_acknowledgements(acknowledgements);
let mut handles = Vec::new();
for _ in 0..self.state.client_concurrency {
let process = IngestorProcess::new(
Expand Down
6 changes: 3 additions & 3 deletions src/sources/aws_sqs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::common::sqs::SqsClientBuilder;
use crate::tls::TlsConfig;
use crate::{
aws::{auth::AwsAuthentication, region::RegionOrEndpoint},
config::{AcknowledgementsConfig, Output, SourceConfig, SourceContext},
config::{Output, SourceAcknowledgementsConfig, SourceConfig, SourceContext},
serde::{bool_or_struct, default_decoding, default_framing_message_based},
sources::aws_sqs::source::SqsSource,
};
Expand Down Expand Up @@ -81,7 +81,7 @@ pub struct AwsSqsConfig {

#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
pub acknowledgements: AcknowledgementsConfig,
pub acknowledgements: SourceAcknowledgementsConfig,

#[configurable(derived)]
pub tls: Option<TlsConfig>,
Expand All @@ -97,7 +97,7 @@ impl SourceConfig for AwsSqsConfig {
LogNamespace::Legacy,
)
.build();
let acknowledgements = cx.do_acknowledgements(&self.acknowledgements);
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

Ok(Box::pin(
SqsSource {
Expand Down
8 changes: 4 additions & 4 deletions src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use warp::{filters::BoxedFilter, reject::Rejection, reply::Response, Filter, Rep
use crate::{
codecs::{Decoder, DecodingConfig},
config::{
log_schema, AcknowledgementsConfig, DataType, GenerateConfig, Output, Resource,
log_schema, DataType, GenerateConfig, Output, Resource, SourceAcknowledgementsConfig,
SourceConfig, SourceContext,
},
event::Event,
Expand Down Expand Up @@ -104,7 +104,7 @@ pub struct DatadogAgentConfig {

#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: AcknowledgementsConfig,
acknowledgements: SourceAcknowledgementsConfig,
}

impl GenerateConfig for DatadogAgentConfig {
Expand All @@ -115,7 +115,7 @@ impl GenerateConfig for DatadogAgentConfig {
store_api_key: true,
framing: default_framing_message_based(),
decoding: default_decoding(),
acknowledgements: AcknowledgementsConfig::default(),
acknowledgements: SourceAcknowledgementsConfig::default(),
disable_logs: false,
disable_metrics: false,
disable_traces: false,
Expand Down Expand Up @@ -158,7 +158,7 @@ impl SourceConfig for DatadogAgentConfig {
log_namespace,
);
let listener = tls.bind(&self.address).await?;
let acknowledgements = cx.do_acknowledgements(&self.acknowledgements);
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
let filters = source.build_warp_filters(cx.out, acknowledgements, self)?;
let shutdown = cx.shutdown;

Expand Down
8 changes: 5 additions & 3 deletions src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use vector_core::config::LogNamespace;

use super::util::{EncodingConfig, MultilineConfig};
use crate::{
config::{log_schema, AcknowledgementsConfig, DataType, Output, SourceConfig, SourceContext},
config::{
log_schema, DataType, Output, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
},
encoding_transcode::{Decoder, Encoder},
event::{BatchNotifier, BatchStatus, LogEvent},
internal_events::{
Expand Down Expand Up @@ -197,7 +199,7 @@ pub struct FileConfig {

#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: AcknowledgementsConfig,
acknowledgements: SourceAcknowledgementsConfig,
}

fn default_max_line_bytes() -> usize {
Expand Down Expand Up @@ -382,7 +384,7 @@ impl SourceConfig for FileConfig {
}
}

let acknowledgements = cx.do_acknowledgements(&self.acknowledgements);
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

Ok(file_source(
self,
Expand Down
4 changes: 2 additions & 2 deletions src/sources/fluent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use vector_core::config::LogNamespace;
use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
use crate::{
config::{
log_schema, AcknowledgementsConfig, DataType, GenerateConfig, Output, Resource,
log_schema, DataType, GenerateConfig, Output, Resource, SourceAcknowledgementsConfig,
SourceConfig, SourceContext,
},
event::{Event, LogEvent},
Expand Down Expand Up @@ -51,7 +51,7 @@ pub struct FluentConfig {

#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: AcknowledgementsConfig,
acknowledgements: SourceAcknowledgementsConfig,
}

impl GenerateConfig for FluentConfig {
Expand Down
6 changes: 3 additions & 3 deletions src/sources/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use vector_core::config::LogNamespace;

use crate::{
codecs::{Decoder, DecodingConfig},
config::{AcknowledgementsConfig, DataType, Output, SourceConfig, SourceContext},
config::{DataType, Output, SourceAcknowledgementsConfig, SourceConfig, SourceContext},
event::{BatchNotifier, BatchStatus, Event, MaybeAsLogMut, Value},
gcp::{GcpAuthConfig, GcpAuthenticator, Scope, PUBSUB_URL},
internal_events::{
Expand Down Expand Up @@ -182,7 +182,7 @@ pub struct PubsubConfig {

#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
pub acknowledgements: AcknowledgementsConfig,
pub acknowledgements: SourceAcknowledgementsConfig,
}

const fn default_ack_deadline() -> i32 {
Expand Down Expand Up @@ -277,7 +277,7 @@ impl SourceConfig for PubsubConfig {
LogNamespace::Legacy,
)
.build(),
acknowledgements: cx.do_acknowledgements(&self.acknowledgements),
acknowledgements: cx.do_acknowledgements(self.acknowledgements),
shutdown: cx.shutdown,
out: cx.out,
ack_deadline_secs,
Expand Down
6 changes: 3 additions & 3 deletions src/sources/heroku_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use warp::http::{HeaderMap, StatusCode};
use crate::{
codecs::{Decoder, DecodingConfig},
config::{
log_schema, AcknowledgementsConfig, GenerateConfig, Output, Resource, SourceConfig,
log_schema, GenerateConfig, Output, Resource, SourceAcknowledgementsConfig, SourceConfig,
SourceContext,
},
event::{Event, LogEvent},
Expand Down Expand Up @@ -62,7 +62,7 @@ pub struct LogplexConfig {

#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: AcknowledgementsConfig,
acknowledgements: SourceAcknowledgementsConfig,
}

impl GenerateConfig for LogplexConfig {
Expand All @@ -74,7 +74,7 @@ impl GenerateConfig for LogplexConfig {
auth: None,
framing: default_framing_message_based(),
decoding: default_decoding(),
acknowledgements: AcknowledgementsConfig::default(),
acknowledgements: SourceAcknowledgementsConfig::default(),
})
.unwrap()
}
Expand Down
Loading

0 comments on commit 30706de

Please sign in to comment.