Skip to content

Commit

Permalink
Wired the --batch command-line option to the batch aggregation logic
Browse files Browse the repository at this point in the history
  • Loading branch information
markjschreiber committed Sep 26, 2024
1 parent c22f22c commit 665b8a7
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 129 deletions.
13 changes: 5 additions & 8 deletions omics/cli/run_analyzer/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def add_run_util(run, tasks):
metrics[name] /= time


def add_metrics(res, resources, pricing, headroom):
def add_metrics(res, resources, pricing, headroom=0.0):
"""Add run/task metrics"""
arn = re.split(r"[:/]", res["arn"])
rtype = arn[-2]
Expand Down Expand Up @@ -434,8 +434,6 @@ def get_timeline_event(res, resources):
if __name__ == "__main__":
# Parse command-line options
opts = docopt.docopt(__doc__, version=f"v{importlib.metadata.version('amazon-omics-tools')}")
print(opts, file=sys.stderr)
exit(0)

try:
session = boto3.Session(profile_name=opts["--profile"], region_name=opts["--region"])
Expand Down Expand Up @@ -468,20 +466,21 @@ def get_timeline_event(res, resources):
engine: str = None
for run in runs:
resources = get_run_resources(logs, run)
run_engine = utils.get_engine_from_id(id=run)
run_engine = utils.get_engine(workflow_arn=resources[0]["workflow"], client=session.client("omics"))
if not engine:
engine = run_engine
elif engine != run_engine:
die("aggregated runs must be from the same engine")
if resources:
list_of_resources.append(resources)
batch.aggregate_and_print(
resources_list=list_of_resources,
run_resources_list=list_of_resources,
pricing=pricing,
engine=engine,
headroom=opts["--headroom"],
headroom=opts["--headroom"] or 0.0,
out=opts["--out"],
)
exit(0)

# Display output
with open(opts["--out"] or sys.stdout.fileno(), "w") as out:
Expand Down Expand Up @@ -514,8 +513,6 @@ def get_timeline_event(res, resources):
die(f'the --headroom argument {opts["--headroom"]} is not a valid float value')
if headroom > 1.0 or headroom < 0.0:
die(f"the --headroom argument {headroom} must be between 0.0 and 1.0")
if opts["--batch"]:
batch.aggregate_and_print(list_of_resources, pricing, opts)

# Show run statistics
def tocsv(val):
Expand Down
65 changes: 34 additions & 31 deletions omics/cli/run_analyzer/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@


