Skip to content

Commit

Permalink
adds first tests for batch operations
Browse files Browse the repository at this point in the history
  • Loading branch information
markjschreiber committed Sep 24, 2024
1 parent 1f7cb9b commit fbad06a
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 13 deletions.
24 changes: 14 additions & 10 deletions omics/cli/run_analyzer/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
"meanRunningSeconds",
"maximumRunningSeconds",
"stdDevRunningSeconds",
"maximumCpuUtilization",
"maximumMemoryUtilization",
"maximumCpuUtilizationRatio",
"maximumMemoryUtilizationRatio",
"maximumGpusReserved",
"recommendedCpus",
"recommendedMemoryGiB",
Expand Down Expand Up @@ -47,14 +47,16 @@ def aggregate_and_print(resources_list, pricing, engine, headroom=0.0, out=sys.s
_aggregate_resources(resources, name, out)


def _aggregate_resources(resources, name, out):
def _aggregate_resources(resources, name, engine, out):
"""Aggregate resources with the same name"""
filtered = [r for r in resources if utils.task_base_name(r["name"]) == name]
filtered = [r for r in resources if utils.task_base_name(r["name"], engine) == name]
if filtered:
res = filtered[0]
for k in hdrs:
if k in ["type", "name"]:
if k == "type":
continue
elif k == "name":
res[k] = name
elif k == "count":
res[k] = _do_aggregation(filtered, k, "count")
elif k.startswith("mean"):
Expand All @@ -68,24 +70,26 @@ def _aggregate_resources(resources, name, out):
rk = k.replace("maximum", "")[0].lower() + k.replace("maximum", "")[1:]
res[k] = _do_aggregation(filtered, rk, "maximum")
elif k in ["recommendedCpus", "recommendedMemoryGiB"]:
_do_aggregation(resources, k, "maximum")
res[k] = _do_aggregation(filtered, k, "maximum")
elif k in ["recommendOmicsInstanceType"]:
_do_aggregation(resources, "omicsInstanceTypeMinimum", "maximum")
res[k] = _do_aggregation(filtered, "omicsInstanceTypeMinimum", "maximum")
else:
raise ValueError(f"Unhandled aggregation for key: {k}")

print(",".join([str(res.get(h, "")) for h in hdrs]), file=out)
print(",".join([str(res.get(h, "")) for h in hdrs]), file=out)


def _do_aggregation(resources, resource_key, operation):
def _do_aggregation(resources: list, resource_key: str, operation: str):
if operation == "count":
return len(resources)
elif operation == "sum":
return sum([r[resource_key] for r in resources])
elif operation == "maximum":
if resource_key == "omicsInstanceTypeMinimum":
# special case for instance types
return max(resources, key=lambda x: utils.omics_instance_weight(x[resource_key]))
instances=[]
for r in resources: instances.append(r[resource_key])
return max(instances, key=lambda x: utils.omics_instance_weight(x))
return max([r[resource_key] for r in resources])
elif operation == "mean":
return round(statistics.mean([r[resource_key] for r in resources]), 2)
Expand Down
4 changes: 1 addition & 3 deletions omics/cli/run_analyzer/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def task_base_name(name: str, engine: str) -> str:


_sizes = {
"": 2,
"large": 2,
"xlarge": 4,
"2xlarge": 8,
"4xlarge": 16,
Expand All @@ -50,12 +50,10 @@ def task_base_name(name: str, engine: str) -> str:

def omics_instance_weight(instance: str) -> int:
"""Compute a numeric weight for an instance to be used in sorting or finding a max or min"""
print(instance)
# remove the "omics." from the string
instance = instance.replace("omics.", "")
# split the instance into family and size
parts = instance.split(".")
print(parts)
fam = parts[0]
size = parts[1]

Expand Down
44 changes: 44 additions & 0 deletions tests/cli/run_analyzer/unit/test_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import io
import unittest

import omics.cli.run_analyzer.batch as batch

class TestRunAnalyzerBatch(unittest.TestCase):
    """Unit tests for the aggregation helpers in ``omics.cli.run_analyzer.batch``."""

    def test_do_aggregation(self):
        """Check every supported operation against a small three-item sample."""
        sample = [{"key": 2}, {"key": 1}, {"key": 4}]
        # (operation, expected result) pairs, checked in the same order as before.
        expectations = (
            ("count", 3),
            ("sum", 7),
            ("maximum", 4),
            ("mean", 2.33),
            ("stdDev", 1.53),
        )
        for operation, expected in expectations:
            actual = batch._do_aggregation(sample, "key", operation)
            self.assertEqual(actual, expected)

    def test_do_aggregation_with_bad_operation(self):
        """An unrecognised operation name must raise ``ValueError``."""
        sample = [{"key": 2}, {"key": 1}, {"key": 4}]
        with self.assertRaises(ValueError):
            batch._do_aggregation(sample, "key", "bad_operation")

    def test_aggregate_resources(self):
        """Two tasks aggregated under the name ``foo`` print one CSV row."""
        # Fields that are identical for both task records.
        shared_fields = {
            "type": "run",
            "cpuUtilizationRatio": 1.0,
            "memoryUtilizationRatio": 1.0,
            "gpusReserved": 0,
            "recommendedCpus": 4,
            "recommendedMemoryGiB": 8,
            "omicsInstanceTypeMinimum": "omics.c.large",
            "estimatedUSD": 1.00,
        }
        # The records differ only in running time and task name.
        task_records = [
            {**shared_fields, "runningSeconds": 10.0, "name": "foo-01-000"},
            {**shared_fields, "runningSeconds": 20.0, "name": "foo-02-000"},
        ]
        expected_row = "run,foo,2,15.0,20.0,7.07,1.0,1.0,0,4,8,omics.c.large,1.0,1.0\n"

        with io.StringIO() as captured:
            batch._aggregate_resources(
                resources=task_records, name="foo", engine="WDL", out=captured
            )
            # Read the buffer before the context manager closes it.
            self.assertEqual(captured.getvalue(), expected_row)



0 comments on commit fbad06a

Please sign in to comment.