Skip to content

Commit

Permalink
Merge pull request #40 from andrewfulton9/pr36
Browse files Browse the repository at this point in the history
Adds GCSPath
  • Loading branch information
andrewfulton9 authored Jan 8, 2022
2 parents 0b44a7b + 4ca8353 commit adb330d
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 17 deletions.
4 changes: 3 additions & 1 deletion environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@ channels:
- conda-forge
dependencies:
- python==3.8
- fsspec==0.8.4
- fsspec==2021.11.1
# optional
- requests
- s3fs
- jupyter
- ipython
- pytest
- vcrpy
- pylint
- flake8
- pyarrow
- moto
- pip
- pip:
- hadoop-test-cluster
- gcsfs
- nox

2 changes: 1 addition & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def install(session):

@nox.session(python=False)
def smoke(session):
session.install(*"pytest aiohttp requests".split())
session.install(*"pytest aiohttp requests gcsfs".split())
session.run(*"pytest --skiphdfs upath".split())


Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ test = [
"pylint",
"pytest",
"requests",
"s3fs",
"s3fs"
]

[tool.flit.scripts]
Expand Down
7 changes: 5 additions & 2 deletions upath/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def wrapper(*args, **kwargs):
return wrapper

def _transform_arg_paths(self, args, kwargs):
"""formats the path properly for the filesystem backend."""
"""Formats the path properly for the filesystem backend."""
args = list(args)
first_arg = args.pop(0)
if not kwargs.get("path"):
Expand All @@ -53,7 +53,7 @@ def _transform_arg_paths(self, args, kwargs):
return args, kwargs

def _format_path(self, s):
"""placeholder method for subclassed filesystems"""
"""Placeholder method for subclassed filesystems"""
return s

def __getattribute__(self, item):
Expand Down Expand Up @@ -273,6 +273,9 @@ def is_file(self):
return True
return False

def chmod(self, mod):
    """Changing permissions is not supported; always raises.

    Raises:
        NotImplementedError: unconditionally.
    """
    raise NotImplementedError()

def rename(self, target):
    """Renaming is not supported yet; always raises.

    Raises:
        NotImplementedError: unconditionally.
    """
    # can be implemented, but may be tricky
    raise NotImplementedError()
Expand Down
50 changes: 50 additions & 0 deletions upath/implementations/gcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import upath.core
import os
import re


class _GCSAccessor(upath.core._FSSpecAccessor):
    """Accessor that prefixes paths with the bucket before they reach gcsfs."""

    def __init__(self, parsed_url, *args, **kwargs):
        """Forward all arguments unchanged to the base accessor."""
        super().__init__(parsed_url, *args, **kwargs)

    def _format_path(self, s):
        """Return ``s`` as a ``<bucket>/<key>`` string.

        The bucket is read from the netloc of the parsed URL; ``GCSPath._init``
        places a ``bucket`` keyword there when one is supplied. Any leading
        slashes are stripped from ``s`` before joining.
        """
        # Object keys always use "/" as separator, so join with posixpath
        # rather than os.path, which would emit "\\" on Windows.
        import posixpath

        return posixpath.join(self._url.netloc, s.lstrip("/"))


