Commit 57a6d27

test streaming implementation (issue ResearchObject#205)

dnlbauer committed Jan 15, 2025
1 parent e22ff83 commit 57a6d27
Showing 7 changed files with 181 additions and 15 deletions.
21 changes: 21 additions & 0 deletions rocrate/memory_buffer.py
@@ -0,0 +1,21 @@
from io import RawIOBase

class MemoryBuffer(RawIOBase):
""" Memory buffer provides a writable stream that can be read back.
Automatically resets after reading. """
def __init__(self):
self._buffer = b""

def writable(self):
return True

def write(self, b):
if self.closed:
            raise RuntimeError("Stream was closed before writing!")
self._buffer += b
return len(b)

def read(self):
chunk = self._buffer
self._buffer = b""
return chunk
10 changes: 10 additions & 0 deletions rocrate/model/data_entity.py
@@ -21,10 +21,20 @@
# limitations under the License.


from typing import Generator
from .entity import Entity


class DataEntity(Entity):

def write(self, base_path):
        # TODO: we could implement write() here using the child classes' stream().
pass

def stream(self) -> Generator[tuple[str, bytes], None, None]:
""" Stream the data from the source. Each chunk of the data is yielded as a tuple
containing the name of the destination file relative to the crate and the chunk of data.
The destination file name is required because a DataEntity can be a file or a
collection of files (Dataset).
"""
pass
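
Any DataEntity subclass implementing this interface can be consumed uniformly. A minimal sketch of a consumer that materializes an entity on disk (write_entity_to is a hypothetical helper, not part of the commit; entity is assumed to be a subclass that implements stream()):

    from pathlib import Path

    def write_entity_to(entity, base_path):
        # each yielded tuple names the destination file relative to the crate root
        for rel_name, chunk in entity.stream():
            out = Path(base_path) / rel_name
            out.parent.mkdir(parents=True, exist_ok=True)
            # append, since consecutive chunks may belong to the same file
            with open(out, "ab") as f:
                f.write(chunk)
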
54 changes: 54 additions & 0 deletions rocrate/model/dataset.py
@@ -21,11 +21,15 @@
# limitations under the License.

import errno
from io import BytesIO, StringIO
import os
import shutil
from pathlib import Path
from typing import Generator
from urllib.request import urlopen

import requests

from .file_or_dir import FileOrDir
from ..utils import is_url, iso_now

@@ -77,3 +81,53 @@ def __get_parts(self, out_path):
part_out_path = out_path / part
with urlopen(part_uri) as r, open(part_out_path, 'wb') as f:
shutil.copyfileobj(r, f)

def stream_parts(self):
out_path = Path(self.id) # relative output path
if is_url(str(self.source)):
if self.validate_url and not self.fetch_remote:
with urlopen(self.source) as _:
self._jsonld['sdDatePublished'] = iso_now()
if self.fetch_remote:
base = self.source.rstrip("/")
for entry in self._jsonld.get("hasPart", []):
try:
part = entry["@id"]
except KeyError:
continue
if is_url(part) or part.startswith("/"):
raise RuntimeError(f"'{self.source}': part '{part}' is not a relative path")
part_uri = f"{base}/{part}"
part_out_path = out_path / part
yield from self.__stream_file(part_uri, part_out_path)
elif self.source is not None:
if not Path(self.source).exists():
raise FileNotFoundError(
errno.ENOENT, os.strerror(errno.ENOENT), str(self.source)
)
if not self.crate.source: # TODO what does this line do??
# code from rocrate.__copy_unlisted
for root, _, files in os.walk(self.source):
root = Path(root)
for name in files:
source = root / name
rel = source.relative_to(self.source)
dest = out_path / rel
yield from self.__stream_file(source, dest)

    def __stream_file(self, source, dest):
        # adapted from file.py
        if isinstance(source, (BytesIO, StringIO)):
            chunk = source.read()
            while len(chunk) > 0:
                yield str(dest), chunk
                chunk = source.read()
        elif is_url(str(source)):
            response = requests.get(source, stream=True)
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=8192):
                yield str(dest), chunk
        else:
            with open(source, 'rb') as f:
                for chunk in f:
                    yield str(dest), chunk
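
Because stream_parts() emits each part's chunks contiguously, a consumer can recover file boundaries by grouping on the yielded path. A sketch using itertools.groupby (dataset is assumed to be a Dataset with a local directory source):

    from itertools import groupby
    from operator import itemgetter

    for path, chunks in groupby(dataset.stream_parts(), key=itemgetter(0)):
        size = sum(len(chunk) for _, chunk in chunks)
        print(f"{path}: {size} bytes")
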
42 changes: 42 additions & 0 deletions rocrate/model/file.py
@@ -21,6 +21,7 @@
# limitations under the License.

from pathlib import Path
from typing import Generator
import requests
import shutil
import urllib.request
@@ -78,3 +79,44 @@ def write(self, base_path):
shutil.copy(self.source, out_file_path)
if self.record_size:
self._jsonld['contentSize'] = str(out_file_path.stat().st_size)

def stream(self) -> Generator[tuple[str, bytes], None, None]:
        # TODO: code duplicated from self.write(). Refactor write() to use stream()?
size = 0
if isinstance(self.source, (BytesIO, StringIO)):
read = self.source.read()
while len(read) > 0:
yield self.id, read
size += len(read)
read = self.source.read()
elif is_url(str(self.source)):
if self.fetch_remote or self.validate_url:
if self.validate_url:
if self.source.startswith("http"):
with requests.head(self.source) as response:
self._jsonld.update({
'contentSize': response.headers.get('Content-Length'),
'encodingFormat': response.headers.get('Content-Type')
})
if not self.fetch_remote:
date_published = response.headers.get("Last-Modified", iso_now())
self._jsonld['sdDatePublished'] = date_published
if self.fetch_remote:
self._jsonld['contentUrl'] = str(self.source)
response = requests.get(self.source, stream=True)
response.raise_for_status()
for chunk in response.iter_content(chunk_size=8192):
yield self.id, chunk
size += len(chunk)
elif self.source is None:
# Allows to record a File entity whose @id does not exist, see #73
warnings.warn(f"No source for {self.id}")
else:
with open(self.source, 'rb') as f:
for chunk in f:
yield self.id, chunk
size += len(chunk)

if self.record_size:
self._jsonld['contentSize'] = str(size)
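
A side benefit of exposing file contents as a stream: derived values can be computed in a single pass without loading the whole file into memory. A small sketch computing an MD5 checksum (file_entity is assumed to be a File entity in a crate):

    import hashlib

    md5 = hashlib.md5()
    for _, chunk in file_entity.stream():
        md5.update(chunk)
    print(md5.hexdigest())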

10 changes: 7 additions & 3 deletions rocrate/model/metadata.py
@@ -76,9 +76,13 @@ def generate(self):

     def write(self, base_path):
         write_path = Path(base_path) / self.id
-        as_jsonld = self.generate()
-        with open(write_path, 'w', encoding='utf-8') as outfile:
-            json.dump(as_jsonld, outfile, indent=4, sort_keys=True)
+        with open(write_path, 'wb') as outfile:
+            for chunk in self.stream():
+                outfile.write(chunk)
+
+    def stream(self):
+        content = self.generate()
+        yield json.dumps(content, indent=4, sort_keys=True).encode('utf-8')

@property
def root(self) -> Dataset:
12 changes: 9 additions & 3 deletions rocrate/model/preview.py
@@ -95,6 +95,12 @@ def write(self, dest_base):
             super().write(dest_base)
         else:
             write_path = Path(dest_base) / self.id
-            out_html = self.generate_html()
-            with open(write_path, 'w', encoding='utf-8') as outfile:
-                outfile.write(out_html)
+            with open(write_path, 'wb') as outfile:
+                for chunk in self.stream():
+                    outfile.write(chunk)
+
+    def stream(self):
+        if self.source:
+            yield from super().stream()
+        else:
+            yield self.generate_html().encode('utf-8')
47 changes: 38 additions & 9 deletions rocrate/rocrate.py
@@ -18,6 +18,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from bz2 import compress
import errno
import uuid
import zipfile
@@ -31,6 +32,8 @@
from pathlib import Path
from urllib.parse import urljoin

from rocrate.memory_buffer import MemoryBuffer

from .model import (
ComputationalWorkflow,
ComputerLanguage,
@@ -469,15 +472,41 @@ def write(self, base_path):

     def write_zip(self, out_path):
         out_path = Path(out_path)
-        if out_path.suffix == ".zip":
-            out_path = out_path.parent / out_path.stem
-        tmp_dir = tempfile.mkdtemp(prefix="rocrate_")
-        try:
-            self.write(tmp_dir)
-            archive = shutil.make_archive(out_path, "zip", tmp_dir)
-        finally:
-            shutil.rmtree(tmp_dir)
-        return archive
+        with open(out_path, "wb") as out_file:
+            for chunk in self.stream_zip():
+                out_file.write(chunk)
+        return out_path
+
+    # TODO: use a context manager? https://docs.python.org/3/library/contextlib.html
+    def stream_zip(self):
+        buffer = MemoryBuffer()
+        with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
+            for writeable_entity in self.data_entities + self.default_entities:
+                if isinstance(writeable_entity, Dataset):
+                    # a Dataset streams several files; a change in the yielded
+                    # path marks the start of the next file
+                    current_file_path = None
+                    current_out_file = None
+                    for path, chunk in writeable_entity.stream_parts():
+                        if path != current_file_path:
+                            if current_out_file is not None:
+                                current_out_file.close()
+                            current_file_path = path
+                            current_out_file = archive.open(path, mode="w")
+                        current_out_file.write(chunk)
+                        yield buffer.read()
+                    if current_out_file is not None:
+                        current_out_file.close()
+                else:
+                    rel_path = writeable_entity.id
+                    with archive.open(rel_path, mode="w") as out_file:
+                        for chunk in writeable_entity.stream():
+                            out_file.write(chunk)
+                            yield buffer.read()
+
+        # closing the ZipFile appends the zip central directory to the buffer
+        yield buffer.read()
+        buffer.close()

def add_workflow(
self, source=None, dest_path=None, fetch_remote=False, validate_url=False, properties=None,
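
The point of stream_zip() is that a crate can be zipped and shipped without a temporary directory or file, e.g. when serving it over HTTP. A hedged sketch using Flask (the framework choice and the crate path are illustrative, not part of the commit):

    from flask import Flask, Response
    from rocrate.rocrate import ROCrate

    app = Flask(__name__)

    @app.route("/crate.zip")
    def download_crate():
        crate = ROCrate("path/to/crate")  # assumed crate location
        return Response(
            crate.stream_zip(),
            mimetype="application/zip",
            headers={"Content-Disposition": "attachment; filename=crate.zip"},
        )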
