Skip to content

Commit 7ab2dd4

Browse files
AdheipSinghnitisht
andauthored
fix: handle SIGTERM for Distributed and Standalone Mode (#907)
also add readiness probes in helm chart Co-authored-by: Nitish Tiwari <[email protected]>
1 parent 3caa884 commit 7ab2dd4

12 files changed

+229
-32
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Dockerfile

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ FROM gcr.io/distroless/cc-debian12:latest
3131
WORKDIR /parseable
3232

3333
# Copy the static shell into base image.
34-
COPY --from=builder /bin/sh /bin/sh
3534
COPY --from=builder /parseable/target/release/parseable /usr/bin/parseable
3635

37-
CMD ["parseable"]
36+
CMD ["/usr/bin/parseable"]

Dockerfile.debug

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,4 @@ WORKDIR /parseable
3232

3333
COPY --from=builder /parseable/target/debug/parseable /usr/bin/parseable
3434

35-
CMD ["parseable"]
35+
CMD ["/usr/bin/parseable"]

helm/templates/ingestor-statefulset.yaml

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,19 @@ spec:
4040
{{- toYaml .Values.parseable.securityContext | nindent 8 }}
4141
image: {{ .Values.parseable.image.repository }}:{{ .Values.parseable.image.tag | default .Chart.AppVersion }}
4242
imagePullPolicy: {{ .Values.parseable.image.pullPolicy }}
43-
command: ["/bin/sh", "-c"]
44-
args: ["parseable s3-store --ingestor-endpoint=${HOSTNAME}.{{ include "parseable.fullname" . }}-ingestor-headless.{{ .Release.Namespace }}.svc.cluster.local:{{ .Values.parseable.highAvailability.ingestor.port }}"]
43+
args:
44+
- /usr/bin/parseable
45+
- s3-store
46+
- --ingestor-endpoint=$(HOSTNAME).{{ include "parseable.fullname" . }}-ingestor-headless.{{ .Release.Namespace }}.svc.cluster.local:{{ .Values.parseable.highAvailability.ingestor.port }}
4547
env:
4648
{{- range $key, $value := .Values.parseable.highAvailability.ingestor.env }}
4749
- name: {{ $key }}
4850
value: {{ tpl $value $ | quote }}
51+
- name: HOSTNAME
52+
valueFrom:
53+
fieldRef:
54+
apiVersion: v1
55+
fieldPath: metadata.name
4956
{{- end }}
5057
{{- range $secret := .Values.parseable.s3ModeSecret }}
5158
{{- range $key := $secret.keys }}
@@ -62,6 +69,10 @@ spec:
6269
value: "ingest"
6370
ports:
6471
- containerPort: {{ .Values.parseable.highAvailability.ingestor.port }}
72+
{{- with .Values.readinessProbe }}
73+
readinessProbe:
74+
{{ toYaml . | nindent 12 }}
75+
{{- end }}
6576
resources:
6677
{{- toYaml .Values.parseable.highAvailability.ingestor.resources | nindent 12 }}
6778
{{- if .Values.parseable.persistence.ingestor.enabled }}

helm/templates/querier-statefulset.yaml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ spec:
4141
{{- toYaml .Values.parseable.securityContext | nindent 8 }}
4242
image: {{ .Values.parseable.image.repository }}:{{ .Values.parseable.image.tag | default .Chart.AppVersion }}
4343
imagePullPolicy: {{ .Values.parseable.image.pullPolicy }}
44-
command: ["parseable"]
45-
args: ["s3-store"]
44+
args:
45+
- /usr/bin/parseable
46+
- s3-store
4647
env:
4748
{{- range $key, $value := .Values.parseable.env }}
4849
- name: {{ $key }}
@@ -57,6 +58,11 @@ spec:
5758
secretKeyRef:
5859
name: {{ $secret.name }}
5960
key: {{ $key }}
61+
- name: HOSTNAME
62+
valueFrom:
63+
fieldRef:
64+
apiVersion: v1
65+
fieldPath: metadata.name
6066
{{- end }}
6167
{{- end }}
6268
- name: P_MODE
@@ -67,6 +73,10 @@ spec:
6773
{{- end }}
6874
ports:
6975
- containerPort: 8000
76+
{{- with .Values.readinessProbe }}
77+
readinessProbe:
78+
{{ toYaml . | nindent 12 }}
79+
{{- end }}
7080
resources:
7181
{{- toYaml .Values.parseable.resources | nindent 12 }}
7282
volumeMounts:

helm/templates/standalone-deployment.yaml

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ spec:
3737
# Uncomment to debug
3838
# command: [ "/bin/sh", "-c", "sleep 1000000" ]
3939
{{- if .Values.parseable.local }}
40-
args: ["parseable", "local-store"]
40+
args: ["/usr/bin/parseable", "local-store"]
4141
{{- else }}
42-
args: ["parseable", "s3-store"]
42+
args: ["/usr/bin/parseable", "s3-store"]
4343
{{- end }}
4444
env:
4545
{{- range $key, $value := .Values.parseable.env }}
@@ -56,6 +56,11 @@ spec:
5656
secretKeyRef:
5757
name: {{ $secret.name }}
5858
key: {{ $key }}
59+
- name: HOSTNAME
60+
valueFrom:
61+
fieldRef:
62+
apiVersion: v1
63+
fieldPath: metadata.name
5964
{{- end }}
6065
{{- end }}
6166
{{- else}}
@@ -73,6 +78,10 @@ spec:
7378
{{- end }}
7479
ports:
7580
- containerPort: 8000
81+
{{- with .Values.readinessProbe }}
82+
readinessProbe:
83+
{{ toYaml . | nindent 12 }}
84+
{{- end }}
7685
resources:
7786
{{- toYaml .Values.parseable.resources | nindent 12 }}
7887
volumeMounts:

helm/values.yaml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ parseable:
2323
## Console (UI) is available on the other service (that points to the query pod)
2424
service:
2525
type: ClusterIP
26-
port: 80
26+
port: 80
27+
readinessProbe:
28+
httpGet:
29+
path: /api/v1/readiness
30+
port: 8000
2731
resources:
2832
limits:
2933
cpu: 500m
@@ -104,6 +108,10 @@ parseable:
104108
service:
105109
type: ClusterIP
106110
port: 80
111+
readinessProbe:
112+
httpGet:
113+
path: /api/v1/readiness
114+
port: 8000
107115
resources:
108116
limits:
109117
cpu: 500m

server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ anyhow = { version = "1.0", features = ["backtrace"] }
3636
argon2 = "0.5.0"
3737
async-trait = "0.1"
3838
base64 = "0.22.0"
39+
lazy_static = "1.4"
3940
bytes = "1.4"
4041
byteorder = "1.4.3"
4142
bzip2 = { version = "*", features = ["static"] }

server/src/handlers/http/health_check.rs

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,77 @@
1616
*
1717
*/
1818

19+
use crate::option::CONFIG;
1920
use actix_web::http::StatusCode;
2021
use actix_web::HttpResponse;
22+
use lazy_static::lazy_static;
23+
use std::sync::Arc;
24+
use tokio::signal::unix::{signal, SignalKind};
25+
use tokio::sync::{oneshot, Mutex};
26+
use tokio::time::{sleep, Duration};
2127

22-
use crate::option::CONFIG;
28+
// Create a global variable to store signal status
29+
lazy_static! {
30+
static ref SIGNAL_RECEIVED: Arc<Mutex<bool>> = Arc::new(Mutex::new(false));
31+
}
2332

2433
pub async fn liveness() -> HttpResponse {
2534
HttpResponse::new(StatusCode::OK)
2635
}
2736

37+
pub async fn handle_signals(shutdown_signal: Arc<Mutex<Option<oneshot::Sender<()>>>>) {
38+
let signal_received = SIGNAL_RECEIVED.clone();
39+
40+
let mut sigterm =
41+
signal(SignalKind::terminate()).expect("Failed to set up SIGTERM signal handler");
42+
log::info!("Signal handler task started");
43+
44+
// Block until SIGTERM is received
45+
match sigterm.recv().await {
46+
Some(_) => {
47+
log::info!("Received SIGTERM signal at Readiness Probe Handler");
48+
49+
// Set the shutdown flag to true
50+
let mut shutdown_flag = signal_received.lock().await;
51+
*shutdown_flag = true;
52+
53+
// Trigger graceful shutdown
54+
if let Some(shutdown_sender) = shutdown_signal.lock().await.take() {
55+
let _ = shutdown_sender.send(());
56+
}
57+
58+
// Delay to allow readiness probe to return SERVICE_UNAVAILABLE
59+
let _ = sleep(Duration::from_secs(20)).await;
60+
61+
// Sync to local
62+
crate::event::STREAM_WRITERS.unset_all();
63+
64+
// Sync to S3
65+
if let Err(e) = CONFIG.storage().get_object_store().sync().await {
66+
log::warn!("Failed to sync local data with object store. {:?}", e);
67+
}
68+
69+
log::info!("Local and S3 Sync done, handler SIGTERM completed.");
70+
}
71+
None => {
72+
log::info!("Signal handler received None, indicating an error or end of stream");
73+
}
74+
}
75+
76+
log::info!("Signal handler task completed");
77+
}
78+
2879
pub async fn readiness() -> HttpResponse {
29-
if CONFIG.storage().get_object_store().check().await.is_ok() {
30-
return HttpResponse::new(StatusCode::OK);
80+
// Check if the application has received a shutdown signal
81+
let shutdown_flag = SIGNAL_RECEIVED.lock().await;
82+
if *shutdown_flag {
83+
return HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE);
3184
}
3285

33-
HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE)
86+
// Check the object store connection
87+
if CONFIG.storage().get_object_store().check().await.is_ok() {
88+
HttpResponse::new(StatusCode::OK)
89+
} else {
90+
HttpResponse::new(StatusCode::SERVICE_UNAVAILABLE)
91+
}
3492
}

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use crate::analytics;
1919
use crate::banner;
2020
use crate::handlers::airplane;
21+
use crate::handlers::http::health_check;
2122
use crate::handlers::http::ingest;
2223
use crate::handlers::http::logstream;
2324
use crate::handlers::http::middleware::RouteExt;
@@ -35,6 +36,8 @@ use crate::storage::ObjectStorageError;
3536
use crate::storage::PARSEABLE_ROOT_DIRECTORY;
3637
use crate::sync;
3738

