Skip to content

Commit 778a7e9

Browse files
authored
Add ClickHouse native connection pool (#6889)
- Implement a connector and connection pool for talking to ClickHouse over the native TCP protocol - Add the native TCP address or a `BoxedResolver` for resolving it to all code that connects to ClickHouse. This is a pretty noisy change, since we now need two addresses / objects everywhere. We'll take it back down to one when we completely make the switch to the native protocol and remove the HTTP interface. - Remove the feature flag gating the native code, keeping that just for the prototype SQL client shell. - NFC formatting change to bring the `oximeter_db::native` import style in line with the rest of the crate.
1 parent da0ea50 commit 778a7e9

File tree

35 files changed

+724
-282
lines changed

35 files changed

+724
-282
lines changed

.github/buildomat/jobs/deploy.sh

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,11 @@ done
418418

419419
/usr/oxide/oxide --resolve "$OXIDE_RESOLVE" --cacert "$E2E_TLS_CERT" \
420420
project create --name images --description "some images"
421-
/usr/oxide/oxide --resolve "$OXIDE_RESOLVE" --cacert "$E2E_TLS_CERT" \
421+
# NOTE: Use a relatively large timeout on this call, to avoid #6771
422+
/usr/oxide/oxide \
423+
--resolve "$OXIDE_RESOLVE" \
424+
--cacert "$E2E_TLS_CERT" \
425+
--timeout 60 \
422426
disk import \
423427
--path debian-11-genericcloud-amd64.raw \
424428
--disk debian11-boot \

Cargo.lock

Lines changed: 35 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ clickhouse-admin-api = { path = "clickhouse-admin/api" }
319319
clickhouse-admin-keeper-client = { path = "clients/clickhouse-admin-keeper-client" }
320320
clickhouse-admin-server-client = { path = "clients/clickhouse-admin-server-client" }
321321
clickhouse-admin-types = { path = "clickhouse-admin/types" }
322-
clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "ceec762e6a87d2a22bf56792a3025e145caa095e" }
322+
clickward = { git = "https://github.com/oxidecomputer/clickward", rev = "a1b342c2558e835d09e6e39a40d3de798a29c2f" }
323323
cockroach-admin-api = { path = "cockroach-admin/api" }
324324
cockroach-admin-client = { path = "clients/cockroach-admin-client" }
325325
cockroach-admin-types = { path = "cockroach-admin/types" }
@@ -520,7 +520,7 @@ propolis_api_types = { git = "https://github.com/oxidecomputer/propolis", rev =
520520
propolis-client = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" }
521521
propolis-mock-server = { git = "https://github.com/oxidecomputer/propolis", rev = "11371b0f3743f8df5b047dc0edc2699f4bdf3927" }
522522
proptest = "1.5.0"
523-
qorb = "0.0.2"
523+
qorb = "0.1.1"
524524
quote = "1.0"
525525
rand = "0.8.5"
526526
rand_core = "0.6.4"

dev-tools/omdb/src/bin/omdb/oxql.rs

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,15 @@ pub struct OxqlArgs {
3131
)]
3232
clickhouse_url: Option<String>,
3333

34+
/// URL of the ClickHouse server to connect to for the native protcol.
35+
#[arg(
36+
long,
37+
env = "OMDB_CLICKHOUSE_NATIVE_URL",
38+
global = true,
39+
help_heading = CONNECTION_OPTIONS_HEADING,
40+
)]
41+
clickhouse_native_url: Option<String>,
42+
3443
/// Print summaries of each SQL query run against the database.
3544
#[clap(long = "summaries")]
3645
print_summaries: bool,
@@ -47,29 +56,62 @@ impl OxqlArgs {
4756
omdb: &Omdb,
4857
log: &Logger,
4958
) -> anyhow::Result<()> {
50-
let addr = self.addr(omdb, log).await?;
59+
let http_addr = self.resolve_http_addr(omdb, log).await?;
60+
let native_addr = self.resolve_native_addr(omdb, log).await?;
5161

5262
let opts = ShellOptions {
5363
print_summaries: self.print_summaries,
5464
print_elapsed: self.print_elapsed,
5565
};
5666

5767
oxql::shell(
58-
addr.ip(),
59-
addr.port(),
68+
http_addr.ip(),
69+
http_addr.port(),
70+
native_addr.port(),
6071
log.new(slog::o!("component" => "clickhouse-client")),
6172
opts,
6273
)
6374
.await
6475
}
6576

