Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(si-data-nats): upgrade to async-nats 0.36.0 #4564

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 67 additions & 65 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ rust-version = "1.78"
publish = false

[workspace.dependencies]
async-nats = { version = "0.35.1", features = ["service"] }
async-nats = { version = "0.36.0", features = ["service"] }
async-recursion = "1.0.5"
async-trait = "0.1.79"
aws-config = { version = "1.1.7", features = ["behavior-version-latest"] }
Expand Down Expand Up @@ -204,9 +204,6 @@ yrs = { version = "0.17.4" }

[patch.crates-io]
# pending a potential merge and release of
# https://github.com/nats-io/nats.rs/pull/1301
async-nats = { git = "https://github.com/systeminit/nats.rs.git", branch = "si/stable" }
# pending a potential merge and release of
# https://github.com/softprops/hyperlocal/pull/53
hyperlocal = { git = "https://github.com/fnichol/hyperlocal.git", branch = "pub-unix-stream" }
# pending a potential merge and release of
Expand Down
178 changes: 149 additions & 29 deletions lib/si-data-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use async_nats::{
header::{self, IntoHeaderName, IntoHeaderValue},
jetstream::{
account::Account,
consumer::{FromConsumer, IntoConsumerConfig},
consumer::{Consumer, FromConsumer, IntoConsumerConfig},
context::{
AccountError, CreateKeyValueError, CreateObjectStoreError, CreateStreamError,
DeleteObjectStore, DeleteStreamError, GetStreamError, KeyValueError, ObjectStoreError,
PublishAckFuture, PublishError, RequestError, UpdateStreamError,
DeleteObjectStore, DeleteStreamError, GetStreamByNameError, GetStreamError,
KeyValueError, ObjectStoreError, PublishAckFuture, PublishError, RequestError,
UpdateStreamError,
},
stream::{Config, ConsumerError, DeleteStatus},
stream::{Config, ConsumerError, DeleteStatus, Info, Stream},
},
subject::ToSubject,
HeaderMap, HeaderValue,
Expand Down Expand Up @@ -63,8 +64,9 @@ impl Context {
Self { inner, metadata }
}

/// Publishes [jetstream::Message][super::message::Message] to the [Stream] without waiting for
/// acknowledgment from the server that the message has been successfully delivered.
/// Publishes [`jetstream::Message`][async_nats::jetstream::message::Message] to the [`Stream`]
/// without waiting for acknowledgment from the server that the message has been successfully
/// delivered.
///
/// Acknowledgment future that can be polled is returned instead.
///
Expand Down Expand Up @@ -356,9 +358,9 @@ impl Context {
Ok(account)
}

/// Create a JetStream [Stream] with given config and return a handle to it.
/// Create a JetStream [`Stream`] with given config and return a handle to it.
///
/// That handle can be used to manage and use [Consumer].
/// That handle can be used to manage and use [`Consumer`].
///
/// # Examples
///
Expand Down Expand Up @@ -411,7 +413,7 @@ impl Context {
pub async fn create_stream<S>(
&self,
stream_config: S,
) -> Result<async_nats::jetstream::stream::Stream, CreateStreamError>
) -> Result<Stream<Info>, CreateStreamError>
where
Config: From<S>,
{
Expand All @@ -427,8 +429,13 @@ impl Context {
Ok(stream)
}

/// Checks for [Stream] existence on the server and returns handle to it.
/// That handle can be used to manage and use [Consumer].
/// Checks for [`Stream`] existence on the server and returns handle to it.
///
/// That handle can be used to manage and use [`Consumer`]. This variant does not fetch
/// [`Stream`] info from the server. It means it does not check if the stream actually exists.
/// If you run more operations on few streams, it is better to use [`Context::get_stream`]
/// instead. If you however run single operations on many streams, this method is more
/// efficient.
///
/// # Examples
///
Expand All @@ -443,12 +450,12 @@ impl Context {
/// ).await?;
/// let jetstream = si_data_nats::jetstream::new(client);
///
/// let stream = jetstream.get_stream("events").await?;
/// let stream = jetstream.get_stream_no_info("events").await?;
/// # Ok(())
/// # }
/// ```
#[instrument(
name = "context.get_stream",
name = "context.get_stream_no_info",
skip_all,
level = "debug",
fields(
Expand All @@ -469,10 +476,65 @@ impl Context {
server.port = self.metadata.server_port(),
)
)]
pub async fn get_stream<T: AsRef<str>>(
pub async fn get_stream_no_info<T: AsRef<str>>(
&self,
stream: T,
) -> Result<async_nats::jetstream::stream::Stream, GetStreamError> {
) -> Result<Stream<()>, GetStreamError> {
let span = Span::current();

let stream = self
.inner
.get_stream_no_info(stream)
.await
.map_err(|err| span.record_err(err))?;

span.record_ok();
Ok(stream)
}

/// Checks for [`Stream`] existence on the server and returns handle to it.
/// That handle can be used to manage and use [`Consumer`].
///
/// # Examples
///
/// ```no_run
/// # use si_data_nats::{Client, ConnectOptions};
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// let client = Client::connect_with_options(
/// "localhost:4222",
/// None,
/// ConnectOptions::default(),
/// ).await?;
/// let jetstream = si_data_nats::jetstream::new(client);
///
/// let stream = jetstream.get_stream("events").await?;
/// # Ok(())
/// # }
/// ```
#[instrument(
name = "context.get_stream",
skip_all,
level = "debug",
fields(
messaging.client_id = self.metadata.messaging_client_id(),
messaging.nats.server.id = self.metadata.messaging_nats_server_id(),
messaging.nats.server.name = self.metadata.messaging_nats_server_name(),
messaging.nats.server.version = self.metadata.messaging_nats_server_version(),
messaging.system = self.metadata.messaging_system(),
messaging.url = self.metadata.messaging_url(),
network.peer.address = self.metadata.network_peer_address(),
network.protocol.name = self.metadata.network_protocol_name(),
network.protocol.version = self.metadata.network_protocol_version(),
network.transport = self.metadata.network_transport(),
otel.kind = SpanKind::Internal.as_str(),
otel.status_code = Empty,
otel.status_message = Empty,
server.address = self.metadata.server_address(),
server.port = self.metadata.server_port(),
)
)]
pub async fn get_stream<T: AsRef<str>>(&self, stream: T) -> Result<Stream, GetStreamError> {
let span = Span::current();

let stream = self
Expand Down Expand Up @@ -541,7 +603,7 @@ impl Context {
pub async fn get_or_create_stream<S>(
&self,
stream_config: S,
) -> Result<async_nats::jetstream::stream::Stream, CreateStreamError>
) -> Result<Stream, CreateStreamError>
where
S: Into<Config>,
{
Expand All @@ -560,7 +622,7 @@ impl Context {
Ok(stream)
}

/// Deletes a [Stream] with a given name.
/// Deletes a [`Stream`] with a given name.
///
/// # Examples
///
Expand Down Expand Up @@ -618,7 +680,7 @@ impl Context {
Ok(status)
}

/// Updates a [Stream] with a given config. If specific field cannot be updated, error is
/// Updates a [`Stream`] with a given config. If specific field cannot be updated, error is
/// returned.
///
/// # Examples
Expand Down Expand Up @@ -688,6 +750,67 @@ impl Context {
Ok(info)
}

/// Looks up Stream that contains provided subject.
///
/// # Examples
///
/// ```no_run
/// # use si_data_nats::{Client, ConnectOptions};
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::TryStreamExt;
/// let client = Client::connect_with_options(
/// "localhost:4222",
/// None,
/// ConnectOptions::default(),
/// ).await?;
/// let jetstream = si_data_nats::jetstream::new(client);
/// let stream_name = jetstream.stream_by_subject("foo.>");
/// # Ok(())
/// # }
/// ```
#[instrument(
name = "context.stream_by_subject",
skip_all,
level = "debug",
fields(
messaging.client_id = self.metadata.messaging_client_id(),
messaging.destination.name = Empty,
messaging.nats.server.id = self.metadata.messaging_nats_server_id(),
messaging.nats.server.name = self.metadata.messaging_nats_server_name(),
messaging.nats.server.version = self.metadata.messaging_nats_server_version(),
messaging.system = self.metadata.messaging_system(),
messaging.url = self.metadata.messaging_url(),
network.peer.address = self.metadata.network_peer_address(),
network.protocol.name = self.metadata.network_protocol_name(),
network.protocol.version = self.metadata.network_protocol_version(),
network.transport = self.metadata.network_transport(),
otel.kind = SpanKind::Internal.as_str(),
otel.status_code = Empty,
otel.status_message = Empty,
server.address = self.metadata.server_address(),
server.port = self.metadata.server_port(),
)
)]
pub async fn stream_by_subject<T: Into<String>>(
&self,
subject: T,
) -> Result<String, GetStreamByNameError> {
let span = Span::current();

let subject = subject.into();
span.record("messaging.destination.name", subject.as_str());

let name = self
.inner
.stream_by_subject(subject)
.await
.map_err(|err| span.record_err(err))?;

span.record_ok();
Ok(name)
}

/// Lists names of all streams for current context.
///
/// # Examples
Expand Down Expand Up @@ -920,11 +1043,10 @@ impl Context {
Ok(status)
}

/// Get a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to a
/// [Stream] first.
/// Get a [`Consumer`] straight from [`Context`], without binding to a [`Stream`] first.
///
/// It has one less interaction with the server when binding to only one
/// [crate::jetstream::consumer::Consumer].
/// [`Consumer`].
///
/// # Examples:
///
Expand Down Expand Up @@ -974,7 +1096,7 @@ impl Context {
&self,
consumer: C,
stream: S,
) -> Result<async_nats::jetstream::consumer::Consumer<T>, ConsumerError>
) -> Result<Consumer<T>, ConsumerError>
where
T: FromConsumer + IntoConsumerConfig,
S: AsRef<str>,
Expand All @@ -992,11 +1114,9 @@ impl Context {
Ok(consumer)
}

