Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make exclusive/atomic writes formally possible #1749

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ async def _copy(
continue
raise ex

async def _pipe_file(self, path, value, **kwargs):
async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
raise NotImplementedError

async def _pipe(self, path, value=None, batch_size=None, **kwargs):
Expand Down Expand Up @@ -517,7 +517,7 @@ async def _cat_ranges(
coros, batch_size=batch_size, nofiles=True, return_exceptions=True
)

async def _put_file(self, lpath, rpath, **kwargs):
async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
raise NotImplementedError

async def _put(
Expand Down
4 changes: 4 additions & 0 deletions fsspec/implementations/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,12 @@ async def _put_file(
chunk_size=5 * 2**20,
callback=DEFAULT_CALLBACK,
method="post",
mode="overwrite",
**kwargs,
):
if mode != "overwrite":
raise NotImplementedError("Exclusive write")

async def gen_chunks():
# Support passing arbitrary file-like objects
# and use them instead of streams.
Expand Down
7 changes: 5 additions & 2 deletions fsspec/implementations/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,13 @@ def makedirs(self, path, exist_ok=False):
if not exist_ok:
raise

def pipe_file(self, path, value, **kwargs):
def pipe_file(self, path, value, mode="overwrite", **kwargs):
"""Set the bytes of given file

Avoids copies of the data if possible
"""
self.open(path, "wb", data=value)
mode = "xb" if mode == "create" else "wb"
self.open(path, mode=mode, data=value)

def rmdir(self, path):
path = self._strip_protocol(path)
Expand Down Expand Up @@ -178,6 +179,8 @@ def _open(
**kwargs,
):
path = self._strip_protocol(path)
if "x" in mode and self.exists(path):
raise FileExistsError
if path in self.pseudo_dirs:
raise IsADirectoryError(path)
parent = path
Expand Down
8 changes: 6 additions & 2 deletions fsspec/implementations/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -1181,13 +1181,17 @@ async def _rm_file(self, path, **kwargs):
) # ignores FileNotFound, just as well for directories
self.dircache.clear() # this is a bit heavy handed

async def _pipe_file(self, path, data):
async def _pipe_file(self, path, data, mode="overwrite", **kwargs):
if mode == "create" and self.exists(path):
raise FileExistsError
# can be str or bytes
self.references[path] = data
self.dircache.clear() # this is a bit heavy handed

async def _put_file(self, lpath, rpath, **kwargs):
async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
# puts binary
if mode == "create" and self.exists(rpath):
raise FileExistsError
with open(lpath, "rb") as f:
self.references[rpath] = f.read()
self.dircache.clear() # this is a bit heavy handed
Expand Down
18 changes: 12 additions & 6 deletions fsspec/implementations/tests/test_smb.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

pytest.importorskip("smbprotocol")


def delay_rerun(*args):
time.sleep(0.1)
return True


# ruff: noqa: F821

if os.environ.get("WSL_INTEROP"):
Expand Down Expand Up @@ -72,7 +78,7 @@ def smb_params(request):
stop_docker(container)


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_simple(smb_params):
adir = "/home/adir"
adir2 = "/home/adir/otherdir/"
Expand All @@ -89,7 +95,7 @@ def test_simple(smb_params):
assert not fsmb.exists(adir)


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_auto_mkdir(smb_params):
adir = "/home/adir"
adir2 = "/home/adir/otherdir/"
Expand All @@ -116,7 +122,7 @@ def test_auto_mkdir(smb_params):
assert not fsmb.exists(another_dir)


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_with_url(smb_params):
if smb_params["port"] is None:
smb_url = "smb://{username}:{password}@{host}/home/someuser.txt"
Expand All @@ -131,7 +137,7 @@ def test_with_url(smb_params):
assert read_result == b"hello"


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_transaction(smb_params):
afile = "/home/afolder/otherdir/afile"
afile2 = "/home/afolder/otherdir/afile2"
Expand All @@ -152,14 +158,14 @@ def test_transaction(smb_params):
assert fsmb.find(adir) == [afile, afile2]


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_makedirs_exist_ok(smb_params):
fsmb = fsspec.get_filesystem_class("smb")(**smb_params)
fsmb.makedirs("/home/a/b/c")
fsmb.makedirs("/home/a/b/c", exist_ok=True)


@pytest.mark.flaky(reruns=2, reruns_delay=2)
@pytest.mark.flaky(max_runs=3, rerun_filter=delay_rerun)
def test_rename_from_upath(smb_params):
fsmb = fsspec.get_filesystem_class("smb")(**smb_params)
fsmb.makedirs("/home/a/b/c", exist_ok=True)
Expand Down
46 changes: 29 additions & 17 deletions fsspec/spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,8 +780,12 @@ def cat_file(self, path, start=None, end=None, **kwargs):
return f.read(end - f.tell())
return f.read()

def pipe_file(self, path, value, **kwargs):
def pipe_file(self, path, value, mode="overwrite", **kwargs):
"""Set the bytes of given file"""
if mode == "create" and self.exists(path):
# non-atomic but simple way; or could use "xb" in open(), which is likely
# not as well supported
raise FileExistsError
with self.open(path, "wb", **kwargs) as f:
f.write(value)

Expand Down Expand Up @@ -973,8 +977,12 @@ def get(
with callback.branched(rpath, lpath) as child:
self.get_file(rpath, lpath, callback=child, **kwargs)

def put_file(self, lpath, rpath, callback=DEFAULT_CALLBACK, **kwargs):
def put_file(
self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs
):
"""Copy single file to remote"""
if mode == "create" and self.exists(rpath):
raise FileExistsError
if os.path.isdir(lpath):
self.makedirs(rpath, exist_ok=True)
return None
Expand Down Expand Up @@ -1264,6 +1272,9 @@ def open(
Target file
mode: str like 'rb', 'w'
See builtin ``open()``
Mode "x" (exclusive write) may be implemented by the backend. Even if
it is, whether it is checked up front or on commit, and whether it is
atomic is implementation-dependent.
block_size: int
Some indication of buffering - this is a value in bytes
cache_options : dict, optional
Expand Down Expand Up @@ -1797,7 +1808,7 @@ def discard(self):

def info(self):
"""File information about this path"""
if "r" in self.mode:
if self.readable():
return self.details
else:
raise ValueError("Info not available while writing")
Expand Down Expand Up @@ -1844,7 +1855,7 @@ def write(self, data):
data: bytes
Set of bytes to be written.
"""
if self.mode not in {"wb", "ab"}:
if not self.writable():
raise ValueError("File not in write mode")
if self.closed:
raise ValueError("I/O operation on closed file.")
Expand Down Expand Up @@ -1877,7 +1888,7 @@ def flush(self, force=False):
if force:
self.forced = True

if self.mode not in {"wb", "ab"}:
if self.readable():
# no-op to flush on read-mode
return

Expand Down Expand Up @@ -2026,29 +2037,30 @@ def close(self):
return
if self.closed:
return
if self.mode == "rb":
self.cache = None
else:
if not self.forced:
self.flush(force=True)

if self.fs is not None:
self.fs.invalidate_cache(self.path)
self.fs.invalidate_cache(self.fs._parent(self.path))
try:
if self.mode == "rb":
self.cache = None
else:
if not self.forced:
self.flush(force=True)

self.closed = True
if self.fs is not None:
self.fs.invalidate_cache(self.path)
self.fs.invalidate_cache(self.fs._parent(self.path))
finally:
self.closed = True

def readable(self):
"""Whether opened for reading"""
return self.mode == "rb" and not self.closed
return "r" in self.mode and not self.closed

def seekable(self):
"""Whether is seekable (only in read mode)"""
return self.readable()

def writable(self):
"""Whether opened for writing"""
return self.mode in {"wb", "ab"} and not self.closed
return self.mode in {"wb", "ab", "xb"} and not self.closed

def __del__(self):
if not self.closed:
Expand Down
8 changes: 8 additions & 0 deletions fsspec/tests/abstract/put.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,14 @@ def test_put_directory_without_files_with_same_name_prefix(
assert fs.isfile(fs_join(fs_target, "subdir", "subfile.txt"))
assert fs.isfile(fs_join(fs_target, "subdir.txt"))

def test_pipe_exclusive(self, fs, fs_target):
fs.pipe_file(fs_target, b"data")
assert fs.cat_file(fs_target) == b"data"
with pytest.raises(FileExistsError):
fs.pipe_file(fs_target, b"data", mode="create")
fs.pipe_file(fs_target, b"new data", mode="overwrite")
assert fs.cat_file(fs_target) == b"new data"

def test_copy_with_source_and_destination_as_list(
self, fs, fs_target, fs_join, local_join, local_10_files_with_hashed_names
):
Expand Down
Loading