Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Asynchronous Map Implementation for Pipeline #2295

Merged
merged 17 commits into from
Dec 24, 2024
3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func (in UDF) getContainers(req getContainerReq) ([]corev1.Container, []corev1.C

func (in UDF) getMainContainer(req getContainerReq) corev1.Container {
if in.GroupBy == nil {
if req.executeRustBinary {
return containerBuilder{}.init(req).command(NumaflowRustBinary).args("processor", "--type="+string(VertexTypeMapUDF), "--isbsvc-type="+string(req.isbSvcType), "--rust").build()
}
args := []string{"processor", "--type=" + string(VertexTypeMapUDF), "--isbsvc-type=" + string(req.isbSvcType)}
return containerBuilder{}.
init(req).args(args...).build()
Expand Down
24 changes: 23 additions & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/numaflow-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ async-nats = "0.38.0"

[dev-dependencies]
tempfile = "3.11.0"
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "ddd879588e11455921f1ca958ea2b3c076689293" }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "9ca9362ad511084501520e5a37d40cdcd0cdc9d9" }
pulsar = { version = "6.3.0", default-features = false, features = ["tokio-rustls-runtime"] }

[build-dependencies]
221 changes: 216 additions & 5 deletions rust/numaflow-core/src/config/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use crate::config::components::source::SourceConfig;
use crate::config::components::transformer::{TransformerConfig, TransformerType};
use crate::config::get_vertex_replica;
use crate::config::pipeline::isb::{BufferReaderConfig, BufferWriterConfig};
use crate::config::pipeline::map::MapMode;
use crate::config::pipeline::map::MapVtxConfig;
use crate::error::Error;
use crate::Result;

Expand All @@ -23,6 +25,11 @@ const DEFAULT_LOOKBACK_WINDOW_IN_SECS: u16 = 120;
const ENV_NUMAFLOW_SERVING_JETSTREAM_URL: &str = "NUMAFLOW_ISBSVC_JETSTREAM_URL";
const ENV_NUMAFLOW_SERVING_JETSTREAM_USER: &str = "NUMAFLOW_ISBSVC_JETSTREAM_USER";
const ENV_NUMAFLOW_SERVING_JETSTREAM_PASSWORD: &str = "NUMAFLOW_ISBSVC_JETSTREAM_PASSWORD";
const DEFAULT_GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; // 64 MB
const DEFAULT_MAP_SOCKET: &str = "/var/run/numaflow/map.sock";
pub(crate) const DEFAULT_BATCH_MAP_SOCKET: &str = "/var/run/numaflow/batchmap.sock";
pub(crate) const DEFAULT_STREAM_MAP_SOCKET: &str = "/var/run/numaflow/mapstream.sock";
const DEFAULT_MAP_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";

pub(crate) mod isb;

Expand Down Expand Up @@ -69,6 +76,84 @@ pub(crate) struct SourceVtxConfig {
pub(crate) transformer_config: Option<TransformerConfig>,
}

pub(crate) mod map {
use std::collections::HashMap;

use numaflow_models::models::Udf;

use crate::config::pipeline::{
DEFAULT_GRPC_MAX_MESSAGE_SIZE, DEFAULT_MAP_SERVER_INFO_FILE, DEFAULT_MAP_SOCKET,
};
use crate::error::Error;

/// The mode a map vertex operates in.
///
/// Each variant has a textual name that [`MapMode::from_str`] recognises:
/// `"unary-map"`, `"batch-map"` and `"stream-map"`.
#[derive(Debug, Clone, PartialEq)]
pub enum MapMode {
    Unary,
    Batch,
    Stream,
}

impl MapMode {
    /// Translates a textual mode name into a [`MapMode`].
    ///
    /// Returns `None` for any string other than the three exact
    /// (case-sensitive) names `"unary-map"`, `"batch-map"`, `"stream-map"`.
    pub(crate) fn from_str(s: &str) -> Option<MapMode> {
        let mode = match s {
            "unary-map" => MapMode::Unary,
            "batch-map" => MapMode::Batch,
            "stream-map" => MapMode::Stream,
            // Anything else is not a known mode name.
            _ => return None,
        };
        Some(mode)
    }
}

/// Configuration of a map vertex: how concurrent it is, which kind of mapper
/// backs it, and which mode the map runs in.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct MapVtxConfig {
/// Concurrency setting for the vertex. When the pipeline config is loaded
/// this is filled from the batch size.
// NOTE(review): presumably the number of in-flight map calls — confirm against the mapper.
pub(crate) concurrency: usize,
/// Kind of mapper used by this vertex (user-defined or builtin).
pub(crate) map_type: MapType,
/// Mode the map runs in (unary, batch or stream).
pub(crate) map_mode: MapMode,
}

/// The kind of mapper a map vertex uses, together with its settings.
///
/// Built from a `Udf` spec via `TryFrom<Box<Udf>>`: a spec with `builtin` set
/// becomes [`MapType::Builtin`], otherwise one with `container` set becomes
/// [`MapType::UserDefined`].
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum MapType {
/// Mapper described by a socket path, a server-info file path and a gRPC
/// message size limit.
UserDefined(UserDefinedConfig),
/// Builtin function identified by name, with optional args and kwargs.
Builtin(BuiltinConfig),
}

impl TryFrom<Box<Udf>> for MapType {
type Error = Error;
fn try_from(udf: Box<Udf>) -> std::result::Result<Self, Self::Error> {
if let Some(builtin) = udf.builtin {
Ok(MapType::Builtin(BuiltinConfig {
name: builtin.name,
kwargs: builtin.kwargs,
args: builtin.args,
}))
} else if let Some(_container) = udf.container {
Ok(MapType::UserDefined(UserDefinedConfig {
grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE,
socket_path: DEFAULT_MAP_SOCKET.to_string(),
server_info_path: DEFAULT_MAP_SERVER_INFO_FILE.to_string(),
}))
} else {
Err(Error::Config("Invalid UDF".to_string()))
}
}
}

/// Settings for a user-defined mapper.
///
/// When built from a `Udf` spec, every field is taken from the module-level
/// defaults rather than from the spec itself.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct UserDefinedConfig {
/// Maximum gRPC message size; defaults to `DEFAULT_GRPC_MAX_MESSAGE_SIZE` (64 MB).
pub grpc_max_message_size: usize,
/// Path of the mapper socket; defaults to `DEFAULT_MAP_SOCKET`.
pub socket_path: String,
/// Path of the mapper server-info file; defaults to `DEFAULT_MAP_SERVER_INFO_FILE`.
pub server_info_path: String,
}