39+
use std::sync::Arc;
40+
3841
use super::server::Server;
3942
use super::ssl_acceptor::get_ssl_acceptor;
4043
use super::IngestorMetadata;
@@ -56,6 +59,7 @@ use bytes::Bytes;
5659
use once_cell::sync::Lazy;
5760
use relative_path::RelativePathBuf;
5861
use serde_json::Value;
62+
use tokio::sync::{oneshot, Mutex};
5963

6064
/// ! have to use a guard before using it
6165
pub static INGESTOR_META: Lazy<IngestorMetadata> =
@@ -91,17 +95,47 @@ impl ParseableServer for IngestServer {
9195
.wrap(cross_origin_config())
9296
};
9397

94-
// concurrent workers equal to number of logical cores
95-
let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
98+
// Create a channel to trigger server shutdown
99+
let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
100+
let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));
101+
102+
// Clone the shutdown signal for the signal handler
103+
let shutdown_signal = server_shutdown_signal.clone();
104+
105+
// Spawn the signal handler task
106+
tokio::spawn(async move {
107+
health_check::handle_signals(shutdown_signal).await;
108+
println!("Received shutdown signal, notifying server to shut down...");
109+
});
96110

97-
if let Some(config) = ssl {
111+
// Create the HTTP server
112+
let http_server = HttpServer::new(create_app_fn)
113+
.workers(num_cpus::get())
114+
.shutdown_timeout(60);
115+
116+
// Start the server with or without TLS
117+
let srv = if let Some(config) = ssl {
98118
http_server
99119
.bind_rustls_0_22(&CONFIG.parseable.address, config)?
100120
.run()
101-
.await?;
102121
} else {
103-
http_server.bind(&CONFIG.parseable.address)?.run().await?;
104-
}
122+
http_server.bind(&CONFIG.parseable.address)?.run()
123+
};
124+
125+
// Graceful shutdown handling
126+
let srv_handle = srv.handle();
127+
128+
tokio::spawn(async move {
129+
// Wait for the shutdown signal
130+
shutdown_rx.await.ok();
131+
132+
// Initiate graceful shutdown
133+
log::info!("Graceful shutdown of HTTP server triggered");
134+
srv_handle.stop(true).await;
135+
});
136+
137+
// Await the server to run and handle shutdown
138+
srv.await?;
105139

