Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return non-zero exit code for non graceful shutdown #8

Merged
22 changes: 14 additions & 8 deletions src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,10 @@ impl RunningTopology {
components = ?remaining_components,
"Failed to gracefully shut down in time. Killing components."
);
}) as future::BoxFuture<'static, ()>
Result::Err(())
}) as future::BoxFuture<'static, Result<(), ()>>
} else {
Box::pin(future::pending()) as future::BoxFuture<'static, ()>
Box::pin(future::pending()) as future::BoxFuture<'static, Result<(), ()>>
};

// Reports in intervals which components are still running.
Expand Down Expand Up @@ -190,19 +191,24 @@ impl RunningTopology {
};

// Finishes once all tasks have shutdown.
let success = futures::future::join_all(wait_handles).map(|_| ());
let success = futures::future::join_all(wait_handles).map(|_| Result::Ok(()));

// Aggregate future that ends once anything detects that all tasks have shutdown.
let shutdown_complete_future = future::select_all(vec![
Box::pin(timeout) as future::BoxFuture<'static, ()>,
Box::pin(reporter) as future::BoxFuture<'static, ()>,
Box::pin(success) as future::BoxFuture<'static, ()>,
Box::pin(timeout) as future::BoxFuture<'static, Result<(), ()>>,
Box::pin(reporter.map(|()| Result::<(), ()>::Ok(()))) as future::BoxFuture<'static, Result<(), ()>>,
Box::pin(success) as future::BoxFuture<'static, Result<(), ()>>,
]);

// Now kick off the shutdown process by shutting down the sources.
let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline).map(|_| Result::<(), ()>::Ok(()));

futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
// Panic to ensure that Vector returns a non-zero exit code when it's unable to gracefully shutdown
futures::future::join(source_shutdown_complete, shutdown_complete_future)
.map(|futures| match futures.1.0 {
Result::Err(_) => panic!("failed to gracefully shutdown in time, panic to force non-zero exit code"),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way to avoid a panic! here without changing the interface. Given this seems to be returning a Future, can we wrap a Result in a future instead and unwrap that in the exit handlers?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm I guess the output future is bounded to contain a unit type here

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexander-jiang what do u think of panic! ing after you collect all the values instead of as soon as you see the first error? this will let all the components shutdown before we engage the panic trap to crash out when we detect a bad shutdown.

also we should tag this to the team for review more widely

Copy link
Author

@alexander-jiang alexander-jiang Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is the future::join will wait for both the futures (source_shutdown_complete and shutdown_complete_future ) to complete before returning, so we only panic after waiting for both of those futures to complete.

The shutdown_complete_future uses a future::select_all over the futures, which waits until any of specified futures are ready, however, in that case I think it is correct to not wait for all - the select_all will wait until the deadline is exceeded (the timeout future) or until all the tasks are done (the success future), the reporter future is an infinite loop that writes a log message/event.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small correction i think - the select_all handler in https://github.com/discord/vector/pull/8/files#diff-bdafd184965fb9c8ceeb9213965a2d5dbe43037d8f5f3501f3d0fb9d11b4cc8fR197 waits till the first event between - timeout, success or the reporting(currently running) components returns.

which makes me think we should also declare the timeout scenario likely to also be erroneous even if any component was not having a reporting loop for completion? 💭 though this scenario is likely not to manifest from my interpretation

Result::Ok(_) => (),
})
}

/// Attempts to load a new configuration and update this running topology.
Expand Down
Loading