Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bind to correct port for e2e tests #25758

Merged
merged 2 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ jobs:
- rust_components
- run:
name: cargo nextest
command: TEST_LOG=1 RUST_LOG=info RUST_LOG_SPAN_EVENTS=full RUST_BACKTRACE=1 cargo nextest run --workspace --failure-output immediate-final --no-fail-fast
command: TEST_LOG= RUST_LOG=info RUST_LOG_SPAN_EVENTS=full RUST_BACKTRACE=1 cargo nextest run --workspace --failure-output immediate-final --no-fail-fast

# Build a dev binary.
#
Expand Down
7 changes: 4 additions & 3 deletions influxdb3/src/commands/serve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,14 @@ pub async fn command(config: Config) -> Result<()> {
let num_cpus = num_cpus::get();
let build_malloc_conf = build_malloc_conf();
info!(
host_id = %config.host_identifier_prefix,
git_hash = %INFLUXDB3_GIT_HASH as &str,
version = %INFLUXDB3_VERSION.as_ref() as &str,
uuid = %PROCESS_UUID.as_ref() as &str,
num_cpus,
%build_malloc_conf,
"InfluxDB3 OSS server starting",
"InfluxDB 3 Core server starting",
);
debug!(%build_malloc_conf, "build configuration");

let metrics = setup_metric_registry();

Expand Down Expand Up @@ -464,7 +465,7 @@ pub async fn command(config: Config) -> Result<()> {
.await
.map_err(Error::InitializePersistedCatalog)?,
);
info!(instance_id = ?catalog.instance_id(), "Catalog initialized with");
info!(instance_id = ?catalog.instance_id(), "catalog initialized");

let last_cache = LastCacheProvider::new_from_catalog_with_background_eviction(
Arc::clone(&catalog) as _,
Expand Down
69 changes: 43 additions & 26 deletions influxdb3/tests/server/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
net::{SocketAddr, SocketAddrV4, TcpListener},
io::{BufRead, BufReader},
process::{Child, Command, Stdio},
time::Duration,
};
Expand Down Expand Up @@ -105,7 +105,7 @@ impl ConfigProvider for TestConfig {
/// except for the `influxdb3_write` crate, which will emit logs at the `DEBUG` level.
pub struct TestServer {
auth_token: Option<String>,
bind_addr: SocketAddr,
bind_addr: String,
server_process: Child,
http_client: reqwest::Client,
}
Expand All @@ -125,22 +125,55 @@ impl TestServer {
}

async fn spawn_inner(config: &impl ConfigProvider) -> Self {
let bind_addr = get_local_bind_addr();
let mut command = Command::cargo_bin("influxdb3").expect("create the influxdb3 command");
let mut command = command
let command = command
.arg("serve")
.args(["--http-bind", &bind_addr.to_string()])
// bind to port 0 to get a random port assigned:
.args(["--http-bind", "0.0.0.0:0"])
.args(["--wal-flush-interval", "10ms"])
.args(config.as_args());
.args(config.as_args())
.stdout(Stdio::piped());

// If TEST_LOG env var is not defined, discard stdout/stderr, otherwise, pass it to the
// inner binary in the "LOG_FILTER" env var:
command = match std::env::var("TEST_LOG") {
Ok(val) => command.env("LOG_FILTER", if val.is_empty() { "info" } else { &val }),
Err(_) => command.stdout(Stdio::null()).stderr(Stdio::null()),
let emit_logs = if let Ok(val) = std::env::var("TEST_LOG") {
command.env("LOG_FILTER", if val.is_empty() { "info" } else { &val });
true
} else {
false
};

let server_process = command.spawn().expect("spawn the influxdb3 server process");
let mut server_process = command.spawn().expect("spawn the influxdb3 server process");

// pipe stdout so we can get the randomly assigned port from the log output:
let process_stdout = server_process
.stdout
.take()
.expect("should acquire stdout from process");

let mut lines = BufReader::new(process_stdout).lines();
let bind_addr = loop {
let Some(Ok(line)) = lines.next() else {
panic!("stdout closed unexpectedly");
};
if emit_logs {
println!("{line}");
}
if line.contains("startup time") {
if let Some(address) = line.split("address=").last() {
break address.to_string();
}
}
};

tokio::task::spawn_blocking(move || {
for line in lines {
let line = line.expect("io error while getting line from stdout");
if emit_logs {
println!("{line}");
}
}
});

let server = Self {
auth_token: config.auth_token().map(|s| s.to_owned()),
Expand Down Expand Up @@ -339,22 +372,6 @@ impl TestServer {
}
}

/// Get an available bind address on localhost
///
/// This binds a [`TcpListener`] to 127.0.0.1:0, which will randomly
/// select an available port, and produces the resulting local address.
/// The [`TcpListener`] is dropped at the end of the function, thus
/// freeing the port for use by the caller.
fn get_local_bind_addr() -> SocketAddr {
let ip = std::net::Ipv4Addr::new(127, 0, 0, 1);
let port = 0;
let addr = SocketAddrV4::new(ip, port);
TcpListener::bind(addr)
.expect("bind to a socket address")
.local_addr()
.expect("get local address")
}

/// Write to the server with the line protocol
pub async fn write_lp_to_db(
server: &TestServer,
Expand Down
6 changes: 5 additions & 1 deletion influxdb3_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,11 @@ where
let addr = AddrIncoming::from_listener(server.listener)?;
let timer_end = Instant::now();
let startup_time = timer_end.duration_since(startup_timer);
info!("Server Startup Time: {}ms", startup_time.as_millis());
info!(
address = %addr.local_addr(),
"startup time: {}ms",
startup_time.as_millis()
);
hyper::server::Builder::new(addr, Http::new())
.tcp_nodelay(true)
.serve(hybrid_make_service)
Expand Down
Loading