diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml
index 40fb289a9..a2d49afbe 100644
--- a/.github/workflows/checks.yml
+++ b/.github/workflows/checks.yml
@@ -147,7 +147,6 @@ jobs:
           for i in */; do tar -czvf "../release-snacks/${i%/}.tar.gz" "$i" & done; wait
           cd .. && sudo rm -rf download-artifact/
           cp flyte_tests_manifest.json release-snacks/flyte_tests_manifest.json
-          python flyte_tests_generator.py >> flyte_tests.txt
           cp flyte_tests.txt release-snacks/flyte_tests.txt
       - name: Release test manifest
         uses: goreleaser/goreleaser-action@v2
@@ -219,8 +218,9 @@ jobs:
         with:
           repository: flyteorg/flytesnacks
           path: flytesnacks
-      - name: Generate tests
-        run: python flyte_tests_generator.py >> flyte_tests.txt
+      - name: Validate if the tests exist
+        run: |
+          python flyte_tests_validate.py
       - name: Register specific tests
         run: |
           while read -r line;
@@ -234,6 +234,3 @@ jobs:
               --version ${{ env.FLYTESNACKS_VERSION }} \
               flytesnacks/$line;
           done < flyte_tests.txt
-      - name: End2End
-        run: |
-          make end2end_execute
diff --git a/examples/advanced_composition/advanced_composition/merge_sort.py b/examples/advanced_composition/advanced_composition/merge_sort.py
index 342ff3306..55d28e194 100644
--- a/examples/advanced_composition/advanced_composition/merge_sort.py
+++ b/examples/advanced_composition/advanced_composition/merge_sort.py
@@ -29,6 +29,7 @@
 
 # %% [markdown]
 # A simple split function that divides a list into two halves.
+
 # %%
 @task
 def split(numbers: typing.List[int]) -> Tuple[typing.List[int], typing.List[int], int, int]:
@@ -65,6 +66,7 @@ def merge(sorted_list1: typing.List[int], sorted_list2: typing.List[int]) -> typ
 # Generally speaking, the algorithm will recurse through the list, splitting it in half until it reaches a size that we
 # know is efficient enough to run locally. At which point it'll just use the python-builtin sorted function.
 
+
 # %% [markdown]
 # This runs the sorting completely locally. It's faster and more efficient to do so if the entire list fits in memory.
 # %%
diff --git a/examples/advanced_composition/advanced_composition/subworkflows.py b/examples/advanced_composition/advanced_composition/subworkflows.py
index 07f4efa7e..be4eff792 100644
--- a/examples/advanced_composition/advanced_composition/subworkflows.py
+++ b/examples/advanced_composition/advanced_composition/subworkflows.py
@@ -87,6 +87,7 @@ def parent_wf(a: int) -> Tuple[int, str, str]:
 if __name__ == "__main__":
     print(f"Running parent_wf(a=3) {parent_wf(a=3)}")
 
+
 # %% [markdown]
 # Interestingly, we can nest a workflow that has a subworkflow within a workflow.
 # Workflows can be simply composed from other workflows, even if they are standalone entities. Each of the
@@ -164,6 +165,7 @@ def ext_workflow(my_input: str) -> Dict:
         "parent_workflow_execution",
     )
 
+
 # %% [markdown]
 # Define another task that returns the repeated keys (in our case, words) from a dictionary.
 # %%
diff --git a/flyte_tests.txt b/flyte_tests.txt
new file mode 100644
index 000000000..5de785ff6
--- /dev/null
+++ b/flyte_tests.txt
@@ -0,0 +1,17 @@
+examples/advanced_composition/advanced_composition/chain_entities.py
+examples/advanced_composition/advanced_composition/conditions.py
+examples/advanced_composition/advanced_composition/decorating_tasks.py
+examples/advanced_composition/advanced_composition/decorating_workflows.py
+examples/advanced_composition/advanced_composition/dynamics.py
+examples/advanced_composition/advanced_composition/map_task.py
+examples/advanced_composition/advanced_composition/waiting_for_external_inputs.py
+examples/basics/basics/documenting_workflows.py
+examples/basics/basics/hello_world.py
+examples/basics/basics/named_outputs.py
+examples/basics/basics/shell_task.py
+examples/basics/basics/workflow.py
+examples/data_types_and_io/data_types_and_io/dataclass.py
+examples/data_types_and_io/data_types_and_io/enum_type.py
+examples/data_types_and_io/data_types_and_io/file.py
+examples/data_types_and_io/data_types_and_io/folder.py
+examples/data_types_and_io/data_types_and_io/structured_dataset.py
diff --git a/flyte_tests_generator.py b/flyte_tests_generator.py
deleted file mode 100644
index dc35fb135..000000000
--- a/flyte_tests_generator.py
+++ /dev/null
@@ -1,14 +0,0 @@
-import glob
-import itertools
-
-directories_to_walk = [
-    "examples/basics/basics",
-    "examples/data_types_and_io/data_types_and_io",
-    "examples/advanced_composition/advanced_composition",
-]
-
-all_tests = []
-for each_directory in directories_to_walk:
-    all_tests.append(glob.glob(f"{each_directory}/*.py"))
-
-print("\n".join(list(itertools.chain(*all_tests))))
diff --git a/flyte_tests_manifest.json b/flyte_tests_manifest.json
index b0a756d9d..8e5590c37 100644
--- a/flyte_tests_manifest.json
+++ b/flyte_tests_manifest.json
@@ -3,6 +3,57 @@
     "name": "core",
     "priority": "P0",
     "path": "core",
+    "examples": [
+      ["advanced_composition.chain_entities.chain_workflows_wf", {}],
+      ["advanced_composition.conditions.consume_outputs", { "my_input": 10.0 }],
+      ["advanced_composition.decorating_tasks.wf", { "x": 10 }],
+      ["advanced_composition.decorating_workflows.wf", { "x": 19.8 }],
+      ["advanced_composition.dynamics.wf", { "s1": "Pear", "s2": "Earth" }],
+      [
+        "advanced_composition.map_task.my_map_workflow",
+        { "a": [1, 2, 3, 4, 5] }
+      ],
+      [
+        "advanced_composition.waiting_for_external_inputs.sleep_wf",
+        { "num": 5 }
+      ],
+      ["basics.documenting_workflows.sphinx_docstring_wf", {}],
+      ["basics.hello_world.hello_world_wf", {}],
+      ["basics.named_outputs.simple_wf_with_named_outputs", {}],
+      ["basics.shell_task.shell_task_wf", {}],
+      ["basics.workflow.simple_wf", { "x": [1, 2, 3], "y": [1, 2, 3] }],
+      ["data_types_and_io.dataclass.dataclass_wf", { "x": 10, "y": 20 }],
+      ["data_types_and_io.enum_type.coffee_maker", { "coffee": "latte" }],
+      [
+        "data_types_and_io.file.normalize_csv_file",
+        {
+          "csv_url": "https://people.sc.fsu.edu/~jburkardt/data/csv/biostats.csv",
+          "column_names": [
+            "Name",
+            "Sex",
+            "Age",
+            "Heights (in)",
+            "Weight (lbs)"
+          ],
+          "columns_to_normalize": ["Age"]
+        }
+      ],
+      [
+        "data_types_and_io.folder.download_and_normalize_csv_files",
+        {
+          "csv_urls": [
+            "https://people.sc.fsu.edu/~jburkardt/data/csv/biostats.csv",
+            "https://people.sc.fsu.edu/~jburkardt/data/csv/faithful.csv"
+          ],
+          "columns_metadata": [
+            ["Name", "Sex", "Age", "Heights (in)", "Weight (lbs)"],
+            ["Index", "Eruption length (mins)", "Eruption wait (mins)"]
+          ],
+          "columns_to_normalize_metadata": [["Age"], ["Eruption length (mins)"]]
["Eruption length (mins)"]] + } + ], + ["data_types_and_io.structured_dataset.simple_sd_wf", { "a": 42 }] + ], "exitCondition": { "exit_success": true, "exit_message": "" @@ -21,6 +72,12 @@ "name": "integrations-k8s-spark", "priority": "P1", "path": "examples/k8s_spark_plugin", + "examples": [ + [ + "k8s_spark_plugin.pyspark_pi.my_spark", + { "triggered_date": "2023-11-21T18:58:01" } + ] + ], "exitCondition": { "exit_success": true, "exit_message": "" @@ -30,6 +87,7 @@ "name": "integrations-kfpytorch", "priority": "P1", "path": "examples/kfpytorch_plugin", + "examples": [["kfpytorch_plugin.pytorch_mnist.pytorch_training_wf", {}]], "exitCondition": { "exit_success": true, "exit_message": "" @@ -39,15 +97,9 @@ "name": "integrations-kftensorflow", "priority": "P1", "path": "examples/kftensorflow_plugin", - "exitCondition": { - "exit_success": true, - "exit_message": "" - } - }, - { - "name": "integrations-pod", - "priority": "P1", - "path": "examples/k8s_pod_plugin", + "examples": [ + ["kftensorflow_plugin.tf_mnist.mnist_tensorflow_workflow", {}] + ], "exitCondition": { "exit_success": true, "exit_message": "" @@ -57,15 +109,7 @@ "name": "integrations-pandera", "priority": "P1", "path": "examples/pandera_plugin", - "exitCondition": { - "exit_success": true, - "exit_message": "" - } - }, - { - "name": "integrations-whylogs", - "priority": "P1", - "path": "examples/whylogs_plugin", + "examples": [["pandera_plugin.basic_schema_example.process_data", {}]], "exitCondition": { "exit_success": true, "exit_message": "" @@ -75,6 +119,7 @@ "name": "integrations-modin", "priority": "P1", "path": "examples/modin_plugin", + "examples": [["modin_plugin.knn_classifier.pipeline", {}]], "exitCondition": { "exit_success": true, "exit_message": "" @@ -84,6 +129,9 @@ "name": "integrations-papermill", "priority": "P1", "path": "examples/papermill_plugin", + "examples": [ + ["papermill_plugin.simple.nb_to_python_wf", { "f": 3.1415926535 }] + ], "exitCondition": { "exit_success": true, "exit_message": "" @@ -93,24 +141,12 @@ "name": "integrations-greatexpectations", "priority": "P1", "path": "examples/greatexpectations_plugin", - "exitCondition": { - "exit_success": true, - "exit_message": "" - } - }, - { - "name": "integrations-sagemaker-pytorch", - "priority": "P1", - "path": "examples/sagemaker_pytorch_plugin", - "exitCondition": { - "exit_success": true, - "exit_message": "" - } - }, - { - "name": "integrations-sagemaker-training", - "priority": "P1", - "path": "examples/sagemaker_training_plugin", + "examples": [ + ["greatexpectations_plugin.task_example.simple_wf", {}], + ["greatexpectations_plugin.task_example.file_wf", {}], + ["greatexpectations_plugin.task_example.schema_wf", {}], + ["greatexpectations_plugin.task_example.runtime_wf", {}] + ], "exitCondition": { "exit_success": true, "exit_message": "" diff --git a/flyte_tests_validate.py b/flyte_tests_validate.py new file mode 100644 index 000000000..c66c77384 --- /dev/null +++ b/flyte_tests_validate.py @@ -0,0 +1,60 @@ +import json +import os +import subprocess +import re + +file_list = "flyte_tests.txt" + +with open("flyte_tests_manifest.json", "r") as file: + data = json.load(file) + +examples = [(example[0], example[1]) for entry in data for example in entry.get("examples", []) if len(example) >= 1] + +for file_name in open(file_list, "r").readlines(): + file_name = file_name.strip() + print(f"Processing file: {file_name}") + + # Retrieve the file path, including the name of the file and its immediate parent directory + directory_path = 
+    file_path = ".".join(directory_path + [os.path.splitext(os.path.basename(file_name))[0]])
+
+    # Verify that there are workflows present in the provided file path
+    workflows = []
+    for workflow, params in examples:
+        if file_path in workflow:
+            workflows.append((workflow, params))
+    if not workflows:
+        raise Exception("No workflows are defined in the given file.")
+
+    for workflow, params in workflows:
+        # Use the `pyflyte run` command to execute the workflow
+        output_string = subprocess.run(["pyflyte", "run", file_name], capture_output=True, text=True).stdout
+
+        # Define a regular expression pattern to match tasks/workflows in the pyflyte run output
+        pattern = re.compile(r"^\│\s+(\w+)\s+", re.MULTILINE)
+
+        # Extract command names using the specified pattern
+        commands = re.findall(pattern, output_string)
+
+        # Check if the specified workflow is present in the pyflyte run output
+        just_the_workflow = workflow.split(".")[2]
+        if just_the_workflow in commands:
+            print("Workflow found in the pyflyte run output!")
+        else:
+            raise Exception("Workflow not found in the pyflyte run output.")
+
+        # Check if the specified parameters are valid
+        options_output = subprocess.run(
+            ["pyflyte", "run", file_name, just_the_workflow, "--help"], capture_output=True, text=True
+        ).stdout
+
+        # Find all supported option flags in the help output
+        options = [option.replace("--", "") for option in re.compile(r"--\w+").findall(options_output)]
+
+        # Validate that the provided params are a subset of the supported params
+        if set(params).issubset(set(options)):
+            print("All parameters found!")
+        else:
+            raise Exception(
+                "There's a mismatch between the values accepted by the workflow and the ones you provided."
+            )
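Reviewer note: the subtle part of `flyte_tests_validate.py` is how a path from `flyte_tests.txt` is matched against a workflow name in the manifest. Below is a minimal standalone sketch of that mapping, for illustration only (not part of the change; the sample entry is taken from the files above):

```python
import os

# Entry from flyte_tests.txt and a matching manifest workflow name.
file_name = "examples/basics/basics/hello_world.py"
workflow = "basics.hello_world.hello_world_wf"

# Same logic as the validator: keep only the immediate parent directory,
# then join it with the module name using dots.
directory_path = os.path.dirname(file_name).split(os.path.sep)[-1:]  # ["basics"]
module = os.path.splitext(os.path.basename(file_name))[0]            # "hello_world"
file_path = ".".join(directory_path + [module])                      # "basics.hello_world"

# The validator treats a dotted-prefix substring match as "this file defines
# that workflow", and passes the bare workflow name (the third dotted
# component) to `pyflyte run`.
assert file_path in workflow
print(file_path, "->", workflow.split(".")[2])  # basics.hello_world -> hello_world_wf
```

One consequence of the substring check: a file is matched only when its parent directory and module name agree with the manifest's `<parent_dir>.<module>.<workflow>` naming convention; entries that do not follow it fail with "No workflows are defined in the given file."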