diff --git a/Cargo.lock b/Cargo.lock index 6cb0662b32155..693fc4d3c1864 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,7 +163,7 @@ checksum = "9c0fdddc3fdac97394ffcc5c89c634faa9c1c166ced54189af34e407c97b6ee7" dependencies = [ "apache-avro-derive", "byteorder", - "digest 0.10.7", + "digest", "lazy_static", "libflate", "log", @@ -189,7 +189,7 @@ dependencies = [ "byteorder", "bzip2", "crc32fast", - "digest 0.10.7", + "digest", "lazy_static", "libflate", "log", @@ -795,7 +795,7 @@ dependencies = [ "once_cell", "percent-encoding", "regex", - "sha2 0.10.7", + "sha2", "time", "tracing", ] @@ -829,7 +829,7 @@ dependencies = [ "md-5", "pin-project-lite", "sha1", - "sha2 0.10.7", + "sha2", "tracing", ] @@ -1215,15 +1215,6 @@ dependencies = [ "triple_accel", ] -[[package]] -name = "block-buffer" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" -dependencies = [ - "generic-array", -] - [[package]] name = "block-buffer" version = "0.10.4" @@ -1767,12 +1758,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "const-oid" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d6f2aa4d0537bcc1c74df8755072bd31c1ef1a3a1b85a68e8404a8c353b7b8b" - [[package]] name = "const-oid" version = "0.9.5" @@ -2082,15 +2067,29 @@ dependencies = [ [[package]] name = "curve25519-dalek" -version = "3.2.0" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b9fdf9972b2bd6af2d913799d9ebc165ea4d2e65878e329d9c6b372c4491b61" +checksum = "622178105f911d937a42cdb140730ba4a3ed2becd8ae6ce39c7d28b5d75d4588" dependencies = [ - "byteorder", - "digest 0.9.0", - "rand_core 0.5.1", + "cfg-if", + "cpufeatures", + "curve25519-dalek-derive", + "digest", + "fiat-crypto", + "platforms", + "rustc_version", "subtle", - "zeroize", +] + +[[package]] +name = "curve25519-dalek-derive" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fdaf97f4804dcebfa5862639bc9ce4121e82140bec2a987ac5140294865b5b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.33", ] [[package]] @@ -2291,23 +2290,14 @@ dependencies = [ "uuid", ] -[[package]] -name = "der" -version = "0.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79b71cca7d95d7681a4b3b9cdf63c8dbc3730d0584c2c74e31416d64a90493f4" -dependencies = [ - "const-oid 0.6.2", -] - [[package]] name = "der" version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fffa369a668c8af7dbf8b5e56c9f744fbd399949ed171606040001947de40b1c" dependencies = [ - "const-oid 0.9.5", - "pem-rfc7468 0.7.0", + "const-oid", + "pem-rfc7468", "zeroize", ] @@ -2386,23 +2376,14 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" -[[package]] -name = "digest" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" -dependencies = [ - "generic-array", -] - [[package]] name = "digest" version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer 0.10.4", - "const-oid 0.9.5", + "block-buffer", + "const-oid", "crypto-common", "subtle", ] @@ -2474,23 +2455,23 @@ checksum = "49457524c7e65648794c98283282a0b7c73b10018e7091f1cdcfff314fd7ae59" [[package]] name = "ed25519" -version = "1.5.3" +version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91cff35c70bba8a626e3185d8cd48cc11b5437e1a5bcd15b9b5fa3c64b6dfee7" +checksum = "60f6d271ca33075c88028be6f04d502853d63a5ece419d269c15315d4fc1cf1d" dependencies = [ - "signature 1.6.4", + "signature", ] [[package]] name = "ed25519-dalek" -version = "1.0.1" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d" +checksum = "7277392b266383ef8396db7fdeb1e77b6c52fed775f5df15bb24f35b72156980" dependencies = [ "curve25519-dalek", "ed25519", - "sha2 0.9.9", - "zeroize", + "sha2", + "signature", ] [[package]] @@ -2711,6 +2692,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" +[[package]] +name = "fiat-crypto" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0870c84016d4b481be5c9f323c24f65e31e901ae618f0e80f4308fb00de1d2d" + [[package]] name = "fiemap" version = "0.1.1" @@ -3430,7 +3417,7 @@ version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest 0.10.7", + "digest", ] [[package]] @@ -4364,7 +4351,7 @@ version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" dependencies = [ - "digest 0.10.7", + "digest", ] [[package]] @@ -4601,7 +4588,7 @@ dependencies = [ "serde", "serde_json", "sha1", - "sha2 0.10.7", + "sha2", "smallvec", "subprocess", "thiserror", @@ -4677,9 +4664,9 @@ dependencies = [ [[package]] name = "nkeys" -version = "0.3.1" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9261eb915c785ea65708bc45ef43507ea46914e1a73f1412d1a38aba967c8e" +checksum = "aad178aad32087b19042ee36dfd450b73f5f934fbfb058b59b198684dfec4c47" dependencies = [ "byteorder", "data-encoding", @@ -4913,7 +4900,7 @@ dependencies = [ "serde", "serde_json", "serde_path_to_error", - "sha2 0.10.7", + "sha2", "thiserror", "url", ] @@ -4939,12 +4926,6 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" -[[package]] -name = "opaque-debug" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" - [[package]] name = "opendal" version = "0.39.0" @@ -4973,7 +4954,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "sha2 0.10.7", + "sha2", "tokio", "uuid", ] @@ -5406,15 +5387,6 @@ dependencies = [ "serde", ] -[[package]] -name = "pem-rfc7468" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f22eb0e3c593294a99e9ff4b24cf6b752d43f193aa4415fe5077c159996d497" -dependencies = [ - "base64ct", -] - [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -5549,21 +5521,9 @@ version = "0.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8ffb9f10fa047879315e6625af03c164b16962a5368d724ed16323b68ace47f" dependencies = [ - "der 0.7.8", - "pkcs8 0.10.2", - "spki 0.7.2", -] - -[[package]] -name = "pkcs8" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee3ef9b64d26bad0536099c816c6734379e45bbd5f14798def6809e5cc350447" -dependencies = [ - "der 0.4.5", - "pem-rfc7468 0.2.3", - "spki 0.4.1", - "zeroize", + "der", + "pkcs8", + "spki", ] [[package]] @@ -5572,8 +5532,8 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f950b2377845cebe5cf8b5165cb3cc1a5e0fa5cfa3e1f7f55707d8fd82e0a7b7" dependencies = [ - "der 0.7.8", - "spki 0.7.2", + "der", + "spki", ] [[package]] @@ -5582,6 +5542,12 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "platforms" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4503fa043bf02cee09a9582e9554b4c6403b2ef55e4612e96561d294419429f8" + [[package]] name = "plotters" version = "0.3.5" @@ -5672,7 +5638,7 @@ dependencies = [ "md-5", "memchr", "rand", - "sha2 0.10.7", + "sha2", "stringprep", ] @@ -6154,7 +6120,7 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", "rand_chacha", - "rand_core 0.6.4", + "rand_core", ] [[package]] @@ -6164,15 +6130,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core 0.6.4", + "rand_core", ] -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" - [[package]] name = "rand_core" version = "0.6.4" @@ -6188,7 +6148,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" dependencies = [ - "rand_core 0.6.4", + "rand_core", ] [[package]] @@ -6350,7 +6310,7 @@ dependencies = [ "serde", "serde_json", "sha1", - "sha2 0.10.7", + "sha2", "tokio", ] @@ -6884,6 +6844,7 @@ dependencies = [ "mysql_async", "mysql_common", "nexmark", + "nkeys", "num-bigint", "opendal", "parking_lot 0.12.1", @@ -6909,6 +6870,7 @@ dependencies = [ "simd-json", "tempfile", "thiserror", + "time", "tokio-retry", "tokio-stream", "tokio-util", @@ -7007,7 +6969,7 @@ dependencies = [ "serde", "serde_json", "sha1", - "sha2 0.10.7", + "sha2", "smallvec", "static_assertions", "thiserror", @@ -7080,7 +7042,7 @@ dependencies = [ "risingwave_variables", "serde", "serde_json", - "sha2 0.10.7", + "sha2", "smallvec", "tempfile", "thiserror", @@ -7745,17 +7707,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ab43bb47d23c1a631b4b680199a45255dce26fa9ab2fa902581f624ff13e6a8" dependencies = [ "byteorder", - "const-oid 0.9.5", - "digest 0.10.7", + "const-oid", + "digest", "num-bigint-dig", "num-integer", "num-iter", "num-traits", "pkcs1", - "pkcs8 0.10.2", - "rand_core 0.6.4", - "signature 2.1.0", - "spki 0.7.2", + "pkcs8", + "rand_core", + "signature", + "spki", "subtle", "zeroize", ] @@ -8332,7 +8294,7 @@ checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.10.7", + "digest", ] [[package]] @@ -8341,19 +8303,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" -[[package]] -name = "sha2" -version = "0.9.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" -dependencies = [ - "block-buffer 0.9.0", - "cfg-if", - "cpufeatures", - "digest 0.9.0", - "opaque-debug", -] - [[package]] name = "sha2" version = "0.10.7" @@ -8362,7 +8311,7 @@ checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" dependencies = [ "cfg-if", "cpufeatures", - "digest 0.10.7", + "digest", ] [[package]] @@ -8428,30 +8377,24 @@ dependencies = [ [[package]] name = "signatory" -version = "0.23.2" +version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dfecc059e81632eef1dd9b79e22fc28b8fe69b30d3357512a77a0ad8ee3c782" +checksum = "c1e303f8205714074f6068773f0e29527e0453937fe837c9717d066635b65f31" dependencies = [ - "pkcs8 0.7.6", - "rand_core 0.6.4", - "signature 1.6.4", + "pkcs8", + "rand_core", + "signature", "zeroize", ] [[package]] name = "signature" -version = "1.6.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" - -[[package]] -name = "signature" -version = "2.1.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e1788eed21689f9cf370582dfc467ef36ed9c707f073528ddafa8d83e3b8500" +checksum = "8fe458c98333f9c8152221191a77e2a44e8325d0193484af2e9421a53019e57d" dependencies = [ - "digest 0.10.7", - "rand_core 0.6.4", + "digest", + "rand_core", ] [[package]] @@ -8613,15 +8556,6 @@ dependencies = [ "lock_api", ] -[[package]] -name = "spki" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c01a0c15da1b0b0e1494112e7af814a678fec9bd157881b49beac661e9b6f32" -dependencies = [ - "der 0.4.5", -] - [[package]] name = "spki" version = "0.7.2" @@ -8629,7 +8563,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d1e996ef02c474957d681f1b05213dfb0abab947b446a62d37770b23500184a" dependencies = [ "base64ct", - "der 0.7.8", + "der", ] [[package]] @@ -10052,7 +9986,6 @@ dependencies = [ "base64 0.21.3", "bit-vec", "bitflags 2.4.0", - "byteorder", "bytes", "cc", "chrono", @@ -10110,7 +10043,7 @@ dependencies = [ "prost", "rand", "rand_chacha", - "rand_core 0.6.4", + "rand_core", "regex", "regex-automata 0.3.8", "regex-syntax 0.7.5", @@ -10143,7 +10076,6 @@ dependencies = [ "unicode-normalization", "url", "uuid", - "zeroize", ] [[package]] @@ -10227,20 +10159,6 @@ name = "zeroize" version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9" -dependencies = [ - "zeroize_derive", -] - -[[package]] -name = "zeroize_derive" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.33", -] [[package]] name = "zstd" diff --git a/src/connector/Cargo.toml b/src/connector/Cargo.toml index 6b0aebf4b0f18..bb1b4b2b4ee0f 100644 --- a/src/connector/Cargo.toml +++ b/src/connector/Cargo.toml @@ -66,6 +66,7 @@ mysql_common = { version = "0.30", default-features = false, features = [ "chrono", ] } nexmark = { version = "0.2", features = ["serde"] } +nkeys = "0.3.2" num-bigint = "0.4" opendal = "0.39" parking_lot = "0.12" @@ -101,6 +102,7 @@ serde_with = { version = "3", features = ["json"] } simd-json = "0.10.6" tempfile = "3" thiserror = "1" +time = "0.3.28" tokio = { version = "0.2", package = "madsim-tokio", features = [ "rt", "rt-multi-thread", @@ -117,6 +119,7 @@ tonic = { workspace = true } tracing = "0.1" url = "2" urlencoding = "2" + [target.'cfg(not(madsim))'.dependencies] workspace-hack = { path = "../workspace-hack" } diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 5a03fc7bfd9af..4e2142fda0adb 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -26,11 +26,12 @@ use risingwave_common::error::anyhow_error; use serde_derive::{Deserialize, Serialize}; use serde_with::json::JsonString; use serde_with::{serde_as, DisplayFromStr}; +use time::OffsetDateTime; use crate::aws_auth::AwsAuthProps; use crate::deserialize_duration_from_string; use crate::sink::SinkError; - +use crate::source::nats::source::NatsOffset; // The file describes the common abstractions for each connector and can be used in both source and // sink. @@ -342,27 +343,33 @@ pub struct UpsertMessage<'a> { #[serde_as] #[derive(Deserialize, Serialize, Debug, Clone)] pub struct NatsCommon { - #[serde(rename = "nats.server_url")] + #[serde(rename = "server_url")] pub server_url: String, - #[serde(rename = "nats.subject")] + #[serde(rename = "subject")] pub subject: String, - #[serde(rename = "nats.user")] + #[serde(rename = "connect_mode")] + pub connect_mode: Option, + #[serde(rename = "username")] pub user: Option, - #[serde(rename = "nats.password")] + #[serde(rename = "password")] pub password: Option, - #[serde(rename = "nats.max_bytes")] + #[serde(rename = "jwt")] + pub jwt: Option, + #[serde(rename = "nkey")] + pub nkey: Option, + #[serde(rename = "max_bytes")] #[serde_as(as = "Option")] pub max_bytes: Option, - #[serde(rename = "nats.max_messages")] + #[serde(rename = "max_messages")] #[serde_as(as = "Option")] pub max_messages: Option, - #[serde(rename = "nats.max_messages_per_subject")] + #[serde(rename = "max_messages_per_subject")] #[serde_as(as = "Option")] pub max_messages_per_subject: Option, - #[serde(rename = "nats.max_consumers")] + #[serde(rename = "max_consumers")] #[serde_as(as = "Option")] pub max_consumers: Option, - #[serde(rename = "nats.max_message_size")] + #[serde(rename = "max_message_size")] #[serde_as(as = "Option")] pub max_message_size: Option, } @@ -370,9 +377,39 @@ pub struct NatsCommon { impl NatsCommon { pub(crate) async fn build_client(&self) -> anyhow::Result { let mut connect_options = async_nats::ConnectOptions::new(); - if let (Some(v_user), Some(v_password)) = (self.user.as_ref(), self.password.as_ref()) { - connect_options = connect_options.user_and_password(v_user.into(), v_password.into()); - } + match self.connect_mode.as_deref() { + Some("user_and_password") => { + if let (Some(v_user), Some(v_password)) = + (self.user.as_ref(), self.password.as_ref()) + { + connect_options = + connect_options.user_and_password(v_user.into(), v_password.into()) + } else { + return Err(anyhow_error!( + "nats connect mode is user_and_password, but user or password is empty" + )); + } + } + + Some("credential") => { + if let (Some(v_nkey), Some(v_jwt)) = (self.nkey.as_ref(), self.jwt.as_ref()) { + connect_options = connect_options + .credentials(&self.create_credential(v_nkey, v_jwt)?) + .expect("failed to parse static creds") + } else { + return Err(anyhow_error!( + "nats connect mode is credential, but nkey or jwt is empty" + )); + } + } + Some("plain") => {} + _ => { + return Err(anyhow_error!( + "nats connect mode only accept user_and_password/credential/plain" + )); + } + }; + let servers = self.server_url.split(',').collect::>(); let client = connect_options .connect( @@ -394,8 +431,8 @@ impl NatsCommon { pub(crate) async fn build_consumer( &self, - split_id: i32, - start_sequence: Option, + split_id: String, + start_sequence: NatsOffset, ) -> anyhow::Result< async_nats::jetstream::consumer::Consumer, > { @@ -406,23 +443,28 @@ impl NatsCommon { ack_policy: jetstream::consumer::AckPolicy::None, ..Default::default() }; - match start_sequence { - Some(v) => { - let consumer = stream - .get_or_create_consumer(&name, { - config.deliver_policy = DeliverPolicy::ByStartSequence { - start_sequence: v + 1, - }; - config - }) - .await?; - Ok(consumer) - } - None => { - let consumer = stream.get_or_create_consumer(&name, config).await?; - Ok(consumer) + + let deliver_policy = match start_sequence { + NatsOffset::Earliest => DeliverPolicy::All, + NatsOffset::Latest => DeliverPolicy::Last, + NatsOffset::SequenceNumber(v) => { + let parsed = v.parse::()?; + DeliverPolicy::ByStartSequence { + start_sequence: 1 + parsed, + } } - } + NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime { + start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)?, + }, + NatsOffset::None => DeliverPolicy::All, + }; + let consumer = stream + .get_or_create_consumer(&name, { + config.deliver_policy = deliver_policy; + config + }) + .await?; + Ok(consumer) } pub(crate) async fn build_or_get_stream( @@ -432,6 +474,7 @@ impl NatsCommon { let mut config = jetstream::stream::Config { // the subject default use name value name: self.subject.clone(), + max_bytes: 1000000, ..Default::default() }; if let Some(v) = self.max_bytes { @@ -452,4 +495,17 @@ impl NatsCommon { let stream = jetstream.get_or_create_stream(config).await?; Ok(stream) } + + pub(crate) fn create_credential(&self, seed: &str, jwt: &str) -> anyhow::Result { + let creds = format!( + "-----BEGIN NATS USER JWT-----\n{}\n------END NATS USER JWT------\n\n\ + ************************* IMPORTANT *************************\n\ + NKEY Seed printed below can be used to sign and prove identity.\n\ + NKEYs are sensitive and should be treated as secrets.\n\n\ + -----BEGIN USER NKEY SEED-----\n{}\n------END USER NKEY SEED------\n\n\ + *************************************************************", + jwt, seed + ); + Ok(creds) + } } diff --git a/src/connector/src/source/nats/enumerator/mod.rs b/src/connector/src/source/nats/enumerator/mod.rs index 88384bfb685e6..e987a45188114 100644 --- a/src/connector/src/source/nats/enumerator/mod.rs +++ b/src/connector/src/source/nats/enumerator/mod.rs @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use anyhow; use async_trait::async_trait; -use super::source::NatsSplit; +use super::source::{NatsOffset, NatsSplit}; use super::NatsProperties; -use crate::source::{SourceEnumeratorContextRef, SplitEnumerator}; +use crate::source::{SourceEnumeratorContextRef, SplitEnumerator, SplitId}; #[derive(Debug, Clone, Eq, PartialEq)] pub struct NatsSplitEnumerator { subject: String, - split_num: i32, + split_id: SplitId, } #[async_trait] @@ -36,7 +38,7 @@ impl SplitEnumerator for NatsSplitEnumerator { ) -> anyhow::Result { Ok(Self { subject: properties.common.subject, - split_num: 0, + split_id: Arc::from("0"), }) } @@ -44,8 +46,8 @@ impl SplitEnumerator for NatsSplitEnumerator { // TODO: to simplify the logic, return 1 split for first version let nats_split = NatsSplit { subject: self.subject.clone(), - split_num: 0, // be the same as `from_nats_jetstream_message` - start_sequence: None, + split_id: Arc::from("0"), // be the same as `from_nats_jetstream_message` + start_sequence: NatsOffset::None, }; Ok(vec![nats_split]) diff --git a/src/connector/src/source/nats/mod.rs b/src/connector/src/source/nats/mod.rs index 1d887e342f1f8..3e8cc57bc1da8 100644 --- a/src/connector/src/source/nats/mod.rs +++ b/src/connector/src/source/nats/mod.rs @@ -29,6 +29,12 @@ pub const NATS_CONNECTOR: &str = "nats"; pub struct NatsProperties { #[serde(flatten)] pub common: NatsCommon, + + #[serde(rename = "scan.startup.mode")] + pub scan_startup_mode: Option, + + #[serde(rename = "scan.startup.timestamp_millis")] + pub start_time: Option, } impl NatsProperties {} diff --git a/src/connector/src/source/nats/source/message.rs b/src/connector/src/source/nats/source/message.rs index afb3029d3b917..e582df86664e8 100644 --- a/src/connector/src/source/nats/source/message.rs +++ b/src/connector/src/source/nats/source/message.rs @@ -13,19 +13,37 @@ // limitations under the License. use async_nats; +use async_nats::jetstream::Message; use crate::source::base::SourceMessage; -use crate::source::SourceMeta; +use crate::source::{SourceMeta, SplitId}; -impl SourceMessage { - pub fn from_nats_jetstream_message(message: async_nats::jetstream::message::Message) -> Self { +#[derive(Clone, Debug)] +pub struct NatsMessage { + pub split_id: SplitId, + pub sequence_number: String, + pub payload: Vec, +} + +impl From for SourceMessage { + fn from(message: NatsMessage) -> Self { SourceMessage { key: None, - payload: Some(message.message.payload.to_vec()), + payload: Some(message.payload), // For nats jetstream, use sequence id as offset - offset: message.info().unwrap().stream_sequence.to_string(), - split_id: "0".into(), + offset: message.sequence_number, + split_id: message.split_id, meta: SourceMeta::Empty, } } } + +impl NatsMessage { + pub fn new(split_id: SplitId, message: Message) -> Self { + NatsMessage { + split_id, + sequence_number: message.info().unwrap().stream_sequence.to_string(), + payload: message.message.payload.to_vec(), + } + } +} diff --git a/src/connector/src/source/nats/source/reader.rs b/src/connector/src/source/nats/source/reader.rs index d958b5a898495..6e22748bcf468 100644 --- a/src/connector/src/source/nats/source/reader.rs +++ b/src/connector/src/source/nats/source/reader.rs @@ -12,18 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use anyhow::Result; +use anyhow::{anyhow, Result}; use async_nats::jetstream::consumer; use async_trait::async_trait; use futures::StreamExt; use futures_async_stream::try_stream; +use super::message::NatsMessage; +use super::{NatsOffset, NatsSplit}; use crate::parser::ParserConfig; use crate::source::common::{into_chunk_stream, CommonSplitReader}; -use crate::source::nats::split::NatsSplit; use crate::source::nats::NatsProperties; use crate::source::{ - BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitReader, + BoxSourceWithStateStream, Column, SourceContextRef, SourceMessage, SplitId, SplitReader, }; pub struct NatsSplitReader { @@ -31,6 +32,8 @@ pub struct NatsSplitReader { properties: NatsProperties, parser_config: ParserConfig, source_ctx: SourceContextRef, + start_position: NatsOffset, + split_id: SplitId, } #[async_trait] @@ -47,15 +50,42 @@ impl SplitReader for NatsSplitReader { ) -> Result { // TODO: to simplify the logic, return 1 split for first version assert!(splits.len() == 1); + let split = splits.into_iter().next().unwrap(); + let split_id = split.split_id; + let start_position = match &split.start_sequence { + NatsOffset::None => match &properties.scan_startup_mode { + None => NatsOffset::Earliest, + Some(mode) => match mode.as_str() { + "latest" => NatsOffset::Latest, + "earliest" => NatsOffset::Earliest, + "timestamp_millis" => { + if let Some(time) = &properties.start_time { + NatsOffset::Timestamp(time.parse()?) + } else { + return Err(anyhow!("scan_startup_timestamp_millis is required")); + } + } + _ => { + return Err(anyhow!( + "invalid scan_startup_mode, accept earliest/latest/timestamp_millis" + )) + } + }, + }, + start_position => start_position.to_owned(), + }; + let consumer = properties .common - .build_consumer(0, splits[0].start_sequence) + .build_consumer(split_id.to_string(), start_position.clone()) .await?; Ok(Self { consumer, properties, parser_config, source_ctx, + start_position, + split_id, }) } @@ -75,7 +105,10 @@ impl CommonSplitReader for NatsSplitReader { for msgs in messages.ready_chunks(capacity) { let mut msg_vec = Vec::with_capacity(capacity); for msg in msgs { - msg_vec.push(SourceMessage::from_nats_jetstream_message(msg?)); + msg_vec.push(SourceMessage::from(NatsMessage::new( + self.split_id.clone(), + msg?, + ))); } yield msg_vec; } diff --git a/src/connector/src/source/nats/split.rs b/src/connector/src/source/nats/split.rs index f0fcfaff35481..4072de230d983 100644 --- a/src/connector/src/source/nats/split.rs +++ b/src/connector/src/source/nats/split.rs @@ -18,20 +18,29 @@ use serde::{Deserialize, Serialize}; use crate::source::{SplitId, SplitMetaData}; +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)] +pub enum NatsOffset { + Earliest, + Latest, + SequenceNumber(String), + Timestamp(i128), + None, +} + /// The states of a NATS split, which will be persisted to checkpoint. #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Hash)] pub struct NatsSplit { pub(crate) subject: String, // TODO: to simplify the logic, return 1 split for first version. May use parallelism in // future. - pub(crate) split_num: i32, - pub(crate) start_sequence: Option, + pub(crate) split_id: SplitId, + pub(crate) start_sequence: NatsOffset, } impl SplitMetaData for NatsSplit { fn id(&self) -> SplitId { // TODO: should avoid constructing a string every time - format!("{}", self.split_num).into() + format!("{}", self.split_id).into() } fn restore_from_json(value: JsonbVal) -> anyhow::Result { @@ -44,16 +53,21 @@ impl SplitMetaData for NatsSplit { } impl NatsSplit { - pub fn new(subject: String, split_num: i32, start_sequence: Option) -> Self { + pub fn new(subject: String, split_id: SplitId, start_sequence: NatsOffset) -> Self { Self { subject, - split_num, + split_id, start_sequence, } } pub fn update_with_offset(&mut self, start_sequence: String) -> anyhow::Result<()> { - self.start_sequence = Some(start_sequence.as_str().parse::().unwrap()); + let start_sequence = if start_sequence.is_empty() { + NatsOffset::Earliest + } else { + NatsOffset::SequenceNumber(start_sequence) + }; + self.start_sequence = start_sequence; Ok(()) } } diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 603619f3a8b27..bc01c7f871d89 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -26,7 +26,6 @@ aws-smithy-client = { version = "0.55", default-features = false, features = ["n base64 = { version = "0.21", features = ["alloc"] } bit-vec = { version = "0.6" } bitflags = { version = "2", default-features = false, features = ["std"] } -byteorder = { version = "1", features = ["i128"] } bytes = { version = "1", features = ["serde"] } chrono = { version = "0.4", features = ["alloc", "serde"] } clap = { version = "4", features = ["cargo", "derive", "env"] } @@ -112,7 +111,6 @@ unicode-bidi = { version = "0.3" } unicode-normalization = { version = "0.1" } url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } -zeroize = { version = "1", features = ["zeroize_derive"] } [build-dependencies] ahash = { version = "0.8" } @@ -124,7 +122,6 @@ aws-smithy-client = { version = "0.55", default-features = false, features = ["n base64 = { version = "0.21", features = ["alloc"] } bit-vec = { version = "0.6" } bitflags = { version = "2", default-features = false, features = ["std"] } -byteorder = { version = "1", features = ["i128"] } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } chrono = { version = "0.4", features = ["alloc", "serde"] } @@ -215,6 +212,5 @@ unicode-bidi = { version = "0.3" } unicode-normalization = { version = "0.1" } url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } -zeroize = { version = "1", features = ["zeroize_derive"] } ### END HAKARI SECTION