Skip to content

Commit

Permalink
fix: bind to correct port for e2e tests (#25758)
Browse files Browse the repository at this point in the history
* fix: bind to correct port for e2e tests

This also fixes up some log messages on server start for naming

* chore: do notpass value in TEST_LOG env var to CI tests
  • Loading branch information
hiltontj authored Jan 7, 2025
1 parent 6524f38 commit 3efd490
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ jobs:
- rust_components
- run:
name: cargo nextest
command: TEST_LOG=1 RUST_LOG=info RUST_LOG_SPAN_EVENTS=full RUST_BACKTRACE=1 cargo nextest run --workspace --failure-output immediate-final --no-fail-fast
command: TEST_LOG= RUST_LOG=info RUST_LOG_SPAN_EVENTS=full RUST_BACKTRACE=1 cargo nextest run --workspace --failure-output immediate-final --no-fail-fast

# Build a dev binary.
#
Expand Down
7 changes: 4 additions & 3 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,14 @@ pub async fn command(config: Config) -> Result<()> {
let num_cpus = num_cpus::get();
let build_malloc_conf = build_malloc_conf();
info!(
host_id = %config.host_identifier_prefix,
git_hash = %INFLUXDB3_GIT_HASH as &str,
version = %INFLUXDB3_VERSION.as_ref() as &str,
uuid = %PROCESS_UUID.as_ref() as &str,
num_cpus,
%build_malloc_conf,
"InfluxDB3 OSS server starting",
"InfluxDB 3 Core server starting",
);
debug!(%build_malloc_conf, "build configuration");

let metrics = setup_metric_registry();

Expand Down Expand Up @@ -464,7 +465,7 @@ pub async fn command(config: Config) -> Result<()> {
.await
.map_err(Error::InitializePersistedCatalog)?,
);
info!(instance_id = ?catalog.instance_id(), "Catalog initialized with");
info!(instance_id = ?catalog.instance_id(), "catalog initialized");

let last_cache = LastCacheProvider::new_from_catalog_with_background_eviction(
Arc::clone(&catalog) as _,
Expand Down
69 changes: 43 additions & 26 deletions influxdb3/tests/server/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
net::{SocketAddr, SocketAddrV4, TcpListener},
io::{BufRead, BufReader},
process::{Child, Command, Stdio},
time::Duration,
};
Expand Down Expand Up @@ -105,7 +105,7 @@ impl ConfigProvider for TestConfig {
/// except for the `influxdb3_write` crate, which will emit logs at the `DEBUG` level.
pub struct TestServer {
auth_token: Option<String>,
bind_addr: SocketAddr,
bind_addr: String,
server_process: Child,
http_client: reqwest::Client,
}
Expand All @@ -125,22 +125,55 @@ impl TestServer {
}

async fn spawn_inner(config: &impl ConfigProvider) -> Self {
let bind_addr = get_local_bind_addr();
let mut command = Command::cargo_bin("influxdb3").expect("create the influxdb3 command");
let mut command = command
let command = command
.arg("serve")
.args(["--http-bind", &bind_addr.to_string()])
// bind to port 0 to get a random port assigned:
.args(["--http-bind", "0.0.0.0:0"])
.args(["--wal-flush-interval", "10ms"])
.args(config.as_args());
.args(config.as_args())
.stdout(Stdio::piped());

// If TEST_LOG env var is not defined, discard stdout/stderr, otherwise, pass it to the
// inner binary in the "LOG_FILTER" env var:
command = match std::env::var("TEST_LOG") {
Ok(val) => command.env("LOG_FILTER", if val.is_empty() { "info" } else { &val }),
Err(_) => command.stdout(Stdio::null()).stderr(Stdio::null()),
let emit_logs = if let Ok(val) = std::env::var("TEST_LOG") {
command.env("LOG_FILTER", if val.is_empty() { "info" } else { &val });
true
} else {
false
};

let server_process = command.spawn().expect("spawn the influxdb3 server process");
let mut server_process = command.spawn().expect("spawn the influxdb3 server process");

// pipe stdout so we can get the randomly assigned port from the log output:
let process_stdout = server_process
.stdout
.take()
.expect("should acquire stdout from process");

let mut lines = BufReader::new(process_stdout).lines();
let bind_addr = loop {
let Some(Ok(line)) = lines.next() else {
panic!("stdout closed unexpectedly");
};
if emit_logs {
println!("{line}");
}
if line.contains("startup time") {
if let Some(address) = line.split("address=").last() {
break address.to_string();
}
}
};

tokio::task::spawn_blocking(move || {
for line in lines {
let line = line.expect("io error while getting line from stdout");
if emit_logs {
println!("{line}");
}
}
});

let server = Self {
auth_token: config.auth_token().map(|s| s.to_owned()),
Expand Down Expand Up @@ -339,22 +372,6 @@ impl TestServer {
}
}

/// Get an available bind address on localhost
///
/// This binds a [`TcpListener`] to 127.0.0.1:0, which will randomly
/// select an available port, and produces the resulting local address.
/// The [`TcpListener`] is dropped at the end of the function, thus
/// freeing the port for use by the caller.
fn get_local_bind_addr() -> SocketAddr {
let ip = std::net::Ipv4Addr::new(127, 0, 0, 1);
let port = 0;
let addr = SocketAddrV4::new(ip, port);
TcpListener::bind(addr)
.expect("bind to a socket address")
.local_addr()
.expect("get local address")
}

/// Write to the server with the line protocol
pub async fn write_lp_to_db(
server: &TestServer,
Expand Down
6 changes: 5 additions & 1 deletion influxdb3_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,11 @@ where
let addr = AddrIncoming::from_listener(server.listener)?;
let timer_end = Instant::now();
let startup_time = timer_end.duration_since(startup_timer);
info!("Server Startup Time: {}ms", startup_time.as_millis());
info!(
address = %addr.local_addr(),
"startup time: {}ms",
startup_time.as_millis()
);
hyper::server::Builder::new(addr, Http::new())
.tcp_nodelay(true)
.serve(hybrid_make_service)
Expand Down

0 comments on commit 3efd490

Please sign in to comment.