diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs
index f8cb25ff1..2b3ca0b5f 100644
--- a/rust/numaflow-core/src/message.rs
+++ b/rust/numaflow-core/src/message.rs
@@ -1,6 +1,7 @@
 use std::cmp::PartialEq;
 use std::collections::HashMap;
 use std::fmt;
+use std::sync::Arc;

 use async_nats::HeaderValue;
 use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
@@ -23,13 +24,13 @@ use crate::{config, Error};
 const DROP: &str = "U+005C__DROP__";

 /// A message that is sent from the source to the sink.
-#[derive(Debug, Clone, Serialize, Deserialize)]
+/// It is cheap to clone.
+#[derive(Debug, Clone)]
 pub(crate) struct Message {
-    // FIXME: Arc<[Bytes]>
     /// keys of the message
-    pub(crate) keys: Vec<String>,
+    pub(crate) keys: Arc<[String]>,
     /// tags of the message
-    pub(crate) tags: Option<Vec<String>>,
+    pub(crate) tags: Option<Arc<[String]>>,
     /// actual payload of the message
     pub(crate) value: Bytes,
     /// offset of the message, it is optional because offset is only
@@ -81,8 +82,8 @@ impl TryFrom for Message {
         let event_time = Utc::now();
         let offset = None;
         let id = MessageID {
-            vertex_name: config::get_vertex_name().to_string(),
-            offset: "0".to_string(),
+            vertex_name: config::get_vertex_name().to_string().into(),
+            offset: "0".to_string().into(),
             index: 0,
         };
@@ -131,15 +132,16 @@ impl fmt::Display for IntOffset {

 /// StringOffset is string based offset enum type.
 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct StringOffset {
-    offset: String,
+pub(crate) struct StringOffset {
+    /// offset could be a complex base64 string.
+    offset: Bytes,
     partition_idx: u16,
 }

 impl StringOffset {
     pub fn new(seq: String, partition_idx: u16) -> Self {
         Self {
-            offset: seq,
+            offset: seq.into(),
             partition_idx,
         }
     }
@@ -147,7 +149,12 @@ impl StringOffset {
 impl fmt::Display for StringOffset {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{}-{}", self.offset, self.partition_idx)
+        write!(
+            f,
+            "{}-{}",
+            std::str::from_utf8(&self.offset).expect("it should be valid utf-8"),
+            self.partition_idx
+        )
     }
 }
@@ -159,18 +166,19 @@ pub(crate) enum ReadAck {
     Nak,
 }

+/// Message ID which is used to uniquely identify a message. It is cheap to clone this.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub(crate) struct MessageID {
-    pub(crate) vertex_name: String,
-    pub(crate) offset: String,
+    pub(crate) vertex_name: Bytes,
+    pub(crate) offset: Bytes,
     pub(crate) index: i32,
 }

 impl From<numaflow_pb::objects::isb::MessageId> for MessageID {
     fn from(id: numaflow_pb::objects::isb::MessageId) -> Self {
         Self {
-            vertex_name: id.vertex_name,
-            offset: id.offset,
+            vertex_name: id.vertex_name.into(),
+            offset: id.offset.into(),
             index: id.index,
         }
     }
@@ -179,16 +187,21 @@ impl From<MessageID> for numaflow_pb::objects::isb::MessageId {
     fn from(id: MessageID) -> Self {
         Self {
-            vertex_name: id.vertex_name,
-            offset: id.offset,
+            vertex_name: String::from_utf8_lossy(&id.vertex_name).to_string(),
+            offset: String::from_utf8_lossy(&id.offset).to_string(),
             index: id.index,
         }
     }
 }
-
 impl fmt::Display for MessageID {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{}-{}-{}", self.vertex_name, self.offset, self.index)
+        write!(
+            f,
+            "{}-{}-{}",
+            std::str::from_utf8(&self.vertex_name).expect("it should be valid utf-8"),
+            std::str::from_utf8(&self.offset).expect("it should be valid utf-8"),
+            self.index
+        )
     }
 }
@@ -220,8 +233,8 @@ impl TryFrom<Message> for BytesMut {
                 }),
                 kind: numaflow_pb::objects::isb::MessageKind::Data as i32,
                 id: Some(message.id.into()),
-                keys: message.keys.clone(),
-                headers: message.headers.clone(),
+                keys: message.keys.to_vec(),
+                headers: message.headers,
             }),
             body: Some(numaflow_pb::objects::isb::Body {
                 payload: message.value.to_vec(),
@@ -255,7 +268,7 @@ impl TryFrom for Message {
         let id = header.id.ok_or(Error::Proto("Missing id".to_string()))?;

         Ok(Message {
-            keys: header.keys,
+            keys: Arc::from(header.keys.into_boxed_slice()),
             tags: None,
             value: body.payload.into(),
             offset: None,
@@ -273,7 +286,7 @@ impl From<Message> for SourceTransformRequest {
             request: Some(
                 numaflow_pb::clients::sourcetransformer::source_transform_request::Request {
                     id: message.id.to_string(),
-                    keys: message.keys,
+                    keys: message.keys.to_vec(),
                     value: message.value.to_vec(),
                     event_time: prost_timestamp_from_utc(message.event_time),
                     watermark: None,
@@ -292,21 +305,21 @@ impl TryFrom<read_response::Result> for Message {
     fn try_from(result: read_response::Result) -> Result {
         let source_offset = match result.offset {
             Some(o) => Offset::String(StringOffset {
-                offset: BASE64_STANDARD.encode(o.offset),
+                offset: BASE64_STANDARD.encode(o.offset).into(),
                 partition_idx: o.partition_id as u16,
             }),
             None => return Err(Error::Source("Offset not found".to_string())),
         };

         Ok(Message {
-            keys: result.keys,
+            keys: Arc::from(result.keys),
             tags: None,
             value: result.payload.into(),
             offset: Some(source_offset.clone()),
             event_time: utc_from_timestamp(result.event_time),
             id: MessageID {
-                vertex_name: config::get_vertex_name().to_string(),
-                offset: source_offset.to_string(),
+                vertex_name: config::get_vertex_name().to_string().into(),
+                offset: source_offset.to_string().into(),
                 index: 0,
             },
             headers: result.headers,
@@ -319,7 +332,7 @@ impl From<Message> for SinkRequest {
     fn from(message: Message) -> Self {
         Self {
             request: Some(Request {
-                keys: message.keys,
+                keys: message.keys.to_vec(),
                 value: message.value.to_vec(),
                 event_time: prost_timestamp_from_utc(message.event_time),
                 watermark: None,
@@ -399,7 +412,7 @@ mod tests {
     #[test]
     fn test_offset_display() {
         let offset = Offset::String(StringOffset {
-            offset: "123".to_string(),
+            offset: "123".to_string().into(),
             partition_idx: 1,
         });
         assert_eq!(format!("{}", offset), "123-1");
@@ -408,8 +421,8 @@ mod tests {
     #[test]
     fn test_message_id_display() {
         let message_id = MessageID {
-            vertex_name: "vertex".to_string(),
-            offset: "123".to_string(),
+            vertex_name: "vertex".to_string().into(),
+            offset: "123".to_string().into(),
             index: 0,
         };
         assert_eq!(format!("{}", message_id), "vertex-123-0");
@@ -418,17 +431,17 @@ mod tests {
     #[test]
     fn test_message_to_vec_u8() {
         let message = Message {
-            keys: vec!["key1".to_string()],
+            keys: Arc::from(vec!["key1".to_string()]),
             tags: None,
             value: vec![1, 2, 3].into(),
             offset: Some(Offset::String(StringOffset {
-                offset: "123".to_string(),
+                offset: "123".to_string().into(),
                 partition_idx: 0,
             })),
             event_time: Utc.timestamp_opt(1627846261, 0).unwrap(),
             id: MessageID {
-                vertex_name: "vertex".to_string(),
-                offset: "123".to_string(),
+                vertex_name: "vertex".to_string().into(),
+                offset: "123".to_string().into(),
                 index: 0,
             },
             headers: HashMap::new(),
@@ -445,8 +458,8 @@ mod tests {
                 }),
                 kind: numaflow_pb::objects::isb::MessageKind::Data as i32,
                 id: Some(message.id.into()),
-                keys: message.keys.clone(),
-                headers: message.headers.clone(),
+                keys: message.keys.to_vec(),
+                headers: message.headers,
             }),
             body: Some(Body {
                 payload: message.value.clone().into(),
@@ -488,7 +501,7 @@ mod tests {
         assert!(result.is_ok());

         let message = result.unwrap();
-        assert_eq!(message.keys, vec!["key1".to_string()]);
+        assert_eq!(message.keys.to_vec(), vec!["key1".to_string()]);
         assert_eq!(message.value, vec![1, 2, 3]);
         assert_eq!(
             message.event_time,
@@ -499,17 +512,17 @@ mod tests {
     #[test]
     fn test_message_to_source_transform_request() {
         let message = Message {
-            keys: vec!["key1".to_string()],
+            keys: Arc::from(vec!["key1".to_string()]),
             tags: None,
             value: vec![1, 2, 3].into(),
             offset: Some(Offset::String(StringOffset {
-                offset: "123".to_string(),
+                offset: "123".to_string().into(),
                 partition_idx: 0,
             })),
             event_time: Utc.timestamp_opt(1627846261, 0).unwrap(),
             id: MessageID {
-                vertex_name: "vertex".to_string(),
-                offset: "123".to_string(),
+                vertex_name: "vertex".to_string().into(),
+                offset: "123".to_string().into(),
                 index: 0,
             },
             headers: HashMap::new(),
@@ -538,7 +551,7 @@ mod tests {
         assert!(message.is_ok());

         let message = message.unwrap();
-        assert_eq!(message.keys, vec!["key1".to_string()]);
+        assert_eq!(message.keys.to_vec(), vec!["key1".to_string()]);
         assert_eq!(message.value, vec![1, 2, 3]);
         assert_eq!(
             message.event_time,
@@ -549,17 +562,17 @@ mod tests {
     #[test]
     fn test_message_to_sink_request() {
         let message = Message {
-            keys: vec!["key1".to_string()],
+            keys: Arc::from(vec!["key1".to_string()]),
             tags: None,
             value: vec![1, 2, 3].into(),
             offset: Some(Offset::String(StringOffset {
-                offset: "123".to_string(),
+                offset: "123".to_string().into(),
                 partition_idx: 0,
             })),
             event_time: Utc.timestamp_opt(1627846261, 0).unwrap(),
             id: MessageID {
-                vertex_name: "vertex".to_string(),
-                offset: "123".to_string(),
+                vertex_name: "vertex".to_string().into(),
+                offset: "123".to_string().into(),
                 index: 0,
             },
             headers: HashMap::new(),
@@ -622,8 +635,8 @@ mod tests {
     #[test]
     fn test_message_id_to_proto() {
         let message_id = MessageID {
-            vertex_name: "vertex".to_string(),
-            offset: "123".to_string(),
+            vertex_name: "vertex".to_string().into(),
+            offset: "123".to_string().into(),
             index: 0,
         };
         let proto_id: MessageId = message_id.into();
diff --git a/rust/numaflow-core/src/pipeline.rs b/rust/numaflow-core/src/pipeline.rs
index ffee46033..a8deaa7b6 100644
--- a/rust/numaflow-core/src/pipeline.rs
+++ b/rust/numaflow-core/src/pipeline.rs
@@ -229,6 +229,7 @@ async fn create_js_context(config: pipeline::isb::jetstream::ClientConfig) -> Re
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
+    use std::sync::Arc;
     use std::time::Duration;

     use async_nats::jetstream;
@@ -428,14 +429,14 @@ mod tests {
         use crate::message::{Message, MessageID, Offset, StringOffset};

         let message = Message {
-            keys: vec!["key1".to_string()],
+            keys: Arc::from(vec!["key1".to_string()]),
             tags: None,
             value: vec![1, 2, 3].into(),
             offset: Some(Offset::String(StringOffset::new("123".to_string(), 0))),
             event_time: Utc.timestamp_opt(1627846261, 0).unwrap(),
             id: MessageID {
-                vertex_name: "vertex".to_string(),
-                offset: "123".to_string(),
+                vertex_name: "vertex".to_string().into(),
+                offset: "123".to_string().into(),
                 index: 0,
             },
             headers: HashMap::new(),
diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs
index 4216484ec..377322890 100644
--- a/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs
+++ b/rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs
@@ -149,8 +149,8 @@ impl JetstreamReader {
                 )));

                 message.id = MessageID {
-                    vertex_name: pipeline_config.vertex_name.clone(),
-                    offset: msg_info.stream_sequence.to_string(),
+                    vertex_name: pipeline_config.vertex_name.clone().into(),
+                    offset: msg_info.stream_sequence.to_string().into(),
                     index: 0,
                 };
@@ -258,6 +258,7 @@ impl fmt::Display for JetstreamReader {
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
+    use std::sync::Arc;

     use super::*;
     use crate::message::{Message, MessageID};
@@ -327,14 +328,14 @@ mod tests {
         for i in 0..10 {
             let message = Message {
-                keys: vec![format!("key_{}", i)],
+                keys: Arc::from(vec![format!("key_{}", i)]),
                 tags: None,
                 value: format!("message {}", i).as_bytes().to_vec().into(),
                 offset: None,
                 event_time: Utc::now(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: format!("offset_{}", i),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: format!("offset_{}", i).into(),
                     index: i,
                 },
                 headers: HashMap::new(),
@@ -429,14 +430,14 @@ mod tests {
         // write 5 messages
         for i in 0..5 {
             let message = Message {
-                keys: vec![format!("key_{}", i)],
+                keys: Arc::from(vec![format!("key_{}", i)]),
                 tags: None,
                 value: format!("message {}", i).as_bytes().to_vec().into(),
                 offset: None,
                 event_time: Utc::now(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: format!("{}", i + 1),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: format!("{}", i + 1).into(),
                     index: i,
                 },
                 headers: HashMap::new(),
diff --git a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
index 5ff61de3d..969f343ab 100644
--- a/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
+++ b/rust/numaflow-core/src/pipeline/isb/jetstream/writer.rs
@@ -201,7 +201,7 @@ impl JetstreamWriter {
                     // check to which partition the message should be written
                     let partition = forward::determine_partition(
-                        message.id.offset.clone(),
+                        String::from_utf8_lossy(&message.id.offset).to_string(),
                         vertex.writer_config.partitions,
                         &mut hash,
                     );
@@ -456,7 +456,7 @@ impl JetstreamWriter {
 pub(crate) struct ResolveAndPublishResult {
     pub(crate) pafs: Vec<(Stream, PublishAckFuture)>,
     pub(crate) payload: Vec<u8>,
-    pub(crate) offset: String,
+    pub(crate) offset: Bytes,
 }

 #[cfg(test)]
@@ -526,14 +526,14 @@ mod tests {
         );

         let message = Message {
-            keys: vec!["key_0".to_string()],
+            keys: Arc::from(vec!["key_0".to_string()]),
             tags: None,
             value: "message 0".as_bytes().to_vec().into(),
             offset: None,
             event_time: Utc::now(),
             id: MessageID {
-                vertex_name: "vertex".to_string(),
-                offset: "offset_0".to_string(),
vertex_name: "vertex".to_string().into(), + offset: "offset_0".to_string().into(), index: 0, }, headers: HashMap::new(), @@ -585,14 +585,14 @@ mod tests { .unwrap(); let message = Message { - keys: vec!["key_0".to_string()], + keys: Arc::from(vec!["key_0".to_string()]), tags: None, value: "message 0".as_bytes().to_vec().into(), offset: None, event_time: Utc::now(), id: MessageID { - vertex_name: "vertex".to_string(), - offset: "offset_0".to_string(), + vertex_name: "vertex".to_string().into(), + offset: "offset_0".to_string().into(), index: 0, }, headers: HashMap::new(), @@ -669,14 +669,14 @@ mod tests { // Publish 10 messages successfully for i in 0..10 { let message = Message { - keys: vec![format!("key_{}", i)], + keys: Arc::from(vec![format!("key_{}", i)]), tags: None, value: format!("message {}", i).as_bytes().to_vec().into(), offset: None, event_time: Utc::now(), id: MessageID { - vertex_name: "vertex".to_string(), - offset: format!("offset_{}", i), + vertex_name: "vertex".to_string().into(), + offset: format!("offset_{}", i).into(), index: i, }, headers: HashMap::new(), @@ -694,14 +694,14 @@ mod tests { // Attempt to publish a message which has a payload size greater than the max_message_size // so that it fails and sync write will be attempted and it will be blocked let message = Message { - keys: vec!["key_11".to_string()], + keys: Arc::from(vec!["key_11".to_string()]), tags: None, value: vec![0; 1025].into(), offset: None, event_time: Utc::now(), id: MessageID { - vertex_name: "vertex".to_string(), - offset: "offset_11".to_string(), + vertex_name: "vertex".to_string().into(), + offset: "offset_11".to_string().into(), index: 11, }, headers: HashMap::new(), @@ -961,14 +961,14 @@ mod tests { // Publish 500 messages for i in 0..500 { let message = Message { - keys: vec![format!("key_{}", i)], + keys: Arc::from(vec![format!("key_{}", i)]), tags: None, value: format!("message {}", i).as_bytes().to_vec().into(), offset: None, event_time: Utc::now(), id: MessageID { - vertex_name: "vertex".to_string(), - offset: format!("offset_{}", i), + vertex_name: "vertex".to_string().into(), + offset: format!("offset_{}", i).into(), index: i, }, headers: HashMap::new(), @@ -1049,14 +1049,14 @@ mod tests { // Publish 100 messages successfully for i in 0..100 { let message = Message { - keys: vec![format!("key_{}", i)], + keys: Arc::from(vec![format!("key_{}", i)]), tags: None, value: format!("message {}", i).as_bytes().to_vec().into(), offset: None, event_time: Utc::now(), id: MessageID { - vertex_name: "vertex".to_string(), - offset: format!("offset_{}", i), + vertex_name: "vertex".to_string().into(), + offset: format!("offset_{}", i).into(), index: i, }, headers: HashMap::new(), @@ -1076,21 +1076,21 @@ mod tests { // Attempt to publish the 101st message, which should get stuck in the retry loop // because the max message size is set to 1024 let message = Message { - keys: vec!["key_101".to_string()], + keys: Arc::from(vec!["key_101".to_string()]), tags: None, value: vec![0; 1025].into(), offset: None, event_time: Utc::now(), id: MessageID { - vertex_name: "vertex".to_string(), - offset: "offset_101".to_string(), + vertex_name: "vertex".to_string().into(), + offset: "offset_101".to_string().into(), index: 101, }, headers: HashMap::new(), }; let (ack_tx, ack_rx) = tokio::sync::oneshot::channel(); tracker_handle - .insert("offset_101".to_string(), ack_tx) + .insert("offset_101".to_string().into(), ack_tx) .await .unwrap(); ack_rxs.push(ack_rx); @@ -1189,14 +1189,14 @@ mod tests { let mut ack_rxs = 
         let mut ack_rxs = vec![];
         for i in 0..10 {
             let message = Message {
-                keys: vec![format!("key_{}", i)],
-                tags: Some(vec!["tag1".to_string(), "tag2".to_string()]),
+                keys: Arc::from(vec![format!("key_{}", i)]),
+                tags: Some(Arc::from(vec!["tag1".to_string(), "tag2".to_string()])),
                 value: format!("message {}", i).as_bytes().to_vec().into(),
                 offset: None,
                 event_time: Utc::now(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: format!("offset_{}", i),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: format!("offset_{}", i).into(),
                     index: i,
                 },
                 headers: HashMap::new(),
diff --git a/rust/numaflow-core/src/shared/forward.rs b/rust/numaflow-core/src/shared/forward.rs
index e249989cd..11b9195cc 100644
--- a/rust/numaflow-core/src/shared/forward.rs
+++ b/rust/numaflow-core/src/shared/forward.rs
@@ -1,11 +1,12 @@
 use numaflow_models::models::ForwardConditions;
 use std::hash::{DefaultHasher, Hasher};
+use std::sync::Arc;

 /// Checks if the message should to written to downstream vertex based the conditions
 /// and message tags. If not tags are provided by there are edge conditions present, we will
 /// still forward to all vertices.
 pub(crate) fn should_forward(
-    tags: Option<Vec<String>>,
+    tags: Option<Arc<[String]>>,
     conditions: Option<Box<ForwardConditions>>,
 ) -> bool {
     conditions.map_or(true, |conditions| {
@@ -81,7 +82,7 @@ mod tests {
         let mut tag_conditions = TagConditions::new(vec!["tag1".to_string(), "tag2".to_string()]);
         tag_conditions.operator = Some("and".to_string());
         let conditions = ForwardConditions::new(tag_conditions);
-        let tags = Some(vec!["tag1".to_string(), "tag2".to_string()]);
+        let tags = Some(Arc::from(vec!["tag1".to_string(), "tag2".to_string()]));
         let result = should_forward(tags, Some(Box::new(conditions)));
         assert!(result);
     }
@@ -91,7 +92,7 @@ mod tests {
         let mut tag_conditions = TagConditions::new(vec!["tag1".to_string()]);
         tag_conditions.operator = Some("or".to_string());
         let conditions = ForwardConditions::new(tag_conditions);
-        let tags = Some(vec!["tag2".to_string(), "tag1".to_string()]);
+        let tags = Some(Arc::from(vec!["tag2".to_string(), "tag1".to_string()]));
         let result = should_forward(tags, Some(Box::new(conditions)));
         assert!(result);
     }
@@ -101,7 +102,7 @@ mod tests {
         let mut tag_conditions = TagConditions::new(vec!["tag1".to_string()]);
         tag_conditions.operator = Some("not".to_string());
         let conditions = ForwardConditions::new(tag_conditions);
-        let tags = Some(vec!["tag2".to_string()]);
+        let tags = Some(Arc::from(vec!["tag2".to_string()]));
         let result = should_forward(tags, Some(Box::new(conditions)));
         assert!(result);
     }
diff --git a/rust/numaflow-core/src/sink.rs b/rust/numaflow-core/src/sink.rs
index 6f8fa8454..474f91e77 100644
--- a/rust/numaflow-core/src/sink.rs
+++ b/rust/numaflow-core/src/sink.rs
@@ -607,6 +607,7 @@ impl Drop for SinkWriter {
 mod tests {
     use chrono::Utc;
     use numaflow::sink;
+    use std::sync::Arc;
     use tokio::time::Duration;
     use tokio_util::sync::CancellationToken;
@@ -650,14 +651,14 @@ mod tests {
         let messages: Vec<Message> = (0..5)
             .map(|i| Message {
-                keys: vec![format!("key_{}", i)],
+                keys: Arc::from(vec![format!("key_{}", i)]),
                 tags: None,
                 value: format!("message {}", i).as_bytes().to_vec().into(),
                 offset: None,
                 event_time: Utc::now(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: format!("offset_{}", i),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: format!("offset_{}", i).into(),
                     index: i,
                 },
                 headers: HashMap::new(),
@@ -685,14 +686,14 @@ mod tests {
         let messages: Vec<Message> = (0..10)
             .map(|i| Message {
-                keys: vec![format!("key_{}", i)],
+                keys: Arc::from(vec![format!("key_{}", i)]),
                 tags: None,
                 value: format!("message {}", i).as_bytes().to_vec().into(),
                 offset: None,
                 event_time: Utc::now(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: format!("offset_{}", i),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: format!("offset_{}", i).into(),
                     index: i,
                 },
                 headers: HashMap::new(),
@@ -763,14 +764,14 @@ mod tests {
         let messages: Vec<Message> = (0..10)
             .map(|i| Message {
-                keys: vec!["error".to_string()],
+                keys: Arc::from(vec!["error".to_string()]),
                 tags: None,
                 value: format!("message {}", i).as_bytes().to_vec().into(),
                 offset: None,
                 event_time: Utc::now(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: format!("offset_{}", i),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: format!("offset_{}", i).into(),
                     index: i,
                 },
                 headers: HashMap::new(),
@@ -850,14 +851,14 @@ mod tests {
         let messages: Vec<Message> = (0..20)
             .map(|i| Message {
-                keys: vec!["fallback".to_string()],
+                keys: Arc::from(vec!["fallback".to_string()]),
                 tags: None,
                 value: format!("message {}", i).as_bytes().to_vec().into(),
                 offset: None,
                 event_time: Utc::now(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: format!("offset_{}", i),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: format!("offset_{}", i).into(),
                     index: i,
                 },
                 headers: HashMap::new(),
diff --git a/rust/numaflow-core/src/sink/blackhole.rs b/rust/numaflow-core/src/sink/blackhole.rs
index 7b6bc870b..dd537d18b 100644
--- a/rust/numaflow-core/src/sink/blackhole.rs
+++ b/rust/numaflow-core/src/sink/blackhole.rs
@@ -20,6 +20,7 @@ impl Sink for BlackholeSink {
 #[cfg(test)]
 mod tests {
     use chrono::Utc;
+    use std::sync::Arc;

     use super::BlackholeSink;
     use crate::message::IntOffset;
@@ -31,28 +32,28 @@ mod tests {
         let mut sink = BlackholeSink;
         let messages = vec![
             Message {
-                keys: vec![],
+                keys: Arc::from(vec![]),
                 tags: None,
                 value: b"Hello, World!".to_vec().into(),
                 offset: Some(Offset::Int(IntOffset::new(1, 0))),
                 event_time: Utc::now(),
                 headers: Default::default(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: "1".to_string(),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: "1".to_string().into(),
                     index: 0,
                 },
             },
             Message {
-                keys: vec![],
+                keys: Arc::from(vec![]),
                 tags: None,
                 value: b"Hello, World!".to_vec().into(),
                 offset: Some(Offset::Int(IntOffset::new(1, 0))),
                 event_time: Utc::now(),
                 headers: Default::default(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: "2".to_string(),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: "2".to_string().into(),
                     index: 1,
                 },
             },
diff --git a/rust/numaflow-core/src/sink/log.rs b/rust/numaflow-core/src/sink/log.rs
index e83fa0847..a82670e8d 100644
--- a/rust/numaflow-core/src/sink/log.rs
+++ b/rust/numaflow-core/src/sink/log.rs
@@ -36,6 +36,7 @@ impl Sink for LogSink {
 #[cfg(test)]
 mod tests {
     use chrono::Utc;
+    use std::sync::Arc;

     use super::LogSink;
     use crate::message::IntOffset;
@@ -47,28 +48,28 @@ mod tests {
         let mut sink = LogSink;
         let messages = vec![
             Message {
-                keys: vec![],
+                keys: Arc::from(vec![]),
                 tags: None,
                 value: b"Hello, World!".to_vec().into(),
                 offset: Some(Offset::Int(IntOffset::new(1, 0))),
                 event_time: Utc::now(),
                 headers: Default::default(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: "1".to_string(),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: "1".to_string().into(),
                     index: 0,
                 },
             },
             Message {
-                keys: vec![],
+                keys: Arc::from(vec![]),
                 tags: None,
                 value: b"Hello, World!".to_vec().into(),
                 offset: Some(Offset::Int(IntOffset::new(1, 0))),
                 event_time: Utc::now(),
                 headers: Default::default(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: "2".to_string(),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: "2".to_string().into(),
                     index: 1,
                 },
             },
diff --git a/rust/numaflow-core/src/sink/user_defined.rs b/rust/numaflow-core/src/sink/user_defined.rs
index e475c77cd..a1817b1ae 100644
--- a/rust/numaflow-core/src/sink/user_defined.rs
+++ b/rust/numaflow-core/src/sink/user_defined.rs
@@ -120,6 +120,7 @@ impl Sink for UserDefinedSink {
 mod tests {
     use chrono::offset::Utc;
     use numaflow::sink;
+    use std::sync::Arc;
     use tokio::sync::mpsc;
     use tracing::info;
@@ -179,28 +180,28 @@ mod tests {
         let messages = vec![
             Message {
-                keys: vec![],
+                keys: Arc::from(vec![]),
                 tags: None,
                 value: b"Hello, World!".to_vec().into(),
                 offset: None,
                 event_time: Utc::now(),
                 headers: Default::default(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: "1".to_string(),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: "1".to_string().into(),
                     index: 0,
                 },
             },
             Message {
-                keys: vec![],
+                keys: Arc::from(vec![]),
                 tags: None,
                 value: b"Hello, World!".to_vec().into(),
                 offset: None,
                 event_time: Utc::now(),
                 headers: Default::default(),
                 id: MessageID {
-                    vertex_name: "vertex".to_string(),
-                    offset: "2".to_string(),
+                    vertex_name: "vertex".to_string().into(),
+                    offset: "2".to_string().into(),
                     index: 1,
                 },
             },
diff --git a/rust/numaflow-core/src/source.rs b/rust/numaflow-core/src/source.rs
index df551e696..3f2816514 100644
--- a/rust/numaflow-core/src/source.rs
+++ b/rust/numaflow-core/src/source.rs
@@ -295,7 +295,7 @@ impl Source {
                 // insert the offset and the ack one shot in the tracker.
                 tracker_handle
-                    .insert(offset.to_string(), resp_ack_tx)
+                    .insert(offset.to_string().into(), resp_ack_tx)
                     .await?;

                 // store the ack one shot in the batch to invoke ack later.
diff --git a/rust/numaflow-core/src/source/generator.rs b/rust/numaflow-core/src/source/generator.rs
index 3dd4f8aba..fdc5d590a 100644
--- a/rust/numaflow-core/src/source/generator.rs
+++ b/rust/numaflow-core/src/source/generator.rs
@@ -21,6 +21,7 @@ use tokio_stream::StreamExt;
 /// NOTE: The minimum granularity of duration is 10ms.
 mod stream_generator {
     use std::pin::Pin;
+    use std::sync::Arc;
     use std::task::{Context, Poll};
     use std::time::Duration;
@@ -165,14 +166,14 @@ mod stream_generator {
             }

             Message {
-                keys: self.next_key_to_be_fetched(),
+                keys: Arc::from(self.next_key_to_be_fetched()),
                 tags: None,
                 value: data.into(),
                 offset: Some(offset.clone()),
                 event_time,
                 id: MessageID {
-                    vertex_name: get_vertex_name().to_string(),
-                    offset: offset.to_string(),
+                    vertex_name: get_vertex_name().to_string().into(),
+                    offset: offset.to_string().into(),
                     index: Default::default(),
                 },
                 headers: Default::default(),
diff --git a/rust/numaflow-core/src/source/pulsar.rs b/rust/numaflow-core/src/source/pulsar.rs
index 64a3ecda9..6a2c7162b 100644
--- a/rust/numaflow-core/src/source/pulsar.rs
+++ b/rust/numaflow-core/src/source/pulsar.rs
@@ -1,3 +1,4 @@
+use std::sync::Arc;
 use std::time::Duration;

 use numaflow_pulsar::source::{PulsarMessage, PulsarSource, PulsarSourceConfig};
@@ -14,14 +15,14 @@ impl TryFrom<PulsarMessage> for Message {
         let offset = Offset::Int(IntOffset::new(message.offset, 1)); // FIXME: partition id

         Ok(Message {
-            keys: vec![message.key],
+            keys: Arc::from(vec![message.key]),
             tags: None,
             value: message.payload,
             offset: Some(offset.clone()),
             event_time: message.event_time,
             id: MessageID {
-                vertex_name: get_vertex_name().to_string(),
-                offset: offset.to_string(),
+                vertex_name: get_vertex_name().to_string().into(),
+                offset: offset.to_string().into(),
                 index: 0,
             },
             headers: message.headers,
@@ -153,6 +154,7 @@ mod tests {
         assert_eq!(messages.len(), 10);

         let offsets: Vec<Offset> = messages.into_iter().map(|m| m.offset.unwrap()).collect();
+        pulsar.ack(offsets).await?;

         Ok(())
diff --git a/rust/numaflow-core/src/tracker.rs b/rust/numaflow-core/src/tracker.rs
index 9f0839a2e..a1dd32662 100644
--- a/rust/numaflow-core/src/tracker.rs
+++ b/rust/numaflow-core/src/tracker.rs
@@ -11,6 +11,7 @@ use crate::error::Error;
 use crate::message::ReadAck;
 use crate::Result;

+use bytes::Bytes;
 use std::collections::HashMap;
 use tokio::sync::{mpsc, oneshot};
 use tracing::warn;
@@ -176,10 +177,13 @@ impl TrackerHandle {
     /// Inserts a new message into the Tracker with the given offset and acknowledgment sender.
     pub(crate) async fn insert(
         &self,
-        offset: String,
+        offset: Bytes,
         ack_send: oneshot::Sender<ReadAck>,
     ) -> Result<()> {
-        let message = ActorMessage::Insert { offset, ack_send };
+        let message = ActorMessage::Insert {
+            offset: String::from_utf8_lossy(&offset).to_string(),
+            ack_send,
+        };
         self.sender
             .send(message)
             .await
@@ -188,8 +192,12 @@ impl TrackerHandle {
     }

     /// Updates an existing message in the Tracker with the given offset, count, and EOF status.
-    pub(crate) async fn update(&self, offset: String, count: u32, eof: bool) -> Result<()> {
-        let message = ActorMessage::Update { offset, count, eof };
+    pub(crate) async fn update(&self, offset: Bytes, count: u32, eof: bool) -> Result<()> {
+        let message = ActorMessage::Update {
+            offset: String::from_utf8_lossy(&offset).to_string(),
+            count,
+            eof,
+        };
         self.sender
             .send(message)
             .await
@@ -198,8 +206,10 @@ impl TrackerHandle {
     }

     /// Deletes a message from the Tracker with the given offset.
-    pub(crate) async fn delete(&self, offset: String) -> Result<()> {
-        let message = ActorMessage::Delete { offset };
+    pub(crate) async fn delete(&self, offset: Bytes) -> Result<()> {
+        let message = ActorMessage::Delete {
+            offset: String::from_utf8_lossy(&offset).to_string(),
+        };
         self.sender
             .send(message)
             .await
@@ -208,8 +218,10 @@ impl TrackerHandle {
     }

     /// Discards a message from the Tracker with the given offset.
-    pub(crate) async fn discard(&self, offset: String) -> Result<()> {
-        let message = ActorMessage::Discard { offset };
+    pub(crate) async fn discard(&self, offset: Bytes) -> Result<()> {
+        let message = ActorMessage::Discard {
+            offset: String::from_utf8_lossy(&offset).to_string(),
+        };
         self.sender
             .send(message)
             .await
@@ -245,15 +257,18 @@ mod tests {
         // Insert a new message
         handle
-            .insert("offset1".to_string(), ack_send)
+            .insert("offset1".to_string().into(), ack_send)
             .await
             .unwrap();

         // Update the message
-        handle.update("offset1".to_string(), 1, true).await.unwrap();
+        handle
+            .update("offset1".to_string().into(), 1, true)
+            .await
+            .unwrap();

         // Delete the message
-        handle.delete("offset1".to_string()).await.unwrap();
+        handle.delete("offset1".to_string().into()).await.unwrap();

         // Verify that the message was deleted and ack was received
         let result = timeout(Duration::from_secs(1), ack_recv).await.unwrap();
@@ -269,20 +284,20 @@ mod tests {
         // Insert a new message
         handle
-            .insert("offset1".to_string(), ack_send)
+            .insert("offset1".to_string().into(), ack_send)
             .await
             .unwrap();

         // Update the message with a count of 3
         handle
-            .update("offset1".to_string(), 3, false)
+            .update("offset1".to_string().into(), 3, false)
             .await
             .unwrap();

         // Delete the message three times
-        handle.delete("offset1".to_string()).await.unwrap();
-        handle.delete("offset1".to_string()).await.unwrap();
-        handle.delete("offset1".to_string()).await.unwrap();
+        handle.delete("offset1".to_string().into()).await.unwrap();
+        handle.delete("offset1".to_string().into()).await.unwrap();
+        handle.delete("offset1".to_string().into()).await.unwrap();

         // Verify that the message was deleted and ack was received after the third delete
         let result = timeout(Duration::from_secs(1), ack_recv).await.unwrap();
@@ -298,12 +313,12 @@ mod tests {
         // Insert a new message
         handle
-            .insert("offset1".to_string(), ack_send)
+            .insert("offset1".to_string().into(), ack_send)
             .await
             .unwrap();

         // Discard the message
-        handle.discard("offset1".to_string()).await.unwrap();
+        handle.discard("offset1".to_string().into()).await.unwrap();

         // Verify that the message was discarded and nak was received
         let result = timeout(Duration::from_secs(1), ack_recv).await.unwrap();
@@ -319,18 +334,18 @@ mod tests {
         // Insert a new message
         handle
-            .insert("offset1".to_string(), ack_send)
+            .insert("offset1".to_string().into(), ack_send)
             .await
             .unwrap();

         // Update the message with a count of 3
         handle
-            .update("offset1".to_string(), 3, false)
+            .update("offset1".to_string().into(), 3, false)
             .await
             .unwrap();

         // Discard the message
-        handle.discard("offset1".to_string()).await.unwrap();
+        handle.discard("offset1".to_string().into()).await.unwrap();

         // Verify that the message was discarded and nak was received
         let result = timeout(Duration::from_secs(1), ack_recv).await.unwrap();
diff --git a/rust/numaflow-core/src/transformer.rs b/rust/numaflow-core/src/transformer.rs
index eec95e37a..d6d63bdfe 100644
--- a/rust/numaflow-core/src/transformer.rs
+++ b/rust/numaflow-core/src/transformer.rs
@@ -234,7 +234,7 @@ mod tests {
         let transformer = Transformer::new(500, 10, client, tracker_handle.clone()).await?;

         let message = Message {
-            keys: vec!["first".into()],
+            keys: Arc::from(vec!["first".into()]),
             tags: None,
             value: "hello".into(),
             offset: Some(Offset::String(crate::message::StringOffset::new(
                 "0".to_string(),
                 0,
             ))),
             event_time: chrono::Utc::now(),
             id: MessageID {
-                vertex_name: "vertex_name".to_string(),
-                offset: "0".to_string(),
+                vertex_name: "vertex_name".to_string().into(),
+                offset: "0".to_string().into(),
                 index: 0,
             },
             headers: Default::default(),
@@ -311,7 +311,7 @@ mod tests {
         for i in 0..5 {
             let message = Message {
-                keys: vec![format!("key_{}", i)],
+                keys: Arc::from(vec![format!("key_{}", i)]),
                 tags: None,
                 value: format!("value_{}", i).into(),
                 offset: Some(Offset::String(crate::message::StringOffset::new(
                     i.to_string(),
                     0,
                 ))),
                 event_time: chrono::Utc::now(),
                 id: MessageID {
-                    vertex_name: "vertex_name".to_string(),
-                    offset: i.to_string(),
+                    vertex_name: "vertex_name".to_string().into(),
+                    offset: i.to_string().into(),
                     index: i,
                 },
                 headers: Default::default(),
diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs
index 6a493c2a9..9a82275ac 100644
--- a/rust/numaflow-core/src/transformer/user_defined.rs
+++ b/rust/numaflow-core/src/transformer/user_defined.rs
@@ -98,12 +98,12 @@ impl UserDefinedTransformer {
         for (i, result) in resp.results.into_iter().enumerate() {
             let message = Message {
                 id: MessageID {
-                    vertex_name: get_vertex_name().to_string(),
+                    vertex_name: get_vertex_name().to_string().into(),
                     index: i as i32,
-                    offset: msg_info.offset.to_string(),
+                    offset: msg_info.offset.to_string().into(),
                 },
-                keys: result.keys,
-                tags: Some(result.tags),
+                keys: Arc::from(result.keys),
+                tags: Some(Arc::from(result.tags)),
                 value: result.value.into(),
                 offset: None,
                 event_time: utc_from_timestamp(result.event_time),
@@ -142,6 +142,7 @@ impl UserDefinedTransformer {
 #[cfg(test)]
 mod tests {
     use std::error::Error;
+    use std::sync::Arc;
     use std::time::Duration;

     use numaflow::sourcetransform;
@@ -194,7 +195,7 @@ mod tests {
         .await?;

         let message = crate::message::Message {
-            keys: vec!["first".into()],
+            keys: Arc::from(vec!["first".into()]),
             tags: None,
             value: "hello".into(),
             offset: Some(crate::message::Offset::String(StringOffset::new(
                 "0".to_string(),
                 0,
             ))),
             event_time: chrono::Utc::now(),
             id: MessageID {
-                vertex_name: "vertex_name".to_string(),
-                offset: "0".to_string(),
+                vertex_name: "vertex_name".to_string().into(),
+                offset: "0".to_string().into(),
                 index: 0,
             },
             headers: Default::default(),