diff --git a/omics/cli/run_analyzer/batch.py b/omics/cli/run_analyzer/batch.py
index b8a43af..818e1c9 100644
--- a/omics/cli/run_analyzer/batch.py
+++ b/omics/cli/run_analyzer/batch.py
@@ -11,8 +11,8 @@
     "meanRunningSeconds",
     "maximumRunningSeconds",
     "stdDevRunningSeconds",
-    "maximumCpuUtilization",
-    "maximumMemoryUtilization",
+    "maximumCpuUtilizationRatio",
+    "maximumMemoryUtilizationRatio",
     "maximumGpusReserved",
     "recommendedCpus",
     "recommendedMemoryGiB",
@@ -47,14 +47,16 @@ def aggregate_and_print(resources_list, pricing, engine, headroom=0.0, out=sys.s
         _aggregate_resources(resources, name, out)
 
 
-def _aggregate_resources(resources, name, out):
+def _aggregate_resources(resources, name, engine, out):
     """Aggregate resources with the same name"""
-    filtered = [r for r in resources if utils.task_base_name(r["name"]) == name]
+    filtered = [r for r in resources if utils.task_base_name(r["name"], engine) == name]
     if filtered:
         res = filtered[0]
         for k in hdrs:
-            if k in ["type", "name"]:
+            if k == "type":
                 continue
+            elif k == "name":
+                res[k] = name
             elif k == "count":
                 res[k] = _do_aggregation(filtered, k, "count")
             elif k.startswith("mean"):
@@ -68,16 +70,16 @@ def _aggregate_resources(resources, name, out):
                 rk = k.replace("maximum", "")[0].lower() + k.replace("maximum", "")[1:]
                 res[k] = _do_aggregation(filtered, rk, "maximum")
             elif k in ["recommendedCpus", "recommendedMemoryGiB"]:
-                _do_aggregation(resources, k, "maximum")
+                res[k] = _do_aggregation(filtered, k, "maximum")
             elif k in ["recommendOmicsInstanceType"]:
-                _do_aggregation(resources, "omicsInstanceTypeMinimum", "maximum")
+                res[k] = _do_aggregation(filtered, "omicsInstanceTypeMinimum", "maximum")
             else:
                 raise ValueError(f"Unhandled aggregation for key: {k}")
 
-    print(",".join([str(res.get(h, "")) for h in hdrs]), file=out)
+        print(",".join([str(res.get(h, "")) for h in hdrs]), file=out)
 
 
-def _do_aggregation(resources, resource_key, operation):
+def _do_aggregation(resources: list, resource_key: str, operation: str):
     if operation == "count":
         return len(resources)
     elif operation == "sum":
@@ -85,7 +87,9 @@
     elif operation == "maximum":
         if resource_key == "omicsInstanceTypeMinimum":
             # special case for instance types
-            return max(resources, key=lambda x: utils.omics_instance_weight(x[resource_key]))
+            instances=[]
+            for r in resources: instances.append(r[resource_key])
+            return max(instances, key=lambda x: utils.omics_instance_weight(x))
         return max([r[resource_key] for r in resources])
     elif operation == "mean":
         return round(statistics.mean([r[resource_key] for r in resources]), 2)
diff --git a/omics/cli/run_analyzer/utils.py b/omics/cli/run_analyzer/utils.py
index 6c0c5e2..12ea260 100644
--- a/omics/cli/run_analyzer/utils.py
+++ b/omics/cli/run_analyzer/utils.py
@@ -36,7 +36,7 @@ def task_base_name(name: str, engine: str) -> str:
 
 
 _sizes = {
-    "": 2,
+    "large": 2,
     "xlarge": 4,
     "2xlarge": 8,
     "4xlarge": 16,
@@ -50,12 +50,10 @@ def task_base_name(name: str, engine: str) -> str:
 
 def omics_instance_weight(instance: str) -> int:
     """Compute a numeric weight for an instance to be used in sorting or finding a max or min"""
-    print(instance)
     # remove the "omics." from the string
     instance = instance.replace("omics.", "")
     # split the instance into family and size
     parts = instance.split(".")
-    print(parts)
     fam = parts[0]
     size = parts[1]
 
diff --git a/tests/cli/run_analyzer/unit/test_batch.py b/tests/cli/run_analyzer/unit/test_batch.py
new file mode 100644
index 0000000..cbbc34d
--- /dev/null
+++ b/tests/cli/run_analyzer/unit/test_batch.py
@@ -0,0 +1,44 @@
+import io
+import unittest
+
+import omics.cli.run_analyzer.batch as batch
+
+
+class TestRunAnalyzerBatch(unittest.TestCase):
+    def test_do_aggregation(self):
+        resources = [{'key': 2}, {'key': 1}, {'key': 4}]
+        result = batch._do_aggregation(resources, 'key', "count")
+        self.assertEqual(result, 3)
+        result = batch._do_aggregation(resources, 'key', "sum")
+        self.assertEqual(result, 7)
+        result = batch._do_aggregation(resources, 'key', "maximum")
+        self.assertEqual(result, 4)
+        result = batch._do_aggregation(resources, 'key', "mean")
+        self.assertEqual(result, 2.33)
+        result = batch._do_aggregation(resources, 'key', 'stdDev')
+        self.assertEqual(result, 1.53)
+
+    def test_do_aggregation_with_bad_operation(self):
+        resources = [{'key': 2}, {'key': 1}, {'key': 4}]
+        self.assertRaises(ValueError, batch._do_aggregation, resources, 'key', "bad_operation")
+
+    def test_aggregate_resources(self):
+        resources=[
+            {
+                'type': 'run', 'runningSeconds': 10.0, 'cpuUtilizationRatio': 1.0, 'memoryUtilizationRatio': 1.0,
+                'gpusReserved': 0, 'recommendedCpus': 4, 'recommendedMemoryGiB': 8,
+                'omicsInstanceTypeMinimum': 'omics.c.large', 'estimatedUSD':1.00,
+                'name': 'foo-01-000'
+            },
+            {
+                'type': 'run', 'runningSeconds': 20.0, 'cpuUtilizationRatio': 1.0, 'memoryUtilizationRatio': 1.0,
+                'gpusReserved': 0, 'recommendedCpus': 4, 'recommendedMemoryGiB': 8,
+                'omicsInstanceTypeMinimum': 'omics.c.large', 'estimatedUSD':1.00,
+                'name': 'foo-02-000'
+            }
+        ]
+        with io.StringIO() as result:
+            batch._aggregate_resources(resources=resources, name='foo', engine='WDL', out=result)
+            self.assertEqual(result.getvalue(), 'run,foo,2,15.0,20.0,7.07,1.0,1.0,0,4,8,omics.c.large,1.0,1.0\n')
+
+