-
Notifications
You must be signed in to change notification settings - Fork 594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(stream): support set nats consumer deliver policy as latest, earliest, by sequence, by timestamp #12176
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,11 +26,12 @@ use risingwave_common::error::anyhow_error; | |
use serde_derive::{Deserialize, Serialize}; | ||
use serde_with::json::JsonString; | ||
use serde_with::{serde_as, DisplayFromStr}; | ||
use time::OffsetDateTime; | ||
|
||
use crate::aws_auth::AwsAuthProps; | ||
use crate::deserialize_duration_from_string; | ||
use crate::sink::SinkError; | ||
|
||
use crate::source::nats::source::NatsOffset; | ||
// The file describes the common abstractions for each connector and can be used in both source and | ||
// sink. | ||
|
||
|
@@ -395,7 +396,7 @@ impl NatsCommon { | |
pub(crate) async fn build_consumer( | ||
&self, | ||
split_id: i32, | ||
start_sequence: Option<u64>, | ||
start_sequence: NatsOffset, | ||
) -> anyhow::Result< | ||
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>, | ||
> { | ||
|
@@ -406,23 +407,28 @@ impl NatsCommon { | |
ack_policy: jetstream::consumer::AckPolicy::None, | ||
..Default::default() | ||
}; | ||
match start_sequence { | ||
Some(v) => { | ||
let consumer = stream | ||
.get_or_create_consumer(&name, { | ||
config.deliver_policy = DeliverPolicy::ByStartSequence { | ||
start_sequence: v + 1, | ||
}; | ||
config | ||
}) | ||
.await?; | ||
Ok(consumer) | ||
} | ||
None => { | ||
let consumer = stream.get_or_create_consumer(&name, config).await?; | ||
Ok(consumer) | ||
|
||
let deliver_policy = match start_sequence { | ||
NatsOffset::Earliest => DeliverPolicy::All, | ||
NatsOffset::Latest => DeliverPolicy::Last, | ||
NatsOffset::SequenceNumber(v) => { | ||
let parsed = v.parse::<u64>()?; | ||
DeliverPolicy::ByStartSequence { | ||
start_sequence: 1 + parsed, | ||
Comment on lines
+416
to
+417
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the behavior is not consistent with other connectors, starts with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one actually try to fix the problem that when resume. If we use by sequence policy, this one may cause problem (we need to manuelly minus 1 for input sequence number). But currently I apply your suggestion and remove the |
||
} | ||
} | ||
} | ||
NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime { | ||
start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)?, | ||
}, | ||
NatsOffset::None => DeliverPolicy::All, | ||
}; | ||
let consumer = stream | ||
.get_or_create_consumer(&name, { | ||
config.deliver_policy = deliver_policy; | ||
config | ||
}) | ||
.await?; | ||
Ok(consumer) | ||
} | ||
|
||
pub(crate) async fn build_or_get_stream( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,15 +12,15 @@ | |
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
use anyhow::Result; | ||
use anyhow::{anyhow, Result}; | ||
use async_nats::jetstream::consumer; | ||
use async_trait::async_trait; | ||
use futures::StreamExt; | ||
use futures_async_stream::try_stream; | ||
|
||
use super::NatsOffset; | ||
use crate::parser::ParserConfig; | ||
use crate::source::common::{into_chunk_stream, CommonSplitReader}; | ||
use crate::source::nats::split::NatsSplit; | ||
use crate::source::nats::NatsProperties; | ||
use crate::source::{ | ||
BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitImpl, SplitReader, | ||
|
@@ -31,6 +31,7 @@ pub struct NatsSplitReader { | |
properties: NatsProperties, | ||
parser_config: ParserConfig, | ||
source_ctx: SourceContextRef, | ||
start_position: NatsOffset, | ||
} | ||
|
||
#[async_trait] | ||
|
@@ -46,19 +47,59 @@ impl SplitReader for NatsSplitReader { | |
) -> Result<Self> { | ||
// TODO: to simplify the logic, return 1 split for first version | ||
assert!(splits.len() == 1); | ||
let splits = splits | ||
.into_iter() | ||
.map(|split| split.into_nats().unwrap()) | ||
.collect::<Vec<NatsSplit>>(); | ||
let split = splits.into_iter().next().unwrap().into_nats().unwrap(); | ||
let start_position = match &split.start_sequence { | ||
NatsOffset::None => match &properties.scan_startup_mode { | ||
None => NatsOffset::Earliest, | ||
Some(mode) => match mode.as_str() { | ||
"latest" => NatsOffset::Latest, | ||
"earliest" => NatsOffset::Earliest, | ||
"timestamp_millis" => { | ||
if let Some(time) = &properties.start_time { | ||
NatsOffset::Timestamp(time.parse()?) | ||
} else { | ||
return Err(anyhow!("scan_startup_timestamp_millis is required")); | ||
} | ||
} | ||
"sequence_number" => { | ||
if let Some(seq) = &properties.start_sequence { | ||
NatsOffset::SequenceNumber(seq.clone()) | ||
} else { | ||
return Err(anyhow!("scan_startup_sequence_number is required")); | ||
} | ||
} | ||
_ => { | ||
return Err(anyhow!( | ||
"invalid scan_startup_mode, accept earliest/latest/sequence_number/time" | ||
)) | ||
} | ||
}, | ||
}, | ||
start_position => start_position.to_owned(), | ||
}; | ||
|
||
if !matches!(start_position, NatsOffset::SequenceNumber(_)) | ||
&& properties.start_sequence.is_some() | ||
{ | ||
return Err( | ||
anyhow!("scan.startup.mode need to be set to 'sequence_number' if you want to start with a specific sequence number") | ||
); | ||
} | ||
if !matches!(start_position, NatsOffset::Timestamp(_)) && properties.start_time.is_some() { | ||
return Err( | ||
anyhow!("scan.startup.mode need to be set to 'timestamp_millis' if you want to start with a specific timestamp millis") | ||
); | ||
} | ||
let consumer = properties | ||
.common | ||
.build_consumer(0, splits[0].start_sequence) | ||
.build_consumer(0, start_position.clone()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is a bad practice to hardcode the split_id There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for you remind. I already applied your suggestions. :) |
||
.await?; | ||
Ok(Self { | ||
consumer, | ||
properties, | ||
parser_config, | ||
source_ctx, | ||
start_position, | ||
}) | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,14 +18,23 @@ use serde::{Deserialize, Serialize}; | |
|
||
use crate::source::{SplitId, SplitMetaData}; | ||
|
||
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)] | ||
pub enum NatsOffset { | ||
Earliest, | ||
Latest, | ||
SequenceNumber(String), | ||
Timestamp(i128), | ||
None, | ||
} | ||
|
||
/// The states of a NATS split, which will be persisted to checkpoint. | ||
#[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)] | ||
pub struct NatsSplit { | ||
pub(crate) subject: String, | ||
// TODO: to simplify the logic, return 1 split for first version. May use parallelism in | ||
// future. | ||
pub(crate) split_num: i32, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. may refactor to |
||
pub(crate) start_sequence: Option<u64>, | ||
pub(crate) start_sequence: NatsOffset, | ||
} | ||
|
||
impl SplitMetaData for NatsSplit { | ||
|
@@ -44,7 +53,7 @@ impl SplitMetaData for NatsSplit { | |
} | ||
|
||
impl NatsSplit { | ||
pub fn new(subject: String, split_num: i32, start_sequence: Option<u64>) -> Self { | ||
pub fn new(subject: String, split_num: i32, start_sequence: NatsOffset) -> Self { | ||
Self { | ||
subject, | ||
split_num, | ||
|
@@ -53,7 +62,12 @@ impl NatsSplit { | |
} | ||
|
||
pub fn update_with_offset(&mut self, start_sequence: String) -> anyhow::Result<()> { | ||
self.start_sequence = Some(start_sequence.as_str().parse::<u64>().unwrap()); | ||
let start_sequence = if start_sequence.is_empty() { | ||
NatsOffset::Earliest | ||
} else { | ||
NatsOffset::SequenceNumber(start_sequence) | ||
}; | ||
self.start_sequence = start_sequence; | ||
Ok(()) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems unintended modifications to this file.