Commit

continued work on aggregation of run batches
markjschreiber committed Sep 19, 2024
1 parent f2f9644 commit 92f4c54
Showing 3 changed files with 53 additions and 47 deletions.
7 changes: 3 additions & 4 deletions omics/cli/run_analyzer/__main__.py
@@ -58,6 +58,7 @@
"""
import csv
import datetime
import importlib.metadata
import json
import math
import os
@@ -68,12 +69,10 @@
import dateutil
import dateutil.utils
import docopt
import importlib.metadata

from bokeh.plotting import output_file

from . import batch # type: ignore
from . import timeline # type: ignore
from . import batch # type: ignore

exename = os.path.basename(sys.argv[0])
OMICS_LOG_GROUP = "/aws/omics/WorkflowLog"
@@ -465,7 +464,7 @@ def get_timeline_event(res, resources):
resources = get_run_resources(logs, runs[0])
if not resources:
die("no workflow run resources")
if len(runs) >=1 and opts["--batch"]:
if len(runs) >= 1 and opts["--batch"]:
list_of_resources = []
for run in runs:
resources = get_run_resources(logs, run)
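
For orientation, here is a minimal sketch of what the --batch branch above is building toward: collect one resource list per run, then hand the whole batch to the new batch module. This is an illustration, not the committed code; logs, runs, opts, pricing, and engine are assumed to be provided by the surrounding CLI in __main__.py.

    # Sketch only: batch mode gathers a resource list per run, then aggregates them all at once.
    if len(runs) >= 1 and opts["--batch"]:
        list_of_resources = []
        for run in runs:
            run_resources = get_run_resources(logs, run)
            if run_resources:
                list_of_resources.append(run_resources)
        # aggregate_and_print (see batch.py below) writes one CSV row per task base name
        batch.aggregate_and_print(list_of_resources, pricing, engine, out=sys.stdout)
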
42 changes: 23 additions & 19 deletions omics/cli/run_analyzer/batch.py
@@ -1,32 +1,35 @@
import sys
import statistics
import sys

from . import __main__ as main
from . import utils

hdrs = [
"type",
"name",
"count",
"meanRunningSeconds",
"maximumRunningSeconds",
"stdDevRunningSeconds",
"maximumCpuUtilization",
"maximumMemoryUtilization",
"maximumGpusReserved",
"recommendedCpus",
"recommendedMemoryGiB",
"recommendOmicsInstanceType",
"maximumEstimatedUSD",
"meanEstimatedUSD"
]
"type",
"name",
"count",
"meanRunningSeconds",
"maximumRunningSeconds",
"stdDevRunningSeconds",
"maximumCpuUtilization",
"maximumMemoryUtilization",
"maximumGpusReserved",
"recommendedCpus",
"recommendedMemoryGiB",
"recommendOmicsInstanceType",
"maximumEstimatedUSD",
"meanEstimatedUSD",
]


def aggregate_and_print(resources_list, pricing, engine, headroom=0.0, out=sys.stdout):
"""Aggregate resources and print to output"""
if engine not in utils.ENGINES:
raise ValueError(f"Invalid engine for use in batch aggregation: {engine}. Must be one of {utils.ENGINES}")
raise ValueError(
f"Invalid engine for use in batch aggregation: {engine}. Must be one of {utils.ENGINES}"
)

for resources in resources_list:
for resources in resources_list:
# filter resources to remove anything where res["type"] is not "run"
resources = [r for r in resources if r["type"] == "run"]
for res in resources:
@@ -43,6 +46,7 @@ def aggregate_and_print(resources_list, pricing, engine, headroom=0.0, out=sys.s
for name in names:
_aggregate_resources(resources, name, out)


def _aggregate_resources(resources, name, out):
"""Aggregate resources with the same name"""
filtered = [r for r in resources if utils.task_base_name(r["name"]) == name]
@@ -72,6 +76,7 @@ def _aggregate_resources(resources, name, out):

print(",".join([str(res.get(h, "")) for h in hdrs]), file=out)


def _do_aggregation(resources, resource_key, operation):
if operation == "count":
return len(resources)
@@ -88,4 +93,3 @@ def _do_aggregation(resources, resource_key, operation):
return round(statistics.stdev([r[resource_key] for r in resources]), 2)
else:
raise ValueError(f"Invalid aggregation operation: {operation}")

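To make the per-task aggregation concrete, below is a small, self-contained sketch using the same statistics calls visible in _do_aggregation (only the count and stdev branches appear in this hunk; the mean and maximum values are implied by the meanRunningSeconds and maximumRunningSeconds columns). The runningSeconds values are invented for illustration.

    import statistics

    # Hypothetical running times (seconds) for three instances of one task base name
    running_seconds = [120.0, 130.0, 125.0]

    count = len(running_seconds)                               # count
    mean_running = round(statistics.mean(running_seconds), 2)  # mean -> 125.0
    max_running = max(running_seconds)                         # maximum -> 130.0
    std_dev = round(statistics.stdev(running_seconds), 2)      # stdev -> 5.0 (needs >= 2 samples)

    print(count, mean_running, max_running, std_dev)  # 3 125.0 130.0 5.0

Each aggregated task name then becomes one CSV row whose columns follow the hdrs list above.
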
51 changes: 27 additions & 24 deletions omics/cli/run_analyzer/utils.py
@@ -1,38 +1,41 @@
import re

ENGINES = ['WDL', 'CWL', 'Nextflow']
ENGINES = ["WDL", "CWL", "Nextflow"]

_wdl_task_regex = r"^([^-]+)(-\d+-\d+.*)?$"
_nextflow_task_regex = r"^(.+)(\s\(.+\))$"
_cwl_task_regex = r"^(^\D+)(_\d+)?$"


def get_engine(workflow_arn, session) -> str:
omics = session.client("omics")
id = workflow_arn.split('/')[-1]
return omics.get_workflow(id)['engine']
omics = session.client("omics")
id = workflow_arn.split("/")[-1]
return omics.get_workflow(id)["engine"]


def task_base_name(name: str, engine: str) -> str:
# WDL
if engine == 'WDL':
m = re.match(_wdl_task_regex, name)
if m:
return m.group(1)
# Nextflow
elif engine == 'Nextflow':
m = re.match(_nextflow_task_regex, name)
if m:
return m.group(1)
# CWL
elif engine == 'CWL':
m = re.match(_cwl_task_regex, name)
if m:
return m.group(1)
else:
raise ValueError(f"Unsupported engine: {engine}")
return name
# WDL
if engine == "WDL":
m = re.match(_wdl_task_regex, name)
if m:
return m.group(1)
# Nextflow
elif engine == "Nextflow":
m = re.match(_nextflow_task_regex, name)
if m:
return m.group(1)
# CWL
elif engine == "CWL":
m = re.match(_cwl_task_regex, name)
if m:
return m.group(1)
else:
raise ValueError(f"Unsupported engine: {engine}")
return name


def omics_instance_weight(instance: str) -> int:

sizes = {
"": 2,
"x": 4,
@@ -53,4 +56,4 @@ def omics_instance_weight(instance: str) -> int:

ccount = sizes[size]
weight = ccount * families[fam]
return weight
return weight

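To illustrate the task-name regexes, a few hedged examples of the reworked task_base_name; the task names are invented but follow each engine's scatter/shard naming patterns, and the import path assumes the package layout shown in the file headers.

    # Example task names are invented; module path follows omics/cli/run_analyzer in this repo.
    from omics.cli.run_analyzer import utils

    print(utils.task_base_name("align-1-2", "WDL"))               # -> align
    print(utils.task_base_name("FASTQC (sample_1)", "Nextflow"))  # -> FASTQC
    print(utils.task_base_name("bwa_mem_2", "CWL"))               # -> bwa_mem

Names that do not match an engine's pattern fall through to the final return name, and an engine outside ENGINES raises ValueError.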