Skip to content

Commit

Permalink
feat(KV): Add methods to get list of kv stores
Browse files Browse the repository at this point in the history
Signed-off-by: MATILLAT Quentin <[email protected]>
  • Loading branch information
tinou98 committed May 26, 2024
1 parent 1fbb459 commit 6995178
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 21 deletions.
87 changes: 66 additions & 21 deletions async-nats/src/jetstream/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use tracing::debug;
use super::consumer::{self, Consumer, FromConsumer, IntoConsumerConfig};
use super::errors::ErrorCode;
use super::is_valid_name;
use super::kv::{Store, MAX_HISTORY};
use super::kv::{self, Store, MAX_HISTORY};
use super::object_store::{is_valid_bucket_name, ObjectStore};
use super::stream::{
self, Config, ConsumerError, ConsumerErrorKind, DeleteStatus, DiscardPolicy, External, Info,
Expand Down Expand Up @@ -522,6 +522,7 @@ impl Context {
page_request: None,
streams: Vec::new(),
done: false,
subjects: None,
}
}

Expand Down Expand Up @@ -549,6 +550,7 @@ impl Context {
page_request: None,
streams: Vec::new(),
done: false,
subjects: None,
}
}
/// Returns an existing key-value bucket.
Expand Down Expand Up @@ -760,6 +762,48 @@ impl Context {
.await
}

/// Lists names of all key-value bucket for current context.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::TryStreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let mut kv_names = jetstream.key_value_store_names();
/// while let Some(kv_name) = kv_names.try_next().await? {
/// println!("KV: {}", kv_name);
/// }
/// # Ok(())
/// # }
/// ```
pub fn key_value_store_names(&self) -> kv::lister::KeyValueNames {
kv::lister::KeyValueNames::new(self)
}

/// Lists all key-value bucket info for current context.
///
/// # Examples
///
/// ```no_run
/// # #[tokio::main]
/// # async fn main() -> Result<(), async_nats::Error> {
/// use futures::TryStreamExt;
/// let client = async_nats::connect("demo.nats.io:4222").await?;
/// let jetstream = async_nats::jetstream::new(client);
/// let mut kv_infos = jetstream.key_value_stores();
/// while let Some(kv_info) = kv_infos.try_next().await? {
/// println!("KV: {:?}", kv_info);
/// }
/// # Ok(())
/// # }
/// ```
pub fn key_value_stores(&self) -> kv::lister::KeyValueStores {
kv::lister::KeyValueStores::new(self)
}

// pub async fn update_key_value<C: Borrow<kv::Config>>(&self, config: C) -> Result<(), crate::Error> {
// let config = config.borrow();
// if !crate::jetstream::kv::is_valid_bucket_name(&config.bucket) {
Expand Down Expand Up @@ -1307,12 +1351,22 @@ struct StreamInfoPage {

type PageRequest = BoxFuture<'static, Result<StreamPage, RequestError>>;

#[derive(Serialize, Debug)]
pub struct StreamsRequest {
offset: usize,

#[serde(skip_serializing_if = "Option::is_none")]
subject: Option<String>,
}

pub struct StreamNames {
context: Context,
offset: usize,
page_request: Option<PageRequest>,
streams: Vec<String>,
done: bool,

pub(crate) subjects: Option<String>,
}

impl futures::Stream for StreamNames {
Expand Down Expand Up @@ -1352,17 +1406,12 @@ impl futures::Stream for StreamNames {
return Poll::Ready(None);
}
let context = self.context.clone();
let offset = self.offset;
let req = StreamsRequest {
offset: self.offset,
subject: self.subjects.clone(),
};
self.page_request = Some(Box::pin(async move {
match context
.request(
"STREAM.NAMES",
&json!({
"offset": offset,
}),
)
.await?
{
match context.request("STREAM.NAMES", &req).await? {
Response::Err { error } => {
Err(RequestError::with_source(RequestErrorKind::Other, error))
}
Expand All @@ -1387,6 +1436,7 @@ pub struct Streams {
page_request: Option<PageInfoRequest>,
streams: Vec<super::stream::Info>,
done: bool,
pub(crate) subjects: Option<String>,
}

impl futures::Stream for Streams {
Expand Down Expand Up @@ -1426,17 +1476,12 @@ impl futures::Stream for Streams {
return Poll::Ready(None);
}
let context = self.context.clone();
let offset = self.offset;
let req = StreamsRequest {
offset: self.offset,
subject: self.subjects.clone(),
};
self.page_request = Some(Box::pin(async move {
match context
.request(
"STREAM.LIST",
&json!({
"offset": offset,
}),
)
.await?
{
match context.request("STREAM.LIST", &req).await? {
Response::Err { error } => {
Err(RequestError::with_source(RequestErrorKind::Other, error))
}
Expand Down
94 changes: 94 additions & 0 deletions async-nats/src/jetstream/kv/lister.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2020-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::{
pin::Pin,
task::{ready, Poll},
};

use futures::TryStreamExt;

use crate::jetstream::{
context::{StreamNames, Streams, StreamsError},
Context,
};

pub struct KeyValueNames {
stream_names: StreamNames,
}

impl KeyValueNames {
pub fn new(ctx: &Context) -> Self {
let mut stream_names = ctx.stream_names();
stream_names.subjects = Some("$KV.*.>".to_owned());

Self { stream_names }
}
}

impl futures::Stream for KeyValueNames {
type Item = Result<String, StreamsError>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(loop {
if let Some(name) = ready!(self.stream_names.try_poll_next_unpin(cx)?) {
let name = name.strip_prefix("KV_").map(str::to_owned);

if let Some(name) = name {
break Some(Ok(name));
}
} else {
// The stream is done
break None;
}
})
}
}

pub struct KeyValueStores {
streams: Streams,
}

impl KeyValueStores {
pub fn new(ctx: &Context) -> Self {
let mut streams = ctx.streams();
streams.subjects = Some("$KV.*.>".to_owned());

Self { streams }
}
}

impl futures::Stream for KeyValueStores {
type Item = Result<super::bucket::Status, StreamsError>;

fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
Poll::Ready(loop {
if let Some(info) = ready!(self.streams.try_poll_next_unpin(cx)?) {
let bucket_name = info.config.name.strip_prefix("KV_").map(str::to_owned);

if let Some(bucket) = bucket_name {
break Some(Ok(super::bucket::Status { info, bucket }));
}
} else {
// The stream is done
break None;
}
})
}
}
1 change: 1 addition & 0 deletions async-nats/src/jetstream/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//! A Key-Value store built on top of JetStream, allowing you to store and retrieve data using simple key-value pairs.

pub mod bucket;
pub mod lister;

use std::{
fmt::{self, Display},
Expand Down

0 comments on commit 6995178

Please sign in to comment.