Skip to content

Commit

Permalink
feat: Implement Sourcer traits for serving source (#2301)
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
Signed-off-by: Vigith Maurice <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
BulkBeing and vigith authored Jan 6, 2025
1 parent 0c86e82 commit 8d8340c
Show file tree
Hide file tree
Showing 19 changed files with 1,048 additions and 597 deletions.
212 changes: 159 additions & 53 deletions rust/Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ numaflow-core = { path = "numaflow-core" }
numaflow-models = { path = "numaflow-models" }
backoff = { path = "backoff" }
numaflow-pb = { path = "numaflow-pb" }
numaflow-pulsar = {path = "extns/numaflow-pulsar"}
numaflow-pulsar = { path = "extns/numaflow-pulsar" }
tokio = "1.41.1"
bytes = "1.7.1"
tracing = "0.1.40"
axum = "0.7.5"
axum-server = { version = "0.7.1", features = ["tls-rustls"] }
serde = { version = "1.0.204", features = ["derive"] }
rustls = { version = "0.23.12", features = ["aws_lc_rs"] }
reqwest = "0.12.12"
11 changes: 7 additions & 4 deletions rust/numaflow-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ serving.workspace = true
backoff.workspace = true
axum.workspace = true
axum-server.workspace = true
bytes.workspace = true
serde.workspace = true
rustls.workspace = true
tonic = "0.12.3"
bytes = "1.7.1"
thiserror = "2.0.3"
tokio-util = "0.7.11"
tokio-stream = "0.1.15"
Expand All @@ -35,8 +37,6 @@ tower = "0.4.13"
serde_json = "1.0.122"
trait-variant = "0.1.2"
rcgen = "0.13.1"
rustls = { version = "0.23.12", features = ["aws_lc_rs"] }
serde = { version = "1.0.204", features = ["derive"] }
semver = "1.0"
pep440_rs = "0.6.6"
parking_lot = "0.12.3"
Expand All @@ -50,6 +50,9 @@ async-nats = "0.38.0"
[dev-dependencies]
tempfile = "3.11.0"
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "9ca9362ad511084501520e5a37d40cdcd0cdc9d9" }
pulsar = { version = "6.3.0", default-features = false, features = ["tokio-rustls-runtime"] }
pulsar = { version = "6.3.0", default-features = false, features = [
"tokio-rustls-runtime",
] }
reqwest = { workspace = true, features = ["json"] }

[build-dependencies]
16 changes: 10 additions & 6 deletions rust/numaflow-core/src/config/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub(crate) mod source {

use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use std::{fmt::Debug, time::Duration};

use bytes::Bytes;
Expand Down Expand Up @@ -37,7 +38,9 @@ pub(crate) mod source {
Generator(GeneratorConfig),
UserDefined(UserDefinedConfig),
Pulsar(PulsarSourceConfig),
Serving(serving::Settings),
// Serving source starts an Axum HTTP server in the background.
// The settings will be used as application state which gets cloned in each handler on each request.
Serving(Arc<serving::Settings>),
}

impl From<Box<GeneratorSource>> for SourceType {
Expand Down Expand Up @@ -110,10 +113,7 @@ pub(crate) mod source {
// There should be only one option (user-defined) to define the settings.
fn try_from(cfg: Box<numaflow_models::models::ServingSource>) -> Result<Self> {
let env_vars = env::vars().collect::<HashMap<String, String>>();

let mut settings: serving::Settings = env_vars
.try_into()
.map_err(|e: serving::Error| Error::Config(e.to_string()))?;
let mut settings: serving::Settings = env_vars.try_into()?;

settings.tid_header = cfg.msg_id_header_key;

Expand Down Expand Up @@ -148,7 +148,7 @@ pub(crate) mod source {
}
settings.redis.addr = cfg.store.url;

Ok(SourceType::Serving(settings))
Ok(SourceType::Serving(Arc::new(settings)))
}
}

Expand All @@ -168,6 +168,10 @@ pub(crate) mod source {
return pulsar.try_into();
}

if let Some(serving) = source.serving.take() {
return serving.try_into();
}

Err(Error::Config(format!("Invalid source type: {source:?}")))
}
}
Expand Down
8 changes: 4 additions & 4 deletions rust/numaflow-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub(crate) struct Message {
}

