diff --git a/omics/analyzer/omics-run-analyzer b/omics/analyzer/omics-run-analyzer new file mode 100755 index 0000000..bb7c445 --- /dev/null +++ b/omics/analyzer/omics-run-analyzer @@ -0,0 +1,460 @@ +#!/usr/bin/env python3 +""" +Generate statistics for a completed HealthOmics workflow run + +Usage: omics-run-analyzer [...] + [--profile=] + [--region=] + [--time=] + [--show] + [--timeline] + [--file=] + [--out=] + [--help] + +Options: + -p, --profile= AWS profile + -r, --region= AWS region + -t, --time= Select runs over a time interval [default: 1day] + -s, --show Show run resources with no post-processing (JSON) + -T, --timeline Show workflow run timeline + -f, --file= Load input from file + -o, --out= Write output to file + -h, --help Show help text + +Examples: + # Show workflow runs that were running in the last 5 days + omics-run-analyzer -t5d + # Retrieve and analyze a specific workflow run by ID + omics-run-analyzer 1234567 -o run-1234567.csv + # Retrieve and analyze a specific workflow run by ID and UUID + omics-run-analyzer 2345678:12345678-1234-5678-9012-123456789012 + # Output workflow run and tasks in JSON format + omics-run-analyzer 1234567 -s -o run-1234567.json +""" +import csv +import datetime +import json +import os +import re +import sys + +import boto3 +import dateutil +import docopt + +exename = os.path.basename(sys.argv[0]) +omics_log_group = "/aws/omics/WorkflowLog" +omics_service_code = "AmazonOmics" +pricing_aws_region = "us-east-1" # Pricing service endpoint + + +def die(msg): + """Show error message and terminate""" + exit(f"{exename}: {msg}") + + +def parse_time_str(s, utc=True): + """Parse time string""" + tz = datetime.timezone.utc + return dateutil.parser.parse(s).replace(tzinfo=tz) if s else None + + +def parse_time_delta(s): + """Parse time delta string""" + m = re.match(r"(\d+)\s*(m|min|minutes?|h|hours?|d|days?|w|weeks?)$", s) + if not m: + die("unrecognized time interval format '{}'".format(s)) + secs = {"m": 60, "h": 3600, "d": 86400, "w": 604800} + delta = int(m.group(1)) * secs[m.group(2)[0]] + return datetime.timedelta(seconds=delta) + + +def get_storage_gib(capacity=None): + """Return filesystem size in GiB""" + omics_storage_min = 1200 # Minimum size + omics_storage_inc = 2400 # Size increment (2400, 4800, 7200, ...) + if not capacity or capacity <= omics_storage_min: + return omics_storage_min + capacity = (capacity + omics_storage_inc - 1) / omics_storage_inc + return int(capacity) * omics_storage_inc + + +def get_instance(cpus, mem): + """Return smallest matching instance type""" + sizes = { + "": 2, + "x": 4, + "2x": 8, + "4x": 16, + "8x": 32, + "12x": 48, + "16x": 64, + "24x": 96, + } + families = {"c": 2, "m": 4, "r": 8} + for size in sorted(sizes, key=lambda x: sizes[x]): + ccount = sizes[size] + if ccount < cpus: + continue + for fam in sorted(families, key=lambda x: families[x]): + mcount = ccount * families[fam] + if mcount < mem: + continue + return f"omics.{fam}.{size}large" + return "" + + +def get_pricing(pricing, resource, region, hours): + key = f"{resource}:{region}" + price = get_pricing.pricing.get(key) + if price: + return price * hours + elif not pricing: + return None + filters = [ + {"Type": "TERM_MATCH", "Field": "resourceType", "Value": resource}, + {"Type": "TERM_MATCH", "Field": "regionCode", "Value": region}, + ] + rqst = {"ServiceCode": omics_service_code, "Filters": filters} + for page in pricing.get_paginator("get_products").paginate(**rqst): + for item in page["PriceList"]: + entry = json.loads(item) + price = entry.get("terms", {}).get("OnDemand", {}) + price = next(iter(price.values()), {}).get("priceDimensions", {}) + price = next(iter(price.values()), {}).get("pricePerUnit", {}) + price = price.get("USD") + if price is None: + continue + price = float(price) + get_pricing.pricing[key] = price + return price * hours + return None + + +get_pricing.pricing = {} + + +def stream_to_run(strm): + """Convert CloudWatch Log stream to workflow run details""" + m = re.match(r"^manifest/run/(\d+)/([a-f0-9-]+)$", strm["logStreamName"]) + if not m: + return None + strm["id"] = m.group(1) + strm["uuid"] = m.group(2) + return strm + + +def get_streams(logs, rqst, start_time=None): + """Get matching CloudWatch Log streams""" + streams = [] + for page in logs.get_paginator("describe_log_streams").paginate(**rqst): + done = False + for strm in page["logStreams"]: + if start_time and strm["lastEventTimestamp"] < start_time: + done = True + elif stream_to_run(strm): + streams.append(strm) + if (len(streams) % 100) == 0: + sys.stderr.write(f"{exename}: found {len(streams)} workflow runs\n") + if not start_time: + done = True + if done: + break + return streams + + +def get_runs(logs, runs, opts): + """Get matching workflow runs""" + streams = [] + if runs: + # Get specified runs + for run in runs: + run = re.split(r"[:/]", run) + if re.match(r"[a-f\d]{8}(-[a-f\d]{4}){3}-[a-f\d]{12}$", run[-1]): + prefix = f"manifest/run/{run[-2]}/{run[-1]}" + else: + prefix = f"manifest/run/{run[-1]}/" + rqst = { + "logGroupName": omics_log_group, + "logStreamNamePrefix": prefix, + } + streams.extend(get_streams(logs, rqst)) + else: + # Get runs in time range + start_time = datetime.datetime.now() - parse_time_delta(opts["--time"]) + start_time = start_time.timestamp() * 1000.0 + rqst = { + "logGroupName": omics_log_group, + "orderBy": "LastEventTime", + "descending": True, + } + streams.extend(get_streams(logs, rqst, start_time)) + runs = [stream_to_run(s) for s in streams] + return sorted(runs, key=lambda x: x["creationTime"]) + + +def get_run_resources(logs, run): + """Get workflow run/task details""" + rqst = { + "logGroupName": omics_log_group, + "logStreamName": run["logStreamName"], + "startFromHead": True, + } + resources = [] + done = False + while not done: + resp = logs.get_log_events(**rqst) + for evt in resp.get("events", []): + try: + resources.append(json.loads(evt["message"])) + except Exception: + pass + if evt["timestamp"] >= run["lastEventTimestamp"]: + done = True + token = resp.get("nextForwardToken") + if not token or token == rqst.get("nextToken"): + done = True + rqst["nextToken"] = token + return sorted(resources, key=lambda x: x.get("creationTime")) + + +def add_run_util(run, tasks): + """Add run metrics computed from task metrics""" + events = [] + stop1 = None + stops = [] + for idx, task in enumerate(tasks): + start = parse_time_str(task.get("startTime")) + if start: + events.append({"time": start, "event": "start", "index": idx}) + stop = parse_time_str(task.get("stopTime")) + if stop: + events.append({"time": stop, "event": "stop", "index": idx}) + if not stop1 or stop > stop1: + stop1 = stop + else: + stops.append(idx) + for idx in stops: + events.append({"time": stop1, "event": "stop", "index": idx}) + events.sort(key=lambda x: x["time"]) + + metric_names = [ + "cpusReserved", + "cpusMaximum", + "cpusAverage", + "gpusReserved", + "memoryReservedGiB", + "memoryMaximumGiB", + "memoryAverageGiB", + ] + metrics = run.get("metrics", {}) + run["metrics"] = metrics + + active = [] + t0 = None + time = 0 + for evt in events: + t1 = evt["time"] + if t0: + secs = (t1 - t0).total_seconds() + time += secs + for name in metric_names: + task_metrics = tasks[idx].get("metrics", {}) + mvalues = [task_metrics.get(name) for idx in active] + mvalues = [v for v in mvalues if v is not None] + if not mvalues: + continue + total = sum(mvalues) + if "Average" in name: + metrics[name] = metrics.get(name, 0) + total * secs + else: + metrics[name] = max(metrics.get(name, total), total) + t0 = t1 + if evt["event"] == "start": + active.append(evt["index"]) + elif evt["index"] in active: + active.remove(evt["index"]) + + for name in metric_names: + if name in metrics and "Average" in name: + metrics[name] /= time + + +def add_metrics(res, resources, pricing): + """Add run/task metrics""" + metrics = res.get("metrics", {}) + res["metrics"] = metrics + + arn = re.split(r"[:/]", res["arn"]) + rtype = arn[-2] + region = arn[3] + res["type"] = rtype + if rtype == "run": + add_run_util(res, resources[1:]) + + time1 = parse_time_str(res.get("startTime")) + time2 = parse_time_str(res.get("stopTime")) + running = 0 + if time1 and time2: + running = (time2 - time1).total_seconds() + metrics["runningSeconds"] = running + + cpus_res = metrics.get("cpusReserved") + cpus_max = metrics.get("cpusMaximum") + if cpus_res and cpus_max: + metrics["cpuUtilization"] = float(cpus_max) / float(cpus_res) + gpus_res = metrics.get("gpusReserved") + mem_res = metrics.get("memoryReservedGiB") + mem_max = metrics.get("memoryMaximumGiB") + if mem_res and mem_max: + metrics["memoryUtilization"] = float(mem_max) / float(mem_res) + store_res = metrics.get("storageReservedGiB") + store_max = metrics.get("storageMaximumGiB") + if store_res and store_max: + metrics["storageUtilization"] = float(store_max) / float(store_res) + + if rtype == "run": + capacity = get_storage_gib(res.get("storageCapacity")) + metrics["sizeReserved"] = f"{capacity} GiB" + gib_hrs = capacity * running / 3600.0 + price = get_pricing(pricing, "Run Storage", region, gib_hrs) + if price: + metrics["estimatedUSD"] = price + if store_max: + capacity = get_storage_gib(store_max) + metrics["sizeMinimum"] = f"{capacity} GiB" + gib_hrs = capacity * running / 3600.0 + price = get_pricing(pricing, "Run Storage", region, gib_hrs) + if price: + metrics["minimumUSD"] = price + elif "instanceType" in res: + itype = res["instanceType"] + metrics["sizeReserved"] = itype + price = get_pricing(pricing, itype, region, running / 3600.0) + if price: + metrics["estimatedUSD"] = price + if cpus_max and mem_max and not gpus_res: + itype = get_instance(cpus_max, mem_max) + metrics["sizeMinimum"] = itype + price = get_pricing(pricing, itype, region, running / 3600.0) + if price: + metrics["minimumUSD"] = price + + +def get_timeline_event(res, resources): + """Convert resource to timeline event""" + arn = re.split(r"[:/]", res["arn"]) + time0 = parse_time_str(resources[0].get("creationTime")) + time1 = parse_time_str(res.get("creationTime")) + time2 = parse_time_str(res.get("startTime")) + time3 = parse_time_str(res.get("stopTime")) + attrs = ["name", "cpus", "gpus", "memory"] + attrs = [f"{a}={res[a]}" for a in attrs if res.get(a)] + resource = f"{arn[-2]}/{arn[-1]}" + if attrs: + resource += f" ({','.join(attrs)})" + return { + "resource": resource, + "pending": (time1 - time0).total_seconds(), + "starting": (time2 - time1).total_seconds(), + "running": (time3 - time2).total_seconds(), + } + + +if __name__ == "__main__": + # Parse command-line options + opts = docopt.docopt(__doc__) + + try: + session = boto3.Session( + profile_name=opts["--profile"], region_name=opts["--region"] + ) + pricing = session.client("pricing", region_name=pricing_aws_region) + pricing.describe_services(ServiceCode=omics_service_code) + except Exception as e: + die(e) + + # Retrieve workflow runs & tasks + runs = [] + resources = [] + if opts["--file"]: + with open(opts["--file"]) as f: + resources = json.load(f) + else: + try: + logs = session.client("logs") + runs = get_runs(logs, opts[""], opts) + except Exception as e: + die(e) + if not runs: + die("no matching workflow runs") + + if len(runs) == 1 and opts[""]: + resources = get_run_resources(logs, runs[0]) + if not resources: + die("no workflow run resources") + + # Display output + with open(opts["--out"] or sys.stdout.fileno(), "w") as out: + if not resources: + # Show available runs + out.write("Workflow run IDs ( ):\n") + for r in runs: + time0 = r["creationTime"] / 1000.0 + time0 = datetime.datetime.fromtimestamp(time0) + time0 = time0.isoformat(timespec="seconds") + out.write(f"{r['id']} ({time0} {r['uuid']})\n") + elif opts["--show"]: + # Show run resources + out.write(json.dumps(resources, indent=2) + "\n") + elif opts["--timeline"]: + # Show run timeline + hdrs = ["resource", "pending", "starting", "running"] + writer = csv.writer(out, lineterminator="\n") + writer.writerow(hdrs) + for res in resources: + event = get_timeline_event(res, resources) + row = [event.get(h, "") for h in hdrs] + writer.writerow(row) + else: + # Show run statistics + def tocsv(val): + if val is None: + return "" + return f"{val:f}" if type(val) is float else str(val) + + hdrs = [ + "arn", + "type", + "name", + "startTime", + "stopTime", + "runningSeconds", + "sizeReserved", + "sizeMinimum", + "estimatedUSD", + "minimumUSD", + "cpuUtilization", + "memoryUtilization", + "storageUtilization", + "cpusReserved", + "cpusMaximum", + "cpusAverage", + "gpusReserved", + "memoryReservedGiB", + "memoryMaximumGiB", + "memoryAverageGiB", + "storageReservedGiB", + "storageMaximumGiB", + "storageAverageGiB", + ] + writer = csv.writer(out, lineterminator="\n") + writer.writerow(hdrs) + for res in resources: + add_metrics(res, resources, pricing) + metrics = res.get("metrics", {}) + row = [tocsv(metrics.get(h, res.get(h))) for h in hdrs] + writer.writerow(row) + if opts["--out"]: + sys.stderr.write(f"{exename}: wrote {opts['--out']}\n")