Skip to content

Commit

Permalink
feat(subs): retry + timeout on save (#863)
Browse files Browse the repository at this point in the history
Port and improve retry/timeout.

#### Migration notes

N/A

- [ ] The change comes with new or modified tests
- [ ] Hard-to-understand functions have explanatory comments
- [ ] End-user documentation is updated to reflect the change
  • Loading branch information
michael-0acf4 authored Oct 3, 2024
1 parent 3b2e53d commit bbef981
Show file tree
Hide file tree
Showing 19 changed files with 1,290 additions and 161 deletions.
1 change: 1 addition & 0 deletions .ghjk/lock.json
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,7 @@
"install-protobuf-rust": {
"ty": "denoFile@v1",
"key": "install-protobuf-rust",
"desc": "Enable rust codegen for protoc",
"envKey": "bciqfoerhfsehob65bmr7v5eka43ml4zbbt3aeot7zz5ijlb4oy2lucq"
},
"install-wasi-adapter": {
Expand Down
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions src/substantial/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,3 @@ uuid.workspace = true

protobuf = "3.5.1"
redis = "0.25.4"

[dev-dependencies]
tokio = { workspace = true, features =["full"] }
7 changes: 0 additions & 7 deletions src/substantial/proto-gen.sh

This file was deleted.

20 changes: 18 additions & 2 deletions src/substantial/protocol/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,26 @@ message Start {
google.protobuf.Struct kwargs = 1;
}

message SaveResolved {
string json_result = 1;
}

message SaveRetry {
google.protobuf.Timestamp wait_until = 1;
int32 counter = 2;
}

message SaveFailed {
string err = 1;
}

message Save {
uint32 id = 1;
string value = 2;
int32 counter = 3;
oneof of {
SaveResolved resolved = 10;
SaveRetry retry = 11;
SaveFailed failed = 12;
}
}

message Sleep {
Expand Down
72 changes: 63 additions & 9 deletions src/substantial/src/converters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize};
use crate::{
backends::Backend,
protocol::{
events::{Event, Records},
events::{Event, Records, SaveFailed, SaveResolved, SaveRetry},
metadata::{metadata::Of, Error, Info, Metadata},
},
};
Expand All @@ -26,6 +26,21 @@ pub enum RunResult {
Err(serde_json::Value),
}

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type")]
pub enum SavedValue {
Retry {
counter: i32,
wait_until: DateTime<Utc>,
},
Resolved {
payload: serde_json::Value,
},
Failed {
err: serde_json::Value,
},
}

/// Bridge between protobuf types to Typescript
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(tag = "type")]
Expand All @@ -37,7 +52,7 @@ pub enum OperationEvent {
},
Save {
id: u32,
value: serde_json::Value,
value: SavedValue,
},
Send {
event_name: String,
Expand Down Expand Up @@ -116,6 +131,7 @@ impl TryFrom<Event> for Operation {
type Error = anyhow::Error;
fn try_from(event: Event) -> Result<Self> {
use crate::protocol::events::event::Of;
use crate::protocol::events::save::Of::{Failed, Resolved, Retry};
use crate::protocol::events::stop;

let at = to_datetime_utc(event.at.get_or_default())?;
Expand All @@ -129,7 +145,7 @@ impl TryFrom<Event> for Operation {
.into_iter()
.map(|(k, v)| {
let v = serde_json::from_str(v.string_value())
.with_context(|| format!("Get value from kwargs {k:?}"))?;
.with_context(|| format!("Get value from kwargs {:?}", k))?;
Ok((k, v))
})
.collect::<Result<_>>()?;
Expand Down Expand Up @@ -158,11 +174,28 @@ impl TryFrom<Event> for Operation {
});
}
Of::Save(save) => {
let raw_value = save.clone().value;
let value = serde_json::from_str(&raw_value)?;
let value = save
.clone()
.of
.with_context(|| format!("variant is empty {:?}", save))?;

return Ok(Operation {
at,
event: OperationEvent::Save { id: save.id, value },
event: OperationEvent::Save {
id: save.id,
value: match value {
Resolved(resolved) => SavedValue::Resolved {
payload: serde_json::from_str(&resolved.json_result)?,
},
Retry(retry) => SavedValue::Retry {
counter: retry.counter,
wait_until: to_datetime_utc(&retry.wait_until)?,
},
Failed(failed) => SavedValue::Failed {
err: serde_json::from_str(&failed.err)?,
},
},
},
});
}
Of::Send(send) => {
Expand Down Expand Up @@ -191,14 +224,15 @@ impl TryFrom<Event> for Operation {
}
}

bail!("cannot convert from event {event:?}")
bail!("cannot convert from event {:?}", event)
}
}

impl TryFrom<Operation> for Event {
type Error = anyhow::Error;
fn try_from(operation: Operation) -> Result<Event> {
use crate::protocol::events::event::Of;
use crate::protocol::events::save::Of::{Failed, Resolved, Retry};
use crate::protocol::events::{stop, Event, Save, Send, Sleep, Start, Stop};

let at = to_timestamp(&operation.at);
Expand Down Expand Up @@ -252,7 +286,25 @@ impl TryFrom<Operation> for Event {
OperationEvent::Save { id, value } => {
let save = Save {
id,
value: serde_json::to_string(&value)?,
of: Some(match value {
SavedValue::Resolved { payload } => Resolved(SaveResolved {
json_result: serde_json::to_string(&payload)?,
..Default::default()
}),
SavedValue::Retry {
counter,
wait_until,
} => Retry(SaveRetry {
wait_until: MessageField::some(to_timestamp(&wait_until)),
counter,
..Default::default()
}),
SavedValue::Failed { err } => Failed(SaveFailed {
err: serde_json::to_string(&err)?,
..Default::default()
}),
}),

..Default::default()
};

Expand Down Expand Up @@ -360,12 +412,14 @@ fn to_timestamp(datetime: &DateTime<Utc>) -> Timestamp {
let mut timestamp = Timestamp::new();
timestamp.seconds = datetime.timestamp();
timestamp.nanos = datetime.timestamp_subsec_nanos() as i32;

timestamp
}

fn to_datetime_utc(time: &Timestamp) -> Result<DateTime<Utc>> {
if let Some(converted) = DateTime::from_timestamp(time.seconds, time.nanos as u32) {
return Ok(converted);
}
bail!("Cannot convert timestamp: {time:?}");

bail!("Cannot convert timestamp: {:?}", time);
}
Loading

0 comments on commit bbef981

Please sign in to comment.