Skip to content

Commit

Permalink
fix: fixing the directory() issue due to uploading the local_prefix i…
Browse files Browse the repository at this point in the history
…nstead of GCS prefix. (#41)

first contribution here. Think I found the error. 

This works for my test Snakefile: 

```
rule all:
    input:
        "data/done.txt",
        "data2/done.txt"

rule get_directories:
    input:
        expand(
            "data/{directory}",
            directory = ["a", "b", "c"]
        ),
    output:
        touch("data/done.txt")

    
rule make_a_directory:
    output:
        directory("data/{directory}")
    shell:
        """
        mkdir -p {output}
        touch {output}/test.txt
        """

rule get_files:
    input:
        expand(
            "data2/{directory}.txt",
            directory = ["a", "b", "c"]
        )
    output:
        touch("data2/done.txt")
        
rule make_a_file:
    output:
        "data2/{directory}.txt"
    shell:
        """
        mkdir -p $(dirname {output})
        touch {output}
        """
```

I also tried to add another test case which seems to work. 
I see that the test suites in the storage plugins use `testClasses`. 
Would adding actual snakefiles and a script to run them also help or
does that not follow the plugin standards?


 

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Introduced new methods for managing Google Cloud Storage:
`ensure_bucket_exists` and `upload_directory`.
	- Added a test case for handling non-empty directories.

- **Bug Fixes**
- Enhanced the deletion process for directories in Google Cloud Storage.

- **Chores**
- Created a `.gitignore` file to improve repository cleanliness by
excluding unnecessary files.
- Updated GitHub Actions CI configuration for improved test output
visibility.

- **Tests**
- Enhanced test suite with additional debugging output and a focus on
file operations.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Johannes Köster <[email protected]>
  • Loading branch information
jjjermiah and johanneskoester authored Aug 19, 2024
1 parent c7ba28e commit 27c80dc
Show file tree
Hide file tree
Showing 6 changed files with 662 additions and 522 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ jobs:
poetry run python tests/test_fake_gcs.py
- name: Run pytest
run: poetry run coverage run -m pytest -v tests/tests.py
run: poetry run coverage run -m pytest -vv -s tests/tests.py

- name: Run Coverage
run: poetry run coverage report -m
17 changes: 17 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# coverage

.coverage

# .pytest_cache

pytest_cache


# __pycache__ and any .pyc files

__pycache__
*.pyc

**/__pycache__/
**/*.pyc

1,042 changes: 558 additions & 484 deletions poetry.lock

Large diffs are not rendered by default.

82 changes: 60 additions & 22 deletions snakemake_storage_plugin_gcs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
get_constant_prefix,
Mtime,
)
from snakemake_interface_common.logging import get_logger

from urllib.parse import urlparse
import base64
import os

from pathlib import Path
import google.cloud.exceptions
from google.cloud import storage
from google.api_core import retry
Expand Down Expand Up @@ -266,6 +268,7 @@ def __post_init__(self):
self.key = parsed.path.lstrip("/")
self._local_suffix = self._local_suffix_from_key(self.key)
self._is_dir = None
self.logger = get_logger()

def cleanup(self):
# Close any open connections, unmount stuff, etc.
Expand All @@ -288,8 +291,8 @@ async def inventory(self, cache: IOCacheStorageInterface):
- cache.size
"""
if self.get_inventory_parent() in cache.exists_in_storage:
# bucket has been inventorized before, stop here
return
# bucket has been inventorized before, stop here
return

# check if bucket exists
if not self.bucket.exists():
Expand Down Expand Up @@ -381,29 +384,20 @@ def store_object(self):
TODO: note from vsoch - I'm not sure I read this function name right,
but I didn't find an equivalent "upload" function so I thought this might
be it. The original function comment is below.
be it. The original function comment is below.
"""
# Ensure that the object is stored at the location specified by
# self.local_path().
try:
if not self.bucket.exists():
self.client.create_bucket(self.bucket)
self.ensure_bucket_exists()

# Distinguish between single file, and folder
f = self.local_path()
if os.path.isdir(f):
# Ensure the "directory" exists
self.blob.upload_from_string(
"", content_type="application/x-www-form-urlencoded;charset=UTF-8"
)
for root, _, files in os.walk(f):
for filename in files:
filename = os.path.join(root, filename)
bucket_path = filename.lstrip(self.bucket.name).lstrip("/")
blob = self.bucket.blob(bucket_path)
blob.upload_from_filename(filename)
local_object = self.local_path()
if os.path.isdir(local_object):
self.upload_directory(local_directory_path=local_object)
else:
self.blob.upload_from_filename(f)
self.blob.upload_from_filename(local_object)

except google.cloud.exceptions.Forbidden as e:
raise WorkflowError(
e,
Expand All @@ -413,13 +407,57 @@ def store_object(self):
"--scopes (see Snakemake documentation).",
)

def ensure_bucket_exists(self) -> None:
"""
Check that the bucket exists, if not create it.
"""
if not self.bucket.exists():
self.client.create_bucket(self.bucket)

def upload_directory(self, local_directory_path: Path):
"""
Upload a directory to the storage.
"""
self.ensure_bucket_exists()

# if the local directory is empty, we need to create a blob
# with no content to represent the directory
if not os.listdir(local_directory_path):
self.blob.upload_from_string(
"", content_type="application/x-www-form-urlencoded;charset=UTF-8"
)

for root, _, files in os.walk(local_directory_path):
for filename in files:
relative_filepath = os.path.join(root, filename)
local_prefix = self.provider.local_prefix.as_posix()

# remove the prefix ("".snakemake/storage/gcs/{bucket_name}/)
# this gives us the path to the file relative to the bucket
bucket_file_path = (
relative_filepath.removeprefix(local_prefix)
.lstrip("/")
.removeprefix(self.bucket_name)
.lstrip("/")
)

blob = self.bucket.blob(bucket_file_path)
blob.upload_from_filename(relative_filepath)

@retry.Retry(predicate=google_cloud_retry_predicate)
def remove(self):
def remove(self) -> None:
"""
Remove the object from the storage.
"""
# This was a total guess lol
self.blob.delete()
if self.is_directory():
prefix = self.key
if not prefix.endswith("/"):
prefix += "/"
blobs = self.client.list_blobs(self.bucket_name, prefix=prefix)
for blob in blobs:
blob.delete()
else:
self.blob.delete()

# The following to methods are only required if the class inherits from
# StorageObjectGlob.
Expand Down
2 changes: 1 addition & 1 deletion tests/test_fake_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

# Create a new Bucket
bucket = client.bucket("snakemake-test-bucket")

try:
client.create_bucket(bucket)
except Conflict:
Expand All @@ -54,6 +55,5 @@
blob = bucket.blob(file_name)
blob.upload_from_string(contents)


assert not bucket.blob("foo").exists()
print(list(bucket.list_blobs()))
39 changes: 25 additions & 14 deletions tests/tests.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os

# from unittest.mock import MagicMock, patch
import shutil
from typing import List, Optional, Type
import uuid

Expand All @@ -20,7 +19,7 @@

class TestStorage(TestStorageBase):
__test__ = True
files_only = False
files_only = True #

def get_query(self, tmp_path) -> str:
return "gcs://snakemake-test-bucket/test-file.txt"
Expand All @@ -41,35 +40,47 @@ def get_storage_provider_settings(self) -> Optional[StorageProviderSettingsBase]
def get_example_args(self) -> List[str]:
return []

def test_storage_dbg(self, tmp_path):
assert not (
self.store_only and self.retrieve_only
), "store_only and retrieve_only may not be True at the same time"
# TODO remove if this is now in the base class
def test_storage_nonempty_directory(self, tmp_path):
# make a directory
tmpdir = "test_nonemptydir"

obj = self._get_obj(tmp_path, self.get_query(tmp_path))
# store the directory
obj = self._get_obj(tmp_path, f"gcs://snakemake-test-bucket/{tmpdir}")

stored = False
try:
if not self.retrieve_only:
obj.local_path().parent.mkdir(parents=True, exist_ok=True)
with open(obj.local_path(), "w") as f:
obj.local_path().mkdir(parents=True, exist_ok=True)

assert obj.is_directory()

print(obj.local_path())
print("Writing a file in the directory")
# write a file in the directory
with open(obj.local_path() / "testfile.txt", "w") as f:
f.write("test")
f.flush()

assert obj.bucket.exists()
assert obj.local_path().exists() and obj.local_path().is_dir()
print("Storing the directory")

obj.store_object()
stored = True
obj.local_path().unlink()

assert obj.exists()
print(obj.mtime())
print(obj.size())

if not self.store_only:
obj.local_path().parent.mkdir(parents=True, exist_ok=True)
obj.retrieve_object()
file = obj.local_path() / "testfile.txt"
assert file.exists()
print(file.read_text())

finally:
if not self.retrieve_only and stored and self.delete:
obj.remove()
shutil.rmtree(obj.local_path())

def test_list_candidate_matches(self, tmp_path):
obj = self._get_obj(tmp_path, "gcs://snakemake-test-bucket/")
Expand Down

0 comments on commit 27c80dc

Please sign in to comment.