diff --git a/async-nats/src/jetstream/context.rs b/async-nats/src/jetstream/context.rs index 2b3b2aed5..a3399b6fd 100644 --- a/async-nats/src/jetstream/context.rs +++ b/async-nats/src/jetstream/context.rs @@ -39,7 +39,7 @@ use tracing::debug; use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig}; use super::errors::ErrorCode; use super::is_valid_name; -use super::kv::{Store, MAX_HISTORY}; +use super::kv::{self, Store, MAX_HISTORY}; use super::object_store::{is_valid_bucket_name, ObjectStore}; use super::stream::{ self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info, @@ -522,6 +522,7 @@ impl Context { page_request: None, streams: Vec::new(), done: false, + subjects: None, } } @@ -549,6 +550,7 @@ impl Context { page_request: None, streams: Vec::new(), done: false, + subjects: None, } } /// Returns an existing key-value bucket. @@ -760,6 +762,48 @@ impl Context { .await } + /// Lists names of all key-value bucket for current context. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use futures::TryStreamExt; + /// let client = async_nats::connect("demo.nats.io:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let mut kv_names = jetstream.key_value_store_names(); + /// while let Some(kv_name) = kv_names.try_next().await? { + /// println!("KV: {}", kv_name); + /// } + /// # Ok(()) + /// # } + /// ``` + pub fn key_value_store_names(&self) -> kv::lister::KeyValueNames { + kv::lister::KeyValueNames::new(self) + } + + /// Lists all key-value bucket info for current context. + /// + /// # Examples + /// + /// ```no_run + /// # #[tokio::main] + /// # async fn main() -> Result<(), async_nats::Error> { + /// use futures::TryStreamExt; + /// let client = async_nats::connect("demo.nats.io:4222").await?; + /// let jetstream = async_nats::jetstream::new(client); + /// let mut kv_infos = jetstream.key_value_stores(); + /// while let Some(kv_info) = kv_infos.try_next().await? { + /// println!("KV: {:?}", kv_info); + /// } + /// # Ok(()) + /// # } + /// ``` + pub fn key_value_stores(&self) -> kv::lister::KeyValueStores { + kv::lister::KeyValueStores::new(self) + } + // pub async fn update_key_value>(&self, config: C) -> Result<(), crate::Error> { // let config = config.borrow(); // if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) { @@ -1307,12 +1351,22 @@ struct StreamInfoPage { type PageRequest = BoxFuture<'static, Result>; +#[derive(Serialize, Debug)] +pub struct StreamsRequest { + offset: usize, + + #[serde(skip_serializing_if = "Option::is_none")] + subject: Option, +} + pub struct StreamNames { context: Context, offset: usize, page_request: Option, streams: Vec, done: bool, + + pub(crate) subjects: Option, } impl futures::Stream for StreamNames { @@ -1352,17 +1406,12 @@ impl futures::Stream for StreamNames { return Poll::Ready(None); } let context = self.context.clone(); - let offset = self.offset; + let req = StreamsRequest { + offset: self.offset, + subject: self.subjects.clone(), + }; self.page_request = Some(Box::pin(async move { - match context - .request( - "STREAM.NAMES", - &json!({ - "offset": offset, - }), - ) - .await? - { + match context.request("STREAM.NAMES", &req).await? { Response::Err { error } => { Err(RequestError::with_source(RequestErrorKind::Other, error)) } @@ -1387,6 +1436,7 @@ pub struct Streams { page_request: Option, streams: Vec, done: bool, + pub(crate) subjects: Option, } impl futures::Stream for Streams { @@ -1426,17 +1476,12 @@ impl futures::Stream for Streams { return Poll::Ready(None); } let context = self.context.clone(); - let offset = self.offset; + let req = StreamsRequest { + offset: self.offset, + subject: self.subjects.clone(), + }; self.page_request = Some(Box::pin(async move { - match context - .request( - "STREAM.LIST", - &json!({ - "offset": offset, - }), - ) - .await? - { + match context.request("STREAM.LIST", &req).await? { Response::Err { error } => { Err(RequestError::with_source(RequestErrorKind::Other, error)) } diff --git a/async-nats/src/jetstream/kv/lister.rs b/async-nats/src/jetstream/kv/lister.rs new file mode 100644 index 000000000..e259feb4f --- /dev/null +++ b/async-nats/src/jetstream/kv/lister.rs @@ -0,0 +1,94 @@ +// Copyright 2020-2022 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + pin::Pin, + task::{ready, Poll}, +}; + +use futures::TryStreamExt; + +use crate::jetstream::{ + context::{StreamNames, Streams, StreamsError}, + Context, +}; + +pub struct KeyValueNames { + stream_names: StreamNames, +} + +impl KeyValueNames { + pub fn new(ctx: &Context) -> Self { + let mut stream_names = ctx.stream_names(); + stream_names.subjects = Some("$KV.*.>".to_owned()); + + Self { stream_names } + } +} + +impl futures::Stream for KeyValueNames { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Poll::Ready(loop { + if let Some(name) = ready!(self.stream_names.try_poll_next_unpin(cx)?) { + let name = name.strip_prefix("KV_").map(str::to_owned); + + if let Some(name) = name { + break Some(Ok(name)); + } + } else { + // The stream is done + break None; + } + }) + } +} + +pub struct KeyValueStores { + streams: Streams, +} + +impl KeyValueStores { + pub fn new(ctx: &Context) -> Self { + let mut streams = ctx.streams(); + streams.subjects = Some("$KV.*.>".to_owned()); + + Self { streams } + } +} + +impl futures::Stream for KeyValueStores { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Poll::Ready(loop { + if let Some(info) = ready!(self.streams.try_poll_next_unpin(cx)?) { + let bucket_name = info.config.name.strip_prefix("KV_").map(str::to_owned); + + if let Some(bucket) = bucket_name { + break Some(Ok(super::bucket::Status { info, bucket })); + } + } else { + // The stream is done + break None; + } + }) + } +} diff --git a/async-nats/src/jetstream/kv/mod.rs b/async-nats/src/jetstream/kv/mod.rs index ff99e364a..8be76c616 100644 --- a/async-nats/src/jetstream/kv/mod.rs +++ b/async-nats/src/jetstream/kv/mod.rs @@ -14,6 +14,7 @@ //! A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs. pub mod bucket; +pub mod lister; use std::{ fmt::{self, Display},