diff --git a/clickhouse-admin/api/src/lib.rs b/clickhouse-admin/api/src/lib.rs index d9e742d764..d83cfd4549 100644 --- a/clickhouse-admin/api/src/lib.rs +++ b/clickhouse-admin/api/src/lib.rs @@ -132,6 +132,15 @@ pub trait ClickhouseAdminServerApi { path_params: Path, query_params: Query, ) -> Result>, HttpError>; + + /// Idempotently initialize a replicated ClickHouse cluster database. + #[endpoint { + method = PUT, + path = "/init" + }] + async fn init_db( + rqctx: RequestContext, + ) -> Result; } /// API interface for our clickhouse-admin-single server diff --git a/clickhouse-admin/src/bin/clickhouse-admin-keeper.rs b/clickhouse-admin/src/bin/clickhouse-admin-keeper.rs index 953d109fda..7a994dc07d 100644 --- a/clickhouse-admin/src/bin/clickhouse-admin-keeper.rs +++ b/clickhouse-admin/src/bin/clickhouse-admin-keeper.rs @@ -8,7 +8,7 @@ use anyhow::anyhow; use camino::Utf8PathBuf; use clap::Parser; -use omicron_clickhouse_admin::{Clickward, Config}; +use omicron_clickhouse_admin::Config; use omicron_common::cmd::fatal; use omicron_common::cmd::CmdError; use std::net::{SocketAddr, SocketAddrV6}; @@ -54,9 +54,7 @@ async fn main_impl() -> Result<(), CmdError> { let mut config = Config::from_file(&config) .map_err(|err| CmdError::Failure(anyhow!(err)))?; config.dropshot.bind_address = SocketAddr::V6(http_address); - let clickward = Clickward::new(); let server = omicron_clickhouse_admin::start_keeper_admin_server( - clickward, binary_path, listen_address, config, diff --git a/clickhouse-admin/src/bin/clickhouse-admin-server.rs b/clickhouse-admin/src/bin/clickhouse-admin-server.rs index 486b91d3be..cc49ed9fd4 100644 --- a/clickhouse-admin/src/bin/clickhouse-admin-server.rs +++ b/clickhouse-admin/src/bin/clickhouse-admin-server.rs @@ -8,7 +8,7 @@ use anyhow::anyhow; use camino::Utf8PathBuf; use clap::Parser; -use omicron_clickhouse_admin::{Clickward, Config}; +use omicron_clickhouse_admin::Config; use omicron_common::cmd::fatal; use omicron_common::cmd::CmdError; use std::net::{SocketAddr, SocketAddrV6}; @@ -54,9 +54,7 @@ async fn main_impl() -> Result<(), CmdError> { let mut config = Config::from_file(&config) .map_err(|err| CmdError::Failure(anyhow!(err)))?; config.dropshot.bind_address = SocketAddr::V6(http_address); - let clickward = Clickward::new(); let server = omicron_clickhouse_admin::start_server_admin_server( - clickward, binary_path, listen_address, config, diff --git a/clickhouse-admin/src/context.rs b/clickhouse-admin/src/context.rs index 2469a347de..702c423130 100644 --- a/clickhouse-admin/src/context.rs +++ b/clickhouse-admin/src/context.rs @@ -10,17 +10,19 @@ use std::net::SocketAddrV6; use std::sync::Arc; use tokio::sync::Mutex; -pub struct ServerContext { +pub struct KeeperServerContext { clickward: Clickward, clickhouse_cli: ClickhouseCli, - _log: Logger, + log: Logger, } -impl ServerContext { - pub fn new(clickward: Clickward, clickhouse_cli: ClickhouseCli) -> Self { - let log = - clickhouse_cli.log.new(slog::o!("component" => "ServerContext")); - Self { clickward, clickhouse_cli, _log: log } +impl KeeperServerContext { + pub fn new(clickhouse_cli: ClickhouseCli) -> Self { + let log = clickhouse_cli + .log + .new(slog::o!("component" => "KeeperServerContext")); + let clickward = Clickward::new(); + Self { clickward, clickhouse_cli, log } } pub fn clickward(&self) -> &Clickward { @@ -30,24 +32,35 @@ impl ServerContext { pub fn clickhouse_cli(&self) -> &ClickhouseCli { &self.clickhouse_cli } + + pub fn log(&self) -> &Logger { + &self.log + } } -pub struct SingleServerContext { +pub struct ServerContext { clickhouse_cli: ClickhouseCli, + clickward: Clickward, oximeter_client: OximeterClient, initialization_lock: Arc>, + log: Logger, } -impl SingleServerContext { +impl ServerContext { pub fn new(clickhouse_cli: ClickhouseCli) -> Self { let ip = clickhouse_cli.listen_address.ip(); let address = SocketAddrV6::new(*ip, CLICKHOUSE_TCP_PORT, 0, 0); let oximeter_client = OximeterClient::new(address.into(), &clickhouse_cli.log); + let clickward = Clickward::new(); + let log = + clickhouse_cli.log.new(slog::o!("component" => "ServerContext")); Self { clickhouse_cli, + clickward, oximeter_client, initialization_lock: Arc::new(Mutex::new(())), + log, } } @@ -55,6 +68,10 @@ impl SingleServerContext { &self.clickhouse_cli } + pub fn clickward(&self) -> &Clickward { + &self.clickward + } + pub fn oximeter_client(&self) -> &OximeterClient { &self.oximeter_client } @@ -62,4 +79,8 @@ impl SingleServerContext { pub fn initialization_lock(&self) -> Arc> { self.initialization_lock.clone() } + + pub fn log(&self) -> &Logger { + &self.log + } } diff --git a/clickhouse-admin/src/http_entrypoints.rs b/clickhouse-admin/src/http_entrypoints.rs index a64b3a6435..cd8523de47 100644 --- a/clickhouse-admin/src/http_entrypoints.rs +++ b/clickhouse-admin/src/http_entrypoints.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use crate::context::{ServerContext, SingleServerContext}; +use crate::context::{KeeperServerContext, ServerContext}; use clickhouse_admin_api::*; use clickhouse_admin_types::{ ClickhouseKeeperClusterMembership, DistributedDdlQueue, KeeperConf, @@ -24,13 +24,13 @@ pub fn clickhouse_admin_server_api() -> ApiDescription> { .expect("registered entrypoints") } -pub fn clickhouse_admin_keeper_api() -> ApiDescription> { +pub fn clickhouse_admin_keeper_api() -> ApiDescription> +{ clickhouse_admin_keeper_api_mod::api_description::() .expect("registered entrypoints") } -pub fn clickhouse_admin_single_api() -> ApiDescription> -{ +pub fn clickhouse_admin_single_api() -> ApiDescription> { clickhouse_admin_single_api_mod::api_description::() .expect("registered entrypoints") } @@ -78,12 +78,56 @@ impl ClickhouseAdminServerApi for ClickhouseAdminServerImpl { ctx.clickhouse_cli().system_timeseries_avg(settings).await?; Ok(HttpResponseOk(output)) } + + async fn init_db( + rqctx: RequestContext, + ) -> Result { + let ctx = rqctx.context(); + let log = ctx.log(); + + // Database initialization is idempotent, but not concurrency-safe. + // Use a mutex to serialize requests. + let lock = ctx.initialization_lock(); + let _guard = lock.lock().await; + + // Initialize the database only if it was not previously initialized. + // TODO: Migrate schema to newer version without wiping data. + let client = ctx.oximeter_client(); + let version = client.read_latest_version().await.map_err(|e| { + HttpError::for_internal_error(format!( + "can't read ClickHouse version: {e}", + )) + })?; + if version == 0 { + info!( + log, + "initializing replicated ClickHouse cluster to version {OXIMETER_VERSION}" + ); + let replicated = true; + ctx.oximeter_client() + .initialize_db_with_version(replicated, OXIMETER_VERSION) + .await + .map_err(|e| { + HttpError::for_internal_error(format!( + "can't initialize replicated ClickHouse cluster \ + to version {OXIMETER_VERSION}: {e}", + )) + })?; + } else { + info!( + log, + "skipping initialization of replicated ClickHouse cluster at version {version}" + ); + } + + Ok(HttpResponseUpdatedNoContent()) + } } enum ClickhouseAdminKeeperImpl {} impl ClickhouseAdminKeeperApi for ClickhouseAdminKeeperImpl { - type Context = Arc; + type Context = Arc; async fn generate_config_and_enable_svc( rqctx: RequestContext, @@ -137,13 +181,13 @@ impl ClickhouseAdminKeeperApi for ClickhouseAdminKeeperImpl { enum ClickhouseAdminSingleImpl {} impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl { - type Context = Arc; + type Context = Arc; async fn init_db( rqctx: RequestContext, ) -> Result { - let log = &rqctx.log; let ctx = rqctx.context(); + let log = ctx.log(); // Database initialization is idempotent, but not concurrency-safe. // Use a mutex to serialize requests. diff --git a/clickhouse-admin/src/lib.rs b/clickhouse-admin/src/lib.rs index e0057609c2..d6a6f1e366 100644 --- a/clickhouse-admin/src/lib.rs +++ b/clickhouse-admin/src/lib.rs @@ -3,7 +3,7 @@ // file, You can obtain one at https://mozilla.org/MPL/2.0/. use camino::Utf8PathBuf; -use context::{ServerContext, SingleServerContext}; +use context::{KeeperServerContext, ServerContext}; use dropshot::HttpServer; use omicron_common::FileKv; use slog::{debug, error, Drain}; @@ -36,7 +36,6 @@ pub enum StartError { /// Start the dropshot server for `clickhouse-admin-server` which /// manages clickhouse replica servers. pub async fn start_server_admin_server( - clickward: Clickward, binary_path: Utf8PathBuf, listen_address: SocketAddrV6, server_config: Config, @@ -64,7 +63,7 @@ pub async fn start_server_admin_server( listen_address, log.new(slog::o!("component" => "ClickhouseCli")), ); - let context = ServerContext::new(clickward, clickhouse_cli); + let context = ServerContext::new(clickhouse_cli); dropshot::ServerBuilder::new( http_entrypoints::clickhouse_admin_server_api(), Arc::new(context), @@ -78,11 +77,10 @@ pub async fn start_server_admin_server( /// Start the dropshot server for `clickhouse-admin-server` which /// manages clickhouse replica servers. pub async fn start_keeper_admin_server( - clickward: Clickward, binary_path: Utf8PathBuf, listen_address: SocketAddrV6, server_config: Config, -) -> Result>, StartError> { +) -> Result>, StartError> { let (drain, registration) = slog_dtrace::with_drain( server_config .log @@ -106,7 +104,7 @@ pub async fn start_keeper_admin_server( listen_address, log.new(slog::o!("component" => "ClickhouseCli")), ); - let context = ServerContext::new(clickward, clickhouse_cli); + let context = KeeperServerContext::new(clickhouse_cli); dropshot::ServerBuilder::new( http_entrypoints::clickhouse_admin_keeper_api(), Arc::new(context), @@ -123,7 +121,7 @@ pub async fn start_single_admin_server( binary_path: Utf8PathBuf, listen_address: SocketAddrV6, server_config: Config, -) -> Result>, StartError> { +) -> Result>, StartError> { let (drain, registration) = slog_dtrace::with_drain( server_config .log @@ -147,7 +145,7 @@ pub async fn start_single_admin_server( listen_address, log.new(slog::o!("component" => "ClickhouseCli")), ); - let context = SingleServerContext::new(clickhouse_cli); + let context = ServerContext::new(clickhouse_cli); dropshot::ServerBuilder::new( http_entrypoints::clickhouse_admin_single_api(), Arc::new(context), diff --git a/nexus/reconfigurator/execution/src/clickhouse.rs b/nexus/reconfigurator/execution/src/clickhouse.rs index 36e41aec1e..df8b7afe99 100644 --- a/nexus/reconfigurator/execution/src/clickhouse.rs +++ b/nexus/reconfigurator/execution/src/clickhouse.rs @@ -114,7 +114,7 @@ pub(crate) async fn deploy_nodes( }) })); } - for config in server_configs { + for config in &server_configs { let admin_addr = SocketAddr::V6(SocketAddrV6::new( config.settings.listen_addr, CLICKHOUSE_ADMIN_PORT, @@ -125,7 +125,7 @@ pub(crate) async fn deploy_nodes( let log = opctx.log.new(slog::o!("admin_url" => admin_url.clone())); futs.push(Either::Right(async move { let client = ClickhouseServerClient::new(&admin_url, log.clone()); - client + if let Err(e) = client .generate_config_and_enable_svc(&config) .await .map(|_| ()) @@ -140,7 +140,23 @@ pub(crate) async fn deploy_nodes( admin_url, e ) - }) + }) { + return Err(e); + }; + + client + .init_db() + .await + .map(|_| ()) + .map_err(|e| { + anyhow!( + concat!( + "failed to initialize the replicated ClickHouse cluster database:", + "error = {}" + ), + e + ) + }) })); } @@ -157,7 +173,7 @@ pub(crate) async fn deploy_nodes( info!( opctx.log, - "Successfully deployed all clickhouse server and keeper configs" + "Successfully deployed all clickhouse server and keeper configs, and initialised database schema." ); Ok(()) diff --git a/openapi/clickhouse-admin-server.json b/openapi/clickhouse-admin-server.json index 50c526569e..5da8a53838 100644 --- a/openapi/clickhouse-admin-server.json +++ b/openapi/clickhouse-admin-server.json @@ -74,6 +74,23 @@ } } }, + "/init": { + "put": { + "summary": "Idempotently initialize a replicated ClickHouse cluster database.", + "operationId": "init_db", + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/timeseries/{table}/{metric}/avg": { "get": { "summary": "Retrieve time series from the system database.",