From f2486881ae64605f4bb09e362088b578cfd06d57 Mon Sep 17 00:00:00 2001
From: Mark Schreiber
Date: Tue, 17 Sep 2024 11:17:51 -0400
Subject: [PATCH] initial work on batch mode

---
 omics/cli/run_analyzer/__main__.py | 17 +++++++++++--
 omics/cli/run_analyzer/batch.py    | 38 ++++++++++++++++++++++++++++++
 2 files changed, 53 insertions(+), 2 deletions(-)
 create mode 100644 omics/cli/run_analyzer/batch.py

diff --git a/omics/cli/run_analyzer/__main__.py b/omics/cli/run_analyzer/__main__.py
index 40ce869..8baf4bf 100755
--- a/omics/cli/run_analyzer/__main__.py
+++ b/omics/cli/run_analyzer/__main__.py
@@ -13,7 +13,7 @@
         [--plot=] [--headroom=]
     omics-run-analyzer --batch ...
         [--profile=] [--region=] [--headroom=]
-        [--show] --[out=]
+        [--out=]
     omics-run-analyzer (-h --help)
     omics-run-analyzer --version
@@ -73,6 +73,7 @@
 from bokeh.plotting import output_file
 
 from . import timeline  # type: ignore
+from . import batch  # type: ignore
 
 exename = os.path.basename(sys.argv[0])
 OMICS_LOG_GROUP = "/aws/omics/WorkflowLog"
@@ -215,7 +216,11 @@ def get_runs(logs, runs, opts):
                 "logGroupName": OMICS_LOG_GROUP,
                 "logStreamNamePrefix": prefix,
             }
-            streams.extend(get_streams(logs, rqst))
+            returned_streams = get_streams(logs, rqst)
+            if returned_streams:
+                streams.extend(returned_streams)
+            else:
+                die(f"run {run[-1]} not found")
     else:
         # Get runs in time range
         start_time = datetime.datetime.now() - parse_time_delta(opts["--time"])
@@ -460,6 +465,12 @@ def get_timeline_event(res, resources):
     resources = get_run_resources(logs, runs[0])
     if not resources:
         die("no workflow run resources")
+    if len(runs) >= 1 and opts["--batch"]:
+        list_of_resources = []
+        for run in runs:
+            resources = get_run_resources(logs, run)
+            if resources:
+                list_of_resources.append(resources)
 
     # Display output
     with open(opts["--out"] or sys.stdout.fileno(), "w") as out:
@@ -492,6 +503,8 @@ def get_timeline_event(res, resources):
             die(f'the --headroom argument {opts["--headroom"]} is not a valid float value')
         if headroom > 1.0 or headroom < 0.0:
             die(f"the --headroom argument {headroom} must be between 0.0 and 1.0")
+        if opts["--batch"]:
+            batch.aggregate_and_print(list_of_resources, pricing, headroom, out)
 
     # Show run statistics
     def tocsv(val):
diff --git a/omics/cli/run_analyzer/batch.py b/omics/cli/run_analyzer/batch.py
new file mode 100644
index 0000000..e92c1d9
--- /dev/null
+++ b/omics/cli/run_analyzer/batch.py
@@ -0,0 +1,38 @@
+import sys
+
+from . import __main__ as main
+
+
+hdrs = [
+    "type",
+    "name",
+    "count",
+    "meanRunningSeconds",
+    "maximumRunningSeconds",
+    "stdDevRunningSeconds",
+    "maximumPeakCpuUtilization",
+    "maximumPeakMemoryUtilization",
+    "maxGpusReserved",
+    "recommendedCpus",
+    "recommendedMemoryGiB",
+    "recommendOmicsInstanceType",
+    "estimatedUSDForMaximumRunningSeconds",
+    "estimatedUSDForMeanRunningSeconds",
+    "storageMaximumGiB",
+    "meanStorageMaximumGiB",
+    "stdDevStorageMaximumGiB",
+]
+
+def aggregate_and_print(resources_list, pricing, headroom=0.0, out=sys.stdout):
+    """Aggregate resources and print to output"""
+    for resources in resources_list:
+        # filter resources to remove anything where res["type"] is not "run"
+        resources = [r for r in resources if r["type"] == "run"]
+        for res in resources:
+            main.add_metrics(res, resources, pricing, headroom)
+
+        # if there are scatter resources from the run with a common name prefix then aggregate
+        names = [r["name"] for r in resources]
+        names = list(set(names))
+        for name in names:
+            _aggregate_resources(resources, name)
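
Note on the new batch.py module: aggregate_and_print calls a helper named _aggregate_resources that is not defined anywhere in this patch, so the module would raise a NameError if exercised as-is. The sketch below shows one way such a helper could be filled in. It is only an illustration under stated assumptions, not part of the patch: the runningSeconds key and the aggregate field names are taken from the hdrs list above, while the idea that scattered tasks share a common name prefix (for example "align (1)", "align (2)") is an assumed naming convention, not something the patch confirms.

    import re
    import statistics


    def _aggregate_resources(resources, name):
        """Hypothetical helper: roll up metrics for resources sharing a name prefix."""
        # Treat "name", "name (1)", "name (2)", ... as one scatter group (assumed naming scheme).
        pattern = re.compile(rf"^{re.escape(name)}( \(\d+\))?$")
        group = [r for r in resources if pattern.match(r.get("name", ""))]
        if not group:
            return None

        running = [float(r.get("runningSeconds", 0.0)) for r in group]
        return {
            "type": group[0].get("type"),
            "name": name,
            "count": len(group),
            "meanRunningSeconds": statistics.mean(running),
            "maximumRunningSeconds": max(running),
            "stdDevRunningSeconds": statistics.stdev(running) if len(running) > 1 else 0.0,
        }

With a helper along these lines, the loop at the end of aggregate_and_print could collect the returned dictionaries and print one row per task group under the hdrs columns, which appears to be where this initial work is headed.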