# project is not part of the path, but is part of the credentials
class GCSPath(upath.core.UPath):
    """``UPath`` flavour for Google Cloud Storage (``gs://`` / ``gcs://``).

    The bucket name is carried in the netloc of the parsed URL.
    """

    _default_accessor = _GCSAccessor

    def _init(self, *args, template=None, **kwargs):
        """Initialise the path, moving a ``bucket`` keyword into the URL netloc.

        When both ``bucket`` and ``_url`` are present in ``kwargs``, ``bucket``
        is removed from ``kwargs`` and used to replace the netloc of ``_url``
        before delegating to the parent ``_init``.
        """
        # ensure that the bucket is part of the netloc path
        if kwargs.get("bucket") and kwargs.get("_url"):
            bucket = kwargs.pop("bucket")
            kwargs["_url"] = kwargs["_url"]._replace(netloc=bucket)
        super()._init(*args, template=template, **kwargs)

    def _sub_path(self, name):
        """Strip this path's own prefix from ``name``.

        gcs returns path as `{bucket}/<path>` with listdir
        and glob, so here we can add the netloc to the sub string
        so it gets subbed out as well.

        Returns:
            ``name`` with an optional leading bucket, an optional slash, this
            path (with or without its first character) and an optional
            trailing slash removed from the front.
        """
        sp = self.path
        # Bucket and path are literal text, not patterns: escape them so that
        # characters such as ".", "+" or "(" cannot change the match or make
        # the pattern invalid.
        bucket = re.escape(self._url.netloc)
        with_lead = re.escape(sp)
        without_lead = re.escape(sp[1:])
        pattern = f"^({bucket})?/?({with_lead}|{without_lead})/?"
        return re.sub(pattern, "", name)

    def joinpath(self, *args):
        """Join ``args`` onto this path.

        When the URL already has a netloc (bucket) this simply defers to the
        parent implementation. Otherwise the first component of the first
        argument is taken to be the bucket: it is stored under
        ``self._kwargs["bucket"]`` and the remaining components are joined.

        The first argument may be a string (split on the flavour separator)
        or a list of components. Any further positional arguments are
        appended after it instead of being discarded.
        """
        if self._url.netloc:
            return super().joinpath(*args)
        # handles a bucket in the path
        path = args[0]
        if isinstance(path, list):
            args_list = list(path)
        else:
            args_list = path.split(self._flavour.sep)
        # keep trailing positional segments rather than silently dropping them
        args_list.extend(args[1:])
        bucket = args_list.pop(0)
        # NOTE(review): presumably picked up by `_init` of the joined path so
        # the bucket ends up in its netloc -- confirm against UPath.joinpath.
        self._kwargs["bucket"] = bucket
        return super().joinpath(*args_list)
19 changes: 12 additions & 7 deletions upath/implementations/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,33 @@ def __init__(self, parsed_url, *args, **kwargs):
super().__init__(parsed_url, *args, **kwargs)

def _format_path(self, s):
"""If the filesystem backend doesn't have a root_marker, strip the
leading slash of a path and add the bucket
"""
s = os.path.join(self._url.netloc, s.lstrip("/"))
return s


class S3Path(upath.core.UPath):
_default_accessor = _S3Accessor

def _init(self, *args, template=None, **kwargs):
# ensure that the bucket is part of the netloc path
if kwargs.get("bucket") and kwargs.get("_url"):
bucket = kwargs.pop("bucket")
kwargs["_url"] = kwargs["_url"]._replace(netloc=bucket)

super()._init(*args, template=template, **kwargs)

def _sub_path(self, name):
"""s3fs returns path as `{bucket}/<path>` with listdir
and glob, so here we can add the netloc to the sub string
so it gets subbed out as well
"""
sp = self.path
subed = re.sub(f"^{self._url.netloc}/({sp}|{sp[1:]})/?", "", name)
subed = re.sub(f"^({self._url.netloc})?/?({sp}|{sp[1:]})/?", "", name)
return subed

def _init(self, *args, template=None, **kwargs):
if kwargs.get("bucket") and kwargs.get("_url"):
bucket = kwargs.pop("bucket")
kwargs["_url"] = kwargs["_url"]._replace(netloc=bucket)
super()._init(*args, template=template, **kwargs)

def joinpath(self, *args):
if self._url.netloc:
return super().joinpath(*args)
Expand Down
5 changes: 4 additions & 1 deletion upath/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@


class _Registry:
from upath.implementations import hdfs, http, memory, s3
from upath.implementations import hdfs, http, memory, s3, gcs

http = http.HTTPPath
hdfs = hdfs.HDFSPath
s3a = s3.S3Path
s3 = s3.S3Path
memory = memory.MemoryPath
gs = gcs.GCSPath
gcs = gcs.GCSPath

def __getitem__(self, item):
implemented_path = getattr(self, item, None)
Expand Down
82 changes: 78 additions & 4 deletions upath/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import shlex
import time
import sys

from gcsfs.core import GCSFileSystem

