Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[reconfigurator] initialize clickhouse cluster db schema #7306

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions clickhouse-admin/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,15 @@ pub trait ClickhouseAdminServerApi {
path_params: Path<MetricInfoPath>,
query_params: Query<TimeSeriesSettingsQuery>,
) -> Result<HttpResponseOk<Vec<SystemTimeSeries>>, HttpError>;

/// Idempotently initialize a replicated ClickHouse cluster database.
#[endpoint {
method = PUT,
path = "/init"
}]
async fn init_db(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseUpdatedNoContent, HttpError>;
}

/// API interface for our clickhouse-admin-single server
Expand Down
4 changes: 1 addition & 3 deletions clickhouse-admin/src/bin/clickhouse-admin-keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use anyhow::anyhow;
use camino::Utf8PathBuf;
use clap::Parser;
use omicron_clickhouse_admin::{Clickward, Config};
use omicron_clickhouse_admin::Config;
use omicron_common::cmd::fatal;
use omicron_common::cmd::CmdError;
use std::net::{SocketAddr, SocketAddrV6};
Expand Down Expand Up @@ -54,9 +54,7 @@ async fn main_impl() -> Result<(), CmdError> {
let mut config = Config::from_file(&config)
.map_err(|err| CmdError::Failure(anyhow!(err)))?;
config.dropshot.bind_address = SocketAddr::V6(http_address);
let clickward = Clickward::new();
let server = omicron_clickhouse_admin::start_keeper_admin_server(
clickward,
binary_path,
listen_address,
config,
Expand Down
4 changes: 1 addition & 3 deletions clickhouse-admin/src/bin/clickhouse-admin-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use anyhow::anyhow;
use camino::Utf8PathBuf;
use clap::Parser;
use omicron_clickhouse_admin::{Clickward, Config};
use omicron_clickhouse_admin::Config;
use omicron_common::cmd::fatal;
use omicron_common::cmd::CmdError;
use std::net::{SocketAddr, SocketAddrV6};
Expand Down Expand Up @@ -54,9 +54,7 @@ async fn main_impl() -> Result<(), CmdError> {
let mut config = Config::from_file(&config)
.map_err(|err| CmdError::Failure(anyhow!(err)))?;
config.dropshot.bind_address = SocketAddr::V6(http_address);
let clickward = Clickward::new();
let server = omicron_clickhouse_admin::start_server_admin_server(
clickward,
binary_path,
listen_address,
config,
Expand Down
39 changes: 30 additions & 9 deletions clickhouse-admin/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,19 @@ use std::net::SocketAddrV6;
use std::sync::Arc;
use tokio::sync::Mutex;

pub struct ServerContext {
pub struct KeeperServerContext {
clickward: Clickward,
clickhouse_cli: ClickhouseCli,
_log: Logger,
log: Logger,
}

impl ServerContext {
pub fn new(clickward: Clickward, clickhouse_cli: ClickhouseCli) -> Self {
let log =
clickhouse_cli.log.new(slog::o!("component" => "ServerContext"));
Self { clickward, clickhouse_cli, _log: log }
impl KeeperServerContext {
pub fn new(clickhouse_cli: ClickhouseCli) -> Self {
let log = clickhouse_cli
.log
.new(slog::o!("component" => "KeeperServerContext"));
let clickward = Clickward::new();
Self { clickward, clickhouse_cli, log }
}

pub fn clickward(&self) -> &Clickward {
Expand All @@ -30,36 +32,55 @@ impl ServerContext {
pub fn clickhouse_cli(&self) -> &ClickhouseCli {
&self.clickhouse_cli
}

pub fn log(&self) -> &Logger {
&self.log
}
}

pub struct SingleServerContext {
/// State shared by the ClickHouse server admin HTTP handlers.
///
/// Wrapped in an `Arc` and used as the dropshot context type by both the
/// replicated-server and the single-node API descriptions.
pub struct ServerContext {
/// Handle used to run ClickHouse CLI queries; also supplies the listen
/// address and the parent logger.
clickhouse_cli: ClickhouseCli,
/// Clickward instance created alongside the context.
clickward: Clickward,
/// Oximeter database client used to read and initialize the schema version.
oximeter_client: OximeterClient,
/// Async mutex that handlers hold to serialize database initialization.
initialization_lock: Arc<Mutex<()>>,
/// Logger tagged with `component => "ServerContext"`.
log: Logger,
}

impl SingleServerContext {
impl ServerContext {
/// Builds a context around `clickhouse_cli`.
///
/// The Oximeter client is pointed at the CLI's listen IP on
/// `CLICKHOUSE_TCP_PORT`, and the context logger is a child of the CLI's
/// logger tagged with `component => "ServerContext"`.
pub fn new(clickhouse_cli: ClickhouseCli) -> Self {
// Reuse the IP the CLI talks to, but on the ClickHouse TCP port.
let ip = clickhouse_cli.listen_address.ip();
let address = SocketAddrV6::new(*ip, CLICKHOUSE_TCP_PORT, 0, 0);
let oximeter_client =
OximeterClient::new(address.into(), &clickhouse_cli.log);
let clickward = Clickward::new();
let log =
clickhouse_cli.log.new(slog::o!("component" => "ServerContext"));
Self {
clickhouse_cli,
clickward,
oximeter_client,
// Starts unlocked; `init_db` handlers take it while initializing.
initialization_lock: Arc::new(Mutex::new(())),
log,
}
}

/// Returns the ClickHouse CLI handle.
pub fn clickhouse_cli(&self) -> &ClickhouseCli {
&self.clickhouse_cli
}

/// Returns the Clickward instance.
pub fn clickward(&self) -> &Clickward {
&self.clickward
}

/// Returns the Oximeter database client.
pub fn oximeter_client(&self) -> &OximeterClient {
&self.oximeter_client
}

/// Returns a new `Arc` handle to the shared initialization mutex.
pub fn initialization_lock(&self) -> Arc<Mutex<()>> {
self.initialization_lock.clone()
}

/// Returns the context's logger.
pub fn log(&self) -> &Logger {
&self.log
}
}
57 changes: 50 additions & 7 deletions clickhouse-admin/src/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use crate::context::{ServerContext, SingleServerContext};
use crate::context::{KeeperServerContext, ServerContext};
use clickhouse_admin_api::*;
use clickhouse_admin_types::{
ClickhouseKeeperClusterMembership, DistributedDdlQueue, KeeperConf,
Expand All @@ -24,13 +24,13 @@ pub fn clickhouse_admin_server_api() -> ApiDescription<Arc<ServerContext>> {
.expect("registered entrypoints")
}

pub fn clickhouse_admin_keeper_api() -> ApiDescription<Arc<ServerContext>> {
pub fn clickhouse_admin_keeper_api() -> ApiDescription<Arc<KeeperServerContext>>
{
clickhouse_admin_keeper_api_mod::api_description::<ClickhouseAdminKeeperImpl>()
.expect("registered entrypoints")
}

pub fn clickhouse_admin_single_api() -> ApiDescription<Arc<SingleServerContext>>
{
pub fn clickhouse_admin_single_api() -> ApiDescription<Arc<ServerContext>> {
clickhouse_admin_single_api_mod::api_description::<ClickhouseAdminSingleImpl>()
.expect("registered entrypoints")
}
Expand Down Expand Up @@ -78,12 +78,55 @@ impl ClickhouseAdminServerApi for ClickhouseAdminServerImpl {
ctx.clickhouse_cli().system_timeseries_avg(settings).await?;
Ok(HttpResponseOk(output))
}

async fn init_db(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
let ctx = rqctx.context();
let log = ctx.log();

// Database initialization is idempotent, but not concurrency-safe.
// Use a mutex to serialize requests.
let lock = ctx.initialization_lock();
let _guard = lock.lock().await;

// Initialize the database only if it was not previously initialized.
// TODO: Migrate schema to newer version without wiping data.
let client = ctx.oximeter_client();
let version = client.read_latest_version().await.map_err(|e| {
HttpError::for_internal_error(format!(
"can't read ClickHouse version: {e}",
))
})?;
if version == 0 {
info!(
log,
"initializing replicated ClickHouse cluster to version {OXIMETER_VERSION}"
);
ctx.oximeter_client()
.initialize_db_with_version(true, OXIMETER_VERSION)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: APIs with bool parameters make it hard to figure out what is being passed in at the callsite. I usually create a temp variable to name the flag then pass that in. I'd suggest that here with something like:

let replicated = true;
ctx.oximeter_client().initialize_db_with_version(replicated, OXIMETER_VERSION);

.await
.map_err(|e| {
HttpError::for_internal_error(format!(
"can't initialize replicated ClickHouse cluster \
to version {OXIMETER_VERSION}: {e}",
))
})?;
} else {
info!(
log,
"skipping initialization of replicated ClickHouse cluster at version {version}"
);
}

Ok(HttpResponseUpdatedNoContent())
}
}

enum ClickhouseAdminKeeperImpl {}

impl ClickhouseAdminKeeperApi for ClickhouseAdminKeeperImpl {
type Context = Arc<ServerContext>;
type Context = Arc<KeeperServerContext>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call on the rename here ;)


async fn generate_config_and_enable_svc(
rqctx: RequestContext<Self::Context>,
Expand Down Expand Up @@ -137,13 +180,13 @@ impl ClickhouseAdminKeeperApi for ClickhouseAdminKeeperImpl {
enum ClickhouseAdminSingleImpl {}

impl ClickhouseAdminSingleApi for ClickhouseAdminSingleImpl {
type Context = Arc<SingleServerContext>;
type Context = Arc<ServerContext>;

async fn init_db(
rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseUpdatedNoContent, HttpError> {
let log = &rqctx.log;
let ctx = rqctx.context();
let log = ctx.log();

// Database initialization is idempotent, but not concurrency-safe.
// Use a mutex to serialize requests.
Expand Down
14 changes: 6 additions & 8 deletions clickhouse-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use camino::Utf8PathBuf;
use context::{ServerContext, SingleServerContext};
use context::{KeeperServerContext, ServerContext};
use dropshot::HttpServer;
use omicron_common::FileKv;
use slog::{debug, error, Drain};
Expand Down Expand Up @@ -36,7 +36,6 @@ pub enum StartError {
/// Start the dropshot server for `clickhouse-admin-server` which
/// manages clickhouse replica servers.
pub async fn start_server_admin_server(
clickward: Clickward,
binary_path: Utf8PathBuf,
listen_address: SocketAddrV6,
server_config: Config,
Expand Down Expand Up @@ -64,7 +63,7 @@ pub async fn start_server_admin_server(
listen_address,
log.new(slog::o!("component" => "ClickhouseCli")),
);
let context = ServerContext::new(clickward, clickhouse_cli);
let context = ServerContext::new(clickhouse_cli);
dropshot::ServerBuilder::new(
http_entrypoints::clickhouse_admin_server_api(),
Arc::new(context),
Expand All @@ -78,11 +77,10 @@ pub async fn start_server_admin_server(
/// Start the dropshot server for `clickhouse-admin-keeper` which
/// manages clickhouse keeper nodes.
pub async fn start_keeper_admin_server(
clickward: Clickward,
binary_path: Utf8PathBuf,
listen_address: SocketAddrV6,
server_config: Config,
) -> Result<HttpServer<Arc<ServerContext>>, StartError> {
) -> Result<HttpServer<Arc<KeeperServerContext>>, StartError> {
let (drain, registration) = slog_dtrace::with_drain(
server_config
.log
Expand All @@ -106,7 +104,7 @@ pub async fn start_keeper_admin_server(
listen_address,
log.new(slog::o!("component" => "ClickhouseCli")),
);
let context = ServerContext::new(clickward, clickhouse_cli);
let context = KeeperServerContext::new(clickhouse_cli);
dropshot::ServerBuilder::new(
http_entrypoints::clickhouse_admin_keeper_api(),
Arc::new(context),
Expand All @@ -123,7 +121,7 @@ pub async fn start_single_admin_server(
binary_path: Utf8PathBuf,
listen_address: SocketAddrV6,
server_config: Config,
) -> Result<HttpServer<Arc<SingleServerContext>>, StartError> {
) -> Result<HttpServer<Arc<ServerContext>>, StartError> {
let (drain, registration) = slog_dtrace::with_drain(
server_config
.log
Expand All @@ -147,7 +145,7 @@ pub async fn start_single_admin_server(
listen_address,
log.new(slog::o!("component" => "ClickhouseCli")),
);
let context = SingleServerContext::new(clickhouse_cli);
let context = ServerContext::new(clickhouse_cli);
dropshot::ServerBuilder::new(
http_entrypoints::clickhouse_admin_single_api(),
Arc::new(context),
Expand Down
34 changes: 33 additions & 1 deletion nexus/reconfigurator/execution/src/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub(crate) async fn deploy_nodes(
})
}));
}
for config in server_configs {
for config in &server_configs {
let admin_addr = SocketAddr::V6(SocketAddrV6::new(
config.settings.listen_addr,
CLICKHOUSE_ADMIN_PORT,
Expand Down Expand Up @@ -160,6 +160,38 @@ pub(crate) async fn deploy_nodes(
"Successfully deployed all clickhouse server and keeper configs"
);

// We only need to initialise the database schema into one of the ClickHouse replica
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this is only true for the initial cluster. If you add a new node, it will not get initialized automatically. Since the operation is idempotent I would go ahead and initialize all the servers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you add a new node, it will not get initialized automatically.

ugh, that's a bummer. yep! will change then

// servers as they are all part of the same cluster.
let Some(first_server_config) = server_configs.first() else {
let e = concat!(
"Failed to initialise database schema on the replicated ClickHouse cluster:",
" no replica server configuration file found");
error!(opctx.log, "{e}",);
return Err(vec![anyhow!(e)]);
};

let admin_addr = SocketAddr::V6(SocketAddrV6::new(
first_server_config.settings.listen_addr,
CLICKHOUSE_ADMIN_PORT,
0,
0,
));
let admin_url = format!("http://{admin_addr}");
let log = opctx.log.new(slog::o!("admin_url" => admin_url.clone()));
let client = ClickhouseSingleClient::new(&admin_url, log.clone());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be ClickhouseServerClient instead of ClickhouseSingleClient? Right now they happen to have compatible APIs for this one method (.init_db), but we probably don't want to depend on that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops! good catch. Thanks!

let _ = client.init_db().await.map(|_| ()).map_err(|e| {
let err = format!(
"Failed to initialize the replicated ClickHouse cluster database: {e}"
);
error!(opctx.log, "{err}");
return vec![anyhow!(err)];
});

info!(
opctx.log,
"Successfully initialised the replicated ClickHouse cluster database schema"
);

Ok(())
}

Expand Down
17 changes: 17 additions & 0 deletions openapi/clickhouse-admin-server.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,23 @@
}
}
},
"/init": {
"put": {
"summary": "Idempotently initialize a replicated ClickHouse cluster database.",
"operationId": "init_db",
"responses": {
"204": {
"description": "resource updated"
},
"4XX": {
"$ref": "#/components/responses/Error"
},
"5XX": {
"$ref": "#/components/responses/Error"
}
}
}
},
"/timeseries/{table}/{metric}/avg": {
"get": {
"summary": "Retrieve time series from the system database.",
Expand Down
Loading