Skip to content

Commit

Permalink
[card-server] cli command to expose a card server to view realtime up…
Browse files Browse the repository at this point in the history
…dates

- Added a card viewer html file
- Created a simple HTTP based card server that will help showcase the realtime cards from querying the server
  • Loading branch information
valayDave committed Dec 8, 2023
1 parent 31f714b commit 7192efd
Show file tree
Hide file tree
Showing 3 changed files with 428 additions and 1 deletion.
94 changes: 93 additions & 1 deletion metaflow/plugins/cards/card_cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from metaflow.client import Task
from metaflow import JSONType, namespace
from metaflow.exception import CommandException
from metaflow.util import resolve_identity
from metaflow.exception import (
CommandException,
MetaflowNotFound,
MetaflowNamespaceMismatch,
)
import webbrowser
import re
from metaflow._vendor import click
Expand Down Expand Up @@ -945,3 +950,90 @@ def list(
show_list_as_json=as_json,
file=file,
)


@card.command(help="Run local card viewer server")
@click.option(
"--run-id",
default=None,
show_default=True,
type=str,
help="Run ID of the flow",
)
@click.option(
"--port",
default=8324,
show_default=True,
type=int,
help="Port on which Metaflow card server will run",
)
@click.option(
"--namespace",
"user_namespace",
default=None,
show_default=True,
type=str,
help="Namespace of the flow",
)
@click.option(
"--max-cards",
default=30,
show_default=True,
type=int,
help="Maximum number of cards to be shown at any time by the server",
)
@click.pass_context
def server(ctx, run_id, port, user_namespace, max_cards):
from .card_server import create_card_server, CardServerOptions

user_namespace = resolve_identity() if user_namespace is None else user_namespace
run = _get_run_object(ctx.obj, run_id, user_namespace)
options = CardServerOptions(
run_object=run,
only_running=False,
follow_resumed=False,
flow_datastore=ctx.obj.flow_datastore,
max_cards=max_cards,
)
create_card_server(options, port, ctx.obj)


def _get_run_object(obj, run_id, user_namespace):
from metaflow import Flow, Run, Task

flow_name = obj.flow.name
try:
if run_id is not None:
namespace(None)
else:
_msg = "Searching for runs in namespace: %s" % user_namespace
obj.echo(_msg, fg="blue", bold=False)
namespace(user_namespace)
flow = Flow(pathspec=flow_name)
except MetaflowNotFound:
raise CommandException("No run found for *%s*." % flow_name)

except MetaflowNamespaceMismatch:
raise CommandException(
"No run found for *%s* in namespace *%s*. You can switch the namespace using --namespace"
% (flow_name, user_namespace)
)

if run_id is None:
run_id = flow.latest_run.pathspec

else:
assert len(run_id.split("/")) == 1, "run_id should be of the form <runid>"
run_id = "/".join([flow_name, run_id])

try:
run = Run(run_id)
except MetaflowNotFound:
raise CommandException("No run found for runid: *%s*." % run_id)
except MetaflowNamespaceMismatch:
raise CommandException(
"No run found for runid: *%s* in namespace *%s*. You can switch the namespace using --namespace"
% (run_id, user_namespace)
)
obj.echo("Using run-id %s" % run_id, fg="blue", bold=False)
return run
188 changes: 188 additions & 0 deletions metaflow/plugins/cards/card_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import os
import json
from http.server import BaseHTTPRequestHandler

try:
from http.server import ThreadingHTTPServer
except ImportError:
from socketserver import ThreadingMixIn
from http.server import HTTPServer

class ThreadingHTTPServer(ThreadingMixIn, HTTPServer):
daemon_threads = True


from .card_client import CardContainer
from .exception import CardNotPresentException
from .card_resolver import resolve_paths_from_task

VIEWER_PATH = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "card_viewer", "viewer.html"
)

CARD_VIEWER_HTML = open(VIEWER_PATH).read()

TASK_CACHE = {}


class CardServerOptions:
def __init__(
self, run_object, only_running, follow_resumed, flow_datastore, max_cards=20
):
self.run_object = run_object
self.only_running = only_running
self.follow_resumed = follow_resumed
self.flow_datastore = flow_datastore
self.max_cards = max_cards


