From af87819b0368d20eee42ef889a192ce1bc132c6d Mon Sep 17 00:00:00 2001 From: Julien Esseiva Date: Tue, 16 Jan 2024 16:37:59 -0800 Subject: [PATCH] fix issue of max shell argument size when merging many hits file --- src/raythena/drivers/esdriver.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/src/raythena/drivers/esdriver.py b/src/raythena/drivers/esdriver.py index 1cfc230..868ab62 100644 --- a/src/raythena/drivers/esdriver.py +++ b/src/raythena/drivers/esdriver.py @@ -3,6 +3,7 @@ import re import json import shutil +import stat import tempfile import time import traceback @@ -586,9 +587,14 @@ def run(self) -> None: self.bookKeeper.stop_saver_thread() if not total_events: + max_iter = 5 + n_iter = 0 # didn't have any events to process, we only need to do merging so keep doing it while self.handle_merge_transforms(True): - pass + n_iter += 1 + if n_iter == max_iter: + break + else: # we might still simulate more events, just finish the current merge tasks self.handle_merge_transforms(True) @@ -616,7 +622,10 @@ def rename_output_files(self, output_map: Dict[str, str]): new_filename = output_map[file] except KeyError: # read the commit log to recover the correct name. If we get another KeyError, we can't recover - new_filename = output_map[self.bookKeeper.recover_outputfile_name(file)] + new_filename = output_map.get(self.bookKeeper.recover_outputfile_name(file)) + if not new_filename: + self._logger.warning(f"Couldn't find new name for {file}, will not be staged out correctly") + continue os.rename(os.path.join(self.merged_files_dir, file), os.path.join(self.merged_files_dir, new_filename)) def produce_final_report(self, output_map: Dict[str, str]): @@ -783,24 +792,34 @@ def hits_merge_transform(self, input_files: Iterable[str], output_file: str) -> if not input_files: return tmp_dir = tempfile.mkdtemp() - file_list = ",".join(input_files) + file_list = "\n".join(input_files) job_report_name = os.path.join(self.job_reports_dir, output_file) + ".json" output_file = os.path.join(self.merged_files_dir, output_file) - transform_params = re.sub(r"@inputFor_\$\{OUTPUT0\}", file_list, self.merge_transform_params) + file_list_path = os.path.join(tmp_dir, "file_list.txt") + with open(file_list_path, 'w') as f: + f.write(file_list) + + transform_params = re.sub(r"@inputFor_\$\{OUTPUT0\}", f"@/srv/{os.path.basename(file_list_path)}", self.merge_transform_params) + transform_params = re.sub(r"--inputHitsFile=", "--inputHitsFile ", transform_params) + transform_params = re.sub(r"--inputHITSFile=", "--inputHITSFile ", transform_params) transform_params = re.sub(r"\$\{OUTPUT0\}", output_file, transform_params, count=1) transform_params = re.sub(r"--autoConfiguration=everything", "", transform_params) transform_params = re.sub(r"--DBRelease=current", "", transform_params) endtoken = "" if self.config.payload['containerextrasetup'].strip().endswith(";") else ";" container_script = f"{self.config.payload['containerextrasetup']}{endtoken}{self.merge_transform} {transform_params}" + merge_script_path = os.path.join(tmp_dir, "merge_transform.sh") + with open(merge_script_path, 'w') as f: + f.write(container_script) + os.chmod(merge_script_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH) cmd = str() cmd += "export ATLAS_LOCAL_ROOT_BASE=/cvmfs/atlas.cern.ch/repo/ATLASLocalRootBase;" cmd += f"export thePlatform=\"{self.container_name}\";" endtoken = "" if self.config.payload['containerextraargs'].strip().endswith(";") else ";" cmd += f"{self.config.payload['containerextraargs']}{endtoken}" cmd += f"source ${{ATLAS_LOCAL_ROOT_BASE}}/user/atlasLocalSetup.sh --swtype {self.config.payload['containerengine']} -c $thePlatform -d -s none" - cmd += f" -r \"{container_script}\" -e \"{self.container_options}\";RETURN_VAL=$?;cp jobReport.json {job_report_name} ;exit $RETURN_VAL;" + cmd += f" -r /srv/merge_transform.sh -e \"{self.container_options}\";RETURN_VAL=$?;cp jobReport.json {job_report_name} ;exit $RETURN_VAL;" return (Popen(cmd, stdin=DEVNULL, stdout=DEVNULL,