Skip to content

Commit

Permalink
Implement collections commands for Capella clusters
Browse files Browse the repository at this point in the history
Implement collections create for Capella clusters

Implement collections drop for Capella clusters
  • Loading branch information
Westwooo committed Sep 5, 2024
1 parent 9df73de commit 8ec9664
Show file tree
Hide file tree
Showing 6 changed files with 626 additions and 200 deletions.
247 changes: 164 additions & 83 deletions src/cli/collections.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use crate::cli::util::{cluster_identifiers_from, get_active_cluster, NuValueMap};
use crate::cli::util::{
cluster_from_conn_str, cluster_identifiers_from, find_org_id, find_project_id,
get_active_cluster, NuValueMap,
};
use crate::client::ManagementRequest;
use crate::state::State;
use crate::state::{RemoteCapellaOrganization, State};
use log::debug;
use serde_derive::Deserialize;
use std::ops::Add;
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::time::Instant;
Expand All @@ -12,12 +16,15 @@ use crate::cli::error::{
client_error_to_shell_error, deserialize_error, no_active_bucket_error,
unexpected_status_code_error,
};
use crate::cli::no_active_scope_error;
use crate::client::cloud_json::Collection;
use crate::remote_cluster::RemoteClusterType::Provisioned;
use crate::RemoteCluster;
use nu_engine::CallExt;
use nu_protocol::ast::Call;
use nu_protocol::engine::{Command, EngineState, Stack};
use nu_protocol::{
Category, IntoPipelineData, PipelineData, ShellError, Signature, SyntaxShape, Value,
Category, IntoPipelineData, PipelineData, ShellError, Signature, Span, SyntaxShape, Value,
};

#[derive(Clone)]
Expand Down Expand Up @@ -51,7 +58,6 @@ impl Command for Collections {
"the clusters to query against",
None,
)
.switch("all", "include system scopes in the output", Some('a'))
.category(Category::Custom("couchbase".to_string()))
}

