switch to pathlib and improve type checking #65

Merged · 6 commits · Apr 9, 2024

Changes from 1 commit
39 changes: 20 additions & 19 deletions src/backy/backends/chunked/chunk.py
@@ -51,7 +51,7 @@ def __init__(
         self.data = None

     def _read_existing(self) -> None:
-        if self.data is not None:
+        if self.data:
             return
         # Prepare working with the chunk. We keep the data in RAM for
         # easier random access combined with transparent compression.
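
Note on the truthiness change: self.data starts out as None (see the context above) and is later an in-memory buffer, an io.BytesIO judging by the getvalue()/seek() calls in this file. BytesIO defines neither __bool__ nor __len__, so instances are always truthy, even when empty; `if self.data:` and `assert self.data` therefore behave like the `is not None` forms at runtime while still narrowing the Optional for mypy. A quick standalone check:

import io
from typing import Optional

data: Optional[io.BytesIO] = io.BytesIO()  # empty buffer
print(bool(data))  # True: BytesIO has no __bool__/__len__, objects default to truthy
assert data        # passes, and narrows Optional[BytesIO] to BytesIO for mypy
data.seek(0)       # type-checks without a cast after the assert
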
@@ -82,7 +82,7 @@ def read(self, offset: int, size: int = -1) -> Tuple[bytes, int]:
         Return the data and the remaining size that should be read.
         """
         self._read_existing()
-        assert self.data is not None
+        assert self.data

         self.data.seek(offset)
         data = self.data.read(size)
@@ -107,7 +107,7 @@ def write(self, offset: int, data: bytes) -> Tuple[int, bytes]:
             chunk_stats["write_full"] += 1
         else:
             self._read_existing()
-            assert self.data is not None
+            assert self.data
             self.data.seek(offset)
             self.data.write(data)
             chunk_stats["write_partial"] += 1
@@ -121,29 +121,30 @@ def flush(self) -> Optional[Hash]:
         """
         if self.clean:
             return None
-        assert self.data is not None
+        assert self.data
         # I'm not using read() here to a) avoid cache accounting and b)
         # use a faster path to get the data.
         self.hash = hash(self.data.getvalue())
         target = self.store.chunk_path(self.hash)
         needs_forced_write = (
             self.store.force_writes and self.hash not in self.store.seen_forced
         )
-        if self.hash not in self.store.known or needs_forced_write:
-            # Create the tempfile in the right directory to increase locality
-            # of our change - avoid renaming between multiple directories to
-            # reduce traffic on the directory nodes.
-            fd, tmpfile_name = tempfile.mkstemp(dir=target.parent)
-            posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)  # type: ignore
-            with os.fdopen(fd, mode="wb") as f:
-                data = lzo.compress(self.data.getvalue())
-                f.write(data)
-            # Micro-optimization: chmod before rename to help against
-            # metadata flushes and then changing metadata again.
-            os.chmod(tmpfile_name, 0o440)
-            os.rename(tmpfile_name, target)
-            self.store.seen_forced.add(self.hash)
-            self.store.known.add(self.hash)
+        if self.hash not in self.store.seen:
+            if needs_forced_write or not target.exists():
+                # Create the tempfile in the right directory to increase locality
+                # of our change - avoid renaming between multiple directories to
+                # reduce traffic on the directory nodes.
+                fd, tmpfile_name = tempfile.mkstemp(dir=target.parent)
+                posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)  # type: ignore
+                with os.fdopen(fd, mode="wb") as f:
+                    data = lzo.compress(self.data.getvalue())
+                    f.write(data)
+                # Micro-optimization: chmod before rename to help against
+                # metadata flushes and then changing metadata again.
+                os.chmod(tmpfile_name, 0o440)
+                os.rename(tmpfile_name, target)
+                self.store.seen_forced.add(self.hash)
+            self.store.seen.add(self.hash)
         self.clean = True
         return self.hash
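
The block above is a classic atomic-publish pattern: write into a tempfile in the target's own directory, settle metadata once, then rename over the final name. A minimal standalone sketch of the same pattern, with hypothetical names (write_chunk, payload) and the Unix-only fadvise hint:

import os
import tempfile
from pathlib import Path

def write_chunk(target: Path, payload: bytes) -> None:
    # Tempfile next to the target so the rename never crosses
    # directories (cheap and atomic on POSIX filesystems).
    fd, tmp_name = tempfile.mkstemp(dir=target.parent)
    try:
        # Hint that we will not read this back soon (Unix-only call).
        os.posix_fadvise(fd, 0, 0, os.POSIX_FADV_DONTNEED)
        with os.fdopen(fd, mode="wb") as f:
            f.write(payload)
        # chmod before rename: one metadata settle, then atomic publish.
        os.chmod(tmp_name, 0o440)
        os.rename(tmp_name, target)
    except BaseException:
        os.unlink(tmp_name)
        raise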

14 changes: 7 additions & 7 deletions src/backy/backends/chunked/store.py
@@ -23,7 +23,7 @@ class Store(object):

     path: Path
     seen_forced: set[Hash]
-    known: set[Hash]
+    seen: set[Hash]
     log: BoundLogger

     def __init__(self, path: Path, log: BoundLogger):
@@ -36,8 +36,7 @@ def __init__(self, path: Path, log: BoundLogger):
         if not self.path.joinpath("store").exists():
             self.convert_to_v2()

-        self.known = set(self.ls())
-        self.log.debug("init", known_chunks=len(self.known))
+        self.seen = set()

     def convert_to_v2(self) -> None:
         self.log.info("to-v2")
@@ -61,11 +60,12 @@ def ls(self) -> Iterable[Hash]:
     def purge(self, used_chunks: Set[Hash]) -> None:
         # This assumes exclusive lock on the store. This is guaranteed by
         # backy's main locking.
-        to_delete = self.known - used_chunks
-        self.log.info("purge", purging=len(to_delete))
-        for file_hash in sorted(to_delete):
+        self.log.info("purge")
+        for file_hash in self.ls():
+            if file_hash in used_chunks:
+                continue
             self.chunk_path(file_hash).unlink(missing_ok=True)
-        self.known -= to_delete
+            self.seen.discard(file_hash)

     def chunk_path(self, hash: Hash) -> Path:
         dir1 = hash[:2]
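
The purge rewrite trades the precomputed `known` set (built eagerly in __init__ before this commit) for a lazy scan of the store, so startup no longer pays for listing every chunk. A rough standalone sketch of that shape, with hypothetical names and assuming the chunk hash can be derived from the file name:

from pathlib import Path
from typing import Iterable, Set

def purge(chunk_files: Iterable[Path], used_chunks: Set[str]) -> None:
    # chunk_files may be a generator (like Store.ls() above), so the
    # full store listing is never materialized in memory.
    for path in chunk_files:
        if path.name.split(".")[0] in used_chunks:  # assumed name layout
            continue
        path.unlink(missing_ok=True)  # tolerate races with other deleters
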
8 changes: 4 additions & 4 deletions src/backy/daemon.py
@@ -299,13 +299,13 @@ async def purge_old_files(self):
             # thread.
             while True:
                 self.log.info("purge-scanning")
-                for candidate in self.base_dir.iterdir():
-                    if not candidate.is_dir() or candidate.is_symlink():
+                for candidate in os.scandir(self.base_dir):
+                    if not candidate.is_dir(follow_symlinks=False):
                         continue
-                    self.log.debug("purge-candidate", candidate=candidate)
+                    self.log.debug("purge-candidate", candidate=candidate.path)
                     reference_time = time.time() - 3 * 31 * 24 * 60 * 60
                     if not has_recent_changes(candidate, reference_time):
-                        self.log.info("purging", candidate=candidate)
+                        self.log.info("purging", candidate=candidate.path)
                         shutil.rmtree(candidate)
                 self.log.info("purge-finished")
                 await asyncio.sleep(24 * 60 * 60)
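
The switch from Path.iterdir() back to os.scandir() is the performance-minded half of this hunk: scandir yields DirEntry objects whose is_dir(follow_symlinks=False) can usually be answered from the directory read itself, without an extra stat() per entry, and .path supplies the printable full path used in the log calls. A small illustration (list_real_dirs is a hypothetical helper):

import os

def list_real_dirs(base: str) -> list[str]:
    # DirEntry caches file-type info from the directory scan, so this
    # usually avoids one stat() syscall per entry.
    return [
        entry.path
        for entry in os.scandir(base)
        if entry.is_dir(follow_symlinks=False)  # also skips symlinked dirs
    ]
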
22 changes: 9 additions & 13 deletions src/backy/sources/ceph/source.py
@@ -90,14 +90,13 @@ def backup(self, target: BackyBackend) -> None:
                 break
             self.diff(target, parent)

-    def diff(self, target_: BackyBackend, parent: Revision) -> None:
+    def diff(self, target: BackyBackend, parent: Revision) -> None:
         self.log.info("diff")
         snap_from = "backy-" + parent.uuid
         snap_to = "backy-" + self.revision.uuid
         s = self.rbd.export_diff(self._image_name + "@" + snap_to, snap_from)
-        t = target_.open("r+b", parent)
-        with s as source, t as target:
-            bytes = source.integrate(target, snap_from, snap_to)
+        with s as source, target.open("r+b", parent) as target_:
+            bytes = source.integrate(target_, snap_from, snap_to)
         self.log.info("diff-integration-finished")

         self.revision.stats["bytes_written"] = bytes
@@ -107,19 +106,18 @@ def diff(self, target_: BackyBackend, parent: Revision) -> None:

         self.revision.stats["chunk_stats"] = chunk_stats

-    def full(self, target_: BackyBackend) -> None:
+    def full(self, target: BackyBackend) -> None:
         self.log.info("full")
         s = self.rbd.export(
             "{}/{}@backy-{}".format(self.pool, self.image, self.revision.uuid)
         )
-        t = target_.open("r+b")
         copied = 0
-        with s as source, t as target:
+        with s as source, target.open("r+b") as target_:
             while True:
                 buf = source.read(4 * backy.utils.MiB)
                 if not buf:
                     break
-                target.write(buf)
+                target_.write(buf)
                 copied += len(buf)
         self.revision.stats["bytes_written"] = copied
@@ -128,19 +126,17 @@ def full(self, target_: BackyBackend) -> None:

         self.revision.stats["chunk_stats"] = chunk_stats

-    def verify(self, target_: BackyBackend) -> bool:
+    def verify(self, target: BackyBackend) -> bool:
         s = self.rbd.image_reader(
             "{}/{}@backy-{}".format(self.pool, self.image, self.revision.uuid)
         )
-        t = target_.open("rb")

         self.revision.stats["ceph-verification"] = "partial"

-        with s as source, t as target:
+        with s as source, target.open("rb") as target_:
             self.log.info("verify")
             return backy.utils.files_are_roughly_equal(
                 source,
-                target,
+                target_,
                 report=lambda s, t, o: self.revision.backup.quarantine.add_report(
                     QuarantineReport(s, t, o)
                 ),
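
Moving target.open(...) into the `with` header, instead of binding it to t beforehand, leans on multi-item with-statement semantics: if the second context manager fails while entering, the first one is still unwound cleanly. A tiny demonstration with hypothetical managers:

from contextlib import contextmanager

@contextmanager
def noisy(name: str, fail: bool = False):
    print("enter", name)
    if fail:
        raise RuntimeError(name)  # simulate a failing open()
    try:
        yield name
    finally:
        print("exit", name)

# with noisy("a") as a, noisy("b", fail=True) as b: ...
# prints "enter a", "enter b", then "exit a" while RuntimeError("b")
# propagates; "a" is closed even though "b" never fully opened.
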
4 changes: 2 additions & 2 deletions src/backy/sources/ceph/tests/test_ceph_source.py
@@ -112,8 +112,8 @@ def test_choose_full_without_parent(ceph_rbd_imagesource, backup, log):

     revision = Revision.create(backup, set(), log)

-    with source(revision):
-        source.backup(revision.backend)
+    with source(revision) as s:
+        s.backup(revision.backend)

     assert not source.diff.called
     assert source.full.called
16 changes: 7 additions & 9 deletions src/backy/sources/file.py
@@ -41,29 +41,27 @@ def ready(self) -> bool:
             return False
         return True

-    def backup(self, target_: "backy.backends.BackyBackend") -> None:
+    def backup(self, target: "backy.backends.BackyBackend") -> None:
         self.log.debug("backup")
         s = open(self.filename, "rb")
         parent = self.revision.get_parent()
-        t = target_.open("r+b", parent)
-        with s as source, t as target:
+        with s as source, target.open("r+b", parent) as target_:
             if self.cow and parent:
                 self.log.info("backup-sparse")
-                bytes = copy_overwrite(source, target)
+                bytes = copy_overwrite(source, target_)
             else:
                 self.log.info("backup-full")
-                bytes = copy(source, target)
+                bytes = copy(source, target_)

         self.revision.stats["bytes_written"] = bytes

-    def verify(self, target_: "backy.backends.BackyBackend") -> bool:
+    def verify(self, target: "backy.backends.BackyBackend") -> bool:
         self.log.info("verify")
         s = open(self.filename, "rb")
-        t = target_.open("rb")
-        with s as source, t as target:
+        with s as source, target.open("rb") as target_:
             return files_are_equal(
                 source,
-                target,
+                target_,
                 report=lambda s, t, o: self.revision.backup.quarantine.add_report(
                     QuarantineReport(s, t, o)
                 ),
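
copy_overwrite (from backy.utils, not part of this diff) is what makes the "backup-sparse" branch cheap: when the target already starts as a copy-on-write clone of the parent revision, only blocks that actually differ need to be rewritten. A rough sketch of the idea, with an illustrative 4 MiB block size; backy's real implementation may differ in details:

import io
from typing import IO

MiB = 1024 * 1024

def copy_overwrite_sketch(source: IO[bytes], target: IO[bytes]) -> int:
    """Copy source over target, skipping blocks that are already equal."""
    written = 0
    while True:
        chunk = source.read(4 * MiB)
        if not chunk:
            break
        existing = target.read(len(chunk))
        if existing == chunk:
            continue  # unchanged block: leave the CoW data untouched
        target.seek(-len(existing), io.SEEK_CUR)  # rewind to block start
        target.write(chunk)
        written += len(chunk)
    target.truncate()  # drop trailing data if the source shrank
    return written
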
2 changes: 1 addition & 1 deletion src/backy/tests/test_backup.py
@@ -83,7 +83,7 @@ def test_backup_corrupted(simple_file_config):
     backup.backup({"daily"})

     store = backup.history[0].backend.store
-    chunk_path = store.chunk_path(next(iter(store.known)))
+    chunk_path = store.chunk_path(next(iter(store.seen)))
     os.chmod(chunk_path, 0o664)
     with open(chunk_path, "wb") as f:
         f.write(b"invalid")
32 changes: 16 additions & 16 deletions src/backy/utils.py
@@ -10,7 +10,7 @@
 import tempfile
 import time
 import typing
-from pathlib import Path
+from os import DirEntry
 from typing import IO, Callable, Iterable, List, Optional, TypeVar
 from zoneinfo import ZoneInfo

@@ -146,7 +146,7 @@ def open_new(self, mode):
     def open_copy(self, mode):
         """Open an existing file, make a copy first, rename on close."""
         self.open_new("wb")
-        assert self.f is not None
+        assert self.f
         if os.path.exists(self.filename):
             cp_reflink(self.filename, self.f.name)
         self.f.close()

@property
def name(self):
assert self.f is not None
assert self.f
return self.f.name

def read(self, *args, **kw):
assert self.f is not None
assert self.f
data = self.f.read(*args, **kw)
if self.encoding:
data = data.decode(self.encoding)
return data

def write(self, data):
assert self.f is not None
assert self.f
if self.encoding:
data = data.encode(self.encoding)
self.f.write(data)

def seek(self, offset, whence=0):
assert self.f is not None
assert self.f
return self.f.seek(offset, whence)

def tell(self):
assert self.f is not None
assert self.f
return self.f.tell()

def truncate(self, size=None):
assert self.f is not None
assert self.f
return self.f.truncate(size)

def fileno(self):
assert self.f is not None
assert self.f
return self.f.fileno()


@@ -440,25 +440,25 @@ def min_date():
     return datetime.datetime.min.replace(tzinfo=ZoneInfo("UTC"))


-def has_recent_changes(entry: Path, reference_time: float):
+def has_recent_changes(entry: DirEntry, reference_time: float):
     # This is not efficient on a first look as we may stat things twice, but it
     # makes the recursion easier to read and the VFS will be caching this
     # anyway.
     # However, I want to perform a breadth-first analysis as the theory is that
     # higher levels will propagate changed mtimes do to new/deleted files
     # instead of just modified files in our case and looking at stats when
     # traversing a directory level is faster than going depth first.
-    st = entry.stat(follow_symlinks=False)
-    if st.st_mtime >= reference_time:
-        return True
-    if not entry.is_dir() or entry.is_symlink():
+    if not entry.is_dir(follow_symlinks=False):
         return False
+    if entry.stat(follow_symlinks=False).st_mtime >= reference_time:
+        return True
+    candidates = list(os.scandir(entry.path))
     # First pass: stat all direct entries
-    for candidate in entry.iterdir():
+    for candidate in candidates:
         if candidate.stat(follow_symlinks=False).st_mtime >= reference_time:
             return True
     # Second pass: start traversing
-    for candidate in entry.iterdir():
+    for candidate in candidates:
         if has_recent_changes(candidate, reference_time):
             return True
     return False
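
With the DirEntry signature, callers must hand in entries produced by os.scandir() rather than bare paths, which is exactly what the daemon loop above now does. A hedged usage sketch (stale_dirs and the 93-day cutoff are illustrative, mirroring the 3 * 31 days in daemon.py):

import os
import time
from typing import Iterator

def stale_dirs(base: str, max_age_days: int = 93) -> Iterator[str]:
    cutoff = time.time() - max_age_days * 24 * 60 * 60
    for entry in os.scandir(base):
        if not entry.is_dir(follow_symlinks=False):
            continue  # has_recent_changes only recurses into directories
        if not has_recent_changes(entry, cutoff):
            yield entry.path  # candidate for cleanup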