Skip to content

Commit

Permalink
initial work on batch mode
Browse files Browse the repository at this point in the history
  • Loading branch information
markjschreiber committed Sep 17, 2024
1 parent c045b39 commit f248688
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 2 deletions.
17 changes: 15 additions & 2 deletions omics/cli/run_analyzer/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
[--plot=<directory>]
[--headroom=<float>]
omics-run-analyzer --batch <runId>... [--profile=<profile>] [--region=<region>] [--headroom=<float>]
[--show] --[out=<path>]
[--out=<path>]
omics-run-analyzer (-h --help)
omics-run-analyzer --version
Expand Down Expand Up @@ -73,6 +73,7 @@
from bokeh.plotting import output_file

from . import timeline # type: ignore
from . import batch # type: ignore

exename = os.path.basename(sys.argv[0])
OMICS_LOG_GROUP = "/aws/omics/WorkflowLog"
Expand Down Expand Up @@ -215,7 +216,11 @@ def get_runs(logs, runs, opts):
"logGroupName": OMICS_LOG_GROUP,
"logStreamNamePrefix": prefix,
}
streams.extend(get_streams(logs, rqst))
returned_streams = get_streams(logs, rqst)
if returned_streams and len(returned_streams) > 0:
streams.extend(get_streams(logs, rqst))
else:
die(f"run {run[-1]} not found")
else:
# Get runs in time range
start_time = datetime.datetime.now() - parse_time_delta(opts["--time"])
Expand Down Expand Up @@ -460,6 +465,12 @@ def get_timeline_event(res, resources):
resources = get_run_resources(logs, runs[0])
if not resources:
die("no workflow run resources")
if len(runs) >=1 and opts["--batch"]:
list_of_resources = []
for run in runs:
resources = get_run_resources(logs, run)
if resources:
list_of_resources.append(resources)

# Display output
with open(opts["--out"] or sys.stdout.fileno(), "w") as out:
Expand Down Expand Up @@ -492,6 +503,8 @@ def get_timeline_event(res, resources):
die(f'the --headroom argument {opts["--headroom"]} is not a valid float value')
if headroom > 1.0 or headroom < 0.0:
die(f"the --headroom argument {headroom} must be between 0.0 and 1.0")
if opts["--batch"]:
batch.aggregate_and_print(list_of_resources, pricing, opts)

# Show run statistics
def tocsv(val):
Expand Down
38 changes: 38 additions & 0 deletions omics/cli/run_analyzer/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import sys

from . import __main__ as main


hdrs = [
"type",
"name",
"count",
"meanRunningSeconds",
"maximumRunningSeconds",
"stdDevRunningSeconds",
"maximumPeakCpuUtilization",
"maximumPeakMemoryUtilization",
"maxGpusReserved",
"recommendedCpus",
"recommendedMemoryGiB",
"recommendOmicsInstanceType",
"estimatedUSDForMaximumRunningSeconds",
"estimatedUSDForMeanRunningSeconds",
"storageMaximumGiB",
"meanStorageMaximumGiB",
"stdDevStorageMaximumGiB"
]

def aggregate_and_print(resources_list, pricing, headroom=0.0, out=sys.stdout):
"""Aggregate resources and print to output"""
for resources in resources_list:
# filter resources to remove anything where res["type"] is not "run"
resources = [r for r in resources if r["type"] == "run"]
for res in resources:
main.add_metrics(res, resources, pricing, headroom)

# if there are scattter resources from the run with a common name prefix then aggregate
names = [r["name"] for r in resources]
names = list(set(names))
for name in names:
_aggregate_resources(resources, name)

0 comments on commit f248688

Please sign in to comment.