chore: make Message cheap to clone (#2272)
Signed-off-by: Yashash H L <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
yhl25 and vigith authored Dec 11, 2024
1 parent 059a585 commit b2e480b
Showing 15 changed files with 201 additions and 162 deletions.
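
The change is easiest to see in `message.rs` below: the `Message` struct's `Vec<String>` and `String` fields become `Arc<[String]>` and `Bytes`, so cloning a message bumps reference counts instead of deep-copying keys and payload. The following is a minimal sketch of that idea, not the crate's actual code; the `CheapMessage` name is invented for illustration and it assumes the `bytes` crate is available.

```rust
// Illustrative only: why Arc<[String]> + Bytes fields make clone cheap.
use std::sync::Arc;

use bytes::Bytes;

#[derive(Debug, Clone)]
struct CheapMessage {
    keys: Arc<[String]>, // shared, immutable key slice
    value: Bytes,        // ref-counted byte buffer
}

fn main() {
    let msg = CheapMessage {
        keys: Arc::from(vec!["key1".to_string(), "key2".to_string()]),
        value: Bytes::from_static(b"payload"),
    };

    // Each clone is O(1): no key strings or payload bytes are copied.
    let a = msg.clone();
    let b = msg.clone();
    assert!(Arc::ptr_eq(&a.keys, &b.keys));
    assert_eq!(a.value, b.value);
}
```
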
109 changes: 61 additions & 48 deletions rust/numaflow-core/src/message.rs
@@ -1,6 +1,7 @@
use std::cmp::PartialEq;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

use async_nats::HeaderValue;
use base64::engine::general_purpose::STANDARD as BASE64_STANDARD;
@@ -23,13 +24,13 @@ use crate::{config, Error};
const DROP: &str = "U+005C__DROP__";

/// A message that is sent from the source to the sink.
#[derive(Debug, Clone, Serialize, Deserialize)]
/// It is cheap to clone.
#[derive(Debug, Clone)]
pub(crate) struct Message {
// FIXME: Arc<[Bytes]>
/// keys of the message
pub(crate) keys: Vec<String>,
pub(crate) keys: Arc<[String]>,
/// tags of the message
pub(crate) tags: Option<Vec<String>>,
pub(crate) tags: Option<Arc<[String]>>,
/// actual payload of the message
pub(crate) value: Bytes,
/// offset of the message, it is optional because offset is only
@@ -81,8 +82,8 @@ impl TryFrom<async_nats::Message> for Message {
let event_time = Utc::now();
let offset = None;
let id = MessageID {
vertex_name: config::get_vertex_name().to_string(),
offset: "0".to_string(),
vertex_name: config::get_vertex_name().to_string().into(),
offset: "0".to_string().into(),
index: 0,
};

@@ -131,23 +132,29 @@ impl fmt::Display for IntOffset {

/// StringOffset is string based offset enum type.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StringOffset {
offset: String,
pub(crate) struct StringOffset {
/// offset could be a complex base64 string.
offset: Bytes,
partition_idx: u16,
}

impl StringOffset {
pub fn new(seq: String, partition_idx: u16) -> Self {
Self {
offset: seq,
offset: seq.into(),
partition_idx,
}
}
}

impl fmt::Display for StringOffset {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}-{}", self.offset, self.partition_idx)
write!(
f,
"{}-{}",
std::str::from_utf8(&self.offset).expect("it should be valid utf-8"),
self.partition_idx
)
}
}

@@ -159,18 +166,19 @@ pub(crate) enum ReadAck {
Nak,
}

/// Message ID which is used to uniquely identify a message. It is cheap to clone.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct MessageID {
pub(crate) vertex_name: String,
pub(crate) offset: String,
pub(crate) vertex_name: Bytes,
pub(crate) offset: Bytes,
pub(crate) index: i32,
}

impl From<numaflow_pb::objects::isb::MessageId> for MessageID {
fn from(id: numaflow_pb::objects::isb::MessageId) -> Self {
Self {
vertex_name: id.vertex_name,
offset: id.offset,
vertex_name: id.vertex_name.into(),
offset: id.offset.into(),
index: id.index,
}
}
@@ -179,16 +187,21 @@ impl From<numaflow_pb::objects::isb::MessageId> for MessageID {
impl From<MessageID> for numaflow_pb::objects::isb::MessageId {
fn from(id: MessageID) -> Self {
Self {
vertex_name: id.vertex_name,
offset: id.offset,
vertex_name: String::from_utf8_lossy(&id.vertex_name).to_string(),
offset: String::from_utf8_lossy(&id.offset).to_string(),
index: id.index,
}
}
}

impl fmt::Display for MessageID {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}-{}-{}", self.vertex_name, self.offset, self.index)
write!(
f,
"{}-{}-{}",
std::str::from_utf8(&self.vertex_name).expect("it should be valid utf-8"),
std::str::from_utf8(&self.offset).expect("it should be valid utf-8"),
self.index
)
}
}

@@ -220,8 +233,8 @@ impl TryFrom<Message> for BytesMut {
}),
kind: numaflow_pb::objects::isb::MessageKind::Data as i32,
id: Some(message.id.into()),
keys: message.keys.clone(),
headers: message.headers.clone(),
keys: message.keys.to_vec(),
headers: message.headers,
}),
body: Some(numaflow_pb::objects::isb::Body {
payload: message.value.to_vec(),
@@ -255,7 +268,7 @@ impl TryFrom<Bytes> for Message {
let id = header.id.ok_or(Error::Proto("Missing id".to_string()))?;

Ok(Message {
keys: header.keys,
keys: Arc::from(header.keys.into_boxed_slice()),
tags: None,
value: body.payload.into(),
offset: None,
@@ -273,7 +286,7 @@ impl From<Message> for SourceTransformRequest {
request: Some(
numaflow_pb::clients::sourcetransformer::source_transform_request::Request {
id: message.id.to_string(),
keys: message.keys,
keys: message.keys.to_vec(),
value: message.value.to_vec(),
event_time: prost_timestamp_from_utc(message.event_time),
watermark: None,
@@ -292,21 +305,21 @@ impl TryFrom<read_response::Result> for Message {
fn try_from(result: read_response::Result) -> Result<Self> {
let source_offset = match result.offset {
Some(o) => Offset::String(StringOffset {
offset: BASE64_STANDARD.encode(o.offset),
offset: BASE64_STANDARD.encode(o.offset).into(),
partition_idx: o.partition_id as u16,
}),
None => return Err(Error::Source("Offset not found".to_string())),
};

Ok(Message {
keys: result.keys,
keys: Arc::from(result.keys),
tags: None,
value: result.payload.into(),
offset: Some(source_offset.clone()),
event_time: utc_from_timestamp(result.event_time),
id: MessageID {
vertex_name: config::get_vertex_name().to_string(),
offset: source_offset.to_string(),
vertex_name: config::get_vertex_name().to_string().into(),
offset: source_offset.to_string().into(),
index: 0,
},
headers: result.headers,
@@ -319,7 +332,7 @@ impl From<Message> for SinkRequest {
fn from(message: Message) -> Self {
Self {
request: Some(Request {
keys: message.keys,
keys: message.keys.to_vec(),
value: message.value.to_vec(),
event_time: prost_timestamp_from_utc(message.event_time),
watermark: None,
@@ -399,7 +412,7 @@ mod tests {
#[test]
fn test_offset_display() {
let offset = Offset::String(StringOffset {
offset: "123".to_string(),
offset: "123".to_string().into(),
partition_idx: 1,
});
assert_eq!(format!("{}", offset), "123-1");
@@ -408,8 +421,8 @@
#[test]
fn test_message_id_display() {
let message_id = MessageID {
vertex_name: "vertex".to_string(),
offset: "123".to_string(),
vertex_name: "vertex".to_string().into(),
offset: "123".to_string().into(),
index: 0,
};
assert_eq!(format!("{}", message_id), "vertex-123-0");
@@ -418,17 +431,17 @@
#[test]
fn test_message_to_vec_u8() {
let message = Message {
keys: vec!["key1".to_string()],
keys: Arc::from(vec!["key1".to_string()]),
tags: None,
value: vec![1, 2, 3].into(),
offset: Some(Offset::String(StringOffset {
offset: "123".to_string(),
offset: "123".to_string().into(),
partition_idx: 0,
})),
event_time: Utc.timestamp_opt(1627846261, 0).unwrap(),
id: MessageID {
vertex_name: "vertex".to_string(),
offset: "123".to_string(),
vertex_name: "vertex".to_string().into(),
offset: "123".to_string().into(),
index: 0,
},
headers: HashMap::new(),
@@ -445,8 +458,8 @@ mod tests {
}),
kind: numaflow_pb::objects::isb::MessageKind::Data as i32,
id: Some(message.id.into()),
keys: message.keys.clone(),
headers: message.headers.clone(),
keys: message.keys.to_vec(),
headers: message.headers,
}),
body: Some(Body {
payload: message.value.clone().into(),
@@ -488,7 +501,7 @@ mod tests {
assert!(result.is_ok());

let message = result.unwrap();
assert_eq!(message.keys, vec!["key1".to_string()]);
assert_eq!(message.keys.to_vec(), vec!["key1".to_string()]);
assert_eq!(message.value, vec![1, 2, 3]);
assert_eq!(
message.event_time,
@@ -499,17 +512,17 @@
#[test]
fn test_message_to_source_transform_request() {
let message = Message {
keys: vec!["key1".to_string()],
keys: Arc::from(vec!["key1".to_string()]),
tags: None,
value: vec![1, 2, 3].into(),
offset: Some(Offset::String(StringOffset {
offset: "123".to_string(),
offset: "123".to_string().into(),
partition_idx: 0,
})),
event_time: Utc.timestamp_opt(1627846261, 0).unwrap(),
id: MessageID {
vertex_name: "vertex".to_string(),
offset: "123".to_string(),
vertex_name: "vertex".to_string().into(),
offset: "123".to_string().into(),
index: 0,
},
headers: HashMap::new(),
@@ -538,7 +551,7 @@ mod tests {
assert!(message.is_ok());

let message = message.unwrap();
assert_eq!(message.keys, vec!["key1".to_string()]);
assert_eq!(message.keys.to_vec(), vec!["key1".to_string()]);
assert_eq!(message.value, vec![1, 2, 3]);
assert_eq!(
message.event_time,
@@ -549,17 +562,17 @@
#[test]
fn test_message_to_sink_request() {
let message = Message {
keys: vec!["key1".to_string()],
keys: Arc::from(vec!["key1".to_string()]),
tags: None,
value: vec![1, 2, 3].into(),
offset: Some(Offset::String(StringOffset {
offset: "123".to_string(),
offset: "123".to_string().into(),
partition_idx: 0,
})),
event_time: Utc.timestamp_opt(1627846261, 0).unwrap(),
id: MessageID {
vertex_name: "vertex".to_string(),
offset: "123".to_string(),
vertex_name: "vertex".to_string().into(),
offset: "123".to_string().into(),
index: 0,
},
headers: HashMap::new(),
@@ -622,8 +635,8 @@ mod tests {
#[test]
fn test_message_id_to_proto() {
let message_id = MessageID {
vertex_name: "vertex".to_string(),
offset: "123".to_string(),
vertex_name: "vertex".to_string().into(),
offset: "123".to_string().into(),
index: 0,
};
let proto_id: MessageId = message_id.into();
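
A recurring pattern in `message.rs` above is the conversion at the protobuf boundary: decoded `Vec<String>` keys are turned into `Arc<[String]>` once (via `Arc::from`), and turned back into owned data with `.to_vec()` only where a request type needs ownership. The sketch below is a standalone illustration of that round trip, not code from the commit.

```rust
// Sketch of the Vec<String> <-> Arc<[String]> round trip used at the proto boundary.
use std::sync::Arc;

fn main() {
    // Keys as they arrive from a decoded protobuf message.
    let wire_keys: Vec<String> = vec!["key1".into(), "key2".into()];

    // Into the shared representation: each String is moved into the shared
    // allocation; none of the key bytes are copied.
    let shared: Arc<[String]> = Arc::from(wire_keys);

    // Cheap clones while the message flows through the pipeline.
    let for_transform = Arc::clone(&shared);

    // Back to owned data only at the edge (this does clone each String).
    let owned_again: Vec<String> = for_transform.to_vec();
    assert_eq!(owned_again, vec!["key1".to_string(), "key2".to_string()]);
}
```
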
7 changes: 4 additions & 3 deletions rust/numaflow-core/src/pipeline.rs
@@ -229,6 +229,7 @@ async fn create_js_context(config: pipeline::isb::jetstream::ClientConfig) -> Re
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use async_nats::jetstream;
@@ -428,14 +429,14 @@ mod tests {

use crate::message::{Message, MessageID, Offset, StringOffset};
let message = Message {
keys: vec!["key1".to_string()],
keys: Arc::from(vec!["key1".to_string()]),
tags: None,
value: vec![1, 2, 3].into(),
offset: Some(Offset::String(StringOffset::new("123".to_string(), 0))),
event_time: Utc.timestamp_opt(1627846261, 0).unwrap(),
id: MessageID {
vertex_name: "vertex".to_string(),
offset: "123".to_string(),
vertex_name: "vertex".to_string().into(),
offset: "123".to_string().into(),
index: 0,
},
headers: HashMap::new(),
17 changes: 9 additions & 8 deletions rust/numaflow-core/src/pipeline/isb/jetstream/reader.rs
@@ -149,8 +149,8 @@ impl JetstreamReader {
)));

message.id = MessageID {
vertex_name: pipeline_config.vertex_name.clone(),
offset: msg_info.stream_sequence.to_string(),
vertex_name: pipeline_config.vertex_name.clone().into(),
offset: msg_info.stream_sequence.to_string().into(),
index: 0,
};

@@ -258,6 +258,7 @@ impl fmt::Display for JetstreamReader {
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;

use super::*;
use crate::message::{Message, MessageID};
@@ -327,14 +328,14 @@ mod tests {

for i in 0..10 {
let message = Message {
keys: vec![format!("key_{}", i)],
keys: Arc::from(vec![format!("key_{}", i)]),
tags: None,
value: format!("message {}", i).as_bytes().to_vec().into(),
offset: None,
event_time: Utc::now(),
id: MessageID {
vertex_name: "vertex".to_string(),
offset: format!("offset_{}", i),
vertex_name: "vertex".to_string().into(),
offset: format!("offset_{}", i).into(),
index: i,
},
headers: HashMap::new(),
@@ -429,14 +430,14 @@ mod tests {
// write 5 messages
for i in 0..5 {
let message = Message {
keys: vec![format!("key_{}", i)],
keys: Arc::from(vec![format!("key_{}", i)]),
tags: None,
value: format!("message {}", i).as_bytes().to_vec().into(),
offset: None,
event_time: Utc::now(),
id: MessageID {
vertex_name: "vertex".to_string(),
offset: format!("{}", i + 1),
vertex_name: "vertex".to_string().into(),
offset: format!("{}", i + 1).into(),
index: i,
},
headers: HashMap::new(),
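
The `Display` impls and `reader.rs` above rely on the other half of the pattern: `String` values are moved into `Bytes` with `.into()`, and formatted by borrowing the bytes back as UTF-8 instead of allocating a new `String`. A minimal sketch follows; the `OffsetKey` type is made up here and merely mirrors the shape of the crate's `StringOffset`.

```rust
// Illustrative Display impl over a Bytes-backed field: formatting borrows the
// bytes as UTF-8, so no intermediate String is allocated on the happy path.
use std::fmt;

use bytes::Bytes;

struct OffsetKey {
    offset: Bytes,
    partition_idx: u16,
}

impl fmt::Display for OffsetKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "{}-{}",
            std::str::from_utf8(&self.offset).expect("offset should be valid utf-8"),
            self.partition_idx
        )
    }
}

fn main() {
    let key = OffsetKey {
        // String -> Bytes takes ownership of the existing buffer (no copy).
        offset: Bytes::from("123".to_string()),
        partition_idx: 1,
    };
    assert_eq!(key.to_string(), "123-1");
}
```
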
