Skip to content

Commit

Permalink
WIP - partial incorporation of timeline logic
Browse files Browse the repository at this point in the history
  • Loading branch information
markjschreiber committed Aug 16, 2024
1 parent 5a176f8 commit 70060d9
Show file tree
Hide file tree
Showing 4 changed files with 728 additions and 1 deletion.
34 changes: 34 additions & 0 deletions omics/cli/run_analyzer/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
[--timeline]
[--file=<path>]
[--out=<path>]
[--plot=<directory>]
[--help]
Options:
Expand All @@ -20,6 +21,7 @@
-T, --timeline Show workflow run timeline
-f, --file=<path> Load input from file
-o, --out=<path> Write output to file
-P, --plot=<directory> Plot a run timeline to a directory
-h, --help Show help text
Examples:
Expand All @@ -40,10 +42,15 @@
import re
import sys

from bokeh.plotting import output_file

import boto3
import dateutil
import dateutil.utils
import docopt

import timeline

exename = os.path.basename(sys.argv[0])
OMICS_LOG_GROUP = "/aws/omics/WorkflowLog"
OMICS_SERVICE_CODE = "AmazonOmics"
Expand Down Expand Up @@ -495,3 +502,30 @@ def tocsv(val):
writer.writerow(row)
if opts["--out"]:
sys.stderr.write(f"{exename}: wrote {opts['--out']}\n")
if opts["--plot"]:
    # Plotting needs at least one resource row from the analyzed run.
    if len(resources) < 1:
        die("no resources to plot")

    # Split the "run" resource off from the task resources: the run supplies
    # the overall time window; only the remaining (task) resources are plotted.
    run = {}
    for res in resources:
        # Resource type is the second-to-last ARN path component,
        # e.g. "...:run/<runId>/task/<taskId>" -> "task".
        rtype = re.split(r"[:/]", res["arn"])[-2]
        if rtype == "run":
            run = res
            resources.remove(res)  # we don't want the run in the data to plot
            break

    # NOTE(review): if no "run" resource is found, `run` stays {} and the
    # lookups below raise KeyError — confirm the input always includes the run.
    start = datetime.datetime.strptime(run["startTime"], "%Y-%m-%dT%H:%M:%S.%fZ")
    stop = datetime.datetime.strptime(run["stopTime"], "%Y-%m-%dT%H:%M:%S.%fZ")
    # Total run duration in hours; used to size the timeline's x-axis.
    run_duration_hrs = (stop - start).total_seconds() / 3600

    # The run id is the last ARN component; it names the output HTML file.
    runid = run["arn"].split("/")[-1]
    output_file_basename = f"{runid}_timeline"

    # open or create the plot directory
    plot_dir = opts["--plot"]
    if not os.path.isdir(plot_dir):
        os.makedirs(plot_dir)
    # Direct bokeh output to <plot_dir>/<runid>_timeline.html, pulling BokehJS
    # from the CDN so the file is self-contained apart from network access.
    output_file(filename=os.path.join(plot_dir, f"{output_file_basename}.html"), title=runid, mode="cdn")
    title = f"arn: {run['arn']}, name: {run.get('name')}"

    timeline.plot_timeline(resources, title=title, max_duration_hrs=run_duration_hrs)
182 changes: 182 additions & 0 deletions omics/cli/run_analyzer/timeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import argparse
import os
import os.path as path
from textwrap import dedent
from datetime import datetime

from bokeh.models import ColumnDataSource, Range1d, Div
from bokeh.layouts import gridplot, column
from bokeh.plotting import figure, output_file, show
from bokeh.resources import CDN
import boto3
import pandas as pd

# parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
# parser.add_argument("runid", help="HealthOmics workflow run-id to plot")
# parser.add_argument("--profile", default=None, help="AWS profile to use")
# parser.add_argument("--region", default=None, help="AWS region to use")
# parser.add_argument("-u", "--time-units", default="min", choices=['sec', 'min', 'hr', 'day'], help="Time units to use for plot")
# parser.add_argument("-o", "--output-dir", default='.', help="Directory to save output files")
# parser.add_argument("--no-show", action="store_true", help="Do not show plot")


# Multipliers that convert a duration in seconds into the display unit.
TIME_SCALE_FACTORS = {
    "sec": 1,
    "min": 1 / 60,
    "hr": 1 / 3600,
    "day": 1 / 86400,
}

# Bar color for each terminal task status on the timeline plot.
TASK_COLORS = {
    'COMPLETED': 'cornflowerblue',
    'FAILED': 'crimson',
    'CANCELLED': 'orange',
}

def get_tasks(runid, client=None):
    """Return every task of a HealthOmics workflow run, following pagination.

    Args:
        runid: HealthOmics workflow run id.
        client: optional boto3 "omics" client; a default one is created
            when omitted.

    Returns:
        A list of task item dicts accumulated across all result pages.
    """
    omics = client or boto3.client('omics')

    params = {"id": runid}
    all_tasks = []
    while True:
        page = omics.list_run_tasks(**params)
        all_tasks += page.get('items')
        # The service returns "nextToken" but expects it back as
        # "startingToken" on the follow-up request.
        token = page.get("nextToken")
        if not token:
            return all_tasks
        params["startingToken"] = token

