From 006df24a8cf7d7e47a06141301712a7aa9a68ae7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Mon, 23 Sep 2024 11:26:13 -0400 Subject: [PATCH 1/2] implement _get_file() --- ipfsspec/async_ipfs.py | 22 ++++++++++++++++++++++ test/test_async.py | 6 ++++++ 2 files changed, 28 insertions(+) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index ad5e8d3..18a0f51 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -12,6 +12,8 @@ from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper from fsspec.exceptions import FSTimeoutError +from fsspec.callbacks import DEFAULT_CALLBACK +from fsspec.utils import isfilelike from multiformats import CID, multicodec from . import unixfsv1 @@ -293,6 +295,26 @@ async def _cat_file(self, path, start=None, end=None, **kwargs): session = await self.set_session() return (await self.gateway.cat(path, session))[start:end] + async def _get_file( + self, rpath, lpath, chunk_size=5 * 2**20, callback=DEFAULT_CALLBACK, **kwargs + ): + # TODO: implement chunked retrieval + logger.debug(rpath) + + if isfilelike(lpath): + outfile = lpath + else: + outfile = open(lpath, "wb") # noqa: ASYNC101, ASYNC230 + + try: + content = await self._cat_file(rpath) + outfile.write(content) + callback.set_size(len(content)) + callback.relative_update(len(content)) + finally: + if not isfilelike(lpath): + outfile.close() + async def _info(self, path, **kwargs): path = self._strip_protocol(path) session = await self.set_session() diff --git a/test/test_async.py b/test/test_async.py index 0567a1f..ebaf6f3 100644 --- a/test/test_async.py +++ b/test/test_async.py @@ -80,6 +80,12 @@ async def test_cat_file(fs): assert res == REF_CONTENT[3:7] +@pytest.mark.asyncio +async def test_get_file(fs, tmp_path): + await fs._get_file(TEST_ROOT + "/default", tmp_path / "default") + assert open(tmp_path / "default", "rb").read() == REF_CONTENT + + @pytest.mark.asyncio async def test_exists(fs): res = await fs._exists(TEST_ROOT + "/default") From 7d7b0a9021156d8f0e5ef9d696de76ec7c94b954 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20K=C3=B6lling?= Date: Mon, 23 Sep 2024 11:53:37 -0400 Subject: [PATCH 2/2] implement chunking for _get_file() --- ipfsspec/async_ipfs.py | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/ipfsspec/async_ipfs.py b/ipfsspec/async_ipfs.py index 18a0f51..879ce14 100644 --- a/ipfsspec/async_ipfs.py +++ b/ipfsspec/async_ipfs.py @@ -4,6 +4,7 @@ import weakref from functools import lru_cache from pathlib import Path +from contextlib import asynccontextmanager import warnings import asyncio @@ -120,6 +121,17 @@ async def cat(self, path, session): self._raise_not_found_for_status(res, path) return await res.read() + @asynccontextmanager + async def iter_chunked(self, path, session, chunk_size): + res = await self.get(path, session) + async with res: + self._raise_not_found_for_status(res, path) + try: + size = int(res.headers["content-length"]) + except (ValueError, KeyError): + size = None + yield size, res.content.iter_chunked(chunk_size) + async def ls(self, path, session, detail=False): res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"}) self._raise_not_found_for_status(res, path) @@ -298,8 +310,9 @@ async def _cat_file(self, path, start=None, end=None, **kwargs): async def _get_file( self, rpath, lpath, chunk_size=5 * 2**20, callback=DEFAULT_CALLBACK, **kwargs ): - # TODO: implement chunked retrieval logger.debug(rpath) + rpath = self._strip_protocol(rpath) + session = await self.set_session() if isfilelike(lpath): outfile = lpath @@ -307,10 +320,11 @@ async def _get_file( outfile = open(lpath, "wb") # noqa: ASYNC101, ASYNC230 try: - content = await self._cat_file(rpath) - outfile.write(content) - callback.set_size(len(content)) - callback.relative_update(len(content)) + async with self.gateway.iter_chunked(rpath, session, chunk_size) as (size, chunks): + callback.set_size(size) + async for chunk in chunks: + outfile.write(chunk) + callback.relative_update(len(chunk)) finally: if not isfilelike(lpath): outfile.close()