From bd7b96d34ec346b4ff601543348d7cc43c2370ff Mon Sep 17 00:00:00 2001 From: Alexander Jiang Date: Wed, 8 Jan 2025 02:11:39 +0000 Subject: [PATCH 01/12] Panic if the components failed to gracefully shutdown in time --- src/topology/running.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index dbd6465883b79..79bff098221d4 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -152,9 +152,10 @@ impl RunningTopology { components = ?remaining_components, "Failed to gracefully shut down in time. Killing components." ); - }) as future::BoxFuture<'static, ()> + Result::Err(()) + }) as future::BoxFuture<'static, Result<(), ()>> } else { - Box::pin(future::pending()) as future::BoxFuture<'static, ()> + Box::pin(future::pending()) as future::BoxFuture<'static, Result<(), ()>> }; // Reports in intervals which components are still running. @@ -190,19 +191,23 @@ impl RunningTopology { }; // Finishes once all tasks have shutdown. - let success = futures::future::join_all(wait_handles).map(|_| ()); + let success = futures::future::join_all(wait_handles).map(|_| Result::Ok(())); // Aggregate future that ends once anything detects that all tasks have shutdown. let shutdown_complete_future = future::select_all(vec![ - Box::pin(timeout) as future::BoxFuture<'static, ()>, - Box::pin(reporter) as future::BoxFuture<'static, ()>, - Box::pin(success) as future::BoxFuture<'static, ()>, + Box::pin(timeout) as future::BoxFuture<'static, Result<(), ()>>, + Box::pin(reporter) as future::BoxFuture<'static, Result<(), ()>>, + Box::pin(success) as future::BoxFuture<'static, Result<(), ()>>, ]); // Now kick off the shutdown process by shutting down the sources. - let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline); + let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline).map(|_| Result::Ok(())); - futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ()) + futures::future::join(source_shutdown_complete, shutdown_complete_future) + .map(|xy| match xy.1.0 { + Result::Err(_) => panic!("alexj's panic: failed to gracefully shutdown in time"), + Result::Ok(_) => (), + }) } /// Attempts to load a new configuration and update this running topology. From 62e70a482f7586cdd631127c87b93f8ec2433e39 Mon Sep 17 00:00:00 2001 From: Alexander Jiang Date: Wed, 8 Jan 2025 02:30:01 +0000 Subject: [PATCH 02/12] try to fix the compile error --- src/topology/running.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/topology/running.rs b/src/topology/running.rs index 79bff098221d4..1693e08fca587 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -188,6 +188,7 @@ impl RunningTopology { "Shutting down... Waiting on running components." ); } + Result::Ok(()) }; // Finishes once all tasks have shutdown. From c445adb549acf76c3bb9691995809d04566dc810 Mon Sep 17 00:00:00 2001 From: Alexander Jiang Date: Wed, 8 Jan 2025 02:36:43 +0000 Subject: [PATCH 03/12] fix more compile errors/warnings - remove unreachable code, specify generic types --- src/topology/running.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index 1693e08fca587..47d56e1775c73 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -188,7 +188,6 @@ impl RunningTopology { "Shutting down... Waiting on running components." ); } - Result::Ok(()) }; // Finishes once all tasks have shutdown. @@ -202,7 +201,7 @@ impl RunningTopology { ]); // Now kick off the shutdown process by shutting down the sources. - let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline).map(|_| Result::Ok(())); + let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline).map(|_| Result::<(), ()>::Ok(())); futures::future::join(source_shutdown_complete, shutdown_complete_future) .map(|xy| match xy.1.0 { From 3d28e262e3e1366e9989032d811c2ba60b290000 Mon Sep 17 00:00:00 2001 From: Alexander Jiang Date: Wed, 8 Jan 2025 17:32:47 +0000 Subject: [PATCH 04/12] try to fix compile error again --- src/topology/running.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index 47d56e1775c73..0cd0cd646bdb6 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -196,7 +196,7 @@ impl RunningTopology { // Aggregate future that ends once anything detects that all tasks have shutdown. let shutdown_complete_future = future::select_all(vec![ Box::pin(timeout) as future::BoxFuture<'static, Result<(), ()>>, - Box::pin(reporter) as future::BoxFuture<'static, Result<(), ()>>, + Box::pin(reporter) as future::BoxFuture<'static, ()>, Box::pin(success) as future::BoxFuture<'static, Result<(), ()>>, ]); From 4e290f91358eb734cdd83168418ccc24a10dc31a Mon Sep 17 00:00:00 2001 From: Alexander Jiang Date: Wed, 8 Jan 2025 17:40:28 +0000 Subject: [PATCH 05/12] map the infinite-loop future to return a Result instead --- src/topology/running.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index 0cd0cd646bdb6..9a5c302185912 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -196,7 +196,7 @@ impl RunningTopology { // Aggregate future that ends once anything detects that all tasks have shutdown. let shutdown_complete_future = future::select_all(vec![ Box::pin(timeout) as future::BoxFuture<'static, Result<(), ()>>, - Box::pin(reporter) as future::BoxFuture<'static, ()>, + Box::pin(reporter.map(|()| Result::<(), ()>::Ok(()))) as future::BoxFuture<'static, Result<(), ()>>, Box::pin(success) as future::BoxFuture<'static, Result<(), ()>>, ]); From 2fe231af5b85f7cfcc48a7d37ff86b12a8c37af7 Mon Sep 17 00:00:00 2001 From: Alexander Jiang Date: Wed, 8 Jan 2025 19:49:05 +0000 Subject: [PATCH 06/12] clean up the panic msg and variable names --- src/topology/running.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index 9a5c302185912..f0f0a40537ff8 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -203,9 +203,10 @@ impl RunningTopology { // Now kick off the shutdown process by shutting down the sources. let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline).map(|_| Result::<(), ()>::Ok(())); + // Panic to ensure that Vector returns a non-zero exit code when it's unable to gracefully shutdown futures::future::join(source_shutdown_complete, shutdown_complete_future) - .map(|xy| match xy.1.0 { - Result::Err(_) => panic!("alexj's panic: failed to gracefully shutdown in time"), + .map(|futures| match futures.1.0 { + Result::Err(_) => panic!("failed to gracefully shutdown in time, panic to force non-zero exit code"), Result::Ok(_) => (), }) } From ebafc3835f53da9c9f75b130b6ddd77012353d0b Mon Sep 17 00:00:00 2001 From: Alexander Jiang Date: Thu, 9 Jan 2025 21:38:39 +0000 Subject: [PATCH 07/12] add list of components that failed to shut down in time to panic msg --- src/topology/running.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index f0f0a40537ff8..4248c205d85a6 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -152,7 +152,7 @@ impl RunningTopology { components = ?remaining_components, "Failed to gracefully shut down in time. Killing components." ); - Result::Err(()) + Result::Err(remaining_components) }) as future::BoxFuture<'static, Result<(), ()>> } else { Box::pin(future::pending()) as future::BoxFuture<'static, Result<(), ()>> @@ -195,18 +195,18 @@ impl RunningTopology { // Aggregate future that ends once anything detects that all tasks have shutdown. let shutdown_complete_future = future::select_all(vec![ - Box::pin(timeout) as future::BoxFuture<'static, Result<(), ()>>, - Box::pin(reporter.map(|()| Result::<(), ()>::Ok(()))) as future::BoxFuture<'static, Result<(), ()>>, - Box::pin(success) as future::BoxFuture<'static, Result<(), ()>>, + Box::pin(timeout) as future::BoxFuture<'static, Result<(), String>>, + Box::pin(reporter.map(|()| Result::<(), String>::Ok(()))) as future::BoxFuture<'static, Result<(), String>>, + Box::pin(success) as future::BoxFuture<'static, Result<(), String>>, ]); // Now kick off the shutdown process by shutting down the sources. - let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline).map(|_| Result::<(), ()>::Ok(())); + let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline).map(|_| Result::<(), String>::Ok(())); // Panic to ensure that Vector returns a non-zero exit code when it's unable to gracefully shutdown futures::future::join(source_shutdown_complete, shutdown_complete_future) .map(|futures| match futures.1.0 { - Result::Err(_) => panic!("failed to gracefully shutdown in time, panic to force non-zero exit code"), + Result::Err(s) => panic!(format!("failed to gracefully shutdown in time, panic to force non-zero exit code: {s}")), Result::Ok(_) => (), }) } From 00de9d8e2dc81c832c1d8e30f0c96b427e804c5b Mon Sep 17 00:00:00 2001 From: Alexander Jiang Date: Thu, 9 Jan 2025 21:43:54 +0000 Subject: [PATCH 08/12] fix the panic format string syntax --- src/topology/running.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index 4248c205d85a6..fedf749ff8521 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -206,7 +206,7 @@ impl RunningTopology { // Panic to ensure that Vector returns a non-zero exit code when it's unable to gracefully shutdown futures::future::join(source_shutdown_complete, shutdown_complete_future) .map(|futures| match futures.1.0 { - Result::Err(s) => panic!(format!("failed to gracefully shutdown in time, panic to force non-zero exit code: {s}")), + Result::Err(s) => panic!("failed to gracefully shutdown in time, panic to force non-zero exit code. remaining components: {}", &s), Result::Ok(_) => (), }) } From 67fb2dc01aa02eeafcea28a6725430da024df5aa Mon Sep 17 00:00:00 2001 From: Alexander Jiang Date: Thu, 9 Jan 2025 21:49:21 +0000 Subject: [PATCH 09/12] fix a compile error --- src/topology/running.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index fedf749ff8521..8347d77682e0e 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -153,9 +153,9 @@ impl RunningTopology { "Failed to gracefully shut down in time. Killing components." ); Result::Err(remaining_components) - }) as future::BoxFuture<'static, Result<(), ()>> + }) as future::BoxFuture<'static, Result<(), String>> } else { - Box::pin(future::pending()) as future::BoxFuture<'static, Result<(), ()>> + Box::pin(future::pending()) as future::BoxFuture<'static, Result<(), String>> }; // Reports in intervals which components are still running. From 290f56e37c06ba49a435f35586a63a7d64d47612 Mon Sep 17 00:00:00 2001 From: Alexander Jiang Date: Sat, 11 Jan 2025 00:41:22 +0000 Subject: [PATCH 10/12] add some debug logs to check my hypothesis: when the buffer reader is initialized and then catches up to the writer, it waits for the writer to notify it (but the writer will never write more data in the vector buffer recovery topology/pipeline) --- lib/vector-buffers/src/variants/disk_v2/reader.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/vector-buffers/src/variants/disk_v2/reader.rs b/lib/vector-buffers/src/variants/disk_v2/reader.rs index 8aea89645ca64..de1dbdbd19038 100644 --- a/lib/vector-buffers/src/variants/disk_v2/reader.rs +++ b/lib/vector-buffers/src/variants/disk_v2/reader.rs @@ -977,6 +977,7 @@ where if self.ledger.is_writer_done() { let total_buffer_size = self.ledger.get_total_buffer_size(); if total_buffer_size == 0 { + debug!("writer is done and total buffer size is 0"); return Ok(None); } } @@ -1074,6 +1075,7 @@ where continue; } + debug!("reader is on writer's current data file: waiting for writer to wake the reader"); self.ledger.wait_for_writer().await; } else { debug!( From e37a2f31a014474d1130e3bdc5df42cb066d67c1 Mon Sep 17 00:00:00 2001 From: Alexander Jiang Date: Tue, 14 Jan 2025 00:58:17 +0000 Subject: [PATCH 11/12] tweak debug log msgs --- lib/vector-buffers/src/variants/disk_v2/reader.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/vector-buffers/src/variants/disk_v2/reader.rs b/lib/vector-buffers/src/variants/disk_v2/reader.rs index de1dbdbd19038..0464ca3c633aa 100644 --- a/lib/vector-buffers/src/variants/disk_v2/reader.rs +++ b/lib/vector-buffers/src/variants/disk_v2/reader.rs @@ -977,7 +977,7 @@ where if self.ledger.is_writer_done() { let total_buffer_size = self.ledger.get_total_buffer_size(); if total_buffer_size == 0 { - debug!("writer is done and total buffer size is 0"); + debug!("buffer writer is done and buffer is empty") return Ok(None); } } @@ -1075,7 +1075,7 @@ where continue; } - debug!("reader is on writer's current data file: waiting for writer to wake the reader"); + debug!("waiting for buffer writer to wake the reader"); self.ledger.wait_for_writer().await; } else { debug!( From b05ef5581d38cbf408292b08c0357ce6370393a5 Mon Sep 17 00:00:00 2001 From: Alexander Jiang Date: Tue, 14 Jan 2025 18:19:08 +0000 Subject: [PATCH 12/12] fix missing semicolon --- lib/vector-buffers/src/variants/disk_v2/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/vector-buffers/src/variants/disk_v2/reader.rs b/lib/vector-buffers/src/variants/disk_v2/reader.rs index 0464ca3c633aa..70d0fc74e3d02 100644 --- a/lib/vector-buffers/src/variants/disk_v2/reader.rs +++ b/lib/vector-buffers/src/variants/disk_v2/reader.rs @@ -977,7 +977,7 @@ where if self.ledger.is_writer_done() { let total_buffer_size = self.ledger.get_total_buffer_size(); if total_buffer_size == 0 { - debug!("buffer writer is done and buffer is empty") + debug!("buffer writer is done and buffer is empty"); return Ok(None); } }