Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bridge-indexer] Progress saving policy #19243

Merged
merged 1 commit into from
Sep 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 240 additions & 1 deletion crates/sui-bridge-indexer/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#![allow(dead_code)] // TODO: remove in next PR where integration of ProgressSavingPolicy is done

use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, Error};
use async_trait::async_trait;
use diesel::dsl::now;
Expand Down Expand Up @@ -29,7 +34,6 @@ impl PgBridgePersistent {
}
}

// TODO: this is shared between SUI and ETH, move to different file.
#[async_trait]
impl Persistent<ProcessedTxnData> for PgBridgePersistent {
async fn write(&self, data: Vec<ProcessedTxnData>) -> Result<(), Error> {
Expand Down Expand Up @@ -178,3 +182,238 @@ impl IndexerProgressStore for PgBridgePersistent {
Ok(())
}
}

#[derive(Debug, Clone)]
pub enum ProgressSavingPolicy {
SaveAfterDuration(SaveAfterDurationPolicy),
OutOfOrderSaveAfterDuration(OutOfOrderSaveAfterDurationPolicy),
}

#[derive(Debug, Clone)]
pub struct SaveAfterDurationPolicy {
duration: tokio::time::Duration,
last_save_time: Arc<Mutex<HashMap<String, Option<tokio::time::Instant>>>>,
}

impl SaveAfterDurationPolicy {
pub fn new(duration: tokio::time::Duration) -> Self {
Self {
duration,
last_save_time: Arc::new(Mutex::new(HashMap::new())),
}
}
}

#[derive(Debug, Clone)]
pub struct OutOfOrderSaveAfterDurationPolicy {
duration: tokio::time::Duration,
last_save_time: Arc<Mutex<HashMap<String, Option<tokio::time::Instant>>>>,
seen: Arc<Mutex<HashMap<String, HashSet<u64>>>>,
next_to_fill: Arc<Mutex<HashMap<String, Option<u64>>>>,
}

impl OutOfOrderSaveAfterDurationPolicy {
pub fn new(duration: tokio::time::Duration) -> Self {
Self {
duration,
last_save_time: Arc::new(Mutex::new(HashMap::new())),
seen: Arc::new(Mutex::new(HashMap::new())),
next_to_fill: Arc::new(Mutex::new(HashMap::new())),
}
}
}

impl ProgressSavingPolicy {
/// If returns Some(progress), it means we should save the progress to DB.
fn cache_progress(
&mut self,
task_name: String,
heights: &[u64],
start_height: u64,
target_height: u64,
) -> Option<u64> {
match self {
ProgressSavingPolicy::SaveAfterDuration(policy) => {
let height = *heights.iter().max().unwrap();
let mut last_save_time_guard = policy.last_save_time.lock().unwrap();
let last_save_time = last_save_time_guard.entry(task_name).or_insert(None);
if height >= target_height {
*last_save_time = Some(tokio::time::Instant::now());
return Some(height);
}
if let Some(v) = last_save_time {
if v.elapsed() >= policy.duration {
*last_save_time = Some(tokio::time::Instant::now());
Some(height)
} else {
None
}
} else {
// update `last_save_time` to now but don't actually save progress
*last_save_time = Some(tokio::time::Instant::now());
None
}
}
ProgressSavingPolicy::OutOfOrderSaveAfterDuration(policy) => {
let mut next_to_fill = {
let mut next_to_fill_guard = policy.next_to_fill.lock().unwrap();
(*next_to_fill_guard
.entry(task_name.clone())
.or_insert(Some(start_height)))
.unwrap()
};
let old_next_to_fill = next_to_fill;
{
let mut seen_guard = policy.seen.lock().unwrap();
let seen = seen_guard
.entry(task_name.clone())
.or_insert(HashSet::new());
seen.extend(heights.iter().cloned());
while seen.remove(&next_to_fill) {
next_to_fill += 1;
}
}
// We made some progress in filling gaps
if old_next_to_fill != next_to_fill {
policy
.next_to_fill
.lock()
.unwrap()
.insert(task_name.clone(), Some(next_to_fill));
}

let mut last_save_time_guard = policy.last_save_time.lock().unwrap();
let last_save_time = last_save_time_guard
.entry(task_name.clone())
.or_insert(None);

// If we have reached the target height, we always save
if next_to_fill > target_height {
*last_save_time = Some(tokio::time::Instant::now());
return Some(next_to_fill - 1);
}
// Regardless of whether we made progress, we should save if we have waited long enough
if let Some(v) = last_save_time {
if v.elapsed() >= policy.duration && next_to_fill > start_height {
*last_save_time = Some(tokio::time::Instant::now());
Some(next_to_fill - 1)
} else {
None
}
} else {
// update `last_save_time` to now but don't actually save progress
*last_save_time = Some(tokio::time::Instant::now());
None
}
}
}
}
}

#[cfg(test)]
mod tests {

use super::*;

#[tokio::test]
async fn test_save_after_duration_policy() {
let duration = tokio::time::Duration::from_millis(100);
let mut policy =
ProgressSavingPolicy::SaveAfterDuration(SaveAfterDurationPolicy::new(duration));
assert_eq!(
policy.cache_progress("task1".to_string(), &[1], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task1".to_string(), &[2], 0, 100),
Some(2)
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task1".to_string(), &[3], 0, 100),
Some(3)
);

assert_eq!(
policy.cache_progress("task2".to_string(), &[4], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task2".to_string(), &[5, 6], 0, 100),
Some(6)
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task2".to_string(), &[8, 7], 0, 100),
Some(8)
);
}

#[tokio::test]
async fn test_out_of_order_save_after_duration_policy() {
let duration = tokio::time::Duration::from_millis(100);
let mut policy = ProgressSavingPolicy::OutOfOrderSaveAfterDuration(
OutOfOrderSaveAfterDurationPolicy::new(duration),
);

assert_eq!(
policy.cache_progress("task1".to_string(), &[0], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task1".to_string(), &[1], 0, 100),
Some(1)
);
assert_eq!(
policy.cache_progress("task1".to_string(), &[3], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task1".to_string(), &[4], 0, 100),
Some(1)
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task1".to_string(), &[2], 0, 100),
Some(4)
);

assert_eq!(
policy.cache_progress("task2".to_string(), &[0], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task2".to_string(), &[1], 0, 100),
Some(1)
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task2".to_string(), &[2], 0, 100),
Some(2)
);
assert_eq!(
policy.cache_progress("task2".to_string(), &[3], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task2".to_string(), &[4], 0, 100),
Some(4)
);

assert_eq!(
policy.cache_progress("task2".to_string(), &[6, 7, 8], 0, 100),
None
);
tokio::time::sleep(duration).await;
assert_eq!(
policy.cache_progress("task2".to_string(), &[5, 9], 0, 100),
Some(9)
);
}
}
Loading