Skip to content

Commit cc0cb12

Browse files
authored
Store metric endpoint information in CockroachDB (#127)
* Stores metric producer/collector information CockroachDB - Adds tables to the omicron database to store oximeter collectors, metric producers, and the assignment of producers to collectors - Adds Rust types and conversion methods for reading/writing records from these new tables - Updates the existing Nexus endpoint for registering an oximeter collector instance into the appropriate table - Updates the existing Nexus endpoint for registering a metric producer to: insert the producer information into the appropriate table; select an oximeter instance (currently from memory) for assignment; insert the assignment into the table; complete the assignment as before. * Adds a few integration tests for Oximeter - Adds an Oximeter collector server and producer server to the Nexus common integration test context. - Verifies that records for Oximeter, the producer, and the assignment between them are accurately stored in the control plane database. * Adds better integration tests and support for automatic port assignment During integration tests, we want to run multiple copies of various servers side-by-side. This means we can't hard-code ports. This commit adds support for automatically assigning ports to ClickHouse and the `oximeter::ProducerServer` type. For the latter, the `ProducerEndpoint` type that's used to set up the `ProducerServer` can optionally have a socket address with port 0. In this case, that value is _overwritten_ with the Dropshot HTTP server's local port, once it's been bound and listening. A log message is printed in this case. For the ClickHouse server, this requires some annoying workarounds. ClickHouse supports listening on port 0, but it doesn't give us a way to discover the actual port the OS assigned it. At this point, we have to ask the OS. That's currently done with the `lsof` on most platforms, and `pfiles` on illumos, but in both cases it implies some shitty parsing of the text output. This is all done (and tested) in `omicron_common::dev::clickhouse`. Note that all tests of the parsing are run on all platforms -- the expected output is included in the tests, not collected at runtime from the system. This commit also moves some types around in `oximeter`. The `collect` module now has _just_ the `Collector` type, and the producer server and its associated types are in `oximeter::producer_server`. The `ProducerServer` itself is re-exported at the crate root. * Addressing PR comments
1 parent 7f97b94 commit cc0cb12

19 files changed

+990
-209
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

omicron-common/src/bin/omicron-dev.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -251,10 +251,19 @@ async fn cmd_clickhouse_run(args: &ChRunArgs) -> Result<(), anyhow::Error> {
251251
let mut db_instance =
252252
dev::clickhouse::ClickHouseInstance::new(args.port).await?;
253253
println!(
254-
"omicron-dev: running ClickHouse (PID: {}), full command is \"clickhouse {}\"",
255-
db_instance.pid().expect("Failed to get process PID, it may not have started"),
254+
"omicron-dev: running ClickHouse with full command:\n\"clickhouse {}\"",
256255
db_instance.cmdline().join(" ")
257256
);
257+
println!(
258+
"omicron-dev: ClickHouse is running with PID {}",
259+
db_instance
260+
.pid()
261+
.expect("Failed to get process PID, it may not have started")
262+
);
263+
println!(
264+
"omicron-dev: ClickHouse HTTP server listening on port {}",
265+
db_instance.port()
266+
);
258267
println!(
259268
"omicron-dev: using {} for ClickHouse data storage",
260269
db_instance.data_path().display()

omicron-common/src/dev/clickhouse.rs

+301-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::path::{Path, PathBuf};
44
use std::process::Stdio;
55
use std::time::Duration;
66

7-
use anyhow::Context;
7+
use anyhow::{bail, ensure, Context};
88
use tempfile::TempDir;
99

1010
use crate::dev::poll;
@@ -77,10 +77,26 @@ impl ClickHouseInstance {
7777
}
7878
},
7979
&Duration::from_millis(500),
80-
&Duration::from_secs(30),
80+
&Duration::from_secs(10),
8181
)
8282
.await?;
8383

84+
// Discover the HTTP port on which we're listening, if it's not provided explicitly by the
85+
// caller.
86+
//
87+
// Note: This is annoying. For tests, or any situation in which we'd run multiple servers,
88+
// we'd like to let the OS choose a port for us, by listening on port 0. ClickHouse
89+
// supports this, but doesn't do anything to discover the port on which it's actually
90+
// listening (i.e., the log file just says "listening on port 0"). In contrast, CockroachDB
91+
// dumps the full URL on which it's listening to a file for users to discover.
92+
//
93+
// This is a workaround which shells out to `lsof` or `pfiles` to discover the port.
94+
let port = if port != 0 {
95+
port
96+
} else {
97+
discover_local_listening_port(child.id().unwrap()).await?
98+
};
99+
84100
Ok(Self {
85101
data_dir: Some(data_dir),
86102
data_path,
@@ -139,7 +155,7 @@ impl Drop for ClickHouseInstance {
139155
fn drop(&mut self) {
140156
if self.child.is_some() || self.data_dir.is_some() {
141157
eprintln!(
142-
"WARN: dropped CockroachInstance without cleaning it up first \
158+
"WARN: dropped ClickHouseInstance without cleaning it up first \
143159
(there may still be a child process running and a \
144160
temporary directory leaked)"
145161
);
@@ -152,3 +168,285 @@ impl Drop for ClickHouseInstance {
152168
}
153169
}
154170
}
171+
172+
// Parse the output of the command used to find the HTTP port ClickHouse listens on, in the event
173+
// we start it with a port of 0.
174+
#[cfg(not(target_os = "illumos"))]
175+
async fn discover_local_listening_port(pid: u32) -> Result<u16, anyhow::Error> {
176+
// `lsof` doesn't do the right thing with the PID. It seems to list _all_ files for the
177+
// process, on both macOS and Linux, rather than the intersection of the PID we care about and
178+
// the other filters. So we ignore it.
179+
let output = tokio::process::Command::new("lsof")
180+
.arg("-i")
181+
.arg("6TCP@localhost") // TCP over IPv6, on the local address
182+
.arg("-F")
183+
.arg("n") // Only print the file name, \n-terminated
184+
.output() // Spawn and collect the output
185+
.await
186+
.context("Could not determine ClickHouse port number: Failed to spawn process.")?;
187+
ensure!(
188+
output.status.success(),
189+
"Could not determine ClickHouse port number: Process failed"
190+
);
191+
extract_port_from_lsof_output(pid, &output.stdout)
192+
}
193+
194+
// Parse the output of the command used to find the HTTP port ClickHouse listens on, in the event
195+
// we start it with a port of 0.
196+
#[cfg(target_os = "illumos")]
197+
async fn discover_local_listening_port(pid: u32) -> Result<u16, anyhow::Error> {
198+
let output = tokio::process::Command::new("pfiles")
199+
.arg(format!("{}", pid))
200+
.output() // Spawn and collect the output
201+
.await
202+
.context("Could not determine ClickHouse port number: Failed to spawn pfiles process.")?;
203+
ensure!(
204+
output.status.success(),
205+
"Could not determine ClickHouse port number: Pfiles process failed"
206+
);
207+
extract_port_from_pfiles_output(&output.stdout)
208+
}
209+
210+
// Ports that ClickHouse opens, but we'd like to ignore when discovering.
211+
const IGNORED_CLICKHOUSE_PORTS: &[u16] = &[9000, 9004];
212+
213+
// Extract the port from `pfiles` output.
214+
//
215+
// This output is much simpler, since it already restricts things to the PID we're interested int.
216+
// Just look for AF_INET lines and pull out the port.
217+
#[cfg_attr(target_os = "illumos", allow(dead_code))]
218+
fn extract_port_from_pfiles_output(
219+
output: &[u8],
220+
) -> Result<u16, anyhow::Error> {
221+
let text = std::str::from_utf8(output)
222+
.context("Could not determine ClickHouse port number: Non-UTF8 output from command")?;
223+
for port_str in text.lines().filter_map(|line| {
224+
if line.trim().starts_with("sockname: AF_INET") {
225+
line.split_whitespace().last()
226+
} else {
227+
None
228+
}
229+
}) {
230+
let port = port_str
231+
.parse()
232+
.context("Could not determine ClickHouse port number: Invalid port found in output")?;
233+
if !IGNORED_CLICKHOUSE_PORTS.contains(&port) {
234+
return Ok(port);
235+
}
236+
}
237+
bail!("Could not determine ClickHouse port number: No valid ports found in output");
238+
}
239+
240+
// Parse the output from the `lsof` command on non-illumos systems
241+
//
242+
// The exact command run is: `lsof -i 6TCP@localhost -F n`.
243+
//
244+
// The output has groups of files like this:
245+
// p<PID>
246+
// f<FD>
247+
// n<NAME>
248+
// f<FD>
249+
// n<NAME>
250+
// ...
251+
// p<PID>
252+
// ...
253+
//
254+
// Parsing proceeds by:
255+
// - Splitting into lines
256+
// - Ignoring output until a PID line `p<PID>` is found, with the expected PID
257+
// - Ignores `n<FD>` lines
258+
// - Parses lines that look like `flocalhost:<NUMERIC_PORT>`
259+
// - Returns the first match, that's _not_ one of the other ports ClickHouse opens.
260+
//
261+
// If any of these conditions fails, an error is returned.
262+
#[cfg_attr(not(target_os = "illumos"), allow(dead_code))]
263+
fn extract_port_from_lsof_output(
264+
expected_pid: u32,
265+
output: &[u8],
266+
) -> Result<u16, anyhow::Error> {
267+
ensure!(
268+
!output.is_empty(),
269+
"Could not determine ClickHouse port number: Process output empty"
270+
);
271+
272+
// Break into newline-terminated chunks.
273+
let mut chunks = output.split(|&x| x == b'\n');
274+
275+
// Small helpers to parse chunks.
276+
let is_process_start = |chunk: &[u8]| matches!(chunk.first(), Some(b'p'));
277+
let is_file_descriptor = |chunk: &[u8]| matches!(chunk.first(), Some(b'f'));
278+
let is_file_name = |chunk: &[u8]| matches!(chunk.first(), Some(b'n'));
279+
280+
while let Some(chunk) = chunks.next() {
281+
if is_process_start(&chunk) {
282+
// Start of a process group.
283+
//
284+
// Parse the PID, check if it matches our expected PID.
285+
let pid: u32 = match String::from_utf8(chunk[1..].to_vec())
286+
.context("Could not determine ClickHouse port number: Non-UTF8 output")?
287+
.parse() {
288+
Ok(pid) => pid,
289+
_ => continue,
290+
};
291+
if pid == expected_pid {
292+
// PID matches
293+
//
294+
// The first chunk should be the numeric file descriptor
295+
if let Some(should_be_fd) = chunks.next() {
296+
ensure!(
297+
is_file_descriptor(should_be_fd),
298+
"Could not determine ClickHouse port number: Expected numeric file descriptor in output"
299+
);
300+
} else {
301+
bail!("Could not determine ClickHouse port number: Expected numeric file descriptor in output");
302+
}
303+
304+
// Process chunks until we find one that has a valid port, or we get one that's
305+
// _not_ a filename.
306+
while let Some(chunk) = chunks.next() {
307+
ensure!(
308+
is_file_name(chunk),
309+
"Could not determine ClickHouse port number: Expected file name in output"
310+
);
311+
312+
// Ignore leading `n`, which is part of the formatting from lsof
313+
let chunk = &chunk[1..];
314+
315+
// Check if this looks like `localhost:<PORT>`
316+
const LOCALHOST: &[u8] = b"localhost:";
317+
if chunk.starts_with(LOCALHOST) {
318+
let port: u16 = std::str::from_utf8(&chunk[LOCALHOST.len()..])
319+
.context("Could not determine ClickHouse port number: Invalid PID in output")?
320+
.parse()
321+
.context("Could not determine ClickHouse port number: Invalid PID in output")?;
322+
323+
// Check that it's not one of the default other TCP ports ClickHouse opens
324+
// by default
325+
if !IGNORED_CLICKHOUSE_PORTS.contains(&port) {
326+
return Ok(port);
327+
}
328+
}
329+
}
330+
331+
// Early exit, the PID matched, but we couldn't find a valid port
332+
break;
333+
}
334+
}
335+
}
336+
bail!("Could not determine ClickHouse port number: No valid ports found in output");
337+
}
338+
339+
#[cfg(test)]
340+
mod pfiles_tests {
341+
use super::extract_port_from_pfiles_output;
342+
343+
// A known-good test output.
344+
const GOOD_INPUT: &[u8] = br#"
345+
25: S_IFSOCK mode:0666 dev:547,0 ino:24056 uid:0 gid:0 rdev:0,0
346+
O_RDWR FD_CLOEXEC
347+
SOCK_STREAM
348+
SO_REUSEADDR,SO_SNDBUF(49152),SO_RCVBUF(128000)
349+
sockname: AF_INET6 ::1 port: 9004
350+
26: S_IFSOCK mode:0666 dev:547,0 ino:24056 uid:0 gid:0 rdev:0,0
351+
O_RDWR FD_CLOEXEC
352+
SOCK_STREAM
353+
SO_REUSEADDR,SO_SNDBUF(49152),SO_RCVBUF(128000)
354+
sockname: AF_INET 127.0.0.1 port: 8123
355+
27: S_IFSOCK mode:0666 dev:547,0 ino:42019 uid:0 gid:0 rdev:0,0
356+
O_RDWR FD_CLOEXEC
357+
SOCK_STREAM
358+
SO_REUSEADDR,SO_SNDBUF(49152),SO_RCVBUF(128000)
359+
sockname: AF_INET 127.0.0.1 port: 9000
360+
"#;
361+
362+
// Only contains the ignored ClickHouse ports
363+
const ONLY_IGNORED_CLICKHOUSE_PORTS: &[u8] = br#"
364+
25: S_IFSOCK mode:0666 dev:547,0 ino:24056 uid:0 gid:0 rdev:0,0
365+
O_RDWR FD_CLOEXEC
366+
SOCK_STREAM
367+
SO_REUSEADDR,SO_SNDBUF(49152),SO_RCVBUF(128000)
368+
sockname: AF_INET6 ::1 port: 9004
369+
"#;
370+
371+
#[test]
372+
fn test_extract_port_from_pfiles_output() {
373+
assert_eq!(extract_port_from_pfiles_output(&GOOD_INPUT).unwrap(), 8123);
374+
}
375+
376+
#[test]
377+
fn test_extract_port_from_lsof_output_no_valid_port() {
378+
assert!(extract_port_from_pfiles_output(
379+
&ONLY_IGNORED_CLICKHOUSE_PORTS
380+
)
381+
.is_err());
382+
}
383+
}
384+
385+
#[cfg(test)]
386+
mod lsof_tests {
387+
use super::extract_port_from_lsof_output;
388+
389+
// A known-good test output. This was generated by running the actual command while a
390+
// ClickHouse process is running.
391+
const GOOD_INPUT: &[u8] = b"p462\n\
392+
f4\n\
393+
nlocalhost:19536\n\
394+
p7741\n\
395+
f8\n\
396+
nlocalhost:53091\n\
397+
f9\n\
398+
nlocalhost:cslistener\n\
399+
f12\n\
400+
nlocalhost:9004\n";
401+
402+
// This command has some valid `localhost:PORT` lines, but those ports are known to be other
403+
// ports that ClickHouse opens that aren't HTTP. These are the native client and the mySQL
404+
// client ports.
405+
const ONLY_IGNORED_CLICKHOUSE_PORTS: &[u8] = b"p462\n\
406+
f4\n\
407+
nlocalhost:19536\n\
408+
p7741\n\
409+
f8\n\
410+
nlocalhost:9000\n\
411+
f9\n\
412+
nlocalhost:cslistener\n\
413+
f12\n\
414+
nlocalhost:9004\n";
415+
416+
// A bad output that has no lines like `flocalhost:<PORT>\n` at all
417+
const NO_FILE_NAMES: &[u8] = b"p462\n\
418+
f4\n\
419+
nlocalhost:19536\n\
420+
p7741\n\
421+
f8\n\
422+
f9\n\
423+
f12\n";
424+
425+
#[test]
426+
fn test_extract_port_from_lsof_output() {
427+
assert_eq!(
428+
extract_port_from_lsof_output(7741, &GOOD_INPUT).unwrap(),
429+
53091
430+
);
431+
}
432+
433+
#[test]
434+
fn test_extract_port_from_lsof_output_no_valid_port() {
435+
assert!(extract_port_from_lsof_output(
436+
7741,
437+
&ONLY_IGNORED_CLICKHOUSE_PORTS
438+
)
439+
.is_err());
440+
}
441+
442+
// A test that uses the good input, but assumes we're looking for another PID.
443+
#[test]
444+
fn test_extract_port_from_lsof_output_incorrect_pid() {
445+
assert!(extract_port_from_lsof_output(0, &GOOD_INPUT).is_err());
446+
}
447+
448+
#[test]
449+
fn test_extract_port_from_lsof_output_no_file_names() {
450+
assert!(extract_port_from_lsof_output(0, &NO_FILE_NAMES).is_err());
451+
}
452+
}

omicron-common/src/model.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -1417,14 +1417,21 @@ impl ProducerEndpoint {
14171417

14181418
/// Message used to notify Nexus that this oximeter instance is up and running.
14191419
#[derive(Debug, Clone, Copy, JsonSchema, Serialize, Deserialize)]
1420-
pub struct OximeterStartupInfo {
1420+
pub struct OximeterInfo {
14211421
/// The ID for this oximeter instance.
14221422
pub collector_id: Uuid,
14231423

14241424
/// The address on which this oximeter instance listens for requests
14251425
pub address: SocketAddr,
14261426
}
14271427

1428+
/// An assignment of an Oximeter instance to a metric producer for collection.
1429+
#[derive(Debug, Clone, Copy, JsonSchema, Serialize, Deserialize)]
1430+
pub struct OximeterAssignment {
1431+
pub oximeter_id: Uuid,
1432+
pub producer_id: Uuid,
1433+
}
1434+
14281435
#[cfg(test)]
14291436
mod test {
14301437
use super::ApiByteCount;

0 commit comments

Comments
 (0)