From 9d46f85019f1bac62317d0dad051e4133c587ede Mon Sep 17 00:00:00 2001
From: madhur-ob <155637867+madhur-ob@users.noreply.github.com>
Date: Wed, 18 Sep 2024 22:22:27 +0530
Subject: [PATCH] Revert "ability to fetch existing DeployedFlow (#1955)"
 (#2040)

This reverts commit 7a09f2a12350f542b55397399cac19571adfa41d.
---
 .../plugins/argo/argo_workflows_deployer.py | 101 +-----------------
 metaflow/runner/deployer.py                 |  15 ---
 2 files changed, 1 insertion(+), 115 deletions(-)

diff --git a/metaflow/plugins/argo/argo_workflows_deployer.py b/metaflow/plugins/argo/argo_workflows_deployer.py
index 5510104f130..1a3056bc28d 100644
--- a/metaflow/plugins/argo/argo_workflows_deployer.py
+++ b/metaflow/plugins/argo/argo_workflows_deployer.py
@@ -1,15 +1,9 @@
-import os, sys
-import json
+import sys
 import tempfile
 from typing import Optional, ClassVar
 
-from metaflow.client.core import get_metadata
-from metaflow.exception import MetaflowException
-from metaflow.plugins.argo.argo_client import ArgoClient
-from metaflow.metaflow_config import KUBERNETES_NAMESPACE
 from metaflow.plugins.argo.argo_workflows import ArgoWorkflows
 from metaflow.runner.deployer import (
-    Deployer,
     DeployerImpl,
     DeployedFlow,
     TriggeredRun,
@@ -18,46 +12,6 @@
 )
 
 
-def generate_fake_flow_file_contents(
-    flow_name: str, param_info: dict, project_name: str = None
-):
-    params_code = ""
-    for _, param_details in param_info.items():
-        param_name = param_details["name"]
-        param_type = param_details["type"]
-        param_help = param_details["description"]
-        param_required = param_details["is_required"]
-
-        if param_type == "JSON":
-            params_code += f"    {param_name} = Parameter('{param_name}', type=JSONType, help='{param_help}', required={param_required})\n"
-        elif param_type == "FilePath":
-            # TODO: ideally, it should also have info about 'is_text' and 'encoding'..
-            # but this is not present in the param_info..
-            params_code += f"    {param_name} = IncludeFile('{param_name}', help='{param_help}', required={param_required})\n"
-        else:
-            params_code += f"    {param_name} = Parameter('{param_name}', type={param_type}, help='{param_help}', required={param_required})\n"
-
-    project_decorator = f"@project(name='{project_name}')\n" if project_name else ""
-
-    contents = f"""\
-from metaflow import FlowSpec, Parameter, IncludeFile, JSONType, step, project
-
-{project_decorator}class {flow_name}(FlowSpec):
-{params_code}
-    @step
-    def start(self):
-        self.next(self.end)
-
-    @step
-    def end(self):
-        pass
-
-if __name__ == '__main__':
-    {flow_name}()
-"""
-    return contents
-
-
 def suspend(instance: TriggeredRun, **kwargs):
     """
     Suspend the running workflow.
@@ -234,59 +188,6 @@ def delete(instance: DeployedFlow, **kwargs):
     return command_obj.process.returncode == 0
 
 
-def from_deployment(identifier: str, metadata: str = None):
-    client = ArgoClient(namespace=KUBERNETES_NAMESPACE)
-    workflow_template = client.get_workflow_template(identifier)
-
-    if workflow_template is None:
-        raise MetaflowException("No deployed flow found for: %s" % identifier)
-
-    metadata_annotations = workflow_template.get("metadata", {}).get("annotations", {})
-
-    flow_name = metadata_annotations.get("metaflow/flow_name", "")
-    parameters = json.loads(metadata_annotations.get("metaflow/parameters", {}))
-
-    # these two only exist if @project decorator is used..
-    branch_name = metadata_annotations.get("metaflow/branch_name", None)
-    project_name = metadata_annotations.get("metaflow/project_name", None)
-
-    project_kwargs = {}
-    if branch_name is not None:
-        if branch_name.startswith("prod."):
-            project_kwargs["production"] = True
-            project_kwargs["branch"] = branch_name[len("prod.") :]
-        elif branch_name.startswith("test."):
-            project_kwargs["branch"] = branch_name[len("test.") :]
-        elif branch_name == "prod":
-            project_kwargs["production"] = True
-
-    fake_flow_file_contents = generate_fake_flow_file_contents(
-        flow_name=flow_name, param_info=parameters, project_name=project_name
-    )
-
-    with tempfile.NamedTemporaryFile(suffix=".py", delete=False) as fake_flow_file:
-        with open(fake_flow_file.name, "w") as fp:
-            fp.write(fake_flow_file_contents)
-
-        if branch_name is not None:
-            d = Deployer(fake_flow_file.name, **project_kwargs).argo_workflows()
-        else:
-            d = Deployer(fake_flow_file.name).argo_workflows(name=identifier)
-
-        d.name = identifier
-        d.flow_name = flow_name
-
-        if metadata is None:
-            d.metadata = get_metadata()
-        else:
-            d.metadata = metadata
-
-    df = DeployedFlow(deployer=d)
-    d._enrich_deployed_flow(df)
-
-    return df
-
-
 def trigger(instance: DeployedFlow, **kwargs):
     """
     Trigger a new run for the deployed flow.
diff --git a/metaflow/runner/deployer.py b/metaflow/runner/deployer.py
index 85ae378e25c..5d19d641b97 100644
--- a/metaflow/runner/deployer.py
+++ b/metaflow/runner/deployer.py
@@ -271,21 +271,6 @@ class DeployedFlow(object):
 
     def __init__(self, deployer: "DeployerImpl"):
         self.deployer = deployer
-        self.name = self.deployer.name
-        self.flow_name = self.deployer.flow_name
-        self.metadata = self.deployer.metadata
-
-    @staticmethod
-    def from_deployment(
-        identifier: str,
-        metadata: str = None,
-        type_: str = "argo-workflows",  # TODO: use a metaflow config variable for `type_`
-    ):
-        if type_ == "argo-workflows":
-            from metaflow.plugins.argo.argo_workflows_deployer import from_deployment
-
-            return from_deployment(identifier, metadata)
-        raise NotImplementedError("This method is not available for: %s" % type_)
 
     def _enrich_object(self, env):
         """
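
Note (editor's addition, not part of the patch): the reverted commit had added a
static method `DeployedFlow.from_deployment` that re-attached to a previously
deployed Argo Workflows template without needing the original `Deployer` object.
A minimal usage sketch against the pre-revert API (i.e. at commit 7a09f2a); the
template name "helloflow" is a placeholder, not a real deployment:

    from metaflow.runner.deployer import DeployedFlow

    # Recover a handle to an already-deployed Argo Workflows template.
    # When `metadata` is None, the Argo implementation fell back to
    # get_metadata() for the current metadata service.
    df = DeployedFlow.from_deployment(
        identifier="helloflow",   # Argo workflow template name (placeholder)
        metadata=None,
        type_="argo-workflows",   # the only backend wired up pre-revert
    )

    run = df.trigger()  # start a fresh execution off the recovered handle

After this revert, a DeployedFlow can effectively only be obtained from the
DeployerImpl that performed the deployment within the same process.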
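Internally, the removed `from_deployment` had no flow file to hand to
`Deployer` (only the workflow template exists server-side), so it synthesized a
stand-in flow file from the template's `metaflow/parameters` and project
annotations via `generate_fake_flow_file_contents`. For illustration, a
hypothetical flow "TrainFlow" with one JSON parameter "config" deployed under
project "demo" (all names invented) would have produced roughly:

    from metaflow import FlowSpec, Parameter, IncludeFile, JSONType, step, project

    @project(name='demo')
    class TrainFlow(FlowSpec):
        config = Parameter('config', type=JSONType, help='run configuration', required=False)

        @step
        def start(self):
            self.next(self.end)

        @step
        def end(self):
            pass

    if __name__ == '__main__':
        TrainFlow()

The generated steps are inert; the file only needs to parse so that the CLI
machinery behind `Deployer` can resolve the flow's name and parameters.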