diff --git a/fsspec/asyn.py b/fsspec/asyn.py index de41839ea..16623e3d9 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -408,7 +408,7 @@ async def _copy( continue raise ex - async def _pipe_file(self, path, value, **kwargs): + async def _pipe_file(self, path, value, mode="overwrite", **kwargs): raise NotImplementedError async def _pipe(self, path, value=None, batch_size=None, **kwargs): @@ -517,7 +517,7 @@ async def _cat_ranges( coros, batch_size=batch_size, nofiles=True, return_exceptions=True ) - async def _put_file(self, lpath, rpath, **kwargs): + async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs): raise NotImplementedError async def _put( diff --git a/fsspec/implementations/http.py b/fsspec/implementations/http.py index 47dfb88f9..ed1794f9b 100644 --- a/fsspec/implementations/http.py +++ b/fsspec/implementations/http.py @@ -273,8 +273,12 @@ async def _put_file( chunk_size=5 * 2**20, callback=DEFAULT_CALLBACK, method="post", + mode="overwrite", **kwargs, ): + if mode != "overwrite": + raise NotImplementedError("Exclusive write") + async def gen_chunks(): # Support passing arbitrary file-like objects # and use them instead of streams. diff --git a/fsspec/implementations/memory.py b/fsspec/implementations/memory.py index 3b6f0d204..291c806b1 100644 --- a/fsspec/implementations/memory.py +++ b/fsspec/implementations/memory.py @@ -126,12 +126,13 @@ def makedirs(self, path, exist_ok=False): if not exist_ok: raise - def pipe_file(self, path, value, **kwargs): + def pipe_file(self, path, value, mode="overwrite", **kwargs): """Set the bytes of given file Avoids copies of the data if possible """ - self.open(path, "wb", data=value) + mode = "xb" if mode == "create" else "wb" + self.open(path, mode=mode, data=value) def rmdir(self, path): path = self._strip_protocol(path) @@ -178,6 +179,8 @@ def _open( **kwargs, ): path = self._strip_protocol(path) + if "x" in mode and self.exists(path): + raise FileExistsError if path in self.pseudo_dirs: raise IsADirectoryError(path) parent = path diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index 79bff141a..42d8ff3bd 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -1181,13 +1181,17 @@ async def _rm_file(self, path, **kwargs): ) # ignores FileNotFound, just as well for directories self.dircache.clear() # this is a bit heavy handed - async def _pipe_file(self, path, data): + async def _pipe_file(self, path, data, mode="overwrite", **kwargs): + if mode == "create" and self.exists(path): + raise FileExistsError # can be str or bytes self.references[path] = data self.dircache.clear() # this is a bit heavy handed - async def _put_file(self, lpath, rpath, **kwargs): + async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs): # puts binary + if mode == "create" and self.exists(rpath): + raise FileExistsError with open(lpath, "rb") as f: self.references[rpath] = f.read() self.dircache.clear() # this is a bit heavy handed diff --git a/fsspec/implementations/tests/test_smb.py b/fsspec/implementations/tests/test_smb.py index a83e3cc91..a0b73ff95 100644 --- a/fsspec/implementations/tests/test_smb.py +++ b/fsspec/implementations/tests/test_smb.py @@ -14,6 +14,12 @@ pytest.importorskip("smbprotocol") + +def delay_rerun(*args): + time.sleep(0.1) + return True + + # ruff: noqa: F821 if os.environ.get("WSL_INTEROP"): @@ -72,7 +78,7 @@ def smb_params(request): stop_docker(container) -@pytest.mark.flaky(reruns=2, reruns_delay=2) +@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun) def test_simple(smb_params): adir = "/home/adir" adir2 = "/home/adir/otherdir/" @@ -89,7 +95,7 @@ def test_simple(smb_params): assert not fsmb.exists(adir) -@pytest.mark.flaky(reruns=2, reruns_delay=2) +@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun) def test_auto_mkdir(smb_params): adir = "/home/adir" adir2 = "/home/adir/otherdir/" @@ -116,7 +122,7 @@ def test_auto_mkdir(smb_params): assert not fsmb.exists(another_dir) -@pytest.mark.flaky(reruns=2, reruns_delay=2) +@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun) def test_with_url(smb_params): if smb_params["port"] is None: smb_url = "smb://{username}:{password}@{host}/home/someuser.txt" @@ -131,7 +137,7 @@ def test_with_url(smb_params): assert read_result == b"hello" -@pytest.mark.flaky(reruns=2, reruns_delay=2) +@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun) def test_transaction(smb_params): afile = "/home/afolder/otherdir/afile" afile2 = "/home/afolder/otherdir/afile2" @@ -152,14 +158,14 @@ def test_transaction(smb_params): assert fsmb.find(adir) == [afile, afile2] -@pytest.mark.flaky(reruns=2, reruns_delay=2) +@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun) def test_makedirs_exist_ok(smb_params): fsmb = fsspec.get_filesystem_class("smb")(**smb_params) fsmb.makedirs("/home/a/b/c") fsmb.makedirs("/home/a/b/c", exist_ok=True) -@pytest.mark.flaky(reruns=2, reruns_delay=2) +@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun) def test_rename_from_upath(smb_params): fsmb = fsspec.get_filesystem_class("smb")(**smb_params) fsmb.makedirs("/home/a/b/c", exist_ok=True) diff --git a/fsspec/spec.py b/fsspec/spec.py index 9659f2e98..81d2f599a 100644 --- a/fsspec/spec.py +++ b/fsspec/spec.py @@ -780,8 +780,12 @@ def cat_file(self, path, start=None, end=None, **kwargs): return f.read(end - f.tell()) return f.read() - def pipe_file(self, path, value, **kwargs): + def pipe_file(self, path, value, mode="overwrite", **kwargs): """Set the bytes of given file""" + if mode == "create" and self.exists(path): + # non-atomic but simple way; or could use "xb" in open(), which is likely + # not as well supported + raise FileExistsError with self.open(path, "wb", **kwargs) as f: f.write(value) @@ -973,8 +977,12 @@ def get( with callback.branched(rpath, lpath) as child: self.get_file(rpath, lpath, callback=child, **kwargs) - def put_file(self, lpath, rpath, callback=DEFAULT_CALLBACK, **kwargs): + def put_file( + self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs + ): """Copy single file to remote""" + if mode == "create" and self.exists(rpath): + raise FileExistsError if os.path.isdir(lpath): self.makedirs(rpath, exist_ok=True) return None @@ -1264,6 +1272,9 @@ def open( Target file mode: str like 'rb', 'w' See builtin ``open()`` + Mode "x" (exclusive write) may be implemented by the backend. Even if + it is, whether it is checked up front or on commit, and whether it is + atomic is implementation-dependent. block_size: int Some indication of buffering - this is a value in bytes cache_options : dict, optional @@ -1797,7 +1808,7 @@ def discard(self): def info(self): """File information about this path""" - if "r" in self.mode: + if self.readable(): return self.details else: raise ValueError("Info not available while writing") @@ -1844,7 +1855,7 @@ def write(self, data): data: bytes Set of bytes to be written. """ - if self.mode not in {"wb", "ab"}: + if not self.writable(): raise ValueError("File not in write mode") if self.closed: raise ValueError("I/O operation on closed file.") @@ -1877,7 +1888,7 @@ def flush(self, force=False): if force: self.forced = True - if self.mode not in {"wb", "ab"}: + if self.readable(): # no-op to flush on read-mode return @@ -2026,21 +2037,22 @@ def close(self): return if self.closed: return - if self.mode == "rb": - self.cache = None - else: - if not self.forced: - self.flush(force=True) - - if self.fs is not None: - self.fs.invalidate_cache(self.path) - self.fs.invalidate_cache(self.fs._parent(self.path)) + try: + if self.mode == "rb": + self.cache = None + else: + if not self.forced: + self.flush(force=True) - self.closed = True + if self.fs is not None: + self.fs.invalidate_cache(self.path) + self.fs.invalidate_cache(self.fs._parent(self.path)) + finally: + self.closed = True def readable(self): """Whether opened for reading""" - return self.mode == "rb" and not self.closed + return "r" in self.mode and not self.closed def seekable(self): """Whether is seekable (only in read mode)""" @@ -2048,7 +2060,7 @@ def seekable(self): def writable(self): """Whether opened for writing""" - return self.mode in {"wb", "ab"} and not self.closed + return self.mode in {"wb", "ab", "xb"} and not self.closed def __del__(self): if not self.closed: diff --git a/fsspec/tests/abstract/put.py b/fsspec/tests/abstract/put.py index 9fc349977..21bd1aa2b 100644 --- a/fsspec/tests/abstract/put.py +++ b/fsspec/tests/abstract/put.py @@ -568,6 +568,14 @@ def test_put_directory_without_files_with_same_name_prefix( assert fs.isfile(fs_join(fs_target, "subdir", "subfile.txt")) assert fs.isfile(fs_join(fs_target, "subdir.txt")) + def test_pipe_exclusive(self, fs, fs_target): + fs.pipe_file(fs_target, b"data") + assert fs.cat_file(fs_target) == b"data" + with pytest.raises(FileExistsError): + fs.pipe_file(fs_target, b"data", mode="create") + fs.pipe_file(fs_target, b"new data", mode="overwrite") + assert fs.cat_file(fs_target) == b"new data" + def test_copy_with_source_and_destination_as_list( self, fs, fs_target, fs_join, local_join, local_10_files_with_hashed_names ):