/// Offset of the message which will be used to acknowledge the message.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) enum Offset {
Int(IntOffset),
String(StringOffset),
Expand All @@ -62,7 +62,7 @@ impl Message {
}

/// IntOffset is the integer-based offset type carried by the `Int` variant of `Offset`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IntOffset {
pub(crate) offset: u64,
pub(crate) partition_idx: u16,
Expand All @@ -84,7 +84,7 @@ impl fmt::Display for IntOffset {
}

/// StringOffset is the string-based offset type carried by the `String` variant of `Offset`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) struct StringOffset {
/// offset could be a complex base64 string.
pub(crate) offset: Bytes,
Expand Down Expand Up @@ -120,7 +120,7 @@ pub(crate) enum ReadAck {
}

/// Message ID which is used to uniquely identify a message. It is cheap to clone this.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub(crate) struct MessageID {
pub(crate) vertex_name: Bytes,
pub(crate) offset: Bytes,
Expand Down
2 changes: 0 additions & 2 deletions rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,8 +600,6 @@ pub(crate) async fn start_metrics_https_server(
addr: SocketAddr,
metrics_state: UserDefinedContainerState,
) -> crate::Result<()> {
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

// Generate a self-signed certificate
let CertifiedKey { cert, key_pair } = generate_simple_self_signed(vec!["localhost".into()])
.map_err(|e| Error::Metrics(format!("Generating self-signed certificate: {}", e)))?;
Expand Down
22 changes: 20 additions & 2 deletions rust/numaflow-core/src/shared/create_components.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use std::sync::Arc;
use std::time::Duration;

use numaflow_pb::clients::map::map_client::MapClient;
use numaflow_pb::clients::sink::sink_client::SinkClient;
use numaflow_pb::clients::source::source_client::SourceClient;
use numaflow_pb::clients::sourcetransformer::source_transform_client::SourceTransformClient;
use serving::ServingSource;
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;

use crate::config::components::sink::{SinkConfig, SinkType};
use crate::config::components::source::{SourceConfig, SourceType};
use crate::config::components::transformer::TransformerConfig;
use crate::config::get_vertex_replica;
use crate::config::pipeline::map::{MapMode, MapType, MapVtxConfig};
use crate::config::pipeline::{DEFAULT_BATCH_MAP_SOCKET, DEFAULT_STREAM_MAP_SOCKET};
use crate::error::Error;
Expand Down Expand Up @@ -334,8 +337,23 @@ pub async fn create_source(
None,
))
}
SourceType::Serving(_) => {
unimplemented!("Serving as built-in source is not yet implemented")
SourceType::Serving(config) => {
let serving = ServingSource::new(
Arc::clone(config),
batch_size,
read_timeout,
*get_vertex_replica(),
)
.await?;
Ok((
Source::new(
batch_size,
source::SourceType::Serving(serving),
tracker_handle,
source_config.read_ahead,
),
None,
))
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions rust/numaflow-core/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub(crate) mod generator;
/// [Pulsar]: https://numaflow.numaproj.io/user-guide/sources/pulsar/
pub(crate) mod pulsar;

pub(crate) mod serving;
use serving::ServingSource;

/// Set of Read related items that has to be implemented to become a Source.
pub(crate) trait SourceReader {
#[allow(dead_code)]
Expand Down Expand Up @@ -68,6 +71,7 @@ pub(crate) enum SourceType {
generator::GeneratorLagReader,
),
Pulsar(PulsarSource),
Serving(ServingSource),
}

enum ActorMessage {
Expand Down Expand Up @@ -182,6 +186,13 @@ impl Source {
actor.run().await;
});
}
SourceType::Serving(serving) => {
tokio::spawn(async move {
let actor =
SourceActor::new(receiver, serving.clone(), serving.clone(), serving);
actor.run().await;
});
}
};
Self {
read_batch_size: batch_size,
Expand Down
Loading

0 comments on commit 8d8340c

Please sign in to comment.