From fa6c42c57e1754a5b36d3e45573520130d380f84 Mon Sep 17 00:00:00 2001 From: Kathia Barahona Date: Tue, 19 Nov 2024 13:30:24 +0100 Subject: [PATCH] monitor pg connection when uploading chunks --- pghoard/basebackup/base.py | 12 ++++++---- pghoard/basebackup/chunks.py | 27 +++++++++++++++++---- pghoard/basebackup/delta.py | 38 ++++++++++++++++++++++++------ pghoard/pgutil.py | 14 ++++++++++- test/basebackup/test_basebackup.py | 30 +++++++++++++++++++++-- 5 files changed, 102 insertions(+), 19 deletions(-) diff --git a/pghoard/basebackup/base.py b/pghoard/basebackup/base.py index 260bd850..20089618 100644 --- a/pghoard/basebackup/base.py +++ b/pghoard/basebackup/base.py @@ -35,6 +35,7 @@ set_subprocess_stdout_and_stderr_nonblocking, terminate_subprocess ) from pghoard.compressor import CompressionEvent +from pghoard.pgutil import check_if_pg_connection_is_alive from pghoard.transfer import UploadEvent BASEBACKUP_NAME = "pghoard_base_backup" @@ -543,6 +544,7 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool = self.log.debug("Connecting to database to start backup process") connection_string = connection_string_using_pgpass(self.connection_info) with psycopg2.connect(connection_string) as db_conn: + conn_polling = lambda: check_if_pg_connection_is_alive(db_conn) cursor = db_conn.cursor() if self.pg_version_server < 90600: @@ -589,6 +591,7 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool = for item in self.find_files_to_backup(pgdata=pgdata, tablespaces=tablespaces) if not item[1].endswith(".pem") # Exclude such files like "dh1024.pem" ), + conn_polling=conn_polling, ) chunks_count = len(chunk_files) control_files_metadata_extra["chunks"] = chunk_files @@ -607,11 +610,12 @@ def run_local_tar_basebackup(self, delta: bool = False, with_delta_stats: bool = # Tar up the chunks and submit them for upload; note that we start from chunk 1 here; chunk 0 # is reserved for special files and metadata and will be generated last. chunk_files = self.chunk_uploader.create_and_upload_chunks( - chunks, - data_file_format, - compressed_base, + chunks=chunks, + data_file_format=data_file_format, + temp_base_dir=compressed_base, delta_stats=delta_stats, - file_type=FileType.Basebackup_chunk + file_type=FileType.Basebackup_chunk, + conn_polling=conn_polling, ) total_size_plain = sum(item["input_size"] for item in chunk_files) diff --git a/pghoard/basebackup/chunks.py b/pghoard/basebackup/chunks.py index caa10d64..68213bfd 100644 --- a/pghoard/basebackup/chunks.py +++ b/pghoard/basebackup/chunks.py @@ -62,8 +62,16 @@ class DeltaStats: class ChunkUploader: def __init__( - self, metrics: Metrics, chunks_on_disk: int, encryption_data: EncryptionData, compression_data: CompressionData, - site_config: Dict[str, Any], site: str, is_running: Callable[[], bool], transfer_queue: TransferQueue + self, + *, + metrics: Metrics, + chunks_on_disk: int, + encryption_data: EncryptionData, + compression_data: CompressionData, + site_config: Dict[str, Any], + site: str, + is_running: Callable[[], bool], + transfer_queue: TransferQueue, ): self.log = logging.getLogger("ChunkUploader") self.metrics = metrics @@ -216,9 +224,15 @@ def handle_single_chunk( chunks, index: int, temp_dir: Path, + conn_polling: Callable[[], bool], delta_stats: Optional[DeltaStats] = None, - file_type: FileType = FileType.Basebackup_chunk + file_type: FileType = FileType.Basebackup_chunk, ) -> Dict[str, Any]: + # if the chunk is dependent on a PG connection and connection + # is not alive then abort the task + if not conn_polling(): + raise RuntimeError("ERROR: PostgreSQL connection was lost during backup process.") + one_chunk_files = chunks[index] chunk_name, input_size, result_size = self.tar_one_file( callback_queue=chunk_callback_queue, @@ -260,12 +274,14 @@ def wait_for_chunk_transfer_to_complete( def create_and_upload_chunks( self, + *, chunks, data_file_format: Callable[[int], str], + conn_polling: Callable[[], bool], temp_base_dir: Path, delta_stats: Optional[DeltaStats] = None, file_type: FileType = FileType.Basebackup_chunk, - chunks_max_progress: float = 100.0 + chunks_max_progress: float = 100.0, ) -> List[Dict[str, Any]]: start_time = time.monotonic() chunk_files = [] @@ -299,8 +315,9 @@ def create_and_upload_chunks( chunks=chunks, index=i, temp_dir=temp_base_dir, + conn_polling=conn_polling, delta_stats=delta_stats, - file_type=file_type + file_type=file_type, ) pending_compress_and_encrypt_tasks.append(task) self.chunks_on_disk += 1 diff --git a/pghoard/basebackup/delta.py b/pghoard/basebackup/delta.py index 58e2b274..bf64762e 100644 --- a/pghoard/basebackup/delta.py +++ b/pghoard/basebackup/delta.py @@ -55,10 +55,21 @@ class HasReadAndSeek(HasRead, HasSeek, Protocol): class DeltaBaseBackup: def __init__( - self, *, storage: BaseTransfer, site: str, site_config: Dict[str, Any], transfer_queue: TransferQueue, - metrics: Metrics, encryption_data: EncryptionData, compression_data: CompressionData, - get_remote_basebackups_info: Callable[[str], List[Dict[str, Any]]], parallel: int, temp_base_dir: Path, - compressed_base: Path, chunk_uploader: ChunkUploader, data_file_format: Callable[[int], str] + self, + *, + storage: BaseTransfer, + site: str, + site_config: Dict[str, Any], + transfer_queue: TransferQueue, + metrics: Metrics, + encryption_data: EncryptionData, + compression_data: CompressionData, + get_remote_basebackups_info: Callable[[str], List[Dict[str, Any]]], + parallel: int, + temp_base_dir: Path, + compressed_base: Path, + chunk_uploader: ChunkUploader, + data_file_format: Callable[[int], str], ): self.log = logging.getLogger("DeltaBaseBackup") self.storage = storage @@ -384,11 +395,17 @@ def _split_files_for_upload( return delta_chunks, todo_hexdigests - def _upload_chunks(self, delta_chunks, chunks_max_progress: float) -> Tuple[UploadedFilesMetric, List[Dict[str, Any]]]: + def _upload_chunks( + self, + delta_chunks, + chunks_max_progress: float, + conn_polling: Callable[[], bool], + ) -> Tuple[UploadedFilesMetric, List[Dict[str, Any]]]: """Upload small files grouped into chunks to save on latency and requests costs""" chunk_files = self.chunk_uploader.create_and_upload_chunks( chunks=delta_chunks, data_file_format=self.data_file_format, + conn_polling=conn_polling, temp_base_dir=self.compressed_base, file_type=FileType.Basebackup_delta_chunk, chunks_max_progress=chunks_max_progress, @@ -426,7 +443,10 @@ def _read_delta_sizes(self, snapshot_result: SnapshotResult) -> Tuple[UploadedFi return digests_metric, embed_metric def run( - self, pgdata: str, src_iterate_func: Callable[[], Iterable[BackupPath]] + self, + pgdata: str, + src_iterate_func: Callable[[], Iterable[BackupPath]], + conn_polling: Callable[[], bool], ) -> Tuple[int, int, BackupManifest, int, List[Dict[str, Any]]]: # NOTE: Hard links work only in the same FS, therefore using hopefully the same FS in PG home folder delta_dir = os.path.join(os.path.dirname(pgdata), "basebackup_delta") @@ -459,7 +479,11 @@ def run( sum(len(chunk) for chunk in delta_chunks) ) chunks_max_progress = delta_chunks_count * 100.0 / (delta_chunks_count + len(todo_hexdigests)) - chunks_metric, chunk_files = self._upload_chunks(delta_chunks, chunks_max_progress=chunks_max_progress) + chunks_metric, chunk_files = self._upload_chunks( + delta_chunks, + chunks_max_progress=chunks_max_progress, + conn_polling=conn_polling, + ) self.log.info( "Submitting hashes for upload: %r, total hashes in the snapshot: %r", len(todo_hexdigests), diff --git a/pghoard/pgutil.py b/pghoard/pgutil.py index f379f58b..471c234b 100644 --- a/pghoard/pgutil.py +++ b/pghoard/pgutil.py @@ -5,9 +5,10 @@ Copyright (c) 2015 Ohmu Ltd See LICENSE for details """ - from urllib.parse import parse_qs, urlparse +from psycopg2.extensions import TRANSACTION_STATUS_ACTIVE, TRANSACTION_STATUS_IDLE + def create_connection_string(connection_info): return " ".join("{}='{}'".format(k, str(v).replace("'", "\\'")) for k, v in sorted(connection_info.items())) @@ -92,3 +93,14 @@ def parse_connection_string_libpq(connection_string): value, connection_string = rem, "" fields[key] = value return fields + + +def check_if_pg_connection_is_alive(db_conn) -> bool: + if db_conn.closed: + return False + + status = db_conn.get_transaction_status() + if status not in [TRANSACTION_STATUS_ACTIVE, TRANSACTION_STATUS_IDLE]: + return False + + return True diff --git a/test/basebackup/test_basebackup.py b/test/basebackup/test_basebackup.py index 961e524d..7f5f7278 100644 --- a/test/basebackup/test_basebackup.py +++ b/test/basebackup/test_basebackup.py @@ -88,7 +88,7 @@ def test_find_files(self, db): def create_test_files(): # Create two temporary files on top level and one in global/ that we'll unlink while iterating with open(top1, "w") as t1, open(top2, "w") as t2, \ - open(sub1, "w") as s1, open(sub2, "w") as s2, open(sub3, "w") as s3: + open(sub1, "w") as s1, open(sub2, "w") as s2, open(sub3, "w") as s3: t1.write("t1\n") t2.write("t2\n") s1.write("s1\n") @@ -932,7 +932,7 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs): # pylint: d return meta, b"some content" with patch.object(pgb, "get_remote_basebackups_info") as mock_get_remote_basebackups_info, \ - patch("pghoard.basebackup.base.download_backup_meta_file", new=fake_download_backup_meta_file): + patch("pghoard.basebackup.base.download_backup_meta_file", new=fake_download_backup_meta_file): mock_get_remote_basebackups_info.return_value = [{ "name": f"backup{idx}", "metadata": { @@ -946,3 +946,29 @@ def fake_download_backup_meta_file(basebackup_path: str, **kwargs): # pylint: d "7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e27": 8192, "7e0c70d50c0ccd9ca4cb8c6837fbfffb4ef7e885aa1c6370fcfc307541a03e28": 800 } + + @pytest.mark.parametrize( + "backup_mode", + [BaseBackupMode.local_tar, BaseBackupMode.delta, BaseBackupMode.local_tar_delta_stats], + ) + def test_create_basebackup_lost_pg_connection(self, db, pghoard, backup_mode: BaseBackupMode): + with patch("pghoard.basebackup.base.check_if_pg_connection_is_alive", return_value=False): + pghoard.create_backup_site_paths(pghoard.test_site) + basebackup_path = os.path.join(pghoard.config["backup_location"], pghoard.test_site, "basebackup") + q: Queue[CallbackEvent] = Queue() + + pghoard.config["backup_sites"][pghoard.test_site]["basebackup_mode"] = backup_mode + pghoard.config["backup_sites"][pghoard.test_site]["active_backup_mode"] = "archive_command" + + now = datetime.datetime.now(datetime.timezone.utc) + metadata = { + "backup-reason": BackupReason.scheduled, + "backup-decision-time": now.isoformat(), + "normalized-backup-time": now.isoformat(), + } + pghoard.create_basebackup(pghoard.test_site, db.user, basebackup_path, q, metadata) + result = q.get(timeout=60) + + assert result.success is False + assert result.exception and isinstance(result.exception, RuntimeError) + assert result.exception.args[0] == "ERROR: PostgreSQL connection was lost during backup process."