Fix to maintain directory structure for single files and empty dirs (#224)

* Fix for issue 214

* Fix path conversion on Windows and Linux

* Remove superfluous assignment

* Fix empty directories not being downloaded

* Fix for test failure for single file to single file downloads

* Fix for empty directory correct relative path

* Added tests for single file download and empty dir download

* Fix for python 2.7 os.makedirs

* Updated History.rst

* Updated version number
akharit authored Jul 27, 2018
1 parent f50819a commit 381f123
Showing 6 changed files with 50 additions and 8 deletions.
4 changes: 4 additions & 0 deletions HISTORY.rst
@@ -3,6 +3,10 @@
Release History
===============

0.0.25 (2018-07-26)
+++++++++++++++++++
* Fixed downloading of empty directories and download of directory structure with only a single file

0.0.24 (2018-07-16)
+++++++++++++++++++
* Retry policy implemented for all operations, default being Exponential Retry Policy
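For context, a minimal usage sketch of the scenario this release note covers: downloading a remote tree that contains an empty directory or a single nested file, and expecting the local copy to mirror that layout. The store name, authentication call, and paths are placeholders, not part of this change.

from azure.datalake.store import core, lib, multithread

token = lib.auth()  # or pass tenant/client credentials explicitly
adl = core.AzureDLFileSystem(token, store_name='mystore')

# Remote layout:  reports/empty/        (an empty directory)
#                 reports/only/one.csv  (a single file in a nested directory)
multithread.ADLDownloader(adl, 'reports', './reports_local', nthreads=4,
                          chunksize=2 ** 24, overwrite=True)

# With 0.0.25 the local copy keeps the remote structure:
#   ./reports_local/empty/         exists even though it contains no files
#   ./reports_local/only/one.csv   keeps its parent directory instead of being
#                                  flattened directly into ./reports_local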
2 changes: 1 addition & 1 deletion azure/datalake/store/__init__.py
@@ -6,7 +6,7 @@
# license information.
# --------------------------------------------------------------------------

__version__ = "0.0.24"
__version__ = "0.0.25"

from .core import AzureDLFileSystem
from .multithread import ADLDownloader
12 changes: 11 additions & 1 deletion azure/datalake/store/core.py
@@ -68,6 +68,7 @@ def __init__(self, token=None, **kwargs):
        self.kwargs = kwargs
        self.connect()
        self.dirs = {}
+        self._emptyDirs = []
        AzureDLFileSystem._singleton[0] = self

    @classmethod
@@ -179,11 +180,20 @@ def info(self, path, invalidate_cache=True, expected_error_code=None):

    def _walk(self, path, invalidate_cache=True):
        fi = list(self._ls(path, invalidate_cache))
+        self._emptyDirs = []
        for apath in fi:
            if apath['type'] == 'DIRECTORY':
-                fi.extend(self._ls(apath['name'], invalidate_cache))
+                sub_elements = self._ls(apath['name'], invalidate_cache)
+                if not sub_elements:
+                    self._emptyDirs.append(apath)
+                else:
+                    fi.extend(sub_elements)
        return [f for f in fi if f['type'] == 'FILE']

+    def _empty_dirs_to_add(self):
+        """ Returns directories found empty during walk. Only for internal use"""
+        return self._emptyDirs
+
    def walk(self, path='', details=False, invalidate_cache=True):
        """ Get all files below given path
        """
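To make the new bookkeeping in _walk easier to follow, here is a standalone sketch of the same traversal pattern; the listing dictionary is a hypothetical stand-in for self._ls(), and only the shape of the algorithm is taken from the change.

# Hypothetical in-memory listing standing in for self._ls()
listing = {
    'root':         [{'name': 'root/a', 'type': 'DIRECTORY'},
                     {'name': 'root/f.txt', 'type': 'FILE'}],
    'root/a':       [{'name': 'root/a/empty', 'type': 'DIRECTORY'}],
    'root/a/empty': [],
}

def walk(path):
    fi = list(listing[path])
    empty_dirs = []
    for apath in fi:                      # fi grows while we iterate, so this
        if apath['type'] == 'DIRECTORY':  # visits directories breadth-first
            sub = listing[apath['name']]
            if not sub:
                empty_dirs.append(apath)  # remember dirs with no children
            else:
                fi.extend(sub)
    return [f for f in fi if f['type'] == 'FILE'], empty_dirs

files, empties = walk('root')
print([f['name'] for f in files])    # ['root/f.txt']
print([d['name'] for d in empties])  # ['root/a/empty']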
19 changes: 14 additions & 5 deletions azure/datalake/store/multithread.py
@@ -194,16 +194,17 @@ def _setup(self):
            rfiles = self.client._adlfs.walk(self.rpath, details=True, invalidate_cache=True)
        else:
            rfiles = self.client._adlfs.glob(self.rpath, details=True, invalidate_cache=True)
-        if len(rfiles) > 1:
-            local_rel_rpath = str(AzureDLPath(self.rpath).trim().globless_prefix)
-            file_pairs = [(os.path.join(self.lpath, os.path.relpath(f['name'] +'.inprogress', local_rel_rpath)), f)
-                          for f in rfiles]
-        elif len(rfiles) == 1:
+
+        if len(rfiles) == 1 and os.path.abspath(rfiles[0]['name']) == os.path.abspath(self.rpath):
            if os.path.exists(self.lpath) and os.path.isdir(self.lpath):
                file_pairs = [(os.path.join(self.lpath, os.path.basename(rfiles[0]['name'] + '.inprogress')),
                               rfiles[0])]
            else:
                file_pairs = [(self.lpath, rfiles[0])]
+        elif len(rfiles) >= 1:
+            local_rel_rpath = str(AzureDLPath(self.rpath).trim().globless_prefix)
+            file_pairs = [(os.path.join(self.lpath, os.path.relpath(f['name'] +'.inprogress', local_rel_rpath)), f)
+                          for f in rfiles]
        else:
            raise ValueError('No files to download')

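The path arithmetic in the elif len(rfiles) >= 1: branch is what keeps the remote layout intact on disk: each remote name is made relative to the download root and re-rooted under lpath. A small worked example with made-up paths, using rpath directly where the code above uses the trimmed, glob-less prefix of rpath:

import os

rpath = 'test_dir/data'                       # remote root being downloaded
lpath = '/tmp/dest'                           # local target directory
remote_name = 'test_dir/data/single/single/single.txt'

rel = os.path.relpath(remote_name + '.inprogress', rpath)
local = os.path.join(lpath, rel)
print(local)  # /tmp/dest/single/single/single.txt.inprogress (on POSIX)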
@@ -242,6 +243,14 @@ def touch(self, src, dst):
            with open(dst, 'wb'):
                pass

+        for empty_directory in self.client._adlfs._empty_dirs_to_add():
+            local_rel_rpath = str(AzureDLPath(self.rpath).trim().globless_prefix)
+            path = os.path.join(self.lpath, os.path.relpath(empty_directory['name'], local_rel_rpath))
+            try:
+                os.makedirs(path)
+            except OSError as e:
+                if e.errno != errno.EEXIST:
+                    raise
        self.client.run(nthreads, monitor, before_start=touch)

    def active(self):
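The errno.EEXIST guard above is what the "Fix for python 2.7 os.makedirs" commit bullet refers to: Python 2.7's os.makedirs has no exist_ok argument, so an already-existing directory has to be tolerated by hand. A short sketch of the two equivalent forms:

import errno
import os

def makedirs_py2_compatible(path):
    try:
        os.makedirs(path)
    except OSError as e:
        if e.errno != errno.EEXIST:   # re-raise anything except "already exists"
            raise

def makedirs_py3_only(path):
    os.makedirs(path, exist_ok=True)  # exist_ok is not available on Python 2.7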
3 changes: 2 additions & 1 deletion azure/datalake/store/transfer.py
@@ -489,11 +489,12 @@ def run(self, nthreads=None, monitor=True, before_start=None):
        self._nthreads = nthreads or self._nthreads
        self._ffutures = {}
        self._cfutures = {}

        for src, dst in self._files:
            if before_start:
                before_start(self._adlfs, src, dst)
            self._start(src, dst)
            before_start = None

        if monitor:
            self.monitor()
            has_errors = False
18 changes: 18 additions & 0 deletions tests/test_multithread.py
@@ -46,6 +46,10 @@ def setup_tree(azure):
        for filename in ['x.csv', 'y.csv', 'z.txt']:
            with azure.open(test_dir / directory / filename, 'wb') as f:
                f.write(b'123456')
+    azure.mkdir(test_dir / 'data/empty')
+    azure.mkdir(test_dir / 'data/single/single')
+    with azure.open(test_dir / 'data/single/single'/ 'single.txt', 'wb') as f:
+        f.write(b'123456')
    try:
        yield
    finally:
@@ -132,6 +136,20 @@ def test_download_single_to_dir(tempdir, azure):
    if os.path.isfile(fname):
        os.remove(fname)

+@my_vcr.use_cassette
+def test_download_empty_directory(tempdir, azure):
+    with setup_tree(azure):
+        down = ADLDownloader(azure, test_dir, tempdir, 1, 2 ** 24, overwrite=True)
+        dirname = os.path.join(tempdir, 'data/empty')
+        assert os.path.isdir(dirname)
+
+@my_vcr.use_cassette
+def test_download_single_file_in_directory(tempdir, azure):
+    with setup_tree(azure):
+        down = ADLDownloader(azure, test_dir, tempdir, 1, 2 ** 24, overwrite=True)
+        dirname = os.path.join(tempdir, 'data/single/single')
+        assert os.path.isdir(dirname)
+        assert os.path.isfile(os.path.join(dirname,'single.txt'))

@my_vcr.use_cassette
def test_download_many(tempdir, azure):
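Not part of the change, but a small helper (with a placeholder path) to visualize the local tree these two new tests assert on after downloading test_dir into tempdir:

import os

def print_tree(root):
    # walk the downloaded directory and print every directory and file,
    # relative to the download root
    for dirpath, dirnames, filenames in os.walk(root):
        rel = os.path.relpath(dirpath, root)
        print(rel + os.sep)
        for name in filenames:
            print(os.path.join(rel, name))

# Expected to include, among the other fixture files:
#   data/empty/                     (directory created even though it is empty)
#   data/single/single/single.txt   (single file with its parent directories intact)
print_tree('/path/to/tempdir')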
