diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index dce4dbe..971897c 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -64,6 +64,7 @@ repos:
   rev: 21.12b0
   hooks:
   - id: black
+    additional_dependencies: ['click<8.1']
 
 - repo: https://github.com/PyCQA/pylint
   rev: v2.12.2
diff --git a/disk_objectstore/cli.py b/disk_objectstore/cli.py
index 86c484f..a539e39 100644
--- a/disk_objectstore/cli.py
+++ b/disk_objectstore/cli.py
@@ -122,3 +122,91 @@ def optimize(
         container.clean_storage(vacuum=vacuum)
     size = sum(f.stat().st_size for f in dostore.path.glob("**/*") if f.is_file())
     click.echo(f"Final container size: {round(size/1000, 2)} Mb")
+
+
+@main.group("archive")
+def archive():
+    """
+    Interface for managing archived packs.
+
+    An archived pack is essentially a ZIP file and will no longer be considered for writing new
+    data. Once a pack is archived, it can be moved to other locations, for example networked storage.
+    """
+
+
+@archive.command("list")
+@pass_dostore
+def archive_list(dosstore: ContainerContext):
+    """List all archives in the container."""
+
+    with dosstore.container as container:
+        location = container.get_archive_locations()
+        click.echo(json.dumps(location, indent=2))
+
+
+@archive.command("update-location")
+@click.argument("pack_id")
+@click.argument("new_location")
+@click.option("--force", help="Skip checks if passed", is_flag=True, default=False)
+@pass_dostore
+def archive_update_location(
+    dosstore: ContainerContext, pack_id: str, new_location: str, force: bool
+):
+    """
+    Update the location of an archived pack file.
+
+    NOTE: a relative path is interpreted as relative to the root folder of the container.
+    """
+
+    with dosstore.container as container:
+        container._update_archive_location(  # pylint: disable=protected-access
+            pack_id, new_location, force
+        )
+        click.echo(f"Updated location of pack {pack_id} to {new_location}")
+
+
+@archive.command("create")
+@click.argument("pack_id")
+@click.option(
+    "--validate/--no-validate",
+    show_default=True,
+    help="Validate the created archive or not.",
+)
+@click.option(
+    "--trim-names/--no-trim-names",
+    show_default=True,
+    help="Trim the filenames in the archive to reduce storage overhead.",
+)
+@pass_dostore
+def archive_create(
+    dosstore: ContainerContext, pack_id: str, validate: bool, trim_names: bool
+):
+    """Turn the pack identified by pack_id into an archived pack."""
+    with dosstore.container as container:
+        archives = container.get_archived_pack_ids(return_str=True)
+        if pack_id in archives:
+            raise click.ClickException(f"Pack {pack_id} is already archived!")
+        container.archive_pack(
+            pack_id, run_read_test=validate, trim_filenames=trim_names
+        )
+        location = container.get_archive_locations()[pack_id]
+        click.echo(f"Successfully archived pack {pack_id} at {location}")
+
+
+@archive.command("extract")
+@click.argument("archive_path")
+@click.argument("destination", type=click.Path(exists=False))
+@pass_dostore
+def archive_extract(
+    dosstore: ContainerContext,
+    archive_path: str,
+    destination: str,
+):
+    """
+    Extract an existing archive and rename the files in the destination folder
+    in the same way as used for the loose objects, so that they can be imported into another container.
+ """ + + with dosstore.container as container: + container.lossen_archive(archive_path, destination) + click.echo(f"Objects from {archive_path} have been extracted to {destination}") diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 63f9b8f..e8dfe8d 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -1,15 +1,17 @@ """ The main implementation of the ``Container`` class of the object store. """ -# pylint: disable=too-many-lines import io import json import os import shutil import uuid import warnings +import zipfile from collections import defaultdict, namedtuple from contextlib import contextmanager + +# pylint: disable=too-many-lines from enum import Enum from pathlib import Path from typing import ( @@ -37,7 +39,7 @@ from sqlalchemy.sql import func from sqlalchemy.sql.expression import delete, select, text, update -from .database import Obj, get_session +from .database import Obj, Pack, PackState, get_session from .exceptions import InconsistentContent, NotExistent, NotInitialised from .utils import ( CallbackStreamWrapper, @@ -57,6 +59,7 @@ get_stream_decompresser, is_known_hash, merge_sorted, + minimum_length_without_duplication, nullcontext, rename_callback, safe_flush_to_disk, @@ -249,12 +252,29 @@ def _get_pack_path_from_pack_id( assert self._is_valid_pack_id( pack_id, allow_repack_pack=allow_repack_pack ), f"Invalid pack ID {pack_id}" + + # Are we trying to read from an archived pack? + archived = self.get_archived_pack_ids(return_str=True) + if pack_id in archived: + return self._get_archive_path_from_pack_id(pack_id) + return os.path.join(self._get_pack_folder(), pack_id) def _get_pack_index_path(self) -> str: """Return the path to the SQLite file containing the index of packed objects.""" return os.path.join(self._folder, f"packs{self._PACK_INDEX_SUFFIX}") + def get_archived_pack_ids(self, return_str=False) -> Union[List[int], List[str]]: + """Return ids of the archived packs""" + session = self._get_cached_session() + archived = session.execute( + select(Pack.pack_id).filter_by(state=PackState.ARCHIVED.value) + ).all() + # Convert the string representation back to int + if return_str is False: + return [int(entry[0]) for entry in archived] + return [entry[0] for entry in archived] + def _get_pack_id_to_write_to(self) -> int: """Return the pack ID to write the next object. @@ -267,14 +287,20 @@ def _get_pack_id_to_write_to(self) -> int: """ # Default to zero if not set (e.g. 
if it's None) pack_id = self._current_pack_id or 0 + + # Find all archived pack_ids + # We do not expect this to change over the concurrent operation, so more efficient to do it in one go + archived_int_ids = self.get_archived_pack_ids() while True: pack_path = self._get_pack_path_from_pack_id(pack_id) - if not os.path.exists(pack_path): - # Use this ID - the pack file does not exist yet - break - if os.path.getsize(pack_path) < self.pack_size_target: - # Use this ID - the pack file is not "full" yet - break + # Check if we are trying to accessing an archive pack + if pack_id not in archived_int_ids: + if not os.path.exists(pack_path): + # Use this ID - the pack file does not exist yet + break + if os.path.getsize(pack_path) < self.pack_size_target: + # Use this ID - the pack file is not "full" yet + break # Try the next pack pack_id += 1 @@ -391,6 +417,7 @@ def init_container( for folder in [ self._get_pack_folder(), + self._get_archive_folder(), self._get_loose_folder(), self._get_duplicates_folder(), self._get_sandbox_folder(), @@ -597,6 +624,8 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too obj_reader: StreamReadBytesType + archived_pack_int_ids = self.get_archived_pack_ids() + if len(hashkeys_set) <= self._MAX_CHUNK_ITERATE_LENGTH: # Operate in chunks, due to the SQLite limits # (see comment above the definition of self._IN_SQL_MAX_LENGTH) @@ -660,6 +689,8 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too ) if metadata.compressed: obj_reader = self._get_stream_decompresser()(obj_reader) + if pack_int_id in archived_pack_int_ids: + obj_reader.set_zip_mode() yield metadata.hashkey, obj_reader, meta else: yield metadata.hashkey, meta @@ -799,7 +830,12 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too length=metadata.length, ) if metadata.compressed: + # We reading from archived pack we need to set it to ZIP mode. + # This is because the stream in the ZIP file does not contain zlib + # header/trailer (WBITS=-15) obj_reader = self._get_stream_decompresser()(obj_reader) + if pack_int_id in archived_pack_int_ids: + obj_reader.set_zip_mode() yield metadata.hashkey, obj_reader, meta else: yield metadata.hashkey, meta @@ -1147,6 +1183,24 @@ def lock_pack( """ assert self._is_valid_pack_id(pack_id, allow_repack_pack=allow_repack_pack) + # Check that this is not an archived pack + session = self._get_cached_session() + this_pack = ( + session.execute(select(Pack).filter(Pack.pack_id == pack_id)) + .scalars() + .first() + ) + if this_pack is not None and this_pack.state == PackState.ARCHIVED.value: + raise ValueError( + f"Pack {pack_id} is archived, so it cannot be locked for writing!" + ) + + # This is a new pack, so we update the Pack table in the SQLite database + if this_pack is None: + this_pack = Pack(state=PackState.ACTIVE.value, pack_id=pack_id) + session.add(this_pack) + session.commit() + # Open file in exclusive mode lock_file = os.path.join(self._get_pack_folder(), f"{pack_id}.lock") pack_file = self._get_pack_path_from_pack_id( @@ -1185,7 +1239,7 @@ def _list_loose(self) -> Iterator[str]: continue yield first_level - def _list_packs(self) -> Iterator[str]: + def _list_packs(self, include_archived=True) -> Iterator[str]: """Iterate over packs. .. note:: this returns a generator of the pack IDs. 
@@ -1198,6 +1252,10 @@ def _list_packs(self) -> Iterator[str]: if self._is_valid_pack_id(fname): yield fname + # Include also archived packs + if include_archived: + yield from self.get_archived_pack_ids(return_str=True) + def list_all_objects(self) -> Iterator[str]: """Iterate of all object hashkeys. @@ -2310,6 +2368,8 @@ def callback(self, action, value): session = self._get_cached_session() + is_archived = pack_id in self.get_archived_pack_ids() + if callback: # If we have a callback, compute the total count of objects in this pack total = session.scalar( @@ -2339,7 +2399,10 @@ def callback(self, action, value): fhandle=pack_handle, offset=offset, length=length ) if compressed: + obj_reader = self._get_stream_decompresser()(obj_reader) + if is_archived: + obj_reader.set_zip_mode() computed_hash, computed_size = compute_hash_and_size( obj_reader, self.hash_type @@ -2515,7 +2578,7 @@ def repack(self, compress_mode: CompressMode = CompressMode.KEEP) -> None: self.repack_pack(pack_id, compress_mode=compress_mode) self._vacuum() - def repack_pack( + def repack_pack( # pylint: disable=too-many-branches, too-many-statements self, pack_id: str, compress_mode: CompressMode = CompressMode.KEEP ) -> None: """Perform a repack of a given pack object. @@ -2523,18 +2586,22 @@ def repack_pack( This is a maintenance operation. :param compress_mode: must be a valid CompressMode enum type. - Currently, the only implemented mode is KEEP, meaning that it - preserves the same compression (this means that repacking is *much* faster - as it can simply transfer the bytes without decompressing everything first, - and recompressing it back again). """ - if compress_mode != CompressMode.KEEP: - raise NotImplementedError("Only keep method currently implemented") + # if compress_mode != CompressMode.KEEP: + # raise NotImplementedError("Only keep method currently implemented") assert ( pack_id != self._REPACK_PACK_ID ), f"The specified pack_id '{pack_id}' is invalid, it is the one used for repacking" + # We cannot repack a packed file since the DEFLATE stream in the ZIP is not the same + # as that written by zlib.compress, due to the requirement of WBITS=-15 for those in the + # ZIP file, while those written in unpacked file has WBITS=15 (with zlib header/trailers) + # This limitation can be lifted if other CompressMode is supported. + assert ( + pack_id not in self.get_archived_pack_ids() + ), f"The specified pack_id '{pack_id}' is archived, but repacking archived packs is not currently implemented." + # Check that it does not exist assert not os.path.exists( self._get_pack_path_from_pack_id( @@ -2578,26 +2645,53 @@ def repack_pack( # Since I am assuming above that the method is `KEEP`, I will just transfer # the bytes. Otherwise I have to properly take into account compression in the # source and in the destination. + read_handle: Union[PackedObjectReader, ZlibStreamDecompresser] read_handle = PackedObjectReader(read_pack, offset, length) obj_dict = {} obj_dict["id"] = rowid obj_dict["hashkey"] = hashkey obj_dict["pack_id"] = self._REPACK_PACK_ID - obj_dict["compressed"] = compressed obj_dict["size"] = size obj_dict["offset"] = write_pack_handle.tell() + if compress_mode == CompressMode.KEEP: + obj_dict["compressed"] = compressed + elif compress_mode == CompressMode.YES: + obj_dict["compressed"] = True + elif compress_mode == CompressMode.NO: + obj_dict["compressed"] = False # Transfer data in chunks. # No need to rehash - it's the same container so the same hash. 
-                    # Not checking the compression on source or destination - we are assuming
-                    # for now that the mode is KEEP.
+                    # Compression cases (original state vs. requested compress_mode):
+                    #   original    mode    action
+                    #   -           KEEP    copy in chunks
+                    #   yes         KEEP    copy in chunks
+                    #   no          KEEP    copy in chunks
+                    #   yes         NO      copy in chunks, decompressing the stream
+                    #   no          NO      copy in chunks
+                    #   yes         YES     copy in chunks
+                    #   no          YES     copy in chunks, compressing while writing
+                    if compress_mode == CompressMode.YES and not compressed:
+                        compressobj = self._get_compressobj_instance()
+                    else:
+                        compressobj = None
+
+                    # Read compressed, but write uncompressed
+                    if compressed is True and compress_mode == CompressMode.NO:
+                        read_handle = self._get_stream_decompresser()(read_handle)
+
                     while True:
                         chunk = read_handle.read(self._CHUNKSIZE)
                         if chunk == b"":
                             # Returns an empty bytes object on EOF.
                             break
-                        write_pack_handle.write(chunk)
+                        if compressobj is not None:
+                            write_pack_handle.write(compressobj.compress(chunk))
+                        else:
+                            write_pack_handle.write(chunk)
+                    if compressobj is not None:
+                        write_pack_handle.write(compressobj.flush())
                     obj_dict["length"] = write_pack_handle.tell() - obj_dict["offset"]
                     # Appending for later bulk commit
@@ -2621,7 +2715,7 @@ def repack_pack(
         obj_dicts = []
 
         # Now we can safely delete the old object. I just check that there is no object still
-        # refencing the old pack, to be sure.
+        # referencing the old pack, to be sure.
         one_object_in_pack = session.execute(
             select(Obj.id).where(Obj.pack_id == pack_id).limit(1)
         ).all()
@@ -2662,3 +2756,291 @@ def repack_pack(
 
         # We are now done. The temporary pack is gone, and the old `pack_id`
         # has now been replaced with an udpated, repacked pack.
+
+    def archive_pack(
+        self, pack_id, run_read_test=True, trim_filenames=True
+    ):  # pylint: disable=too-many-locals
+        """
+        Archive the pack - transfer the pack contents into a ZIP archive.
+
+        This is expected to be a slow operation.
+        The destination may not be on the same file system, e.g. the archive folder may be on a
+        different (networked) file system.
+
+        :param pack_id: ID of the pack to be archived
+        :param run_read_test: Test reading the archive file before committing changes.
+        :param trim_filenames: Trim the filenames in the ZIP archive created so that they only contain the first
+            few characters of the hash.
+
+        """
+
+        pack_id = str(pack_id)
+        session = self._get_cached_session()
+        one_object_in_pack = session.execute(
+            select(Obj.id).where(Obj.pack_id == pack_id).limit(1)
+        ).all()
+        # Nothing to archive
+        if not one_object_in_pack:
+            # No objects. Clean up the pack file, if it exists.
+            if os.path.exists(self._get_pack_path_from_pack_id(pack_id)):
+                os.remove(self._get_pack_path_from_pack_id(pack_id))
+            return
+        # Gather all information of the objects to be stored
+        stmt = (
+            select(
+                Obj.id, Obj.hashkey, Obj.size, Obj.offset, Obj.length, Obj.compressed
+            )
+            .where(Obj.pack_id == pack_id)
+            .order_by(Obj.offset)
+        )
+        obj_dicts = []
+        for rowid, hashkey, size, offset, length, compressed in session.execute(stmt):
+            obj_dict = {}
+            obj_dict["id"] = rowid
+            obj_dict["hashkey"] = hashkey
+            obj_dict["size"] = size
+            obj_dict["compressed"] = compressed
+            obj_dict["length"] = length
+            obj_dict["offset"] = offset
+
+            # Process the file name of the zipped files
+            if self.loose_prefix_len:
+                obj_dict["filename"] = os.path.join(
+                    hashkey[: self.loose_prefix_len],
+                    hashkey[self.loose_prefix_len :],
+                )
+            else:
+                obj_dict["filename"] = hashkey
+            obj_dicts.append(obj_dict)
+        # We now have the list of objects to be stored in the ZIP archive
+
+        # Check if we can trim the filenames to save some overhead
+        # NOTE: if the names are trimmed, the archive files should not be unzipped into the same
+        # folder
+        if trim_filenames is True:
+            lname = minimum_length_without_duplication(
+                [x["filename"] for x in obj_dicts]
+            )
+            for obj_dict in obj_dicts:
+                obj_dict["filename"] = obj_dict["filename"][:lname]
+
+        assert self.compression_algorithm.startswith(
+            "zlib"
+        ), "Cannot make archive with compression algorithm other than zlib"
+        compress_level = int(self.compression_algorithm.split("+")[1])
+
+        # Proceed with writing out the archive file
+        with open(
+            self._get_archive_path_from_pack_id(pack_id), "wb"
+        ) as write_pack_handle:
+            with zipfile.ZipFile(
+                write_pack_handle,
+                "w",
+                compression=zipfile.ZIP_DEFLATED,
+                compresslevel=compress_level,
+            ) as archive:
+                # Iterate all objects in the pack to be archived
+                for obj_dict in obj_dicts:
+                    zinfo = zipfile.ZipInfo(filename=obj_dict["filename"])
+                    zinfo.file_size = obj_dict["size"]
+                    # Set the correct compresslevel
+                    self._set_compresslevel(zinfo)
+                    with archive.open(zinfo, "w") as zip_write_handle:
+                        with self.get_object_stream(
+                            obj_dict["hashkey"]
+                        ) as source_handle:
+                            shutil.copyfileobj(source_handle, zip_write_handle)
+                    # Update the obj_dict according to the zinfo fields that have been set
+                    obj_dict["length"] = zinfo.compress_size
+                    # Offset is the current location minus the size of the compressed stream written
+                    obj_dict["offset"] = write_pack_handle.tell() - zinfo.compress_size
+                    obj_dict["compressed"] = True
+
+        # The writing of the ZIP file is now complete, and we can update the database.
+        # Before doing so, we validate the archive file just created,
+        # just to be sure that it can be read back correctly.
+        if run_read_test is True:
+            test_result = self._validate_archive_file(
+                self._get_archive_path_from_pack_id(pack_id), obj_dicts
+            )
+            if test_result is False:
+                raise ValueError("Created archive file does not pass read tests")
+
+        # Reading from the archive file works, now update the database.
+        # I record the original "pack" file's path, since it will change once
+        # the pack is marked as archived.
+        original_pack_path = self._get_pack_path_from_pack_id(pack_id)
+
+        # Update the Pack table such that the pack is "archived"
+
+        session.execute(
+            update(Pack)
+            .where(Pack.pack_id == pack_id)
+            .values(
+                state=PackState.ARCHIVED.value,
+            )
+        )
+
+        # Update the Obj table with the new length and compression
+        for obj_dict in obj_dicts:
+            obj_dict.pop("filename")
+        session.bulk_update_mappings(Obj, obj_dicts)
+
+        # Commit the changes in compressed, offset and length for each object in the Obj table
+        session.commit()
+
+        # We can now delete the original pack file
+        os.unlink(original_pack_path)
+
+    def _get_archive_path_from_pack_id(self, pack_id: Union[str, int]) -> str:
+        """
+        Return the path to the archived pack.
+
+        The name includes the container_id, as the archive folder can be mounted from other
+        file systems.
+
+        There are three possible cases:
+
+        1. The pack is not archived: return the default path
+        2. The pack is archived but has no explicit path set: return the default path
+        3. The pack is archived and has an explicit path set: return the stored path
+
+        """
+        pack_id = str(pack_id)
+        session = self._get_cached_session()
+        pack = (
+            session.execute(select(Pack).where(Pack.pack_id == pack_id))
+            .scalars()
+            .first()
+        )
+        # No location set - use the default one
+        if not pack or not pack.location:
+            return os.path.join(
+                self._get_archive_folder(), self.container_id + "-" + pack_id + ".zip"
+            )
+        # We have a location set; if it is a relative path, it is interpreted relative to the container folder
+        if not Path(pack.location).is_absolute():
+            return os.path.join(self._folder, pack.location)
+
+        return pack.location
+
+    def _get_archive_folder(self) -> str:
+        """Return the path of the default `archives` folder"""
+        return os.path.join(self._folder, "archives")
+
+    def _validate_archive_file(
+        self, fpath: Union[Path, str], obj_dicts: List[Dict[str, Any]]
+    ) -> bool:
+        """
+        Test reading from an archive file using the interface in this package.
+
+        :param fpath: Path to the archive file
+        :param obj_dicts: a list of dictionaries containing the information of each object to be read
+
+        :returns: True if everything is OK, False if a mismatch in hash or size is found.
+        """
+        if len(obj_dicts) < 1:
+            return False
+        with open(fpath, "rb") as fhandle:
+            for obj_dict in obj_dicts:
+                reader = get_stream_decompresser(self.compression_algorithm)(
+                    PackedObjectReader(fhandle, obj_dict["offset"], obj_dict["length"])
+                )
+                reader.set_zip_mode()
+                hashkey, size = compute_hash_and_size(reader, self.hash_type)
+                if hashkey != obj_dict["hashkey"] or size != obj_dict["size"]:
+                    return False
+        return True
+
+    def _set_compresslevel(self, zinfo):
+        # The compression used in the archive must be the same as that defined by the container
+        if self.compression_algorithm.startswith("zlib"):
+            zinfo.compress_type = zipfile.ZIP_DEFLATED
+            zinfo._compresslevel = int(  # pylint: disable=protected-access
+                self.compression_algorithm.split("+")[1]
+            )
+        else:
+            raise ValueError(
+                "Compression algorithm is incompatible with pack archiving."
+            )
+
+    def lossen_archive(self, archive_file: Union[str, Path], dst: Union[str, Path]):
+        """
+        Extract an archived pack into a destination folder as loose objects.
+
+        The destination folder must not exist to begin with.
+        This method renames the file names (which may be truncated) to their full hash values.
+        The destination folder can be copied into the "loose" folder of a container, which effectively
+        imports the data from the archive into that container.
+        """
+        assert not Path(dst).exists(), "Destination folder already exists!"
+        with zipfile.ZipFile(archive_file, "r") as zfile:
+            zfile.extractall(path=dst)
+
+        # Walk the directory and rename files according to the configuration of this container
+        for dirname, _, files in list(os.walk(dst)):
+            for file in files:
+                fullname = os.path.join(dirname, file)
+                with open(fullname, "rb") as fhandle:
+                    hash_value, _ = compute_hash_and_size(
+                        fhandle, hash_type=self.hash_type
+                    )
+                # Compute the new file name (the full hash without the prefix)
+                new_filename = hash_value[self.loose_prefix_len :]
+                # Make the parent directories if they do not exist already
+                Path(new_filename).parent.mkdir(exist_ok=True)
+                # Rename the file to its full (untruncated) name
+                os.rename(fullname, os.path.join(dirname, new_filename))
+
+    def get_archive_locations(self) -> Dict[str, str]:
+        """
+        Return the location of each archived pack
+        """
+        paths = {}
+        for pack_id in self.get_archived_pack_ids(return_str=True):
+            paths[str(pack_id)] = self._get_archive_path_from_pack_id(pack_id)
+        return paths
+
+    def _update_archive_location(self, pack_id, location, force=False):
+        """
+        Set the location of an archived pack
+        """
+        location = Path(location)
+
+        if str(pack_id) not in self.get_archived_pack_ids(return_str=True):
+            raise ValueError(f"Pack {pack_id} is not an archived pack.")
+
+        if not force and not Path(location).exists():
+            raise ValueError(f"Supplied pack location {location} does not exist!")
+
+        # Update the location and validate the new archive
+
+        session = self._get_cached_session()
+        result = session.execute(
+            update(Pack)
+            .where(Pack.pack_id == str(pack_id))
+            .values(location=str(location))
+        )
+        # Check that we have actually updated the content
+        assert result.rowcount == 1  # type: ignore
+
+        if not force:
+            # See if we can read the first few objects from the new archive
+            objs = (
+                session.execute(select(Obj).filter_by(pack_id=int(pack_id)).limit(10))
+                .scalars()
+                .all()
+            )
+            try:
+                for obj in objs:
+                    with self.get_object_stream(obj.hashkey) as stream:
+                        hashkey, _ = compute_hash_and_size(stream, self.hash_type)
+                        assert (
+                            hashkey == obj.hashkey
+                        ), "Error reading object from the new archive location!"
+            except Exception as error:
+                session.rollback()
+                raise error
+
+        # All good - commit the changes
+        session.commit()
diff --git a/disk_objectstore/database.py b/disk_objectstore/database.py
index 73acc06..59fa9ee 100644
--- a/disk_objectstore/database.py
+++ b/disk_objectstore/database.py
@@ -1,4 +1,5 @@
 """Models for the container index file (SQLite DB)."""
+import enum
 import os
 from typing import Optional
 
@@ -31,6 +32,24 @@ class Obj(Base):  # pylint: disable=too-few-public-methods
     )  # integer ID of the pack in which this entry is stored
 
 
+class PackState(enum.Enum):
+    """Enum for the valid states of packs"""
+
+    ARCHIVED = "Archived"
+    ACTIVE = "Active"
+
+
+class Pack(Base):  # pylint: disable=too-few-public-methods
+    """The table for storing the state of pack files.
+    If missing, it means that the pack is currently active"""
+
+    __tablename__ = "db_pack"
+
+    id = Column(Integer, primary_key=True)
+    pack_id = Column(String, primary_key=False)
+    state = Column(String, nullable=False, unique=False)
+    location = Column(String, nullable=True)
+
 def get_session(
     path: str, create: bool = False, raise_if_missing: bool = False
 ) -> Optional[Session]:
diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py
index 5622aa0..d4859ff 100644
--- a/disk_objectstore/utils.py
+++ b/disk_objectstore/utils.py
@@ -653,6 +653,14 @@ def __init__(self, compressed_stream: StreamSeekBytesType) -> None:
         self._internal_buffer = b""
         self._pos = 0
 
+    def set_zip_mode(self):
+        """Switch to ZIP mode - decompress with WBITS=-15"""
+        self._decompressor = self.decompressobj_class(-15)
+
+    def set_zlib_mode(self):
+        """Switch back to the normal zlib operation mode"""
+        self._decompressor = self.decompressobj_class()
+
     @property
     def mode(self) -> str:
         return getattr(self._compressed_stream, "mode", "rb")
@@ -1271,3 +1279,17 @@ def merge_sorted(iterator1: Iterable[Any], iterator2: Iterable[Any]) -> Iterator
     for item, _ in detect_where_sorted(iterator1, iterator2):
         # Whereever it is (only left, only right, on both) I return the object.
         yield item
+
+
+def minimum_length_without_duplication(names, min_length=8):
+    """
+    Find how many characters are needed to ensure there is no conflict among a set of filenames
+    """
+    length = min_length - 1
+    length_ok = False
+    while not length_ok:
+        length += 1
+        trimmed = {name[:length] for name in names}
+        length_ok = len(trimmed) == len(names)
+
+    return length
diff --git a/tests/test_container.py b/tests/test_container.py
index da1b1df..587e17e 100644
--- a/tests/test_container.py
+++ b/tests/test_container.py
@@ -1,5 +1,5 @@
 """Test of the object-store container module."""
-# pylint: disable=too-many-lines,protected-access
+# pylint: disable=too-many-lines,protected-access,redefined-outer-name
 import functools
 import hashlib
 import io
@@ -9,12 +9,16 @@
 import shutil
 import stat
 import tempfile
+import zipfile
+from pathlib import Path
 
 import psutil
 import pytest
 
 import disk_objectstore.exceptions as exc
 from disk_objectstore import CompressMode, Container, ObjectType, database, utils
+from disk_objectstore.container import select, update
+from disk_objectstore.database import Pack, PackState
 
 COMPRESSION_ALGORITHMS_TO_TEST = ["zlib+1", "zlib+9"]
 
@@ -1070,7 +1074,7 @@ def test_sizes(
 def test_get_objects_stream_closes(temp_container, generate_random_data):
     """Test that get_objects_stream_and_meta closes intermediate streams.
-    I also check that at most one additional file is open at any given time.
+    I also check that at most two additional files are open at any given time.
 
     .. note: apparently, at least on my Mac, even if I forget to close a file,
       this is automatically closed when it goes out of scope - so I add also
       the test that, inside the loop, at most one more file is open.
@@ -1093,7 +1097,7 @@ def test_get_objects_stream_closes(temp_container, generate_random_data): obj_md5s.keys(), skip_if_missing=True ): # I don't use the triplets - assert len(current_process.open_files()) <= start_open_files + 1 + assert len(current_process.open_files()) <= start_open_files + 2 # Check that at the end nothing is left open assert len(current_process.open_files()) == start_open_files @@ -1103,7 +1107,7 @@ def test_get_objects_stream_closes(temp_container, generate_random_data): ) as triplets: # I loop over the triplets, but I don't do anything for _ in triplets: - assert len(current_process.open_files()) <= start_open_files + 1 + assert len(current_process.open_files()) <= start_open_files + 2 # Check that at the end nothing is left open assert len(current_process.open_files()) == start_open_files @@ -1114,7 +1118,7 @@ def test_get_objects_stream_closes(temp_container, generate_random_data): ) as triplets: # I loop over the triplets, but I don't do anything for _, stream, _ in triplets: - assert len(current_process.open_files()) <= start_open_files + 1 + assert len(current_process.open_files()) <= start_open_files + 2 stream.read() # Check that at the end nothing is left open @@ -1131,7 +1135,7 @@ def test_get_objects_stream_closes(temp_container, generate_random_data): with temp_container.get_objects_stream_and_meta(obj_md5s.keys()): # I don't use the triplets - assert len(current_process.open_files()) <= start_open_files + 1 + assert len(current_process.open_files()) <= start_open_files + 2 # Check that at the end nothing is left open assert len(current_process.open_files()) == start_open_files @@ -1139,7 +1143,7 @@ def test_get_objects_stream_closes(temp_container, generate_random_data): with temp_container.get_objects_stream_and_meta(obj_md5s.keys()) as triplets: # I loop over the triplets, but I don't do anything for _ in triplets: - assert len(current_process.open_files()) <= start_open_files + 1 + assert len(current_process.open_files()) <= start_open_files + 2 # Check that at the end nothing is left open assert len(current_process.open_files()) == start_open_files @@ -1150,7 +1154,7 @@ def test_get_objects_stream_closes(temp_container, generate_random_data): ) as triplets: # I loop over the triplets, but I don't do anything for _, stream, _ in triplets: - assert len(current_process.open_files()) <= start_open_files + 1 + assert len(current_process.open_files()) <= start_open_files + 2 stream.read() # Check that at the end nothing is left open assert len(current_process.open_files()) == start_open_files @@ -3266,6 +3270,85 @@ def test_repack(temp_dir): temp_container.close() +def test_repack_compress_modes(temp_dir): + """ + Test the repacking functionality and handling of CompressMode. + """ + temp_container = Container(temp_dir) + temp_container.init_container(clear=True, pack_size_target=39) + + # data of 10 bytes each. Will fill two packs. 
+ data = [ + b"-123456789", + b"a123456789", + b"b123456789", + b"c123456789", + b"d123456789", + b"e123456789", + b"f123456789", + b"g123456789", + b"h123456789", + ] + compress_flags = [False, True, True, False, False, False, True, True, False] + + hashkeys = [] + # Add them one by one, so I am sure in wich pack they go + for datum, compress in zip(data, compress_flags): + hashkeys.append( + temp_container.add_objects_to_pack([datum], compress=compress)[0] + ) + + assert temp_container.get_object_meta(hashkeys[0])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[1])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[2])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[3])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[4])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[5])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[6])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[7])["pack_id"] == 2 + assert temp_container.get_object_meta(hashkeys[8])["pack_id"] == 2 + + # I check which packs exist + assert sorted(temp_container._list_packs()) == [ + "0", + "1", + "2", + ] + + # I now repack + temp_container.repack_pack(0, compress_mode=CompressMode.NO) + assert temp_container.get_object_meta(hashkeys[0])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[0])["pack_compressed"] is False + assert temp_container.get_object_meta(hashkeys[1])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[1])["pack_compressed"] is False + assert temp_container.get_object_meta(hashkeys[2])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[2])["pack_compressed"] is False + + temp_container.repack_pack(1, compress_mode=CompressMode.YES) + assert temp_container.get_object_meta(hashkeys[3])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[3])["pack_compressed"] is True + assert temp_container.get_object_meta(hashkeys[4])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[4])["pack_compressed"] is True + assert temp_container.get_object_meta(hashkeys[5])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[5])["pack_compressed"] is True + assert temp_container.get_object_meta(hashkeys[6])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[6])["pack_compressed"] is True + + temp_container.repack_pack(1, compress_mode=CompressMode.KEEP) + assert temp_container.get_object_meta(hashkeys[7])["pack_id"] == 2 + assert temp_container.get_object_meta(hashkeys[7])["pack_compressed"] is True + assert temp_container.get_object_meta(hashkeys[8])["pack_id"] == 2 + assert temp_container.get_object_meta(hashkeys[8])["pack_compressed"] is False + + # Check that the content is still correct + # Should not raise + errors = temp_container.validate() + assert not any(errors.values()) + + # Important before exiting from the tests + temp_container.close() + + def test_not_implemented_repacks(temp_container): """Check the error for not implemented repack methods.""" # We need to have at least one pack @@ -3325,3 +3408,262 @@ def test_unknown_compressers(temp_container, compression_algorithm): temp_container.init_container( clear=True, compression_algorithm=compression_algorithm ) + + +@pytest.fixture +def container_with_archive(temp_dir): + """Return an contain with pack 1 archived""" + temp_container = Container(temp_dir) + temp_container.init_container(clear=True, pack_size_target=39) + + # data of 10 bytes each. 
Will fill two packs. + data = [ + b"-123456789", + b"a123456789", + b"b123456789", + b"c123456789", + b"d123456789", + b"e123456789", + b"f123456789", + b"g123456789", + b"h123456789", + ] + + hashkeys = [] + # Add them one by one, so I am sure in wich pack they go + for datum in data: + hashkeys.append(temp_container.add_objects_to_pack([datum])[0]) + + assert temp_container.get_object_meta(hashkeys[0])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[1])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[2])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[3])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[4])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[5])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[6])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[7])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[8])["pack_id"] == 2 + + # I check which packs exist + assert sorted(temp_container._list_packs()) == [ + "0", + "1", + "2", + ] + + counts = temp_container.count_objects() + assert counts["packed"] == len(data) + size = temp_container.get_total_size() + assert size["total_size_packed"] == 10 * len(data) + assert size["total_size_packfiles_on_disk"] == 10 * len(data) + temp_container.archive_pack(1) + + yield temp_dir, temp_container, data, hashkeys + temp_container.close() + + +def test_archive(container_with_archive): + """Test the repacking functionality.""" + temp_dir, temp_container, data, hashkeys = container_with_archive + + assert temp_container._get_pack_path_from_pack_id(1).endswith("zip") + + # Can still read everything + for idx, value in enumerate(data): + assert temp_container.get_object_content(hashkeys[idx]) == value + + # Validate the hashes + temp_container._validate_hashkeys_pack(1) + + size = temp_container.get_total_size() + # Due to the overhead, packs on the disk now takes more space.... 
+ assert size["total_size_packfiles_on_disk"] > 10 * len(data) + + # Check that the Zipfile generated is valid + with zipfile.ZipFile(temp_container._get_pack_path_from_pack_id(1)) as zfile: + assert len(zfile.infolist()) == 4 + zfile.testzip() + + # Test loosen the objects and then import them into a new container + temp_container.lossen_archive( + temp_container._get_pack_path_from_pack_id(1), + os.path.join(temp_dir, "temp_loose"), + ) + temp_container2 = Container(os.path.join(temp_dir, "sub")) + temp_container2.init_container() + + shutil.rmtree(os.path.join(temp_dir, "sub/loose")) + shutil.copytree( + os.path.join(temp_dir, "temp_loose"), + os.path.join(temp_dir, "sub/loose"), + ) + assert temp_container2.count_objects()["loose"] == 4 + temp_container2.pack_all_loose() + temp_container2.clean_storage() + assert temp_container2.count_objects()["packed"] == 4 + assert temp_container2.count_objects()["loose"] == 0 + size2 = temp_container2.get_total_size() + size2["total_size_packfiles_on_disk"] = 10 * 4 + # Validate that we are all good + for value in temp_container2.validate().values(): + assert not value + + +def test_archive_path_settings(container_with_archive): + """Setting getting/setting container paths""" + _, temp_container, _, _ = container_with_archive + + assert "1" in temp_container.get_archive_locations() + assert temp_container.get_archive_locations()["1"].endswith( + f"{temp_container.container_id}-1.zip" + ) + + location = Path(temp_container.get_archive_locations()["1"]) + new_location = location.with_name("11.zip") + # Update the location + with pytest.raises(ValueError): + temp_container._update_archive_location(1, new_location) + assert temp_container.get_archive_locations()["1"] == str(location) + + # New location exists, but contains invalid data + Path(new_location).write_text("aa") + with pytest.raises(ValueError): + temp_container._update_archive_location(1, new_location) + assert temp_container.get_archive_locations()["1"] == str(location) + + # Not valid, but we forced it + temp_container._update_archive_location(1, new_location, force=True) + assert temp_container.get_archive_locations()["1"] == str(new_location) + temp_container._update_archive_location(1, location) + + # This should work + os.unlink(new_location) + location.rename(new_location) + temp_container._update_archive_location(1, new_location) + assert temp_container.get_archive_locations()["1"] == str(new_location) + + +def test_get_archive_path(temp_container): + """ + Test getting archvie path + """ + + # New pack + path = temp_container._get_archive_path_from_pack_id(999) + assert ( + path + == os.path.join( + temp_container._get_archive_folder(), temp_container.container_id + ) + + "-999.zip" + ) + + # Existing active pack + session = temp_container._get_cached_session() + pack = Pack(pack_id="998", state=PackState.ACTIVE.value) + session.add(pack) + session.commit() + path = temp_container._get_archive_path_from_pack_id(998) + assert ( + path + == os.path.join( + temp_container._get_archive_folder(), temp_container.container_id + ) + + "-998.zip" + ) + + # Existing archive pack + pack = Pack(pack_id="997", state=PackState.ARCHIVED.value) + session.add(pack) + session.commit() + path = temp_container._get_archive_path_from_pack_id(997) + assert ( + path + == os.path.join( + temp_container._get_archive_folder(), temp_container.container_id + ) + + "-997.zip" + ) + + temp_dir = temp_container._folder + abs_archive = os.path.join(temp_dir, "temp_archive/2.zip") + pack = Pack(pack_id="996", 
state=PackState.ARCHIVED.value, location=abs_archive) + session.add(pack) + session.commit() + path = temp_container._get_archive_path_from_pack_id(996) + assert path == abs_archive + + # Relative path is relative to the container base folder + pack = Pack( + pack_id="995", state=PackState.ARCHIVED.value, location="archive2/archive.zip" + ) + session.add(pack) + session.commit() + path = temp_container._get_archive_path_from_pack_id(995) + assert path == os.path.join(temp_container._folder, "archive2/archive.zip") + + temp_container.close() + + +def test_get_pack_id_with_archive(temp_dir): + """Test get_pack_id_to_write_to with archived packs""" + + temp_container = Container(temp_dir) + temp_container.init_container(clear=True, pack_size_target=39) + + # data of 10 bytes each. Will fill two packs. + data = [ + b"-123456789", + b"a123456789", + b"b123456789", + b"c123456789", + b"d123456789", + b"e123456789", + b"f123456789", + b"g123456789", + b"h123456789", + ] + + hashkeys = [] + # Add them one by one, so I am sure in wich pack they go + for datum in data: + hashkeys.append(temp_container.add_objects_to_pack([datum])[0]) + + session = temp_container._get_cached_session() + packs = session.execute(select(Pack)).scalars().all() + # Check there should be three packs + assert len(packs) == 3 + + # Three packs should be recorded in the Pack table as well + assert ( + len( + session.execute( + select(Pack.pack_id).filter_by(state=PackState.ACTIVE.value) + ).all() + ) + == 3 + ) + + # Now, the next object should writ to pack 2, since it is not ful + assert temp_container._get_pack_id_to_write_to() == 2 + # Mark the third pack as ARCHIVE + to_archive = packs[-1].pack_id + session.execute( + update(Pack) + .where(Pack.pack_id == str(to_archive)) + .values( + state=PackState.ARCHIVED.value, + location=os.path.join(temp_dir, f"{to_archive}.zip"), + ) + ) + session.commit() + + # Writing to pack 2 is not allowed anymore + assert temp_container._get_pack_id_to_write_to() == 3 + + # Getting the "pack_path" for pack 2 should now point to the custom location + assert temp_container._get_pack_path_from_pack_id(to_archive) == os.path.join( + temp_dir, f"{to_archive}.zip" + ) + + temp_container.close() diff --git a/tests/test_utils.py b/tests/test_utils.py index 1c85731..a96a189 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1680,3 +1680,10 @@ def test_unknown_compressers(): utils.get_compressobj_instance(invalid) with pytest.raises(ValueError): utils.get_stream_decompresser(invalid) + + +def test_minimum_length_compute(): + """Test finding minmum string length without duplication""" + assert utils.minimum_length_without_duplication(["a", "b"]) == 8 + assert utils.minimum_length_without_duplication(["aaaaaaaa", "bbbbbbbb"]) == 8 + assert utils.minimum_length_without_duplication(["aaaaaaaac", "aaaaaaaab"]) == 9
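Reviewer note: the following is a minimal, untested usage sketch of the archive workflow introduced in this diff, added only to make the intended flow easier to follow. It uses the API names added above (archive_pack, get_archive_locations, _update_archive_location, lossen_archive); the container path, pack id and target locations are made-up examples and not part of the change itself.

    # Hypothetical end-to-end walk-through of the archive feature (paths are illustrative only).
    from disk_objectstore import Container

    container = Container("/tmp/demo-container")
    container.init_container(clear=True)

    # ... add some objects and pack them, e.g. with container.add_objects_to_pack([b"content"]) ...

    # Turn pack 0 into a ZIP archive; by default it is written to
    # <container>/archives/<container_id>-0.zip and pack 0 becomes read-only.
    container.archive_pack(0, run_read_test=True, trim_filenames=True)
    print(container.get_archive_locations())  # e.g. {"0": ".../archives/<container_id>-0.zip"}

    # After moving the ZIP (e.g. to networked storage), record its new location.
    # A relative location is interpreted relative to the container root folder.
    container._update_archive_location(0, "/mnt/cold-storage/pack-0.zip")  # pylint: disable=protected-access

    # Extract the archive back into a loose-object layout, e.g. to import it into another container.
    container.lossen_archive("/mnt/cold-storage/pack-0.zip", "/tmp/extracted-loose")
    container.close()

The same operations are exposed on the command line through the new `archive` click group added in cli.py (archive list, archive create, archive update-location, archive extract).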