Skip to content

Commit

Permalink
fix(set_workflow_status): validate endpoint arguments (reanahub#589)
Browse files Browse the repository at this point in the history
  • Loading branch information
mdonadoni committed Jun 25, 2024
1 parent 13d1c5d commit aa993b2
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 46 deletions.
27 changes: 21 additions & 6 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -1116,19 +1116,34 @@
"type": "string"
},
{
"description": "Optional. Additional input parameters and operational options for workflow execution. Possible parameters are `CACHE=on/off`, passed to disable caching of results in serial workflows, `all_runs=True/False` deletes all runs of a given workflow if status is set to deleted and `workspace=True/False` which deletes the workspace of a workflow.",
"description": "Optional. Additional input parameters and operational options for workflow execution.",
"in": "body",
"name": "parameters",
"required": false,
"schema": {
"properties": {
"CACHE": {
"type": "string"
"input_parameters": {
"type": "object"
},
"all_runs": {
"type": "boolean"
"operational_options": {
"description": "Possible parameters are `CACHE=on/off`, passed to disable caching of results in serial workflows, `all_runs=True/False` deletes all runs of a given workflow if status is set to deleted and `workspace=True/False` which deletes the workspace of a workflow.",
"properties": {
"CACHE": {
"type": "string"
},
"all_runs": {
"type": "boolean"
},
"workspace": {
"type": "boolean"
}
},
"type": "object"
},
"reana_specification": {
"type": "object"
},
"workspace": {
"restart": {
"type": "boolean"
}
},
Expand Down
22 changes: 11 additions & 11 deletions reana_workflow_controller/rest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ def start_workflow(workflow, parameters):
def _start_workflow_db(workflow, parameters):
workflow.status = RunStatus.pending
if parameters:
workflow.input_parameters = parameters.get("input_parameters")
workflow.operational_options = parameters.get("operational_options")
workflow.input_parameters = parameters["input_parameters"]
workflow.operational_options = parameters["operational_options"]
current_db_sessions.add(workflow)
current_db_sessions.commit()

Expand All @@ -95,15 +95,15 @@ def _start_workflow_db(workflow, parameters):
verb=get_workflow_status_change_verb(workflow.status.name),
status=str(workflow.status.name),
)
if "restart" in parameters.keys():
if parameters["restart"]:
if workflow.status not in [
RunStatus.failed,
RunStatus.finished,
RunStatus.queued,
RunStatus.pending,
]:
raise REANAWorkflowControllerError(failure_message)

if parameters.get("restart"):
if workflow.status not in [
RunStatus.failed,
RunStatus.finished,
RunStatus.queued,
RunStatus.pending,
]:
raise REANAWorkflowControllerError(failure_message)
elif workflow.status not in [RunStatus.created, RunStatus.queued]:
if workflow.status == RunStatus.deleted:
raise REANAWorkflowStatusError(failure_message)
Expand Down
73 changes: 46 additions & 27 deletions reana_workflow_controller/rest/workflows_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import json

from flask import Blueprint, jsonify, request
from webargs import fields, validate
from webargs.flaskparser import use_kwargs, use_args


from reana_commons.config import WORKFLOW_TIME_FORMAT
from reana_commons.errors import REANASecretDoesNotExist
Expand Down Expand Up @@ -314,7 +317,25 @@ def get_workflow_status(workflow_id_or_name): # noqa


@blueprint.route("/workflows/<workflow_id_or_name>/status", methods=["PUT"])
def set_workflow_status(workflow_id_or_name): # noqa
@use_kwargs(
{
"operational_options": fields.Dict(missing={}),
"input_parameters": fields.Dict(missing={}),
"restart": fields.Boolean(),
"reana_specification": fields.Raw(),
},
location="json",
)
@use_kwargs(
{
"user": fields.Str(required=True),
"status": fields.Str(required=True),
},
location="query",
)
def set_workflow_status(
workflow_id_or_name: str, user: str, status: str, **parameters: dict
): # noqa
r"""Set workflow status.
---
Expand Down Expand Up @@ -349,20 +370,31 @@ def set_workflow_status(workflow_id_or_name): # noqa
in: body
description: >-
Optional. Additional input parameters and operational options for
workflow execution. Possible parameters are `CACHE=on/off`, passed
to disable caching of results in serial workflows,
`all_runs=True/False` deletes all runs of a given workflow
if status is set to deleted and `workspace=True/False` which deletes
the workspace of a workflow.
workflow execution.
required: false
schema:
type: object
properties:
CACHE:
type: string
all_runs:
type: boolean
workspace:
operational_options:
description: >-
Possible parameters are `CACHE=on/off`, passed
to disable caching of results in serial workflows,
`all_runs=True/False` deletes all runs of a given workflow
if status is set to deleted and `workspace=True/False` which deletes
the workspace of a workflow.
type: object
properties:
CACHE:
type: string
all_runs:
type: boolean
workspace:
type: boolean
input_parameters:
type: object
reana_specification:
type: object
restart:
type: boolean
responses:
200:
Expand Down Expand Up @@ -456,24 +488,11 @@ def set_workflow_status(workflow_id_or_name): # noqa
"""

try:
user_uuid = request.args["user"]
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid)
status = request.args.get("status")
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user)
if not (status in STATUSES):
return (
jsonify(
{
"message": "Status {0} is not one of: {1}".format(
status, ", ".join(STATUSES)
)
}
),
400,
)
error_msg = f"Status {status} is not one of: {', '.join(STATUSES)}"
return jsonify({"message": error_msg}), 400

parameters = {}
if request.is_json:
parameters = request.json
if status == START:
start_workflow(workflow, parameters)
return (
Expand Down
50 changes: 48 additions & 2 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,10 +923,11 @@ def test_set_workflow_status_unknown_workflow(
url_for(
"statuses.set_workflow_status", workflow_id_or_name=random_workflow_uuid
),
query_string={"user": default_user.id_},
query_string={"user": default_user.id_, "status": payload},
content_type="application/json",
data=json.dumps(payload),
data=json.dumps({}),
)
print(res.json)
assert res.status_code == 404


Expand Down Expand Up @@ -1151,6 +1152,51 @@ def test_start_input_parameters(
assert workflow.input_parameters == parameters["input_parameters"]


def test_start_no_input_parameters(
app,
session,
default_user,
user_secrets,
corev1_api_client_with_user_secrets,
sample_serial_workflow_in_db,
):
"""Test start workflow with inupt parameters."""
workflow = sample_serial_workflow_in_db
workflow_uuid = str(sample_serial_workflow_in_db.id_)

with app.test_client() as client:
# create workflow
workflow.status = RunStatus.created
session.add(workflow)
session.commit()

payload = START
parameters = {"operational_options": {}}
with mock.patch(
"reana_workflow_controller.workflow_run_manager."
"current_k8s_batchv1_api_client"
):
# provide user secret store
with mock.patch(
"reana_commons.k8s.secrets.current_k8s_corev1_api_client",
corev1_api_client_with_user_secrets(user_secrets),
):
# set workflow status to START and pass parameters
res = client.put(
url_for(
"statuses.set_workflow_status",
workflow_id_or_name=workflow_uuid,
),
query_string={"user": default_user.id_, "status": "start"},
content_type="application/json",
data=json.dumps(parameters),
)
json_response = json.loads(res.data.decode())
assert json_response["status"] == status_dict[payload].name
workflow = Workflow.query.filter(Workflow.id_ == workflow_uuid).first()
assert workflow.input_parameters == dict()


def test_start_workflow_db_failure(
app,
session,
Expand Down

0 comments on commit aa993b2

Please sign in to comment.