From ddbf1f37fffd20e2a19d271240361117eea08c53 Mon Sep 17 00:00:00 2001 From: Yufan Song <33971064+yufansong@users.noreply.github.com> Date: Wed, 13 Sep 2023 22:19:52 -0700 Subject: [PATCH] feat(stream): support set nats consumer deliver policy as latest, earliest, by timestamp (#12176) --- Cargo.lock | 1 + src/connector/Cargo.toml | 2 + src/connector/src/common.rs | 44 +++++++++-------- .../src/source/nats/enumerator/mod.rs | 14 +++--- src/connector/src/source/nats/mod.rs | 9 ++++ .../src/source/nats/source/message.rs | 30 +++++++++--- .../src/source/nats/source/reader.rs | 48 +++++++++++++++---- src/connector/src/source/nats/split.rs | 26 +++++++--- 8 files changed, 128 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a6321dddca33..c6fb77929178f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6902,6 +6902,7 @@ dependencies = [ "simd-json", "tempfile", "thiserror", + "time 0.3.28", "tokio-retry", "tokio-stream", "tokio-util", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index eae769670b268..bd40eb87bf046 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -94,6 +94,7 @@ serde_with = { version = "3", features = ["json"] } simd-json = "0.10.6" tempfile = "3" thiserror = "1" +time = "0.3.28" tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", @@ -110,6 +111,7 @@ tonic = { workspace = true } tracing = "0.1" url = "2" urlencoding = "2" + [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 477e9dbc95d08..7a703e7db8a20 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -26,11 +26,12 @@ use risingwave_common::error::anyhow_error; use serde_derive::{Deserialize, Serialize}; use serde_with::json::JsonString; use serde_with::{serde_as, DisplayFromStr}; +use time::OffsetDateTime; use crate::aws_auth::AwsAuthProps; use crate::deserialize_duration_from_string; use crate::sink::SinkError; - +use crate::source::nats::source::NatsOffset; // The file describes the common abstractions for each connector and can be used in both source and // sink. @@ -425,8 +426,8 @@ impl NatsCommon { pub(crate) async fn build_consumer( &self, - split_id: i32, - start_sequence: Option, + split_id: String, + start_sequence: NatsOffset, ) -> anyhow::Result< async_nats::jetstream::consumer::Consumer, > { @@ -437,23 +438,28 @@ impl NatsCommon { ack_policy: jetstream::consumer::AckPolicy::None, ..Default::default() }; - match start_sequence { - Some(v) => { - let consumer = stream - .get_or_create_consumer(&name, { - config.deliver_policy = DeliverPolicy::ByStartSequence { - start_sequence: v + 1, - }; - config - }) - .await?; - Ok(consumer) - } - None => { - let consumer = stream.get_or_create_consumer(&name, config).await?; - Ok(consumer) + + let deliver_policy = match start_sequence { + NatsOffset::Earliest => DeliverPolicy::All, + NatsOffset::Latest => DeliverPolicy::Last, + NatsOffset::SequenceNumber(v) => { + let parsed = v.parse::()?; + DeliverPolicy::ByStartSequence { + start_sequence: 1 + parsed, + } } - } + NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime { + start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)?, + }, + NatsOffset::None => DeliverPolicy::All, + }; + let consumer = stream + .get_or_create_consumer(&name, { + config.deliver_policy = deliver_policy; + config + }) + .await?; + Ok(consumer) } pub(crate) async fn build_or_get_stream( diff --git a/src/connector/src/source/nats/enumerator/mod.rs b/src/connector/src/source/nats/enumerator/mod.rs index 88384bfb685e6..e987a45188114 100644 --- a/src/connector/src/source/nats/enumerator/mod.rs +++ b/src/connector/src/source/nats/enumerator/mod.rs @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use anyhow; use async_trait::async_trait; -use super::source::NatsSplit; +use super::source::{NatsOffset, NatsSplit}; use super::NatsProperties; -use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId}; #[derive(Debug, Clone, Eq, PartialEq)] pub struct NatsSplitEnumerator { subject: String, - split_num: i32, + split_id: SplitId, } #[async_trait] @@ -36,7 +38,7 @@ impl SplitEnumerator for NatsSplitEnumerator { ) -> anyhow::Result { Ok(Self { subject: properties.common.subject, - split_num: 0, + split_id: Arc::from("0"), }) } @@ -44,8 +46,8 @@ impl SplitEnumerator for NatsSplitEnumerator { // TODO: to simplify the logic, return 1 split for first version let nats_split = NatsSplit { subject: self.subject.clone(), - split_num: 0, // be the same as `from_nats_jetstream_message` - start_sequence: None, + split_id: Arc::from("0"), // be the same as `from_nats_jetstream_message` + start_sequence: NatsOffset::None, }; Ok(vec![nats_split]) diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index 2aa9dc2de55f2..ed255e75d9f8b 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -25,6 +25,15 @@ pub const NATS_CONNECTOR: &str = "nats"; pub struct NatsProperties { #[serde(flatten)] pub common: NatsCommon, + + #[serde(rename = "scan.startup.mode", alias = "nats.scan.startup.mode")] + pub scan_startup_mode: Option, + + #[serde( + rename = "scan.startup.timestamp_millis", + alias = "nats.scan.startup.timestamp_millis" + )] + pub start_time: Option, } impl NatsProperties {} diff --git a/src/connector/src/source/nats/source/message.rs b/src/connector/src/source/nats/source/message.rs index afb3029d3b917..e582df86664e8 100644 --- a/src/connector/src/source/nats/source/message.rs +++ b/src/connector/src/source/nats/source/message.rs @@ -13,19 +13,37 @@ // limitations under the License. use async_nats; +use async_nats::jetstream::Message; use crate::source::base::SourceMessage; -use crate::source::SourceMeta; +use crate::source::{SourceMeta, SplitId}; -impl SourceMessage { - pub fn from_nats_jetstream_message(message: async_nats::jetstream::message::Message) -> Self { +#[derive(Clone, Debug)] +pub struct NatsMessage { + pub split_id: SplitId, + pub sequence_number: String, + pub payload: Vec, +} + +impl From for SourceMessage { + fn from(message: NatsMessage) -> Self { SourceMessage { key: None, - payload: Some(message.message.payload.to_vec()), + payload: Some(message.payload), // For nats jetstream, use sequence id as offset - offset: message.info().unwrap().stream_sequence.to_string(), - split_id: "0".into(), + offset: message.sequence_number, + split_id: message.split_id, meta: SourceMeta::Empty, } } } + +impl NatsMessage { + pub fn new(split_id: SplitId, message: Message) -> Self { + NatsMessage { + split_id, + sequence_number: message.info().unwrap().stream_sequence.to_string(), + payload: message.message.payload.to_vec(), + } + } +} diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index c0070a16c1392..08b0e91fae6b7 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -12,18 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::Result; +use anyhow::{anyhow, Result}; use async_nats::jetstream::consumer; use async_trait::async_trait; use futures::StreamExt; use futures_async_stream::try_stream; +use super::message::NatsMessage; +use super::NatsOffset; use crate::parser::ParserConfig; use crate::source::common::{into_chunk_stream, CommonSplitReader}; -use crate::source::nats::split::NatsSplit; use crate::source::nats::NatsProperties; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitImpl, SplitReader, + BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitImpl, + SplitReader, }; pub struct NatsSplitReader { @@ -31,6 +33,8 @@ pub struct NatsSplitReader { properties: NatsProperties, parser_config: ParserConfig, source_ctx: SourceContextRef, + start_position: NatsOffset, + split_id: SplitId, } #[async_trait] @@ -46,19 +50,42 @@ impl SplitReader for NatsSplitReader { ) -> Result { // TODO: to simplify the logic, return 1 split for first version assert!(splits.len() == 1); - let splits = splits - .into_iter() - .map(|split| split.into_nats().unwrap()) - .collect::>(); + let split = splits.into_iter().next().unwrap().into_nats().unwrap(); + let split_id = split.split_id; + let start_position = match &split.start_sequence { + NatsOffset::None => match &properties.scan_startup_mode { + None => NatsOffset::Earliest, + Some(mode) => match mode.as_str() { + "latest" => NatsOffset::Latest, + "earliest" => NatsOffset::Earliest, + "timestamp_millis" => { + if let Some(time) = &properties.start_time { + NatsOffset::Timestamp(time.parse()?) + } else { + return Err(anyhow!("scan_startup_timestamp_millis is required")); + } + } + _ => { + return Err(anyhow!( + "invalid scan_startup_mode, accept earliest/latest/timestamp_millis" + )) + } + }, + }, + start_position => start_position.to_owned(), + }; + let consumer = properties .common - .build_consumer(0, splits[0].start_sequence) + .build_consumer(split_id.to_string(), start_position.clone()) .await?; Ok(Self { consumer, properties, parser_config, source_ctx, + start_position, + split_id, }) } @@ -78,7 +105,10 @@ impl CommonSplitReader for NatsSplitReader { for msgs in messages.ready_chunks(capacity) { let mut msg_vec = Vec::with_capacity(capacity); for msg in msgs { - msg_vec.push(SourceMessage::from_nats_jetstream_message(msg?)); + msg_vec.push(SourceMessage::from(NatsMessage::new( + self.split_id.clone(), + msg?, + ))); } yield msg_vec; } diff --git a/src/connector/src/source/nats/split.rs b/src/connector/src/source/nats/split.rs index f0fcfaff35481..4072de230d983 100644 --- a/src/connector/src/source/nats/split.rs +++ b/src/connector/src/source/nats/split.rs @@ -18,20 +18,29 @@ use serde::{Deserialize, Serialize}; use crate::source::{SplitId, SplitMetaData}; +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)] +pub enum NatsOffset { + Earliest, + Latest, + SequenceNumber(String), + Timestamp(i128), + None, +} + /// The states of a NATS split, which will be persisted to checkpoint. #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)] pub struct NatsSplit { pub(crate) subject: String, // TODO: to simplify the logic, return 1 split for first version. May use parallelism in // future. - pub(crate) split_num: i32, - pub(crate) start_sequence: Option, + pub(crate) split_id: SplitId, + pub(crate) start_sequence: NatsOffset, } impl SplitMetaData for NatsSplit { fn id(&self) -> SplitId { // TODO: should avoid constructing a string every time - format!("{}", self.split_num).into() + format!("{}", self.split_id).into() } fn restore_from_json(value: JsonbVal) -> anyhow::Result { @@ -44,16 +53,21 @@ impl SplitMetaData for NatsSplit { } impl NatsSplit { - pub fn new(subject: String, split_num: i32, start_sequence: Option) -> Self { + pub fn new(subject: String, split_id: SplitId, start_sequence: NatsOffset) -> Self { Self { subject, - split_num, + split_id, start_sequence, } } pub fn update_with_offset(&mut self, start_sequence: String) -> anyhow::Result<()> { - self.start_sequence = Some(start_sequence.as_str().parse::().unwrap()); + let start_sequence = if start_sequence.is_empty() { + NatsOffset::Earliest + } else { + NatsOffset::SequenceNumber(start_sequence) + }; + self.start_sequence = start_sequence; Ok(()) } }