Implement chunk comparison and selective extraction
Add tests

Change logic for chunk comparison and extraction

Refactor `compare_and_extract_chunks` and improve test coverage

To do:
- Remove additional comments after approval.

Thank you for helping me with my first file system operations contribution!

Refactor `compare_and_extract_chunks`

Remove unnecessary `st` parameter

Fix file state tracking with st = None after unlink/rmdir
alighazi288 committed Jan 20, 2025
1 parent b9498ca commit 564b040
Showing 2 changed files with 227 additions and 2 deletions.
64 changes: 63 additions & 1 deletion src/borg/archive.py
@@ -719,6 +719,63 @@ def extract_helper(self, item, path, hlm, *, dry_run=False):
            # In this case, we *want* to extract twice, because there is no other way.
            pass

    def compare_and_extract_chunks(self, item, fs_path, *, st, pi=None):
        """Compare file chunks and patch if needed. Returns True if patching succeeded."""
        if st is None:
            return False
        try:
            # First pass: Build fs chunks list
            fs_chunks = []
            with backup_io("open"):
                fs_file = open(fs_path, "rb")
            with fs_file:
                for chunk in item.chunks:
                    with backup_io("read"):
                        data = fs_file.read(chunk.size)

                    fs_chunks.append(ChunkListEntry(id=self.key.id_hash(data), size=len(data)))

            # Compare chunks and collect needed chunk IDs
            needed_chunks = []
            for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
                if fs_chunk != item_chunk:
                    needed_chunks.append(item_chunk)

            # Fetch all needed chunks; they are consumed in order during the second pass
            chunk_data_iter = self.pipeline.fetch_many([chunk.id for chunk in needed_chunks], ro_type=ROBJ_FILE_STREAM)

            # Second pass: Update file
            with backup_io("open"):
                fs_file = open(fs_path, "rb+")
            with fs_file:
                for fs_chunk, item_chunk in zip(fs_chunks, item.chunks):
                    if fs_chunk == item_chunk:
                        with backup_io("seek"):
                            fs_file.seek(item_chunk.size, 1)
                    else:
                        chunk_data = next(chunk_data_iter)

                        with backup_io("write"):
                            fs_file.write(chunk_data)
                        if pi:
                            pi.show(increase=len(chunk_data), info=[remove_surrogates(item.path)])

                final_size = fs_file.tell()
                with backup_io("truncate_and_attrs"):
                    fs_file.truncate(item.size)
                    fs_file.flush()
                    self.restore_attrs(fs_path, item, fd=fs_file.fileno())

            if "size" in item and item.size != final_size:
                raise BackupError(f"Size inconsistency detected: size {item.size}, chunks size {final_size}")

            if "chunks_healthy" in item and not item.chunks_healthy:
                raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.")

            return True
        except OSError:
            return False

    def extract_item(
        self,
        item,
@@ -802,12 +859,14 @@ def same_item(item, st):
                return  # done! we already have fully extracted this file in a previous run.
            elif stat.S_ISDIR(st.st_mode):
                os.rmdir(path)
                st = None
            else:
                os.unlink(path)
                st = None
        except UnicodeEncodeError:
            raise self.IncompatibleFilesystemEncodingError(path, sys.getfilesystemencoding()) from None
        except OSError:
            pass
            st = None

        def make_parent(path):
            parent_dir = os.path.dirname(path)
@@ -821,6 +880,9 @@ def make_parent(path):
            with self.extract_helper(item, path, hlm) as hardlink_set:
                if hardlink_set:
                    return
                if self.compare_and_extract_chunks(item, path, st=st, pi=pi):
                    return

                with backup_io("open"):
                    fd = open(path, "wb")
                with fd:
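Aside: the core idea of the new method above can be summarized in a minimal, self-contained sketch, shown here for orientation only. The helper name `stale_chunk_indices`, the `(chunk_id, size)` pairs, and the sha256-based `id_hash` default are illustrative assumptions and not Borg's API; the real method also fetches the differing chunks, patches them in place, truncates to the archived size, and restores attributes.

import hashlib


def stale_chunk_indices(fs_path, archived_chunks, id_hash=lambda data: hashlib.sha256(data).digest()):
    """Return the indices of archived chunks whose on-disk counterpart differs.

    archived_chunks is a list of (chunk_id, size) pairs describing the file in the
    archive, read in order, analogous to item.chunks in compare_and_extract_chunks.
    """
    stale = []
    with open(fs_path, "rb") as f:
        for i, (chunk_id, size) in enumerate(archived_chunks):
            data = f.read(size)
            # A short read (file truncated on disk) or a hash mismatch both mean
            # this chunk would have to be fetched from the repository again.
            if len(data) != size or id_hash(data) != chunk_id:
                stale.append(i)
    return stale

Only the chunks whose indices come back would then be fetched and written at their offsets; everything that already matches stays untouched, which is what keeps unchanged data off the network.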
165 changes: 164 additions & 1 deletion src/borg/testsuite/archive_test.py
@@ -12,7 +12,7 @@
from ..archive import Archive, CacheChunkBuffer, RobustUnpacker, valid_msgpacked_dict, ITEM_KEYS, Statistics
from ..archive import BackupOSError, backup_io, backup_io_iter, get_item_uid_gid
from ..helpers import msgpack
from ..item import Item, ArchiveItem
from ..item import Item, ArchiveItem, ChunkListEntry
from ..manifest import Manifest
from ..platform import uid2user, gid2group, is_win32

@@ -132,6 +132,11 @@ def add_chunk(self, id, meta, data, stats=None, wait=True, ro_type=None):
        self.objects[id] = data
        return id, len(data)

    def fetch_many(self, ids, ro_type=None):
        """Mock implementation of fetch_many"""
        for id in ids:
            yield self.objects[id]


def test_cache_chunk_buffer():
    data = [Item(path="p1"), Item(path="p2")]
@@ -402,3 +407,161 @@ def test_reject_non_sanitized_item():
    for path in rejected_dotdot_paths:
        with pytest.raises(ValueError, match="unexpected '..' element in path"):
            Item(path=path, user="root", group="root")


@pytest.fixture
def setup_extractor(tmpdir):
    """Setup common test infrastructure"""

    class MockCache:
        def __init__(self):
            self.objects = {}

    repository = Mock()
    key = PlaintextKey(repository)
    manifest = Manifest(key, repository)
    cache = MockCache()

    extractor = Archive(manifest=manifest, name="test", create=True)
    extractor.pipeline = cache
    extractor.key = key
    extractor.cwd = str(tmpdir)
    extractor.restore_attrs = Mock()

    # Track fetched chunks across tests
    fetched_chunks = []

    def create_mock_chunks(item_data, chunk_size=4):
        chunks = []
        for i in range(0, len(item_data), chunk_size):
            chunk_data = item_data[i : i + chunk_size]
            chunk_id = key.id_hash(chunk_data)
            chunks.append(ChunkListEntry(id=chunk_id, size=len(chunk_data)))
            cache.objects[chunk_id] = chunk_data

        item = Mock(spec=["chunks", "size", "__contains__", "get"])
        item.chunks = chunks
        item.size = len(item_data)
        item.__contains__ = lambda self, item: item == "size"

        return item, str(tmpdir.join("test.txt"))

    def mock_fetch_many(chunk_ids, ro_type=None):
        fetched_chunks.extend(chunk_ids)
        return iter([cache.objects[chunk_id] for chunk_id in chunk_ids])

    def clear_fetched_chunks():
        fetched_chunks.clear()

    def get_fetched_chunks():
        return fetched_chunks

    cache.fetch_many = mock_fetch_many

    return extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks


@pytest.mark.parametrize(
    "name, item_data, fs_data, expected_fetched_chunks",
    [
        (
            "no_changes",
            b"1111",  # One complete chunk, no changes needed
            b"1111",  # Identical content
            0,  # No chunks should be fetched
        ),
        (
            "single_chunk_change",
            b"11112222",  # Two chunks
            b"1111XXXX",  # Second chunk different
            1,  # Only second chunk should be fetched
        ),
        (
            "cross_boundary_change",
            b"11112222",  # Two chunks
            b"111XX22",  # Change crosses chunk boundary
            2,  # Both chunks need update
        ),
        (
            "exact_multiple_chunks",
            b"11112222333",  # Three chunks (last one partial)
            b"1111XXXX333",  # Middle chunk different
            1,  # Only middle chunk fetched
        ),
        (
            "first_chunk_change",
            b"11112222",  # Two chunks
            b"XXXX2222",  # First chunk different
            1,  # Only first chunk should be fetched
        ),
        (
            "all_chunks_different",
            b"11112222",  # Two chunks
            b"XXXXYYYY",  # Both chunks different
            2,  # Both chunks should be fetched
        ),
        (
            "partial_last_chunk",
            b"111122",  # One full chunk + partial
            b"1111XX",  # Partial chunk different
            1,  # Only second chunk should be fetched
        ),
        (
            "fs_file_shorter",
            b"11112222",  # Two chunks in archive
            b"111122",  # Shorter on disk - missing part of second chunk
            1,  # Should fetch second chunk
        ),
        (
            "fs_file_longer",
            b"11112222",  # Two chunks in archive
            b"1111222233",  # Longer on disk
            0,  # Should fetch no chunks since content matches up to archive length
        ),
        (
            "empty_archive_file",
            b"",  # Empty in archive
            b"11112222",  # Content on disk
            0,  # No chunks to compare = no chunks to fetch
        ),
        (
            "empty_fs_file",
            b"11112222",  # Two chunks in archive
            b"",  # Empty on disk
            2,  # Should fetch all chunks since file is empty
        ),
    ],
)
def test_compare_and_extract_chunks(setup_extractor, name, item_data, fs_data, expected_fetched_chunks):
    """Test chunk comparison and extraction"""
    extractor, key, cache, tmpdir, create_mock_chunks, get_fetched_chunks, clear_fetched_chunks = setup_extractor
    clear_fetched_chunks()

    chunk_size = 4
    item, target_path = create_mock_chunks(item_data, chunk_size=chunk_size)

    original_chunk_ids = [chunk.id for chunk in item.chunks]

    with open(target_path, "wb") as f:
        f.write(fs_data)

    st = os.stat(target_path)
    result = extractor.compare_and_extract_chunks(item, target_path, st=st)
    assert result

    fetched_chunks = get_fetched_chunks()
    assert len(fetched_chunks) == expected_fetched_chunks

    # For single chunk changes, verify it's the correct chunk
    if expected_fetched_chunks == 1:
        item_chunks = [item_data[i : i + chunk_size] for i in range(0, len(item_data), chunk_size)]
        fs_chunks = [fs_data[i : i + chunk_size] for i in range(0, len(fs_data), chunk_size)]

        # Find which chunk should have changed by comparing item_data with fs_data
        for i, (item_chunk, fs_chunk) in enumerate(zip(item_chunks, fs_chunks)):
            if item_chunk != fs_chunk:
                assert fetched_chunks[0] == original_chunk_ids[i]
                break

    with open(target_path, "rb") as f:
        assert f.read() == item_data
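As a usage note (assuming the repository's standard pytest setup), the parametrized cases above can be run in isolation with something like `python -m pytest src/borg/testsuite/archive_test.py -k compare_and_extract_chunks`; the `name` string of each case is part of the generated test ID, which makes individual scenarios easy to select.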