/// Delete a [crate::jetstream::consumer::Consumer] straight from [Context], without binding to
/// a [Stream] first.
/// Delete a [`Consumer`] straight from [`Context`], without binding to a [`Stream`] first.
///
/// It has one less interaction with the server when binding to only one
/// [crate::jetstream::consumer::Consumer].
/// It has one less interaction with the server when binding to only one [`Consumer`].
///
/// # Examples:
///
Expand Down Expand Up @@ -1060,7 +1180,7 @@ impl Context {
}

/// Create a new `Durable` or `Ephemeral` Consumer (if `durable_name` was not provided) and
/// returns the info from the server about created [Consumer] without binding to a [Stream]
/// returns the info from the server about created [`Consumer`] without binding to a [`Stream`]
/// first.
///
/// # Examples
Expand Down Expand Up @@ -1115,7 +1235,7 @@ impl Context {
&self,
config: C,
stream: S,
) -> Result<async_nats::jetstream::consumer::Consumer<C>, ConsumerError> {
) -> Result<Consumer<C>, ConsumerError> {
let span = Span::current();

let consumer = self
Expand All @@ -1131,7 +1251,7 @@ impl Context {
/// Send a request to the jetstream JSON API.
///
/// This is a low level API used mostly internally, that should be used only in specific cases
/// when this crate API on [Consumer] or [Stream] does not provide needed functionality.
/// when this crate API on [`Consumer`] or [`Stream`] does not provide needed functionality.
///
/// # Examples
///
Expand Down
4 changes: 4 additions & 0 deletions lib/si-data-nats/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ impl Client {

/// Returns true if the server version is compatible with the version components.
///
/// This has to be used with caution, as it is not guaranteed that the server that client is
/// connected to is the same version that the one that is a JetStream meta/stream/consumer
/// leader, especially across leafnodes.
///
/// # Examples
/// ```no_run
/// # #[tokio::main]
Expand Down
2 changes: 1 addition & 1 deletion lib/si-data-nats/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use telemetry::prelude::*;

use super::{ConnectionMetadata, Error, Message, Result};

/// Retrieves messages from given `subscription` created by [Client::subscribe].
/// Retrieves messages from given `subscription` created by `Client::subscribe`.
///
/// Implements [futures::stream::Stream] for ergonomic async message processing.
///
Expand Down
Loading