From d929cf3f07f94f3162fb041384c9d29476d0689e Mon Sep 17 00:00:00 2001
From: akharit <38331238+akharit@users.noreply.github.com>
Date: Wed, 9 Jan 2019 12:54:27 -0800
Subject: [PATCH] Release 0.0.40 (#270)

* Updated history.rst and version

* Possible fix for issue with infinite loop on read

* Refactoring

* Update pkg to pep 420 standards
---
 HISTORY.rst                            |  5 ++
 MANIFEST.in                            |  4 +-
 azure/__init__.py                      |  2 +-
 azure/datalake/__init__.py             |  2 +-
 azure/datalake/store/__init__.py       |  3 +-
 azure/datalake/store/core.py           |  2 +
 azure/datalake/store/multiprocessor.py | 82 +++++++++++++-------------
 azure_bdist_wheel.py                   | 54 -----------------
 setup.cfg                              |  1 -
 setup.py                               | 16 ++---
 10 files changed, 58 insertions(+), 113 deletions(-)
 delete mode 100644 azure_bdist_wheel.py

diff --git a/HISTORY.rst b/HISTORY.rst
index a8f4b97..b7d31b2 100644
--- a/HISTORY.rst
+++ b/HISTORY.rst
@@ -3,6 +3,11 @@
 Release History
 ===============
 
+0.0.40 (2019-01-08)
++++++++++++++++++++
+* Fix zero length read
+* Remove dependence on custom wheel and conform to PEP 420
+
 0.0.39 (2018-11-14)
 +++++++++++++++++++
 * Fix for Chunked Decoding exception thrown while reading response.content
diff --git a/MANIFEST.in b/MANIFEST.in
index dd6a50d..7cfb314 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -1,4 +1,4 @@
-recursive-include azure/datalake/store *.py
+recursive-include azure/datalake/store/*.py
 recursive-include docs *.rst
 
 include setup.py
@@ -7,6 +7,6 @@ include LICENSE.txt
 include MANIFEST.in
 include HISTORY.rst
 include requirements.txt
-include azure_bdist_wheel.py
+include azure/__init__.py
 
 prune docs/_build
diff --git a/azure/__init__.py b/azure/__init__.py
index de40ea7..69e3be5 100644
--- a/azure/__init__.py
+++ b/azure/__init__.py
@@ -1 +1 @@
-__import__('pkg_resources').declare_namespace(__name__)
+__path__ = __import__('pkgutil').extend_path(__path__, __name__)
diff --git a/azure/datalake/__init__.py b/azure/datalake/__init__.py
index de40ea7..69e3be5 100644
--- a/azure/datalake/__init__.py
+++ b/azure/datalake/__init__.py
@@ -1 +1 @@
-__import__('pkg_resources').declare_namespace(__name__)
+__path__ = __import__('pkgutil').extend_path(__path__, __name__)
diff --git a/azure/datalake/store/__init__.py b/azure/datalake/store/__init__.py
index 99062ef..6bccd5a 100644
--- a/azure/datalake/store/__init__.py
+++ b/azure/datalake/store/__init__.py
@@ -6,8 +6,7 @@
 # license information.
 # --------------------------------------------------------------------------
 
-__version__ = "0.0.39"
-
+__version__ = "0.0.40"
 from .core import AzureDLFileSystem
 from .multithread import ADLDownloader
diff --git a/azure/datalake/store/core.py b/azure/datalake/store/core.py
index e556984..c49b873 100644
--- a/azure/datalake/store/core.py
+++ b/azure/datalake/store/core.py
@@ -864,6 +864,8 @@ def read(self, length=-1):
             self._read_blocksize()
             data_read = self.cache[self.loc - self.start:
                                    min(self.loc - self.start + length, self.end - self.start)]
+            if not data_read:  # Check to catch possible server errors. Ideally shouldn't happen.
+                break
             out += data_read
             self.loc += len(data_read)
             length -= len(data_read)
diff --git a/azure/datalake/store/multiprocessor.py b/azure/datalake/store/multiprocessor.py
index b8b673f..4dab6ef 100644
--- a/azure/datalake/store/multiprocessor.py
+++ b/azure/datalake/store/multiprocessor.py
@@ -6,33 +6,32 @@
 import os
 import logging.handlers
 from .exceptions import FileNotFoundError
-
-
 try:
     from queue import Empty  # Python 3
 except ImportError:
     from Queue import Empty  # Python 2
 
-end_queue_sentinel = [None, None]
-exception = None
-exception_lock = threading.Lock()
+WORKER_THREAD_PER_PROCESS = 50
+QUEUE_BUCKET_SIZE = 10
+END_QUEUE_SENTINEL = [None, None]
+GLOBAL_EXCEPTION = None
+GLOBAL_EXCEPTION_LOCK = threading.Lock()
 
 
-threading
 def monitor_exception(exception_queue, process_ids):
-    global exception
+    global GLOBAL_EXCEPTION
     logger = logging.getLogger(__name__)
     logger.setLevel(logging.DEBUG)
 
     while True:
         try:
-            excep = exception_queue.get(timeout=0.1)
-            if excep == end_queue_sentinel:
+            local_exception = exception_queue.get(timeout=0.1)
+            if local_exception == END_QUEUE_SENTINEL:
                 break
             logger.log(logging.DEBUG, "Setting global exception")
-            exception_lock.acquire()
-            exception = excep
-            exception_lock.release()
+            GLOBAL_EXCEPTION_LOCK.acquire()
+            GLOBAL_EXCEPTION = local_exception
+            GLOBAL_EXCEPTION_LOCK.release()
             logger.log(logging.DEBUG, "Closing processes")
             for p in process_ids:
                 p.terminate()
@@ -41,7 +40,7 @@ def monitor_exception(exception_queue, process_ids):
                 p.join()
             import thread
             logger.log(logging.DEBUG, "Interrupting main")
-            raise Exception(excep)
+            raise Exception(local_exception)
         except Empty:
             pass
 
@@ -51,11 +50,11 @@ def log_listener_process(queue):
         try:
             record = queue.get(timeout=0.1)
             queue.task_done()
-            if record == end_queue_sentinel: # We send this as a sentinel to tell the listener to quit.
+            if record == END_QUEUE_SENTINEL:  # We send this as a sentinel to tell the listener to quit.
                 break
             logger = logging.getLogger(record.name)
             logger.handlers.clear()
-            logger.handle(record) # No level or filter logic applied - just do it!
+            logger.handle(record)  # No level or filter logic applied - just do it!
         except Empty:  # Try again
             pass
         except Exception as e:
@@ -65,14 +64,12 @@
 def multi_processor_change_acl(adl, path=None, method_name="", acl_spec="", number_of_sub_process=None):
-    log_queue = multiprocessing.JoinableQueue()
-    exception_queue = multiprocessing.Queue()
     logger = logging.getLogger(__name__)
     logger.setLevel(logging.DEBUG)
-    queue_bucket_size = 10
-    worker_thread_num_per_process = 50
 
     def launch_processes(number_of_processes):
+        if number_of_processes is None:
+            number_of_processes = max(2, multiprocessing.cpu_count() - 1)
         process_list = []
         for i in range(number_of_processes):
             process_list.append(multiprocessing.Process(target=processor,
@@ -84,46 +81,50 @@
     def walk(walk_path):
         try:
             paths = []
-            all_files = adl._ls(path=walk_path)
+            all_files = adl.ls(path=walk_path, detail=True)
             for files in all_files:
                 if files['type'] == 'DIRECTORY':
                     dir_processed_counter.increment()  # A new directory to process
                     walk_thread_pool.submit(walk, files['name'])
                 paths.append(files['name'])
-                if len(paths) == queue_bucket_size:
+                if len(paths) == QUEUE_BUCKET_SIZE:
                     file_path_queue.put(list(paths))
                     paths = []
             if paths != []:
                 file_path_queue.put(list(paths))  # For leftover paths < bucket_size
         except FileNotFoundError:
             pass  # Continue in case the file was deleted in between
-        except:
+        except Exception:
             import traceback
             logger.exception("Failed to walk for path: " + str(walk_path) + ". Exiting!")
             exception_queue.put(traceback.format_exc())
         finally:
             dir_processed_counter.decrement()  # Processing complete for this directory
 
+    # Initialize concurrency primitives
+    log_queue = multiprocessing.JoinableQueue()
+    exception_queue = multiprocessing.Queue()
     finish_queue_processing_flag = multiprocessing.Event()
     file_path_queue = multiprocessing.JoinableQueue()
-    if number_of_sub_process == None:
-        number_of_sub_process = max(2, multiprocessing.cpu_count()-1)
+    dir_processed_counter = CountUpDownLatch()
+    # Start relevant threads and processes
+    log_listener = threading.Thread(target=log_listener_process, args=(log_queue,))
+    log_listener.start()
     child_processes = launch_processes(number_of_sub_process)
     exception_monitor_thread = threading.Thread(target=monitor_exception, args=(exception_queue, child_processes))
     exception_monitor_thread.start()
-    log_listener = threading.Thread(target=log_listener_process, args=(log_queue,))
-    log_listener.start()
+    walk_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)
 
-    dir_processed_counter = CountUpDownLatch()
-    walk_thread_pool = ThreadPoolExecutor(max_workers=worker_thread_num_per_process)
-
-    file_path_queue.put([path]) # Root directory needs to be passed
+    # Root directory needs to be explicitly passed
+    file_path_queue.put([path])
     dir_processed_counter.increment()
-    walk(path) # Start processing root directory
-    if dir_processed_counter.is_zero(): # Done processing all directories. Blocking call.
+    # Processing starts here
+    walk(path)
+
+    if dir_processed_counter.is_zero():  # Done processing all directories. Blocking call.
         walk_thread_pool.shutdown()
         file_path_queue.close()  # No new elements to add
         file_path_queue.join()  # Wait for operations to be done
@@ -135,11 +136,11 @@ def walk(walk_path):
 
     # Cleanup
     logger.log(logging.DEBUG, "Sending exception sentinel")
-    exception_queue.put(end_queue_sentinel)
+    exception_queue.put(END_QUEUE_SENTINEL)
     exception_monitor_thread.join()
     logger.log(logging.DEBUG, "Exception monitor thread finished")
     logger.log(logging.DEBUG, "Sending logger sentinel")
-    log_queue.put(end_queue_sentinel)
+    log_queue.put(END_QUEUE_SENTINEL)
     log_queue.join()
     log_queue.close()
     logger.log(logging.DEBUG, "Log queue closed")
@@ -159,21 +160,19 @@ def processor(adl, file_path_queue, finish_queue_processing_flag, method_name, a
     logger.setLevel(logging.DEBUG)
 
     try:
-        worker_thread_num_per_process = 50
         func_table = {"mod_acl": adl.modify_acl_entries, "set_acl": adl.set_acl, "rem_acl": adl.remove_acl_entries}
-        function_thread_pool = ThreadPoolExecutor(max_workers=worker_thread_num_per_process)
+        function_thread_pool = ThreadPoolExecutor(max_workers=WORKER_THREAD_PER_PROCESS)
         adl_function = func_table[method_name]
         logger.log(logging.DEBUG, "Started processor pid:"+str(os.getpid()))
 
         def func_wrapper(func, path, spec):
             try:
                 func(path=path, acl_spec=spec)
-            except FileNotFoundError as e:
+            except FileNotFoundError:
                 logger.exception("File "+str(path)+" not found")
-                pass  # Exception is being logged in the relevant acl method. Do nothing here
-            except:
-                # TODO Raise to parent process
-                pass
+                # Complete Exception is being logged in the relevant acl method. Don't print exception here
+            except Exception as e:
+                logger.exception("File " + str(path) + " not set. Exception "+str(e))
 
             logger.log(logging.DEBUG, "Completed running on path:" + str(path))
 
@@ -189,7 +188,6 @@ def func_wrapper(func, path, spec):
 
     except Exception as e:
         import traceback
-        # TODO Raise to parent process
         logger.exception("Exception in pid "+str(os.getpid())+"Exception: " + str(e))
         exception_queue.put(traceback.format_exc())
     finally:
diff --git a/azure_bdist_wheel.py b/azure_bdist_wheel.py
deleted file mode 100644
index 8a81d1b..0000000
--- a/azure_bdist_wheel.py
+++ /dev/null
@@ -1,54 +0,0 @@
-#-------------------------------------------------------------------------
-# Copyright (c) Microsoft Corporation. All rights reserved.
-# Licensed under the MIT License. See License.txt in the project root for
-# license information.
-#--------------------------------------------------------------------------
-
-from distutils import log as logger
-import os.path
-
-from wheel.bdist_wheel import bdist_wheel
-class azure_bdist_wheel(bdist_wheel):
-    """The purpose of this class is to build wheel a little differently than the sdist,
-    without requiring to build the wheel from the sdist (i.e. you can build the wheel
-    directly from source.
- """ - - description = "Create an Azure wheel distribution" - - user_options = bdist_wheel.user_options + \ - [('azure-namespace-package=', None, - "Name of the deepest nspkg used")] - - def initialize_options(self): - bdist_wheel.initialize_options(self) - self.azure_namespace_package = None - - def finalize_options(self): - bdist_wheel.finalize_options(self) - if self.azure_namespace_package and not self.azure_namespace_package.endswith("-nspkg"): - raise ValueError("azure_namespace_package must finish by -nspkg") - - def run(self): - if not self.distribution.install_requires: - self.distribution.install_requires = [] - self.distribution.install_requires.append( - "{}>=2.0.0".format(self.azure_namespace_package)) - bdist_wheel.run(self) - - def write_record(self, bdist_dir, distinfo_dir): - if self.azure_namespace_package: - # Split and remove last part, assuming it's "nspkg" - subparts = self.azure_namespace_package.split('-')[0:-1] - folder_with_init = [os.path.join(*subparts[0:i+1]) for i in range(len(subparts))] - for azure_sub_package in folder_with_init: - init_file = os.path.join(bdist_dir, azure_sub_package, '__init__.py') - if os.path.isfile(init_file): - logger.info("manually remove {} while building the wheel".format(init_file)) - os.remove(init_file) - else: - raise ValueError("Unable to find {}. Are you sure of your namespace package?".format(init_file)) - bdist_wheel.write_record(self, bdist_dir, distinfo_dir) -cmdclass = { - 'bdist_wheel': azure_bdist_wheel, -} diff --git a/setup.cfg b/setup.cfg index ccdace2..3c6e79c 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,2 @@ [bdist_wheel] universal=1 -azure-namespace-package=azure-nspkg diff --git a/setup.py b/setup.py index b79baa4..bd63f9e 100644 --- a/setup.py +++ b/setup.py @@ -1,15 +1,8 @@ #!/usr/bin/env python -import os from setuptools import find_packages, setup from io import open import re -try: - from azure_bdist_wheel import cmdclass -except ImportError: - from distutils import log as logger - logger.warn("Wheel is not available, disabling bdist_wheel hook") - cmdclass = {} with open('README.rst', encoding='utf-8') as f: readme = f.read() @@ -44,17 +37,20 @@ 'Programming Language :: Python :: 3.6', 'License :: OSI Approved :: MIT License', ], - packages=find_packages(exclude=['tests']), + packages=find_packages(exclude=['tests', + # Exclude packages that will be covered by PEP420 or nspkg + 'azure', + ]), install_requires=[ 'cffi', 'adal>=0.4.2', - 'requests>=2.20.0' + 'requests>=2.20.0', ], extras_require={ ":python_version<'3.4'": ['pathlib2'], ":python_version<='2.7'": ['futures'], + ":python_version<'3.0'": ['azure-nspkg'], }, long_description=readme + '\n\n' + history, zip_safe=False, - cmdclass=cmdclass )