/// Settings for a builtin map function, copied verbatim from the `builtin`
/// section of a `Udf` spec.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct BuiltinConfig {
/// Name of the builtin function.
pub(crate) name: String,
/// Optional keyword arguments for the function.
pub(crate) kwargs: Option<HashMap<String, String>>,
/// Optional positional arguments for the function.
pub(crate) args: Option<Vec<String>>,
}
}

#[derive(Debug, Clone, PartialEq)]
pub(crate) struct SinkVtxConfig {
pub(crate) sink_config: SinkConfig,
Expand All @@ -79,13 +164,15 @@ pub(crate) struct SinkVtxConfig {
/// The kind of vertex being run, each variant carrying that kind's
/// configuration. Its `Display` output is the bare variant name.
pub(crate) enum VertexType {
/// Source vertex and its configuration.
Source(SourceVtxConfig),
/// Sink vertex and its configuration.
Sink(SinkVtxConfig),
/// Map vertex and its configuration.
Map(MapVtxConfig),
}

impl std::fmt::Display for VertexType {
    /// Writes only the vertex kind (`Source`, `Sink` or `Map`); the wrapped
    /// configuration is not part of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
        let kind = match self {
            VertexType::Source(_) => "Source",
            VertexType::Sink(_) => "Sink",
            VertexType::Map(_) => "Map",
        };
        f.write_str(kind)
    }
}
Expand Down Expand Up @@ -182,6 +269,12 @@ impl PipelineConfig {
},
fb_sink_config,
})
} else if let Some(map) = vertex_obj.spec.udf {
VertexType::Map(MapVtxConfig {
concurrency: batch_size as usize,
map_type: map.try_into()?,
map_mode: MapMode::Unary,
})
} else {
return Err(Error::Config(
"Only source and sink are supported ATM".to_string(),
Expand Down Expand Up @@ -283,7 +376,7 @@ impl PipelineConfig {
Ok(PipelineConfig {
batch_size: batch_size as usize,
paf_concurrency: env::var("PAF_BATCH_SIZE")
.unwrap_or("30000".to_string())
.unwrap_or((DEFAULT_BATCH_SIZE * 2).to_string())
.parse()
.unwrap(),
read_timeout: Duration::from_millis(timeout_in_ms as u64),
Expand All @@ -301,11 +394,13 @@ impl PipelineConfig {

#[cfg(test)]
mod tests {
use numaflow_models::models::{Container, Function, Udf};
use numaflow_pulsar::source::PulsarSourceConfig;

use super::*;
use crate::config::components::sink::{BlackholeConfig, LogConfig, SinkType};
use crate::config::components::source::{GeneratorConfig, SourceType};
use crate::config::pipeline::map::{MapType, UserDefinedConfig};

#[test]
fn test_default_pipeline_config() {
Expand Down Expand Up @@ -360,7 +455,7 @@ mod tests {
vertex_name: "out".to_string(),
replica: 0,
batch_size: 500,
paf_concurrency: 30000,
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
Expand All @@ -371,7 +466,7 @@ mod tests {
name: "in".to_string(),
reader_config: BufferReaderConfig {
partitions: 1,
streams: vec![("default-simple-pipeline-out-0".into(), 0)],
streams: vec![("default-simple-pipeline-out-0", 0)],
wip_ack_interval: Duration::from_secs(1),
},
partitions: 0,
Expand Down Expand Up @@ -407,7 +502,7 @@ mod tests {
vertex_name: "in".to_string(),
replica: 0,
batch_size: 1000,
paf_concurrency: 30000,
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
Expand Down Expand Up @@ -460,7 +555,7 @@ mod tests {
vertex_name: "in".to_string(),
replica: 0,
batch_size: 50,
paf_concurrency: 30000,
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
Expand Down Expand Up @@ -498,4 +593,120 @@ mod tests {

assert_eq!(pipeline_config, expected);
}

/// A UDF spec that only has a container must convert to
/// `MapType::UserDefined` populated with the default socket, server-info
/// file and gRPC message size.
#[test]
fn test_map_vertex_config_user_defined() {
    // Container with every field left unset; only its presence matters.
    let empty_container = Container {
        args: None,
        command: None,
        env: None,
        env_from: None,
        image: None,
        image_pull_policy: None,
        liveness_probe: None,
        ports: None,
        readiness_probe: None,
        resources: None,
        security_context: None,
        volume_mounts: None,
    };
    let udf = Udf {
        builtin: None,
        container: Some(Box::from(empty_container)),
        group_by: None,
    };

    let map_type = MapType::try_from(Box::new(udf)).unwrap();
    assert!(matches!(map_type, MapType::UserDefined(_)));

    let map_vtx_config = MapVtxConfig {
        concurrency: 10,
        map_type,
        map_mode: MapMode::Unary,
    };
    assert_eq!(map_vtx_config.concurrency, 10);

    match map_vtx_config.map_type {
        MapType::UserDefined(config) => {
            assert_eq!(config.grpc_max_message_size, DEFAULT_GRPC_MAX_MESSAGE_SIZE);
            assert_eq!(config.socket_path, DEFAULT_MAP_SOCKET);
            assert_eq!(config.server_info_path, DEFAULT_MAP_SERVER_INFO_FILE);
        }
        MapType::Builtin(_) => panic!("Expected UserDefined map type"),
    }
}

/// A UDF spec that names a builtin function must convert to
/// `MapType::Builtin` carrying that function's name, args and kwargs.
#[test]
fn test_map_vertex_config_builtin() {
    // Builtin `cat` with no arguments, and no container alongside it.
    let cat = Function {
        args: None,
        kwargs: None,
        name: "cat".to_string(),
    };
    let udf = Udf {
        builtin: Some(Box::from(cat)),
        container: None,
        group_by: None,
    };

    let map_type = MapType::try_from(Box::new(udf)).unwrap();
    assert!(matches!(map_type, MapType::Builtin(_)));

    let map_vtx_config = MapVtxConfig {
        concurrency: 5,
        map_type,
        map_mode: MapMode::Unary,
    };
    assert_eq!(map_vtx_config.concurrency, 5);

    match map_vtx_config.map_type {
        MapType::Builtin(config) => {
            assert_eq!(config.name, "cat");
            assert!(config.kwargs.is_none());
            assert!(config.args.is_none());
        }
        MapType::UserDefined(_) => panic!("Expected Builtin map type"),
    }
}

/// Loads a full pipeline config for a map vertex from its base64-encoded
/// spec and checks the result field by field against the expected
/// `PipelineConfig`.
#[test]
fn test_pipeline_config_load_map_vertex() {
// Base64-encoded vertex object handed to `PipelineConfig::load`.
// NOTE(review): appears to decode to JSON for a vertex named "map" with a UDF container — confirm by decoding.
let pipeline_cfg_base64 = "eyJtZXRhZGF0YSI6eyJuYW1lIjoic2ltcGxlLXBpcGVsaW5lLW1hcCIsIm5hbWVzcGFjZSI6ImRlZmF1bHQiLCJjcmVhdGlvblRpbWVzdGFtcCI6bnVsbH0sInNwZWMiOnsibmFtZSI6Im1hcCIsInVkZiI6eyJjb250YWluZXIiOnsidGVtcGxhdGUiOiJkZWZhdWx0In19LCJsaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9LCJzY2FsZSI6eyJtaW4iOjF9LCJwaXBlbGluZU5hbWUiOiJzaW1wbGUtcGlwZWxpbmUiLCJpbnRlclN0ZXBCdWZmZXJTZXJ2aWNlTmFtZSI6IiIsInJlcGxpY2FzIjowLCJmcm9tRWRnZXMiOlt7ImZyb20iOiJpbiIsInRvIjoibWFwIiwiY29uZGl0aW9ucyI6bnVsbCwiZnJvbVZlcnRleFR5cGUiOiJTb3VyY2UiLCJmcm9tVmVydGV4UGFydGl0aW9uQ291bnQiOjEsImZyb21WZXJ0ZXhMaW1pdHMiOnsicmVhZEJhdGNoU2l6ZSI6NTAwLCJyZWFkVGltZW91dCI6IjFzIiwiYnVmZmVyTWF4TGVuZ3RoIjozMDAwMCwiYnVmZmVyVXNhZ2VMaW1pdCI6ODB9LCJ0b1ZlcnRleFR5cGUiOiJNYXAiLCJ0b1ZlcnRleFBhcnRpdGlvbkNvdW50IjoxLCJ0b1ZlcnRleExpbWl0cyI6eyJyZWFkQmF0Y2hTaXplIjo1MDAsInJlYWRUaW1lb3V0IjoiMXMiLCJidWZmZXJNYXhMZW5ndGgiOjMwMDAwLCJidWZmZXJVc2FnZUxpbWl0Ijo4MH19XSwid2F0ZXJtYXJrIjp7Im1heERlbGF5IjoiMHMifX0sInN0YXR1cyI6eyJwaGFzZSI6IiIsInJlcGxpY2FzIjowLCJkZXNpcmVkUmVwbGljYXMiOjAsImxhc3RTY2FsZWRBdCI6bnVsbH19";

// Only the JetStream URL is supplied; user and password are expected to come back as `None`.
let env_vars = [("NUMAFLOW_ISBSVC_JETSTREAM_URL", "localhost:4222")];
let pipeline_config =
PipelineConfig::load(pipeline_cfg_base64.to_string(), env_vars).unwrap();

let expected = PipelineConfig {
pipeline_name: "simple-pipeline".to_string(),
vertex_name: "map".to_string(),
replica: 0,
batch_size: 500,
paf_concurrency: 1000,
read_timeout: Duration::from_secs(1),
js_client_config: isb::jetstream::ClientConfig {
url: "localhost:4222".to_string(),
user: None,
password: None,
},
// One incoming edge from vertex "in", read through a single stream.
from_vertex_config: vec![FromVertexConfig {
name: "in".to_string(),
reader_config: BufferReaderConfig {
partitions: 1,
streams: vec![("default-simple-pipeline-map-0", 0)],
wip_ack_interval: Duration::from_secs(1),
},
partitions: 0,
}],
// No outgoing edges are expected for this spec.
to_vertex_config: vec![],
vertex_config: VertexType::Map(MapVtxConfig {
// Equals `batch_size` above: loading sets the map concurrency from the batch size.
concurrency: 500,
// A container-based UDF yields the user-defined map type with all-default settings.
map_type: MapType::UserDefined(UserDefinedConfig {
grpc_max_message_size: DEFAULT_GRPC_MAX_MESSAGE_SIZE,
socket_path: DEFAULT_MAP_SOCKET.to_string(),
server_info_path: DEFAULT_MAP_SERVER_INFO_FILE.to_string(),
}),
map_mode: MapMode::Unary,
}),
metrics_config: MetricsConfig::default(),
};

assert_eq!(pipeline_config, expected);
}
}
3 changes: 3 additions & 0 deletions rust/numaflow-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub enum Error {
#[error("Transformer Error - {0}")]
Transformer(String),

#[error("Mapper Error - {0}")]
Mapper(String),

#[error("Forwarder Error - {0}")]
Forwarder(String),

Expand Down
3 changes: 3 additions & 0 deletions rust/numaflow-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ mod pipeline;
/// Tracker to track the completeness of message processing.
mod tracker;

/// Map is a feature that allows users to execute custom code to transform their data.
mod mapper;

pub async fn run() -> Result<()> {
let cln_token = CancellationToken::new();
let shutdown_cln_token = cln_token.clone();
Expand Down
Loading
Loading