-
Notifications
You must be signed in to change notification settings - Fork 595
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(stream): add nkey and jwt auth methods for nats connector #12227
Changes from 9 commits
9c9871e
427e633
8cc5120
ddbf1f3
dd8cfdf
d3ed07c
ec484aa
64c7ac3
1e25f4c
a0b34e8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,11 +26,12 @@ use risingwave_common::error::anyhow_error; | |
use serde_derive::{Deserialize, Serialize}; | ||
use serde_with::json::JsonString; | ||
use serde_with::{serde_as, DisplayFromStr}; | ||
use time::OffsetDateTime; | ||
|
||
use crate::aws_auth::AwsAuthProps; | ||
use crate::deserialize_duration_from_string; | ||
use crate::sink::SinkError; | ||
|
||
use crate::source::nats::source::NatsOffset; | ||
// The file describes the common abstractions for each connector and can be used in both source and | ||
// sink. | ||
|
||
|
@@ -342,37 +343,68 @@ pub struct UpsertMessage<'a> { | |
#[serde_as] | ||
#[derive(Deserialize, Serialize, Debug, Clone)] | ||
pub struct NatsCommon { | ||
#[serde(rename = "nats.server_url")] | ||
#[serde(rename = "server_url")] | ||
pub server_url: String, | ||
#[serde(rename = "nats.subject")] | ||
#[serde(rename = "subject")] | ||
pub subject: String, | ||
#[serde(rename = "nats.user")] | ||
#[serde(rename = "connect_mode")] | ||
pub connect_mode: Option<String>, | ||
#[serde(rename = "username")] | ||
pub user: Option<String>, | ||
#[serde(rename = "nats.password")] | ||
#[serde(rename = "password")] | ||
pub password: Option<String>, | ||
#[serde(rename = "nats.max_bytes")] | ||
#[serde(rename = "jwt")] | ||
pub jwt: Option<String>, | ||
#[serde(rename = "nkey")] | ||
pub nkey: Option<String>, | ||
#[serde(rename = "max_bytes")] | ||
#[serde_as(as = "Option<DisplayFromStr>")] | ||
pub max_bytes: Option<i64>, | ||
#[serde(rename = "nats.max_messages")] | ||
#[serde(rename = "max_messages")] | ||
#[serde_as(as = "Option<DisplayFromStr>")] | ||
pub max_messages: Option<i64>, | ||
#[serde(rename = "nats.max_messages_per_subject")] | ||
#[serde(rename = "max_messages_per_subject")] | ||
#[serde_as(as = "Option<DisplayFromStr>")] | ||
pub max_messages_per_subject: Option<i64>, | ||
#[serde(rename = "nats.max_consumers")] | ||
#[serde(rename = "max_consumers")] | ||
#[serde_as(as = "Option<DisplayFromStr>")] | ||
pub max_consumers: Option<i32>, | ||
#[serde(rename = "nats.max_message_size")] | ||
#[serde(rename = "max_message_size")] | ||
#[serde_as(as = "Option<DisplayFromStr>")] | ||
pub max_message_size: Option<i32>, | ||
} | ||
|
||
impl NatsCommon { | ||
pub(crate) async fn build_client(&self) -> anyhow::Result<async_nats::Client> { | ||
let mut connect_options = async_nats::ConnectOptions::new(); | ||
if let (Some(v_user), Some(v_password)) = (self.user.as_ref(), self.password.as_ref()) { | ||
connect_options = connect_options.user_and_password(v_user.into(), v_password.into()); | ||
} | ||
match self.connect_mode.as_deref() { | ||
Some("user_and_password") => { | ||
if let (Some(v_user), Some(v_password)) = | ||
(self.user.as_ref(), self.password.as_ref()) | ||
{ | ||
connect_options = | ||
connect_options.user_and_password(v_user.into(), v_password.into()) | ||
} else { | ||
return Err(anyhow_error!( | ||
"nats connect mode is user_and_password, but user or password is empty" | ||
)); | ||
} | ||
} | ||
|
||
Some("credential") => { | ||
if let (Some(v_nkey), Some(v_jwt)) = (self.nkey.as_ref(), self.jwt.as_ref()) { | ||
connect_options = connect_options | ||
.credentials(&self.create_credential(v_nkey, v_jwt)?) | ||
.expect("failed to parse static creds") | ||
} else { | ||
return Err(anyhow_error!( | ||
"nats connect mode is credential, but nkey or jwt is empty" | ||
)); | ||
} | ||
} | ||
_ => {} | ||
}; | ||
|
||
let servers = self.server_url.split(',').collect::<Vec<&str>>(); | ||
let client = connect_options | ||
.connect( | ||
|
@@ -394,8 +426,8 @@ impl NatsCommon { | |
|
||
pub(crate) async fn build_consumer( | ||
&self, | ||
split_id: i32, | ||
start_sequence: Option<u64>, | ||
split_id: String, | ||
start_sequence: NatsOffset, | ||
) -> anyhow::Result< | ||
async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>, | ||
> { | ||
|
@@ -406,23 +438,28 @@ impl NatsCommon { | |
ack_policy: jetstream::consumer::AckPolicy::None, | ||
..Default::default() | ||
}; | ||
match start_sequence { | ||
Some(v) => { | ||
let consumer = stream | ||
.get_or_create_consumer(&name, { | ||
config.deliver_policy = DeliverPolicy::ByStartSequence { | ||
start_sequence: v + 1, | ||
}; | ||
config | ||
}) | ||
.await?; | ||
Ok(consumer) | ||
} | ||
None => { | ||
let consumer = stream.get_or_create_consumer(&name, config).await?; | ||
Ok(consumer) | ||
|
||
let deliver_policy = match start_sequence { | ||
NatsOffset::Earliest => DeliverPolicy::All, | ||
NatsOffset::Latest => DeliverPolicy::Last, | ||
NatsOffset::SequenceNumber(v) => { | ||
let parsed = v.parse::<u64>()?; | ||
DeliverPolicy::ByStartSequence { | ||
start_sequence: 1 + parsed, | ||
} | ||
} | ||
} | ||
NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime { | ||
start_time: OffsetDateTime::from_unix_timestamp_nanos(v * 1_000_000)?, | ||
}, | ||
NatsOffset::None => DeliverPolicy::All, | ||
}; | ||
let consumer = stream | ||
.get_or_create_consumer(&name, { | ||
config.deliver_policy = deliver_policy; | ||
config | ||
}) | ||
.await?; | ||
Ok(consumer) | ||
} | ||
|
||
pub(crate) async fn build_or_get_stream( | ||
|
@@ -432,6 +469,7 @@ impl NatsCommon { | |
let mut config = jetstream::stream::Config { | ||
// the subject default use name value | ||
name: self.subject.clone(), | ||
max_bytes: 1000000, | ||
..Default::default() | ||
}; | ||
if let Some(v) = self.max_bytes { | ||
|
@@ -452,4 +490,17 @@ impl NatsCommon { | |
let stream = jetstream.get_or_create_stream(config).await?; | ||
Ok(stream) | ||
} | ||
|
||
pub(crate) fn create_credential(&self, seed: &str, jwt: &str) -> anyhow::Result<String> { | ||
let creds = format!( | ||
"-----BEGIN NATS USER JWT-----\n{}\n------END NATS USER JWT------\n\n\ | ||
************************* IMPORTANT *************************\n\ | ||
NKEY Seed printed below can be used to sign and prove identity.\n\ | ||
NKEYs are sensitive and should be treated as secrets.\n\n\ | ||
-----BEGIN USER NKEY SEED-----\n{}\n------END USER NKEY SEED------\n\n\ | ||
*************************************************************", | ||
Comment on lines
+501
to
+506
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. incredible the func need the format There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, this is what the nats documents said, and also what syntropy told me |
||
jwt, seed | ||
); | ||
Ok(creds) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it mean rw also accepts Nats without auth?
if so, user should connect with
'plain'
or sth else, instead of leaving it empty.also, we need to reject unrecognized string here.