Skip to content

Commit

Permalink
fast import: added a test that restores into a running postgres by connstring
Browse files Browse the repository at this point in the history
  • Loading branch information
NanoBjorn committed Jan 15, 2025
1 parent 29c566a commit 70d43af
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 20 deletions.
30 changes: 17 additions & 13 deletions compute_tools/src/bin/fast_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ struct Args {
#[clap(long)]
source_connection_string: Option<String>,
#[clap(long)]
restore_connection_string: Option<String>,
restore_connection_string: Option<String>, // will not run postgres if specified, will do pg_restore to this connection string
#[clap(short, long)]
interactive: bool,
#[clap(long)]
Expand Down Expand Up @@ -104,12 +104,6 @@ pub(crate) async fn main() -> anyhow::Result<()> {
let working_directory = args.working_directory;
let pg_bin_dir = args.pg_bin_dir;
let pg_lib_dir = args.pg_lib_dir;
let pg_port = if args.pg_port.is_some() {
args.pg_port.unwrap()
} else {
info!("pg_port not specified, using default 5432");
5432
};

// Initialize AWS clients only if s3_prefix is specified
let (aws_config, kms_client) = if args.s3_prefix.is_some() {
Expand All @@ -121,6 +115,14 @@ pub(crate) async fn main() -> anyhow::Result<()> {
};

let superuser = "cloud_admin";
let pg_port = || if args.pg_port.is_some() {
args.pg_port.unwrap()
} else {
info!("pg_port not specified, using default 5432");
5432
};

let mut run_postgres = true;

// Get connection strings either from S3 spec or direct arguments
let (source_connstring, restore_connstring) = if let Some(s3_prefix) = &args.s3_prefix {
Expand Down Expand Up @@ -163,6 +165,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {

let restore =
if let Some(restore_ciphertext) = spec.restore_connstring_ciphertext_base64 {
run_postgres = false;
let mut restore_output = kms_client
.unwrap()
.decrypt()
Expand All @@ -179,7 +182,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
.context("parse restore connection string as utf8")?
} else {
// restoring to local postgres otherwise
format!("host=localhost port={pg_port} user={superuser} dbname=neondb")
format!("host=localhost port={} user={} dbname=neondb", pg_port(), superuser)
};

(source, restore)
Expand All @@ -189,8 +192,9 @@ pub(crate) async fn main() -> anyhow::Result<()> {
(
args.source_connection_string.unwrap(),
if args.restore_connection_string.is_none() {
format!("host=localhost port={pg_port} user={superuser} dbname=neondb")
format!("host=localhost port={} user={} dbname=neondb", pg_port(), superuser)
} else {
run_postgres = false;
args.restore_connection_string.unwrap()
},
)
Expand All @@ -215,7 +219,8 @@ pub(crate) async fn main() -> anyhow::Result<()> {
}
let pgdata_dir = working_directory.join("pgdata");

let postgres_proc = if restore_connstring.contains("host=localhost") {
let postgres_proc = if run_postgres {
assert!(restore_connstring.contains("host=localhost"));
tokio::fs::create_dir(&pgdata_dir)
.await
.context("create pgdata directory")?;
Expand Down Expand Up @@ -244,7 +249,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
let mut proc = tokio::process::Command::new(pgbin)
.arg("-D")
.arg(&pgdata_dir)
.args(["-p", &format!("{pg_port}")])
.args(["-p", &format!("{}", pg_port())])
.args(["-c", "wal_level=minimal"])
.args(["-c", "shared_buffers=10GB"])
.args(["-c", "max_wal_senders=0"])
Expand Down Expand Up @@ -308,6 +313,7 @@ pub(crate) async fn main() -> anyhow::Result<()> {
}
Some(proc)
} else {
info!("restore_connection_string specified, not running postgres process");
None
};

Expand Down Expand Up @@ -366,8 +372,6 @@ pub(crate) async fn main() -> anyhow::Result<()> {

// TODO: do it in a streaming way, plenty of internal research done on this already
// TODO: do the unlogged table trick

info!("restore from working directory into vanilla postgres");
{
let mut pg_restore = tokio::process::Command::new(pg_bin_dir.join("pg_restore"))
.args(&common_args)
Expand Down
8 changes: 6 additions & 2 deletions test_runner/fixtures/fast_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ def __init__(

def run(
self,
pg_port: int,
pg_port: int | None = None,
source_connection_string: str | None = None,
restore_connection_string: str | None = None,
s3prefix: str | None = None,
interactive: bool = False,
) -> subprocess.CompletedProcess[str]:
Expand All @@ -60,11 +61,14 @@ def run(
args = [
f"--pg-bin-dir={self.pg_bin}",
f"--pg-lib-dir={self.pg_lib}",
f"--pg-port={pg_port}",
f"--working-directory={self.workdir}",
]
if pg_port is not None:
args.append(f"--pg-port={pg_port}")
if source_connection_string is not None:
args.append(f"--source-connection-string={source_connection_string}")
if restore_connection_string is not None:
args.append(f"--restore-connection-string={restore_connection_string}")
if s3prefix is not None:
args.append(f"--s3-prefix={s3prefix}")
if interactive:
Expand Down
49 changes: 44 additions & 5 deletions test_runner/regress/test_import_pgdata.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import json
import re
import tempfile
import time
from enum import Enum
from pathlib import Path

import psycopg2
import psycopg2.errors
import pytest
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.fast_import import FastImport
from fixtures.log_helper import log
Expand All @@ -18,9 +24,6 @@
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import run_only_on_postgres
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
from werkzeug.wrappers.response import Response

num_rows = 1000

Expand Down Expand Up @@ -100,13 +103,15 @@ def handler(request: Request) -> Response:
while True:
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
log.info(
f"relblock size: {relblock_size/8192} pages (target: {target_relblock_size//8192}) pages"
f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
)
if relblock_size >= target_relblock_size:
break
addrows = int((target_relblock_size - relblock_size) // 8192)
assert addrows >= 1, "forward progress"
vanilla_pg.safe_psql(f"insert into t select generate_series({nrows+1}, {nrows + addrows})")
vanilla_pg.safe_psql(
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
)
nrows += addrows
expect_nrows = nrows
expect_sum = (
Expand Down Expand Up @@ -346,6 +351,40 @@ def test_fast_import_binary(
new_pgdata_vanilla_pg.stop()


@run_only_on_postgres(
    [PgVersion.V14, PgVersion.V15, PgVersion.V16],
    "newer control file catalog version and struct format isn't supported",
)
def test_fast_import_restore_to_connstring(
    test_output_dir,
    vanilla_pg: VanillaPostgres,
    port_distributor: PortDistributor,
    fast_import: FastImport,
    pg_distrib_dir: Path,
    pg_version: PgVersion,
):
    """Exercise fast_import's --restore-connection-string path: instead of
    launching its own local postgres, fast_import should pg_restore straight
    into an already-running target postgres addressed by connstring."""
    # Source side: seed a small table whose row count we can verify after restore.
    vanilla_pg.start()
    vanilla_pg.safe_psql("CREATE TABLE foo (a int); INSERT INTO foo SELECT generate_series(1, 10);")

    # Target side: a fresh vanilla postgres with its own datadir and port.
    restore_datadir = test_output_dir / "restore-pgdata"
    restore_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
    restore_port = port_distributor.get_port()
    with VanillaPostgres(restore_datadir, restore_bin, restore_port) as restore_vanilla_pg:
        restore_vanilla_pg.configure(["shared_preload_libraries='neon_rmgr'"])
        restore_vanilla_pg.start()

        source_connstr = vanilla_pg.connstr()
        restore_connstr = restore_vanilla_pg.connstr()
        fast_import.run(
            source_connection_string=source_connstr,
            restore_connection_string=restore_connstr,
        )
        vanilla_pg.stop()

        # database name and user are hardcoded in fast_import binary, and they are different from normal vanilla postgres
        res = restore_vanilla_pg.safe_psql("SELECT count(*) FROM foo;")
        log.info(f"Result: {res}")
        assert res[0][0] == 10


# TODO: Maybe test with pageserver?
# 1. run whole neon env
# 2. create timeline with some s3 path???
Expand Down

0 comments on commit 70d43af

Please sign in to comment.