Skip to content

Commit

Permalink
[wip][feature] Support remote and compressed index files
Browse files Browse the repository at this point in the history
 - [ ] Finish the code path for remote files
 - [ ] Add tests
 - [ ] Check that temporary files are deleted
       Add tests for this?
  • Loading branch information
mxmlnkn committed Oct 14, 2024
1 parent 1c36714 commit d52695d
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 7 deletions.
129 changes: 124 additions & 5 deletions core/ratarmountcore/SQLiteIndex.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,24 @@
import json
import os
import re
import shutil
import sqlite3
import stat
import sys
import tarfile
import tempfile
import time
import traceback
import urllib.parse

from typing import Any, AnyStr, Callable, Dict, IO, List, Optional, Tuple, Union
from dataclasses import dataclass

try:
import fsspec
except ImportError:
fsspec = None # type: ignore

try:
import indexed_gzip
except ImportError:
Expand All @@ -27,9 +34,22 @@

from .version import __version__
from .MountSource import FileInfo, createRootFileInfo
from .compressions import TAR_COMPRESSION_FORMATS
from .compressions import (
CompressionInfo,
LIBARCHIVE_FILTER_FORMATS,
TAR_COMPRESSION_FORMATS,
detectCompression,
findAvailableOpen,
)
from .SQLiteBlobFile import SQLiteBlobsFile, WriteSQLiteBlobs
from .utils import RatarmountError, IndexNotOpenError, InvalidIndexError, findModuleVersion, MismatchingIndexError
from .utils import (
CompressionError,
IndexNotOpenError,
InvalidIndexError,
RatarmountError,
MismatchingIndexError,
findModuleVersion,
)


