Merge pull request #764 from Project-MONAI/samrooke/AC-2027-update-ar…
…go-plugin-to-override-resources

AC-2127: update argo plugin for resources override
samrooke authored Apr 25, 2023
2 parents 864bfd0 + cf87de7 commit 4c630ca
Showing 7 changed files with 47 additions and 105 deletions.
56 changes: 21 additions & 35 deletions docs/setup/mwm-workflow-spec.md
@@ -24,7 +24,7 @@ A workflow is a standard template that contains a list of tasks that can be ran

The first task to be ran, will always be the first task in the list. The next task/tasks to be ran must be listed in the [Task Destinations](#task-destinations) of the task. A workflow requires at least one task.

Workflows can be created or updated via the [Workflow API](https://github.com/Project-MONAI/monai-deploy-workflow-manager/blob/develop/docs/api/rest/workflow.md).
Workflows can be created or updated via the [Workflow API](https://github.com/Project-MONAI/monai-deploy-workflow-manager/blob/develop/docs/api/rest/workflow.md).

# Contents

@@ -142,7 +142,7 @@ The following is an example of the structure of a workflow.
The following is an example of a complete workflow:
![scenario1](../images/workflow_examples/scenario1.png)

An example of a workflow with two tasks:
An example of a workflow with two tasks:

1. Argo task
2. Export Task
@@ -240,7 +240,7 @@ It also defines the "PROD_PACS" output destination, meaning that it can be used:
Tasks are the basic building block of a workflow. They are provided as a list - the first Task in the list is executed when the workflow is triggered.
Subsequent tasks are triggered by the `task_destinations` specified by previous tasks.

# Task Object
# Task Object

### Task Types
These tasks are borken down into different types:
@@ -292,8 +292,8 @@ The following are examples of the task json structure including required args fo
]
},
"task_destinations": [
{
"name": "export-task-id"
{
"name": "export-task-id"
}
]
}
@@ -364,7 +364,7 @@ Depending of the type of task, the task object may contain additional fields.
Router tasks don't have additional fields. They are used to contain `task_destinations` so that workflow processing can be directed to the desired next step.

#### Export
These are task types that allow for artifacts to be exported based on the input artifacts list. This task type should not have Out artifacts listed.
These are task types that allow for artifacts to be exported based on the input artifacts list. This task type should not have Out artifacts listed.
The task also requires these extra attributes:-

| Property | Type | Description |
@@ -403,7 +403,7 @@ Example (output sent to another task if the patient is female, otherwise to PACS
Export destinations define an external location to which the output of the task can be sent. This will take the form of an event published to a pub/sub service notifying of an available export to a specific destination reference. Most commonly, the export location will be a PACs system and the notification will be picked up by the Monai Informatics Gateway.

#### Plugin
These are tasks are Named the same as the installed Pluging.
These are tasks are Named the same as the installed Pluging.
The task also requires these extra attributes:-

| Property | Type | Description |
@@ -414,7 +414,7 @@ The task also requires these extra attributes:-
The args requirements for argo plugin can be found [here](#argo).

### Task Arguments
Each task plugin requires specific arguments to be provided in the args dictionary. This allows all task types to support as many additional values as necessary without the need to bloat the workflow spec.
Each task plugin requires specific arguments to be provided in the args dictionary. This allows all task types to support as many additional values as necessary without the need to bloat the workflow spec.

#### Argo
The Argo plugin triggers workflows pre-deployed onto an [Argo workflow server](https://argoproj.github.io/argo-events/).
@@ -425,25 +425,11 @@ The Task's "args" object should contain the following fields:

| Property | Type | Required | Description |
|------|------|------|------|
-|workflow_template_name|str|Yes|The ID of this workflow as registered on the Argo server.|
-|namespace|str|Yes|The namespace of the argo workflow.|
-|server_url|url|Yes|The URL of the Argo server.|
-|allow_insecure|bool|No|Allow insecure connections to argo from the plug-in.|
-|parameters|dictionary|No|Key value pairs, Argo parameters that will be passed on to the Argo workflow.|
-|priority_class|string|No|The name of a valid Kubernetes priority class to be assigned to the Argo workflow pods|
-|resources|dictionary|No|A resource requests & limits object (see below). These will be applied to the Argo workflow pods|
-
-##### Resource Request Object
-
-Resource request parameters should be included in the task args object dictionary, as a string dictionary. The resources dictionary and all included values below are optional.
-
-| Property | Type | Description |
-|------|------|------|
-|memory_reservation|str|A valid [Kubernetes memory request value](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-memory).|
-|cpu_reservation|url|A valid [Kubernetes CPU request value](https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#meaning-of-cpu).|
-|gpu_limit|dictionary|The number of GPUs to be used by this task.|
-|memory_limit|string|The maximum amount of memory this task may use|
-|cpu_limit|object|The maximum amount of CPU this task may use. See |
+|workflow_template_name|string|Yes|The ID of this workflow as registered on the Argo server.|
+|priority_class|string|No|The name of a valid Kubernetes priority class to be assigned to the Argo workflow pods.|
+|gpu_required|string|No|Whether a GPU is to be used by this task.|
+|memory_gb|string|No|The maximum amount of memory in gigabytes this task may use.|
+|cpu|string|No|The maximum amount of CPU this task may use.|

For more information about Kubernetes requests & limits, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/.

@@ -501,7 +487,7 @@ As you can see in the example below, input artifacts require a _value_. This is

#### DICOM Input

If payload DICOM inputs are to be used in a given task, the value of the input must be `context.input.dicom`. This will to resolve to the `{payloadId}/dcm` folder within Minio / S3.
If payload DICOM inputs are to be used in a given task, the value of the input must be `context.input.dicom`. This will to resolve to the `{payloadId}/dcm` folder within Minio / S3.

Example:
```json
@@ -700,11 +686,11 @@ The following examples both function the same and act as an AND condition.
## Evaluators
Conditional evaluators are logical statement strings that may be used to determine which tasks are executed. They can make use of the execution context _metadata_ and dicom tags. All conditions must evaluate to true in order for the task to be triggered.

[A detailed breakdown of conditional logic can be found here.](https://github.com/Project-MONAI/monai-deploy-workflow-manager/blob/develop/guidelines/mwm-conditionals.md)
[A detailed breakdown of conditional logic can be found here.](https://github.com/Project-MONAI/monai-deploy-workflow-manager/blob/develop/guidelines/mwm-conditionals.md)

### Supported Evaulators


Conditional evaluators should support evaluating workflow variables against predefined values with the following operators:

< (Valid for integers)
@@ -765,9 +751,9 @@ Example (status):

#### Result Metadata & Execution Stats - Using Dictionary Values

The Result Metadata and Execution Stats are populated by the plugin and are added to the workflow instance once a task is completed to provide some output of a task. Each plugin will have its own implementation to populate the result metadata.
The Result Metadata and Execution Stats are populated by the plugin and are added to the workflow instance once a task is completed to provide some output of a task. Each plugin will have its own implementation to populate the result metadata.

Because `result` and `execution_stats` are a dictionary, the section after `context.executions.task_id.result` or `context.executions.task_id.execution_stats` is the key to be checked in the result/execution_stats dictionary.
Because `result` and `execution_stats` are a dictionary, the section after `context.executions.task_id.result` or `context.executions.task_id.execution_stats` is the key to be checked in the result/execution_stats dictionary.

For conditional statements, the key specified is case sensitive and must match exactly to the key which has been output by the model and saved in the result/execution_stats dictionary.

@@ -807,9 +793,9 @@ The result metadata for an Argo task is populated by a `metadata.json` that is i
}
```

If metadata is to be used in a conditional the `metadata.json` must be present somewhere in the output directory and a valid JSON dictionary. It will automatically be imported if it is in the directory.
If metadata is to be used in a conditional the `metadata.json` must be present somewhere in the output directory and a valid JSON dictionary. It will automatically be imported if it is in the directory.

An example format of the metadata.json can be found below:
An example format of the metadata.json can be found below:

execution stats are populated from the argo execution values returned automatically.

@@ -916,4 +902,4 @@ Name:
Description:
```python
{{context.workflow.description}} == 'This workflow is a valid workflow'
```
```
2 changes: 1 addition & 1 deletion src/Shared/Shared/ValidationConstants.cs
@@ -66,7 +66,7 @@ public static class ValidationConstants
/// <summary>
/// Key for the GPU.
/// </summary>
-public static readonly string Gpu = "gpu";
+public static readonly string GpuRequired = "gpu_required";

public enum ModeValues
{
56 changes: 9 additions & 47 deletions src/TaskManager/Plug-ins/Argo/ArgoPlugin.cs
@@ -432,46 +432,23 @@ private async Task<Workflow> BuildWorkflowWrapper(CancellationToken cancellation
private void ProcessTaskPluginArguments(Workflow workflow)
{
Guard.Against.Null(workflow);

-var resources = Event.GetTaskPluginArgumentsParameter<Dictionary<string, string>>(Keys.ArgoResource);
 var priorityClassName = Event.GetTaskPluginArgumentsParameter(Keys.TaskPriorityClassName) ?? "standard";
-var argoParameters = Event.GetTaskPluginArgumentsParameter<Dictionary<string, string>>(Keys.ArgoParameters);

-if (argoParameters is not null)
-{
-foreach (var item in argoParameters)
-{
-if (workflow.Spec.Arguments is null)
-{
-workflow.Spec.Arguments = new Arguments();
-}
-
-if (workflow.Spec.Arguments.Parameters is null)
-{
-workflow.Spec.Arguments.Parameters = new List<Parameter>();
-}
-
-workflow.Spec.Arguments.Parameters.Add(new Parameter() { Name = item.Key, Value = item.Value });
-}
-}

 foreach (var template in workflow.Spec.Templates)
 {
-AddLimit(resources, template, ResourcesKeys.CpuLimit);
-AddLimit(resources, template, ResourcesKeys.MemoryLimit);
-AddRequest(resources, template, ResourcesKeys.CpuReservation);
-AddRequest(resources, template, ResourcesKeys.MemoryReservation);
-AddRequest(resources, template, ResourcesKeys.GpuLimit);
+AddLimit(template, ResourcesKeys.CpuLimit);
+AddLimit(template, ResourcesKeys.MemoryLimit);
+AddLimit(template, ResourcesKeys.GpuLimit);
template.PriorityClassName = priorityClassName;
}
workflow.Spec.PodPriorityClassName = priorityClassName;
}

-private static void AddLimit(Dictionary<string, string>? resources, Template2 template, ResourcesKey key)
+private void AddLimit(Template2 template, ResourcesKey key)
 {
 Guard.Against.Null(template);
 Guard.Against.Null(key);
-if (template.Container is null || resources is null || !resources.TryGetValue(key.TaskKey, out var value))
+if (template.Container is null || !Event.TaskPluginArguments.TryGetValue(key.TaskKey, out var value) || string.IsNullOrWhiteSpace(value))
{
return;
}
@@ -484,28 +461,13 @@ private static void AddLimit(Dictionary<string, string>? resources, Template2 te
template.Container.Resources.Limits = new Dictionary<string, string>();
}

-template.Container.Resources.Limits.Add(key.ArgoKey, value);
-}
-
-private static void AddRequest(Dictionary<string, string>? resources, Template2 template, ResourcesKey key)
-{
-Guard.Against.Null(template);
-Guard.Against.Null(key);
-if (template.Container is null || resources is null || !resources.TryGetValue(key.TaskKey, out var value) || string.IsNullOrWhiteSpace(value))
+// Convert true / false value to 0 or 1 for number of GPU
+if (key.TaskKey == ResourcesKeys.GpuLimit.TaskKey)
 {
-return;
+value = bool.TryParse(value, out bool gpuRequired) && gpuRequired ? "1" : "0";
 }

-if (template.Container.Resources is null)
-{
-template.Container.Resources = new ResourceRequirements();
-}
-if (template.Container.Resources.Requests is null)
-{
-template.Container.Resources.Requests = new Dictionary<string, string>();
-}
-
-template.Container.Resources.Requests.Add(key.ArgoKey, value);
+template.Container.Resources.Limits.Add(key.ArgoKey, value);
}

private async Task AddMainWorkflowTemplate(Workflow workflow, CancellationToken cancellationToken)
6 changes: 3 additions & 3 deletions src/TaskManager/Plug-ins/Argo/StaticValues/ResourcesKeys.cs
@@ -22,10 +22,10 @@ public static class ResourcesKeys

public static readonly ResourcesKey CpuReservation = new() { TaskKey = "cpu_reservation", ArgoKey = "requests.cpu" };

-public static readonly ResourcesKey GpuLimit = new() { TaskKey = "gpu_limit", ArgoKey = "nvidia.com/gpu" };
+public static readonly ResourcesKey GpuLimit = new() { TaskKey = "gpu_required", ArgoKey = "nvidia.com/gpu" };

-public static readonly ResourcesKey MemoryLimit = new() { TaskKey = "memory_limit", ArgoKey = "limits.memory" };
+public static readonly ResourcesKey MemoryLimit = new() { TaskKey = "memory_gb", ArgoKey = "limits.memory" };

-public static readonly ResourcesKey CpuLimit = new() { TaskKey = "cpu_limit", ArgoKey = "limits.cpu" };
+public static readonly ResourcesKey CpuLimit = new() { TaskKey = "cpu", ArgoKey = "limits.cpu" };
}
}
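
Reviewer note: the combined effect of the new `AddLimit` and the renamed `ResourcesKeys` entries can be sketched as follows. This is a minimal, standalone approximation and not the plug-in code: `BuildLimits`, `taskArguments` and the sample values are hypothetical, and a plain dictionary stands in for `template.Container.Resources.Limits`. Only the key names and the `bool.TryParse` conversion are taken from the diff.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical task arguments; in the plug-in these come from Event.TaskPluginArguments.
var sampleArgs = new Dictionary<string, string>
{
    ["cpu"] = "2",
    ["memory_gb"] = "4",
    ["gpu_required"] = "true",
};

foreach (var (limitKey, limitValue) in BuildLimits(sampleArgs))
{
    Console.WriteLine($"{limitKey} = {limitValue}");
}

// Maps task argument keys to limit keys, using the TaskKey/ArgoKey pairs from ResourcesKeys.
static Dictionary<string, string> BuildLimits(IReadOnlyDictionary<string, string> taskArguments)
{
    var keyMap = new (string TaskKey, string ArgoKey)[]
    {
        ("cpu", "limits.cpu"),
        ("memory_gb", "limits.memory"),
        ("gpu_required", "nvidia.com/gpu"),
    };

    var limits = new Dictionary<string, string>();
    foreach (var (taskKey, argoKey) in keyMap)
    {
        // Missing or blank arguments are skipped, as in the updated AddLimit guard.
        if (!taskArguments.TryGetValue(taskKey, out var value) || string.IsNullOrWhiteSpace(value))
        {
            continue;
        }

        // gpu_required is a boolean string; it becomes a GPU count of "1" or "0".
        if (taskKey == "gpu_required")
        {
            value = bool.TryParse(value, out var gpuRequired) && gpuRequired ? "1" : "0";
        }

        limits[argoKey] = value;
    }

    return limits;
}
```

With the sample arguments, the `nvidia.com/gpu` entry is `"1"`; any value that does not parse as `true` yields `"0"`. The `cpu` and `memory_gb` values are passed through unchanged.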
@@ -353,21 +353,18 @@ private void ValidateArgoTask(TaskObject currentTask)
{
if (
currentTask.Args.TryGetValue(key, out var val) &&
-!string.IsNullOrEmpty(val) &&
-double.TryParse(val, out double parsedVal) &&
-(parsedVal < 1 || Math.Truncate(parsedVal) != parsedVal))
+(string.IsNullOrEmpty(val) ||
+(double.TryParse(val, out double parsedVal) && (parsedVal < 1 || Math.Truncate(parsedVal) != parsedVal))))
{
Errors.Add($"Task: '{currentTask.Id}' value '{val}' provided for argument '{key}' is not valid. The value needs to be a whole number greater than 0.");
}
});

if (
-currentTask.Args.TryGetValue(Gpu, out var gpu) &&
-!string.IsNullOrEmpty(gpu) &&
-double.TryParse(gpu, out double parsedGpu) &&
-(parsedGpu != 0 || parsedGpu != 1))
+currentTask.Args.TryGetValue(GpuRequired, out var gpuRequired) &&
+(string.IsNullOrEmpty(gpuRequired) || !bool.TryParse(gpuRequired, out var _)))
 {
-Errors.Add($"Task: '{currentTask.Id}' value '{gpu}' provided for argument '{Gpu}' is not valid. The value needs to be 0 or 1.");
+Errors.Add($"Task: '{currentTask.Id}' value '{gpuRequired}' provided for argument '{GpuRequired}' is not valid. The value needs to be 'true' or 'false'.");
}
}
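
Reviewer note: the updated validation rules can be exercised in isolation with a sketch like the one below. It is an approximation under stated assumptions: `ValidateArgoArgs` is a hypothetical helper, the list of numeric keys (`cpu`, `memory_gb`) is inferred from the unit tests rather than shown in this hunk, and the invariant culture is used for parsing so the result does not depend on the machine locale (the diff uses the culture-sensitive `double.TryParse` overload).

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

var sampleErrors = ValidateArgoArgs("test-argo-task", new Dictionary<string, string>
{
    ["cpu"] = "0.1",
    ["memory_gb"] = "2",
    ["gpu_required"] = "2",
});

foreach (var message in sampleErrors)
{
    Console.WriteLine(message);
}

static List<string> ValidateArgoArgs(string taskId, IReadOnlyDictionary<string, string> taskArguments)
{
    var found = new List<string>();

    // Assumption: these are the keys checked as whole numbers greater than 0.
    foreach (var key in new[] { "cpu", "memory_gb" })
    {
        // A present key must be non-empty and, if numeric, a whole number >= 1.
        if (taskArguments.TryGetValue(key, out var val) &&
            (string.IsNullOrEmpty(val) ||
             (double.TryParse(val, NumberStyles.Float, CultureInfo.InvariantCulture, out var parsed) &&
              (parsed < 1 || Math.Truncate(parsed) != parsed))))
        {
            found.Add($"Task: '{taskId}' value '{val}' provided for argument '{key}' is not valid. The value needs to be a whole number greater than 0.");
        }
    }

    // A present gpu_required must parse as a boolean.
    if (taskArguments.TryGetValue("gpu_required", out var gpuRequired) &&
        (string.IsNullOrEmpty(gpuRequired) || !bool.TryParse(gpuRequired, out _)))
    {
        found.Add($"Task: '{taskId}' value '{gpuRequired}' provided for argument 'gpu_required' is not valid. The value needs to be 'true' or 'false'.");
    }

    return found;
}
```

The sample input produces two errors, one for `cpu` and one for `gpu_required`. Note that, as in the diff, a non-numeric string such as `"abc"` for `cpu` is not rejected by this check because `double.TryParse` fails before the range test is reached.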

15 changes: 6 additions & 9 deletions tests/UnitTests/TaskManager.Argo.Tests/ArgoPluginTest.cs
@@ -272,7 +272,9 @@ public async Task ArgoPlugin_ExecuteTask_WorkflowTemplates(string filename, int
Assert.NotNull(argoTemplate);

var message = GenerateTaskDispatchEventWithValidArguments(withoutDefaultArguments);
-message.TaskPluginArguments["resources"] = "{\"memory_reservation\": \"string\",\"cpu_reservation\": \"string\",\"gpu_limit\": 1,\"memory_limit\": \"string\",\"cpu_limit\": \"string\"}";
+message.TaskPluginArguments["gpu_required"] = "true";
+message.TaskPluginArguments["memory_gb"] = "1";
+message.TaskPluginArguments["cpu"] = "1";
message.TaskPluginArguments["priority"] = "Helo";
Workflow? submittedArgoTemplate = null;

@@ -367,18 +369,13 @@ private static void ValidateSimpleTemplate(TaskDispatchEvent message, Workflow w
{
Assert.True(template.Container.Resources is not null);
Assert.True(template.Container.Resources?.Limits is not null);
-Assert.True(template.Container.Resources?.Requests is not null);
 var value = "";

-Assert.True(template.Container.Resources?.Requests?.TryGetValue("requests.memory", out value));
-Assert.True(value == "string");
-Assert.True(template.Container.Resources?.Requests?.TryGetValue("requests.cpu", out value));
-Assert.True(value == "string");
 Assert.True(template.Container.Resources?.Limits?.TryGetValue("limits.memory", out value));
-Assert.True(value == "string");
+Assert.True(value == "1");
 Assert.True(template.Container.Resources?.Limits?.TryGetValue("limits.cpu", out value));
-Assert.True(value == "string");
-Assert.True(template.Container.Resources?.Requests?.TryGetValue("nvidia.com/gpu", out value));
+Assert.True(value == "1");
+Assert.True(template.Container.Resources?.Limits?.TryGetValue("nvidia.com/gpu", out value));
 Assert.True(value == "1");

Assert.True(template.PriorityClassName == "Helo");
@@ -207,7 +207,7 @@ public async Task ValidateWorkflow_ValidatesAWorkflow_ReturnsErrorsAndHasCorrect
{ "example", "value" },
{ "cpu", "0.1" },
{ "memory_gb", "0.1" },
-{ "gpu", "2" }
+{ "gpu_required", "2" }
},
TaskDestinations = new TaskDestination[]
{
@@ -392,7 +392,7 @@ public async Task ValidateWorkflow_ValidatesAWorkflow_ReturnsErrorsAndHasCorrect
var invalidArgoArg2 = "Task: 'test-argo-task' value '0.1' provided for argument 'memory_gb' is not valid. The value needs to be a whole number greater than 0.";
Assert.Contains(invalidArgoArg2, errors);

-var invalidArgoArg3 = "Task: 'test-argo-task' value '2' provided for argument 'gpu' is not valid. The value needs to be 0 or 1.";
+var invalidArgoArg3 = "Task: 'test-argo-task' value '2' provided for argument 'gpu_required' is not valid. The value needs to be 'true' or 'false'.";
Assert.Contains(invalidArgoArg3, errors);

var incorrectClinicalReviewValueFormat = $"Invalid Value property on input artifact 'Invalid Value Format' in task: 'test-clinical-review'. Incorrect format.";