def parse_time_str(time_str):
    """Parse a HealthOmics UTC timestamp string into a naive datetime.

    Timestamps normally carry fractional seconds ("2024-08-16T01:02:03.5Z");
    some omit them ("2024-08-16T01:02:03Z"), so a second format is tried.

    Args:
        time_str: timestamp string in "%Y-%m-%dT%H:%M:%S[.%f]Z" form.

    Returns:
        datetime.datetime parsed from the string.

    Raises:
        ValueError: if the string matches neither format.
    """
    try:
        return datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
    except ValueError:
        # Fallback for timestamps without fractional seconds. The previous
        # format "%Y-%m-%dT%H:%M:%S%fZ" could never match such strings
        # (%f requires at least one digit right after %S), so the fallback
        # always re-raised ValueError.
        return datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%SZ")

def get_task_timings_data(tasks, time_units='min'):
    """Annotate task dicts with plot coordinates and return them as a DataFrame.

    Each task dict is mutated in place with bar positions (queued/running
    left/right edges and durations, in the requested time unit, relative to
    the earliest task creation time), a row index 'y', a status color, a
    memory-per-vcpu ratio, and a text label.

    Args:
        tasks: list of task dicts with 'creationTime', 'startTime',
            'stopTime', 'status', 'cpus', 'memory', 'arn' and 'name'.
        time_units: key into TIME_SCALE_FACTORS ('sec', 'min', 'hr', 'day').

    Returns:
        pandas.DataFrame of the annotated tasks, sorted by 'creationTime'.
    """
    scale = TIME_SCALE_FACTORS[time_units]
    # Zero point of the timeline: the earliest task creation time.
    tare = min(parse_time_str(task['creationTime']) for task in tasks)

    def offset(time_str):
        # Seconds since `tare`, converted to the display unit.
        return (parse_time_str(time_str) - tare).total_seconds() * scale

    for row, task in enumerate(tasks):
        task['y'] = row
        task['color'] = TASK_COLORS[task['status']]

        task['running_left'] = offset(task['startTime'])
        task['running_right'] = offset(task['stopTime'])
        task['running_duration'] = task['running_right'] - task['running_left']

        task['queued_left'] = offset(task['creationTime'])
        task['queued_right'] = task['running_left']
        task['queued_duration'] = task['queued_right'] - task['queued_left']

        task['memory_to_cpus'] = task['memory'] / task['cpus']

        task['label'] = f"({task['arn']}) {task['name']}"
        # Label anchor: 30 seconds past the task's stop time, scaled.
        task['text_x'] = ((parse_time_str(task['stopTime']) - tare).total_seconds() + 30) * scale

    return pd.DataFrame.from_records(tasks).sort_values('creationTime')


