From 9fb595a1cbeed662bedc29c60c06c70615567124 Mon Sep 17 00:00:00 2001 From: Arjun Barrett Date: Wed, 3 Jul 2024 14:08:32 -0700 Subject: [PATCH 1/5] fix fifo hangs --- python/vmaf/core/executor.py | 121 ++++++++++++++---------------- python/vmaf/core/raw_extractor.py | 25 ++---- 2 files changed, 61 insertions(+), 85 deletions(-) diff --git a/python/vmaf/core/executor.py b/python/vmaf/core/executor.py index d2da1375d..fb032a963 100644 --- a/python/vmaf/core/executor.py +++ b/python/vmaf/core/executor.py @@ -276,24 +276,6 @@ def _get_workfile_yuv_type(asset): assert asset.ref_yuv_type == asset.dis_yuv_type, "YUV types for ref and dis do not match." return asset.ref_yuv_type - def _wait_for_workfiles(self, asset): - # wait til workfile paths being generated - for i in range(10): - if os.path.exists(asset.ref_workfile_path) and os.path.exists(asset.dis_workfile_path): - break - sleep(0.1) - else: - raise RuntimeError("ref or dis video workfile path {ref} or {dis} is missing.".format(ref=asset.ref_workfile_path, dis=asset.dis_workfile_path)) - - def _wait_for_procfiles(self, asset): - # wait til procfile paths being generated - for i in range(10): - if os.path.exists(asset.ref_procfile_path) and os.path.exists(asset.dis_procfile_path): - break - sleep(0.1) - else: - raise RuntimeError("ref or dis video procfile path {ref} or {dis} is missing.".format(ref=asset.ref_procfile_path, dis=asset.dis_procfile_path)) - def _prepare_log_file(self, asset): log_file_path = self._get_log_file_path(asset) @@ -434,30 +416,36 @@ def _run_on_asset(self, asset): return result def _open_workfiles(self, asset): - self._open_ref_workfile(asset, fifo_mode=False) - self._open_dis_workfile(asset, fifo_mode=False) + self._open_ref_workfile(asset, open_sem=None, fifo_mode=False) + self._open_dis_workfile(asset, open_sem=None, fifo_mode=False) def _open_workfiles_in_fifo_mode(self, asset): + sem = multiprocessing.Semaphore(0) ref_p = multiprocessing.Process(target=self._open_ref_workfile, - args=(asset, True)) + args=(asset, sem, True)) dis_p = multiprocessing.Process(target=self._open_dis_workfile, - args=(asset, True)) + args=(asset, sem, True)) ref_p.start() dis_p.start() - self._wait_for_workfiles(asset) + + if not sem.acquire(timeout=5) or not sem.acquire(timeout=5): + raise TimeoutError(f"timed out waiting for reference and/or distorted workfiles {asset.ref_workfile_path} and {asset.dis_workfile_path} to be created") def _open_procfiles(self, asset): - self._open_ref_procfile(asset, fifo_mode=False) - self._open_dis_procfile(asset, fifo_mode=False) + self._open_ref_procfile(asset, open_sem=None, fifo_mode=False) + self._open_dis_procfile(asset, open_sem=None, fifo_mode=False) def _open_procfiles_in_fifo_mode(self, asset): + sem = multiprocessing.Semaphore(0) ref_p = multiprocessing.Process(target=self._open_ref_procfile, - args=(asset, True)) + args=(asset, sem, True)) dis_p = multiprocessing.Process(target=self._open_dis_procfile, - args=(asset, True)) + args=(asset, sem, True)) ref_p.start() dis_p.start() - self._wait_for_procfiles(asset) + + if not sem.acquire(timeout=5) or not sem.acquire(timeout=5): + raise TimeoutError(f"timed out waiting for reference and/or distorted procfiles {asset.ref_procfile_path} and {asset.dis_procfile_path} to be created") def _close_workfiles(self, asset): self._close_ref_workfile(asset) @@ -507,7 +495,7 @@ def _get_log_file_path(self, asset): # ===== workfile ===== - def _open_ref_workfile(self, asset, fifo_mode): + def _open_ref_workfile(self, asset, open_sem, fifo_mode): use_path_as_workpath = asset.use_path_as_workpath path = asset.ref_path @@ -537,9 +525,9 @@ def _open_ref_workfile(self, asset, fifo_mode): else self._open_workfile _open_workfile_method(self, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type, preresampling_filterchain, resampling_type, postresampling_filterchain, - width_height, quality_width_height, ref_or_dis, use_path_as_workpath, fifo_mode, logger) + width_height, quality_width_height, ref_or_dis, use_path_as_workpath, open_sem, fifo_mode, logger) - def _open_dis_workfile(self, asset, fifo_mode): + def _open_dis_workfile(self, asset, open_sem, fifo_mode): use_path_as_workpath = asset.use_path_as_workpath path = asset.dis_path @@ -569,12 +557,12 @@ def _open_dis_workfile(self, asset, fifo_mode): else self._open_workfile _open_workfile_method(self, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type, preresampling_filterchain, resampling_type, postresampling_filterchain, - width_height, quality_width_height, ref_or_dis, use_path_as_workpath, fifo_mode, logger) + width_height, quality_width_height, ref_or_dis, use_path_as_workpath, open_sem, fifo_mode, logger) @staticmethod - def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type: Optional[str], - preresampling_filterchain: Optional[List[str]], resampling_type: str, postresampling_filterchain: Optional[List[str]], - width_height: Optional[Tuple[int, int]], quality_width_height: Tuple[int, int], ref_or_dis, use_path_as_workpath, fifo_mode, logger): + def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type: Optional[str], preresampling_filterchain: Optional[List[str]], + resampling_type: str, postresampling_filterchain: Optional[List[str]], width_height: Optional[Tuple[int, int]], + quality_width_height: Tuple[int, int], ref_or_dis, use_path_as_workpath, open_sem, fifo_mode, logger): # decoder type must be None here assert decoder_type is None, f'decoder_type must be None but is: {decoder_type}' @@ -587,9 +575,16 @@ def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type, # only need to open workfile if the path is different from path assert use_path_as_workpath is False and path != workfile_path + # if fifo mode, mkfifo if fifo_mode: os.mkfifo(workfile_path) + else: + with open(workfile_path, 'wb'): + pass + + if open_sem is not None: + open_sem.release() if ref_or_dis == 'ref': start_end_frame = asset.ref_start_end_frame @@ -638,7 +633,7 @@ def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type, # ===== procfile ===== - def _open_ref_procfile(self, asset, fifo_mode): + def _open_ref_procfile(self, asset, open_sem, fifo_mode): # only need to open ref procfile if the path is different from ref path assert asset.use_workpath_as_procpath is False and asset.ref_workfile_path != asset.ref_procfile_path @@ -647,6 +642,12 @@ def _open_ref_procfile(self, asset, fifo_mode): if fifo_mode: os.mkfifo(asset.ref_procfile_path) + else: + with open(asset.ref_procfile_path, 'wb'): + pass + + if open_sem is not None: + open_sem.release() quality_width, quality_height = self._get_quality_width_height(asset) yuv_type = asset.workfile_yuv_type @@ -662,7 +663,7 @@ def _open_ref_procfile(self, asset, fifo_mode): except StopIteration: break - def _open_dis_procfile(self, asset, fifo_mode): + def _open_dis_procfile(self, asset, open_sem, fifo_mode): # only need to open dis procfile if the path is different from dis path assert asset.use_workpath_as_procpath is False and asset.dis_workfile_path != asset.dis_procfile_path @@ -671,6 +672,12 @@ def _open_dis_procfile(self, asset, fifo_mode): if fifo_mode: os.mkfifo(asset.dis_procfile_path) + else: + with open(asset.dis_procfile_path, 'wb'): + pass + + if open_sem is not None: + open_sem.release() quality_width, quality_height = self._get_quality_width_height(asset) yuv_type = asset.workfile_yuv_type @@ -914,28 +921,6 @@ def _get_workfile_yuv_type(asset): else: return asset.dis_yuv_type - @override(Executor) - def _wait_for_workfiles(self, asset): - # wait til workfile paths being generated - for i in range(10): - if os.path.exists(asset.dis_workfile_path): - break - sleep(0.1) - else: - raise RuntimeError("dis video workfile path {} is missing.".format( - asset.dis_workfile_path)) - - @override(Executor) - def _wait_for_procfiles(self, asset): - # wait til procfile paths being generated - for i in range(10): - if os.path.exists(asset.dis_procfile_path): - break - sleep(0.1) - else: - raise RuntimeError("dis video procfile path {} is missing.".format( - asset.dis_procfile_path)) - @override(Executor) def _assert_paths(self, asset): assert os.path.exists(asset.dis_path) or match_any_files(asset.dis_path), \ @@ -943,25 +928,31 @@ def _assert_paths(self, asset): @override(Executor) def _open_workfiles(self, asset): - self._open_dis_workfile(asset, fifo_mode=False) + self._open_dis_workfile(asset, open_sem=None, fifo_mode=False) @override(Executor) def _open_workfiles_in_fifo_mode(self, asset): + sem = multiprocessing.Semaphore(0) dis_p = multiprocessing.Process(target=self._open_dis_workfile, - args=(asset, True)) + args=(asset, sem, True)) dis_p.start() - self._wait_for_workfiles(asset) + + if not sem.acquire(timeout=5): + raise TimeoutError(f"timed out waiting for distorted workfile {asset.dis_workfile_path} to be created") @override(Executor) def _open_procfiles(self, asset): - self._open_dis_procfile(asset, fifo_mode=False) + self._open_dis_procfile(asset, open_sem=None, fifo_mode=False) @override(Executor) def _open_procfiles_in_fifo_mode(self, asset): + sem = multiprocessing.Semaphore(0) dis_p = multiprocessing.Process(target=self._open_dis_procfile, - args=(asset, True)) + args=(asset, sem, True)) dis_p.start() - self._wait_for_procfiles(asset) + + if not sem.acquire(timeout=5): + raise TimeoutError(f"timed out waiting for distorted procfile {asset.dis_procfile_path} to be created") @override(Executor) def _close_workfiles(self, asset): diff --git a/python/vmaf/core/raw_extractor.py b/python/vmaf/core/raw_extractor.py index 5b29a68fa..b2e227355 100644 --- a/python/vmaf/core/raw_extractor.py +++ b/python/vmaf/core/raw_extractor.py @@ -41,19 +41,15 @@ def _assert_an_asset(cls, asset): pass @override(Executor) - def _open_ref_workfile(self, asset, fifo_mode): + def _open_ref_workfile(self, asset, open_sem, fifo_mode): # do nothing pass @override(Executor) - def _open_dis_workfile(self, asset, fifo_mode): + def _open_dis_workfile(self, asset, open_sem, fifo_mode): # do nothing pass - @override(Executor) - def _wait_for_workfiles(self, asset): - pass - def _generate_result(self, asset): # do nothing pass @@ -102,21 +98,10 @@ def _assert_args(self): self.assert_h5py_file() @override(Executor) - def _open_ref_workfile(self, asset, fifo_mode): + def _open_ref_workfile(self, asset, open_sem, fifo_mode): # do nothing - pass - - @override(Executor) - def _wait_for_workfiles(self, asset): - # Override Executor._wait_for_workfiles to skip ref_workfile_path - # wait til workfile paths being generated - for i in range(10): - if os.path.exists(asset.dis_workfile_path): - break - sleep(0.1) - else: - raise RuntimeError("dis video workfile path {} is missing.".format( - asset.dis_workfile_path)) + if open_sem is not None: + open_sem.release() def _generate_result(self, asset): quality_w, quality_h = asset.quality_width_height From e7592504b8842547fe1a8199fe0b467bb8a0f857 Mon Sep 17 00:00:00 2001 From: Arjun Barrett Date: Wed, 3 Jul 2024 14:57:33 -0700 Subject: [PATCH 2/5] use kwarg for semaphore --- python/vmaf/core/executor.py | 40 ++++++++++++++++++------------- python/vmaf/core/raw_extractor.py | 14 +++++------ 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/python/vmaf/core/executor.py b/python/vmaf/core/executor.py index fb032a963..ed94b1f36 100644 --- a/python/vmaf/core/executor.py +++ b/python/vmaf/core/executor.py @@ -416,15 +416,17 @@ def _run_on_asset(self, asset): return result def _open_workfiles(self, asset): - self._open_ref_workfile(asset, open_sem=None, fifo_mode=False) - self._open_dis_workfile(asset, open_sem=None, fifo_mode=False) + self._open_ref_workfile(asset, fifo_mode=False) + self._open_dis_workfile(asset, fifo_mode=False) def _open_workfiles_in_fifo_mode(self, asset): sem = multiprocessing.Semaphore(0) ref_p = multiprocessing.Process(target=self._open_ref_workfile, - args=(asset, sem, True)) + args=(asset, True), + kwargs={'open_sem': sem}) dis_p = multiprocessing.Process(target=self._open_dis_workfile, - args=(asset, sem, True)) + args=(asset, True), + kwargs={'open_sem': sem}) ref_p.start() dis_p.start() @@ -438,9 +440,11 @@ def _open_procfiles(self, asset): def _open_procfiles_in_fifo_mode(self, asset): sem = multiprocessing.Semaphore(0) ref_p = multiprocessing.Process(target=self._open_ref_procfile, - args=(asset, sem, True)) + args=(asset, True), + kwargs={'open_sem': sem}) dis_p = multiprocessing.Process(target=self._open_dis_procfile, - args=(asset, sem, True)) + args=(asset, True), + kwargs={'open_sem': sem}) ref_p.start() dis_p.start() @@ -495,7 +499,7 @@ def _get_log_file_path(self, asset): # ===== workfile ===== - def _open_ref_workfile(self, asset, open_sem, fifo_mode): + def _open_ref_workfile(self, asset, fifo_mode, open_sem=None): use_path_as_workpath = asset.use_path_as_workpath path = asset.ref_path @@ -525,9 +529,9 @@ def _open_ref_workfile(self, asset, open_sem, fifo_mode): else self._open_workfile _open_workfile_method(self, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type, preresampling_filterchain, resampling_type, postresampling_filterchain, - width_height, quality_width_height, ref_or_dis, use_path_as_workpath, open_sem, fifo_mode, logger) + width_height, quality_width_height, ref_or_dis, use_path_as_workpath, fifo_mode, open_sem, logger) - def _open_dis_workfile(self, asset, open_sem, fifo_mode): + def _open_dis_workfile(self, asset, fifo_mode, open_sem=None): use_path_as_workpath = asset.use_path_as_workpath path = asset.dis_path @@ -557,12 +561,12 @@ def _open_dis_workfile(self, asset, open_sem, fifo_mode): else self._open_workfile _open_workfile_method(self, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type, preresampling_filterchain, resampling_type, postresampling_filterchain, - width_height, quality_width_height, ref_or_dis, use_path_as_workpath, open_sem, fifo_mode, logger) + width_height, quality_width_height, ref_or_dis, use_path_as_workpath, fifo_mode, open_sem, logger) @staticmethod def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type, decoder_type: Optional[str], preresampling_filterchain: Optional[List[str]], resampling_type: str, postresampling_filterchain: Optional[List[str]], width_height: Optional[Tuple[int, int]], - quality_width_height: Tuple[int, int], ref_or_dis, use_path_as_workpath, open_sem, fifo_mode, logger): + quality_width_height: Tuple[int, int], ref_or_dis, use_path_as_workpath, fifo_mode, open_sem, logger): # decoder type must be None here assert decoder_type is None, f'decoder_type must be None but is: {decoder_type}' @@ -633,7 +637,7 @@ def _open_workfile(cls, asset, path, workfile_path, yuv_type, workfile_yuv_type, # ===== procfile ===== - def _open_ref_procfile(self, asset, open_sem, fifo_mode): + def _open_ref_procfile(self, asset, fifo_mode, open_sem=None): # only need to open ref procfile if the path is different from ref path assert asset.use_workpath_as_procpath is False and asset.ref_workfile_path != asset.ref_procfile_path @@ -663,7 +667,7 @@ def _open_ref_procfile(self, asset, open_sem, fifo_mode): except StopIteration: break - def _open_dis_procfile(self, asset, open_sem, fifo_mode): + def _open_dis_procfile(self, asset, fifo_mode, open_sem=None): # only need to open dis procfile if the path is different from dis path assert asset.use_workpath_as_procpath is False and asset.dis_workfile_path != asset.dis_procfile_path @@ -928,13 +932,14 @@ def _assert_paths(self, asset): @override(Executor) def _open_workfiles(self, asset): - self._open_dis_workfile(asset, open_sem=None, fifo_mode=False) + self._open_dis_workfile(asset, fifo_mode=False) @override(Executor) def _open_workfiles_in_fifo_mode(self, asset): sem = multiprocessing.Semaphore(0) dis_p = multiprocessing.Process(target=self._open_dis_workfile, - args=(asset, sem, True)) + args=(asset, True), + kwargs={'open_sem': sem}) dis_p.start() if not sem.acquire(timeout=5): @@ -942,13 +947,14 @@ def _open_workfiles_in_fifo_mode(self, asset): @override(Executor) def _open_procfiles(self, asset): - self._open_dis_procfile(asset, open_sem=None, fifo_mode=False) + self._open_dis_procfile(asset, fifo_mode=False) @override(Executor) def _open_procfiles_in_fifo_mode(self, asset): sem = multiprocessing.Semaphore(0) dis_p = multiprocessing.Process(target=self._open_dis_procfile, - args=(asset, sem, True)) + args=(asset, True), + kwargs={'open_sem': sem}) dis_p.start() if not sem.acquire(timeout=5): diff --git a/python/vmaf/core/raw_extractor.py b/python/vmaf/core/raw_extractor.py index b2e227355..347b814a6 100644 --- a/python/vmaf/core/raw_extractor.py +++ b/python/vmaf/core/raw_extractor.py @@ -41,14 +41,14 @@ def _assert_an_asset(cls, asset): pass @override(Executor) - def _open_ref_workfile(self, asset, open_sem, fifo_mode): - # do nothing - pass + def _open_ref_workfile(self, asset, fifo_mode, open_sem=None): + if open_sem is not None: + open_sem.release() @override(Executor) - def _open_dis_workfile(self, asset, open_sem, fifo_mode): - # do nothing - pass + def _open_dis_workfile(self, asset, fifo_mode, open_sem=None): + if open_sem is not None: + open_sem.release() def _generate_result(self, asset): # do nothing @@ -98,7 +98,7 @@ def _assert_args(self): self.assert_h5py_file() @override(Executor) - def _open_ref_workfile(self, asset, open_sem, fifo_mode): + def _open_ref_workfile(self, asset, fifo_mode, open_sem=None): # do nothing if open_sem is not None: open_sem.release() From e3318321baab8f273db518369ea875a6c96005e4 Mon Sep 17 00:00:00 2001 From: Arjun Barrett Date: Wed, 3 Jul 2024 16:18:19 -0700 Subject: [PATCH 3/5] don't throw on timeout --- python/vmaf/core/executor.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/python/vmaf/core/executor.py b/python/vmaf/core/executor.py index ed94b1f36..66e2ea885 100644 --- a/python/vmaf/core/executor.py +++ b/python/vmaf/core/executor.py @@ -430,8 +430,11 @@ def _open_workfiles_in_fifo_mode(self, asset): ref_p.start() dis_p.start() - if not sem.acquire(timeout=5) or not sem.acquire(timeout=5): - raise TimeoutError(f"timed out waiting for reference and/or distorted workfiles {asset.ref_workfile_path} and {asset.dis_workfile_path} to be created") + if not sem.acquire(timeout=5): + if self.logger: + self.logger.warn(f">5 seconds elapsed waiting for reference and/or distorted workfiles {asset.ref_workfile_path} and {asset.dis_workfile_path} to be created; now blocking until created") + sem.acquire() + sem.acquire() def _open_procfiles(self, asset): self._open_ref_procfile(asset, open_sem=None, fifo_mode=False) @@ -448,8 +451,11 @@ def _open_procfiles_in_fifo_mode(self, asset): ref_p.start() dis_p.start() - if not sem.acquire(timeout=5) or not sem.acquire(timeout=5): - raise TimeoutError(f"timed out waiting for reference and/or distorted procfiles {asset.ref_procfile_path} and {asset.dis_procfile_path} to be created") + if not sem.acquire(timeout=5): + if self.logger: + self.logger.warn(f">5 seconds elapsed waiting for reference and/or distorted procfiles {asset.ref_procfile_path} and {asset.dis_procfile_path} to be created; now blocking until created") + sem.acquire() + sem.acquire() def _close_workfiles(self, asset): self._close_ref_workfile(asset) @@ -943,7 +949,9 @@ def _open_workfiles_in_fifo_mode(self, asset): dis_p.start() if not sem.acquire(timeout=5): - raise TimeoutError(f"timed out waiting for distorted workfile {asset.dis_workfile_path} to be created") + if self.logger: + self.logger.warn(f">5 seconds elapsed waiting for distorted workfile {asset.dis_workfile_path} to be created to be created; now blocking until created") + sem.acquire() @override(Executor) def _open_procfiles(self, asset): @@ -958,7 +966,9 @@ def _open_procfiles_in_fifo_mode(self, asset): dis_p.start() if not sem.acquire(timeout=5): - raise TimeoutError(f"timed out waiting for distorted procfile {asset.dis_procfile_path} to be created") + if self.logger: + self.logger.warn(f">5 seconds elapsed waiting for distorted procfile {asset.dis_procfile_path} to be created to be created; now blocking until created") + sem.acquire() @override(Executor) def _close_workfiles(self, asset): From 2b016e43bb77b20d7f936d4d67762abd0212f879 Mon Sep 17 00:00:00 2001 From: Arjun Barrett Date: Fri, 5 Jul 2024 10:37:11 -0700 Subject: [PATCH 4/5] remove explicit default --- python/vmaf/core/executor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/vmaf/core/executor.py b/python/vmaf/core/executor.py index 66e2ea885..24e354c12 100644 --- a/python/vmaf/core/executor.py +++ b/python/vmaf/core/executor.py @@ -437,8 +437,8 @@ def _open_workfiles_in_fifo_mode(self, asset): sem.acquire() def _open_procfiles(self, asset): - self._open_ref_procfile(asset, open_sem=None, fifo_mode=False) - self._open_dis_procfile(asset, open_sem=None, fifo_mode=False) + self._open_ref_procfile(asset, fifo_mode=False) + self._open_dis_procfile(asset, fifo_mode=False) def _open_procfiles_in_fifo_mode(self, asset): sem = multiprocessing.Semaphore(0) From 1c06ca4f1bb5da38b54db075a27c35ba8ea9d7b7 Mon Sep 17 00:00:00 2001 From: nilfm Date: Mon, 15 Jul 2024 12:42:11 -0400 Subject: [PATCH 5/5] Bump Python library to 4.0.0 due to backwards incompatible changes in fifo handling --- python/vmaf/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/vmaf/__init__.py b/python/vmaf/__init__.py index 54ccbac39..3fc7a0d20 100644 --- a/python/vmaf/__init__.py +++ b/python/vmaf/__init__.py @@ -4,7 +4,7 @@ __copyright__ = "Copyright 2016-2020, Netflix, Inc." __license__ = "BSD+Patent" -__version__ = "3.0.0" +__version__ = "4.0.0" logging.basicConfig() logger = logging.getLogger(os.path.splitext(os.path.basename(__file__))[0])