def getSqliteTables(connection: sqlite3.Connection):
Expand Down Expand Up @@ -214,6 +234,9 @@ def __init__(
self.sqlConnection: Optional[sqlite3.Connection] = None
# Will hold the actually opened valid path to an index file
self.indexFilePath: Optional[str] = None
# This is true if the index file found was compressed or a URL and had to be downloaded
# and/or extracted into a temporary folder.
self.indexFilePathDeleteOnClose: bool = False
self.encoding = encoding
self.possibleIndexFilePaths = SQLiteIndex.getPossibleIndexFilePaths(
indexFilePath, indexFolders, archiveFilePath, ignoreCurrentFolder
Expand All @@ -224,6 +247,7 @@ def __init__(
self.indexMinimumFileCount = indexMinimumFileCount
self.backendName = backendName
self._insertedRowCount = 0
self._temporaryIndexFile = None

assert self.backendName

Expand Down Expand Up @@ -279,7 +303,6 @@ def openExisting(self, checkMetadata: Optional[Callable[[Dict[str, Any]], None]]
"""Tries to find an already existing index."""
for indexPath in self.possibleIndexFilePaths:
if self._tryLoadIndex(indexPath, checkMetadata=checkMetadata):
self.indexFilePath = indexPath
break

def openInMemory(self):
Expand Down Expand Up @@ -327,6 +350,8 @@ def close(self):
pass
self.sqlConnection = None

self._setIndexFilePath(None)

def getConnection(self) -> sqlite3.Connection:
if self.sqlConnection:
return self.sqlConnection
Expand Down Expand Up @@ -952,6 +977,29 @@ def indexIsLoaded(self) -> bool:

return True

def _setIndexFilePath(self, indexFilePath: Optional[str], deleteOnClose: bool = False):
# This is called from __del__, so we need to account for this being called when something
# in the constructor raises an exception and not all members of self exist.
if (
getattr(self, 'indexFilePath', None)
and getattr(self, 'indexFilePathDeleteOnClose', False)
and os.path.isfile(self.indexFilePath)
):
try:
os.remove(self.indexFilePath)
except Exception as exception:
if self.printDebug >= 1:
print(
"[Warning] Failed to remove temporarily downloaded and/or extracted index file at:",
self.indexFilePath,
"because of:",
exception,
)

if hasattr(self, 'indexFilePath') and hasattr(self, 'indexFilePathDeleteOnClose'):
self.indexFilePath = indexFilePath
self.indexFilePathDeleteOnClose = deleteOnClose

def _loadIndex(self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Dict[str, Any]], None]]) -> None:
"""
Loads the given index SQLite database and checks it for validity raising an exception if it is invalid.
Expand All @@ -964,7 +1012,73 @@ def _loadIndex(self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Di
if self.indexIsLoaded():
return

self.sqlConnection = self._openSqlDb(indexFilePath)
# Download and/or extract the file to a temporary file if necessary.

# Strip file:// prefix to avoid useless copies to the temporary directory.
fileURLPrefix = 'file://'
while indexFilePath.startswith(fileURLPrefix):
indexFilePath = indexFilePath[len(fileURLPrefix) :]

temporaryFolder = os.environ.get("RATARMOUNT_INDEX_TMPDIR", None)

def _detectCompression(file) -> Optional[Tuple[str, CompressionInfo]]:
    """Probe 'file' for a known compression and return (format name, info) or None."""
    # Test against both the TAR compression formats and the libarchive filter formats
    # so that indexes compressed with either family are recognized.
    # Copy before updating so the module-level TAR_COMPRESSION_FORMATS is not mutated.
    compressionsToTest = TAR_COMPRESSION_FORMATS.copy()
    compressionsToTest.update(LIBARCHIVE_FILTER_FORMATS)
    compression = detectCompression(file, printDebug=self.printDebug, compressionsToTest=compressionsToTest)
    # detectCompression yields a key of compressionsToTest (or None); the membership
    # test is a defensive guard before the dictionary lookup below.
    if compression and compression in compressionsToTest:
        return compression, compressionsToTest[compression]
    return None

if '://' in indexFilePath:
if fsspec is None:
raise RatarmountError(
"Detected an URL for the index path but fsspec could not be imported!\n"
"Try installing it with 'pip install fsspec' or 'pip install ratarmount[full]'."
)

# TODO Maybe manual deletion not even necessary when using tempfile correctly?
self._temporaryIndexFile = tempfile.NamedTemporaryFile(suffix=".tmp.downloaded.index", dir=temporaryFolder)
temporaryIndexFilePath = self._temporaryIndexFile.name
with fsspec.open(indexFilePath) as file, open(temporaryIndexFilePath, 'wb') as targetFile:
result = _detectCompression(file)
if result:
compression, info = result
if self.printDebug >= 2:
print(f"[Info] Detected {compression}-compressed remote index: {indexFilePath}")

# decompressedFile = info.open(file)
# TODO IMPLEMENT. See non-remote version below and reuse code.
# shutil.copyfileobj(file, targetFile)
else:
shutil.copyfileobj(file, targetFile)
else:
with open(indexFilePath, 'rb') as file:
result = _detectCompression(file)
if result:
compression, info = result
if self.printDebug >= 2:
print(f"[Info] Detected {compression}-compressed index.")

formatOpen = findAvailableOpen(compression)
if not formatOpen:
moduleNames = [module.name for module in TAR_COMPRESSION_FORMATS[compression].modules]
raise CompressionError(
f"Cannot open a {compression} compressed index file {indexFilePath} "
f"without any of these modules: {moduleNames}"
)

self._temporaryIndexFile = tempfile.NamedTemporaryFile(
suffix=".tmp.sqlite.index", dir=temporaryFolder
)
temporaryIndexFilePath = self._temporaryIndexFile.name
with formatOpen(file) as decompressedFile, open(temporaryIndexFilePath, 'wb') as targetFile:
shutil.copyfileobj(decompressedFile, targetFile)
else:
temporaryIndexFilePath = indexFilePath

# Done downloading and/or extracting the SQLite index.

self.sqlConnection = self._openSqlDb(temporaryIndexFilePath)
tables = getSqliteTables(self.sqlConnection)
versions = None
try:
Expand Down Expand Up @@ -1036,7 +1150,12 @@ def _loadIndex(self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Di
pass

if self.printDebug >= 1:
print(f"Successfully loaded offset dictionary from {str(indexFilePath)}")
message = "Successfully loaded offset dictionary from " + str(indexFilePath)
if temporaryIndexFilePath != indexFilePath:
message += " temporarily downloaded/decompressed into: " + str(temporaryIndexFilePath)
print(message)

self._setIndexFilePath(temporaryIndexFilePath)

def _tryLoadIndex(
self, indexFilePath: AnyStr, checkMetadata: Optional[Callable[[Dict[str, Any]], None]] = None
Expand Down
7 changes: 5 additions & 2 deletions core/ratarmountcore/compressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,10 @@ def getGzipInfo(fileobj: IO[bytes]) -> Optional[Tuple[str, int]]:


def detectCompression(
fileobj: IO[bytes], prioritizedBackends: Optional[List[str]], printDebug: int = 0
fileobj: IO[bytes],
prioritizedBackends: Optional[List[str]] = None,
printDebug: int = 0,
compressionsToTest: Dict[str, CompressionInfo] = TAR_COMPRESSION_FORMATS,
) -> Optional[str]:
# isinstance(fileobj, io.IOBase) does not work for everything, e.g., for paramiko.sftp_file.SFTPFile
# because it does not inherit from io.IOBase. Therefore, do duck-typing and test for required methods.
Expand All @@ -594,7 +597,7 @@ def detectCompression(
return None

oldOffset = fileobj.tell()
for compressionId, compression in TAR_COMPRESSION_FORMATS.items():
for compressionId, compression in compressionsToTest.items():
# The header check is a necessary condition not a sufficient condition.
# Especially for gzip, which only has 2 magic bytes, false positives might happen.
# Therefore, only use the magic bytes based check if the module could not be found
Expand Down

0 comments on commit d52695d

Please sign in to comment.