def plot_timeline(tasks, title="", time_units='min', max_duration_hrs=5, show_plot=True, pricing=None):
    """Render an interactive Bokeh timeline for a workflow run's tasks.

    Builds a grid of linked plots: a wide queued/running bar chart plus
    narrow side panels for vcpus, memory, memory-per-vcpu and, when pricing
    data is available, per-task cost.

    Args:
        tasks: list of task dicts (see get_tasks); annotated in place by
            get_task_timings_data.
        title: heading rendered above the grid.
        time_units: key into TIME_SCALE_FACTORS ('sec', 'min', 'hr', 'day').
        max_duration_hrs: expected workflow duration; fixes the x-axis range.
        show_plot: when True, open the plot via bokeh.plotting.show.
        pricing: optional pricing data; when truthy and a 'cost_usd' column
            exists, a cost panel is added. Defaults to None. (Previously
            `pricing` was not defined in this scope at all, so reaching the
            cost check raised NameError on every call.)

    Returns:
        The assembled Bokeh layout (title Div above the gridplot).
    """
    time_scale_factor = TIME_SCALE_FACTORS[time_units]
    data = get_task_timings_data(tasks, time_units=time_units)

    source = ColumnDataSource(data)

    tooltips = [
        ("taskId", "@taskId"),
        ("name", "@name"),
        ("cpus", "@cpus"),
        ("memory", "@memory GiB"),
        ("memory/vcpus", "@memory_to_cpus"),
        ("queued", f"@queued_duration {time_units}"),
        ("duration", f"@running_duration {time_units}"),
        ("status", "@status"),
    ]

    if 'metrics' in data.columns:
        tooltips.append(("est cost", "@metrics['estimatedCost'] USD"))

    # Main timeline: grey queued bar overlaid by a status-colored running bar.
    p_run = figure(width=960, height=800, sizing_mode="stretch_both", tooltips=tooltips)
    p_run.hbar(y='y', left='queued_left', right='queued_right', height=0.8, color='lightgrey', source=source, legend_label="queued")
    p_run.hbar(y='y', left='running_left', right='running_right', height=0.8, color='color', source=source, legend_label="running")
    #p_run.text(x='text_x', y='y', text='label', alpha=0.4, text_baseline='middle', text_font_size='1.5ex', source=source)
    x_max = max_duration_hrs*3600 * time_scale_factor  # max expected workflow duration in hours
    x_min = -(x_max * 0.05)  # small negative margin so the first bar isn't clipped
    p_run.x_range = Range1d(x_min, x_max)
    p_run.y_range.flipped = False
    p_run.xaxis.axis_label = f"task execution time ({time_units})"
    p_run.yaxis.visible = False
    p_run.legend.location = "top_right"
    # NOTE(review): this subtracts the raw stopTime/creationTime column values;
    # confirm upstream supplies them as datetimes, not strings.
    p_run.title.text = (
        f"tasks: {len(tasks)}, "
        f"wall time: {(data['stopTime'].max() - data['creationTime'].min()).total_seconds() * time_scale_factor:.2f} {time_units}"
    )

    # Side panel: vcpus per task, sharing the main plot's y-range.
    p_cpu = figure(width=160, y_range=p_run.y_range, sizing_mode="stretch_height", tooltips=tooltips)
    p_cpu.hbar(y='y', right='cpus', height=0.8, color="darkgrey", source=source)
    p_cpu.x_range = Range1d(-1, data['cpus'].max())
    p_cpu.xaxis.axis_label = "vcpus"
    p_cpu.yaxis.visible = False
    p_cpu.title.text = f"max cpus: {max(source.data['cpus'])}"

    # Side panel: memory (GiB) per task.
    p_mem = figure(width=160, y_range=p_run.y_range, sizing_mode="stretch_height", tooltips=tooltips)
    p_mem.hbar(y='y', right='memory', height=0.8, color="darkgrey", source=source)
    p_mem.x_range = Range1d(-1, data['memory'].max())
    p_mem.xaxis.axis_label = "memory (GiB)"
    p_mem.yaxis.visible = False
    p_mem.title.text = f"max mem: {max(source.data['memory']):.2f} GiB"

    # Side panel: memory/vcpu ratio, with guide rays at the 2/4/8 ratios.
    p_mcr = figure(width=160, y_range=p_run.y_range, sizing_mode="stretch_height", tooltips=tooltips)
    p_mcr.hbar(y='y', right='memory_to_cpus', height=0.8, color="darkslateblue", source=source)
    p_mcr.ray(x=[2, 4, 8], y=-1, length=0, angle=90, angle_units="deg", color="darkred")
    p_mcr.x_range = Range1d(-0.01, data['memory_to_cpus'].max())
    p_mcr.xaxis.axis_label = "memory/vcpus"
    p_mcr.yaxis.visible = False
    p_mcr.title.text = f"max mem/vcpus: {max(source.data['memory_to_cpus']):.2f}"

    plots = [p_cpu, p_mem, p_mcr, p_run]

    # Optional cost panel: only when pricing data was supplied and the
    # per-task cost column exists.
    if pricing and 'cost_usd' in data.columns:
        p_usd = figure(width=160, y_range=p_run.y_range, sizing_mode="stretch_height", tooltips=tooltips)
        p_usd.hbar(y='y', right='cost_usd', height=0.8, color="limegreen", source=source)
        p_usd.x_range = Range1d(-0.01, data['cost_usd'].max())
        p_usd.xaxis.axis_label = "cost ($)"
        p_usd.yaxis.visible = False
        p_usd.title.text = f"tot. task cost: ${sum(source.data['cost_usd']):.2f}"

        plots = [p_usd] + plots

    g = gridplot(plots, ncols=len(plots), toolbar_location="right")
    layout = column(Div(text=f"<strong>{title}</strong>"), g)

    if show_plot:
        show(layout)

    return layout


# def main(args):
# runid = args.runid

# session = boto3.Session(profile_name=args.profile, region_name=args.region)
# omics = session.client('omics')
# pricing = get_pricing(client=omics)
# run = omics.get_run(id=runid)
# tasks = get_tasks(runid, client=omics)

# run_duration_hrs = (run['stopTime'] - run['startTime']).total_seconds() / 3600

# output_file_basename = f"{runid}_timeline"
# if not args.output_dir == '.':
# os.makedirs(args.output_dir, exist_ok=True)

# data = get_task_timings_data(tasks)
# data.to_csv(path.join(args.output_dir, f"{output_file_basename}.csv"), index=False)

# output_file(filename=path.join(args.output_dir, f"{output_file_basename}.html"), title=runid, mode="cdn")
# title = f"arn: {run['arn']}, name: {run.get('name')}"
# g = plot_timeline(tasks, title=title, time_units=args.time_units, max_duration_hrs=run_duration_hrs, show_plot=(not args.no_show), pricing=pricing)


# if __name__ == "__main__":
# args = parser.parse_args()
# main(args)
Loading

0 comments on commit 70060d9

Please sign in to comment.