Skip to content

Commit

Permalink
Revert "ability to fetch existing DeployedFlow (Netflix#1955)" (Netfl…
Browse files Browse the repository at this point in the history
…ix#2040)

This reverts commit 7a09f2a.
  • Loading branch information
madhur-ob authored Sep 18, 2024
1 parent 1e2bb00 commit 9d46f85
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 115 deletions.
101 changes: 1 addition & 100 deletions metaflow/plugins/argo/argo_workflows_deployer.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
import os, sys
import json
import sys
import tempfile
from typing import Optional, ClassVar

from metaflow.client.core import get_metadata
from metaflow.exception import MetaflowException
from metaflow.plugins.argo.argo_client import ArgoClient
from metaflow.metaflow_config import KUBERNETES_NAMESPACE
from metaflow.plugins.argo.argo_workflows import ArgoWorkflows
from metaflow.runner.deployer import (
Deployer,
DeployerImpl,
DeployedFlow,
TriggeredRun,
Expand All @@ -18,46 +12,6 @@
)


def generate_fake_flow_file_contents(
flow_name: str, param_info: dict, project_name: str = None
):
params_code = ""
for _, param_details in param_info.items():
param_name = param_details["name"]
param_type = param_details["type"]
param_help = param_details["description"]
param_required = param_details["is_required"]

if param_type == "JSON":
params_code += f" {param_name} = Parameter('{param_name}', type=JSONType, help='{param_help}', required={param_required})\n"
elif param_type == "FilePath":
# TODO: ideally, it should also have info about 'is_text' and 'encoding'..
# but this is not present in the param_info..
params_code += f" {param_name} = IncludeFile('{param_name}', help='{param_help}', required={param_required})\n"
else:
params_code += f" {param_name} = Parameter('{param_name}', type={param_type}, help='{param_help}', required={param_required})\n"

project_decorator = f"@project(name='{project_name}')\n" if project_name else ""

contents = f"""\
from metaflow import FlowSpec, Parameter, IncludeFile, JSONType, step, project
{project_decorator}class {flow_name}(FlowSpec):
{params_code}
@step
def start(self):
self.next(self.end)
@step
def end(self):
pass
if __name__ == '__main__':
{flow_name}()
"""
return contents


def suspend(instance: TriggeredRun, **kwargs):
"""
Suspend the running workflow.
Expand Down Expand Up @@ -234,59 +188,6 @@ def delete(instance: DeployedFlow, **kwargs):
return command_obj.process.returncode == 0


def from_deployment(identifier: str, metadata: str = None):
client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
workflow_template = client.get_workflow_template(identifier)

if workflow_template is None:
raise MetaflowException("No deployed flow found for: %s" % identifier)

metadata_annotations = workflow_template.get("metadata", {}).get("annotations", {})

flow_name = metadata_annotations.get("metaflow/flow_name", "")
parameters = json.loads(metadata_annotations.get("metaflow/parameters", {}))

# these two only exist if @project decorator is used..
branch_name = metadata_annotations.get("metaflow/branch_name", None)
project_name = metadata_annotations.get("metaflow/project_name", None)

project_kwargs = {}
if branch_name is not None:
if branch_name.startswith("prod."):
project_kwargs["production"] = True
project_kwargs["branch"] = branch_name[len("prod.") :]
elif branch_name.startswith("test."):
project_kwargs["branch"] = branch_name[len("test.") :]
elif branch_name == "prod":
project_kwargs["production"] = True

fake_flow_file_contents = generate_fake_flow_file_contents(
flow_name=flow_name, param_info=parameters, project_name=project_name
)

with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as fake_flow_file:
with open(fake_flow_file.name, "w") as fp:
fp.write(fake_flow_file_contents)

if branch_name is not None:
d = Deployer(fake_flow_file.name, **project_kwargs).argo_workflows()
else:
d = Deployer(fake_flow_file.name).argo_workflows(name=identifier)

d.name = identifier
d.flow_name = flow_name

if metadata is None:
d.metadata = get_metadata()
else:
d.metadata = metadata

df = DeployedFlow(deployer=d)
d._enrich_deployed_flow(df)

return df


def trigger(instance: DeployedFlow, **kwargs):
"""
Trigger a new run for the deployed flow.
Expand Down
15 changes: 0 additions & 15 deletions metaflow/runner/deployer.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,21 +271,6 @@ class DeployedFlow(object):

def __init__(self, deployer: "DeployerImpl"):
self.deployer = deployer
self.name = self.deployer.name
self.flow_name = self.deployer.flow_name
self.metadata = self.deployer.metadata

@staticmethod
def from_deployment(
identifier: str,
metadata: str = None,
type_: str = "argo-workflows", # TODO: use a metaflow config variable for `type_`
):
if type_ == "argo-workflows":
from metaflow.plugins.argo.argo_workflows_deployer import from_deployment

return from_deployment(identifier, metadata)
raise NotImplementedError("This method is not available for: %s" % type_)

def _enrich_object(self, env):
"""
Expand Down

0 comments on commit 9d46f85

Please sign in to comment.