diff --git a/src/borg/archive.py b/src/borg/archive.py
index 01d1617d17..7461e8ada1 100644
--- a/src/borg/archive.py
+++ b/src/borg/archive.py
@@ -719,6 +719,63 @@ def extract_helper(self, item, path, hlm, *, dry_run=False):
                 # In this case, we *want* to extract twice, because there is no other way.
                 pass
 
+    def compare_and_extract_chunks(self, item, fs_path, *, st, pi=None):
+        """Compare file chunks and patch if needed. Returns True if patching succeeded."""
+        if st is None:
+            return False
+        try:
+            # First pass: Build fs chunks list
+            fs_chunks = []
+            with backup_io("open"):
+                fs_file = open(fs_path, "rb")
+            with fs_file:
+                for chunk in item.chunks:
+                    with backup_io("read"):
+                        data = fs_file.read(chunk.size)
+
+                    fs_chunks.append(ChunkListEntry(id=self.key.id_hash(data), size=len(data)))
+
+            # Compare chunks and collect needed chunk IDs
+            needed_chunks = []
+            for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
+                if fs_chunk != item_chunk:
+                    needed_chunks.append(item_chunk)
+
+            # Fetch all needed chunks and iterate through ALL of them
+            chunk_data_iter = self.pipeline.fetch_many([chunk.id for chunk in needed_chunks], ro_type=ROBJ_FILE_STREAM)
+
+            # Second pass: Update file
+            with backup_io("open"):
+                fs_file = open(fs_path, "rb+")
+            with fs_file:
+                for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
+                    if fs_chunk == item_chunk:
+                        with backup_io("seek"):
+                            fs_file.seek(item_chunk.size, 1)
+                    else:
+                        chunk_data = next(chunk_data_iter)
+
+                        with backup_io("write"):
+                            fs_file.write(chunk_data)
+                        if pi:
+                            pi.show(increase=len(chunk_data), info=[remove_surrogates(item.path)])
+
+                final_size = fs_file.tell()
+                with backup_io("truncate_and_attrs"):
+                    fs_file.truncate(item.size)
+                    fs_file.flush()
+                    self.restore_attrs(fs_path, item, fd=fs_file.fileno())
+
+            if "size" in item and item.size != final_size:
+                raise BackupError(f"Size inconsistency detected: size {item.size}, chunks size {final_size}")
+
+            if "chunks_healthy" in item and not item.chunks_healthy:
+                raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")
+
+            return True
+        except OSError:
+            return False
+
     def extract_item(
         self,
         item,
@@ -802,12 +859,14 @@ def same_item(item, st):
                     return  # done! we already have fully extracted this file in a previous run.
                 elif stat.S_ISDIR(st.st_mode):
                     os.rmdir(path)
+                    st = None
                 else:
                     os.unlink(path)
+                    st = None
             except UnicodeEncodeError:
                 raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
             except OSError:
-                pass
+                st = None
 
         def make_parent(path):
             parent_dir = os.path.dirname(path)
@@ -821,6 +880,9 @@ def make_parent(path):
             with self.extract_helper(item, path, hlm) as hardlink_set:
                 if hardlink_set:
                     return
+                if self.compare_and_extract_chunks(item, path, st=st, pi=pi):
+                    return
+
                 with backup_io("open"):
                     fd = open(path, "wb")
                 with fd:
diff --git a/src/borg/testsuite/archive_test.py b/src/borg/testsuite/archive_test.py
index 1157994d7d..3aee648ab4 100644
--- a/src/borg/testsuite/archive_test.py
+++ b/src/borg/testsuite/archive_test.py
@@ -12,7 +12,7 @@
 from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
 from ..archive import BackupOSError, backup_io, backup_io_iter, get_item_uid_gid
 from ..helpers import msgpack
-from ..item import Item, ArchiveItem
+from ..item import Item, ArchiveItem, ChunkListEntry
 from ..manifest import Manifest
 from ..platform import uid2user, gid2group, is_win32
 
@@ -132,6 +132,11 @@ def add_chunk(self, id, meta, data, stats=None, wait=True, ro_type=None):
         self.objects[id] = data
         return id, len(data)
 
+    def fetch_many(self, ids, ro_type=None):
+        """Mock implementation of fetch_many"""
+        for id in ids:
+            yield self.objects[id]
+
 
 def test_cache_chunk_buffer():
     data = [Item(path="p1"), Item(path="p2")]
@@ -402,3 +407,161 @@ def test_reject_non_sanitized_item():
     for path in rejected_dotdot_paths:
         with pytest.raises(ValueError, match="unexpected '..' element in path"):
             Item(path=path, user="root", group="root")
+
+
+@pytest.fixture
+def setup_extractor(tmpdir):
+    """Setup common test infrastructure"""
+
+    class MockCache:
+        def __init__(self):
+            self.objects = {}
+
+    repository = Mock()
+    key = PlaintextKey(repository)
+    manifest = Manifest(key, repository)
+    cache = MockCache()
+
+    extractor = Archive(manifest=manifest, name="test", create=True)
+    extractor.pipeline = cache
+    extractor.key = key
+    extractor.cwd = str(tmpdir)
+    extractor.restore_attrs = Mock()
+
+    # Track fetched chunks across tests
+    fetched_chunks = []
+
+    def create_mock_chunks(item_data, chunk_size=4):
+        chunks = []
+        for i in range(0, len(item_data), chunk_size):
+            chunk_data = item_data[i : i + chunk_size]
+            chunk_id = key.id_hash(chunk_data)
+            chunks.append(ChunkListEntry(id=chunk_id, size=len(chunk_data)))
+            cache.objects[chunk_id] = chunk_data
+
+        item = Mock(spec=["chunks", "size", "__contains__", "get"])
+        item.chunks = chunks
+        item.size = len(item_data)
+        item.__contains__ = lambda self, item: item == "size"
+
+        return item, str(tmpdir.join("test.txt"))
+
+    def mock_fetch_many(chunk_ids, ro_type=None):
+        fetched_chunks.extend(chunk_ids)
+        return iter([cache.objects[chunk_id] for chunk_id in chunk_ids])
+
+    def clear_fetched_chunks():
+        fetched_chunks.clear()
+
+    def get_fetched_chunks():
+        return fetched_chunks
+
+    cache.fetch_many = mock_fetch_many
+
+    return extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks
+
+
+@pytest.mark.parametrize(
+    "name, item_data, fs_data, expected_fetched_chunks",
+    [
+        (
+            "no_changes",
+            b"1111",  # One complete chunk, no changes needed
+            b"1111",  # Identical content
+            0,  # No chunks should be fetched
+        ),
+        (
+            "single_chunk_change",
+            b"11112222",  # Two chunks
+            b"1111XXXX",  # Second chunk different
+            1,  # Only second chunk should be fetched
+        ),
+        (
+            "cross_boundary_change",
+            b"11112222",  # Two chunks
+            b"111XX22",  # Change crosses chunk boundary
+            2,  # Both chunks need update
+        ),
+        (
+            "exact_multiple_chunks",
+            b"11112222333",  # Three chunks (last one partial)
+            b"1111XXXX333",  # Middle chunk different
+            1,  # Only middle chunk fetched
+        ),
+        (
+            "first_chunk_change",
+            b"11112222",  # Two chunks
+            b"XXXX2222",  # First chunk different
+            1,  # Only first chunk should be fetched
+        ),
+        (
+            "all_chunks_different",
+            b"11112222",  # Two chunks
+            b"XXXXYYYY",  # Both chunks different
+            2,  # Both chunks should be fetched
+        ),
+        (
+            "partial_last_chunk",
+            b"111122",  # One full chunk + partial
+            b"1111XX",  # Partial chunk different
+            1,  # Only second chunk should be fetched
+        ),
+        (
+            "fs_file_shorter",
+            b"11112222",  # Two chunks in archive
+            b"111122",  # Shorter on disk - missing part of second chunk
+            1,  # Should fetch second chunk
+        ),
+        (
+            "fs_file_longer",
+            b"11112222",  # Two chunks in archive
+            b"1111222233",  # Longer on disk
+            0,  # Should fetch no chunks since content matches up to archive length
+        ),
+        (
+            "empty_archive_file",
+            b"",  # Empty in archive
+            b"11112222",  # Content on disk
+            0,  # No chunks to compare = no chunks to fetch
+        ),
+        (
+            "empty_fs_file",
+            b"11112222",  # Two chunks in archive
+            b"",  # Empty on disk
+            2,  # Should fetch all chunks since file is empty
+        ),
+    ],
+)
+def test_compare_and_extract_chunks(setup_extractor, name, item_data, fs_data, expected_fetched_chunks):
+    """Test chunk comparison and extraction"""
+    extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks = setup_extractor
+    clear_fetched_chunks()
+
+    chunk_size = 4
+    item, target_path = create_mock_chunks(item_data, chunk_size=chunk_size)
+
+    original_chunk_ids = [chunk.id for chunk in item.chunks]
+
+    with open(target_path, "wb") as f:
+        f.write(fs_data)
+
+    st = os.stat(target_path)
+    result = extractor.compare_and_extract_chunks(item, target_path, st=st)
+    assert result
+
+    fetched_chunks = get_fetched_chunks()
+    assert len(fetched_chunks) == expected_fetched_chunks
+
+    # For single chunk changes, verify it's the correct chunk
+    if expected_fetched_chunks == 1:
+        item_chunks = [item_data[i : i + chunk_size] for i in range(0, len(item_data), chunk_size)]
+        fs_chunks = [fs_data[i : i + chunk_size] for i in range(0, len(fs_data), chunk_size)]
+
+        # Find which chunk should have changed by comparing item_data with fs_data
+        for i, (item_chunk, fs_chunk) in enumerate(zip(item_chunks, fs_chunks)):
+            if item_chunk != fs_chunk:
+                assert fetched_chunks[0] == original_chunk_ids[i]
+                break
+
+    with open(target_path, "rb") as f:
+        assert f.read() == item_data
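
Note (reviewer illustration, not part of the patch): the technique above hashes what is already on disk chunk by chunk, fetches only the mismatching chunks from the repository, and rewrites just those regions before truncating to the archived size. A minimal standalone sketch of that idea follows; the names patch_file_from_chunks and fetch are hypothetical, plain SHA-256 stands in for borg's keyed id_hash, and this is not borg's API.

# Minimal sketch (hypothetical names, SHA-256 instead of borg's keyed id_hash):
# compare fixed-size regions of an existing file against an expected chunk list
# and rewrite only the regions whose hashes do not match.
import hashlib
import os


def patch_file_from_chunks(path, expected_chunks, fetch):
    """expected_chunks: list of (chunk_id, size); fetch(ids) yields chunk data in the given order."""
    # First pass: hash what is currently on disk, one region per expected chunk.
    with open(path, "rb") as f:
        on_disk = [hashlib.sha256(f.read(size)).digest() for _, size in expected_chunks]
    # Only chunks whose on-disk hash differs need to be fetched.
    needed = [cid for (cid, _), got in zip(expected_chunks, on_disk) if got != cid]
    data_iter = fetch(needed)
    # Second pass: seek over matching regions, overwrite mismatching ones, then truncate.
    with open(path, "rb+") as f:
        for (cid, size), got in zip(expected_chunks, on_disk):
            if got == cid:
                f.seek(size, os.SEEK_CUR)
            else:
                f.write(next(data_iter))
        f.truncate(sum(size for _, size in expected_chunks))


if __name__ == "__main__":
    chunks = [b"1111", b"2222"]
    expected = [(hashlib.sha256(c).digest(), len(c)) for c in chunks]
    store = {cid: data for (cid, _), data in zip(expected, chunks)}
    with open("demo.txt", "wb") as f:
        f.write(b"1111XXXX")  # second region differs from the "archived" content
    patch_file_from_chunks("demo.txt", expected, lambda ids: iter(store[i] for i in ids))
    with open("demo.txt", "rb") as f:
        assert f.read() == b"11112222"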