From 0c86e820e6a47780dceab143175e747d28ab4e47 Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Thu, 2 Jan 2025 12:19:42 +0530 Subject: [PATCH] chore: move message conversions to appropriate modules (#2306) Signed-off-by: Yashash H L Signed-off-by: Vigith Maurice Co-authored-by: Vigith Maurice --- rust/numaflow-core/src/config/monovertex.rs | 2 +- .../src/mapper/map/user_defined.rs | 18 + rust/numaflow-core/src/message.rs | 337 +----------------- rust/numaflow-core/src/sink.rs | 132 ++++++- rust/numaflow-core/src/sink/blackhole.rs | 8 +- rust/numaflow-core/src/sink/log.rs | 11 +- rust/numaflow-core/src/sink/user_defined.rs | 23 +- rust/numaflow-core/src/source/user_defined.rs | 109 +++++- .../src/transformer/user_defined.rs | 58 ++- 9 files changed, 330 insertions(+), 368 deletions(-) diff --git a/rust/numaflow-core/src/config/monovertex.rs b/rust/numaflow-core/src/config/monovertex.rs index 75f5a8cb9..c6f18e3c8 100644 --- a/rust/numaflow-core/src/config/monovertex.rs +++ b/rust/numaflow-core/src/config/monovertex.rs @@ -47,7 +47,7 @@ impl Default for MonovertexConfig { source_type: source::SourceType::Generator(GeneratorConfig::default()), }, sink_config: SinkConfig { - sink_type: sink::SinkType::Log(sink::LogConfig::default()), + sink_type: SinkType::Log(sink::LogConfig::default()), retry_config: None, }, transformer_config: None, diff --git a/rust/numaflow-core/src/mapper/map/user_defined.rs b/rust/numaflow-core/src/mapper/map/user_defined.rs index 6bc816c40..0799eb654 100644 --- a/rust/numaflow-core/src/mapper/map/user_defined.rs +++ b/rust/numaflow-core/src/mapper/map/user_defined.rs @@ -13,6 +13,7 @@ use tracing::error; use crate::config::get_vertex_name; use crate::error::{Error, Result}; use crate::message::{Message, MessageID, Offset}; +use crate::shared::grpc::prost_timestamp_from_utc; type ResponseSenderMap = Arc>>)>>>; @@ -26,6 +27,23 @@ struct ParentMessageInfo { headers: HashMap, } +impl From for MapRequest { + fn from(message: Message) -> Self { + Self { + request: Some(map::map_request::Request { + keys: message.keys.to_vec(), + value: message.value.to_vec(), + event_time: prost_timestamp_from_utc(message.event_time), + watermark: None, + headers: message.headers, + }), + id: message.offset.unwrap().to_string(), + handshake: None, + status: None, + } + } +} + /// UserDefinedUnaryMap is a grpc client that sends unary requests to the map server /// and forwards the responses. pub(in crate::mapper) struct UserDefinedUnaryMap { diff --git a/rust/numaflow-core/src/message.rs b/rust/numaflow-core/src/message.rs index a33b4a704..00f5cca66 100644 --- a/rust/numaflow-core/src/message.rs +++ b/rust/numaflow-core/src/message.rs @@ -3,24 +3,14 @@ use std::collections::HashMap; use std::fmt; use std::sync::Arc; -use async_nats::HeaderValue; -use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; -use base64::Engine; use bytes::{Bytes, BytesMut}; use chrono::{DateTime, Utc}; -use numaflow_pb::clients::map::MapRequest; -use numaflow_pb::clients::sink::sink_request::Request; -use numaflow_pb::clients::sink::Status::{Failure, Fallback, Success}; -use numaflow_pb::clients::sink::{sink_response, SinkRequest}; -use numaflow_pb::clients::source::read_response; -use numaflow_pb::clients::sourcetransformer::SourceTransformRequest; use prost::Message as ProtoMessage; use serde::{Deserialize, Serialize}; use crate::shared::grpc::prost_timestamp_from_utc; use crate::shared::grpc::utc_from_timestamp; -use crate::Result; -use crate::{config, Error}; +use crate::Error; const DROP: &str = "U+005C__DROP__"; @@ -62,44 +52,6 @@ impl fmt::Display for Offset { } } -impl TryFrom for Message { - type Error = Error; - - fn try_from(message: async_nats::Message) -> std::result::Result { - let payload = message.payload; - let headers: HashMap = message - .headers - .unwrap_or_default() - .iter() - .map(|(key, value)| { - ( - key.to_string(), - value.first().unwrap_or(&HeaderValue::from("")).to_string(), - ) - }) - .collect(); - // FIXME(cr): we should not be using subject. keys are in the payload - let keys = message.subject.split('.').map(|s| s.to_string()).collect(); - let event_time = Utc::now(); - let offset = None; - let id = MessageID { - vertex_name: config::get_vertex_name().to_string().into(), - offset: "0".to_string().into(), - index: 0, - }; - - Ok(Self { - keys, - tags: None, - value: payload, - offset, - event_time, - id, - headers, - }) - } -} - impl Message { // Check if the message should be dropped. pub(crate) fn dropped(&self) -> bool { @@ -135,8 +87,8 @@ impl fmt::Display for IntOffset { #[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct StringOffset { /// offset could be a complex base64 string. - offset: Bytes, - partition_idx: u16, + pub(crate) offset: Bytes, + pub(crate) partition_idx: u16, } impl StringOffset { @@ -206,26 +158,10 @@ impl fmt::Display for MessageID { } } -impl TryFrom for numaflow_pb::clients::source::Offset { - type Error = Error; - - fn try_from(offset: Offset) -> std::result::Result { - match offset { - Offset::Int(_) => Err(Error::Source("IntOffset not supported".to_string())), - Offset::String(o) => Ok(numaflow_pb::clients::source::Offset { - offset: BASE64_STANDARD - .decode(o.offset) - .expect("we control the encoding, so this should never fail"), - partition_id: o.partition_idx as i32, - }), - } - } -} - impl TryFrom for BytesMut { type Error = Error; - fn try_from(message: Message) -> std::result::Result { + fn try_from(message: Message) -> Result { let proto_message = numaflow_pb::objects::isb::Message { header: Some(numaflow_pb::objects::isb::Header { message_info: Some(numaflow_pb::objects::isb::MessageInfo { @@ -253,7 +189,7 @@ impl TryFrom for BytesMut { impl TryFrom for Message { type Error = Error; - fn try_from(bytes: Bytes) -> std::result::Result { + fn try_from(bytes: Bytes) -> Result { let proto_message = numaflow_pb::objects::isb::Message::decode(bytes) .map_err(|e| Error::Proto(e.to_string()))?; @@ -280,153 +216,14 @@ impl TryFrom for Message { } } -/// Convert the [`Message`] to [`SourceTransformRequest`] -impl From for SourceTransformRequest { - fn from(message: Message) -> Self { - Self { - request: Some( - numaflow_pb::clients::sourcetransformer::source_transform_request::Request { - id: message - .offset - .expect("offset should be present") - .to_string(), - keys: message.keys.to_vec(), - value: message.value.to_vec(), - event_time: prost_timestamp_from_utc(message.event_time), - watermark: None, - headers: message.headers, - }, - ), - handshake: None, - } - } -} - -impl From for MapRequest { - fn from(message: Message) -> Self { - Self { - request: Some(numaflow_pb::clients::map::map_request::Request { - keys: message.keys.to_vec(), - value: message.value.to_vec(), - event_time: prost_timestamp_from_utc(message.event_time), - watermark: None, - headers: message.headers, - }), - id: message.offset.unwrap().to_string(), - handshake: None, - status: None, - } - } -} - -/// Convert [`read_response::Result`] to [`Message`] -impl TryFrom for Message { - type Error = Error; - - fn try_from(result: read_response::Result) -> Result { - let source_offset = match result.offset { - Some(o) => Offset::String(StringOffset { - offset: BASE64_STANDARD.encode(o.offset).into(), - partition_idx: o.partition_id as u16, - }), - None => return Err(Error::Source("Offset not found".to_string())), - }; - - Ok(Message { - keys: Arc::from(result.keys), - tags: None, - value: result.payload.into(), - offset: Some(source_offset.clone()), - event_time: utc_from_timestamp(result.event_time), - id: MessageID { - vertex_name: config::get_vertex_name().to_string().into(), - offset: source_offset.to_string().into(), - index: 0, - }, - headers: result.headers, - }) - } -} - -/// Convert [`Message`] to [`proto::SinkRequest`] -impl From for SinkRequest { - fn from(message: Message) -> Self { - Self { - request: Some(Request { - keys: message.keys.to_vec(), - value: message.value.to_vec(), - event_time: prost_timestamp_from_utc(message.event_time), - watermark: None, - id: message.id.to_string(), - headers: message.headers, - }), - status: None, - handshake: None, - } - } -} - -/// Sink's status for each [Message] written to Sink. -#[derive(PartialEq, Debug)] -pub(crate) enum ResponseStatusFromSink { - /// Successfully wrote to the Sink. - Success, - /// Failed with error message. - Failed(String), - /// Write to FallBack Sink. - Fallback, -} - -/// Sink will give a response per [Message]. -#[derive(Debug, PartialEq)] -pub(crate) struct ResponseFromSink { - /// Unique id per [Message]. We need to track per [Message] status. - pub(crate) id: String, - /// Status of the "sink" operation per [Message]. - pub(crate) status: ResponseStatusFromSink, -} - -impl From for sink_response::Result { - fn from(value: ResponseFromSink) -> Self { - let (status, err_msg) = match value.status { - ResponseStatusFromSink::Success => (Success, "".to_string()), - ResponseStatusFromSink::Failed(err) => (Failure, err.to_string()), - ResponseStatusFromSink::Fallback => (Fallback, "".to_string()), - }; - - Self { - id: value.id, - status: status as i32, - err_msg, - } - } -} - -impl From for ResponseFromSink { - fn from(value: sink_response::Result) -> Self { - let status = match value.status() { - Success => ResponseStatusFromSink::Success, - Failure => ResponseStatusFromSink::Failed(value.err_msg), - Fallback => ResponseStatusFromSink::Fallback, - }; - Self { - id: value.id, - status, - } - } -} - #[cfg(test)] mod tests { - use std::collections::HashMap; - + use crate::error::Result; use chrono::TimeZone; - use numaflow_pb::clients::sink::sink_response::Result as SinkResult; - use numaflow_pb::clients::sink::SinkResponse; - use numaflow_pb::clients::source::Offset as SourceOffset; use numaflow_pb::objects::isb::{ Body, Header, Message as ProtoMessage, MessageId, MessageInfo, }; + use std::collections::HashMap; use super::*; @@ -530,116 +327,6 @@ mod tests { ); } - #[test] - fn test_message_to_source_transform_request() { - let message = Message { - keys: Arc::from(vec!["key1".to_string()]), - tags: None, - value: vec![1, 2, 3].into(), - offset: Some(Offset::String(StringOffset { - offset: "123".to_string().into(), - partition_idx: 0, - })), - event_time: Utc.timestamp_opt(1627846261, 0).unwrap(), - id: MessageID { - vertex_name: "vertex".to_string().into(), - offset: "123".to_string().into(), - index: 0, - }, - headers: HashMap::new(), - }; - - let request: SourceTransformRequest = message.into(); - assert!(request.request.is_some()); - } - - #[test] - fn test_read_response_result_to_message() { - let result = read_response::Result { - payload: vec![1, 2, 3], - offset: Some(SourceOffset { - offset: BASE64_STANDARD.encode("123").into_bytes(), - partition_id: 0, - }), - event_time: Some( - prost_timestamp_from_utc(Utc.timestamp_opt(1627846261, 0).unwrap()).unwrap(), - ), - keys: vec!["key1".to_string()], - headers: HashMap::new(), - }; - - let message: Result = result.try_into(); - assert!(message.is_ok()); - - let message = message.unwrap(); - assert_eq!(message.keys.to_vec(), vec!["key1".to_string()]); - assert_eq!(message.value, vec![1, 2, 3]); - assert_eq!( - message.event_time, - Utc.timestamp_opt(1627846261, 0).unwrap() - ); - } - - #[test] - fn test_message_to_sink_request() { - let message = Message { - keys: Arc::from(vec!["key1".to_string()]), - tags: None, - value: vec![1, 2, 3].into(), - offset: Some(Offset::String(StringOffset { - offset: "123".to_string().into(), - partition_idx: 0, - })), - event_time: Utc.timestamp_opt(1627846261, 0).unwrap(), - id: MessageID { - vertex_name: "vertex".to_string().into(), - offset: "123".to_string().into(), - index: 0, - }, - headers: HashMap::new(), - }; - - let request: SinkRequest = message.into(); - assert!(request.request.is_some()); - } - - #[test] - fn test_response_from_sink_to_sink_response() { - let response = ResponseFromSink { - id: "123".to_string(), - status: ResponseStatusFromSink::Success, - }; - - let sink_result: sink_response::Result = response.into(); - assert_eq!(sink_result.status, Success as i32); - } - - #[test] - fn test_sink_response_to_response_from_sink() { - let sink_response = SinkResponse { - results: vec![SinkResult { - id: "123".to_string(), - status: Success as i32, - err_msg: "".to_string(), - }], - handshake: None, - status: None, - }; - - let results: Vec = sink_response - .results - .into_iter() - .map(Into::into) - .collect::>(); - assert!(!results.is_empty()); - - assert_eq!(results.get(0).unwrap().id, "123"); - assert_eq!( - results.get(0).unwrap().status, - ResponseStatusFromSink::Success - ); - } - #[test] fn test_message_id_from_proto() { let proto_id = MessageId { @@ -683,15 +370,5 @@ mod tests { let offset_string = Offset::String(string_offset); assert_eq!(format!("{}", offset_string), "42-1"); - - // Test conversion from Offset to AckRequest for StringOffset - let offset = Offset::String(StringOffset::new(BASE64_STANDARD.encode("42"), 1)); - let offset: Result = offset.try_into(); - assert_eq!(offset.unwrap().partition_id, 1); - - // Test conversion from Offset to AckRequest for IntOffset (should fail) - let offset = Offset::Int(IntOffset::new(42, 1)); - let result: Result = offset.try_into(); - assert!(result.is_err()); } } diff --git a/rust/numaflow-core/src/sink.rs b/rust/numaflow-core/src/sink.rs index 0b30f4c30..c60c57bd3 100644 --- a/rust/numaflow-core/src/sink.rs +++ b/rust/numaflow-core/src/sink.rs @@ -2,6 +2,8 @@ use std::collections::HashMap; use std::time::Duration; use numaflow_pb::clients::sink::sink_client::SinkClient; +use numaflow_pb::clients::sink::sink_response; +use numaflow_pb::clients::sink::Status::{Failure, Fallback, Success}; use tokio::sync::mpsc::Receiver; use tokio::sync::{mpsc, oneshot}; use tokio::task::JoinHandle; @@ -17,7 +19,7 @@ use user_defined::UserDefinedSink; use crate::config::components::sink::{OnFailureStrategy, RetryConfig}; use crate::config::is_mono_vertex; use crate::error::Error; -use crate::message::{Message, ResponseFromSink, ResponseStatusFromSink}; +use crate::message::Message; use crate::metrics::{ monovertex_metrics, mvtx_forward_metric_labels, pipeline_forward_metric_labels, pipeline_metrics, @@ -25,8 +27,16 @@ use crate::metrics::{ use crate::tracker::TrackerHandle; use crate::Result; +/// A [blackhole] sink which reads but never writes to anywhere, semantic equivalent of `/dev/null`. +/// +/// [Blackhole]: https://numaflow.numaproj.io/user-guide/sinks/blackhole/ mod blackhole; + +/// [log] sink prints out the read messages on to stdout. +/// +/// [Log]: https://numaflow.numaproj.io/user-guide/sinks/log/ mod log; + /// [User-Defined Sink] extends Numaflow to add custom sources supported outside the builtins. /// /// [User-Defined Sink]: https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/ @@ -645,6 +655,56 @@ impl SinkWriter { } } +/// Sink's status for each [Message] written to Sink. +#[derive(PartialEq, Debug)] +pub(crate) enum ResponseStatusFromSink { + /// Successfully wrote to the Sink. + Success, + /// Failed with error message. + Failed(String), + /// Write to FallBack Sink. + Fallback, +} + +/// Sink will give a response per [Message]. +#[derive(Debug, PartialEq)] +pub(crate) struct ResponseFromSink { + /// Unique id per [Message]. We need to track per [Message] status. + pub(crate) id: String, + /// Status of the "sink" operation per [Message]. + pub(crate) status: ResponseStatusFromSink, +} + +impl From for ResponseFromSink { + fn from(value: sink_response::Result) -> Self { + let status = match value.status() { + Success => ResponseStatusFromSink::Success, + Failure => ResponseStatusFromSink::Failed(value.err_msg), + Fallback => ResponseStatusFromSink::Fallback, + }; + Self { + id: value.id, + status, + } + } +} + +impl From for sink_response::Result { + fn from(value: ResponseFromSink) -> Self { + let (status, err_msg) = match value.status { + ResponseStatusFromSink::Success => (Success, "".to_string()), + ResponseStatusFromSink::Failed(err) => (Failure, err.to_string()), + ResponseStatusFromSink::Fallback => (Fallback, "".to_string()), + }; + + Self { + id: value.id, + status: status as i32, + err_msg, + } + } +} + impl Drop for SinkWriter { fn drop(&mut self) {} } @@ -653,15 +713,15 @@ impl Drop for SinkWriter { mod tests { use std::sync::Arc; - use chrono::Utc; + use super::*; + use crate::message::{Message, MessageID, Offset, ReadAck, StringOffset}; + use crate::shared::grpc::create_rpc_channel; + use chrono::{TimeZone, Utc}; use numaflow::sink; + use numaflow_pb::clients::sink::{SinkRequest, SinkResponse}; use tokio::time::Duration; use tokio_util::sync::CancellationToken; - use super::*; - use crate::message::{Message, MessageID, ReadAck}; - use crate::shared::grpc::create_rpc_channel; - struct SimpleSink; #[tonic::async_trait] impl sink::Sinker for SimpleSink { @@ -938,4 +998,64 @@ mod tests { // check if the tracker is empty assert!(tracker_handle.is_empty().await.unwrap()); } + + #[test] + fn test_message_to_sink_request() { + let message = Message { + keys: Arc::from(vec!["key1".to_string()]), + tags: None, + value: vec![1, 2, 3].into(), + offset: Some(Offset::String(StringOffset { + offset: "123".to_string().into(), + partition_idx: 0, + })), + event_time: Utc.timestamp_opt(1627846261, 0).unwrap(), + id: MessageID { + vertex_name: "vertex".to_string().into(), + offset: "123".to_string().into(), + index: 0, + }, + headers: HashMap::new(), + }; + + let request: SinkRequest = message.into(); + assert!(request.request.is_some()); + } + + #[test] + fn test_response_from_sink_to_sink_response() { + let response = ResponseFromSink { + id: "123".to_string(), + status: ResponseStatusFromSink::Success, + }; + + let sink_result: sink_response::Result = response.into(); + assert_eq!(sink_result.status, Success as i32); + } + + #[test] + fn test_sink_response_to_response_from_sink() { + let sink_response = SinkResponse { + results: vec![sink_response::Result { + id: "123".to_string(), + status: Success as i32, + err_msg: "".to_string(), + }], + handshake: None, + status: None, + }; + + let results: Vec = sink_response + .results + .into_iter() + .map(Into::into) + .collect::>(); + assert!(!results.is_empty()); + + assert_eq!(results.first().unwrap().id, "123"); + assert_eq!( + results.first().unwrap().status, + ResponseStatusFromSink::Success + ); + } } diff --git a/rust/numaflow-core/src/sink/blackhole.rs b/rust/numaflow-core/src/sink/blackhole.rs index eb2f33136..308a59e35 100644 --- a/rust/numaflow-core/src/sink/blackhole.rs +++ b/rust/numaflow-core/src/sink/blackhole.rs @@ -1,5 +1,5 @@ -use super::Sink; -use crate::message::{Message, ResponseFromSink, ResponseStatusFromSink}; +use super::{ResponseFromSink, ResponseStatusFromSink, Sink}; +use crate::message::Message; /// Blackhole is a sink to emulate /dev/null pub struct BlackholeSink; @@ -25,8 +25,8 @@ mod tests { use super::BlackholeSink; use crate::message::IntOffset; - use crate::message::{Message, MessageID, Offset, ResponseFromSink, ResponseStatusFromSink}; - use crate::sink::Sink; + use crate::message::{Message, MessageID, Offset}; + use crate::sink::{ResponseFromSink, ResponseStatusFromSink, Sink}; #[tokio::test] async fn test_black_hole() { diff --git a/rust/numaflow-core/src/sink/log.rs b/rust/numaflow-core/src/sink/log.rs index 9ae426f1f..71bb37374 100644 --- a/rust/numaflow-core/src/sink/log.rs +++ b/rust/numaflow-core/src/sink/log.rs @@ -1,8 +1,5 @@ -use crate::sink::Sink; -use crate::{ - error, - message::{Message, ResponseFromSink, ResponseStatusFromSink}, -}; +use crate::sink::{ResponseFromSink, ResponseStatusFromSink, Sink}; +use crate::{error, message::Message}; pub(crate) struct LogSink; @@ -41,8 +38,8 @@ mod tests { use super::LogSink; use crate::message::IntOffset; - use crate::message::{Message, MessageID, Offset, ResponseFromSink, ResponseStatusFromSink}; - use crate::sink::Sink; + use crate::message::{Message, MessageID, Offset}; + use crate::sink::{ResponseFromSink, ResponseStatusFromSink, Sink}; #[tokio::test] async fn test_log_sink() { diff --git a/rust/numaflow-core/src/sink/user_defined.rs b/rust/numaflow-core/src/sink/user_defined.rs index efb9c1178..0bcb4c685 100644 --- a/rust/numaflow-core/src/sink/user_defined.rs +++ b/rust/numaflow-core/src/sink/user_defined.rs @@ -6,8 +6,9 @@ use tonic::transport::Channel; use tonic::{Request, Streaming}; use tracing::error; -use crate::message::{Message, ResponseFromSink}; -use crate::sink::Sink; +use crate::message::Message; +use crate::shared::grpc::prost_timestamp_from_utc; +use crate::sink::{ResponseFromSink, Sink}; use crate::Error; use crate::Result; @@ -19,6 +20,24 @@ pub struct UserDefinedSink { resp_stream: Streaming, } +/// Convert [`Message`] to [`proto::SinkRequest`] +impl From for SinkRequest { + fn from(message: Message) -> Self { + Self { + request: Some(numaflow_pb::clients::sink::sink_request::Request { + keys: message.keys.to_vec(), + value: message.value.to_vec(), + event_time: prost_timestamp_from_utc(message.event_time), + watermark: None, + id: message.id.to_string(), + headers: message.headers, + }), + status: None, + handshake: None, + } + } +} + impl UserDefinedSink { pub(crate) async fn new(mut client: SinkClient) -> Result { let (sink_tx, sink_rx) = mpsc::channel(DEFAULT_CHANNEL_SIZE); diff --git a/rust/numaflow-core/src/source/user_defined.rs b/rust/numaflow-core/src/source/user_defined.rs index e5717c12a..5f274119b 100644 --- a/rust/numaflow-core/src/source/user_defined.rs +++ b/rust/numaflow-core/src/source/user_defined.rs @@ -1,19 +1,22 @@ -use std::time::Duration; - +use base64::prelude::BASE64_STANDARD; +use base64::Engine; use numaflow_pb::clients::source; use numaflow_pb::clients::source::source_client::SourceClient; use numaflow_pb::clients::source::{ - read_request, AckRequest, AckResponse, ReadRequest, ReadResponse, + read_request, read_response, AckRequest, AckResponse, ReadRequest, ReadResponse, }; +use std::sync::Arc; +use std::time::Duration; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Channel; use tonic::{Request, Streaming}; -use crate::message::{Message, Offset}; +use crate::message::{Message, MessageID, Offset, StringOffset}; use crate::reader::LagReader; +use crate::shared::grpc::utc_from_timestamp; use crate::source::{SourceAcker, SourceReader}; -use crate::{Error, Result}; +use crate::{config, Error, Result}; /// User-Defined Source to operative on custom sources. #[derive(Debug)] @@ -100,6 +103,51 @@ impl UserDefinedSourceRead { } } +/// Convert [`read_response::Result`] to [`Message`] +impl TryFrom for Message { + type Error = Error; + + fn try_from(result: read_response::Result) -> Result { + let source_offset = match result.offset { + Some(o) => Offset::String(StringOffset { + offset: BASE64_STANDARD.encode(o.offset).into(), + partition_idx: o.partition_id as u16, + }), + None => return Err(Error::Source("Offset not found".to_string())), + }; + + Ok(Message { + keys: Arc::from(result.keys), + tags: None, + value: result.payload.into(), + offset: Some(source_offset.clone()), + event_time: utc_from_timestamp(result.event_time), + id: MessageID { + vertex_name: config::get_vertex_name().to_string().into(), + offset: source_offset.to_string().into(), + index: 0, + }, + headers: result.headers, + }) + } +} + +impl TryFrom for source::Offset { + type Error = Error; + + fn try_from(offset: Offset) -> std::result::Result { + match offset { + Offset::Int(_) => Err(Error::Source("IntOffset not supported".to_string())), + Offset::String(o) => Ok(numaflow_pb::clients::source::Offset { + offset: BASE64_STANDARD + .decode(o.offset) + .expect("we control the encoding, so this should never fail"), + partition_id: o.partition_idx as i32, + }), + } + } +} + impl SourceReader for UserDefinedSourceRead { fn name(&self) -> &'static str { "user-defined-source" @@ -233,17 +281,17 @@ impl LagReader for UserDefinedSourceLagReader { #[cfg(test)] mod tests { - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; - use chrono::Utc; + use super::*; + use crate::message::IntOffset; + use crate::shared::grpc::{create_rpc_channel, prost_timestamp_from_utc}; + use chrono::{TimeZone, Utc}; use numaflow::source; use numaflow::source::{Message, Offset, SourceReadRequest}; use numaflow_pb::clients::source::source_client::SourceClient; use tokio::sync::mpsc::Sender; - use super::*; - use crate::shared::grpc::create_rpc_channel; - struct SimpleSource { num: usize, yet_to_ack: std::sync::RwLock>, @@ -353,4 +401,45 @@ mod tests { .expect("failed to send shutdown signal"); server_handle.await.expect("failed to join server task"); } + + #[test] + fn test_read_response_result_to_message() { + let result = read_response::Result { + payload: vec![1, 2, 3], + offset: Some(numaflow_pb::clients::source::Offset { + offset: BASE64_STANDARD.encode("123").into_bytes(), + partition_id: 0, + }), + event_time: Some( + prost_timestamp_from_utc(Utc.timestamp_opt(1627846261, 0).unwrap()).unwrap(), + ), + keys: vec!["key1".to_string()], + headers: HashMap::new(), + }; + + let message: Result = result.try_into(); + assert!(message.is_ok()); + + let message = message.unwrap(); + assert_eq!(message.keys.to_vec(), vec!["key1".to_string()]); + assert_eq!(message.value, vec![1, 2, 3]); + assert_eq!( + message.event_time, + Utc.timestamp_opt(1627846261, 0).unwrap() + ); + } + + #[test] + fn test_offset_conversion() { + // Test conversion from Offset to AckRequest for StringOffset + let offset = + crate::message::Offset::String(StringOffset::new(BASE64_STANDARD.encode("42"), 1)); + let offset: Result = offset.try_into(); + assert_eq!(offset.unwrap().partition_id, 1); + + // Test conversion from Offset to AckRequest for IntOffset (should fail) + let offset = crate::message::Offset::Int(IntOffset::new(42, 1)); + let result: Result = offset.try_into(); + assert!(result.is_err()); + } } diff --git a/rust/numaflow-core/src/transformer/user_defined.rs b/rust/numaflow-core/src/transformer/user_defined.rs index 398d5a4bc..78518e4c0 100644 --- a/rust/numaflow-core/src/transformer/user_defined.rs +++ b/rust/numaflow-core/src/transformer/user_defined.rs @@ -13,7 +13,7 @@ use tonic::{Request, Streaming}; use crate::config::get_vertex_name; use crate::error::{Error, Result}; use crate::message::{Message, MessageID, Offset}; -use crate::shared::grpc::utc_from_timestamp; +use crate::shared::grpc::{prost_timestamp_from_utc, utc_from_timestamp}; type ResponseSenderMap = Arc>>)>>>; @@ -38,6 +38,26 @@ impl Drop for UserDefinedTransformer { } } +/// Convert the [`Message`] to [`SourceTransformRequest`] +impl From for SourceTransformRequest { + fn from(message: Message) -> Self { + Self { + request: Some(sourcetransformer::source_transform_request::Request { + id: message + .offset + .expect("offset should be present") + .to_string(), + keys: message.keys.to_vec(), + value: message.value.to_vec(), + event_time: prost_timestamp_from_utc(message.event_time), + watermark: None, + headers: message.headers, + }), + handshake: None, + } + } +} + impl UserDefinedTransformer { /// Performs handshake with the server and creates a new UserDefinedTransformer. pub(super) async fn new( @@ -164,17 +184,16 @@ impl UserDefinedTransformer { #[cfg(test)] mod tests { + use super::*; + use crate::message::StringOffset; + use crate::shared::grpc::create_rpc_channel; + use chrono::{TimeZone, Utc}; + use numaflow::sourcetransform; use std::error::Error; - use std::sync::Arc; + use std::result::Result; use std::time::Duration; - - use numaflow::sourcetransform; - use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient; use tempfile::TempDir; - use crate::message::{MessageID, StringOffset}; - use crate::shared::grpc::create_rpc_channel; - use crate::transformer::user_defined::UserDefinedTransformer; struct NowCat; #[tonic::async_trait] @@ -258,4 +277,27 @@ mod tests { ); Ok(()) } + + #[test] + fn test_message_to_source_transform_request() { + let message = Message { + keys: Arc::from(vec!["key1".to_string()]), + tags: None, + value: vec![1, 2, 3].into(), + offset: Some(Offset::String(StringOffset { + offset: "123".to_string().into(), + partition_idx: 0, + })), + event_time: Utc.timestamp_opt(1627846261, 0).unwrap(), + id: MessageID { + vertex_name: "vertex".to_string().into(), + offset: "123".to_string().into(), + index: 0, + }, + headers: HashMap::new(), + }; + + let request: SourceTransformRequest = message.into(); + assert!(request.request.is_some()); + } }