def cards_for_task(
flow_datastore, task_pathspec, card_type=None, card_hash=None, card_id=None
):
try:
paths, card_ds = resolve_paths_from_task(
flow_datastore,
task_pathspec,
type=card_type,
hash=card_hash,
card_id=card_id,
)
except CardNotPresentException:
return None
for card in CardContainer(paths, card_ds, origin_pathspec=None):
yield card


def cards_for_run(
flow_datastore,
run_object,
only_running,
card_type=None,
card_hash=None,
card_id=None,
max_cards=20,
):
curr_idx = 0
for step in run_object.steps():
for task in step.tasks():
if only_running and task.finished:
continue
card_generator = cards_for_task(
flow_datastore,
task.pathspec,
card_type=card_type,
card_hash=card_hash,
card_id=card_id,
)
if card_generator is None:
continue
for card in card_generator:
curr_idx += 1
if curr_idx >= max_cards:
raise StopIteration
yield task.pathspec, card


class CardViewerRoutes(BaseHTTPRequestHandler):

card_options: CardServerOptions = None

def do_GET(self):
try:
_, path = self.path.split("/", 1)
try:
prefix, suffix = path.split("/", 1)
except:
prefix = path
suffix = None
except:
prefix = None
if prefix in self.ROUTES:
self.ROUTES[prefix](self, suffix)
else:
self._response(CARD_VIEWER_HTML.encode("utf-8"))

def get_runinfo(self, suffix):
task_card_generator = cards_for_run(
self.card_options.flow_datastore,
self.card_options.run_object,
self.card_options.only_running,
max_cards=self.card_options.max_cards,
)
flow_name = self.card_options.run_object.parent.id
run_id = self.card_options.run_object.id
cards = []
for pathspec, card in task_card_generator:
step, task = pathspec.split("/")[-2:]
cards.append(
dict(
task=pathspec,
label="%s/%s %s" % (step, task, card.hash),
card_object=dict(
hash=card.hash,
type=card.type,
path=card.path,
id=card.id,
),
card="%s/%s" % (pathspec, card.hash),
)
)
resp = {"status": "ok", "flow": flow_name, "run_id": run_id, "cards": cards}
self._response(resp, is_json=True)

def get_card(self, suffix):
flow, run_id, step, task_id, card_hash = suffix.split("/")
pathspec = "/".join([flow, run_id, step, task_id])
cards = list(
cards_for_task(
self.card_options.flow_datastore, pathspec, card_hash=card_hash
)
)
if len(cards) == 0:
self._response("Card not found", code=404)
return
selected_card = cards[0]
self._response(selected_card.get().encode("utf-8"))

def get_data(self, suffix):
flow, run_id, step, task_id, card_hash = suffix.split("/")
pathspec = "/".join([flow, run_id, step, task_id])
cards = list(
cards_for_task(
self.card_options.flow_datastore, pathspec, card_hash=card_hash
)
)
if len(cards) == 0:
self._response("Card not found", code=404)
return
selected_card = cards[0]
card_data = selected_card.get_data()
if card_data is not None:
self._response({"status": "ok", "payload": card_data}, is_json=True)
else:
self._response({"status": "not found"}, is_json=True)

def _response(self, body, is_json=False, code=200):
self.send_response(code)
mime = "application/json" if is_json else "text/html"
self.send_header("Content-type", mime)
self.end_headers()
if is_json:
self.wfile.write(json.dumps(body).encode("utf-8"))
else:
self.wfile.write(body)

ROUTES = {"runinfo": get_runinfo, "card": get_card, "data": get_data}


def create_card_server(card_options, port, ctx_obj):
CardViewerRoutes.card_options = card_options
server_addr = ("", port)
ctx_obj.echo(
"Starting card server on port %d for run-id %s"
% (port, str(card_options.run_object.pathspec)),
fg="green",
bold=True,
)
server = ThreadingHTTPServer(server_addr, CardViewerRoutes)
server.serve_forever()
Loading

0 comments on commit 7192efd

Please sign in to comment.