From 39c12ba1f6ec8faf2ad806284eeb8c0bb76ed34e Mon Sep 17 00:00:00 2001 From: Mark Schreiber Date: Wed, 18 Sep 2024 08:56:08 -0400 Subject: [PATCH] Fix for dropped events (#67) --- omics/cli/run_analyzer/__main__.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/omics/cli/run_analyzer/__main__.py b/omics/cli/run_analyzer/__main__.py index d5ad679..cce6fe8 100755 --- a/omics/cli/run_analyzer/__main__.py +++ b/omics/cli/run_analyzer/__main__.py @@ -167,6 +167,7 @@ def stream_to_run(strm): def get_streams(logs, rqst, start_time=None): """Get matching CloudWatch Log streams""" streams = [] + # using boto3 get the log stream descriptions for the request, paginating the responses for page in logs.get_paginator("describe_log_streams").paginate(**rqst): done = False for strm in page["logStreams"]: @@ -219,18 +220,14 @@ def get_run_resources(logs, run): "logGroupName": OMICS_LOG_GROUP, "logStreamName": run["logStreamName"], "startFromHead": True, + "endTime": run["lastEventTimestamp"] + 1, } resources = [] done = False while not done: resp = logs.get_log_events(**rqst) for evt in resp.get("events", []): - try: - resources.append(json.loads(evt["message"])) - except Exception: - pass - if evt["timestamp"] >= run["lastEventTimestamp"]: - done = True + resources.append(json.loads(evt["message"])) token = resp.get("nextForwardToken") if not token or token == rqst.get("nextToken"): done = True