Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fast import: basic python test #10271

Open
wants to merge 8 commits into
base: 22100-change-fastimport-db-name
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion compute_tools/src/bin/fast_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ struct Args {
pg_bin_dir: Utf8PathBuf,
#[clap(long)]
pg_lib_dir: Utf8PathBuf,
#[clap(long)]
pg_port: Option<u16>, // port to run postgres on, 5432 is default
}

#[serde_with::serde_as]
Expand Down Expand Up @@ -91,6 +93,12 @@ pub(crate) async fn main() -> anyhow::Result<()> {
let working_directory = args.working_directory;
let pg_bin_dir = args.pg_bin_dir;
let pg_lib_dir = args.pg_lib_dir;
// Port to run the temporary postgres on; fall back to PostgreSQL's
// standard port when --pg-port was not given on the command line.
// unwrap_or_else keeps the logging lazy and avoids the
// is_some()/unwrap() anti-pattern.
let pg_port = args.pg_port.unwrap_or_else(|| {
    info!("pg_port not specified, using default 5432");
    5432
});

// Initialize AWS clients only if s3_prefix is specified
let (aws_config, kms_client) = if args.s3_prefix.is_some() {
Expand Down Expand Up @@ -191,6 +199,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
let mut postgres_proc = tokio::process::Command::new(pgbin)
.arg("-D")
.arg(&pgdata_dir)
.args(["-p", &format!("{pg_port}")])
.args(["-c", "wal_level=minimal"])
.args(["-c", "shared_buffers=10GB"])
.args(["-c", "max_wal_senders=0"])
Expand Down Expand Up @@ -226,7 +235,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {

// Create neondb database in the running postgres
let restore_pg_connstring =
format!("host=localhost port=5432 user={superuser} dbname=postgres");
format!("host=localhost port={pg_port} user={superuser} dbname=postgres");
loop {
match tokio_postgres::connect(&restore_pg_connstring, tokio_postgres::NoTls).await {
Ok((client, connection)) => {
Expand Down
1 change: 1 addition & 0 deletions test_runner/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@
"fixtures.compare_fixtures",
"fixtures.slow",
"fixtures.reruns",
"fixtures.fast_import",
)
99 changes: 99 additions & 0 deletions test_runner/fixtures/fast_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import shutil
import subprocess
import tempfile
from collections.abc import Iterator
from pathlib import Path

import pytest

from fixtures.log_helper import log
from fixtures.neon_cli import AbstractNeonCli
from fixtures.pg_version import PgVersion


class FastImport(AbstractNeonCli):
    """Test wrapper around the `fast_import` binary.

    Locates the binary and the matching PostgreSQL distribution, then runs a
    single import into a scratch working directory. Designed for use as a
    context manager so the working directory is removed afterwards.
    """

    COMMAND = "fast_import"
    # Result of the (single) invocation; stays None until run() is called.
    cmd: subprocess.CompletedProcess[str] | None = None

    def __init__(
        self,
        extra_env: dict[str, str] | None,
        binpath: Path,
        pg_distrib_dir: Path,
        pg_version: PgVersion,
        workdir: Path
    ):
        # Copy the caller's environment overrides so we never mutate them.
        env_vars = dict(extra_env) if extra_env is not None else {}

        if not (binpath / self.COMMAND).exists():
            raise Exception(f"{self.COMMAND} binary not found at '{binpath}'")
        super().__init__(env_vars, binpath)

        # Resolve the version-specific postgres installation we will run against.
        pg_dir = pg_distrib_dir / pg_version.v_prefixed
        self.pg_distrib_dir = pg_distrib_dir
        self.pg_version = pg_version
        self.pg_bin = pg_dir / "bin"
        if not (self.pg_bin / "postgres").exists():
            raise Exception(f"postgres binary was not found at '{self.pg_bin}'")
        self.pg_lib = pg_dir / "lib"

        # The scratch directory must already exist; the caller owns its creation.
        if not workdir.exists():
            raise Exception(f"Working directory '{workdir}' does not exist")
        self.workdir = workdir

    def run(self,
            pg_port: int,
            source_connection_string: str | None = None,
            s3prefix: str | None = None,
            interactive: bool = False,
            ) -> subprocess.CompletedProcess[str]:
        """Invoke fast_import once and remember the result; a second call is an error."""
        if self.cmd is not None:
            raise Exception("Command already executed")
        args = [
            f"--pg-bin-dir={self.pg_bin}",
            f"--pg-lib-dir={self.pg_lib}",
            f"--pg-port={pg_port}",
            f"--working-directory={self.workdir}",
        ]
        # Optional flags are appended only when the caller requested them.
        if source_connection_string is not None:
            args.append(f"--source-connection-string={source_connection_string}")
        if s3prefix is not None:
            args.append(f"--s3-prefix={s3prefix}")
        if interactive:
            args.append("--interactive")

        self.cmd = self.raw_cli(args)
        return self.cmd

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # Remove the scratch working directory (pgdata etc.) on exit.
        if self.workdir.exists():
            shutil.rmtree(self.workdir)


@pytest.fixture(scope="function")
def fast_import(
    pg_version: PgVersion,
    test_output_dir: Path,
    neon_binpath: Path,
    pg_distrib_dir: Path,
) -> Iterator[FastImport]:
    """Per-test FastImport wrapper backed by a throwaway working directory."""
    workdir = Path(tempfile.mkdtemp())
    with FastImport(None, neon_binpath, pg_distrib_dir, pg_version, workdir) as fi:
        yield fi

        # Nothing to persist if the test never invoked the binary.
        if fi.cmd is None:
            return

        # dump stdout & stderr into test log dir
        (test_output_dir / "fast_import.stdout").write_text(fi.cmd.stdout)
        (test_output_dir / "fast_import.stderr").write_text(fi.cmd.stderr)

        log.info('Written logs to %s', test_output_dir)
39 changes: 39 additions & 0 deletions test_runner/regress/test_fast_import.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from __future__ import annotations

from fixtures.fast_import import FastImport
from fixtures.log_helper import log
from fixtures.neon_fixtures import VanillaPostgres, PgProtocol, PgBin
from fixtures.port_distributor import PortDistributor


Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps makes sense to make this an extension to test_pgdata_import_smoke, which has this comment currently:

    # We have a Postgres data directory now.
    # Make a localfs remote storage that looks like how after `fast_import` ran.
    # TODO: actually exercise fast_import here

Copy link
Contributor Author

@NanoBjorn NanoBjorn Jan 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just thought that it might not be worth doing it all in the same test due to increased complexity -> more complicated debugging if something is failing. Right now it is using vanilla postgres pgdata as we do in fast import, so it should be enough for testing the pageserver part.

Moved my simple test to the same file, but will keep test_pgdata_import_smoke the same. Also added a TODO to test the full import flow separately, which will complement test_pgdata_import_smoke and test_fast_import_binary without complicating either of them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT?

def test_fast_import(
    test_output_dir,
    vanilla_pg: VanillaPostgres,
    port_distributor: PortDistributor,
    fast_import: FastImport,
):
    """End-to-end smoke test: import a vanilla cluster and read the data back."""
    # Seed the source database with a small, known dataset.
    vanilla_pg.start()
    vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")

    # Run the import against the live source, then shut the source down.
    pg_port = port_distributor.get_port()
    fast_import.run(pg_port, vanilla_pg.connstr())
    vanilla_pg.stop()

    # Start a plain postgres on the imported datadir to inspect the result.
    pg_bin = PgBin(test_output_dir, fast_import.pg_distrib_dir, fast_import.pg_version)
    imported_pg = VanillaPostgres(fast_import.workdir / "pgdata", pg_bin, pg_port, False)
    imported_pg.start()

    # database name and user are hardcoded in fast_import binary, and they are different from normal vanilla postgres
    client = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb")
    rows = client.safe_psql("SELECT count(*) FROM foo;")
    log.info(f"Result: {rows}")
    assert rows[0][0] == 10
    imported_pg.stop()


# TODO: Maybe test with pageserver?
# 1. run whole neon env
# 2. create timeline with some s3 path???
# 3. run fast_import with s3 prefix
# 4. ??? mock http where pageserver will report progress
# 5. run compute on this timeline and check if data is there
Loading