From 580a1177c321d59ef1fa9ad29fb3c489b8fc4d8a Mon Sep 17 00:00:00 2001 From: Pavel Silin Date: Thu, 13 Feb 2025 14:08:26 +0100 Subject: [PATCH] issue 3864 update nf-weblog-handler to be more resilient to incorrect input (#3911) --- deploy/docker/cp-tools/base/nextflow/nextflow | 2 +- .../nf-weblog-handler/app/nextflow_event_handler.py | 7 ++++--- .../base/nextflow/nf-weblog-handler/app/util/parse.py | 6 +++++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/deploy/docker/cp-tools/base/nextflow/nextflow b/deploy/docker/cp-tools/base/nextflow/nextflow index c98c86daae..b46e4ee3f3 100644 --- a/deploy/docker/cp-tools/base/nextflow/nextflow +++ b/deploy/docker/cp-tools/base/nextflow/nextflow @@ -63,7 +63,7 @@ if [ "$_IS_RUN_COMMAND" == "1" ]; then echo "trace.enabled = true" >> $NEXTFLOW_CONFIG echo "trace.raw = true" >> $NEXTFLOW_CONFIG echo "trace.file = '${CP_NF_TRACE_FILE}'" >> $NEXTFLOW_CONFIG - echo "trace.fields = 'task_id,hash,native_id,process,tag,name,status,exit,module,container,cpus,time,disk,memory,attempt,submit,start,complete,duration,realtime,queue,%cpu,%mem,rss,vmem,peak_rss,peak_vmem,rchar,wchar,syscr,syscw,read_bytes,write_bytes,env,workdir'" >> $NEXTFLOW_CONFIG + echo "trace.fields = 'task_id,hash,native_id,process,tag,name,status,exit,module,container,cpus,time,disk,memory,attempt,submit,start,complete,duration,realtime,queue,%cpu,%mem,rss,vmem,peak_rss,peak_vmem,rchar,wchar,syscr,syscw,read_bytes,write_bytes'" >> $NEXTFLOW_CONFIG fi fi fi diff --git a/deploy/docker/cp-tools/base/nextflow/nf-weblog-handler/app/nextflow_event_handler.py b/deploy/docker/cp-tools/base/nextflow/nf-weblog-handler/app/nextflow_event_handler.py index 1ffddc672b..f4d782d804 100644 --- a/deploy/docker/cp-tools/base/nextflow/nf-weblog-handler/app/nextflow_event_handler.py +++ b/deploy/docker/cp-tools/base/nextflow/nf-weblog-handler/app/nextflow_event_handler.py @@ -115,16 +115,17 @@ def sync_events_from_trace_file(self, trace_file_path, attempts=5): events = [] synced = False with open(trace_file_path) as trace_file: - line_index = 0 header = None for line in trace_file: - if line_index == 0: + self.logger.info("Reading line: {}".format(line)) + if not header: header = NextflowEventHandler._parse_header_from_trace_file(line.rstrip()) else: event_json = NextflowEventHandler._parse_event_from_trace_file(header, line.rstrip()) if event_json: + self.logger.info("event json: {}".format(event_json)) + self.logger.info("object: {}\n\n".format(self._parse_event(event_json).to_dict())) events.append(self._parse_event(event_json)) - line_index = line_index + 1 while not synced and attempts > 0: if self.api_client.log_pipeline_run_engine_task_events(events): diff --git a/deploy/docker/cp-tools/base/nextflow/nf-weblog-handler/app/util/parse.py b/deploy/docker/cp-tools/base/nextflow/nf-weblog-handler/app/util/parse.py index 786304de71..26689e908b 100644 --- a/deploy/docker/cp-tools/base/nextflow/nf-weblog-handler/app/util/parse.py +++ b/deploy/docker/cp-tools/base/nextflow/nf-weblog-handler/app/util/parse.py @@ -24,19 +24,23 @@ def get_json_attr(event_json, attr_name, default=None): def parse_timestamp(timestamp): - if not timestamp: + if not timestamp or timestamp == "-": return None timestamp_sec = timestamp / 1000 return (datetime.fromtimestamp(timestamp_sec) .strftime("%Y-%m-%d %H:%M:%S.000Z")) def parse_duration_field_from_trace_file(duration_str): + if not duration_str or duration_str == "-": + return None if duration_str.isdigit(): return int(duration_str) else: return duration_str_to_int(duration_str) def parse_time_field_from_trace_file(datetime_str): + if not datetime_str or datetime_str == "-": + return None if datetime_str.isdigit(): return int(datetime_str) else: