diff --git a/docs/openapi.json b/docs/openapi.json index d5747319..abbed2ba 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -3674,6 +3674,11 @@ }, { "description": "Required. New workflow status.", + "enum": [ + "start", + "stop", + "deleted" + ], "in": "query", "name": "status", "required": true, @@ -3687,19 +3692,30 @@ "type": "string" }, { - "description": "Optional. Additional input parameters and operational options.", + "description": "Optional. Additional parameters to customise the change of workflow status.", "in": "body", "name": "parameters", "required": false, "schema": { "properties": { - "CACHE": { - "type": "string" - }, "all_runs": { + "description": "Optional. If true, delete all runs of the workflow. Only allowed when status is `deleted`.", + "type": "boolean" + }, + "input_parameters": { + "description": "Optional. Additional input parameters that override the ones in the workflow specification. Only allowed when status is `start`.", + "type": "object" + }, + "operational_options": { + "description": "Optional. Operational options for workflow execution. Only allowed when status is `start`.", + "type": "object" + }, + "restart": { + "description": "Optional. If true, the workflow is a restart of another one. Only allowed when status is `start`.", "type": "boolean" }, "workspace": { + "description": "Optional, but must be set to true if provided. If true, delete also the workspace of the workflow. Only allowed when status is `deleted`.", "type": "boolean" } }, diff --git a/reana_server/rest/workflows.py b/reana_server/rest/workflows.py index f228ee22..a8b7527f 100644 --- a/reana_server/rest/workflows.py +++ b/reana_server/rest/workflows.py @@ -1131,6 +1131,75 @@ def get_workflow_status(workflow_id_or_name, user): # noqa return jsonify({"message": str(e)}), 500 +def _start_workflow(workflow_id_or_name, user, **parameters): + """Start given workflow by publishing it to the submission queue. + + This function is used by both the `set_workflow_status` and `start_workflow`. + """ + operational_options = parameters.get("operational_options", {}) + input_parameters = parameters.get("input_parameters", {}) + restart = parameters.get("restart", False) + reana_specification = parameters.get("reana_specification") + + try: + if not workflow_id_or_name: + raise ValueError("workflow_id_or_name is not supplied") + + workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_)) + operational_options = validate_operational_options( + workflow.type_, operational_options + ) + + restart_type = None + if restart: + if workflow.status not in [RunStatus.finished, RunStatus.failed]: + raise ValueError("Only finished or failed workflows can be restarted.") + if workflow.workspace_has_pending_retention_rules(): + raise ValueError( + "The workflow cannot be restarted because some retention rules are " + "currently being applied to the workspace. Please retry later." + ) + if reana_specification: + restart_type = reana_specification.get("workflow", {}).get("type", None) + workflow = clone_workflow(workflow, reana_specification, restart_type) + elif workflow.status != RunStatus.created: + raise ValueError( + "Workflow {} is already {} and cannot be started " + "again.".format(workflow.get_full_workflow_name(), workflow.status.name) + ) + if "yadage" in (workflow.type_, restart_type): + _load_and_save_yadage_spec(workflow, operational_options) + + validate_workflow( + workflow.reana_specification, input_parameters=input_parameters + ) + + # when starting the workflow, the scheduler will call RWC's `set_workflow_status` + # with the given `parameters` + publish_workflow_submission(workflow, user.id_, parameters) + response = { + "message": "Workflow submitted.", + "workflow_id": workflow.id_, + "workflow_name": workflow.name, + "status": RunStatus.queued.name, + "run_number": workflow.run_number, + "user": str(user.id_), + } + return response, 200 + except HTTPError as e: + logging.error(traceback.format_exc()) + return e.response.json(), e.response.status_code + except (REANAValidationError, ValidationError) as e: + logging.error(traceback.format_exc()) + return {"message": str(e)}, 400 + except ValueError as e: + logging.error(traceback.format_exc()) + return {"message": str(e)}, 403 + except Exception as e: + logging.error(traceback.format_exc()) + return {"message": str(e)}, 500 + + @blueprint.route("/workflows//start", methods=["POST"]) @signin_required() @use_kwargs( @@ -1302,74 +1371,25 @@ def start_workflow(workflow_id_or_name, user, **parameters): # noqa "message": "Status resume is not supported yet." } """ - - operational_options = parameters.get("operational_options", {}) - input_parameters = parameters.get("input_parameters", {}) - restart = parameters.get("restart", False) - reana_specification = parameters.get("reana_specification") - - try: - if not workflow_id_or_name: - raise ValueError("workflow_id_or_name is not supplied") - - workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, str(user.id_)) - operational_options = validate_operational_options( - workflow.type_, operational_options - ) - - restart_type = None - if restart: - if workflow.status not in [RunStatus.finished, RunStatus.failed]: - raise ValueError("Only finished or failed workflows can be restarted.") - if workflow.workspace_has_pending_retention_rules(): - raise ValueError( - "The workflow cannot be restarted because some retention rules are " - "currently being applied to the workspace. Please retry later." - ) - if reana_specification: - restart_type = reana_specification.get("workflow", {}).get("type", None) - workflow = clone_workflow(workflow, reana_specification, restart_type) - elif workflow.status != RunStatus.created: - raise ValueError( - "Workflow {} is already {} and cannot be started " - "again.".format(workflow.get_full_workflow_name(), workflow.status.name) - ) - if "yadage" in (workflow.type_, restart_type): - _load_and_save_yadage_spec(workflow, operational_options) - - validate_workflow( - workflow.reana_specification, input_parameters=input_parameters - ) - - # when starting the workflow, the scheduler will call RWC's `set_workflow_status` - # with the given `parameters` - publish_workflow_submission(workflow, user.id_, parameters) - response = { - "message": "Workflow submitted.", - "workflow_id": workflow.id_, - "workflow_name": workflow.name, - "status": RunStatus.queued.name, - "run_number": workflow.run_number, - "user": str(user.id_), - } - return jsonify(response), 200 - except HTTPError as e: - logging.error(traceback.format_exc()) - return jsonify(e.response.json()), e.response.status_code - except (REANAValidationError, ValidationError) as e: - logging.error(traceback.format_exc()) - return jsonify({"message": str(e)}), 400 - except ValueError as e: - logging.error(traceback.format_exc()) - return jsonify({"message": str(e)}), 403 - except Exception as e: - logging.error(traceback.format_exc()) - return jsonify({"message": str(e)}), 500 + response, status_code = _start_workflow(workflow_id_or_name, user, **parameters) + return jsonify(response), status_code @blueprint.route("/workflows//status", methods=["PUT"]) @signin_required() -def set_workflow_status(workflow_id_or_name, user): # noqa +@use_kwargs( + { + "status": fields.Str(required=True, location="query"), + # parameters for "start" + "input_parameters": fields.Dict(location="json"), + "operational_options": fields.Dict(location="json"), + "restart": fields.Boolean(location="json"), + # parameters for "deleted" + "all_runs": fields.Boolean(location="json"), + "workspace": fields.Boolean(location="json"), + } +) +def set_workflow_status(workflow_id_or_name, user, status, **parameters): # noqa r"""Set workflow status. --- put: @@ -1393,6 +1413,10 @@ def set_workflow_status(workflow_id_or_name, user): # noqa description: Required. New workflow status. required: true type: string + enum: + - start + - stop + - deleted - name: access_token in: query description: The API access_token of workflow owner. @@ -1401,18 +1425,37 @@ def set_workflow_status(workflow_id_or_name, user): # noqa - name: parameters in: body description: >- - Optional. Additional input parameters and operational options. + Optional. Additional parameters to customise the change of workflow status. required: false schema: type: object properties: - CACHE: - type: string + operational_options: + description: >- + Optional. Operational options for workflow execution. + Only allowed when status is `start`. + type: object + input_parameters: + description: >- + Optional. Additional input parameters that override the ones + in the workflow specification. Only allowed when status is `start`. + type: object + restart: + description: >- + Optional. If true, the workflow is a restart of another one. + Only allowed when status is `start`. + type: boolean all_runs: + description: >- + Optional. If true, delete all runs of the workflow. + Only allowed when status is `deleted`. type: boolean workspace: + description: >- + Optional, but must be set to true if provided. + If true, delete also the workspace of the workflow. + Only allowed when status is `deleted`. type: boolean - responses: 200: description: >- @@ -1528,7 +1571,20 @@ def set_workflow_status(workflow_id_or_name, user): # noqa try: if not workflow_id_or_name: raise ValueError("workflow_id_or_name is not supplied") - status = request.args.get("status") + + if status == "start": + # We can't call directly RWC when starting a workflow, as otherwise + # the workflow would skip the queue. Instead, we do what the + # `start_workflow` endpoint does. + response, status_code = _start_workflow( + workflow_id_or_name, user, **parameters + ) + if "run_number" in response: + # run_number is returned by `start_workflow`, + # but not by `set_status_workflow` + del response["run_number"] + return jsonify(response), status_code + parameters = request.json if request.is_json else None response, http_response = current_rwc_api_client.api.set_workflow_status( user=str(user.id_), diff --git a/tests/test_views.py b/tests/test_views.py index d9f0c98a..6d6afa93 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -319,7 +319,7 @@ def test_get_workflow_status(app, default_user, _get_user_mock): def test_set_workflow_status(app, default_user, _get_user_mock): - """Test get_workflow_logs view.""" + """Test set_workflow_status view.""" with app.test_client() as client: with patch( "reana_server.rest.workflows.current_rwc_api_client", @@ -341,7 +341,7 @@ def test_set_workflow_status(app, default_user, _get_user_mock): headers={"Content-Type": "application/json"}, query_string={"access_token": default_user.access_token}, ) - assert res.status_code == 500 + assert res.status_code == 422 res = client.put( url_for("workflows.set_workflow_status", workflow_id_or_name="1"),