106140
Ok(())
107141
}

server/src/handlers/http/modal/query_server.rs

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use crate::handlers::airplane;
2020
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
21+
use crate::handlers::http::health_check;
2122
use crate::handlers::http::logstream::create_internal_stream_if_not_exists;
2223
use crate::handlers::http::middleware::RouteExt;
2324
use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION};
@@ -32,6 +33,7 @@ use actix_web::web::ServiceConfig;
3233
use actix_web::{App, HttpServer};
3334
use async_trait::async_trait;
3435
use std::sync::Arc;
36+
use tokio::sync::{oneshot, Mutex};
3537

3638
use crate::option::CONFIG;
3739

@@ -74,16 +76,46 @@ impl ParseableServer for QueryServer {
7476
.wrap(cross_origin_config())
7577
};
7678

77-
// concurrent workers equal to number of cores on the cpu
78-
let http_server = HttpServer::new(create_app_fn).workers(num_cpus::get());
79-
if let Some(config) = ssl {
79+
// Create a channel to trigger server shutdown
80+
let (shutdown_trigger, shutdown_rx) = oneshot::channel::<()>();
81+
let server_shutdown_signal = Arc::new(Mutex::new(Some(shutdown_trigger)));
82+
83+
// Clone the shutdown signal for the signal handler
84+
let shutdown_signal = server_shutdown_signal.clone();
85+
86+
// Spawn the signal handler task
87+
tokio::spawn(async move {
88+
health_check::handle_signals(shutdown_signal).await;
89+
});
90+
91+
// Create the HTTP server
92+
let http_server = HttpServer::new(create_app_fn)
93+
.workers(num_cpus::get())
94+
.shutdown_timeout(120);
95+
96+
// Start the server with or without TLS
97+
let srv = if let Some(config) = ssl {
8098
http_server
8199
.bind_rustls_0_22(&CONFIG.parseable.address, config)?
82100
.run()
83-
.await?;
84101
} else {
85-
http_server.bind(&CONFIG.parseable.address)?.run().await?;
86-
}
102+
http_server.bind(&CONFIG.parseable.address)?.run()
103+
};
104+
105+
// Graceful shutdown handling
106+
let srv_handle = srv.handle();
107+
108+
tokio::spawn(async move {
109+
// Wait for the shutdown signal
110+
shutdown_rx.await.ok();
111+
112+
// Initiate graceful shutdown
113+
log::info!("Graceful shutdown of HTTP server triggered");
114+
srv_handle.stop(true).await;
115+
});
116+
117+
// Await the server to run and handle shutdown
118+
srv.await?;
87119

88120
Ok(())
89121
}

0 commit comments

Comments
 (0)