-
Notifications
You must be signed in to change notification settings - Fork 466
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fast import: basic python test #10271
Open
NanoBjorn
wants to merge
8
commits into
22100-change-fastimport-db-name
Choose a base branch
from
22037-basic-fast-import-e2e
base: 22100-change-fastimport-db-name
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+166
−3
Open
Changes from 2 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
7b998e8
Fixture for fast_import binary is working
NanoBjorn 1423d29
Implemented basic test of fast import
NanoBjorn 550504b
Moved test_fast_import to test_import_pgdata
NanoBjorn 8a18c63
Added todo on full import test with pageserver
NanoBjorn fcc92f9
poetry run ruff format .
NanoBjorn 8e521f2
poetry run ruff check --fix .
NanoBjorn 9298fcd
Fixed initdb locale
NanoBjorn e856fae
Capture LD_LIBRARY_PATH from pytest env
NanoBjorn File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,4 +14,5 @@ | |
"fixtures.compare_fixtures", | ||
"fixtures.slow", | ||
"fixtures.reruns", | ||
"fixtures.fast_import", | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
import shutil | ||
import subprocess | ||
import tempfile | ||
from collections.abc import Iterator | ||
from pathlib import Path | ||
|
||
import pytest | ||
|
||
from fixtures.log_helper import log | ||
from fixtures.neon_cli import AbstractNeonCli | ||
from fixtures.pg_version import PgVersion | ||
|
||
|
||
class FastImport(AbstractNeonCli): | ||
COMMAND = "fast_import" | ||
cmd: subprocess.CompletedProcess[str] | None = None | ||
|
||
def __init__( | ||
self, | ||
extra_env: dict[str, str] | None, | ||
binpath: Path, | ||
pg_distrib_dir: Path, | ||
pg_version: PgVersion, | ||
workdir: Path | ||
): | ||
if extra_env is None: | ||
env_vars = {} | ||
else: | ||
env_vars = extra_env.copy() | ||
|
||
if not (binpath / self.COMMAND).exists(): | ||
raise Exception(f"{self.COMMAND} binary not found at '{binpath}'") | ||
super().__init__(env_vars, binpath) | ||
|
||
pg_dir = pg_distrib_dir / pg_version.v_prefixed | ||
self.pg_distrib_dir = pg_distrib_dir | ||
self.pg_version = pg_version | ||
self.pg_bin = pg_dir / "bin" | ||
if not (self.pg_bin / "postgres").exists(): | ||
raise Exception(f"postgres binary was not found at '{self.pg_bin}'") | ||
self.pg_lib = pg_dir / "lib" | ||
|
||
if not workdir.exists(): | ||
raise Exception(f"Working directory '{workdir}' does not exist") | ||
self.workdir = workdir | ||
|
||
def run(self, | ||
pg_port: int, | ||
source_connection_string: str | None = None, | ||
s3prefix: str | None = None, | ||
interactive: bool = False, | ||
) -> subprocess.CompletedProcess[str]: | ||
if self.cmd is not None: | ||
raise Exception("Command already executed") | ||
args = [ | ||
f"--pg-bin-dir={self.pg_bin}", | ||
f"--pg-lib-dir={self.pg_lib}", | ||
f"--pg-port={pg_port}", | ||
f"--working-directory={self.workdir}", | ||
] | ||
if source_connection_string is not None: | ||
args.append(f"--source-connection-string={source_connection_string}") | ||
if s3prefix is not None: | ||
args.append(f"--s3-prefix={s3prefix}") | ||
if interactive: | ||
args.append("--interactive") | ||
|
||
self.cmd = self.raw_cli(args) | ||
return self.cmd | ||
|
||
def __enter__(self): | ||
return self | ||
|
||
def __exit__(self, *args): | ||
if self.workdir.exists(): | ||
shutil.rmtree(self.workdir) | ||
|
||
|
||
@pytest.fixture(scope="function") | ||
def fast_import( | ||
pg_version: PgVersion, | ||
test_output_dir: Path, | ||
neon_binpath: Path, | ||
pg_distrib_dir: Path, | ||
) -> Iterator[FastImport]: | ||
workdir = Path(tempfile.mkdtemp()) | ||
with FastImport(None, neon_binpath, pg_distrib_dir, pg_version, workdir) as fi: | ||
yield fi | ||
|
||
if fi.cmd is None: | ||
return | ||
|
||
# dump stdout & stderr into test log dir | ||
with open(test_output_dir / "fast_import.stdout", "w") as f: | ||
f.write(fi.cmd.stdout) | ||
with open(test_output_dir / "fast_import.stderr", "w") as f: | ||
f.write(fi.cmd.stderr) | ||
|
||
log.info('Written logs to %s', test_output_dir) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from __future__ import annotations | ||
|
||
from fixtures.fast_import import FastImport | ||
from fixtures.log_helper import log | ||
from fixtures.neon_fixtures import VanillaPostgres, PgProtocol, PgBin | ||
from fixtures.port_distributor import PortDistributor | ||
|
||
|
||
def test_fast_import( | ||
test_output_dir, | ||
vanilla_pg: VanillaPostgres, | ||
port_distributor: PortDistributor, | ||
fast_import: FastImport, | ||
): | ||
vanilla_pg.start() | ||
vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);") | ||
|
||
pg_port = port_distributor.get_port() | ||
fast_import.run(pg_port, vanilla_pg.connstr()) | ||
vanilla_pg.stop() | ||
|
||
pgbin = PgBin(test_output_dir, fast_import.pg_distrib_dir, fast_import.pg_version) | ||
new_pgdata_vanilla_pg = VanillaPostgres(fast_import.workdir / "pgdata", pgbin, pg_port, False) | ||
new_pgdata_vanilla_pg.start() | ||
|
||
# database name and user are hardcoded in fast_import binary, and they are different from normal vanilla postgres | ||
conn = PgProtocol(dsn=f"postgresql://cloud_admin@localhost:{pg_port}/neondb") | ||
res = conn.safe_psql("SELECT count(*) FROM foo;") | ||
log.info(f"Result: {res}") | ||
assert res[0][0] == 10 | ||
new_pgdata_vanilla_pg.stop() | ||
|
||
|
||
# TODO: Maybe test with pageserver? | ||
# 1. run whole neon env | ||
# 2. create timeline with some s3 path??? | ||
# 3. run fast_import with s3 prefix | ||
# 4. ??? mock http where pageserver will report progress | ||
# 5. run compute on this timeline and check if data is there |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps makes sense to make this an extension to
test_pgdata_import_smoke
, which has this comment currently:There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just thought that it might not worth doing it all in the same test due to increased complexity -> more complicated debugging if something is failing. Right now it is using vanilla postgres pgdata as we do in fast import, so it should be enough for testing pageserver part.
Moved mine simple test to the same file, but will keep
test_pgdata_import_smoke
the same. Also added a todo to test full import flow separately, which will complementtest_pgdata_import_smoke
andtest_fast_import_binary
, but not complicate any of those.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WDYT?