S3 streaming #61

Open · wants to merge 4 commits into master

188 changes: 23 additions & 165 deletions fs_s3fs/_s3fs.py
@@ -8,17 +8,14 @@
from datetime import datetime
import io
import itertools
import os
from ssl import SSLError
import tempfile
import threading
import mimetypes

import boto3
from botocore.exceptions import ClientError, EndpointConnectionError

import six
from six import text_type

from fs import ResourceType
from fs.base import FS
@@ -28,6 +25,9 @@
from fs.subfs import SubFS
from fs.path import basename, dirname, forcedir, join, normpath, relpath
from fs.time import datetime_to_epoch
from fs.tools import copy_file_data

from ._s3fs_file import S3InputFile, S3OutputFile
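
The streaming classes imported here are defined in a new _s3fs_file module that is not part of this diff. As a rough sketch of what a streaming reader along these lines might look like (the class name matches the import above, but the ranged-GET strategy and every method body below are assumptions, not the PR's actual code):

import io


class S3InputFile(io.RawIOBase):
    """Stream an S3 object, fetching only the byte ranges actually read."""

    def __init__(self, obj):
        self._obj = obj  # a boto3 s3.Object
        self._pos = 0
        self._size = obj.content_length

    def readable(self):
        return True

    def seekable(self):
        return True

    def tell(self):
        return self._pos

    def seek(self, offset, whence=io.SEEK_SET):
        if whence == io.SEEK_SET:
            self._pos = offset
        elif whence == io.SEEK_CUR:
            self._pos += offset
        elif whence == io.SEEK_END:
            self._pos = self._size + offset
        else:
            raise ValueError("invalid value for 'whence'")
        return self._pos

    def read(self, n=-1):
        if n == 0 or self._pos >= self._size:
            return b""
        end = self._size if n < 0 else min(self._pos + n, self._size)
        # A ranged GET downloads just the requested slice instead of
        # buffering the whole object in a local temporary file.
        data = self._obj.get(
            Range="bytes={}-{}".format(self._pos, end - 1)
        )["Body"].read()
        self._pos += len(data)
        return data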


def _make_repr(class_name, *args, **kwargs):
@@ -57,115 +57,6 @@ def __repr__(self):
return "{}({})".format(class_name, ", ".join(arguments))


class S3File(io.IOBase):
"""Proxy for a S3 file."""

@classmethod
def factory(cls, filename, mode, on_close):
"""Create a S3File backed with a temporary file."""
_temp_file = tempfile.TemporaryFile()
proxy = cls(_temp_file, filename, mode, on_close=on_close)
return proxy

def __repr__(self):
return _make_repr(
self.__class__.__name__, self.__filename, text_type(self.__mode)
)

def __init__(self, f, filename, mode, on_close=None):
self._f = f
self.__filename = filename
self.__mode = mode
self._on_close = on_close

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
self.close()

@property
def raw(self):
return self._f

def close(self):
if self._on_close is not None:
self._on_close(self)

@property
def closed(self):
return self._f.closed

def fileno(self):
return self._f.fileno()

def flush(self):
return self._f.flush()

def isatty(self):
return self._f.isatty()

def readable(self):
return self.__mode.reading

def readline(self, limit=-1):
return self._f.readline(limit)

def readlines(self, hint=-1):
if hint == -1:
return self._f.readlines(hint)
else:
size = 0
lines = []
for line in iter(self._f.readline, b""):
lines.append(line)
size += len(line)
if size > hint:
break
return lines

def seek(self, offset, whence=os.SEEK_SET):
if whence not in (os.SEEK_CUR, os.SEEK_END, os.SEEK_SET):
raise ValueError("invalid value for 'whence'")
self._f.seek(offset, whence)
return self._f.tell()

def seekable(self):
return True

def tell(self):
return self._f.tell()

def writable(self):
return self.__mode.writing

def writelines(self, lines):
return self._f.writelines(lines)

def read(self, n=-1):
if not self.__mode.reading:
raise IOError("not open for reading")
return self._f.read(n)

def readall(self):
return self._f.readall()

def readinto(self, b):
return self._f.readinto(b)

def write(self, b):
if not self.__mode.writing:
raise IOError("not open for reading")
self._f.write(b)
return len(b)

def truncate(self, size=None):
if size is None:
size = self._f.tell()
self._f.truncate(size)
return size


@contextlib.contextmanager
def s3errors(path):
"""Translate S3 errors to FSErrors."""
@@ -528,28 +419,14 @@ def openbin(self, path, mode="r", buffering=-1, **options):
_key = self._path_to_key(_path)

if _mode.create:

def on_close_create(s3file):
"""Called when the S3 file closes, to upload data."""
if self.strict:
try:
s3file.raw.seek(0)
with s3errors(path):
self.client.upload_fileobj(
s3file.raw,
self._bucket_name,
_key,
ExtraArgs=self._get_upload_args(_key),
)
finally:
s3file.raw.close()

try:
dir_path = dirname(_path)
if dir_path != "/":
_dir_key = self._path_to_dir_key(dir_path)
self._get_object(dir_path, _dir_key)
except errors.ResourceNotFound:
raise errors.ResourceNotFound(path)

try:
info = self._getinfo(path)
@@ -561,50 +438,31 @@ def on_close_create(s3file):
if info.is_dir:
raise errors.FileExpected(path)

s3file = S3File.factory(path, _mode, on_close=on_close_create)
obj = self.s3.Object(self._bucket_name, _key)
s3_file = S3OutputFile(
obj,
upload_kwargs=self._get_upload_args(_key)
)
if _mode.appending:
try:
with s3errors(path):
self.client.download_fileobj(
self._bucket_name,
_key,
s3file.raw,
ExtraArgs=self.download_args,
)
s3_in_file = S3InputFile(obj)
copy_file_data(
s3_in_file,
s3_file,
chunk_size=s3_file._min_part_size,
)
except errors.ResourceNotFound:
pass
else:
s3file.seek(0, os.SEEK_END)

return s3file
return s3_file
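
S3OutputFile also comes from the unshown _s3fs_file module. Its _min_part_size attribute, used above as the copy_file_data chunk size, suggests a multipart upload that flushes a part whenever enough bytes accumulate. A hypothetical sketch of that technique with the boto3 resource API follows; none of these method bodies are confirmed by the diff:

import io


class S3OutputFile(io.RawIOBase):
    """Stream writes to S3 as parts of a multipart upload (sketch)."""

    _min_part_size = 5 * 1024 * 1024  # S3 parts must be >= 5 MiB, except the last

    def __init__(self, obj, upload_kwargs=None):
        # initiate_multipart_upload accepts the same extra arguments
        # (ACL, ContentType, ...) that _get_upload_args produces.
        self._upload = obj.initiate_multipart_upload(**(upload_kwargs or {}))
        self._parts = []
        self._buffer = io.BytesIO()

    def writable(self):
        return True

    def write(self, data):
        self._buffer.write(data)
        if self._buffer.tell() >= self._min_part_size:
            self._flush_part()
        return len(data)

    def _flush_part(self):
        number = len(self._parts) + 1
        response = self._upload.Part(number).upload(Body=self._buffer.getvalue())
        self._parts.append({"PartNumber": number, "ETag": response["ETag"]})
        self._buffer = io.BytesIO()

    def close(self):
        # Sketch only: the zero-byte-object edge case is ignored here.
        if not self.closed:
            if self._buffer.tell():
                self._flush_part()  # the last part may be under the minimum size
            self._upload.complete(MultipartUpload={"Parts": self._parts})
        super(S3OutputFile, self).close()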

if self.strict:
info = self.getinfo(path)
if info.is_dir:
raise errors.FileExpected(path)

def on_close(s3file):
"""Called when the S3 file closes, to upload the data."""
try:
if _mode.writing:
s3file.raw.seek(0, os.SEEK_SET)
with s3errors(path):
self.client.upload_fileobj(
s3file.raw,
self._bucket_name,
_key,
ExtraArgs=self._get_upload_args(_key),
)
finally:
s3file.raw.close()

s3file = S3File.factory(path, _mode, on_close=on_close)
with s3errors(path):
self.client.download_fileobj(
self._bucket_name, _key, s3file.raw, ExtraArgs=self.download_args
)
s3file.seek(0, os.SEEK_SET)
return s3file
obj = self.s3.Object(self._bucket_name, _key)
return S3InputFile(obj)
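
The net effect is that openbin now returns objects that talk to S3 lazily instead of round-tripping every open through a local temporary file. A usage sketch (bucket and key names are made up, and seek support on the returned reader is assumed):

import io

from fs_s3fs import S3FS

s3fs = S3FS("my-bucket")  # hypothetical bucket

# Reading: a seek plus a short read can become a ranged GET rather
# than a full download of the object.
with s3fs.openbin("big/archive.bin", "r") as f:
    f.seek(-16, io.SEEK_END)
    tail = f.read()

# Appending: the existing bytes are first streamed into the new
# upload via copy_file_data, then new writes continue the stream.
with s3fs.openbin("logs/app.log", "a") as f:
    f.write(b"one more line\n")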

def remove(self, path):
self.check()