diff --git a/.azure-pipelines/web_classification_pf_in_aml_pipeline_workflow.yml b/.azure-pipelines/web_classification_pf_in_aml_pipeline_workflow.yml new file mode 100644 index 000000000..eb201c3a8 --- /dev/null +++ b/.azure-pipelines/web_classification_pf_in_aml_pipeline_workflow.yml @@ -0,0 +1,26 @@ +parameters: + - name: env_name + displayName: "Execution Environment" + default: "dev" + - name: use_case_base_path + displayName: "Base path of model to execute" + default: "web_classification" + +stages: + - stage: execute_training_job + displayName: execute_training_job + jobs: + - job: Execute_ml_Job_Pipeline + steps: + - template: templates/get_connection_details.yml + + - template: templates/configure_azureml_agent.yml + + - template: templates/execute_python_code.yml + parameters: + step_name: "Execute PF IN AML Pipeline" + script_parameter: | + python -m pf_aml_pipeline.promptflow_in_aml_pipeline \ + --subscription_id "$(SUBSCRIPTION_ID)" \ + --env_name ${{ parameters.env_name }} \ + --base_path ${{ parameters.use_case_base_path }} diff --git a/.github/workflows/web_classification_pf_in_aml_pipeline_workflow.yml b/.github/workflows/web_classification_pf_in_aml_pipeline_workflow.yml new file mode 100644 index 000000000..5e74eac4f --- /dev/null +++ b/.github/workflows/web_classification_pf_in_aml_pipeline_workflow.yml @@ -0,0 +1,61 @@ +name: web_classification_pf_in_aml_pipeline_workflow.yml + +on: + workflow_call: + inputs: + env_name: + type: string + description: "Execution Environment" + required: true + default: "dev" + use_case_base_path: + type: string + description: "The base path of the flow use-case to execute" + required: true + default: "web_classification" + secrets: + azure_credentials: + description: "service principal authentication to Azure" + required: true +jobs: + flow-experiment-and_evaluation: + name: prompt flow experiment and evaluation job in Azure ML + runs-on: ubuntu-latest + environment: + name: ${{ inputs.env_name }} + env: + 
RESOURCE_GROUP_NAME: ${{ vars.RESOURCE_GROUP_NAME }} + WORKSPACE_NAME: ${{ vars.WORKSPACE_NAME }} + COMPUTE_TARGET: ${{ vars.COMPUTE_TARGET }} + steps: + - name: Checkout Actions + uses: actions/checkout@v4 + + - name: Azure login + uses: azure/login@v1 + with: + creds: ${{ secrets.azure_credentials }} + + - name: Configure Azure ML Agent + uses: ./.github/actions/configure_azureml_agent + + - name: load the current Azure subscription details + id: subscription_details + shell: bash + run: | + export subscriptionId=$(az account show --query id -o tsv) + echo "SUBSCRIPTION_ID=$subscriptionId" >> $GITHUB_OUTPUT + + #===================================== + # Run Promptflow in AML Pipeline + #===================================== + - name: Run Promptflow in AML Pipeline + uses: ./.github/actions/execute_script + with: + step_name: "Run Promptflow in AML Pipeline" + script_parameter: | + python -m pf_aml_pipeline.promptflow_in_aml_pipeline \ + --subscription_id ${{ steps.subscription_details.outputs.SUBSCRIPTION_ID }} \ + --env_name ${{ inputs.env_name || 'dev' }} \ + --base_path ${{ inputs.use_case_base_path || 'web_classification'}} \ + diff --git a/docs/Azure_devops_how_to_setup.md b/docs/Azure_devops_how_to_setup.md index bbba4ecc0..1bbd49c74 100644 --- a/docs/Azure_devops_how_to_setup.md +++ b/docs/Azure_devops_how_to_setup.md @@ -244,6 +244,7 @@ Create a new variable group `llmops_platform_dev_vg` ([follow the documentation] - **rg_name**: Name of the resource group containing the Azure ML Workspace - **ws_name**: Name of the Azure ML Workspace - **kv_name**: Name of the Key Vault associated with the Azure ML Workspace +- **COMPUTE_TARGET**: Name of the compute cluster used in the Azure ML Workspace (Note: this is only needed if you are executing the Promptflow in AML Pipeline) ![Variable group](./images/variable-group.png) @@ -326,9 +327,10 @@ As a result the code for LLMOps Prompt flow template will now be available in Az 6. 
Create two Azure Pipelines [[how to create a basic Azure Pipeline](https://learn.microsoft.com/en-us/azure/devops/pipelines/create-first-pipeline?view=azure-devops&tabs)] for each scenario (e.g. named_entity_recognition). Both Azure Pipelines should be created based on existing YAML files: -- The first one is based on the [named_entity_recognition_pr_dev_pipeline.yml](../named_entity_recognition/.azure-pipelines/named_entity_recognition_pr_dev_pipeline.yml), and it helps to maintain code quality for all PRs including integration tests for the Azure ML experiment. Usually, we recommend to have a toy dataset for the integration tests to make sure that the Prompt flow job can be completed fast enough - there is not a goal to check prompt quality and we just need to make sure that our job can be executed. + - The first one is based on the [named_entity_recognition_pr_dev_pipeline.yml](../named_entity_recognition/.azure-pipelines/named_entity_recognition_pr_dev_pipeline.yml), and it helps to maintain code quality for all PRs including integration tests for the Azure ML experiment. Usually, we recommend to have a toy dataset for the integration tests to make sure that the Prompt flow job can be completed fast enough - there is not a goal to check prompt quality and we just need to make sure that our job can be executed. + + - The second Azure Pipeline is based on [named_entity_recognition_ci_dev_pipeline.yml](../named_entity_recognition/.azure-pipelines/named_entity_recognition_ci_dev_pipeline.yml) is executed automatically once new PR has been merged into the *development* or *main* branch. The main idea of this pipeline is to execute bulk run, evaluation on the full dataset for all prompt variants. Both the workflow can be modified and extended based on the project's requirements. 
-- The second Azure Pipeline is based on [named_entity_recognition_ci_dev_pipeline.yml](../named_entity_recognition/.azure-pipelines/named_entity_recognition_ci_dev_pipeline.yml) is executed automatically once new PR has been merged into the *development* or *main* branch. The main idea of this pipeline is to execute bulk run, evaluation on the full dataset for all prompt variants. Both the workflow can be modified and extended based on the project's requirements. These following steps should be executed twice - once for PR pipeline and again for CI pipeline. @@ -372,6 +374,13 @@ From your Azure DevOps project, select `Repos -> Branches -> more options button More details about how to create a policy can be found [here](https://learn.microsoft.com/en-us/azure/devops/repos/git/branch-policies?view=azure-devops&tabs=browser). + +## Steps for executing the Promptflow in AML Pipeline + + There is another azure devops pipeline added: [web_classification_pf_in_aml_pipeline_workflow.yml](../.azure-pipelines/web_classification_pf_in_aml_pipeline_workflow.yml) + - It is used to run the promptflow in AML Pipeline as a parallel component. + - You can use this to run other use cases as well, all you need to do is change the use_case_base_path to other use cases, like math_coding, named_entity_recognition. + ## Test the pipelines From local machine, create a new git branch `featurebranch` from `development` branch. @@ -482,7 +491,12 @@ This Azure DevOps CI pipelines contains the following steps: **Run Prompts in Flow** - Upload bulk run dataset - Bulk run prompt flow based on dataset. -- Bulk run each prompt variant +- Bulk run each prompt variant + +**Run promptflow in AML Pipeline as parallel component** +- It reuses the already registered data assets for input. +- Runs the promptflow in AML Pipeline as a parallel component, where we can control the concurrency and parallelism of the promptflow execution. 
For more details refer [here](https://microsoft.github.io/promptflow/tutorials/pipeline.html). +- The output of the promptflow is stored in the Azure ML workspace. **Evaluate Results** - Upload ground test dataset @@ -516,4 +530,4 @@ This Azure DevOps CI pipelines contains the following steps: The example scenario can be run and deployed both for Dev environments. When you are satisfied with the performance of the prompt evaluation pipeline, Prompt flow model, and deployment in development, additional pipelines similar to `dev` pipelines can be replicated and deployed in the Production environment. -The sample Prompt flow run & evaluation and Azure DevOps pipelines can be used as a starting point to adapt your own prompt engineering code and data. \ No newline at end of file +The sample Prompt flow run & evaluation and Azure DevOps pipelines can be used as a starting point to adapt your own prompt engineering code and data. diff --git a/docs/github_workflows_how_to_setup.md b/docs/github_workflows_how_to_setup.md index 8a3dfd8ec..defb40693 100644 --- a/docs/github_workflows_how_to_setup.md +++ b/docs/github_workflows_how_to_setup.md @@ -133,6 +133,8 @@ principalId="$(echo $um_details | jq -r '.[2]')" ```bash az role assignment create --assignee $principalId --role "AzureML Data Scientist" --scope "/subscriptions/$subscriptionId/resourcegroups/$rgname/providers/Microsoft.MachineLearningServices/workspaces/$workspace_name" ``` +You need to give additional `Azure ML Operator` permissions to the user managed identity for accessing the workspace, if you are using promptflow in AML Pipeline. +Note: this will not work in serverless. You shall need a compute cluster. 8. 
Grant the user managed identity permission to access the workspace keyvault (get and list) @@ -243,7 +245,7 @@ From your GitHub project, select **Settings** -> **Secrets and variables**, ** ## Set up GitHub variables for each environment -There are 3 variables expected as GitHub variables: `RESOURCE_GROUP_NAME`, `WORKSPACE_NAME` and `KEY_VAULT_NAME`. These values are environment specific, so we utilize the `Environments` feature in GitHub. +There are 3 variables expected as GitHub variables: `RESOURCE_GROUP_NAME`, `WORKSPACE_NAME` and `KEY_VAULT_NAME`. These values are environment specific, so we utilize the `Environments` feature in GitHub. An additional variable named `COMPUTE_TARGET` is needed to use promptflow in AML Pipeline. From your GitHub project, select **Settings** -> **Environments**, select "New environment" and call it `dev` ![Screenshot of GitHub environments.](images/github-environments-new-env.png) @@ -274,6 +276,12 @@ The configuration for connection used while authoring the repo: ![connection details](images/connection-details.png) +## Steps for executing the Promptflow in AML Pipeline + There is another github workflow added [web_classification_pf_in_aml_pipeline_workflow.yml](../.github/workflows/web_classification_pf_in_aml_pipeline_workflow.yml). + - It is used to run the promptflow in AML Pipeline as a parallel component. + - You can use this to run other use cases as well, all you need to do is change the use_case_base_path to other use cases, like math_coding, named_entity_recognition. + + ## Set up Secrets in GitHub ### Prompt flow Connection @@ -462,6 +470,11 @@ This Github CI workflow contains the following steps: - Execute the evaluation flow on the production log dataset - Generate the evaluation report +**Run promptflow in AML Pipeline as parallel component** +- It reuses the already registered data assets for input. 
+- Runs the promptflow in AML Pipeline as a parallel component, where we can control the concurrency and parallelism of the promptflow execution. For more details refer [here](https://microsoft.github.io/promptflow/tutorials/pipeline.html). +- The output of the promptflow is stored in the Azure ML workspace. + ### Online Endpoint 1. After the CI pipeline for an example scenario has run successfully, depending on the configuration it will either deploy to diff --git a/llmops/common/experiment_cloud_config.py b/llmops/common/experiment_cloud_config.py index 2b0c37a35..ef830d816 100644 --- a/llmops/common/experiment_cloud_config.py +++ b/llmops/common/experiment_cloud_config.py @@ -53,6 +53,7 @@ def __init__( resource_group_name: Optional[str] = None, workspace_name: Optional[str] = None, env_name: Optional[str] = None, + compute_target: Optional[str] = None, ): self.subscription_id = subscription_id or _try_get_env_var("SUBSCRIPTION_ID") self.resource_group_name = resource_group_name or _try_get_env_var( @@ -60,3 +61,4 @@ def __init__( ) self.workspace_name = workspace_name or _try_get_env_var("WORKSPACE_NAME") self.environment_name = env_name or _get_optional_env_var("ENV_NAME") + self.compute_target = compute_target or _get_optional_env_var("COMPUTE_TARGET") diff --git a/pf_aml_pipeline/components/postprocess.py b/pf_aml_pipeline/components/postprocess.py new file mode 100644 index 000000000..44ea46d89 --- /dev/null +++ b/pf_aml_pipeline/components/postprocess.py @@ -0,0 +1,38 @@ +import argparse + +import pandas as pd +from pathlib import Path + +PF_OUTPUT_FILE_NAME = "parallel_run_step.jsonl" +def parse_args(): + """ + Parses the user arguments. + + Returns: + argparse.Namespace: The parsed user arguments. 
+ """ + parser = argparse.ArgumentParser( + allow_abbrev=False, description="parse user arguments" + ) + parser.add_argument("--input_data_path", type=str) + + args, _ = parser.parse_known_args() + return args + + +def main(): + """ + The main function that orchestrates the data preparation process. + """ + args = parse_args() + + # Read promptflow output file and do some postprocessing + input_data_path = args.input_data_path + '/' + PF_OUTPUT_FILE_NAME + with open((Path(input_data_path)), 'r') as file: + promptflow_output = pd.read_json(file, lines=True) + print(promptflow_output.head()) + + return + +if __name__ == "__main__": + main() diff --git a/pf_aml_pipeline/components/preprocess.py b/pf_aml_pipeline/components/preprocess.py new file mode 100644 index 000000000..5285fdfdf --- /dev/null +++ b/pf_aml_pipeline/components/preprocess.py @@ -0,0 +1,47 @@ +import argparse + +import pandas as pd + + +def parse_args(): + """ + Parses the user arguments. + + Returns: + argparse.Namespace: The parsed user arguments. + """ + parser = argparse.ArgumentParser( + allow_abbrev=False, description="parse user arguments" + ) + parser.add_argument("--max_records", type=int, default=1) + parser.add_argument("--input_data_path", type=str) + parser.add_argument("--output_data_path", type=str) + + args, _ = parser.parse_known_args() + return args + + +def main(): + """ + The main function that orchestrates the data preparation process. 
+ """ + args = parse_args() + print("Maximum records to keep", args.max_records) + + input_data_path = args.input_data_path + input_data_df = pd.read_json(input_data_path, lines=True) + + # take only max_records from input_data_df + input_data_df = input_data_df.head(args.max_records) + + # Write input_data_df to a jsonl file + input_data_df.to_json( + args.output_data_path, orient="records", lines=True + ) + print("Successfully written filtered data") + + return + + +if __name__ == "__main__": + main() diff --git a/pf_aml_pipeline/promptflow_in_aml_pipeline.py b/pf_aml_pipeline/promptflow_in_aml_pipeline.py new file mode 100644 index 000000000..fa7817be7 --- /dev/null +++ b/pf_aml_pipeline/promptflow_in_aml_pipeline.py @@ -0,0 +1,225 @@ +import argparse +import datetime +from typing import Optional +from dotenv import load_dotenv + + +from azure.ai.ml import Input, MLClient, Output, command, dsl, load_component +from azure.ai.ml.constants import AssetTypes, InputOutputModes +from azure.identity import DefaultAzureCredential +from llmops.common.experiment_cloud_config import ExperimentCloudConfig +from llmops.common.experiment import load_experiment +from llmops.common.logger import llmops_logger + +logger = llmops_logger("promptflow_in_aml_pipeline") + +pipeline_components = [] + +def create_dynamic_evaluation_pipeline( + pipeline_name, + input_data_path, +): + """ + Construct evaluation pipeline definition dynamically for a specific app and evaluator. + + Args: + pipeline_name (str): Name of the pipeline. 
+ """ + + @dsl.pipeline( + name=pipeline_name, + input_data_path=input_data_path, + ) + def evaluation_pipeline(name: str, input_data_path: str): + + preprocess_input_path = Input( + path=input_data_path, + type=AssetTypes.URI_FILE, + mode=InputOutputModes.RO_MOUNT, + ) + + preprocess = pipeline_components[0]( + input_data_path=preprocess_input_path, max_records=2 + ) + + experiment = pipeline_components[1]( + data=preprocess.outputs.output_data_path, + url="${data.url}", + ) + + postprocess = pipeline_components[2]( + input_data_path=experiment.outputs.flow_outputs, + ) + + return evaluation_pipeline + + +def build_pipeline(pipeline_name: str, flow_path: str, input_data_path: str): + """ + Constructs an Azure Machine Learning pipeline. It encapsulates the process of defining pipeline inputs, + loading pipeline components from YAMLs, configuring component environments settings, configuring pipeline settings etc. + + Args: + pipeline_name (str): Name of the pipeline. + + Returns: + PipelineJob: Azure Machine Learning pipeline job. 
+ """ + preprocess_component = command( + name="./components/preprocess", + display_name="Data preparation for Promptflow in a pipeline experiment", + description="Reads the input data and prepares it for the Promptflow experiment", + inputs={ + "input_data_path": Input(path="string", type="uri_file", mode="ro_mount"), + "max_records": Input(type="number"), + }, + outputs={ + "output_data_path": Output(type="uri_file", mode="rw_mount"), + }, + # The source folder of the component + code="./pf_aml_pipeline/components/", + command="""python preprocess.py \ + --input_data_path "${{inputs.input_data_path}}" \ + --max_records "${{inputs.max_records}}" \ + --output_data_path "${{outputs.output_data_path}}" \ + """, + environment="azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1", # TODO FIXME + ) + # This step loads the promptflow in the pipeline as a component + evaluation_promptflow_component = load_component( + flow_path, + ) + postprocess_component = command( + name="postprocess", + display_name="Post processing for Promptflow in a pipeline experiment", + description="Reads the output of the Promptflow experiment and does some post processing.", + inputs={ + "input_data_path": Input(type="uri_folder", mode="rw_mount"), + }, + # The source folder of the component + code="./pf_aml_pipeline/components/", + command="""python postprocess.py \ + --input_data_path "${{inputs.input_data_path}}" \ + """, + environment="azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1", + ) + pipeline_components.append(preprocess_component) + pipeline_components.append(evaluation_promptflow_component) + pipeline_components.append(postprocess_component) + + pipeline_definition = create_dynamic_evaluation_pipeline( + pipeline_name=pipeline_name, + input_data_path=input_data_path, + ) + + return pipeline_definition + + +def prepare_and_execute( + exp_filename: Optional[str] = None, + base_path: Optional[str] = None, + subscription_id: Optional[str] = None, + env_name: Optional[str] = None, 
+): + """ + Run the experimentation loop by executing standard flows. + + reads latest experiment data assets. + identifies all variants across all nodes. + executes the flow creating a new job using + unique variant combination across nodes. + saves the results in both csv and html format. + saves the job ids in text file for later use. + + Returns: + None + """ + config = ExperimentCloudConfig(subscription_id=subscription_id, env_name=env_name) + experiment = load_experiment( + filename=exp_filename, base_path=base_path, env=config.environment_name + ) + + flow_detail = experiment.get_flow_detail() + + logger.info(f"Running experiment {experiment.name}") + for mapped_dataset in experiment.datasets: + logger.info(f"Using dataset {mapped_dataset.dataset.source}") + dataset = mapped_dataset.dataset + + ml_client = MLClient( + DefaultAzureCredential(), config.subscription_id, config.resource_group_name, config.workspace_name + ) + + experiment_name = f"{experiment.name}_{config.environment_name}" + input_data_uri_file = ml_client.data.get(name=dataset.name, label="latest") + + flow_path = f"{flow_detail.flow_path}/flow.dag.yaml" + # Build the pipeline definition once for this dataset. + + pipeline_definition = build_pipeline( + pipeline_name="mypipeline", + flow_path=flow_path, + input_data_path=input_data_uri_file, + ) + + pipeline_job = pipeline_definition(name="mypipeline", input_data_path=input_data_uri_file) + pipeline_job.settings.default_compute = config.compute_target + # Execute the ML Pipeline + job = ml_client.jobs.create_or_update( + pipeline_job, + experiment_name=experiment_name, + ) + + ml_client.jobs.stream(name=job.name) + + +def main(): + """ + main() function to run experiment or evaluations. + + Returns: + None + """ + parser = argparse.ArgumentParser("prompt_bulk_run") + parser.add_argument( + "--file", + type=str, + help="The experiment file. 
Default is 'experiment.yaml'", + required=False, + default="experiment.yaml", + ) + + parser.add_argument( + "--subscription_id", + type=str, + help="Subscription ID, overrides the SUBSCRIPTION_ID environment variable", + default=None, + ) + parser.add_argument( + "--base_path", + type=str, + help="Base path of the use case", + required=True, + ) + parser.add_argument( + "--env_name", + type=str, + help="environment name(dev, test, prod) for execution and deployment, overrides the ENV_NAME environment variable", + default=None, + ) + + args = parser.parse_args() + + prepare_and_execute( + args.file, + args.base_path, + args.subscription_id, + args.env_name, + ) + + +if __name__ == "__main__": + # Load variables from .env file into the environment + load_dotenv(override=True) + + main() \ No newline at end of file