From 25621eee5195be55fb2b452325867bede2ed64f4 Mon Sep 17 00:00:00 2001 From: yufansong Date: Fri, 8 Sep 2023 00:44:11 -0700 Subject: [PATCH 1/5] first version, with time recovery bug, others good --- Cargo.lock | 1 + src/connector/Cargo.toml | 15 ++++- src/connector/src/common.rs | 42 ++++++++------ .../src/source/nats/enumerator/mod.rs | 4 +- src/connector/src/source/nats/mod.rs | 15 +++++ .../src/source/nats/source/reader.rs | 55 ++++++++++++++++--- src/connector/src/source/nats/split.rs | 20 ++++++- 7 files changed, 119 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fea56bef3e81f..f8f8a74e1994f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6926,6 +6926,7 @@ dependencies = [ "simd-json", "tempfile", "thiserror", + "time 0.3.28", "tokio-retry", "tokio-stream", "tokio-util", diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index de0f680453b1b..b1004c1722f8e 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -41,7 +41,9 @@ chrono = { version = "0.4", default-features = false, features = [ "clock", "std", ] } -clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = ["time"] } +clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = [ + "time", +] } csv = "1.2" duration-str = "0.5.1" enum-as-inner = "0.6" @@ -56,8 +58,12 @@ itertools = "0.11" jsonschema-transpiler = "1.10.0" maplit = "1.0.2" moka = { version = "0.11", features = ["future"] } -mysql_async = { version = "0.31", default-features = false, features = ["default"] } -mysql_common = { version = "0.29.2", default-features = false, features = ["chrono"] } +mysql_async = { version = "0.31", default-features = false, features = [ + "default", +] } +mysql_common = { version = "0.29.2", default-features = false, features = [ + "chrono", +] } nexmark = { version = "0.2", features = ["serde"] } num-bigint = "0.4" opendal = "0.39" @@ -109,6 +115,9 @@ tonic = { workspace = true } tracing = "0.1" url = "2" urlencoding = "2" +time = "0.3.28" + + [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 5a03fc7bfd9af..56700c3f1230a 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -26,11 +26,12 @@ use risingwave_common::error::anyhow_error; use serde_derive::{Deserialize, Serialize}; use serde_with::json::JsonString; use serde_with::{serde_as, DisplayFromStr}; +use time::OffsetDateTime; use crate::aws_auth::AwsAuthProps; use crate::deserialize_duration_from_string; use crate::sink::SinkError; - +use crate::source::nats::source::NatsOffset; // The file describes the common abstractions for each connector and can be used in both source and // sink. @@ -395,7 +396,7 @@ impl NatsCommon { pub(crate) async fn build_consumer( &self, split_id: i32, - start_sequence: Option, + start_sequence: NatsOffset, ) -> anyhow::Result< async_nats::jetstream::consumer::Consumer, > { @@ -406,23 +407,28 @@ impl NatsCommon { ack_policy: jetstream::consumer::AckPolicy::None, ..Default::default() }; - match start_sequence { - Some(v) => { - let consumer = stream - .get_or_create_consumer(&name, { - config.deliver_policy = DeliverPolicy::ByStartSequence { - start_sequence: v + 1, - }; - config - }) - .await?; - Ok(consumer) - } - None => { - let consumer = stream.get_or_create_consumer(&name, config).await?; - Ok(consumer) + + let deliver_policy = match start_sequence { + NatsOffset::Earliest => DeliverPolicy::All, + NatsOffset::Latest => DeliverPolicy::Last, + NatsOffset::SequenceNumber(v) => { + let parsed = v.parse::()?; + DeliverPolicy::ByStartSequence { + start_sequence: 1 + parsed, + } } - } + NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime { + start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)?, + }, + NatsOffset::None => DeliverPolicy::All, + }; + let consumer = stream + .get_or_create_consumer(&name, { + config.deliver_policy = deliver_policy; + config + }) + .await?; + Ok(consumer) } pub(crate) async fn build_or_get_stream( diff --git a/src/connector/src/source/nats/enumerator/mod.rs b/src/connector/src/source/nats/enumerator/mod.rs index 88384bfb685e6..b2745aff5a189 100644 --- a/src/connector/src/source/nats/enumerator/mod.rs +++ b/src/connector/src/source/nats/enumerator/mod.rs @@ -15,7 +15,7 @@ use anyhow; use async_trait::async_trait; -use super::source::NatsSplit; +use super::source::{NatsSplit, NatsOffset}; use super::NatsProperties; use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; @@ -45,7 +45,7 @@ impl SplitEnumerator for NatsSplitEnumerator { let nats_split = NatsSplit { subject: self.subject.clone(), split_num: 0, // be the same as `from_nats_jetstream_message` - start_sequence: None, + start_sequence: NatsOffset::None, }; Ok(vec![nats_split]) diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index 2aa9dc2de55f2..2a075b9fab6b7 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -25,6 +25,21 @@ pub const NATS_CONNECTOR: &str = "nats"; pub struct NatsProperties { #[serde(flatten)] pub common: NatsCommon, + + #[serde(rename = "scan.startup.mode", alias = "nats.scan.startup.mode")] + pub scan_startup_mode: Option, + + #[serde( + rename = "scan.startup.sequence_number", + alias = "nats.scan.startup.sequence_number" + )] + pub start_sequence: Option, + + #[serde( + rename = "scan.startup.timestamp_millis", + alias = "nats.scan.startup.timestamp_millis" + )] + pub start_time: Option, } impl NatsProperties {} diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index c0070a16c1392..864ed22290d49 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -12,15 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::Result; +use anyhow::{anyhow, Result}; use async_nats::jetstream::consumer; use async_trait::async_trait; use futures::StreamExt; use futures_async_stream::try_stream; +use super::NatsOffset; use crate::parser::ParserConfig; use crate::source::common::{into_chunk_stream, CommonSplitReader}; -use crate::source::nats::split::NatsSplit; use crate::source::nats::NatsProperties; use crate::source::{ BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitImpl, SplitReader, @@ -31,6 +31,7 @@ pub struct NatsSplitReader { properties: NatsProperties, parser_config: ParserConfig, source_ctx: SourceContextRef, + start_position: NatsOffset, } #[async_trait] @@ -46,19 +47,59 @@ impl SplitReader for NatsSplitReader { ) -> Result { // TODO: to simplify the logic, return 1 split for first version assert!(splits.len() == 1); - let splits = splits - .into_iter() - .map(|split| split.into_nats().unwrap()) - .collect::>(); + let split = splits.into_iter().next().unwrap().into_nats().unwrap(); + let start_position = match &split.start_sequence { + NatsOffset::None => match &properties.scan_startup_mode { + None => NatsOffset::Earliest, + Some(mode) => match mode.as_str() { + "latest" => NatsOffset::Latest, + "earliest" => NatsOffset::Earliest, + "timestamp_millis" => { + if let Some(time) = &properties.start_time { + NatsOffset::Timestamp(time.parse()?) + } else { + return Err(anyhow!("scan_startup_timestamp_millis is required")); + } + } + "sequence_number" => { + if let Some(seq) = &properties.start_sequence { + NatsOffset::SequenceNumber(seq.clone()) + } else { + return Err(anyhow!("scan_startup_sequence_number is required")); + } + } + _ => { + return Err(anyhow!( + "invalid scan_startup_mode, accept earliest/latest/sequence_number/time" + )) + } + }, + }, + start_position => start_position.to_owned(), + }; + + if !matches!(start_position, NatsOffset::SequenceNumber(_)) + && properties.start_sequence.is_some() + { + return Err( + anyhow!("scan.startup.mode need to be set to 'sequence_number' if you want to start with a specific sequence number") + ); + } + if !matches!(start_position, NatsOffset::Timestamp(_)) && properties.start_time.is_some() { + return Err( + anyhow!("scan.startup.mode need to be set to 'timestamp_millis' if you want to start with a specific timestamp millis") + ); + } let consumer = properties .common - .build_consumer(0, splits[0].start_sequence) + .build_consumer(0, start_position.clone()) .await?; Ok(Self { consumer, properties, parser_config, source_ctx, + start_position, }) } diff --git a/src/connector/src/source/nats/split.rs b/src/connector/src/source/nats/split.rs index f0fcfaff35481..c49f12374fe60 100644 --- a/src/connector/src/source/nats/split.rs +++ b/src/connector/src/source/nats/split.rs @@ -18,6 +18,15 @@ use serde::{Deserialize, Serialize}; use crate::source::{SplitId, SplitMetaData}; +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)] +pub enum NatsOffset { + Earliest, + Latest, + SequenceNumber(String), + Timestamp(i128), + None, +} + /// The states of a NATS split, which will be persisted to checkpoint. #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)] pub struct NatsSplit { @@ -25,7 +34,7 @@ pub struct NatsSplit { // TODO: to simplify the logic, return 1 split for first version. May use parallelism in // future. pub(crate) split_num: i32, - pub(crate) start_sequence: Option, + pub(crate) start_sequence: NatsOffset, } impl SplitMetaData for NatsSplit { @@ -44,7 +53,7 @@ impl SplitMetaData for NatsSplit { } impl NatsSplit { - pub fn new(subject: String, split_num: i32, start_sequence: Option) -> Self { + pub fn new(subject: String, split_num: i32, start_sequence: NatsOffset) -> Self { Self { subject, split_num, @@ -53,7 +62,12 @@ impl NatsSplit { } pub fn update_with_offset(&mut self, start_sequence: String) -> anyhow::Result<()> { - self.start_sequence = Some(start_sequence.as_str().parse::().unwrap()); + let start_sequence = if start_sequence.is_empty() { + NatsOffset::Earliest + } else { + NatsOffset::SequenceNumber(start_sequence) + }; + self.start_sequence = start_sequence; Ok(()) } } From bfac3fcd8b5e5f1882fd984669eb98cac80fa455 Mon Sep 17 00:00:00 2001 From: yufansong Date: Wed, 13 Sep 2023 17:02:23 -0700 Subject: [PATCH 2/5] fix bug --- src/connector/src/source/nats/source/reader.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 864ed22290d49..8ec5139c9f7db 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -78,6 +78,8 @@ impl SplitReader for NatsSplitReader { start_position => start_position.to_owned(), }; + // pay attention not to check timeoffset and start_time in properties. Because when recovery + // the policy may change to by sequence, but start time still filed. if !matches!(start_position, NatsOffset::SequenceNumber(_)) && properties.start_sequence.is_some() { @@ -85,11 +87,6 @@ impl SplitReader for NatsSplitReader { anyhow!("scan.startup.mode need to be set to 'sequence_number' if you want to start with a specific sequence number") ); } - if !matches!(start_position, NatsOffset::Timestamp(_)) && properties.start_time.is_some() { - return Err( - anyhow!("scan.startup.mode need to be set to 'timestamp_millis' if you want to start with a specific timestamp millis") - ); - } let consumer = properties .common .build_consumer(0, start_position.clone()) From 87dd744b861d14d914fcb95deb76b1a990cbf7b4 Mon Sep 17 00:00:00 2001 From: yufansong Date: Wed, 13 Sep 2023 18:46:37 -0700 Subject: [PATCH 3/5] apply suggestions --- src/connector/Cargo.toml | 12 ++------ src/connector/src/common.rs | 2 +- .../src/source/nats/enumerator/mod.rs | 12 ++++---- .../src/source/nats/source/message.rs | 30 +++++++++++++++---- .../src/source/nats/source/reader.rs | 15 ++++++++-- src/connector/src/source/nats/split.rs | 8 ++--- 6 files changed, 51 insertions(+), 28 deletions(-) diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index b1004c1722f8e..e7d78079de4c6 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -41,9 +41,7 @@ chrono = { version = "0.4", default-features = false, features = [ "clock", "std", ] } -clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = [ - "time", -] } +clickhouse = { git = "https://github.com/risingwavelabs/clickhouse.rs", rev = "622501c1c98c80baaf578c716d6903dde947804e", features = ["time"] } csv = "1.2" duration-str = "0.5.1" enum-as-inner = "0.6" @@ -58,12 +56,8 @@ itertools = "0.11" jsonschema-transpiler = "1.10.0" maplit = "1.0.2" moka = { version = "0.11", features = ["future"] } -mysql_async = { version = "0.31", default-features = false, features = [ - "default", -] } -mysql_common = { version = "0.29.2", default-features = false, features = [ - "chrono", -] } +mysql_async = { version = "0.31", default-features = false, features = ["default"] } +mysql_common = { version = "0.29.2", default-features = false, features = ["chrono"] } nexmark = { version = "0.2", features = ["serde"] } num-bigint = "0.4" opendal = "0.39" diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 56700c3f1230a..df25e27631cfa 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -395,7 +395,7 @@ impl NatsCommon { pub(crate) async fn build_consumer( &self, - split_id: i32, + split_id: String, start_sequence: NatsOffset, ) -> anyhow::Result< async_nats::jetstream::consumer::Consumer, diff --git a/src/connector/src/source/nats/enumerator/mod.rs b/src/connector/src/source/nats/enumerator/mod.rs index b2745aff5a189..e987a45188114 100644 --- a/src/connector/src/source/nats/enumerator/mod.rs +++ b/src/connector/src/source/nats/enumerator/mod.rs @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use anyhow; use async_trait::async_trait; -use super::source::{NatsSplit, NatsOffset}; +use super::source::{NatsOffset, NatsSplit}; use super::NatsProperties; -use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId}; #[derive(Debug, Clone, Eq, PartialEq)] pub struct NatsSplitEnumerator { subject: String, - split_num: i32, + split_id: SplitId, } #[async_trait] @@ -36,7 +38,7 @@ impl SplitEnumerator for NatsSplitEnumerator { ) -> anyhow::Result { Ok(Self { subject: properties.common.subject, - split_num: 0, + split_id: Arc::from("0"), }) } @@ -44,7 +46,7 @@ impl SplitEnumerator for NatsSplitEnumerator { // TODO: to simplify the logic, return 1 split for first version let nats_split = NatsSplit { subject: self.subject.clone(), - split_num: 0, // be the same as `from_nats_jetstream_message` + split_id: Arc::from("0"), // be the same as `from_nats_jetstream_message` start_sequence: NatsOffset::None, }; diff --git a/src/connector/src/source/nats/source/message.rs b/src/connector/src/source/nats/source/message.rs index afb3029d3b917..e582df86664e8 100644 --- a/src/connector/src/source/nats/source/message.rs +++ b/src/connector/src/source/nats/source/message.rs @@ -13,19 +13,37 @@ // limitations under the License. use async_nats; +use async_nats::jetstream::Message; use crate::source::base::SourceMessage; -use crate::source::SourceMeta; +use crate::source::{SourceMeta, SplitId}; -impl SourceMessage { - pub fn from_nats_jetstream_message(message: async_nats::jetstream::message::Message) -> Self { +#[derive(Clone, Debug)] +pub struct NatsMessage { + pub split_id: SplitId, + pub sequence_number: String, + pub payload: Vec, +} + +impl From for SourceMessage { + fn from(message: NatsMessage) -> Self { SourceMessage { key: None, - payload: Some(message.message.payload.to_vec()), + payload: Some(message.payload), // For nats jetstream, use sequence id as offset - offset: message.info().unwrap().stream_sequence.to_string(), - split_id: "0".into(), + offset: message.sequence_number, + split_id: message.split_id, meta: SourceMeta::Empty, } } } + +impl NatsMessage { + pub fn new(split_id: SplitId, message: Message) -> Self { + NatsMessage { + split_id, + sequence_number: message.info().unwrap().stream_sequence.to_string(), + payload: message.message.payload.to_vec(), + } + } +} diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 8ec5139c9f7db..507d9ab3a0338 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -12,18 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. + use anyhow::{anyhow, Result}; use async_nats::jetstream::consumer; use async_trait::async_trait; use futures::StreamExt; use futures_async_stream::try_stream; +use super::message::NatsMessage; use super::NatsOffset; use crate::parser::ParserConfig; use crate::source::common::{into_chunk_stream, CommonSplitReader}; use crate::source::nats::NatsProperties; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitImpl, SplitReader, + BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitImpl, + SplitReader, }; pub struct NatsSplitReader { @@ -32,6 +35,7 @@ pub struct NatsSplitReader { parser_config: ParserConfig, source_ctx: SourceContextRef, start_position: NatsOffset, + split_id: SplitId, } #[async_trait] @@ -48,6 +52,7 @@ impl SplitReader for NatsSplitReader { // TODO: to simplify the logic, return 1 split for first version assert!(splits.len() == 1); let split = splits.into_iter().next().unwrap().into_nats().unwrap(); + let split_id = split.split_id; let start_position = match &split.start_sequence { NatsOffset::None => match &properties.scan_startup_mode { None => NatsOffset::Earliest, @@ -89,7 +94,7 @@ impl SplitReader for NatsSplitReader { } let consumer = properties .common - .build_consumer(0, start_position.clone()) + .build_consumer(split_id.to_string(), start_position.clone()) .await?; Ok(Self { consumer, @@ -97,6 +102,7 @@ impl SplitReader for NatsSplitReader { parser_config, source_ctx, start_position, + split_id, }) } @@ -116,7 +122,10 @@ impl CommonSplitReader for NatsSplitReader { for msgs in messages.ready_chunks(capacity) { let mut msg_vec = Vec::with_capacity(capacity); for msg in msgs { - msg_vec.push(SourceMessage::from_nats_jetstream_message(msg?)); + msg_vec.push(SourceMessage::from(NatsMessage::new( + self.split_id.clone(), + msg?, + ))); } yield msg_vec; } diff --git a/src/connector/src/source/nats/split.rs b/src/connector/src/source/nats/split.rs index c49f12374fe60..4072de230d983 100644 --- a/src/connector/src/source/nats/split.rs +++ b/src/connector/src/source/nats/split.rs @@ -33,14 +33,14 @@ pub struct NatsSplit { pub(crate) subject: String, // TODO: to simplify the logic, return 1 split for first version. May use parallelism in // future. - pub(crate) split_num: i32, + pub(crate) split_id: SplitId, pub(crate) start_sequence: NatsOffset, } impl SplitMetaData for NatsSplit { fn id(&self) -> SplitId { // TODO: should avoid constructing a string every time - format!("{}", self.split_num).into() + format!("{}", self.split_id).into() } fn restore_from_json(value: JsonbVal) -> anyhow::Result { @@ -53,10 +53,10 @@ impl SplitMetaData for NatsSplit { } impl NatsSplit { - pub fn new(subject: String, split_num: i32, start_sequence: NatsOffset) -> Self { + pub fn new(subject: String, split_id: SplitId, start_sequence: NatsOffset) -> Self { Self { subject, - split_num, + split_id, start_sequence, } } From 85577be172fd3f00836d1365d66574b97fc5686c Mon Sep 17 00:00:00 2001 From: yufansong Date: Wed, 13 Sep 2023 20:26:59 -0700 Subject: [PATCH 4/5] remove sequnece --- src/connector/src/source/nats/mod.rs | 6 ------ src/connector/src/source/nats/source/reader.rs | 18 +----------------- 2 files changed, 1 insertion(+), 23 deletions(-) diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index 2a075b9fab6b7..ed255e75d9f8b 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -29,12 +29,6 @@ pub struct NatsProperties { #[serde(rename = "scan.startup.mode", alias = "nats.scan.startup.mode")] pub scan_startup_mode: Option, - #[serde( - rename = "scan.startup.sequence_number", - alias = "nats.scan.startup.sequence_number" - )] - pub start_sequence: Option, - #[serde( rename = "scan.startup.timestamp_millis", alias = "nats.scan.startup.timestamp_millis" diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index 507d9ab3a0338..f229226e6f834 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -66,16 +66,9 @@ impl SplitReader for NatsSplitReader { return Err(anyhow!("scan_startup_timestamp_millis is required")); } } - "sequence_number" => { - if let Some(seq) = &properties.start_sequence { - NatsOffset::SequenceNumber(seq.clone()) - } else { - return Err(anyhow!("scan_startup_sequence_number is required")); - } - } _ => { return Err(anyhow!( - "invalid scan_startup_mode, accept earliest/latest/sequence_number/time" + "invalid scan_startup_mode, accept earliest/latest/timestamp_millis" )) } }, @@ -83,15 +76,6 @@ impl SplitReader for NatsSplitReader { start_position => start_position.to_owned(), }; - // pay attention not to check timeoffset and start_time in properties. Because when recovery - // the policy may change to by sequence, but start time still filed. - if !matches!(start_position, NatsOffset::SequenceNumber(_)) - && properties.start_sequence.is_some() - { - return Err( - anyhow!("scan.startup.mode need to be set to 'sequence_number' if you want to start with a specific sequence number") - ); - } let consumer = properties .common .build_consumer(split_id.to_string(), start_position.clone()) From 873c17bef4505ab596c9d96f34aec23c1f088df3 Mon Sep 17 00:00:00 2001 From: yufansong Date: Wed, 13 Sep 2023 20:37:07 -0700 Subject: [PATCH 5/5] cargo fmt --- src/connector/Cargo.toml | 3 +-- src/connector/src/source/nats/source/reader.rs | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index e7d78079de4c6..e555ce4e62800 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -93,6 +93,7 @@ serde_with = { version = "3", features = ["json"] } simd-json = "0.10.6" tempfile = "3" thiserror = "1" +time = "0.3.28" tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", @@ -109,8 +110,6 @@ tonic = { workspace = true } tracing = "0.1" url = "2" urlencoding = "2" -time = "0.3.28" - [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index f229226e6f834..08b0e91fae6b7 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. - use anyhow::{anyhow, Result}; use async_nats::jetstream::consumer; use async_trait::async_trait;