From c7a623dd170732e18631c950a301fbf05c7da01c Mon Sep 17 00:00:00 2001 From: Audrius Mecionis Date: Thu, 8 Oct 2020 11:49:36 +0200 Subject: [PATCH] models: refactor WorkflowStatus to RunStatus models: refactor interactive session types --- reana_workflow_controller/consumer.py | 16 +++---- reana_workflow_controller/rest/utils.py | 36 +++++++-------- reana_workflow_controller/rest/workflows.py | 8 +--- .../rest/workflows_session.py | 8 ++-- .../workflow_run_manager.py | 37 +++++++--------- requirements.txt | 4 +- setup.py | 4 +- tests/test_utils.py | 22 +++++----- tests/test_views.py | 44 +++++++++---------- tests/test_workflow_run_manager.py | 31 +++++++------ 10 files changed, 100 insertions(+), 110 deletions(-) diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index f958aef8..7ee60fe9 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -30,7 +30,7 @@ calculate_job_input_hash, ) from reana_db.database import Session -from reana_db.models import Job, JobCache, Workflow, WorkflowStatus +from reana_db.models import Job, JobCache, Workflow, RunStatus from sqlalchemy.orm.attributes import flag_modified from reana_workflow_controller.config import ( @@ -74,7 +74,7 @@ def on_message(self, body, message): ) next_status = body_dict.get("status") if next_status: - next_status = WorkflowStatus(next_status) + next_status = RunStatus(next_status) print( " [x] Received workflow_uuid: {0} status: {1}".format( workflow_uuid, next_status @@ -107,20 +107,20 @@ def _update_workflow_status(workflow, status, logs): if workflow.git_ref: _update_commit_status(workflow, status) alive_statuses = [ - WorkflowStatus.created, - WorkflowStatus.running, - WorkflowStatus.queued, + RunStatus.created, + RunStatus.running, + RunStatus.queued, ] if status not in alive_statuses: _delete_workflow_engine_pod(workflow) def _update_commit_status(workflow, status): - if status == WorkflowStatus.finished: + if status == RunStatus.finished: state = "success" - elif status == WorkflowStatus.failed: + elif status == RunStatus.failed: state = "failed" - elif status == WorkflowStatus.stopped or status == WorkflowStatus.deleted: + elif status == RunStatus.stopped or status == RunStatus.deleted: state = "canceled" else: state = "running" diff --git a/reana_workflow_controller/rest/utils.py b/reana_workflow_controller/rest/utils.py index c1d83c7a..b1a74fb6 100644 --- a/reana_workflow_controller/rest/utils.py +++ b/reana_workflow_controller/rest/utils.py @@ -32,7 +32,7 @@ from flask import jsonify from git import Repo from reana_db.database import Session -from reana_db.models import Job, JobCache, Workflow, WorkflowStatus +from reana_db.models import Job, JobCache, Workflow, RunStatus from reana_workflow_controller.config import REANA_GITLAB_HOST, WORKFLOW_TIME_FORMAT from reana_workflow_controller.errors import ( REANAExternalCallError, @@ -47,7 +47,7 @@ def start_workflow(workflow, parameters): """Start a workflow.""" def _start_workflow_db(workflow, parameters): - workflow.status = WorkflowStatus.running + workflow.status = RunStatus.running if parameters: workflow.input_parameters = parameters.get("input_parameters") workflow.operational_options = parameters.get("operational_options") @@ -67,13 +67,13 @@ def _start_workflow_db(workflow, parameters): if "restart" in parameters.keys(): if parameters["restart"]: if workflow.status not in [ - WorkflowStatus.failed, - WorkflowStatus.finished, - WorkflowStatus.queued, + RunStatus.failed, + RunStatus.finished, + RunStatus.queued, ]: raise REANAWorkflowControllerError(failure_message) - elif workflow.status not in [WorkflowStatus.created, WorkflowStatus.queued]: - if workflow.status == WorkflowStatus.deleted: + elif workflow.status not in [RunStatus.created, RunStatus.queued]: + if workflow.status == RunStatus.deleted: raise REANAWorkflowStatusError(failure_message) raise REANAWorkflowControllerError(failure_message) @@ -104,10 +104,10 @@ def _start_workflow_db(workflow, parameters): def stop_workflow(workflow): """Stop a given workflow.""" - if workflow.status == WorkflowStatus.running: + if workflow.status == RunStatus.running: kwrm = KubernetesWorkflowRunManager(workflow) kwrm.stop_batch_workflow_run() - workflow.status = WorkflowStatus.stopped + workflow.status = RunStatus.stopped Session.add(workflow) Session.commit() else: @@ -186,12 +186,12 @@ def remove_workflow_jobs_from_cache(workflow): def delete_workflow(workflow, all_runs=False, hard_delete=False, workspace=False): """Delete workflow.""" if workflow.status in [ - WorkflowStatus.created, - WorkflowStatus.finished, - WorkflowStatus.stopped, - WorkflowStatus.deleted, - WorkflowStatus.failed, - WorkflowStatus.queued, + RunStatus.created, + RunStatus.finished, + RunStatus.stopped, + RunStatus.deleted, + RunStatus.failed, + RunStatus.queued, ]: try: to_be_deleted = [workflow] @@ -200,7 +200,7 @@ def delete_workflow(workflow, all_runs=False, hard_delete=False, workspace=False Session.query(Workflow) .filter( Workflow.name == workflow.name, - Workflow.status != WorkflowStatus.running, + Workflow.status != RunStatus.running, ) .all() ) @@ -229,7 +229,7 @@ def delete_workflow(workflow, all_runs=False, hard_delete=False, workspace=False except Exception as e: logging.error(traceback.format_exc()) return jsonify({"message": str(e)}), 500 - elif workflow.status == WorkflowStatus.running: + elif workflow.status == RunStatus.running: raise REANAWorkflowDeletionError( "Workflow {0}.{1} cannot be deleted as it" " is currently running.".format(workflow.name, workflow.run_number) @@ -244,7 +244,7 @@ def _delete_workflow_row_from_db(workflow): def _mark_workflow_as_deleted_in_db(workflow): """Mark workflow as deleted.""" - workflow.status = WorkflowStatus.deleted + workflow.status = RunStatus.deleted current_db_sessions = Session.object_session(workflow) current_db_sessions.add(workflow) current_db_sessions.commit() diff --git a/reana_workflow_controller/rest/workflows.py b/reana_workflow_controller/rest/workflows.py index f49be285..06813cf1 100644 --- a/reana_workflow_controller/rest/workflows.py +++ b/reana_workflow_controller/rest/workflows.py @@ -16,7 +16,7 @@ from reana_db.models import ( User, Workflow, - WorkflowStatus, + RunStatus, InteractiveSession, WorkflowSession, ) @@ -213,13 +213,7 @@ def get_workflows(paginate=None): # noqa if search: query = query.filter(Workflow.name.ilike("%{}%".format(search))) if status_list: -<<<<<<< HEAD - workflow_status = [ - WorkflowStatus[status] for status in status_list.split(",") - ] -======= workflow_status = [RunStatus[status] for status in status_list.split(",")] ->>>>>>> 518e94d... models: refacor on recent interactive session model changes query = query.filter(Workflow.status.in_(workflow_status)) if sort not in ["asc", "desc"]: sort = "desc" diff --git a/reana_workflow_controller/rest/workflows_session.py b/reana_workflow_controller/rest/workflows_session.py index d4d83f7d..4aa41c65 100644 --- a/reana_workflow_controller/rest/workflows_session.py +++ b/reana_workflow_controller/rest/workflows_session.py @@ -10,9 +10,8 @@ from flask import Blueprint, jsonify, request -from reana_commons.config import INTERACTIVE_SESSION_TYPES from reana_db.utils import _get_workflow_with_uuid_or_name -from reana_db.models import WorkflowSession +from reana_db.models import WorkflowSession, InteractiveSessionType from reana_workflow_controller.workflow_run_manager import KubernetesWorkflowRunManager @@ -109,13 +108,14 @@ def open_interactive_session(workflow_id_or_name, interactive_session_type): # Request failed. Internal controller error. """ try: - if interactive_session_type not in INTERACTIVE_SESSION_TYPES: + if interactive_session_type not in InteractiveSessionType.__members__: return ( jsonify( { "message": "Interactive session type {0} not found, try " "with one of: {1}".format( - interactive_session_type, INTERACTIVE_SESSION_TYPES + interactive_session_type, + [e.name for e in InteractiveSessionType], ) } ), diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index 21fd2ec7..551adc88 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -16,7 +16,6 @@ from kubernetes.client.rest import ApiException from reana_commons.config import ( CVMFS_REPOSITORIES, - INTERACTIVE_SESSION_TYPES, K8S_CERN_EOS_AVAILABLE, REANA_COMPONENT_NAMING_SCHEME, REANA_COMPONENT_PREFIX, @@ -46,13 +45,8 @@ Job, JobStatus, InteractiveSession, -<<<<<<< HEAD - UserWorkflowSession, - WorkflowStatus, -======= - WorkflowSession, + InteractiveSessionType, RunStatus, ->>>>>>> 518e94d... models: refacor on recent interactive session model changes ) from reana_workflow_controller.errors import ( @@ -280,7 +274,7 @@ def start_interactive_session(self, interactive_session_type, **kwargs): """ action_completed = True try: - if interactive_session_type not in INTERACTIVE_SESSION_TYPES: + if interactive_session_type not in InteractiveSessionType.__members__: raise REANAInteractiveSessionError( "Interactive type {} does not exist.".format( interactive_session_type @@ -302,6 +296,19 @@ def start_interactive_session(self, interactive_session_type, **kwargs): instantiate_chained_k8s_objects( kubernetes_objects, REANA_RUNTIME_KUBERNETES_NAMESPACE ) + + # Save interactive session to the database + int_session = InteractiveSession( + name=workflow_run_name, + path=access_path, + type_=interactive_session_type, + owner_id=self.workflow.owner_id, + ) + self.workflow.sessions.append(int_session) + current_db_sessions = Session.object_session(self.workflow) + current_db_sessions.add(self.workflow) + current_db_sessions.commit() + return access_path except KeyError: @@ -323,19 +330,9 @@ def start_interactive_session(self, interactive_session_type, **kwargs): ) finally: if not action_completed and kubernetes_objects: - return delete_k8s_objects_if_exist( + delete_k8s_objects_if_exist( kubernetes_objects, REANA_RUNTIME_KUBERNETES_NAMESPACE ) - int_session = InteractiveSession( - name=workflow_run_name, - path=access_path, - type_=interactive_session_type, - owner_id=self.workflow.owner_id, - ) - self.workflow.sessions.append(int_session) - current_db_sessions = Session.object_session(self.workflow) - current_db_sessions.add(self.workflow) - current_db_sessions.commit() def stop_interactive_session(self, interactive_session_id): """Stop an interactive workflow run.""" @@ -362,7 +359,7 @@ def stop_interactive_session(self, interactive_session_id): ) finally: if action_completed: - int_session.status = WorkflowStatus.stopped + int_session.status = RunStatus.stopped current_db_sessions = Session.object_session(self.workflow) current_db_sessions.add(int_session) current_db_sessions.commit() diff --git a/requirements.txt b/requirements.txt index 8c6fe61d..564d2e8a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -51,8 +51,8 @@ python-dateutil==2.8.1 # via alembic, bravado, bravado-core, kubernetes python-editor==1.0.4 # via alembic pytz==2020.1 # via bravado-core, fs pyyaml==5.3.1 # via bravado, bravado-core, kubernetes, reana-commons, swagger-spec-validator -reana-commons[kubernetes]==0.8.0a2 # via reana-db, reana-workflow-controller (setup.py) -reana-db==0.8.0a3 # via reana-workflow-controller (setup.py) +reana-commons[kubernetes]==0.8.0a3 # via reana-db, reana-workflow-controller (setup.py) +reana-db==0.8.0a5 # via reana-workflow-controller (setup.py) requests-oauthlib==1.3.0 # via kubernetes requests==2.20.0 # via bravado, kubernetes, reana-workflow-controller (setup.py), requests-oauthlib rfc3987==1.3.8 # via jsonschema diff --git a/setup.py b/setup.py index 43b7abef..aa056ce2 100644 --- a/setup.py +++ b/setup.py @@ -51,8 +51,8 @@ "jsonpickle>=0.9.6", "marshmallow>2.13.0,<=2.20.1", "packaging>=18.0", - "reana-commons[kubernetes]>=0.8.0a2,<0.9.0", - "reana-db>=0.8.0a3,<0.9.0", + "reana-commons[kubernetes]>=0.8.0a3,<0.9.0", + "reana-db>=0.8.0a5,<0.9.0", "requests==2.20.0", "sqlalchemy-utils>=0.31.0", "uwsgi-tools>=1.1.1", diff --git a/tests/test_utils.py b/tests/test_utils.py index b88ac43c..5ca023b8 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -13,7 +13,7 @@ from pathlib import Path import pytest -from reana_db.models import Job, JobCache, Workflow, WorkflowStatus +from reana_db.models import Job, JobCache, Workflow, RunStatus from reana_workflow_controller.rest.utils import ( create_workflow_workspace, @@ -25,12 +25,12 @@ @pytest.mark.parametrize( "status", [ - WorkflowStatus.created, - WorkflowStatus.failed, - WorkflowStatus.finished, - WorkflowStatus.stopped, - pytest.param(WorkflowStatus.deleted, marks=pytest.mark.xfail), - pytest.param(WorkflowStatus.running, marks=pytest.mark.xfail), + RunStatus.created, + RunStatus.failed, + RunStatus.finished, + RunStatus.stopped, + pytest.param(RunStatus.deleted, marks=pytest.mark.xfail), + pytest.param(RunStatus.running, marks=pytest.mark.xfail), ], ) @pytest.mark.parametrize("hard_delete", [True, False]) @@ -44,7 +44,7 @@ def test_delete_workflow( delete_workflow(sample_yadage_workflow_in_db, hard_delete=hard_delete) if not hard_delete: - assert sample_yadage_workflow_in_db.status == WorkflowStatus.deleted + assert sample_yadage_workflow_in_db.status == RunStatus.deleted else: assert ( session.query(Workflow) @@ -72,7 +72,7 @@ def test_delete_all_workflow_runs( ) session.add(workflow) if i == 4: - workflow.status = WorkflowStatus.running + workflow.status = RunStatus.running not_deleted_one = workflow.id_ session.commit() @@ -87,9 +87,9 @@ def test_delete_all_workflow_runs( session.query(Workflow).filter_by(name=first_workflow.name).all() ): if not_deleted_one == workflow.id_: - assert workflow.status == WorkflowStatus.running + assert workflow.status == RunStatus.running else: - assert workflow.status == WorkflowStatus.deleted + assert workflow.status == RunStatus.deleted else: # the one running should not be deleted assert ( diff --git a/tests/test_views.py b/tests/test_views.py index d4230521..8d39c766 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -20,7 +20,7 @@ Job, JobCache, Workflow, - WorkflowStatus, + RunStatus, InteractiveSession, ) from werkzeug.utils import secure_filename @@ -33,8 +33,8 @@ from reana_workflow_controller.workflow_run_manager import WorkflowRunManager status_dict = { - START: WorkflowStatus.running, - STOP: WorkflowStatus.finished, + START: RunStatus.running, + STOP: RunStatus.finished, } @@ -46,7 +46,7 @@ def test_get_workflows(app, session, default_user, cwl_workflow_with_name): workflow = Workflow( id_=workflow_uuid, name=workflow_name, - status=WorkflowStatus.finished, + status=RunStatus.finished, owner_id=default_user.id_, reana_specification=cwl_workflow_with_name["reana_specification"], type_=cwl_workflow_with_name["reana_specification"]["type"], @@ -398,7 +398,7 @@ def test_get_workflow_status_with_uuid( ) json_response = json.loads(res.data.decode()) assert json_response.get("status") == workflow.status.name - workflow.status = WorkflowStatus.finished + workflow.status = RunStatus.finished session.commit() res = client.get( @@ -422,7 +422,7 @@ def test_get_workflow_status_with_name( workflow = Workflow( id_=workflow_uuid, name=workflow_name, - status=WorkflowStatus.finished, + status=RunStatus.finished, owner_id=default_user.id_, reana_specification=cwl_workflow_with_name["reana_specification"], type_=cwl_workflow_with_name["reana_specification"]["type"], @@ -444,7 +444,7 @@ def test_get_workflow_status_with_name( json_response = json.loads(res.data.decode()) assert json_response.get("status") == workflow.status.name - workflow.status = WorkflowStatus.finished + workflow.status = RunStatus.finished session.commit() res = client.get( @@ -530,7 +530,7 @@ def test_set_workflow_status( response_data = json.loads(res.get_data(as_text=True)) workflow_created_uuid = response_data.get("workflow_id") workflow = Workflow.query.filter(Workflow.id_ == workflow_created_uuid).first() - assert workflow.status == WorkflowStatus.created + assert workflow.status == RunStatus.created payload = START with mock.patch( "reana_workflow_controller.workflow_run_manager." @@ -576,7 +576,7 @@ def test_start_already_started_workflow( response_data = json.loads(res.get_data(as_text=True)) workflow_created_uuid = response_data.get("workflow_id") workflow = Workflow.query.filter(Workflow.id_ == workflow_created_uuid).first() - assert workflow.status == WorkflowStatus.created + assert workflow.status == RunStatus.created payload = START with mock.patch( "reana_workflow_controller.workflow_run_manager." @@ -617,10 +617,10 @@ def test_start_already_started_workflow( "current_status, expected_status, expected_http_status_code, " "k8s_stop_call_count", [ - (WorkflowStatus.created, WorkflowStatus.created, 409, 0), - (WorkflowStatus.running, WorkflowStatus.stopped, 200, 1), - (WorkflowStatus.failed, WorkflowStatus.failed, 409, 0), - (WorkflowStatus.finished, WorkflowStatus.finished, 409, 0), + (RunStatus.created, RunStatus.created, 409, 0), + (RunStatus.running, RunStatus.stopped, 200, 1), + (RunStatus.failed, RunStatus.failed, 409, 0), + (RunStatus.finished, RunStatus.finished, 409, 0), ], ) def test_stop_workflow( @@ -880,12 +880,12 @@ def test_start_input_parameters( """Test start workflow with inupt parameters.""" with app.test_client() as client: # create workflow - sample_serial_workflow_in_db.status = WorkflowStatus.created + sample_serial_workflow_in_db.status = RunStatus.created workflow_created_uuid = sample_serial_workflow_in_db.id_ session.add(sample_serial_workflow_in_db) session.commit() workflow = Workflow.query.filter(Workflow.id_ == workflow_created_uuid).first() - assert workflow.status == WorkflowStatus.created + assert workflow.status == RunStatus.created payload = START parameters = {"input_parameters": {"first": "test"}, "operational_options": {}} with mock.patch( @@ -990,11 +990,11 @@ def test_start_workflow_kubernetes_failure( @pytest.mark.parametrize( "status", [ - WorkflowStatus.created, - WorkflowStatus.failed, - WorkflowStatus.finished, - pytest.param(WorkflowStatus.deleted, marks=pytest.mark.xfail), - pytest.param(WorkflowStatus.running, marks=pytest.mark.xfail), + RunStatus.created, + RunStatus.failed, + RunStatus.finished, + pytest.param(RunStatus.deleted, marks=pytest.mark.xfail), + pytest.param(RunStatus.running, marks=pytest.mark.xfail), ], ) @pytest.mark.parametrize("hard_delete", [True, False]) @@ -1016,7 +1016,7 @@ def test_delete_workflow( data=json.dumps({"hard_delete": hard_delete}), ) if not hard_delete: - assert sample_yadage_workflow_in_db.status == WorkflowStatus.deleted + assert sample_yadage_workflow_in_db.status == RunStatus.deleted else: assert ( session.query(Workflow) @@ -1063,7 +1063,7 @@ def test_delete_all_workflow_runs( for workflow in ( session.query(Workflow).filter_by(name=first_workflow.name).all() ): - assert workflow.status == WorkflowStatus.deleted + assert workflow.status == RunStatus.deleted else: assert session.query(Workflow).filter_by(name=first_workflow.name).all() == [] diff --git a/tests/test_workflow_run_manager.py b/tests/test_workflow_run_manager.py index 0ec4fed4..ed1bbdd3 100644 --- a/tests/test_workflow_run_manager.py +++ b/tests/test_workflow_run_manager.py @@ -13,8 +13,7 @@ import pytest from kubernetes.client.rest import ApiException from mock import DEFAULT, Mock, patch -from reana_commons.config import INTERACTIVE_SESSION_TYPES -from reana_db.models import WorkflowStatus, InteractiveSession +from reana_db.models import RunStatus, InteractiveSession, InteractiveSessionType from reana_workflow_controller.errors import REANAInteractiveSessionError from reana_workflow_controller.workflow_run_manager import KubernetesWorkflowRunManager @@ -29,8 +28,8 @@ def test_start_interactive_session(sample_serial_workflow_in_db): current_k8s_appsv1_api_client=DEFAULT, ) as mocks: kwrm = KubernetesWorkflowRunManager(sample_serial_workflow_in_db) - if len(INTERACTIVE_SESSION_TYPES): - kwrm.start_interactive_session(INTERACTIVE_SESSION_TYPES[0]) + if len(InteractiveSessionType): + kwrm.start_interactive_session(InteractiveSessionType(0).name) mocks[ "current_k8s_appsv1_api_client" ].create_namespaced_deployment.assert_called_once() @@ -58,8 +57,8 @@ def test_start_interactive_workflow_k8s_failure(sample_serial_workflow_in_db): REANAInteractiveSessionError, match=r".*Kubernetes has failed.*" ): kwrm = KubernetesWorkflowRunManager(sample_serial_workflow_in_db) - if len(INTERACTIVE_SESSION_TYPES): - kwrm.start_interactive_session(INTERACTIVE_SESSION_TYPES[0]) + if len(InteractiveSessionType): + kwrm.start_interactive_session(InteractiveSessionType(0).name) def test_atomic_creation_of_interactive_session(sample_serial_workflow_in_db): @@ -84,8 +83,8 @@ def test_atomic_creation_of_interactive_session(sample_serial_workflow_in_db): ) as mocks: try: kwrm = KubernetesWorkflowRunManager(sample_serial_workflow_in_db) - if len(INTERACTIVE_SESSION_TYPES): - kwrm.start_interactive_session(INTERACTIVE_SESSION_TYPES[0]) + if len(InteractiveSessionType): + kwrm.start_interactive_session(InteractiveSessionType(0).name) except REANAInteractiveSessionError: mocks[ "current_k8s_corev1_api_client" @@ -94,7 +93,7 @@ def test_atomic_creation_of_interactive_session(sample_serial_workflow_in_db): "current_k8s_networking_v1beta1" ].delete_namespaced_ingress.assert_called_once() mocked_k8s_client.delete_namespaced_deployment.assert_called_once() - assert sample_serial_workflow_in_db.interactive_session is None + assert not sample_serial_workflow_in_db.sessions.all() def test_stop_workflow_backend_only_kubernetes( @@ -102,7 +101,7 @@ def test_stop_workflow_backend_only_kubernetes( ): """Test deletion of workflows with only Kubernetes based jobs.""" workflow = sample_serial_workflow_in_db - workflow.status = WorkflowStatus.running + workflow.status = RunStatus.running workflow_jobs = add_kubernetes_jobs_to_workflow(workflow) backend_job_ids = [job.backend_job_id for job in workflow_jobs] with patch( @@ -129,15 +128,15 @@ def test_interactive_session_closure(sample_serial_workflow_in_db, session): current_k8s_corev1_api_client=DEFAULT, ) as mocks: kwrm = KubernetesWorkflowRunManager(workflow) - if len(INTERACTIVE_SESSION_TYPES): - kwrm.start_interactive_session(INTERACTIVE_SESSION_TYPES[0]) + if len(InteractiveSessionType): + kwrm.start_interactive_session(InteractiveSessionType(0).name) int_session = InteractiveSession.query.filter_by( - owner_id=workflow.owner_id, type_=INTERACTIVE_SESSION_TYPES[0], + owner_id=workflow.owner_id, type_=InteractiveSessionType(0).name, ).first() - assert int_session.status == WorkflowStatus.created + assert int_session.status == RunStatus.created kwrm.stop_interactive_session(int_session.id_) int_session = InteractiveSession.query.filter_by( - owner_id=workflow.owner_id, type_=INTERACTIVE_SESSION_TYPES[0], + owner_id=workflow.owner_id, type_=InteractiveSessionType(0).name, ).first() - assert int_session.status == WorkflowStatus.stopped + assert int_session.status == RunStatus.stopped