def aggregate_and_print(
run_resources_list: list[list[dict]], pricing, engine, headroom=0.0, out=sys.stdout
run_resources_list: list[list[dict]], pricing: dict, engine: str, headroom=0.0, out=sys.stdout
):
"""Aggregate resources and print to output"""
if engine not in utils.ENGINES:
Expand All @@ -33,10 +33,12 @@ def aggregate_and_print(

task_names = set()
for run_resources in run_resources_list:
# filter run resources to remove anything where "type" is not "run"
resources = [r for r in run_resources if r["type"] == "run"]
for res in resources:
main.add_metrics(res, resources, pricing, headroom)
for res in run_resources:
# skip resources that are not tasks
if "task" not in res["arn"]:
run_resources.remove(res)
continue
main.add_metrics(res, run_resources, pricing, headroom)
task_names.add(utils.task_base_name(res["name"], engine))

# print hdrs
Expand All @@ -47,65 +49,66 @@ def aggregate_and_print(
_aggregate_resources(run_resources_list, task_name, engine, out)


def _aggregate_resources(
    run_resources_list: list[list[dict]], task_base_name: str, engine: str, out
):
    """Aggregate metrics for all tasks sharing a base name and print one CSV row.

    :param run_resources_list: one list of task resource dicts per analyzed run
    :param task_base_name: engine-specific task base name to aggregate over
    :param engine: workflow engine name, used to derive each task's base name
    :param out: writable stream the CSV row is printed to
    :raises ValueError: if a header in ``hdrs`` has no aggregation defined
    """
    # Collect every task, across all runs, whose base name matches.
    run_tasks_with_name: list[dict] = [
        task
        for run_resources in run_resources_list
        for task in run_resources
        if utils.task_base_name(task["name"], engine) == task_base_name
    ]

    def _metric_key(header: str, prefix: str) -> str:
        # "meanRunningSeconds" -> "runningSeconds". removeprefix only strips a
        # leading occurrence, unlike str.replace which would clobber the prefix
        # string anywhere inside the key.
        stripped = header.removeprefix(prefix)
        return stripped[0].lower() + stripped[1:]

    # For each header key, perform the matching aggregation.
    aggregate = {}
    for k in hdrs:
        if k == "type":
            aggregate[k] = "task"
        elif k == "name":
            aggregate[k] = task_base_name
        elif k == "count":
            aggregate[k] = _do_aggregation(run_tasks_with_name, k, "count")
        elif k.startswith("mean"):
            aggregate[k] = _do_aggregation(run_tasks_with_name, _metric_key(k, "mean"), "mean")
        elif k.startswith("stdDev"):
            aggregate[k] = _do_aggregation(run_tasks_with_name, _metric_key(k, "stdDev"), "stdDev")
        elif k.startswith("maximum"):
            aggregate[k] = _do_aggregation(
                run_tasks_with_name, _metric_key(k, "maximum"), "maximum"
            )
        elif k in ["recommendedCpus", "recommendedMemoryGiB"]:
            aggregate[k] = _do_aggregation(run_tasks_with_name, k, "maximum")
        elif k in ["recommendOmicsInstanceType"]:
            aggregate[k] = _do_aggregation(
                run_tasks_with_name, "omicsInstanceTypeMinimum", "maximum"
            )
        else:
            raise ValueError(f"Unhandled aggregation for key: {k}")

    print(",".join([str(aggregate.get(h, "")) for h in hdrs]), file=out)


def _do_aggregation(run_resources_list: list[list[dict]], resource_key: str, operation: str):

# flatten the list of lists into a single list
resources = [r for rs in run_resources_list for r in rs]

def _do_aggregation(resources_list: list[dict], resource_key: str, operation: str):
if operation == "count":
return len(resources)
return len(resources_list)
elif operation == "sum":
return sum([r[resource_key] for r in resources])
return sum([r[resource_key] for r in resources_list])
elif operation == "maximum":
if resource_key == "omicsInstanceTypeMinimum":
# special case for instance types
instances = []
for r in resources:
instances.append(r[resource_key])
for r in resources_list:
instances.append(r["metrics"][resource_key])
return max(instances, key=lambda x: utils.omics_instance_weight(x))
return max([r[resource_key] for r in resources])
else:
return max([r["metrics"][resource_key] for r in resources_list])
elif operation == "mean":
return round(statistics.mean([r[resource_key] for r in resources]), 2)
return round(statistics.mean([r["metrics"][resource_key] for r in resources_list]), 2)
elif operation == "stdDev":
return round(statistics.stdev([r[resource_key] for r in resources]), 2)
return round(statistics.stdev([r["metrics"][resource_key] for r in resources_list]), 2)
else:
raise ValueError(f"Invalid aggregation operation: {operation}")
131 changes: 41 additions & 90 deletions tests/cli/run_analyzer/unit/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,121 +5,72 @@


class TestRunAnalyzerBatch(unittest.TestCase):
def test_do_aggregation(self):
    """Each supported operation yields the expected value for a known input."""
    resources = [[{"key": 2}, {"key": 1}, {"key": 4}]]
    # Insertion order matches the original assertion order.
    expected_by_operation = {
        "count": 3,
        "sum": 7,
        "maximum": 4,
        "mean": 2.33,
        "stdDev": 1.53,
    }
    for operation, expected in expected_by_operation.items():
        result = batch._do_aggregation(resources, "key", operation)
        self.assertEqual(result, expected)

def test_do_aggregation_with_bad_operation(self):
    """An unrecognized operation name raises ValueError."""
    resources = [[{"key": 2}, {"key": 1}, {"key": 4}]]
    with self.assertRaises(ValueError):
        batch._do_aggregation(resources, "key", "bad_operation")

def test_aggregate_resources(self):
    """Two tasks sharing the base name "foo" aggregate into a single CSV row."""

    def make_resource(name, running_seconds):
        # All fields except name and runningSeconds are identical fixtures.
        return {
            "type": "run",
            "runningSeconds": running_seconds,
            "cpuUtilizationRatio": 1.0,
            "memoryUtilizationRatio": 1.0,
            "gpusReserved": 0,
            "recommendedCpus": 4,
            "recommendedMemoryGiB": 8,
            "omicsInstanceTypeMinimum": "omics.c.large",
            "estimatedUSD": 1.00,
            "name": name,
        }

    resources = [
        [
            make_resource("foo-01-000", 10.0),
            make_resource("foo-02-000", 20.0),
        ]
    ]
    with io.StringIO() as result:
        batch._aggregate_resources(
            run_resources_list=resources, task_name="foo", engine="WDL", out=result
        )
        self.assertEqual(
            result.getvalue(), "run,foo,2,15.0,20.0,7.07,1.0,1.0,0,4,8,omics.c.large,1.0,1.0\n"
        )

def test_aggregate_and_print_resources(self):
resources_list = [
[
{
"type": "run",
"runningSeconds": 10.0,
"cpuUtilizationRatio": 1.0,
"memoryUtilizationRatio": 1.0,
"gpusReserved": 0,
"recommendedCpus": 4,
"recommendedMemoryGiB": 8,
"omicsInstanceTypeMinimum": "omics.c.large",
"estimatedUSD": 1.00,
"metrics": {
"runningSeconds": 10.0,
"cpuUtilizationRatio": 1.0,
"memoryUtilizationRatio": 1.0,
"gpusReserved": 0,
"recommendedCpus": 4,
"recommendedMemoryGiB": 8,
"omicsInstanceTypeMinimum": "omics.c.large",
"estimatedUSD": 1.00
},
"name": "foo-01-000",
"arn": "arn:aws:omics:us-east-1:123456789012:task/111113",
},
{
"type": "run",
"runningSeconds": 20.0,
"cpuUtilizationRatio": 1.0,
"memoryUtilizationRatio": 1.0,
"gpusReserved": 0,
"recommendedCpus": 4,
"recommendedMemoryGiB": 8,
"omicsInstanceTypeMinimum": "omics.c.large",
"estimatedUSD": 1.00,
"metrics": {
"runningSeconds": 20.0,
"cpuUtilizationRatio": 1.0,
"memoryUtilizationRatio": 1.0,
"gpusReserved": 0,
"recommendedCpus": 4,
"recommendedMemoryGiB": 8,
"omicsInstanceTypeMinimum": "omics.c.large",
"estimatedUSD": 1.00,
},
"name": "foo-02-000",
"arn": "arn:aws:omics:us-east-1:123456789012:task/123458",
},
],
[
{
"type": "run",
"runningSeconds": 30.0,
"cpuUtilizationRatio": 0.5,
"memoryUtilizationRatio": 0.5,
"gpusReserved": 0,
"recommendedCpus": 4,
"recommendedMemoryGiB": 8,
"omicsInstanceTypeMinimum": "omics.c.large",
"estimatedUSD": 1.00,
"metrics": {
"runningSeconds": 30.0,
"cpuUtilizationRatio": 0.5,
"memoryUtilizationRatio": 0.5,
"gpusReserved": 0,
"recommendedCpus": 4,
"recommendedMemoryGiB": 8,
"omicsInstanceTypeMinimum": "omics.c.large",
"estimatedUSD": 1.00
},
"name": "foo-01-050",
"arn": "arn:aws:omics:us-east-1:123456789012:task/111111",
},
{
"type": "run",
"runningSeconds": 20.0,
"cpuUtilizationRatio": 0.5,
"memoryUtilizationRatio": 0.5,
"gpusReserved": 0,
"recommendedCpus": 4,
"recommendedMemoryGiB": 8,
"omicsInstanceTypeMinimum": "omics.c.large",
"estimatedUSD": 1.00,
"metrics": {
"runningSeconds": 20.0,
"cpuUtilizationRatio": 0.5,
"memoryUtilizationRatio": 0.5,
"gpusReserved": 0,
"recommendedCpus": 4,
"recommendedMemoryGiB": 8,
"omicsInstanceTypeMinimum": "omics.c.large",
"estimatedUSD": 1.00,
},
"name": "foo-02-010",
"arn": "arn:aws:omics:us-east-1:123456789012:task/123456",
},
],
]
header_string = ",".join(batch.hdrs) + "\n"
expected = header_string + "run,foo,4,20.0,30.0,8.16,1.0,1.0,0,4,8,omics.c.large,1.0,1.0\n"
expected = header_string + "task,foo,4,20.0,30.0,8.16,1.0,1.0,0,4,8,omics.c.large,1.0,1.0\n"
with io.StringIO() as result:
batch.aggregate_and_print(
run_resources_list=resources_list, pricing={}, engine="WDL", out=result
Expand Down

0 comments on commit 665b8a7

Please sign in to comment.