Expand Down Expand Up @@ -83,90 +89,59 @@ fn collections_get(
let cluster_identifiers = cluster_identifiers_from(engine_state, stack, &state, call, true)?;
let guard = state.lock().unwrap();

let scope: Option<String> = call.get_flag(engine_state, stack, "scope")?;

let display_all = call.has_flag(engine_state, stack, "all")?;

let mut results: Vec<Value> = vec![];
for identifier in cluster_identifiers {
let active_cluster = get_active_cluster(identifier.clone(), &guard, span)?;

let bucket = get_bucket_or_active(active_cluster, engine_state, stack, call)?;
let scope = get_scope_or_active(active_cluster, engine_state, stack, call)?;

debug!(
"Running collections get for bucket {:?}, scope {:?}",
&bucket, &scope
&bucket.clone(),
&scope
);

let response = active_cluster
.cluster()
.http_client()
.management_request(
ManagementRequest::GetCollections { bucket },
Instant::now().add(active_cluster.timeouts().management_timeout()),
let collections = if active_cluster.cluster_type() == Provisioned {
get_capella_collections(
identifier.clone(),
guard.named_or_active_org(active_cluster.capella_org())?,
guard.named_or_active_project(active_cluster.project())?,
active_cluster,
bucket.clone(),
scope.clone(),
ctrl_c.clone(),
span,
)
} else {
get_server_collections(
active_cluster,
bucket.clone(),
scope.clone(),
ctrl_c.clone(),
span,
)
.map_err(|e| client_error_to_shell_error(e, span))?;

let manifest: Manifest = match response.status() {
200 => serde_json::from_str(response.content())
.map_err(|e| deserialize_error(e.to_string(), span))?,
_ => {
return Err(unexpected_status_code_error(
response.status(),
response.content(),
span,
));
}
};

for scope_res in manifest.scopes {
if let Some(scope_name) = &scope {
if scope_name != &scope_res.name {
continue;
}
}
let collections = scope_res.collections;
if collections.is_empty() {
let mut collected = NuValueMap::default();
collected.add_string("scope", scope_res.name.clone(), span);
collected.add_string("collection", "", span);
collected.add(
"max_expiry",
Value::Duration {
val: 0,
internal_span: span,
},
);
collected.add_string("cluster", identifier.clone(), span);
results.push(collected.into_value(span));
continue;
}

for collection in collections {
if scope_res.name == *"_system" && !display_all {
continue;
}
let mut collected = NuValueMap::default();
collected.add_string("scope", scope_res.name.clone(), span);
collected.add_string("collection", collection.name, span);

let expiry = match collection.max_expiry {
-1 => "".to_string(),
0 => "inherited".to_string(),
_ => format!("{:?}", Duration::from_secs(collection.max_expiry as u64)),
};

collected.add(
"max_expiry",
Value::String {
val: expiry,
internal_span: span,
},
);
collected.add_string("cluster", identifier.clone(), span);
results.push(collected.into_value(span));
}
}?;

for collection in collections {
let mut collected = NuValueMap::default();
collected.add_string("collection", collection.name(), span);

let expiry = match collection.max_expiry() {
-1 => "".to_string(),
0 => "inherited".to_string(),
_ => format!("{:?}", Duration::from_secs(collection.max_expiry() as u64)),
};

collected.add(
"max_expiry",
Value::String {
val: expiry,
internal_span: span,
},
);
collected.add_string("cluster", identifier.clone(), span);
results.push(collected.into_value(span));
}
}

Expand All @@ -192,20 +167,126 @@ pub fn get_bucket_or_active(
}
}

#[derive(Debug, Deserialize)]
pub struct ManifestCollection {
pub name: String,
#[serde(rename = "maxTTL")]
pub max_expiry: i64,
pub fn get_scope_or_active(
active_cluster: &RemoteCluster,
engine_state: &EngineState,
stack: &mut Stack,
call: &Call,
) -> Result<String, ShellError> {
match call.get_flag(engine_state, stack, "scope")? {
Some(v) => Ok(v),
None => match active_cluster.active_scope() {
Some(s) => Ok(s),
None => Err(no_active_scope_error(call.span())),
},
}
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Clone)]
pub struct ManifestScope {
pub name: String,
pub collections: Vec<ManifestCollection>,
pub collections: Vec<Collection>,
}

impl ManifestScope {
pub fn collections(&self) -> Vec<Collection> {
self.collections.clone()
}
}

#[derive(Debug, Deserialize)]
pub struct Manifest {
pub scopes: Vec<ManifestScope>,
}

impl Manifest {
pub fn scopes(&self) -> Vec<ManifestScope> {
self.scopes.clone()
}
}

fn get_capella_collections(
identifier: String,
org: &RemoteCapellaOrganization,
project: String,
cluster: &RemoteCluster,
bucket: String,
scope: String,
ctrl_c: Arc<AtomicBool>,
span: Span,
) -> Result<Vec<Collection>, ShellError> {
let client = org.client();
let deadline = Instant::now().add(org.timeout());

let org_id = find_org_id(ctrl_c.clone(), &client, deadline, span)?;
let project_id = find_project_id(
ctrl_c.clone(),
project,
&client,
deadline,
span,
org_id.clone(),
)?;

let json_cluster = cluster_from_conn_str(
identifier.clone(),
ctrl_c.clone(),
cluster.hostnames().clone(),
&client,
deadline,
span,
org_id.clone(),
project_id.clone(),
)?;

let collections = client
.list_collections(
org_id,
project_id,
json_cluster.id(),
bucket,
scope,
deadline,
ctrl_c,
)
.map_err(|e| client_error_to_shell_error(e, span))?;

Ok(collections.items())
}

fn get_server_collections(
cluster: &RemoteCluster,
bucket: String,
scope: String,
ctrl_c: Arc<AtomicBool>,
span: Span,
) -> Result<Vec<Collection>, ShellError> {
let response = cluster
.cluster()
.http_client()
.management_request(
ManagementRequest::GetCollections { bucket },
Instant::now().add(cluster.timeouts().management_timeout()),
ctrl_c.clone(),
)
.map_err(|e| client_error_to_shell_error(e, span))?;

let manifest: Manifest = match response.status() {
200 => serde_json::from_str(response.content())
.map_err(|e| deserialize_error(e.to_string(), span))?,
_ => {
return Err(unexpected_status_code_error(
response.status(),
response.content(),
span,
));
}
};

Ok(manifest
.scopes()
.into_iter()
.find(|s| s.name == scope)
.map(|scp| scp.collections())
.unwrap_or(vec![]))
}
Loading

0 comments on commit 8ec9664

Please sign in to comment.