Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): add support for poll watcher #21290

Merged
merged 23 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/poll-watcher-support.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
`Poll Watcher` support for watching config files.
amribm marked this conversation as resolved.
Show resolved Hide resolved

authors: amribm
35 changes: 30 additions & 5 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#![allow(missing_docs)]
use std::{num::NonZeroUsize, path::PathBuf, process::ExitStatus, time::Duration};
use std::{
num::{NonZeroU64, NonZeroUsize},
path::PathBuf,
process::ExitStatus,
time::Duration,
};

use exitcode::ExitCode;
use futures::StreamExt;
Expand All @@ -12,7 +17,7 @@ use crate::extra_context::ExtraContext;
#[cfg(feature = "api")]
use crate::{api, internal_events::ApiStarted};
use crate::{
cli::{handle_config_errors, LogFormat, Opts, RootOpts},
cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod},
config::{self, Config, ConfigPath},
heartbeat,
internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
Expand Down Expand Up @@ -59,9 +64,18 @@ impl ApplicationConfig {
let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
.then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));

let watcher_conf = if opts.watch_config {
Some(watcher_config(
opts.watch_config_method,
opts.watch_config_poll_interval_seconds,
))
} else {
None
};

let config = load_configs(
&config_paths,
opts.watch_config,
watcher_conf,
opts.require_healthy,
opts.allow_empty_config,
graceful_shutdown_duration,
Expand Down Expand Up @@ -466,17 +480,18 @@ pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtim

pub async fn load_configs(
config_paths: &[ConfigPath],
watch_config: bool,
watcher_conf: Option<config::watcher::WatcherConfig>,
require_healthy: Option<bool>,
allow_empty_config: bool,
graceful_shutdown_duration: Option<Duration>,
signal_handler: &mut SignalHandler,
) -> Result<Config, ExitCode> {
let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;

if watch_config {
if let Some(watcher_conf) = watcher_conf {
// Start listening for config changes immediately.
config::watcher::spawn_thread(
watcher_conf,
signal_handler.clone_tx(),
config_paths.iter().map(Into::into),
None,
Expand Down Expand Up @@ -526,3 +541,13 @@ pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64)
);
info!(message = "Log level is enabled.", level = ?level);
}

pub fn watcher_config(
method: WatchConfigMethod,
interval: NonZeroU64,
) -> config::watcher::WatcherConfig {
match method {
WatchConfigMethod::Recommended => config::watcher::WatcherConfig::RecommendedWatcher,
WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
}
}
28 changes: 28 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,26 @@ pub struct RootOpts {
#[arg(short, long, env = "VECTOR_WATCH_CONFIG")]
pub watch_config: bool,

/// Method for configuration watching.
///
/// By default, `vector` uses recommended watcher for host OS
/// - `inotify` for Linux-based systems.
/// - `kqueue` for unix/macos
/// - `ReadDirectoryChangesWatcher` for windows
/// The `poll` watcher can be used in cases where `inotify` doesn't work, e.g., when attaching the configuration via NFS.
#[arg(long, default_value = "inotify", env = "VECTOR_WATCH_METHOD")]
amribm marked this conversation as resolved.
Show resolved Hide resolved
pub watch_config_method: WatchConfigMethod,

/// Poll for changes in the configuration file at the given interval.
///
/// This setting is only applicable if `Poll` is set in `--watch-config-method`.
#[arg(
long,
env = "VECTOR_WATCH_CONFIG_POLL_INTERVAL_SECONDS",
default_value = "30"
)]
pub watch_config_poll_interval_seconds: NonZeroU64,
amribm marked this conversation as resolved.
Show resolved Hide resolved

/// Set the internal log rate limit
#[arg(
short,
Expand Down Expand Up @@ -354,6 +374,14 @@ pub enum LogFormat {
Json,
}

#[derive(clap::ValueEnum, Debug, Clone, Copy, PartialEq, Eq)]
pub enum WatchConfigMethod {
///recommended watcher for host OS
Recommended,
/// works for EFS/NFS like network storage systems
Poll,
}
amribm marked this conversation as resolved.
Show resolved Hide resolved

pub fn handle_config_errors(errors: Vec<String>) -> exitcode::ExitCode {
for error in errors {
error!(message = "Configuration error.", %error);
Expand Down
95 changes: 70 additions & 25 deletions src/config/watcher.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::{path::PathBuf, time::Duration};
use std::{
path::{Path, PathBuf},
time::Duration,
};
use std::{
sync::mpsc::{channel, Receiver},
thread,
};

use notify::{recommended_watcher, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use notify::{recommended_watcher, EventKind, RecursiveMode};

use crate::Error;

Expand All @@ -19,11 +22,48 @@ const CONFIG_WATCH_DELAY: std::time::Duration = std::time::Duration::from_secs(1

const RETRY_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

pub enum WatcherConfig {
/// Recommended watcher for the current OS, usually `inotify` for Linux-based systems.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we can refactor create_watcher to use the WatchConfigMethod enum directly. AFAIK, due to clap parsing, the WatchConfigMethod needs to have plain enum variants.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since i want that enum to have values about the interval and i don't want to pass the interval in the function parameters.

One thing we can do is we can eliminate is watch config method enum. we can make it as string and add the possible values for clap arguments. using that we can create the WatcherConfig enum

RecommendedWatcher,
/// Poll-based watcher, typically used for watching files on NFS.
PollWatcher(u64),
}

amribm marked this conversation as resolved.
Show resolved Hide resolved
enum Watcher {
/// recommended watcher for os, usually inotify for linux based systems
RecommendedWatcher(notify::RecommendedWatcher),
/// poll based watcher. for watching files from NFS.
amribm marked this conversation as resolved.
Show resolved Hide resolved
PollWatcher(notify::PollWatcher),
}
amribm marked this conversation as resolved.
Show resolved Hide resolved

impl Watcher {
fn add_paths(&mut self, config_paths: &[PathBuf]) -> Result<(), Error> {
for path in config_paths {
self.watch(path, RecursiveMode::Recursive)?;
}
Ok(())
}

fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<(), Error> {
use notify::Watcher as NotifyWatcher;
match self {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: any reason to use this here vs at the top like the other imports?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually we already have name watcher as enum. it was colliding with watcher from notify.
Renaming the import at the top level user might not able to understand what is NotifyWathcher and we need the methods for notify::Watcher on that specific scope. that's why i used Watcher on the specific scope

Watcher::RecommendedWatcher(watcher) => {
watcher.watch(path, recursive_mode)?;
}
Watcher::PollWatcher(watcher) => {
watcher.watch(path, recursive_mode)?;
}
}
Ok(())
}
}

/// Sends a ReloadFromDisk on config_path changes.
/// Accumulates file changes until no change for given duration has occurred.
/// Has best effort guarantee of detecting all file changes from the end of
/// this function until the main thread stops.
pub fn spawn_thread<'a>(
watcher_conf: WatcherConfig,
signal_tx: crate::signal::SignalTx,
config_paths: impl IntoIterator<Item = &'a PathBuf> + 'a,
delay: impl Into<Option<Duration>>,
Expand All @@ -33,7 +73,7 @@ pub fn spawn_thread<'a>(

// Create watcher now so not to miss any changes happening between
// returning from this function and the thread starting.
let mut watcher = Some(create_watcher(&config_paths)?);
let mut watcher = Some(create_watcher(&watcher_conf, &config_paths)?);

info!("Watching configuration files.");

Expand All @@ -53,7 +93,7 @@ pub fn spawn_thread<'a>(

// We need to read paths to resolve any inode changes that may have happened.
// And we need to do it before raising sighup to avoid missing any change.
if let Err(error) = add_paths(&mut watcher, &config_paths) {
if let Err(error) = watcher.add_paths(&config_paths) {
error!(message = "Failed to read files to watch.", %error);
break;
}
Expand All @@ -72,7 +112,7 @@ pub fn spawn_thread<'a>(

thread::sleep(RETRY_TIMEOUT);

watcher = create_watcher(&config_paths)
watcher = create_watcher(&watcher_conf, &config_paths)
.map_err(|error| error!(message = "Failed to create file watcher.", %error))
.ok();

Expand All @@ -91,28 +131,28 @@ pub fn spawn_thread<'a>(
}

fn create_watcher(
watcher_conf: &WatcherConfig,
config_paths: &[PathBuf],
) -> Result<
(
RecommendedWatcher,
Receiver<Result<notify::Event, notify::Error>>,
),
Error,
> {
) -> Result<(Watcher, Receiver<Result<notify::Event, notify::Error>>), Error> {
info!("Creating configuration file watcher.");

let (sender, receiver) = channel();
let mut watcher = recommended_watcher(sender)?;
add_paths(&mut watcher, config_paths)?;
let mut watcher = match watcher_conf {
WatcherConfig::RecommendedWatcher => {
let recommended_watcher = recommended_watcher(sender)?;
Watcher::RecommendedWatcher(recommended_watcher)
}
WatcherConfig::PollWatcher(interval) => {
let config =
notify::Config::default().with_poll_interval(Duration::from_secs(*interval));
let poll_watcher = notify::PollWatcher::new(sender, config)?;
amribm marked this conversation as resolved.
Show resolved Hide resolved
Watcher::PollWatcher(poll_watcher)
}
};
watcher.add_paths(config_paths)?;
Ok((watcher, receiver))
}

fn add_paths(watcher: &mut RecommendedWatcher, config_paths: &[PathBuf]) -> Result<(), Error> {
for path in config_paths {
watcher.watch(path, RecursiveMode::Recursive)?;
}
Ok(())
}

#[cfg(all(test, unix, not(target_os = "macos")))] // https://github.com/vectordotdev/vector/issues/5000
mod tests {
use super::*;
Expand Down Expand Up @@ -140,12 +180,13 @@ mod tests {
let delay = Duration::from_secs(3);
let dir = temp_dir().to_path_buf();
let file_path = dir.join("vector.toml");
let watcher_conf = WatcherConfig::RecommendedWatcher;

std::fs::create_dir(&dir).unwrap();
let mut file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[dir], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[dir], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -159,9 +200,10 @@ mod tests {
let delay = Duration::from_secs(3);
let file_path = temp_file();
let mut file = File::create(&file_path).unwrap();
let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[file_path], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[file_path], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -179,8 +221,10 @@ mod tests {
let mut file = File::create(&file_path).unwrap();
std::os::unix::fs::symlink(&file_path, &sym_file).unwrap();

let watcher_conf = WatcherConfig::RecommendedWatcher;

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[sym_file], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[sym_file], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand All @@ -195,12 +239,13 @@ mod tests {
let dir = temp_dir().to_path_buf();
let sub_dir = dir.join("sources");
let file_path = sub_dir.join("input.toml");
let watcher_conf = WatcherConfig::RecommendedWatcher;

std::fs::create_dir_all(&sub_dir).unwrap();
let mut file = File::create(&file_path).unwrap();

let (signal_tx, signal_rx) = broadcast::channel(128);
spawn_thread(signal_tx, &[sub_dir], delay).unwrap();
spawn_thread(watcher_conf, signal_tx, &[sub_dir], delay).unwrap();

if !test(&mut file, delay * 5, signal_rx).await {
panic!("Test timed out");
Expand Down
16 changes: 16 additions & 0 deletions website/cue/reference/cli.cue
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,22 @@ cli: {
description: "Watch for changes in the configuration file and reload accordingly"
type: bool: default: false
}
VECTOR_WATCH_CONFIG_METHOD: {
description: """
Method for watching config files.

`recommend` - recommended event based watcher for host OS
`poll` - `poll` watcher can be used in cases where event based watcher doesn't work, e.g., when attaching the configuration via NFS.
"""
type: text: default: recommended
}
VECTOR_WATCH_CONFIG_POLL_INTERVAL_SECONDS: {
description: """
Poll for config changes at given interval
only applicable if `poll` is set in `--watch-config-method`
"""
type: bool: default: false
}
VECTOR_INTERNAL_LOG_RATE_LIMIT: {
description: "Set the internal log rate limit. This limits Vector from emitting identical logs more than once over the given number of seconds."
type: uint: {
Expand Down
Loading