From bbef98187819b0c71921321d0e53f139a35dffc1 Mon Sep 17 00:00:00 2001 From: michael-0acf4 Date: Thu, 3 Oct 2024 13:17:00 +0300 Subject: [PATCH] feat(subs): retry + timeout on save (#863) Port and improve retry/timeout. #### Migration notes N/A - [ ] The change comes with new or modified tests - [ ] Hard-to-understand functions have explanatory comments - [ ] End-user documentation is updated to reflect the change --- .ghjk/lock.json | 1 + Cargo.lock | 1 - src/substantial/Cargo.toml | 3 - src/substantial/proto-gen.sh | 7 - src/substantial/protocol/events.proto | 20 +- src/substantial/src/converters.rs | 72 +- src/substantial/src/protocol/events.rs | 843 +++++++++++++++--- src/typegate/engine/runtime.d.ts | 9 +- .../src/runtimes/substantial/agent.ts | 19 +- .../src/runtimes/substantial/deno_context.ts | 173 +++- .../src/runtimes/substantial/types.ts | 8 +- .../src/runtimes/substantial/worker.ts | 3 + tests/runtimes/substantial/common.ts | 176 +++- .../substantial/imports/common_types.ts | 15 +- tests/runtimes/substantial/kv_like_test.ts | 12 +- tests/runtimes/substantial/redis_test.ts | 13 + tests/runtimes/substantial/substantial.py | 27 +- tests/runtimes/substantial/workflow.ts | 45 +- tools/tasks/install.ts | 4 +- 19 files changed, 1290 insertions(+), 161 deletions(-) delete mode 100644 src/substantial/proto-gen.sh diff --git a/.ghjk/lock.json b/.ghjk/lock.json index 09cc9f0668..b385936c69 100644 --- a/.ghjk/lock.json +++ b/.ghjk/lock.json @@ -1083,6 +1083,7 @@ "install-protobuf-rust": { "ty": "denoFile@v1", "key": "install-protobuf-rust", + "desc": "Enable rust codegen for protoc", "envKey": "bciqfoerhfsehob65bmr7v5eka43ml4zbbt3aeot7zz5ijlb4oy2lucq" }, "install-wasi-adapter": { diff --git a/Cargo.lock b/Cargo.lock index 9c99a25a48..fad6bce5fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11097,7 +11097,6 @@ dependencies = [ "redis", "serde 1.0.204", "serde_json", - "tokio", "uuid", ] diff --git a/src/substantial/Cargo.toml b/src/substantial/Cargo.toml index de420edefe..d6b86ab6c1 100644 --- a/src/substantial/Cargo.toml +++ b/src/substantial/Cargo.toml @@ -13,6 +13,3 @@ uuid.workspace = true protobuf = "3.5.1" redis = "0.25.4" - -[dev-dependencies] -tokio = { workspace = true, features =["full"] } diff --git a/src/substantial/proto-gen.sh b/src/substantial/proto-gen.sh deleted file mode 100644 index db30b27f2e..0000000000 --- a/src/substantial/proto-gen.sh +++ /dev/null @@ -1,7 +0,0 @@ -set -eux - -# https://github.com/protocolbuffers/protobuf/issues/13346 - -# must be in sync with substantial/Cargo.toml protobuf -cargo install protobuf-codegen -protoc -I . --rust_out=src/protocol protocol/* diff --git a/src/substantial/protocol/events.proto b/src/substantial/protocol/events.proto index e40e06a743..397ab00f08 100644 --- a/src/substantial/protocol/events.proto +++ b/src/substantial/protocol/events.proto @@ -9,10 +9,26 @@ message Start { google.protobuf.Struct kwargs = 1; } +message SaveResolved { + string json_result = 1; +} + +message SaveRetry { + google.protobuf.Timestamp wait_until = 1; + int32 counter = 2; +} + +message SaveFailed { + string err = 1; +} + message Save { uint32 id = 1; - string value = 2; - int32 counter = 3; + oneof of { + SaveResolved resolved = 10; + SaveRetry retry = 11; + SaveFailed failed = 12; + } } message Sleep { diff --git a/src/substantial/src/converters.rs b/src/substantial/src/converters.rs index f029c227b4..3c146ca090 100644 --- a/src/substantial/src/converters.rs +++ b/src/substantial/src/converters.rs @@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize}; use crate::{ backends::Backend, protocol::{ - events::{Event, Records}, + events::{Event, Records, SaveFailed, SaveResolved, SaveRetry}, metadata::{metadata::Of, Error, Info, Metadata}, }, }; @@ -26,6 +26,21 @@ pub enum RunResult { Err(serde_json::Value), } +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(tag = "type")] +pub enum SavedValue { + Retry { + counter: i32, + wait_until: DateTime, + }, + Resolved { + payload: serde_json::Value, + }, + Failed { + err: serde_json::Value, + }, +} + /// Bridge between protobuf types to Typescript #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(tag = "type")] @@ -37,7 +52,7 @@ pub enum OperationEvent { }, Save { id: u32, - value: serde_json::Value, + value: SavedValue, }, Send { event_name: String, @@ -116,6 +131,7 @@ impl TryFrom for Operation { type Error = anyhow::Error; fn try_from(event: Event) -> Result { use crate::protocol::events::event::Of; + use crate::protocol::events::save::Of::{Failed, Resolved, Retry}; use crate::protocol::events::stop; let at = to_datetime_utc(event.at.get_or_default())?; @@ -129,7 +145,7 @@ impl TryFrom for Operation { .into_iter() .map(|(k, v)| { let v = serde_json::from_str(v.string_value()) - .with_context(|| format!("Get value from kwargs {k:?}"))?; + .with_context(|| format!("Get value from kwargs {:?}", k))?; Ok((k, v)) }) .collect::>()?; @@ -158,11 +174,28 @@ impl TryFrom for Operation { }); } Of::Save(save) => { - let raw_value = save.clone().value; - let value = serde_json::from_str(&raw_value)?; + let value = save + .clone() + .of + .with_context(|| format!("variant is empty {:?}", save))?; + return Ok(Operation { at, - event: OperationEvent::Save { id: save.id, value }, + event: OperationEvent::Save { + id: save.id, + value: match value { + Resolved(resolved) => SavedValue::Resolved { + payload: serde_json::from_str(&resolved.json_result)?, + }, + Retry(retry) => SavedValue::Retry { + counter: retry.counter, + wait_until: to_datetime_utc(&retry.wait_until)?, + }, + Failed(failed) => SavedValue::Failed { + err: serde_json::from_str(&failed.err)?, + }, + }, + }, }); } Of::Send(send) => { @@ -191,7 +224,7 @@ impl TryFrom for Operation { } } - bail!("cannot convert from event {event:?}") + bail!("cannot convert from event {:?}", event) } } @@ -199,6 +232,7 @@ impl TryFrom for Event { type Error = anyhow::Error; fn try_from(operation: Operation) -> Result { use crate::protocol::events::event::Of; + use crate::protocol::events::save::Of::{Failed, Resolved, Retry}; use crate::protocol::events::{stop, Event, Save, Send, Sleep, Start, Stop}; let at = to_timestamp(&operation.at); @@ -252,7 +286,25 @@ impl TryFrom for Event { OperationEvent::Save { id, value } => { let save = Save { id, - value: serde_json::to_string(&value)?, + of: Some(match value { + SavedValue::Resolved { payload } => Resolved(SaveResolved { + json_result: serde_json::to_string(&payload)?, + ..Default::default() + }), + SavedValue::Retry { + counter, + wait_until, + } => Retry(SaveRetry { + wait_until: MessageField::some(to_timestamp(&wait_until)), + counter, + ..Default::default() + }), + SavedValue::Failed { err } => Failed(SaveFailed { + err: serde_json::to_string(&err)?, + ..Default::default() + }), + }), + ..Default::default() }; @@ -360,6 +412,7 @@ fn to_timestamp(datetime: &DateTime) -> Timestamp { let mut timestamp = Timestamp::new(); timestamp.seconds = datetime.timestamp(); timestamp.nanos = datetime.timestamp_subsec_nanos() as i32; + timestamp } @@ -367,5 +420,6 @@ fn to_datetime_utc(time: &Timestamp) -> Result> { if let Some(converted) = DateTime::from_timestamp(time.seconds, time.nanos as u32) { return Ok(converted); } - bail!("Cannot convert timestamp: {time:?}"); + + bail!("Cannot convert timestamp: {:?}", time); } diff --git a/src/substantial/src/protocol/events.rs b/src/substantial/src/protocol/events.rs index 90e510602d..3ca2887936 100644 --- a/src/substantial/src/protocol/events.rs +++ b/src/substantial/src/protocol/events.rs @@ -147,16 +147,399 @@ impl ::protobuf::reflect::ProtobufValue for Start { type RuntimeType = ::protobuf::reflect::rt::RuntimeTypeMessage; } +// @@protoc_insertion_point(message:substantial.protos.events.SaveResolved) +#[derive(PartialEq,Clone,Default,Debug)] +pub struct SaveResolved { + // message fields + // @@protoc_insertion_point(field:substantial.protos.events.SaveResolved.json_result) + pub json_result: ::std::string::String, + // special fields + // @@protoc_insertion_point(special_field:substantial.protos.events.SaveResolved.special_fields) + pub special_fields: ::protobuf::SpecialFields, +} + +impl<'a> ::std::default::Default for &'a SaveResolved { + fn default() -> &'a SaveResolved { + ::default_instance() + } +} + +impl SaveResolved { + pub fn new() -> SaveResolved { + ::std::default::Default::default() + } + + fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { + let mut fields = ::std::vec::Vec::with_capacity(1); + let mut oneofs = ::std::vec::Vec::with_capacity(0); + fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( + "json_result", + |m: &SaveResolved| { &m.json_result }, + |m: &mut SaveResolved| { &mut m.json_result }, + )); + ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::( + "SaveResolved", + fields, + oneofs, + ) + } +} + +impl ::protobuf::Message for SaveResolved { + const NAME: &'static str = "SaveResolved"; + + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::Result<()> { + while let Some(tag) = is.read_raw_tag_or_eof()? { + match tag { + 10 => { + self.json_result = is.read_string()?; + }, + tag => { + ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u64 { + let mut my_size = 0; + if !self.json_result.is_empty() { + my_size += ::protobuf::rt::string_size(1, &self.json_result); + } + my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); + self.special_fields.cached_size().set(my_size as u32); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::Result<()> { + if !self.json_result.is_empty() { + os.write_string(1, &self.json_result)?; + } + os.write_unknown_fields(self.special_fields.unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn special_fields(&self) -> &::protobuf::SpecialFields { + &self.special_fields + } + + fn mut_special_fields(&mut self) -> &mut ::protobuf::SpecialFields { + &mut self.special_fields + } + + fn new() -> SaveResolved { + SaveResolved::new() + } + + fn clear(&mut self) { + self.json_result.clear(); + self.special_fields.clear(); + } + + fn default_instance() -> &'static SaveResolved { + static instance: SaveResolved = SaveResolved { + json_result: ::std::string::String::new(), + special_fields: ::protobuf::SpecialFields::new(), + }; + &instance + } +} + +impl ::protobuf::MessageFull for SaveResolved { + fn descriptor() -> ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::Lazy::new(); + descriptor.get(|| file_descriptor().message_by_package_relative_name("SaveResolved").unwrap()).clone() + } +} + +impl ::std::fmt::Display for SaveResolved { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for SaveResolved { + type RuntimeType = ::protobuf::reflect::rt::RuntimeTypeMessage; +} + +// @@protoc_insertion_point(message:substantial.protos.events.SaveRetry) +#[derive(PartialEq,Clone,Default,Debug)] +pub struct SaveRetry { + // message fields + // @@protoc_insertion_point(field:substantial.protos.events.SaveRetry.wait_until) + pub wait_until: ::protobuf::MessageField<::protobuf::well_known_types::timestamp::Timestamp>, + // @@protoc_insertion_point(field:substantial.protos.events.SaveRetry.counter) + pub counter: i32, + // special fields + // @@protoc_insertion_point(special_field:substantial.protos.events.SaveRetry.special_fields) + pub special_fields: ::protobuf::SpecialFields, +} + +impl<'a> ::std::default::Default for &'a SaveRetry { + fn default() -> &'a SaveRetry { + ::default_instance() + } +} + +impl SaveRetry { + pub fn new() -> SaveRetry { + ::std::default::Default::default() + } + + fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { + let mut fields = ::std::vec::Vec::with_capacity(2); + let mut oneofs = ::std::vec::Vec::with_capacity(0); + fields.push(::protobuf::reflect::rt::v2::make_message_field_accessor::<_, ::protobuf::well_known_types::timestamp::Timestamp>( + "wait_until", + |m: &SaveRetry| { &m.wait_until }, + |m: &mut SaveRetry| { &mut m.wait_until }, + )); + fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( + "counter", + |m: &SaveRetry| { &m.counter }, + |m: &mut SaveRetry| { &mut m.counter }, + )); + ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::( + "SaveRetry", + fields, + oneofs, + ) + } +} + +impl ::protobuf::Message for SaveRetry { + const NAME: &'static str = "SaveRetry"; + + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::Result<()> { + while let Some(tag) = is.read_raw_tag_or_eof()? { + match tag { + 10 => { + ::protobuf::rt::read_singular_message_into_field(is, &mut self.wait_until)?; + }, + 16 => { + self.counter = is.read_int32()?; + }, + tag => { + ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u64 { + let mut my_size = 0; + if let Some(v) = self.wait_until.as_ref() { + let len = v.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint64_size(len) + len; + } + if self.counter != 0 { + my_size += ::protobuf::rt::int32_size(2, self.counter); + } + my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); + self.special_fields.cached_size().set(my_size as u32); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::Result<()> { + if let Some(v) = self.wait_until.as_ref() { + ::protobuf::rt::write_message_field_with_cached_size(1, v, os)?; + } + if self.counter != 0 { + os.write_int32(2, self.counter)?; + } + os.write_unknown_fields(self.special_fields.unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn special_fields(&self) -> &::protobuf::SpecialFields { + &self.special_fields + } + + fn mut_special_fields(&mut self) -> &mut ::protobuf::SpecialFields { + &mut self.special_fields + } + + fn new() -> SaveRetry { + SaveRetry::new() + } + + fn clear(&mut self) { + self.wait_until.clear(); + self.counter = 0; + self.special_fields.clear(); + } + + fn default_instance() -> &'static SaveRetry { + static instance: SaveRetry = SaveRetry { + wait_until: ::protobuf::MessageField::none(), + counter: 0, + special_fields: ::protobuf::SpecialFields::new(), + }; + &instance + } +} + +impl ::protobuf::MessageFull for SaveRetry { + fn descriptor() -> ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::Lazy::new(); + descriptor.get(|| file_descriptor().message_by_package_relative_name("SaveRetry").unwrap()).clone() + } +} + +impl ::std::fmt::Display for SaveRetry { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for SaveRetry { + type RuntimeType = ::protobuf::reflect::rt::RuntimeTypeMessage; +} + +// @@protoc_insertion_point(message:substantial.protos.events.SaveFailed) +#[derive(PartialEq,Clone,Default,Debug)] +pub struct SaveFailed { + // message fields + // @@protoc_insertion_point(field:substantial.protos.events.SaveFailed.err) + pub err: ::std::string::String, + // special fields + // @@protoc_insertion_point(special_field:substantial.protos.events.SaveFailed.special_fields) + pub special_fields: ::protobuf::SpecialFields, +} + +impl<'a> ::std::default::Default for &'a SaveFailed { + fn default() -> &'a SaveFailed { + ::default_instance() + } +} + +impl SaveFailed { + pub fn new() -> SaveFailed { + ::std::default::Default::default() + } + + fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { + let mut fields = ::std::vec::Vec::with_capacity(1); + let mut oneofs = ::std::vec::Vec::with_capacity(0); + fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( + "err", + |m: &SaveFailed| { &m.err }, + |m: &mut SaveFailed| { &mut m.err }, + )); + ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::( + "SaveFailed", + fields, + oneofs, + ) + } +} + +impl ::protobuf::Message for SaveFailed { + const NAME: &'static str = "SaveFailed"; + + fn is_initialized(&self) -> bool { + true + } + + fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::Result<()> { + while let Some(tag) = is.read_raw_tag_or_eof()? { + match tag { + 10 => { + self.err = is.read_string()?; + }, + tag => { + ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; + }, + }; + } + ::std::result::Result::Ok(()) + } + + // Compute sizes of nested messages + #[allow(unused_variables)] + fn compute_size(&self) -> u64 { + let mut my_size = 0; + if !self.err.is_empty() { + my_size += ::protobuf::rt::string_size(1, &self.err); + } + my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); + self.special_fields.cached_size().set(my_size as u32); + my_size + } + + fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::Result<()> { + if !self.err.is_empty() { + os.write_string(1, &self.err)?; + } + os.write_unknown_fields(self.special_fields.unknown_fields())?; + ::std::result::Result::Ok(()) + } + + fn special_fields(&self) -> &::protobuf::SpecialFields { + &self.special_fields + } + + fn mut_special_fields(&mut self) -> &mut ::protobuf::SpecialFields { + &mut self.special_fields + } + + fn new() -> SaveFailed { + SaveFailed::new() + } + + fn clear(&mut self) { + self.err.clear(); + self.special_fields.clear(); + } + + fn default_instance() -> &'static SaveFailed { + static instance: SaveFailed = SaveFailed { + err: ::std::string::String::new(), + special_fields: ::protobuf::SpecialFields::new(), + }; + &instance + } +} + +impl ::protobuf::MessageFull for SaveFailed { + fn descriptor() -> ::protobuf::reflect::MessageDescriptor { + static descriptor: ::protobuf::rt::Lazy<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::Lazy::new(); + descriptor.get(|| file_descriptor().message_by_package_relative_name("SaveFailed").unwrap()).clone() + } +} + +impl ::std::fmt::Display for SaveFailed { + fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { + ::protobuf::text_format::fmt(self, f) + } +} + +impl ::protobuf::reflect::ProtobufValue for SaveFailed { + type RuntimeType = ::protobuf::reflect::rt::RuntimeTypeMessage; +} + // @@protoc_insertion_point(message:substantial.protos.events.Save) #[derive(PartialEq,Clone,Default,Debug)] pub struct Save { // message fields // @@protoc_insertion_point(field:substantial.protos.events.Save.id) pub id: u32, - // @@protoc_insertion_point(field:substantial.protos.events.Save.value) - pub value: ::std::string::String, - // @@protoc_insertion_point(field:substantial.protos.events.Save.counter) - pub counter: i32, + // message oneof groups + pub of: ::std::option::Option, // special fields // @@protoc_insertion_point(special_field:substantial.protos.events.Save.special_fields) pub special_fields: ::protobuf::SpecialFields, @@ -173,24 +556,183 @@ impl Save { ::std::default::Default::default() } + // .substantial.protos.events.SaveResolved resolved = 10; + + pub fn resolved(&self) -> &SaveResolved { + match self.of { + ::std::option::Option::Some(save::Of::Resolved(ref v)) => v, + _ => ::default_instance(), + } + } + + pub fn clear_resolved(&mut self) { + self.of = ::std::option::Option::None; + } + + pub fn has_resolved(&self) -> bool { + match self.of { + ::std::option::Option::Some(save::Of::Resolved(..)) => true, + _ => false, + } + } + + // Param is passed by value, moved + pub fn set_resolved(&mut self, v: SaveResolved) { + self.of = ::std::option::Option::Some(save::Of::Resolved(v)) + } + + // Mutable pointer to the field. + pub fn mut_resolved(&mut self) -> &mut SaveResolved { + if let ::std::option::Option::Some(save::Of::Resolved(_)) = self.of { + } else { + self.of = ::std::option::Option::Some(save::Of::Resolved(SaveResolved::new())); + } + match self.of { + ::std::option::Option::Some(save::Of::Resolved(ref mut v)) => v, + _ => panic!(), + } + } + + // Take field + pub fn take_resolved(&mut self) -> SaveResolved { + if self.has_resolved() { + match self.of.take() { + ::std::option::Option::Some(save::Of::Resolved(v)) => v, + _ => panic!(), + } + } else { + SaveResolved::new() + } + } + + // .substantial.protos.events.SaveRetry retry = 11; + + pub fn retry(&self) -> &SaveRetry { + match self.of { + ::std::option::Option::Some(save::Of::Retry(ref v)) => v, + _ => ::default_instance(), + } + } + + pub fn clear_retry(&mut self) { + self.of = ::std::option::Option::None; + } + + pub fn has_retry(&self) -> bool { + match self.of { + ::std::option::Option::Some(save::Of::Retry(..)) => true, + _ => false, + } + } + + // Param is passed by value, moved + pub fn set_retry(&mut self, v: SaveRetry) { + self.of = ::std::option::Option::Some(save::Of::Retry(v)) + } + + // Mutable pointer to the field. + pub fn mut_retry(&mut self) -> &mut SaveRetry { + if let ::std::option::Option::Some(save::Of::Retry(_)) = self.of { + } else { + self.of = ::std::option::Option::Some(save::Of::Retry(SaveRetry::new())); + } + match self.of { + ::std::option::Option::Some(save::Of::Retry(ref mut v)) => v, + _ => panic!(), + } + } + + // Take field + pub fn take_retry(&mut self) -> SaveRetry { + if self.has_retry() { + match self.of.take() { + ::std::option::Option::Some(save::Of::Retry(v)) => v, + _ => panic!(), + } + } else { + SaveRetry::new() + } + } + + // .substantial.protos.events.SaveFailed failed = 12; + + pub fn failed(&self) -> &SaveFailed { + match self.of { + ::std::option::Option::Some(save::Of::Failed(ref v)) => v, + _ => ::default_instance(), + } + } + + pub fn clear_failed(&mut self) { + self.of = ::std::option::Option::None; + } + + pub fn has_failed(&self) -> bool { + match self.of { + ::std::option::Option::Some(save::Of::Failed(..)) => true, + _ => false, + } + } + + // Param is passed by value, moved + pub fn set_failed(&mut self, v: SaveFailed) { + self.of = ::std::option::Option::Some(save::Of::Failed(v)) + } + + // Mutable pointer to the field. + pub fn mut_failed(&mut self) -> &mut SaveFailed { + if let ::std::option::Option::Some(save::Of::Failed(_)) = self.of { + } else { + self.of = ::std::option::Option::Some(save::Of::Failed(SaveFailed::new())); + } + match self.of { + ::std::option::Option::Some(save::Of::Failed(ref mut v)) => v, + _ => panic!(), + } + } + + // Take field + pub fn take_failed(&mut self) -> SaveFailed { + if self.has_failed() { + match self.of.take() { + ::std::option::Option::Some(save::Of::Failed(v)) => v, + _ => panic!(), + } + } else { + SaveFailed::new() + } + } + fn generated_message_descriptor_data() -> ::protobuf::reflect::GeneratedMessageDescriptorData { - let mut fields = ::std::vec::Vec::with_capacity(3); - let mut oneofs = ::std::vec::Vec::with_capacity(0); + let mut fields = ::std::vec::Vec::with_capacity(4); + let mut oneofs = ::std::vec::Vec::with_capacity(1); fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( "id", |m: &Save| { &m.id }, |m: &mut Save| { &mut m.id }, )); - fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( - "value", - |m: &Save| { &m.value }, - |m: &mut Save| { &mut m.value }, + fields.push(::protobuf::reflect::rt::v2::make_oneof_message_has_get_mut_set_accessor::<_, SaveResolved>( + "resolved", + Save::has_resolved, + Save::resolved, + Save::mut_resolved, + Save::set_resolved, )); - fields.push(::protobuf::reflect::rt::v2::make_simpler_field_accessor::<_, _>( - "counter", - |m: &Save| { &m.counter }, - |m: &mut Save| { &mut m.counter }, + fields.push(::protobuf::reflect::rt::v2::make_oneof_message_has_get_mut_set_accessor::<_, SaveRetry>( + "retry", + Save::has_retry, + Save::retry, + Save::mut_retry, + Save::set_retry, )); + fields.push(::protobuf::reflect::rt::v2::make_oneof_message_has_get_mut_set_accessor::<_, SaveFailed>( + "failed", + Save::has_failed, + Save::failed, + Save::mut_failed, + Save::set_failed, + )); + oneofs.push(save::Of::generated_oneof_descriptor_data()); ::protobuf::reflect::GeneratedMessageDescriptorData::new_2::( "Save", fields, @@ -212,11 +754,14 @@ impl ::protobuf::Message for Save { 8 => { self.id = is.read_uint32()?; }, - 18 => { - self.value = is.read_string()?; + 82 => { + self.of = ::std::option::Option::Some(save::Of::Resolved(is.read_message()?)); }, - 24 => { - self.counter = is.read_int32()?; + 90 => { + self.of = ::std::option::Option::Some(save::Of::Retry(is.read_message()?)); + }, + 98 => { + self.of = ::std::option::Option::Some(save::Of::Failed(is.read_message()?)); }, tag => { ::protobuf::rt::read_unknown_or_skip_group(tag, is, self.special_fields.mut_unknown_fields())?; @@ -233,11 +778,21 @@ impl ::protobuf::Message for Save { if self.id != 0 { my_size += ::protobuf::rt::uint32_size(1, self.id); } - if !self.value.is_empty() { - my_size += ::protobuf::rt::string_size(2, &self.value); - } - if self.counter != 0 { - my_size += ::protobuf::rt::int32_size(3, self.counter); + if let ::std::option::Option::Some(ref v) = self.of { + match v { + &save::Of::Resolved(ref v) => { + let len = v.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint64_size(len) + len; + }, + &save::Of::Retry(ref v) => { + let len = v.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint64_size(len) + len; + }, + &save::Of::Failed(ref v) => { + let len = v.compute_size(); + my_size += 1 + ::protobuf::rt::compute_raw_varint64_size(len) + len; + }, + }; } my_size += ::protobuf::rt::unknown_fields_size(self.special_fields.unknown_fields()); self.special_fields.cached_size().set(my_size as u32); @@ -248,11 +803,18 @@ impl ::protobuf::Message for Save { if self.id != 0 { os.write_uint32(1, self.id)?; } - if !self.value.is_empty() { - os.write_string(2, &self.value)?; - } - if self.counter != 0 { - os.write_int32(3, self.counter)?; + if let ::std::option::Option::Some(ref v) = self.of { + match v { + &save::Of::Resolved(ref v) => { + ::protobuf::rt::write_message_field_with_cached_size(10, v, os)?; + }, + &save::Of::Retry(ref v) => { + ::protobuf::rt::write_message_field_with_cached_size(11, v, os)?; + }, + &save::Of::Failed(ref v) => { + ::protobuf::rt::write_message_field_with_cached_size(12, v, os)?; + }, + }; } os.write_unknown_fields(self.special_fields.unknown_fields())?; ::std::result::Result::Ok(()) @@ -272,16 +834,16 @@ impl ::protobuf::Message for Save { fn clear(&mut self) { self.id = 0; - self.value.clear(); - self.counter = 0; + self.of = ::std::option::Option::None; + self.of = ::std::option::Option::None; + self.of = ::std::option::Option::None; self.special_fields.clear(); } fn default_instance() -> &'static Save { static instance: Save = Save { id: 0, - value: ::std::string::String::new(), - counter: 0, + of: ::std::option::Option::None, special_fields: ::protobuf::SpecialFields::new(), }; &instance @@ -305,6 +867,38 @@ impl ::protobuf::reflect::ProtobufValue for Save { type RuntimeType = ::protobuf::reflect::rt::RuntimeTypeMessage; } +/// Nested message and enums of message `Save` +pub mod save { + + #[derive(Clone,PartialEq,Debug)] + #[non_exhaustive] + // @@protoc_insertion_point(oneof:substantial.protos.events.Save.of) + pub enum Of { + // @@protoc_insertion_point(oneof_field:substantial.protos.events.Save.resolved) + Resolved(super::SaveResolved), + // @@protoc_insertion_point(oneof_field:substantial.protos.events.Save.retry) + Retry(super::SaveRetry), + // @@protoc_insertion_point(oneof_field:substantial.protos.events.Save.failed) + Failed(super::SaveFailed), + } + + impl ::protobuf::Oneof for Of { + } + + impl ::protobuf::OneofFull for Of { + fn descriptor() -> ::protobuf::reflect::OneofDescriptor { + static descriptor: ::protobuf::rt::Lazy<::protobuf::reflect::OneofDescriptor> = ::protobuf::rt::Lazy::new(); + descriptor.get(|| ::descriptor().oneof_by_name("of").unwrap()).clone() + } + } + + impl Of { + pub(in super) fn generated_oneof_descriptor_data() -> ::protobuf::reflect::GeneratedOneofDescriptorData { + ::protobuf::reflect::GeneratedOneofDescriptorData::new::("of") + } + } +} + // @@protoc_insertion_point(message:substantial.protos.events.Sleep) #[derive(PartialEq,Clone,Default,Debug)] pub struct Sleep { @@ -1531,89 +2125,113 @@ static file_descriptor_proto_data: &'static [u8] = b"\ \n\x15protocol/events.proto\x12\x19substantial.protos.events\x1a\x1cgoog\ le/protobuf/struct.proto\x1a\x1fgoogle/protobuf/timestamp.proto\"8\n\x05\ Start\x12/\n\x06kwargs\x18\x01\x20\x01(\x0b2\x17.google.protobuf.StructR\ - \x06kwargs\"F\n\x04Save\x12\x0e\n\x02id\x18\x01\x20\x01(\rR\x02id\x12\ - \x14\n\x05value\x18\x02\x20\x01(\tR\x05value\x12\x18\n\x07counter\x18\ - \x03\x20\x01(\x05R\x07counter\"w\n\x05Sleep\x12\x0e\n\x02id\x18\x01\x20\ - \x01(\rR\x02id\x120\n\x05start\x18\x02\x20\x01(\x0b2\x1a.google.protobuf\ - .TimestampR\x05start\x12,\n\x03end\x18\x03\x20\x01(\x0b2\x1a.google.prot\ - obuf.TimestampR\x03end\"0\n\x04Send\x12\x12\n\x04name\x18\x01\x20\x01(\t\ - R\x04name\x12\x14\n\x05value\x18\x02\x20\x01(\tR\x05value\"6\n\x04Stop\ - \x12\x10\n\x02ok\x18\x01\x20\x01(\tH\0R\x02ok\x12\x12\n\x03err\x18\x02\ - \x20\x01(\tH\0R\x03errB\x08\n\x06result\"\xd2\x02\n\x05Event\x12*\n\x02a\ - t\x18\x01\x20\x01(\x0b2\x1a.google.protobuf.TimestampR\x02at\x128\n\x05s\ - tart\x18\n\x20\x01(\x0b2\x20.substantial.protos.events.StartH\0R\x05star\ - t\x125\n\x04save\x18\x0b\x20\x01(\x0b2\x1f.substantial.protos.events.Sav\ - eH\0R\x04save\x128\n\x05sleep\x18\x0c\x20\x01(\x0b2\x20.substantial.prot\ - os.events.SleepH\0R\x05sleep\x125\n\x04send\x18\r\x20\x01(\x0b2\x1f.subs\ - tantial.protos.events.SendH\0R\x04send\x125\n\x04stop\x18\x0e\x20\x01(\ - \x0b2\x1f.substantial.protos.events.StopH\0R\x04stopB\x04\n\x02of\"Z\n\ - \x07Records\x12\x15\n\x06run_id\x18\x01\x20\x01(\tR\x05runId\x128\n\x06e\ - vents\x18\x02\x20\x03(\x0b2\x20.substantial.protos.events.EventR\x06even\ - tsJ\xb5\n\n\x06\x12\x04\0\01\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\x08\n\ + \x06kwargs\"/\n\x0cSaveResolved\x12\x1f\n\x0bjson_result\x18\x01\x20\x01\ + (\tR\njsonResult\"`\n\tSaveRetry\x129\n\nwait_until\x18\x01\x20\x01(\x0b\ + 2\x1a.google.protobuf.TimestampR\twaitUntil\x12\x18\n\x07counter\x18\x02\ + \x20\x01(\x05R\x07counter\"\x1e\n\nSaveFailed\x12\x10\n\x03err\x18\x01\ + \x20\x01(\tR\x03err\"\xe2\x01\n\x04Save\x12\x0e\n\x02id\x18\x01\x20\x01(\ + \rR\x02id\x12E\n\x08resolved\x18\n\x20\x01(\x0b2'.substantial.protos.eve\ + nts.SaveResolvedH\0R\x08resolved\x12<\n\x05retry\x18\x0b\x20\x01(\x0b2$.\ + substantial.protos.events.SaveRetryH\0R\x05retry\x12?\n\x06failed\x18\ + \x0c\x20\x01(\x0b2%.substantial.protos.events.SaveFailedH\0R\x06failedB\ + \x04\n\x02of\"w\n\x05Sleep\x12\x0e\n\x02id\x18\x01\x20\x01(\rR\x02id\x12\ + 0\n\x05start\x18\x02\x20\x01(\x0b2\x1a.google.protobuf.TimestampR\x05sta\ + rt\x12,\n\x03end\x18\x03\x20\x01(\x0b2\x1a.google.protobuf.TimestampR\ + \x03end\"0\n\x04Send\x12\x12\n\x04name\x18\x01\x20\x01(\tR\x04name\x12\ + \x14\n\x05value\x18\x02\x20\x01(\tR\x05value\"6\n\x04Stop\x12\x10\n\x02o\ + k\x18\x01\x20\x01(\tH\0R\x02ok\x12\x12\n\x03err\x18\x02\x20\x01(\tH\0R\ + \x03errB\x08\n\x06result\"\xd2\x02\n\x05Event\x12*\n\x02at\x18\x01\x20\ + \x01(\x0b2\x1a.google.protobuf.TimestampR\x02at\x128\n\x05start\x18\n\ + \x20\x01(\x0b2\x20.substantial.protos.events.StartH\0R\x05start\x125\n\ + \x04save\x18\x0b\x20\x01(\x0b2\x1f.substantial.protos.events.SaveH\0R\ + \x04save\x128\n\x05sleep\x18\x0c\x20\x01(\x0b2\x20.substantial.protos.ev\ + ents.SleepH\0R\x05sleep\x125\n\x04send\x18\r\x20\x01(\x0b2\x1f.substanti\ + al.protos.events.SendH\0R\x04send\x125\n\x04stop\x18\x0e\x20\x01(\x0b2\ + \x1f.substantial.protos.events.StopH\0R\x04stopB\x04\n\x02of\"Z\n\x07Rec\ + ords\x12\x15\n\x06run_id\x18\x01\x20\x01(\tR\x05runId\x128\n\x06events\ + \x18\x02\x20\x03(\x0b2\x20.substantial.protos.events.EventR\x06eventsJ\ + \xac\r\n\x06\x12\x04\0\0A\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\x08\n\ \x01\x02\x12\x03\x02\0\"\n\t\n\x02\x03\0\x12\x03\x04\0&\n\t\n\x02\x03\ \x01\x12\x03\x05\0)\n\n\n\x02\x04\0\x12\x04\x07\0\t\x01\n\n\n\x03\x04\0\ \x01\x12\x03\x07\x08\r\n\x0b\n\x04\x04\0\x02\0\x12\x03\x08\x02$\n\x0c\n\ \x05\x04\0\x02\0\x06\x12\x03\x08\x02\x18\n\x0c\n\x05\x04\0\x02\0\x01\x12\ \x03\x08\x19\x1f\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x08\"#\n\n\n\x02\ - \x04\x01\x12\x04\x0b\0\x0f\x01\n\n\n\x03\x04\x01\x01\x12\x03\x0b\x08\x0c\ - \n\x0b\n\x04\x04\x01\x02\0\x12\x03\x0c\x02\x10\n\x0c\n\x05\x04\x01\x02\0\ - \x05\x12\x03\x0c\x02\x08\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x0c\t\x0b\ - \n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x0c\x0e\x0f\n\x0b\n\x04\x04\x01\ - \x02\x01\x12\x03\r\x02\x13\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\r\x02\ - \x08\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\r\t\x0e\n\x0c\n\x05\x04\x01\ - \x02\x01\x03\x12\x03\r\x11\x12\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x0e\ - \x02\x14\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x0e\x02\x07\n\x0c\n\x05\ - \x04\x01\x02\x02\x01\x12\x03\x0e\x08\x0f\n\x0c\n\x05\x04\x01\x02\x02\x03\ - \x12\x03\x0e\x12\x13\n\n\n\x02\x04\x02\x12\x04\x11\0\x15\x01\n\n\n\x03\ - \x04\x02\x01\x12\x03\x11\x08\r\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x12\x02\ - \x10\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x12\x02\x08\n\x0c\n\x05\x04\ - \x02\x02\0\x01\x12\x03\x12\t\x0b\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\ - \x12\x0e\x0f\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x13\x02&\n\x0c\n\x05\ - \x04\x02\x02\x01\x06\x12\x03\x13\x02\x1b\n\x0c\n\x05\x04\x02\x02\x01\x01\ - \x12\x03\x13\x1c!\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\x13$%\n\x0b\n\ - \x04\x04\x02\x02\x02\x12\x03\x14\x02$\n\x0c\n\x05\x04\x02\x02\x02\x06\ - \x12\x03\x14\x02\x1b\n\x0c\n\x05\x04\x02\x02\x02\x01\x12\x03\x14\x1c\x1f\ - \n\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x14\"#\n\n\n\x02\x04\x03\x12\ - \x04\x17\0\x1a\x01\n\n\n\x03\x04\x03\x01\x12\x03\x17\x08\x0c\n\x0b\n\x04\ - \x04\x03\x02\0\x12\x03\x18\x02\x12\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\ - \x18\x02\x08\n\x0c\n\x05\x04\x03\x02\0\x01\x12\x03\x18\t\r\n\x0c\n\x05\ - \x04\x03\x02\0\x03\x12\x03\x18\x10\x11\n\x0b\n\x04\x04\x03\x02\x01\x12\ - \x03\x19\x02\x13\n\x0c\n\x05\x04\x03\x02\x01\x05\x12\x03\x19\x02\x08\n\ - \x0c\n\x05\x04\x03\x02\x01\x01\x12\x03\x19\t\x0e\n\x0c\n\x05\x04\x03\x02\ - \x01\x03\x12\x03\x19\x11\x12\n\n\n\x02\x04\x04\x12\x04\x1c\0!\x01\n\n\n\ - \x03\x04\x04\x01\x12\x03\x1c\x08\x0c\n\x0c\n\x04\x04\x04\x08\0\x12\x04\ - \x1d\x02\x20\x03\n\x0c\n\x05\x04\x04\x08\0\x01\x12\x03\x1d\x08\x0e\n\x0b\ - \n\x04\x04\x04\x02\0\x12\x03\x1e\x04\x12\n\x0c\n\x05\x04\x04\x02\0\x05\ - \x12\x03\x1e\x04\n\n\x0c\n\x05\x04\x04\x02\0\x01\x12\x03\x1e\x0b\r\n\x0c\ - \n\x05\x04\x04\x02\0\x03\x12\x03\x1e\x10\x11\n\x0b\n\x04\x04\x04\x02\x01\ - \x12\x03\x1f\x04\x13\n\x0c\n\x05\x04\x04\x02\x01\x05\x12\x03\x1f\x04\n\n\ - \x0c\n\x05\x04\x04\x02\x01\x01\x12\x03\x1f\x0b\x0e\n\x0c\n\x05\x04\x04\ - \x02\x01\x03\x12\x03\x1f\x11\x12\n\n\n\x02\x04\x05\x12\x04#\0,\x01\n\n\n\ - \x03\x04\x05\x01\x12\x03#\x08\r\n\x0b\n\x04\x04\x05\x02\0\x12\x03$\x02#\ - \n\x0c\n\x05\x04\x05\x02\0\x06\x12\x03$\x02\x1b\n\x0c\n\x05\x04\x05\x02\ - \0\x01\x12\x03$\x1c\x1e\n\x0c\n\x05\x04\x05\x02\0\x03\x12\x03$!\"\n\x0c\ - \n\x04\x04\x05\x08\0\x12\x04%\x02+\x03\n\x0c\n\x05\x04\x05\x08\0\x01\x12\ - \x03%\x08\n\n\x0b\n\x04\x04\x05\x02\x01\x12\x03&\x04\x15\n\x0c\n\x05\x04\ - \x05\x02\x01\x06\x12\x03&\x04\t\n\x0c\n\x05\x04\x05\x02\x01\x01\x12\x03&\ - \n\x0f\n\x0c\n\x05\x04\x05\x02\x01\x03\x12\x03&\x12\x14\n\x0b\n\x04\x04\ - \x05\x02\x02\x12\x03'\x04\x13\n\x0c\n\x05\x04\x05\x02\x02\x06\x12\x03'\ - \x04\x08\n\x0c\n\x05\x04\x05\x02\x02\x01\x12\x03'\t\r\n\x0c\n\x05\x04\ - \x05\x02\x02\x03\x12\x03'\x10\x12\n\x0b\n\x04\x04\x05\x02\x03\x12\x03(\ - \x04\x15\n\x0c\n\x05\x04\x05\x02\x03\x06\x12\x03(\x04\t\n\x0c\n\x05\x04\ - \x05\x02\x03\x01\x12\x03(\n\x0f\n\x0c\n\x05\x04\x05\x02\x03\x03\x12\x03(\ - \x12\x14\n\x0b\n\x04\x04\x05\x02\x04\x12\x03)\x04\x13\n\x0c\n\x05\x04\ - \x05\x02\x04\x06\x12\x03)\x04\x08\n\x0c\n\x05\x04\x05\x02\x04\x01\x12\ - \x03)\t\r\n\x0c\n\x05\x04\x05\x02\x04\x03\x12\x03)\x10\x12\n\x0b\n\x04\ - \x04\x05\x02\x05\x12\x03*\x04\x13\n\x0c\n\x05\x04\x05\x02\x05\x06\x12\ - \x03*\x04\x08\n\x0c\n\x05\x04\x05\x02\x05\x01\x12\x03*\t\r\n\x0c\n\x05\ - \x04\x05\x02\x05\x03\x12\x03*\x10\x12\n\n\n\x02\x04\x06\x12\x04.\01\x01\ - \n\n\n\x03\x04\x06\x01\x12\x03.\x08\x0f\n\x0b\n\x04\x04\x06\x02\0\x12\ - \x03/\x02\x14\n\x0c\n\x05\x04\x06\x02\0\x05\x12\x03/\x02\x08\n\x0c\n\x05\ - \x04\x06\x02\0\x01\x12\x03/\t\x0f\n\x0c\n\x05\x04\x06\x02\0\x03\x12\x03/\ - \x12\x13\n\x0b\n\x04\x04\x06\x02\x01\x12\x030\x02\x1c\n\x0c\n\x05\x04\ - \x06\x02\x01\x04\x12\x030\x02\n\n\x0c\n\x05\x04\x06\x02\x01\x06\x12\x030\ - \x0b\x10\n\x0c\n\x05\x04\x06\x02\x01\x01\x12\x030\x11\x17\n\x0c\n\x05\ - \x04\x06\x02\x01\x03\x12\x030\x1a\x1bb\x06proto3\ + \x04\x01\x12\x04\x0b\0\r\x01\n\n\n\x03\x04\x01\x01\x12\x03\x0b\x08\x14\n\ + \x0b\n\x04\x04\x01\x02\0\x12\x03\x0c\x02\x19\n\x0c\n\x05\x04\x01\x02\0\ + \x05\x12\x03\x0c\x02\x08\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x0c\t\x14\ + \n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x0c\x17\x18\n\n\n\x02\x04\x02\x12\ + \x04\x0f\0\x12\x01\n\n\n\x03\x04\x02\x01\x12\x03\x0f\x08\x11\n\x0b\n\x04\ + \x04\x02\x02\0\x12\x03\x10\x02+\n\x0c\n\x05\x04\x02\x02\0\x06\x12\x03\ + \x10\x02\x1b\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x10\x1c&\n\x0c\n\x05\ + \x04\x02\x02\0\x03\x12\x03\x10)*\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x11\ + \x02\x14\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\x11\x02\x07\n\x0c\n\x05\ + \x04\x02\x02\x01\x01\x12\x03\x11\x08\x0f\n\x0c\n\x05\x04\x02\x02\x01\x03\ + \x12\x03\x11\x12\x13\n\n\n\x02\x04\x03\x12\x04\x14\0\x16\x01\n\n\n\x03\ + \x04\x03\x01\x12\x03\x14\x08\x12\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x15\ + \x02\x11\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\x15\x02\x08\n\x0c\n\x05\ + \x04\x03\x02\0\x01\x12\x03\x15\t\x0c\n\x0c\n\x05\x04\x03\x02\0\x03\x12\ + \x03\x15\x0f\x10\n\n\n\x02\x04\x04\x12\x04\x18\0\x1f\x01\n\n\n\x03\x04\ + \x04\x01\x12\x03\x18\x08\x0c\n\x0b\n\x04\x04\x04\x02\0\x12\x03\x19\x02\ + \x10\n\x0c\n\x05\x04\x04\x02\0\x05\x12\x03\x19\x02\x08\n\x0c\n\x05\x04\ + \x04\x02\0\x01\x12\x03\x19\t\x0b\n\x0c\n\x05\x04\x04\x02\0\x03\x12\x03\ + \x19\x0e\x0f\n\x0c\n\x04\x04\x04\x08\0\x12\x04\x1a\x02\x1e\x03\n\x0c\n\ + \x05\x04\x04\x08\0\x01\x12\x03\x1a\x08\n\n\x0b\n\x04\x04\x04\x02\x01\x12\ + \x03\x1b\x04\x1f\n\x0c\n\x05\x04\x04\x02\x01\x06\x12\x03\x1b\x04\x10\n\ + \x0c\n\x05\x04\x04\x02\x01\x01\x12\x03\x1b\x11\x19\n\x0c\n\x05\x04\x04\ + \x02\x01\x03\x12\x03\x1b\x1c\x1e\n\x0b\n\x04\x04\x04\x02\x02\x12\x03\x1c\ + \x04\x19\n\x0c\n\x05\x04\x04\x02\x02\x06\x12\x03\x1c\x04\r\n\x0c\n\x05\ + \x04\x04\x02\x02\x01\x12\x03\x1c\x0e\x13\n\x0c\n\x05\x04\x04\x02\x02\x03\ + \x12\x03\x1c\x16\x18\n\x0b\n\x04\x04\x04\x02\x03\x12\x03\x1d\x04\x1b\n\ + \x0c\n\x05\x04\x04\x02\x03\x06\x12\x03\x1d\x04\x0e\n\x0c\n\x05\x04\x04\ + \x02\x03\x01\x12\x03\x1d\x0f\x15\n\x0c\n\x05\x04\x04\x02\x03\x03\x12\x03\ + \x1d\x18\x1a\n\n\n\x02\x04\x05\x12\x04!\0%\x01\n\n\n\x03\x04\x05\x01\x12\ + \x03!\x08\r\n\x0b\n\x04\x04\x05\x02\0\x12\x03\"\x02\x10\n\x0c\n\x05\x04\ + \x05\x02\0\x05\x12\x03\"\x02\x08\n\x0c\n\x05\x04\x05\x02\0\x01\x12\x03\"\ + \t\x0b\n\x0c\n\x05\x04\x05\x02\0\x03\x12\x03\"\x0e\x0f\n\x0b\n\x04\x04\ + \x05\x02\x01\x12\x03#\x02&\n\x0c\n\x05\x04\x05\x02\x01\x06\x12\x03#\x02\ + \x1b\n\x0c\n\x05\x04\x05\x02\x01\x01\x12\x03#\x1c!\n\x0c\n\x05\x04\x05\ + \x02\x01\x03\x12\x03#$%\n\x0b\n\x04\x04\x05\x02\x02\x12\x03$\x02$\n\x0c\ + \n\x05\x04\x05\x02\x02\x06\x12\x03$\x02\x1b\n\x0c\n\x05\x04\x05\x02\x02\ + \x01\x12\x03$\x1c\x1f\n\x0c\n\x05\x04\x05\x02\x02\x03\x12\x03$\"#\n\n\n\ + \x02\x04\x06\x12\x04'\0*\x01\n\n\n\x03\x04\x06\x01\x12\x03'\x08\x0c\n\ + \x0b\n\x04\x04\x06\x02\0\x12\x03(\x02\x12\n\x0c\n\x05\x04\x06\x02\0\x05\ + \x12\x03(\x02\x08\n\x0c\n\x05\x04\x06\x02\0\x01\x12\x03(\t\r\n\x0c\n\x05\ + \x04\x06\x02\0\x03\x12\x03(\x10\x11\n\x0b\n\x04\x04\x06\x02\x01\x12\x03)\ + \x02\x13\n\x0c\n\x05\x04\x06\x02\x01\x05\x12\x03)\x02\x08\n\x0c\n\x05\ + \x04\x06\x02\x01\x01\x12\x03)\t\x0e\n\x0c\n\x05\x04\x06\x02\x01\x03\x12\ + \x03)\x11\x12\n\n\n\x02\x04\x07\x12\x04,\01\x01\n\n\n\x03\x04\x07\x01\ + \x12\x03,\x08\x0c\n\x0c\n\x04\x04\x07\x08\0\x12\x04-\x020\x03\n\x0c\n\ + \x05\x04\x07\x08\0\x01\x12\x03-\x08\x0e\n\x0b\n\x04\x04\x07\x02\0\x12\ + \x03.\x04\x12\n\x0c\n\x05\x04\x07\x02\0\x05\x12\x03.\x04\n\n\x0c\n\x05\ + \x04\x07\x02\0\x01\x12\x03.\x0b\r\n\x0c\n\x05\x04\x07\x02\0\x03\x12\x03.\ + \x10\x11\n\x0b\n\x04\x04\x07\x02\x01\x12\x03/\x04\x13\n\x0c\n\x05\x04\ + \x07\x02\x01\x05\x12\x03/\x04\n\n\x0c\n\x05\x04\x07\x02\x01\x01\x12\x03/\ + \x0b\x0e\n\x0c\n\x05\x04\x07\x02\x01\x03\x12\x03/\x11\x12\n\n\n\x02\x04\ + \x08\x12\x043\0<\x01\n\n\n\x03\x04\x08\x01\x12\x033\x08\r\n\x0b\n\x04\ + \x04\x08\x02\0\x12\x034\x02#\n\x0c\n\x05\x04\x08\x02\0\x06\x12\x034\x02\ + \x1b\n\x0c\n\x05\x04\x08\x02\0\x01\x12\x034\x1c\x1e\n\x0c\n\x05\x04\x08\ + \x02\0\x03\x12\x034!\"\n\x0c\n\x04\x04\x08\x08\0\x12\x045\x02;\x03\n\x0c\ + \n\x05\x04\x08\x08\0\x01\x12\x035\x08\n\n\x0b\n\x04\x04\x08\x02\x01\x12\ + \x036\x04\x15\n\x0c\n\x05\x04\x08\x02\x01\x06\x12\x036\x04\t\n\x0c\n\x05\ + \x04\x08\x02\x01\x01\x12\x036\n\x0f\n\x0c\n\x05\x04\x08\x02\x01\x03\x12\ + \x036\x12\x14\n\x0b\n\x04\x04\x08\x02\x02\x12\x037\x04\x13\n\x0c\n\x05\ + \x04\x08\x02\x02\x06\x12\x037\x04\x08\n\x0c\n\x05\x04\x08\x02\x02\x01\ + \x12\x037\t\r\n\x0c\n\x05\x04\x08\x02\x02\x03\x12\x037\x10\x12\n\x0b\n\ + \x04\x04\x08\x02\x03\x12\x038\x04\x15\n\x0c\n\x05\x04\x08\x02\x03\x06\ + \x12\x038\x04\t\n\x0c\n\x05\x04\x08\x02\x03\x01\x12\x038\n\x0f\n\x0c\n\ + \x05\x04\x08\x02\x03\x03\x12\x038\x12\x14\n\x0b\n\x04\x04\x08\x02\x04\ + \x12\x039\x04\x13\n\x0c\n\x05\x04\x08\x02\x04\x06\x12\x039\x04\x08\n\x0c\ + \n\x05\x04\x08\x02\x04\x01\x12\x039\t\r\n\x0c\n\x05\x04\x08\x02\x04\x03\ + \x12\x039\x10\x12\n\x0b\n\x04\x04\x08\x02\x05\x12\x03:\x04\x13\n\x0c\n\ + \x05\x04\x08\x02\x05\x06\x12\x03:\x04\x08\n\x0c\n\x05\x04\x08\x02\x05\ + \x01\x12\x03:\t\r\n\x0c\n\x05\x04\x08\x02\x05\x03\x12\x03:\x10\x12\n\n\n\ + \x02\x04\t\x12\x04>\0A\x01\n\n\n\x03\x04\t\x01\x12\x03>\x08\x0f\n\x0b\n\ + \x04\x04\t\x02\0\x12\x03?\x02\x14\n\x0c\n\x05\x04\t\x02\0\x05\x12\x03?\ + \x02\x08\n\x0c\n\x05\x04\t\x02\0\x01\x12\x03?\t\x0f\n\x0c\n\x05\x04\t\ + \x02\0\x03\x12\x03?\x12\x13\n\x0b\n\x04\x04\t\x02\x01\x12\x03@\x02\x1c\n\ + \x0c\n\x05\x04\t\x02\x01\x04\x12\x03@\x02\n\n\x0c\n\x05\x04\t\x02\x01\ + \x06\x12\x03@\x0b\x10\n\x0c\n\x05\x04\t\x02\x01\x01\x12\x03@\x11\x17\n\ + \x0c\n\x05\x04\t\x02\x01\x03\x12\x03@\x1a\x1bb\x06proto3\ "; /// `FileDescriptorProto` object which was a source for this generated file @@ -1633,8 +2251,11 @@ pub fn file_descriptor() -> &'static ::protobuf::reflect::FileDescriptor { let mut deps = ::std::vec::Vec::with_capacity(2); deps.push(::protobuf::well_known_types::struct_::file_descriptor().clone()); deps.push(::protobuf::well_known_types::timestamp::file_descriptor().clone()); - let mut messages = ::std::vec::Vec::with_capacity(7); + let mut messages = ::std::vec::Vec::with_capacity(10); messages.push(Start::generated_message_descriptor_data()); + messages.push(SaveResolved::generated_message_descriptor_data()); + messages.push(SaveRetry::generated_message_descriptor_data()); + messages.push(SaveFailed::generated_message_descriptor_data()); messages.push(Save::generated_message_descriptor_data()); messages.push(Sleep::generated_message_descriptor_data()); messages.push(Send::generated_message_descriptor_data()); diff --git a/src/typegate/engine/runtime.d.ts b/src/typegate/engine/runtime.d.ts index 09b3547b57..a0c1779482 100644 --- a/src/typegate/engine/runtime.d.ts +++ b/src/typegate/engine/runtime.d.ts @@ -301,7 +301,14 @@ export type Backend = export type OperationEvent = | { type: "Sleep"; id: number; start: string; end: string } - | { type: "Save"; id: number; value: unknown } + | { + type: "Save"; + id: number; + value: + | { type: "Retry"; wait_until: string; counter: number } + | { type: "Resolved"; payload: unknown } + | { type: "Failed"; err: unknown }; + } | { type: "Send"; event_name: string; value: unknown } | { type: "Stop"; result: unknown } | { type: "Start"; kwargs: Record } diff --git a/src/typegate/src/runtimes/substantial/agent.ts b/src/typegate/src/runtimes/substantial/agent.ts index 86fa42e5dd..b72b9530a7 100644 --- a/src/typegate/src/runtimes/substantial/agent.ts +++ b/src/typegate/src/runtimes/substantial/agent.ts @@ -45,7 +45,19 @@ export class Agent { ) {} async schedule(input: AddScheduleInput) { + // FIXME: + // This function is triggered by the user (start, event, stop) + // Using async rust in here can be tricky, one issue for example is that + // concurrent calls fail silently without panics or even exceptions on the Redis Backend + // mutation { + // one: start(..) # calls schedule(..) + // .. + // tenth: start(..) # calls schedule(..) + // } + await Meta.substantial.storeAddSchedule(input); + // This delay is completely unrelated to the rust side and solves the issue + await sleep(100); } async log(runId: string, schedule: string, content: unknown) { @@ -247,6 +259,7 @@ export class Agent { case "START": { const ret = answer.data as WorkflowResult; switch (Interrupt.getTypeOf(ret.exception)) { + case "SAVE_RETRY": case "SLEEP": case "WAIT_ENSURE_VALUE": case "WAIT_HANDLE_EVENT": @@ -345,7 +358,7 @@ export class Agent { event: { type: "Stop", result: { - [rustResult]: result, + [rustResult]: result ?? null, } as unknown, }, }); @@ -359,6 +372,8 @@ export class Agent { run, }); + // console.log("Persisted", run); + await Meta.substantial.storeCloseSchedule({ backend: this.backend, queue: this.queue, @@ -405,6 +420,7 @@ function checkIfRunHasStopped(run: Run) { `"${run.run_id}" has potentially corrupted logs, another run occured yet previous has not stopped` ); } + life += 1; hasStopped = false; } else if (op.event.type == "Stop") { @@ -419,6 +435,7 @@ function checkIfRunHasStopped(run: Run) { `"${run.run_id}" has potentitally corrupted logs, attempted stopping already closed run, or run with a missing Start` ); } + life -= 1; hasStopped = true; } diff --git a/src/typegate/src/runtimes/substantial/deno_context.ts b/src/typegate/src/runtimes/substantial/deno_context.ts index f725f85adc..44cd59dd41 100644 --- a/src/typegate/src/runtimes/substantial/deno_context.ts +++ b/src/typegate/src/runtimes/substantial/deno_context.ts @@ -1,6 +1,9 @@ // Copyright Metatype OÜ, licensed under the Elastic License 2.0. // SPDX-License-Identifier: Elastic-2.0 +// FIXME: DO NOT IMPORT any file that refers to Meta, this will be instantiated in a Worker +// import { sleep } from "../../utils.ts"; // will silently fail?? + import { Interrupt, OperationEvent, Run } from "./types.ts"; export class Context { @@ -18,23 +21,90 @@ export class Context { this.run.operations.push({ at: new Date().toJSON(), event: op }); } - async save(fn: () => T | Promise) { + async save(fn: () => T | Promise, option?: SaveOption) { const id = this.#nextId(); + let currRetryCount = 1; for (const { event } of this.run.operations) { if (event.type == "Save" && id == event.id) { - // console.log("skip #", id, event.value); - return event.value; + if (event.value.type == "Resolved") { + return event.value.payload; + } else if (event.value.type == "Retry") { + const delay = new Date(event.value.wait_until); + if (delay.getTime() > new Date().getTime()) { + // Too soon! + throw Interrupt.Variant("SAVE_RETRY"); + } else { + currRetryCount = event.value.counter; + } + } } } - const result = await Promise.resolve(fn()); - this.#appendOp({ - type: "Save", - id, - value: result, - }); - return result; + // current call already counts + currRetryCount += 1; + + try { + let result: any; + if (option?.timeoutMs && option.timeoutMs > 0) { + result = await Promise.race([fn(), failAfter(option.timeoutMs)]); + } else { + result = await Promise.resolve(fn()); + } + + this.#appendOp({ + type: "Save", + id, + value: { + type: "Resolved", + payload: result, + }, + }); + + return result; + } catch (err) { + if ( + option?.retry?.maxRetries && + currRetryCount < option.retry.maxRetries + ) { + const { retry } = option; + const strategy = new RetryStrategy( + retry.maxRetries, + retry.minBackoffMs, + retry.maxBackoffMs + ); + + const retriesLeft = Math.max(retry.maxRetries - currRetryCount, 0); + const delayMs = strategy.eval(retry.strategy ?? "linear", retriesLeft); + const waitUntilAsMs = new Date().getTime() + delayMs; + + this.#appendOp({ + type: "Save", + id, + value: { + type: "Retry", + wait_until: new Date(waitUntilAsMs).toJSON(), + counter: currRetryCount, + }, + }); + + throw Interrupt.Variant("SAVE_RETRY"); + } else { + this.#appendOp({ + type: "Save", + id, + value: { + type: "Failed", + err: { + retries: currRetryCount, + message: err?.message ?? `${err}`, + }, + }, + }); + } + + throw err; + } } sleep(durationMs: number) { @@ -106,3 +176,86 @@ export class Context { return result; } } + +// TODO: move all of these into substantial lib once Meta can be used inside workers +// ```rust +// #[serde(....)] +// pub enum RetryStrategy { Linear {...}, Exp { ... }} +// impl RetryStrategy { pub fn eval(&self, retries_left: i8) { .. }} +// +// ``` + +type Strategy = "linear"; + +interface SaveOption { + timeoutMs?: number; + retry?: { + strategy?: Strategy; // TODO: add more + minBackoffMs: number; + maxBackoffMs: number; + maxRetries: number; + }; +} + +function failAfter(ms: number): Promise { + return new Promise((_, reject) => { + setTimeout(() => { + reject(Error("Save timed out")); + }, ms); + }); +} + +class RetryStrategy { + minBackoffMs?: number; + maxBackoffMs?: number; + maxRetries: number; + + constructor( + maxRetries: number, + minBackoffMs?: number, + maxBackoffMs?: number + ) { + this.maxRetries = maxRetries; + this.minBackoffMs = minBackoffMs; + this.maxBackoffMs = maxBackoffMs; + + if (this.maxRetries < 1) { + throw new Error("maxRetries < 1"); + } + + const low = this.minBackoffMs; + const high = this.maxBackoffMs; + + if (low && high) { + if (low >= high) { + throw new Error("minBackoffMs >= maxBackoffMs"); + } + if (low < 0) { + throw new Error("minBackoffMs < 0"); + } + } else if (low && high == undefined) { + this.maxBackoffMs = low + 10; + } else if (low == undefined && high) { + this.minBackoffMs = Math.max(0, high - 10); + } + } + + eval(strategy: Strategy, retriesLeft: number) { + switch (strategy) { + case "linear": + return this.#linear(retriesLeft); + // TODO: add more + default: + throw new Error(`Unknown strategy "${strategy}" provided`); + } + } + + #linear(retriesLeft: number): number { + if (retriesLeft <= 0) { + throw new Error("retries left <= 0"); + } + + const dt = (this.maxBackoffMs ?? 0) - (this.minBackoffMs ?? 0); + return Math.floor(((this.maxRetries - retriesLeft) * dt) / this.maxRetries); + } +} diff --git a/src/typegate/src/runtimes/substantial/types.ts b/src/typegate/src/runtimes/substantial/types.ts index 6514b9cd8d..2a1258d3d6 100644 --- a/src/typegate/src/runtimes/substantial/types.ts +++ b/src/typegate/src/runtimes/substantial/types.ts @@ -55,6 +55,7 @@ export type WorkflowResult = { export type InterruptType = | "SLEEP" + | "SAVE_RETRY" | "WAIT_RECEIVE_EVENT" | "WAIT_HANDLE_EVENT" | "WAIT_ENSURE_VALUE"; @@ -62,8 +63,9 @@ export type InterruptType = export class Interrupt extends Error { private static readonly PREFIX = "SUBSTANTIAL_INTERRUPT_"; - private constructor(type: string) { + private constructor(type: string, cause?: unknown) { super(Interrupt.PREFIX + type); + this.cause = cause; } static getTypeOf(err: unknown): InterruptType | null { @@ -73,7 +75,7 @@ export class Interrupt extends Error { return null; } - static Variant(kind: InterruptType) { - return new Interrupt(kind); + static Variant(kind: InterruptType, cause?: unknown) { + return new Interrupt(kind, cause); } } diff --git a/src/typegate/src/runtimes/substantial/worker.ts b/src/typegate/src/runtimes/substantial/worker.ts index d8cbe1064a..084a0b6211 100644 --- a/src/typegate/src/runtimes/substantial/worker.ts +++ b/src/typegate/src/runtimes/substantial/worker.ts @@ -11,7 +11,10 @@ self.onmessage = async function (event) { switch (type) { case "START": { const { modulePath, functionName, run, schedule, kwargs } = data; + // FIXME: handle case when script is missing and notify WorkerManager so it cleans up + // its registry. const module = await import(modulePath); + // TODO: for python use the same strategy but instead call from native const workflowFn = module[functionName]; diff --git a/tests/runtimes/substantial/common.ts b/tests/runtimes/substantial/common.ts index 77ca96e350..707d6834ef 100644 --- a/tests/runtimes/substantial/common.ts +++ b/tests/runtimes/substantial/common.ts @@ -289,7 +289,7 @@ export function concurrentWorkflowTestTemplate( { result: { status: "COMPLETED", - value: 'Email sent to one@example.com: "confirmed!"', + value: "Email sent to one@example.com: confirmed!", }, run_id: runIds[0], }, @@ -320,3 +320,177 @@ export function concurrentWorkflowTestTemplate( } ); } + +export function retrySaveTestTemplate( + backendName: BackendName, + { + delays, + secrets, + }: { + delays: { + awaitCompleteAll: number; + }; + secrets?: Record; + }, + cleanup?: MetaTestCleanupFn +) { + Meta.test( + { + name: `Retry logic (${backendName})`, + }, + async (t) => { + Deno.env.set("SUB_BACKEND", backendName); + cleanup && t.addCleanup(cleanup); + + const e = await t.engine("runtimes/substantial/substantial.py", { + secrets, + }); + + let resolvedId: string, + retryId: string, + timeoutId: string, + retryAbortMeId: string; + await t.should( + `start retry workflows concurrently (${backendName})`, + async () => { + await gql` + mutation { + resolved: start_retry(kwargs: { fail: false, timeout: false }) + timeout: start_retry(kwargs: { fail: false, timeout: true }) + retry: start_retry(kwargs: { fail: true, timeout: false }) + retry_abort_me: start_retry( + kwargs: { fail: true, timeout: false } + ) + } + ` + .expectBody((body) => { + resolvedId = body.data?.resolved! as string; + retryId = body.data?.retry! as string; + timeoutId = body.data?.timeout! as string; + retryAbortMeId = body.data?.retry_abort_me! as string; + + assertExists(resolvedId, "resolve runId"); + assertExists(retryId, "retry runId"); + assertExists(timeoutId, "timeou runId"); + assertExists(retryAbortMeId, "retry_abort_me runId"); + }) + .on(e); + } + ); + + await sleep(1000); + + await t.should( + `abort workflow that attempts to retry (${backendName})`, + async () => { + await gql` + mutation { + abort_retry(run_id: $run_id) + } + ` + .withVars({ + run_id: retryAbortMeId, + }) + .expectData({ + abort_retry: retryAbortMeId, + }) + .on(e); + } + ); + + // Waiting for the retry to finish + await sleep(delays.awaitCompleteAll * 1000); + + await t.should( + `complete execution of all retries (${backendName})`, + async () => { + await gql` + query { + retry_results { + ongoing { + count + runs { + run_id + } + } + completed { + count + runs { + run_id + result { + status + value + } + } + } + } + } + ` + .expectBody((body) => { + assertEquals( + body?.data?.retry_results?.ongoing?.count, + 0, + "0 workflow currently running" + ); + + assertEquals( + body?.data?.retry_results?.completed?.count, + 4, + "4 workflows completed" + ); + + const localSorter = (a: any, b: any) => + a.run_id.localeCompare(b.run_id); + + const received = + body?.data?.retry_results?.completed?.runs ?? + ([] as Array); + const expected = [ + { + result: { + status: "COMPLETED", + value: "No timeout, No fail", + }, + run_id: resolvedId, + }, + { + result: { + status: "COMPLETED_WITH_ERROR", + value: "Failed successfully", + }, + run_id: retryId, + }, + { + result: { + status: "COMPLETED_WITH_ERROR", + value: "ABORTED", + }, + run_id: retryAbortMeId, + }, + { + result: { + status: "COMPLETED_WITH_ERROR", + value: "Save timed out", + }, + run_id: timeoutId, + }, + ]; + + assertEquals( + received.sort(localSorter), + expected.sort(localSorter), + "All workflows have completed" + ); + }) + .on(e); + } + ); + } + ); +} + +// TODO: +// mock a very basic http server in another process that counts the number of request made by a workflow +// This will allow.. +// - Emulating/keeping track of 'natural' retries, timeout +// - Checking if a resolved save makes new requests after interrupts diff --git a/tests/runtimes/substantial/imports/common_types.ts b/tests/runtimes/substantial/imports/common_types.ts index ca430af768..bc79dbe61e 100644 --- a/tests/runtimes/substantial/imports/common_types.ts +++ b/tests/runtimes/substantial/imports/common_types.ts @@ -1,8 +1,9 @@ +// TODO: include this as part of the metagen generated code + export interface Context { - // TODO: metagen including this kwargs: any; sleep: (ms: number) => void; - save(fn: () => Promise): Promise; + save(fn: () => T | Promise, option?: SaveOption): Promise; receive(eventName: string): O; handle( eventName: string, @@ -11,6 +12,16 @@ export interface Context { ensure(conditionFn: () => boolean | Promise): Promise; } +export interface SaveOption { + timeoutMs?: number; + retry?: { + strategy?: "linear"; + minBackoffMs: number; + maxBackoffMs: number; + maxRetries: number; + }; +} + export async function queryThatTakesAWhile(input: T): Promise { await sleep(2000); return input; diff --git a/tests/runtimes/substantial/kv_like_test.ts b/tests/runtimes/substantial/kv_like_test.ts index 3054f06ae1..eaf6046e10 100644 --- a/tests/runtimes/substantial/kv_like_test.ts +++ b/tests/runtimes/substantial/kv_like_test.ts @@ -1,7 +1,11 @@ // Copyright Metatype OÜ, licensed under the Elastic License 2.0. // SPDX-License-Identifier: Elastic-2.0 -import { basicTestTemplate, concurrentWorkflowTestTemplate } from "./common.ts"; +import { + basicTestTemplate, + concurrentWorkflowTestTemplate, + retrySaveTestTemplate, +} from "./common.ts"; basicTestTemplate("memory", { delays: { awaitSleepCompleteSec: 7 }, @@ -14,3 +18,9 @@ basicTestTemplate("fs", { concurrentWorkflowTestTemplate("fs", { delays: { awaitEmailCompleteSec: 10 }, }); + +retrySaveTestTemplate("fs", { + delays: { + awaitCompleteAll: 15, + }, +}); diff --git a/tests/runtimes/substantial/redis_test.ts b/tests/runtimes/substantial/redis_test.ts index d4c2392f85..077c6c4e88 100644 --- a/tests/runtimes/substantial/redis_test.ts +++ b/tests/runtimes/substantial/redis_test.ts @@ -6,6 +6,7 @@ import { basicTestTemplate, concurrentWorkflowTestTemplate, redisCleanup, + retrySaveTestTemplate, } from "./common.ts"; basicTestTemplate( @@ -25,3 +26,15 @@ concurrentWorkflowTestTemplate( }, redisCleanup(SUB_REDIS) ); + +retrySaveTestTemplate( + "redis", + { + delays: { + awaitCompleteAll: 18, + }, + secrets: { SUB_REDIS }, + }, + + redisCleanup(SUB_REDIS) +); diff --git a/tests/runtimes/substantial/substantial.py b/tests/runtimes/substantial/substantial.py index 353b808199..919cb31f40 100644 --- a/tests/runtimes/substantial/substantial.py +++ b/tests/runtimes/substantial/substantial.py @@ -24,22 +24,37 @@ def substantial(g: Graph): deps=["imports/common_types.ts"], ) - sub_email = sub.deno( + email = sub.deno( file="workflow.ts", name="eventsAndExceptionExample", deps=["imports/common_types.ts"], ) + retry = sub.deno( + file="workflow.ts", + name="retryExample", + deps=["imports/common_types.ts"], + ) + g.expose( pub, + # sleep start=save_and_sleep.start(t.struct({"a": t.integer(), "b": t.integer()})), workers=save_and_sleep.query_resources(), results=save_and_sleep.query_results( t.either([t.integer(), t.string()]).rename("ResultOrError") ), - start_email=sub_email.start(t.struct({"to": t.email()})), - send_confirmation=sub_email.send(t.boolean(), event_name="confirmation"), - email_workers=sub_email.query_resources(), - email_results=sub_email.query_results(t.string()), - abort_email_confirmation=sub_email.stop(), + # email + start_email=email.start(t.struct({"to": t.email()})), + send_confirmation=email.send(t.boolean(), event_name="confirmation"), + email_workers=email.query_resources(), + email_results=email.query_results(t.string()), + abort_email_confirmation=email.stop(), + # retry + start_retry=retry.start( + t.struct({"fail": t.boolean(), "timeout": t.boolean()}) + ), + retry_workers=retry.query_resources(), + retry_results=retry.query_results(t.string()), + abort_retry=retry.stop(), ) diff --git a/tests/runtimes/substantial/workflow.ts b/tests/runtimes/substantial/workflow.ts index 43b614290b..392b31c6a8 100644 --- a/tests/runtimes/substantial/workflow.ts +++ b/tests/runtimes/substantial/workflow.ts @@ -2,6 +2,7 @@ import { Context, queryThatTakesAWhile, sendSubscriptionEmail, + sleep, } from "./imports/common_types.ts"; export async function eventsAndExceptionExample(ctx: Context) { @@ -15,7 +16,7 @@ export async function eventsAndExceptionExample(ctx: Context) { throw new Error(`${to} has denied the subscription`); } - return `${messageDialog}: "confirmed!"`; + return `${messageDialog}: confirmed!`; } export async function saveAndSleepExample(ctx: Context) { @@ -29,3 +30,45 @@ export async function saveAndSleepExample(ctx: Context) { ctx.sleep(5000); return newA + newB; } + +export async function retryExample(ctx: Context) { + const { fail, timeout } = ctx.kwargs; + const retryRet = await ctx.save( + () => { + if (fail) { + throw new Error(`Failed successfully`); + } + + return "No fail"; + }, + { + retry: { + strategy: "linear", + minBackoffMs: 1000, + maxBackoffMs: 5000, + maxRetries: 4, + }, + } + ); + + const timeoutRet = await ctx.save( + async () => { + if (timeout) { + await sleep(500); + } + + return "No timeout"; + }, + { + timeoutMs: 200, + retry: { + strategy: "linear", + minBackoffMs: 1000, + maxBackoffMs: 3000, + maxRetries: 5, + }, + } + ); + + return [timeoutRet, retryRet].join(", "); +} diff --git a/tools/tasks/install.ts b/tools/tasks/install.ts index 42a0358679..5e58a0cad8 100644 --- a/tools/tasks/install.ts +++ b/tools/tasks/install.ts @@ -110,8 +110,8 @@ export default { // "install-protobuf-compiler": { // fn: ($) => $`sudo apt install protobuf-compiler`, // }, - // Enable rust codegen for protoc "install-protobuf-rust": { - fn: ($) => $`cargo install protobuf-codegen@3.5.1`, + desc: "Enable rust codegen for protoc", + fn: ($) => $`cargo binstall protobuf-codegen@3.5.1`, }, } satisfies Record;