From d52695d99752bc9927a44feac258980be570fe74 Mon Sep 17 00:00:00 2001 From: mxmlnkn Date: Tue, 15 Oct 2024 00:55:27 +0200 Subject: [PATCH] [wip][feature] Support remote and compressed index files - [ ] Finish the code path for remote files - [ ] Add tests - [ ] Check that temporary files are deleted Add tests for this? --- core/ratarmountcore/SQLiteIndex.py | 129 ++++++++++++++++++++++++++-- core/ratarmountcore/compressions.py | 7 +- 2 files changed, 129 insertions(+), 7 deletions(-) diff --git a/core/ratarmountcore/SQLiteIndex.py b/core/ratarmountcore/SQLiteIndex.py index 5960eeab..a5a456da 100644 --- a/core/ratarmountcore/SQLiteIndex.py +++ b/core/ratarmountcore/SQLiteIndex.py @@ -4,10 +4,12 @@ import json import os import re +import shutil import sqlite3 import stat import sys import tarfile +import tempfile import time import traceback import urllib.parse @@ -15,6 +17,11 @@ from typing import Any, AnyStr, Callable, Dict, IO, List, Optional, Tuple, Union from dataclasses import dataclass +try: + import fsspec +except ImportError: + fsspec = None # type: ignore + try: import indexed_gzip except ImportError: @@ -27,9 +34,22 @@ from .version import __version__ from .MountSource import FileInfo, createRootFileInfo -from .compressions import TAR_COMPRESSION_FORMATS +from .compressions import ( + CompressionInfo, + LIBARCHIVE_FILTER_FORMATS, + TAR_COMPRESSION_FORMATS, + detectCompression, + findAvailableOpen, +) from .SQLiteBlobFile import SQLiteBlobsFile, WriteSQLiteBlobs -from .utils import RatarmountError, IndexNotOpenError, InvalidIndexError, findModuleVersion, MismatchingIndexError +from .utils import ( + CompressionError, + IndexNotOpenError, + InvalidIndexError, + RatarmountError, + MismatchingIndexError, + findModuleVersion, +) def getSqliteTables(connection: sqlite3.Connection): @@ -214,6 +234,9 @@ def __init__( self.sqlConnection: Optional[sqlite3.Connection] = None # Will hold the actually opened valid path to an index file self.indexFilePath: Optional[str] = None + # This is true if the index file found was compressed or an URL and had to be downloaded + # and/or extracted into a temporary folder. + self.indexFilePathDeleteOnClose: bool = False self.encoding = encoding self.possibleIndexFilePaths = SQLiteIndex.getPossibleIndexFilePaths( indexFilePath, indexFolders, archiveFilePath, ignoreCurrentFolder @@ -224,6 +247,7 @@ def __init__( self.indexMinimumFileCount = indexMinimumFileCount self.backendName = backendName self._insertedRowCount = 0 + self._temporaryIndexFile = None assert self.backendName @@ -279,7 +303,6 @@ def openExisting(self, checkMetadata: Optional[Callable[[Dict[str, Any]], None]] """Tries to find an already existing index.""" for indexPath in self.possibleIndexFilePaths: if self._tryLoadIndex(indexPath, checkMetadata=checkMetadata): - self.indexFilePath = indexPath break def openInMemory(self): @@ -327,6 +350,8 @@ def close(self): pass self.sqlConnection = None + self._setIndexFilePath(None) + def getConnection(self) -> sqlite3.Connection: if self.sqlConnection: return self.sqlConnection @@ -952,6 +977,29 @@ def indexIsLoaded(self) -> bool: return True + def _setIndexFilePath(self, indexFilePath: Optional[str], deleteOnClose: bool = False): + # This is called from __del__, so we need to account for this being called when something + # in the constructor raises an exception and not all members of self exist. + if ( + getattr(self, 'indexFilePath', None) + and getattr(self, 'indexFilePathDeleteOnClose', False) + and os.path.isfile(self.indexFilePath) + ): + try: + os.remove(self.indexFilePath) + except Exception as exception: + if self.printDebug >= 1: + print( + "[Warning] Failed to remove temporarily downloaded and/or extracted index file at:", + self.indexFilePath, + "because of:", + exception, + ) + + if hasattr(self, 'indexFilePath') and hasattr(self, 'indexFilePathDeleteOnClose'): + self.indexFilePath = indexFilePath + self.indexFilePathDeleteOnClose = deleteOnClose + def _loadIndex(self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Dict[str, Any]], None]]) -> None: """ Loads the given index SQLite database and checks it for validity raising an exception if it is invalid. @@ -964,7 +1012,73 @@ def _loadIndex(self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Di if self.indexIsLoaded(): return - self.sqlConnection = self._openSqlDb(indexFilePath) + # Download and/or extract the file to a temporary file if necessary. + + # Strip file:// prefix to avoid useless copies to the temporary directory. + fileURLPrefix = 'file://' + while indexFilePath.startswith(fileURLPrefix): + indexFilePath = indexFilePath[len(fileURLPrefix) :] + + temporaryFolder = os.environ.get("RATARMOUNT_INDEX_TMPDIR", None) + + def _detectCompression(file) -> Optional[Tuple[str, CompressionInfo]]: + compressionsToTest = TAR_COMPRESSION_FORMATS.copy() + compressionsToTest.update(LIBARCHIVE_FILTER_FORMATS) + compression = detectCompression(file, printDebug=self.printDebug, compressionsToTest=compressionsToTest) + if compression and compression in compressionsToTest: + return compression, compressionsToTest[compression] + return None + + if '://' in indexFilePath: + if fsspec is None: + raise RatarmountError( + "Detected an URL for the index path but fsspec could not be imported!\n" + "Try installing it with 'pip install fsspec' or 'pip install ratarmount[full]'." + ) + + # TODO Maybe manual deletion not even necessary when using tempfile correctly? + self._temporaryIndexFile = tempfile.NamedTemporaryFile(suffix=".tmp.downloaded.index", dir=temporaryFolder) + temporaryIndexFilePath = self._temporaryIndexFile.name + with fsspec.open(indexFilePath) as file, open(temporaryIndexFilePath, 'wb') as targetFile: + result = _detectCompression(file) + if result: + compression, info = result + if self.printDebug >= 2: + print(f"[Info] Detected {compression}-compressed remote index: {indexFilePath}") + + # decompressedFile = info.open(file) + # TODO IMPLEMENT. See non-remote version below and reuse code. + # shutil.copyfileobj(file, targetFile) + else: + shutil.copyfileobj(file, targetFile) + else: + with open(indexFilePath, 'rb') as file: + result = _detectCompression(file) + if result: + compression, info = result + if self.printDebug >= 2: + print(f"[Info] Detected {compression}-compressed index.") + + formatOpen = findAvailableOpen(compression) + if not formatOpen: + moduleNames = [module.name for module in TAR_COMPRESSION_FORMATS[compression].modules] + raise CompressionError( + f"Cannot open a {compression} compressed index file {indexFilePath} " + f"without any of these modules: {moduleNames}" + ) + + self._temporaryIndexFile = tempfile.NamedTemporaryFile( + suffix=".tmp.sqlite.index", dir=temporaryFolder + ) + temporaryIndexFilePath = self._temporaryIndexFile.name + with formatOpen(file) as decompressedFile, open(temporaryIndexFilePath, 'wb') as targetFile: + shutil.copyfileobj(decompressedFile, targetFile) + else: + temporaryIndexFilePath = indexFilePath + + # Done downloading and/or extracting the SQLite index. + + self.sqlConnection = self._openSqlDb(temporaryIndexFilePath) tables = getSqliteTables(self.sqlConnection) versions = None try: @@ -1036,7 +1150,12 @@ def _loadIndex(self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Di pass if self.printDebug >= 1: - print(f"Successfully loaded offset dictionary from {str(indexFilePath)}") + message = "Successfully loaded offset dictionary from " + str(indexFilePath) + if temporaryIndexFilePath != indexFilePath: + message += " temporarily downloaded/decompressed into: " + str(temporaryIndexFilePath) + print(message) + + self._setIndexFilePath(temporaryIndexFilePath) def _tryLoadIndex( self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Dict[str, Any]], None]] = None diff --git a/core/ratarmountcore/compressions.py b/core/ratarmountcore/compressions.py index 52815e81..9a3a7f52 100644 --- a/core/ratarmountcore/compressions.py +++ b/core/ratarmountcore/compressions.py @@ -571,7 +571,10 @@ def getGzipInfo(fileobj: IO[bytes]) -> Optional[Tuple[str, int]]: def detectCompression( - fileobj: IO[bytes], prioritizedBackends: Optional[List[str]], printDebug: int = 0 + fileobj: IO[bytes], + prioritizedBackends: Optional[List[str]] = None, + printDebug: int = 0, + compressionsToTest: Dict[str, CompressionInfo] = TAR_COMPRESSION_FORMATS, ) -> Optional[str]: # isinstance(fileobj, io.IOBase) does not work for everything, e.g., for paramiko.sftp_file.SFTPFile # because it does not inherit from io.IOBase. Therefore, do duck-typing and test for required methods. @@ -594,7 +597,7 @@ def detectCompression( return None oldOffset = fileobj.tell() - for compressionId, compression in TAR_COMPRESSION_FORMATS.items(): + for compressionId, compression in compressionsToTest.items(): # The header check is a necessary condition not a sufficient condition. # Especially for gzip, which only has 2 magic bytes, false positives might happen. # Therefore, only use the magic bytes based check if the module could not be found