66-
/// Resolve the ClickHouse URL to a socket address.
67-
async fn addr(
77+
/// Resolve the ClickHouse native TCP socket address.
78+
async fn resolve_native_addr(
79+
&self,
80+
omdb: &Omdb,
81+
log: &Logger,
82+
) -> anyhow::Result<SocketAddr> {
83+
self.resolve_addr(
84+
omdb,
85+
log,
86+
self.clickhouse_native_url.as_deref(),
87+
ServiceName::ClickhouseNative,
88+
)
89+
.await
90+
}
91+
92+
/// Resolve the ClickHouse HTTP URL to a socket address.
93+
async fn resolve_http_addr(
94+
&self,
95+
omdb: &Omdb,
96+
log: &Logger,
97+
) -> anyhow::Result<SocketAddr> {
98+
self.resolve_addr(
99+
omdb,
100+
log,
101+
self.clickhouse_url.as_deref(),
102+
ServiceName::Clickhouse,
103+
)
104+
.await
105+
}
106+
107+
async fn resolve_addr(
68108
&self,
69109
omdb: &Omdb,
70110
log: &Logger,
111+
maybe_url: Option<&str>,
112+
srv: ServiceName,
71113
) -> anyhow::Result<SocketAddr> {
72-
match &self.clickhouse_url {
114+
match maybe_url {
73115
Some(cli_or_env_url) => Url::parse(&cli_or_env_url)
74116
.context(
75117
"failed parsing URL from command-line or environment variable",
@@ -87,7 +129,7 @@ impl OxqlArgs {
87129
Ok(SocketAddr::V6(
88130
omdb.dns_lookup_one(
89131
log.clone(),
90-
ServiceName::Clickhouse,
132+
srv,
91133
)
92134
.await
93135
.context("failed looking up ClickHouse internal DNS entry")?,

dev-tools/omdb/tests/usage_errors.out

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -783,13 +783,16 @@ Usage: omdb oxql [OPTIONS]
783783
Options:
784784
--log-level <LOG_LEVEL> log level filter [env: LOG_LEVEL=] [default: warn]
785785
--summaries Print summaries of each SQL query run against the database
786-
--elapsed Print the total elapsed query duration
787786
--color <COLOR> Color output [default: auto] [possible values: auto, always, never]
787+
--elapsed Print the total elapsed query duration
788788
-h, --help Print help
789789

790790
Connection Options:
791791
--clickhouse-url <CLICKHOUSE_URL>
792792
URL of the ClickHouse server to connect to [env: OMDB_CLICKHOUSE_URL=]
793+
--clickhouse-native-url <CLICKHOUSE_NATIVE_URL>
794+
URL of the ClickHouse server to connect to for the native protcol [env:
795+
OMDB_CLICKHOUSE_NATIVE_URL=]
793796
--dns-server <DNS_SERVER>
794797
[env: OMDB_DNS_SERVER=]
795798

@@ -808,7 +811,7 @@ error: unexpected argument '--summarizes' found
808811

809812
tip: a similar argument exists: '--summaries'
810813

811-
Usage: omdb oxql <--clickhouse-url <CLICKHOUSE_URL>|--summaries|--elapsed>
814+
Usage: omdb oxql <--clickhouse-url <CLICKHOUSE_URL>|--clickhouse-native-url <CLICKHOUSE_NATIVE_URL>|--summaries|--elapsed>
812815

813816
For more information, try '--help'.
814817
=============================================

nexus/src/app/oximeter.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use internal_dns_types::names::ServiceName;
1212
use nexus_db_queries::context::OpContext;
1313
use nexus_db_queries::db;
1414
use nexus_db_queries::db::DataStore;
15-
use omicron_common::address::CLICKHOUSE_HTTP_PORT;
15+
use omicron_common::address::CLICKHOUSE_TCP_PORT;
1616
use omicron_common::api::external::{DataPageParams, Error, ListResultVec};
1717
use omicron_common::api::internal::nexus::{self, ProducerEndpoint};
1818
use oximeter_client::Client as OximeterClient;
@@ -60,15 +60,26 @@ impl LazyTimeseriesClient {
6060
pub(crate) async fn get(
6161
&self,
6262
) -> Result<oximeter_db::Client, ResolveError> {
63-
let address = match &self.source {
64-
ClientSource::FromIp { address } => *address,
65-
ClientSource::FromDns { resolver } => SocketAddr::new(
66-
resolver.lookup_ip(ServiceName::Clickhouse).await?,
67-
CLICKHOUSE_HTTP_PORT,
68-
),
63+
let (http_address, native_address) = match &self.source {
64+
ClientSource::FromIp { address } => {
65+
let native_address =
66+
SocketAddr::new(address.ip(), CLICKHOUSE_TCP_PORT);
67+
(*address, native_address)
68+
}
69+
ClientSource::FromDns { resolver } => {
70+
let http_address = SocketAddr::from(
71+
resolver.lookup_socket_v6(ServiceName::Clickhouse).await?,
72+
);
73+
let native_address = SocketAddr::from(
74+
resolver
75+
.lookup_socket_v6(ServiceName::ClickhouseNative)
76+
.await?,
77+
);
78+
(http_address, native_address)
79+
}
6980
};
7081

71-
Ok(oximeter_db::Client::new(address, &self.log))
82+
Ok(oximeter_db::Client::new(http_address, native_address, &self.log))
7283
}
7384
}
7485

nexus/test-utils/src/lib.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,7 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> {
624624
log.new(o!("component" => "oximeter")),
625625
nexus_internal_addr,
626626
clickhouse.http_address().port(),
627+
clickhouse.native_address().port(),
627628
collector_id,
628629
)
629630
.await
@@ -1449,11 +1450,16 @@ pub async fn start_sled_agent(
14491450
pub async fn start_oximeter(
14501451
log: Logger,
14511452
nexus_address: SocketAddr,
1452-
db_port: u16,
1453+
http_port: u16,
1454+
native_port: u16,
14531455
id: Uuid,
14541456
) -> Result<Oximeter, String> {
14551457
let db = oximeter_collector::DbConfig {
1456-
address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db_port)),
1458+
address: Some(SocketAddr::new(Ipv6Addr::LOCALHOST.into(), http_port)),
1459+
native_address: Some(SocketAddr::new(
1460+
Ipv6Addr::LOCALHOST.into(),
1461+
native_port,
1462+
)),
14571463
batch_size: 10,
14581464
batch_interval: 1,
14591465
replicated: false,

nexus/tests/integration_tests/oximeter.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,12 @@ async fn test_oximeter_reregistration() {
118118

119119
// ClickHouse client for verifying collection.
120120
let ch_address = context.clickhouse.http_address().into();
121-
let client = oximeter_db::Client::new(ch_address, &context.logctx.log);
121+
let native_address = context.clickhouse.native_address().into();
122+
let client = oximeter_db::Client::new(
123+
ch_address,
124+
native_address,
125+
&context.logctx.log,
126+
);
122127
client
123128
.init_single_node_db()
124129
.await
@@ -302,6 +307,7 @@ async fn test_oximeter_reregistration() {
302307
context.logctx.log.new(o!("component" => "oximeter")),
303308
context.server.get_http_server_internal_address().await,
304309
context.clickhouse.http_address().port(),
310+
context.clickhouse.native_address().port(),
305311
oximeter_id,
306312
)
307313
.await

oximeter/collector/src/agent.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use chrono::Utc;
1616
use futures::TryStreamExt;
1717
use nexus_client::types::IdSortMode;
1818
use nexus_client::Client as NexusClient;
19+
use omicron_common::address::CLICKHOUSE_TCP_PORT;
1920
use omicron_common::backoff;
2021
use omicron_common::backoff::BackoffError;
2122
use oximeter::types::ProducerResults;
@@ -34,6 +35,7 @@ use slog::warn;
3435
use slog::Logger;
3536
use std::collections::btree_map::Entry;
3637
use std::collections::BTreeMap;
38+
use std::net::SocketAddr;
3739
use std::net::SocketAddrV6;
3840
use std::ops::Bound;
3941
use std::sync::Arc;
@@ -383,12 +385,15 @@ pub struct OximeterAgent {
383385

384386
impl OximeterAgent {
385387
/// Construct a new agent with the given ID and logger.
388+
// TODO(cleanup): Remove this lint when we have only a native resolver.
389+
#[allow(clippy::too_many_arguments)]
386390
pub async fn with_id(
387391
id: Uuid,
388392
address: SocketAddrV6,
389393
refresh_interval: Duration,
390394
db_config: DbConfig,
391-
resolver: BoxedResolver,
395+
http_resolver: BoxedResolver,
396+
native_resolver: BoxedResolver,
392397
log: &Logger,
393398
replicated: bool,
394399
) -> Result<Self, Error> {
@@ -414,7 +419,8 @@ impl OximeterAgent {
414419
// - The DB doesn't exist at all. This reports a version number of 0. We
415420
// need to create the DB here, at the latest version. This is used in
416421
// fresh installations and tests.
417-
let client = Client::new_with_pool(resolver, &log);
422+
let client =
423+
Client::new_with_pool(http_resolver, native_resolver, &log);
418424
match client.check_db_is_at_expected_version().await {
419425
Ok(_) => {}
420426
Err(oximeter_db::Error::DatabaseVersionMismatch {
@@ -506,12 +512,18 @@ impl OximeterAgent {
506512
// prints the results as they're received.
507513
let insertion_log = log.new(o!("component" => "results-sink"));
508514
if let Some(db_config) = db_config {
509-
let Some(address) = db_config.address else {
515+
let Some(http_address) = db_config.address else {
510516
return Err(Error::Standalone(anyhow!(
511517
"Must provide explicit IP address in standalone mode"
512518
)));
513519
};
514-
let client = Client::new(address, &log);
520+
521+
// Grab the native TCP address, or construct one from the defaults.
522+
let native_address =
523+
db_config.native_address.unwrap_or_else(|| {
524+
SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT)
525+
});
526+
let client = Client::new(http_address, native_address, &log);
515527
let replicated = client.is_oximeter_cluster().await?;
516528
if !replicated {
517529
client.init_single_node_db().await?;

0 commit comments

Comments
 (0)