Skip to content

Commit

Permalink
models: refactor WorkflowStatus to RunStatus
Browse files Browse the repository at this point in the history
models: refactor interactive session types
  • Loading branch information
audrium committed Oct 9, 2020
1 parent 84ce25b commit c7a623d
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 110 deletions.
16 changes: 8 additions & 8 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
calculate_job_input_hash,
)
from reana_db.database import Session
from reana_db.models import Job, JobCache, Workflow, WorkflowStatus
from reana_db.models import Job, JobCache, Workflow, RunStatus
from sqlalchemy.orm.attributes import flag_modified

from reana_workflow_controller.config import (
Expand Down Expand Up @@ -74,7 +74,7 @@ def on_message(self, body, message):
)
next_status = body_dict.get("status")
if next_status:
next_status = WorkflowStatus(next_status)
next_status = RunStatus(next_status)
print(
" [x] Received workflow_uuid: {0} status: {1}".format(
workflow_uuid, next_status
Expand Down Expand Up @@ -107,20 +107,20 @@ def _update_workflow_status(workflow, status, logs):
if workflow.git_ref:
_update_commit_status(workflow, status)
alive_statuses = [
WorkflowStatus.created,
WorkflowStatus.running,
WorkflowStatus.queued,
RunStatus.created,
RunStatus.running,
RunStatus.queued,
]
if status not in alive_statuses:
_delete_workflow_engine_pod(workflow)


def _update_commit_status(workflow, status):
if status == WorkflowStatus.finished:
if status == RunStatus.finished:
state = "success"
elif status == WorkflowStatus.failed:
elif status == RunStatus.failed:
state = "failed"
elif status == WorkflowStatus.stopped or status == WorkflowStatus.deleted:
elif status == RunStatus.stopped or status == RunStatus.deleted:
state = "canceled"
else:
state = "running"
Expand Down
36 changes: 18 additions & 18 deletions reana_workflow_controller/rest/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from flask import jsonify
from git import Repo
from reana_db.database import Session
from reana_db.models import Job, JobCache, Workflow, WorkflowStatus
from reana_db.models import Job, JobCache, Workflow, RunStatus
from reana_workflow_controller.config import REANA_GITLAB_HOST, WORKFLOW_TIME_FORMAT
from reana_workflow_controller.errors import (
REANAExternalCallError,
Expand All @@ -47,7 +47,7 @@ def start_workflow(workflow, parameters):
"""Start a workflow."""

def _start_workflow_db(workflow, parameters):
workflow.status = WorkflowStatus.running
workflow.status = RunStatus.running
if parameters:
workflow.input_parameters = parameters.get("input_parameters")
workflow.operational_options = parameters.get("operational_options")
Expand All @@ -67,13 +67,13 @@ def _start_workflow_db(workflow, parameters):
if "restart" in parameters.keys():
if parameters["restart"]:
if workflow.status not in [
WorkflowStatus.failed,
WorkflowStatus.finished,
WorkflowStatus.queued,
RunStatus.failed,
RunStatus.finished,
RunStatus.queued,
]:
raise REANAWorkflowControllerError(failure_message)
elif workflow.status not in [WorkflowStatus.created, WorkflowStatus.queued]:
if workflow.status == WorkflowStatus.deleted:
elif workflow.status not in [RunStatus.created, RunStatus.queued]:
if workflow.status == RunStatus.deleted:
raise REANAWorkflowStatusError(failure_message)
raise REANAWorkflowControllerError(failure_message)

Expand Down Expand Up @@ -104,10 +104,10 @@ def _start_workflow_db(workflow, parameters):

def stop_workflow(workflow):
"""Stop a given workflow."""
if workflow.status == WorkflowStatus.running:
if workflow.status == RunStatus.running:
kwrm = KubernetesWorkflowRunManager(workflow)
kwrm.stop_batch_workflow_run()
workflow.status = WorkflowStatus.stopped
workflow.status = RunStatus.stopped
Session.add(workflow)
Session.commit()
else:
Expand Down Expand Up @@ -186,12 +186,12 @@ def remove_workflow_jobs_from_cache(workflow):
def delete_workflow(workflow, all_runs=False, hard_delete=False, workspace=False):
"""Delete workflow."""
if workflow.status in [
WorkflowStatus.created,
WorkflowStatus.finished,
WorkflowStatus.stopped,
WorkflowStatus.deleted,
WorkflowStatus.failed,
WorkflowStatus.queued,
RunStatus.created,
RunStatus.finished,
RunStatus.stopped,
RunStatus.deleted,
RunStatus.failed,
RunStatus.queued,
]:
try:
to_be_deleted = [workflow]
Expand All @@ -200,7 +200,7 @@ def delete_workflow(workflow, all_runs=False, hard_delete=False, workspace=False
Session.query(Workflow)
.filter(
Workflow.name == workflow.name,
Workflow.status != WorkflowStatus.running,
Workflow.status != RunStatus.running,
)
.all()
)
Expand Down Expand Up @@ -229,7 +229,7 @@ def delete_workflow(workflow, all_runs=False, hard_delete=False, workspace=False
except Exception as e:
logging.error(traceback.format_exc())
return jsonify({"message": str(e)}), 500
elif workflow.status == WorkflowStatus.running:
elif workflow.status == RunStatus.running:
raise REANAWorkflowDeletionError(
"Workflow {0}.{1} cannot be deleted as it"
" is currently running.".format(workflow.name, workflow.run_number)
Expand All @@ -244,7 +244,7 @@ def _delete_workflow_row_from_db(workflow):

def _mark_workflow_as_deleted_in_db(workflow):
"""Mark workflow as deleted."""
workflow.status = WorkflowStatus.deleted
workflow.status = RunStatus.deleted
current_db_sessions = Session.object_session(workflow)
current_db_sessions.add(workflow)
current_db_sessions.commit()
Expand Down
8 changes: 1 addition & 7 deletions reana_workflow_controller/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from reana_db.models import (
User,
Workflow,
WorkflowStatus,
RunStatus,
InteractiveSession,
WorkflowSession,
)
Expand Down Expand Up @@ -213,13 +213,7 @@ def get_workflows(paginate=None): # noqa
if search:
query = query.filter(Workflow.name.ilike("%{}%".format(search)))
if status_list:
<<<<<<< HEAD
workflow_status = [
WorkflowStatus[status] for status in status_list.split(",")
]
=======
workflow_status = [RunStatus[status] for status in status_list.split(",")]
>>>>>>> 518e94d... models: refacor on recent interactive session model changes
query = query.filter(Workflow.status.in_(workflow_status))
if sort not in ["asc", "desc"]:
sort = "desc"
Expand Down
8 changes: 4 additions & 4 deletions reana_workflow_controller/rest/workflows_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@


from flask import Blueprint, jsonify, request
from reana_commons.config import INTERACTIVE_SESSION_TYPES
from reana_db.utils import _get_workflow_with_uuid_or_name
from reana_db.models import WorkflowSession
from reana_db.models import WorkflowSession, InteractiveSessionType

from reana_workflow_controller.workflow_run_manager import KubernetesWorkflowRunManager

Expand Down Expand Up @@ -109,13 +108,14 @@ def open_interactive_session(workflow_id_or_name, interactive_session_type): #
Request failed. Internal controller error.
"""
try:
if interactive_session_type not in INTERACTIVE_SESSION_TYPES:
if interactive_session_type not in InteractiveSessionType.__members__:
return (
jsonify(
{
"message": "Interactive session type {0} not found, try "
"with one of: {1}".format(
interactive_session_type, INTERACTIVE_SESSION_TYPES
interactive_session_type,
[e.name for e in InteractiveSessionType],
)
}
),
Expand Down
37 changes: 17 additions & 20 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from kubernetes.client.rest import ApiException
from reana_commons.config import (
CVMFS_REPOSITORIES,
INTERACTIVE_SESSION_TYPES,
K8S_CERN_EOS_AVAILABLE,
REANA_COMPONENT_NAMING_SCHEME,
REANA_COMPONENT_PREFIX,
Expand Down Expand Up @@ -46,13 +45,8 @@
Job,
JobStatus,
InteractiveSession,
<<<<<<< HEAD
UserWorkflowSession,
WorkflowStatus,
=======
WorkflowSession,
InteractiveSessionType,
RunStatus,
>>>>>>> 518e94d... models: refacor on recent interactive session model changes
)

from reana_workflow_controller.errors import (
Expand Down Expand Up @@ -280,7 +274,7 @@ def start_interactive_session(self, interactive_session_type, **kwargs):
"""
action_completed = True
try:
if interactive_session_type not in INTERACTIVE_SESSION_TYPES:
if interactive_session_type not in InteractiveSessionType.__members__:
raise REANAInteractiveSessionError(
"Interactive type {} does not exist.".format(
interactive_session_type
Expand All @@ -302,6 +296,19 @@ def start_interactive_session(self, interactive_session_type, **kwargs):
instantiate_chained_k8s_objects(
kubernetes_objects, REANA_RUNTIME_KUBERNETES_NAMESPACE
)

# Save interactive session to the database
int_session = InteractiveSession(
name=workflow_run_name,
path=access_path,
type_=interactive_session_type,
owner_id=self.workflow.owner_id,
)
self.workflow.sessions.append(int_session)
current_db_sessions = Session.object_session(self.workflow)
current_db_sessions.add(self.workflow)
current_db_sessions.commit()

return access_path

except KeyError:
Expand All @@ -323,19 +330,9 @@ def start_interactive_session(self, interactive_session_type, **kwargs):
)
finally:
if not action_completed and kubernetes_objects:
return delete_k8s_objects_if_exist(
delete_k8s_objects_if_exist(
kubernetes_objects, REANA_RUNTIME_KUBERNETES_NAMESPACE
)
int_session = InteractiveSession(
name=workflow_run_name,
path=access_path,
type_=interactive_session_type,
owner_id=self.workflow.owner_id,
)
self.workflow.sessions.append(int_session)
current_db_sessions = Session.object_session(self.workflow)
current_db_sessions.add(self.workflow)
current_db_sessions.commit()

def stop_interactive_session(self, interactive_session_id):
"""Stop an interactive workflow run."""
Expand All @@ -362,7 +359,7 @@ def stop_interactive_session(self, interactive_session_id):
)
finally:
if action_completed:
int_session.status = WorkflowStatus.stopped
int_session.status = RunStatus.stopped
current_db_sessions = Session.object_session(self.workflow)
current_db_sessions.add(int_session)
current_db_sessions.commit()
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ python-dateutil==2.8.1 # via alembic, bravado, bravado-core, kubernetes
python-editor==1.0.4 # via alembic
pytz==2020.1 # via bravado-core, fs
pyyaml==5.3.1 # via bravado, bravado-core, kubernetes, reana-commons, swagger-spec-validator
reana-commons[kubernetes]==0.8.0a2 # via reana-db, reana-workflow-controller (setup.py)
reana-db==0.8.0a3 # via reana-workflow-controller (setup.py)
reana-commons[kubernetes]==0.8.0a3 # via reana-db, reana-workflow-controller (setup.py)
reana-db==0.8.0a5 # via reana-workflow-controller (setup.py)
requests-oauthlib==1.3.0 # via kubernetes
requests==2.20.0 # via bravado, kubernetes, reana-workflow-controller (setup.py), requests-oauthlib
rfc3987==1.3.8 # via jsonschema
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@
"jsonpickle>=0.9.6",
"marshmallow>2.13.0,<=2.20.1",
"packaging>=18.0",
"reana-commons[kubernetes]>=0.8.0a2,<0.9.0",
"reana-db>=0.8.0a3,<0.9.0",
"reana-commons[kubernetes]>=0.8.0a3,<0.9.0",
"reana-db>=0.8.0a5,<0.9.0",
"requests==2.20.0",
"sqlalchemy-utils>=0.31.0",
"uwsgi-tools>=1.1.1",
Expand Down
22 changes: 11 additions & 11 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pathlib import Path

import pytest
from reana_db.models import Job, JobCache, Workflow, WorkflowStatus
from reana_db.models import Job, JobCache, Workflow, RunStatus

from reana_workflow_controller.rest.utils import (
create_workflow_workspace,
Expand All @@ -25,12 +25,12 @@
@pytest.mark.parametrize(
"status",
[
WorkflowStatus.created,
WorkflowStatus.failed,
WorkflowStatus.finished,
WorkflowStatus.stopped,
pytest.param(WorkflowStatus.deleted, marks=pytest.mark.xfail),
pytest.param(WorkflowStatus.running, marks=pytest.mark.xfail),
RunStatus.created,
RunStatus.failed,
RunStatus.finished,
RunStatus.stopped,
pytest.param(RunStatus.deleted, marks=pytest.mark.xfail),
pytest.param(RunStatus.running, marks=pytest.mark.xfail),
],
)
@pytest.mark.parametrize("hard_delete", [True, False])
Expand All @@ -44,7 +44,7 @@ def test_delete_workflow(

delete_workflow(sample_yadage_workflow_in_db, hard_delete=hard_delete)
if not hard_delete:
assert sample_yadage_workflow_in_db.status == WorkflowStatus.deleted
assert sample_yadage_workflow_in_db.status == RunStatus.deleted
else:
assert (
session.query(Workflow)
Expand Down Expand Up @@ -72,7 +72,7 @@ def test_delete_all_workflow_runs(
)
session.add(workflow)
if i == 4:
workflow.status = WorkflowStatus.running
workflow.status = RunStatus.running
not_deleted_one = workflow.id_
session.commit()

Expand All @@ -87,9 +87,9 @@ def test_delete_all_workflow_runs(
session.query(Workflow).filter_by(name=first_workflow.name).all()
):
if not_deleted_one == workflow.id_:
assert workflow.status == WorkflowStatus.running
assert workflow.status == RunStatus.running
else:
assert workflow.status == WorkflowStatus.deleted
assert workflow.status == RunStatus.deleted
else:
# the one running should not be deleted
assert (
Expand Down
Loading

0 comments on commit c7a623d

Please sign in to comment.