import pytest
from fsspec.implementations.local import LocalFileSystem
from fsspec.registry import register_implementation, _registry

import fsspec
import requests


def pytest_addoption(parser):
parser.addoption(
Expand Down Expand Up @@ -45,13 +48,13 @@ def clear_registry():
_registry.clear()


@pytest.fixture()
@pytest.fixture(scope="function")
def tempdir(clear_registry):
with tempfile.TemporaryDirectory() as tempdir:
yield tempdir


@pytest.fixture()
@pytest.fixture(scope="function")
def local_testdir(tempdir, clear_registry):
tmp = Path(tempdir)
tmp.mkdir(exist_ok=True)
Expand Down Expand Up @@ -153,7 +156,7 @@ def s3_server():
time.sleep(0.1) # pragma: no cover
anon = False
s3so = dict(
client_kwargs={"endpoint_url": endpoint_uri}, use_listings_cache=False
client_kwargs={"endpoint_url": endpoint_uri}, use_listings_cache=True
)
yield anon, s3so
proc.terminate()
Expand All @@ -176,3 +179,74 @@ def s3(s3_server, tempdir, local_testdir):
else:
s3.mkdir(str(x))
yield anon, s3so


def stop_docker(container):
    """Force-remove the docker container matched by the name filter, if any.

    Queries ``docker ps`` (including stopped containers) for ``container`` and,
    when an id comes back, removes it together with its volumes.
    """
    lookup = shlex.split('docker ps -a -q --filter "name=%s"' % container)
    container_id = subprocess.check_output(lookup).strip().decode()
    if not container_id:
        # nothing is running under that name
        return
    subprocess.call(["docker", "rm", "-f", "-v", container_id])


# Project name taken from the GCSFS_TEST_PROJECT environment variable,
# falling back to "test_project" when it is not set.
TEST_PROJECT = os.environ.get("GCSFS_TEST_PROJECT", "test_project")


@pytest.fixture(scope="module")
def docker_gcs():
    """Yield the base URL of a GCS endpoint for the tests in a module.

    If ``STORAGE_EMULATOR_HOST`` is set its value is yielded as-is and no
    container is managed. Otherwise a ``fsouza/fake-gcs-server`` container
    named ``gcsfs_test`` is started on port 4443, polled until its bucket
    listing endpoint answers successfully, and removed again on teardown.

    Raises:
        SystemError: if the server does not answer successfully within the
            retry budget.
    """
    if "STORAGE_EMULATOR_HOST" in os.environ:
        # assume using real API or otherwise have a server already set up
        yield os.environ["STORAGE_EMULATOR_HOST"]
        return
    container = "gcsfs_test"
    cmd = (
        "docker run -d -p 4443:4443 --name gcsfs_test fsouza/fake-gcs-server:latest -scheme "  # noqa: E501
        "http -public-host http://localhost:4443 -external-url http://localhost:4443"  # noqa: E501
    )
    # remove any stale container left over from a previous run
    stop_docker(container)
    subprocess.check_output(shlex.split(cmd))
    url = "http://0.0.0.0:4443"
    try:
        retries = 10
        while True:
            failure = None
            try:
                if requests.get(url + "/storage/v1/b").ok:
                    break
            except Exception as e:  # server not accepting connections yet
                failure = e
            # A non-OK response counts against the budget exactly like a
            # connection error; otherwise this loop would spin forever.
            retries -= 1
            if retries < 0:
                raise SystemError(
                    "fake GCS server did not become ready"
                ) from failure
            time.sleep(1)
        print("url: ", url)
        # Yield outside the probing try/except so that an exception thrown
        # in at this point is never mistaken for a failed probe.
        yield url
    finally:
        # always clean the container up, including when startup failed
        stop_docker(container)


@pytest.fixture
def gcs(docker_gcs, tempdir, local_testdir, populate=True):
    """Prepare the GCS endpoint for one test and yield its URL.

    Clears the ``GCSFileSystem`` instance cache, recreates the ``tmp`` bucket
    and, when ``populate`` is true, mirrors everything under
    ``local_testdir`` into the filesystem using the local paths as remote
    paths. On teardown whatever ``find("tmp")`` returns is removed on a
    best-effort basis.

    Yields:
        The endpoint URL provided by the ``docker_gcs`` fixture.
    """
    # from gcsfs.credentials import GoogleCredentials
    GCSFileSystem.clear_instance_cache()
    gcs = fsspec.filesystem("gcs", endpoint_url=docker_gcs)
    try:
        # ensure we're empty.
        try:
            gcs.rm("tmp", recursive=True)
        except FileNotFoundError:
            pass
        try:
            gcs.mkdir("tmp")
            print("made tmp dir")
        except Exception:
            # best effort: the bucket may already exist
            pass
        if populate:
            for x in Path(local_testdir).glob("**/*"):
                if x.is_file():
                    gcs.upload(str(x), str(x))
                else:
                    gcs.mkdir(str(x))
        gcs.invalidate_cache()
        yield docker_gcs
    finally:
        try:
            gcs.rm(gcs.find("tmp"))
        except Exception:
            # Cleanup is best effort, but only ordinary errors are ignored:
            # KeyboardInterrupt / SystemExit must still propagate.
            pass
64 changes: 64 additions & 0 deletions upath/tests/implementations/test_gcs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import pytest
import sys

from upath import UPath
from upath.implementations.gcs import GCSPath
from upath.errors import NotDirectoryError
from upath.tests.cases import BaseTests


@pytest.mark.skipif(sys.platform.startswith("win"), reason="Windows bad")
@pytest.mark.usefixtures("path")
class TestGCSPath(BaseTests):
    """Runs the shared ``BaseTests`` suite against a ``gs://`` path."""

    @pytest.fixture(autouse=True, scope="function")
    def path(self, local_testdir, gcs):
        """Point ``self.path`` at the populated test directory on GCS."""
        self.path = UPath(f"gs:/{local_testdir}", endpoint_url=gcs)
        self.endpoint_url = gcs

    def test_is_GCSPath(self):
        """The ``gs`` scheme resolves to ``GCSPath``."""
        assert isinstance(self.path, GCSPath)

    def test_mkdir(self):
        """Touching a file below a new directory makes that directory exist."""
        target_dir = self.path.joinpath("new_dir")
        marker = target_dir.joinpath("test.txt")
        marker.touch()
        assert target_dir.exists()

    def test_glob(self, pathlib_base):
        """Remote glob results line up with the local pathlib glob."""
        remote_hits = list(self.path.glob("**.txt"))
        local_hits = list(pathlib_base.glob("**/*.txt"))

        assert len(remote_hits) == len(local_hits)
        # local paths with their first four characters dropped
        expected_paths = [str(local)[4:] for local in local_hits]
        assert all(remote.path in expected_paths for remote in remote_hits)

    def test_rmdir(self, local_testdir):
        """``rmdir`` removes a directory and refuses to act on a file."""
        victim = self.path.joinpath("rmdir_test")
        victim.joinpath("test.txt").write_text("hello")
        victim.fs.invalidate_cache()
        victim.rmdir()
        assert not victim.exists()
        with pytest.raises(NotDirectoryError):
            self.path.joinpath("file1.txt").rmdir()

    def test_fsspec_compat(self):
        """Data written through UPath is readable via fsspec and vice versa."""
        filesystem = self.path.fs
        scheme = self.path._url.scheme
        payload = b"a,b,c\n1,2,3\n4,5,6"

        # write with upath, read with fsspec
        first_url = f"{scheme}:///tmp/output1.csv"
        first = UPath(first_url, endpoint_url=self.endpoint_url)
        first.write_bytes(payload)
        with filesystem.open(first_url) as handle:
            assert handle.read() == payload
        first.unlink()

        # write with fsspec, read with upath
        second_url = f"{scheme}:///tmp/output2.csv"
        with filesystem.open(second_url, "wb") as handle:
            handle.write(payload)
        second = UPath(second_url, endpoint_url=self.endpoint_url)
        assert second.read_bytes() == payload
        second.unlink()

0 comments on commit adb330d

Please sign in to comment.