Skip to content

Commit f5193cd

Browse files
author
Devdutt Shenoi
authored
feat: sync from dedicated thread (#1168)
1 parent ca4b25a commit f5193cd

File tree

7 files changed

+135
-179
lines changed

7 files changed

+135
-179
lines changed

src/alerts/mod.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use once_cell::sync::Lazy;
2727
use serde_json::Error as SerdeError;
2828
use std::collections::{HashMap, HashSet};
2929
use std::fmt::{self, Display};
30-
use tokio::sync::oneshot::{Receiver, Sender};
30+
use tokio::sync::oneshot::{self, Receiver, Sender};
3131
use tokio::sync::RwLock;
3232
use tokio::task::JoinHandle;
3333
use tracing::{trace, warn};
@@ -733,10 +733,16 @@ impl Alerts {
733733
let store = CONFIG.storage().get_object_store();
734734

735735
for alert in store.get_alerts().await.unwrap_or_default() {
736-
let (handle, rx, tx) =
737-
schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;
738-
739-
self.update_task(alert.id, handle, rx, tx).await;
736+
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
737+
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
738+
let handle = schedule_alert_task(
739+
alert.get_eval_frequency(),
740+
alert.clone(),
741+
inbox_rx,
742+
outbox_tx,
743+
)?;
744+
745+
self.update_task(alert.id, handle, outbox_rx, inbox_tx).await;
740746

741747
map.insert(alert.id, alert);
742748
}

src/handlers/http/alerts.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use actix_web::{
2525
HttpRequest, Responder,
2626
};
2727
use bytes::Bytes;
28+
use tokio::sync::oneshot;
2829
use ulid::Ulid;
2930

3031
use crate::alerts::{
@@ -55,7 +56,14 @@ pub async fn post(
5556
user_auth_for_query(&session_key, &alert.query).await?;
5657

5758
// create scheduled tasks
58-
let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;
59+
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
60+
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
61+
let handle = schedule_alert_task(
62+
alert.get_eval_frequency(),
63+
alert.clone(),
64+
inbox_rx,
65+
outbox_tx,
66+
)?;
5967

6068
// now that we've validated that the user can run this query
6169
// move on to saving the alert in ObjectStore
@@ -67,7 +75,9 @@ pub async fn post(
6775
let alert_bytes = serde_json::to_vec(&alert)?;
6876
store.put_object(&path, Bytes::from(alert_bytes)).await?;
6977

70-
ALERTS.update_task(alert.id, handle, rx, tx).await;
78+
ALERTS
79+
.update_task(alert.id, handle, outbox_rx, inbox_tx)
80+
.await;
7181

7282
Ok(web::Json(alert))
7383
}
@@ -136,7 +146,14 @@ pub async fn modify(
136146
alert.validate().await?;
137147

138148
// modify task
139-
let (handle, rx, tx) = schedule_alert_task(alert.get_eval_frequency(), alert.clone()).await?;
149+
let (outbox_tx, outbox_rx) = oneshot::channel::<()>();
150+
let (inbox_tx, inbox_rx) = oneshot::channel::<()>();
151+
let handle = schedule_alert_task(
152+
alert.get_eval_frequency(),
153+
alert.clone(),
154+
inbox_rx,
155+
outbox_tx,
156+
)?;
140157

141158
// modify on disk
142159
CONFIG
@@ -148,7 +165,9 @@ pub async fn modify(
148165
// modify in memory
149166
ALERTS.update(&alert).await;
150167

151-
ALERTS.update_task(alert.id, handle, rx, tx).await;
168+
ALERTS
169+
.update_task(alert.id, handle, outbox_rx, inbox_tx)
170+
.await;
152171

153172
Ok(web::Json(alert))
154173
}

src/handlers/http/modal/ingest_server.rs

Lines changed: 10 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*
1717
*/
1818

19+
use std::thread;
20+
1921
use super::ingest::ingestor_logstream;
2022
use super::ingest::ingestor_rbac;
2123
use super::ingest::ingestor_role;
@@ -54,7 +56,6 @@ use once_cell::sync::Lazy;
5456
use relative_path::RelativePathBuf;
5557
use serde_json::Value;
5658
use tokio::sync::oneshot;
57-
use tracing::error;
5859
use tracing::info;
5960

6061
/// Metadata associated with this ingestor server
@@ -199,65 +200,21 @@ impl ParseableServer for IngestServer {
199200

200201
migration::run_migration(&CONFIG).await?;
201202

202-
let (localsync_handler, mut localsync_outbox, localsync_inbox) =
203-
sync::run_local_sync().await;
204-
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
205-
sync::object_store_sync().await;
206-
let (
207-
mut remote_conversion_handler,
208-
mut remote_conversion_outbox,
209-
mut remote_conversion_inbox,
210-
) = sync::arrow_conversion().await;
203+
// Run sync on a background thread
204+
let (cancel_tx, cancel_rx) = oneshot::channel();
205+
thread::spawn(|| sync::handler(cancel_rx));
211206

212207
tokio::spawn(airplane::server());
213208

214209
// set the ingestor metadata
215210
set_ingestor_metadata().await?;
216211

217212
// Ingestors shouldn't have to deal with OpenId auth flow
218-
let app = self.start(shutdown_rx, prometheus.clone(), None);
219-
220-
tokio::pin!(app);
221-
loop {
222-
tokio::select! {
223-
e = &mut app => {
224-
// actix server finished .. stop other threads and stop the server
225-
remote_sync_inbox.send(()).unwrap_or(());
226-
localsync_inbox.send(()).unwrap_or(());
227-
remote_conversion_inbox.send(()).unwrap_or(());
228-
if let Err(e) = localsync_handler.await {
229-
error!("Error joining remote_sync_handler: {:?}", e);
230-
}
231-
if let Err(e) = remote_sync_handler.await {
232-
error!("Error joining remote_sync_handler: {:?}", e);
233-
}
234-
if let Err(e) = remote_conversion_handler.await {
235-
error!("Error joining remote_conversion_handler: {:?}", e);
236-
}
237-
return e
238-
},
239-
_ = &mut localsync_outbox => {
240-
// crash the server if localsync fails for any reason
241-
// panic!("Local Sync thread died. Server will fail now!")
242-
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
243-
},
244-
_ = &mut remote_sync_outbox => {
245-
// remote_sync failed, this is recoverable by just starting remote_sync thread again
246-
if let Err(e) = remote_sync_handler.await {
247-
error!("Error joining remote_sync_handler: {:?}", e);
248-
}
249-
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
250-
},
251-
_ = &mut remote_conversion_outbox => {
252-
// remote_conversion failed, this is recoverable by just starting remote_conversion thread again
253-
if let Err(e) = remote_conversion_handler.await {
254-
error!("Error joining remote_conversion_handler: {:?}", e);
255-
}
256-
(remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await;
257-
}
258-
259-
}
260-
}
213+
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
214+
// Cancel sync jobs
215+
cancel_tx.send(()).expect("Cancellation should not fail");
216+
217+
result
261218
}
262219
}
263220

src/handlers/http/modal/query_server.rs

Lines changed: 11 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*
1717
*/
1818

19+
use std::thread;
20+
1921
use crate::alerts::ALERTS;
2022
use crate::correlation::CORRELATIONS;
2123
use crate::handlers::airplane;
@@ -27,10 +29,9 @@ use crate::handlers::http::{self, role};
2729
use crate::handlers::http::{logstream, MAX_EVENT_PAYLOAD_SIZE};
2830
use crate::hottier::HotTierManager;
2931
use crate::rbac::role::Action;
30-
use crate::sync;
3132
use crate::users::dashboards::DASHBOARDS;
3233
use crate::users::filters::FILTERS;
33-
use crate::{analytics, migration, storage};
34+
use crate::{analytics, migration, storage, sync};
3435
use actix_web::web::{resource, ServiceConfig};
3536
use actix_web::{web, Scope};
3637
use actix_web_prometheus::PrometheusMetrics;
@@ -133,44 +134,18 @@ impl ParseableServer for QueryServer {
133134
hot_tier_manager.put_internal_stream_hot_tier().await?;
134135
hot_tier_manager.download_from_s3()?;
135136
};
136-
let (localsync_handler, mut localsync_outbox, localsync_inbox) =
137-
sync::run_local_sync().await;
138-
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
139-
sync::object_store_sync().await;
137+
138+
// Run sync on a background thread
139+
let (cancel_tx, cancel_rx) = oneshot::channel();
140+
thread::spawn(|| sync::handler(cancel_rx));
140141

141142
tokio::spawn(airplane::server());
142-
let app = self.start(shutdown_rx, prometheus.clone(), CONFIG.options.openid());
143143

144-
tokio::pin!(app);
145-
loop {
146-
tokio::select! {
147-
e = &mut app => {
148-
// actix server finished .. stop other threads and stop the server
149-
remote_sync_inbox.send(()).unwrap_or(());
150-
localsync_inbox.send(()).unwrap_or(());
151-
if let Err(e) = localsync_handler.await {
152-
error!("Error joining localsync_handler: {:?}", e);
153-
}
154-
if let Err(e) = remote_sync_handler.await {
155-
error!("Error joining remote_sync_handler: {:?}", e);
156-
}
157-
return e
158-
},
159-
_ = &mut localsync_outbox => {
160-
// crash the server if localsync fails for any reason
161-
// panic!("Local Sync thread died. Server will fail now!")
162-
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
163-
},
164-
_ = &mut remote_sync_outbox => {
165-
// remote_sync failed, this is recoverable by just starting remote_sync thread again
166-
if let Err(e) = remote_sync_handler.await {
167-
error!("Error joining remote_sync_handler: {:?}", e);
168-
}
169-
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
170-
}
144+
// Forward the configured OpenID settings, as this call did before the sync refactor.
// `None` is what the ingest server passes, because ingestors do not handle the OpenID auth flow.
let result = self.start(shutdown_rx, prometheus.clone(), CONFIG.options.openid()).await;
145+
// Cancel sync jobs
146+
cancel_tx.send(()).expect("Cancellation should not fail");
171147

172-
};
173-
}
148+
result
174149
}
175150
}
176151

src/handlers/http/modal/server.rs

Lines changed: 13 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*
1717
*/
1818

19+
use std::thread;
20+
1921
use crate::alerts::ALERTS;
2022
use crate::analytics;
2123
use crate::correlation::CORRELATIONS;
@@ -130,66 +132,24 @@ impl ParseableServer for Server {
130132
hot_tier_manager.download_from_s3()?;
131133
};
132134

133-
let (localsync_handler, mut localsync_outbox, localsync_inbox) =
134-
sync::run_local_sync().await;
135-
let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) =
136-
sync::object_store_sync().await;
137-
let (
138-
mut remote_conversion_handler,
139-
mut remote_conversion_outbox,
140-
mut remote_conversion_inbox,
141-
) = sync::arrow_conversion().await;
135+
// Run sync on a background thread
136+
let (cancel_tx, cancel_rx) = oneshot::channel();
137+
thread::spawn(|| sync::handler(cancel_rx));
138+
142139
if CONFIG.options.send_analytics {
143140
analytics::init_analytics_scheduler()?;
144141
}
145142

146143
tokio::spawn(handlers::livetail::server());
147144
tokio::spawn(handlers::airplane::server());
148145

149-
let app = self.start(shutdown_rx, prometheus.clone(), CONFIG.options.openid());
150-
151-
tokio::pin!(app);
152-
153-
loop {
154-
tokio::select! {
155-
e = &mut app => {
156-
// actix server finished .. stop other threads and stop the server
157-
remote_sync_inbox.send(()).unwrap_or(());
158-
localsync_inbox.send(()).unwrap_or(());
159-
remote_conversion_inbox.send(()).unwrap_or(());
160-
if let Err(e) = localsync_handler.await {
161-
error!("Error joining remote_sync_handler: {:?}", e);
162-
}
163-
if let Err(e) = remote_sync_handler.await {
164-
error!("Error joining remote_sync_handler: {:?}", e);
165-
}
166-
if let Err(e) = remote_conversion_handler.await {
167-
error!("Error joining remote_conversion_handler: {:?}", e);
168-
}
169-
return e
170-
},
171-
_ = &mut localsync_outbox => {
172-
// crash the server if localsync fails for any reason
173-
// panic!("Local Sync thread died. Server will fail now!")
174-
return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable"))
175-
},
176-
_ = &mut remote_sync_outbox => {
177-
// remote_sync failed, this is recoverable by just starting remote_sync thread again
178-
if let Err(e) = remote_sync_handler.await {
179-
error!("Error joining remote_sync_handler: {:?}", e);
180-
}
181-
(remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await;
182-
},
183-
_ = &mut remote_conversion_outbox => {
184-
// remote_conversion failed, this is recoverable by just starting remote_conversion thread again
185-
if let Err(e) = remote_conversion_handler.await {
186-
error!("Error joining remote_conversion_handler: {:?}", e);
187-
}
188-
(remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await;
189-
}
190-
191-
};
192-
}
146+
let result = self
147+
.start(shutdown_rx, prometheus.clone(), CONFIG.options.openid())
148+
.await;
149+
// Cancel sync jobs
150+
cancel_tx.send(()).expect("Cancellation should not fail");
151+
152+
return result;
193153
}
194154
}
195155

src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ pub use handlers::http::modal::{
5656
use once_cell::sync::Lazy;
5757
use reqwest::{Client, ClientBuilder};
5858

59-
pub const STORAGE_CONVERSION_INTERVAL: u32 = 60;
60-
pub const STORAGE_UPLOAD_INTERVAL: u32 = 30;
59+
pub const STORAGE_CONVERSION_INTERVAL: u64 = 60;
60+
pub const STORAGE_UPLOAD_INTERVAL: u64 = 30;
6161

6262
// A single HTTP client for all outgoing HTTP requests from the parseable server
6363
static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {

0 commit comments

Comments
 (0)