From 84ce25b318774b8ff7fcea34961d5f40d0965c59 Mon Sep 17 00:00:00 2001 From: Audrius Mecionis Date: Wed, 7 Oct 2020 16:42:34 +0200 Subject: [PATCH] models: refactoring according to interactive sessions db models change closes https://github.com/reanahub/reana-db/issues/89 --- reana_workflow_controller/rest/workflows.py | 24 ++++-- .../rest/workflows_session.py | 6 +- .../workflow_run_manager.py | 73 +++++++++++++------ tests/test_views.py | 21 ++++-- tests/test_workflow_run_manager.py | 21 +++--- 5 files changed, 96 insertions(+), 49 deletions(-) diff --git a/reana_workflow_controller/rest/workflows.py b/reana_workflow_controller/rest/workflows.py index 72b7166f..f49be285 100644 --- a/reana_workflow_controller/rest/workflows.py +++ b/reana_workflow_controller/rest/workflows.py @@ -13,7 +13,13 @@ from flask import Blueprint, jsonify, request from reana_db.database import Session -from reana_db.models import User, Workflow, WorkflowStatus +from reana_db.models import ( + User, + Workflow, + WorkflowStatus, + InteractiveSession, + WorkflowSession, +) from reana_db.utils import _get_workflow_with_uuid_or_name from reana_workflow_controller.config import ( @@ -207,9 +213,13 @@ def get_workflows(paginate=None): # noqa if search: query = query.filter(Workflow.name.ilike("%{}%".format(search))) if status_list: +<<<<<<< HEAD workflow_status = [ WorkflowStatus[status] for status in status_list.split(",") ] +======= + workflow_status = [RunStatus[status] for status in status_list.split(",")] +>>>>>>> 518e94d... models: refacor on recent interactive session model changes query = query.filter(Workflow.status.in_(workflow_status)) if sort not in ["asc", "desc"]: sort = "desc" @@ -226,14 +236,12 @@ def get_workflows(paginate=None): # noqa "size": "-", } if type_ == "interactive": - if ( - not workflow.interactive_session - or not workflow.interactive_session_name - or not workflow.interactive_session_type - ): + int_session = workflow.sessions.first() + if not int_session: continue - workflow_response["session_type"] = workflow.interactive_session_type - workflow_response["session_uri"] = workflow.interactive_session + workflow_response["session_type"] = int_session.type_.name + workflow_response["session_uri"] = int_session.path + workflow_response["session_status"] = int_session.status.name if verbose: try: disk_usage_info = workflow.get_workspace_disk_usage( diff --git a/reana_workflow_controller/rest/workflows_session.py b/reana_workflow_controller/rest/workflows_session.py index 805990a8..d4d83f7d 100644 --- a/reana_workflow_controller/rest/workflows_session.py +++ b/reana_workflow_controller/rest/workflows_session.py @@ -12,6 +12,7 @@ from flask import Blueprint, jsonify, request from reana_commons.config import INTERACTIVE_SESSION_TYPES from reana_db.utils import _get_workflow_with_uuid_or_name +from reana_db.models import WorkflowSession from reana_workflow_controller.workflow_run_manager import KubernetesWorkflowRunManager @@ -203,7 +204,8 @@ def close_interactive_session(workflow_id_or_name): # noqa user_uuid = request.args["user"] workflow = None workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid) - if workflow.interactive_session_name is None: + int_session = workflow.sessions.first() + if not int_session: return ( jsonify( { @@ -215,7 +217,7 @@ def close_interactive_session(workflow_id_or_name): # noqa 404, ) kwrm = KubernetesWorkflowRunManager(workflow) - kwrm.stop_interactive_session() + kwrm.stop_interactive_session(int_session.id_) return jsonify({"message": "The interactive session has been closed"}), 200 except (KeyError, ValueError) as e: diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index 0bdbb262..21fd2ec7 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -42,7 +42,18 @@ ) from reana_db.config import SQLALCHEMY_DATABASE_URI from reana_db.database import Session -from reana_db.models import Job, JobStatus +from reana_db.models import ( + Job, + JobStatus, + InteractiveSession, +<<<<<<< HEAD + UserWorkflowSession, + WorkflowStatus, +======= + WorkflowSession, + RunStatus, +>>>>>>> 518e94d... models: refacor on recent interactive session model changes +) from reana_workflow_controller.errors import ( REANAInteractiveSessionError, @@ -276,10 +287,7 @@ def start_interactive_session(self, interactive_session_type, **kwargs): ) ) access_path = self._generate_interactive_workflow_path() - self.workflow.interactive_session_type = interactive_session_type - self.workflow.interactive_session = access_path workflow_run_name = self._workflow_run_name_generator("session") - self.workflow.interactive_session_name = workflow_run_name kubernetes_objects = build_interactive_k8s_objects[ interactive_session_type ]( @@ -314,29 +322,50 @@ def start_interactive_session(self, interactive_session_type, **kwargs): "Unkown error while starting interactive workflow run:\n{}".format(e) ) finally: - if not action_completed: - self.workflow.interactive_session = None - if kubernetes_objects: - delete_k8s_objects_if_exist( - kubernetes_objects, REANA_RUNTIME_KUBERNETES_NAMESPACE - ) - + if not action_completed and kubernetes_objects: + return delete_k8s_objects_if_exist( + kubernetes_objects, REANA_RUNTIME_KUBERNETES_NAMESPACE + ) + int_session = InteractiveSession( + name=workflow_run_name, + path=access_path, + type_=interactive_session_type, + owner_id=self.workflow.owner_id, + ) + self.workflow.sessions.append(int_session) current_db_sessions = Session.object_session(self.workflow) current_db_sessions.add(self.workflow) current_db_sessions.commit() - def stop_interactive_session(self): + def stop_interactive_session(self, interactive_session_id): """Stop an interactive workflow run.""" - delete_k8s_ingress_object( - ingress_name=self.workflow.interactive_session_name, - namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, - ) - self.workflow.interactive_session_name = None - self.workflow.interactive_session = None - self.workflow.interactive_session_type = None - current_db_sessions = Session.object_session(self.workflow) - current_db_sessions.add(self.workflow) - current_db_sessions.commit() + int_session = InteractiveSession.query.filter_by( + id_=interactive_session_id + ).first() + + if not int_session: + raise REANAInteractiveSessionError( + "Interactive session for workflow {} does not exist.".format( + self.workflow.name + ) + ) + action_completed = True + try: + delete_k8s_ingress_object( + ingress_name=int_session.name, + namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE, + ) + except Exception as e: + action_completed = False + raise REANAInteractiveSessionError( + "Unkown error while stopping interactive session:\n{}".format(e) + ) + finally: + if action_completed: + int_session.status = WorkflowStatus.stopped + current_db_sessions = Session.object_session(self.workflow) + current_db_sessions.add(int_session) + current_db_sessions.commit() def stop_batch_workflow_run(self): """Stop a batch workflow run along with all its dependent jobs.""" diff --git a/tests/test_views.py b/tests/test_views.py index 2c6a666b..d4230521 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -16,7 +16,13 @@ import mock import pytest from flask import url_for -from reana_db.models import Job, JobCache, Workflow, WorkflowStatus +from reana_db.models import ( + Job, + JobCache, + Workflow, + WorkflowStatus, + InteractiveSession, +) from werkzeug.utils import secure_filename from reana_workflow_controller.rest.utils import ( @@ -1338,12 +1344,12 @@ def test_close_interactive_session( ): """Test close an interactive session.""" expected_data = {"message": "The interactive session has been closed"} - sample_serial_workflow_in_db.interactive_session = ( - "/5d9b30fd-f225-4615-9107-b1373afec070" - ) - sample_serial_workflow_in_db.interactive_session_name = ( - "interactive-jupyter-5d9b30fd-f225-4615-9107-b1373afec070-5lswkp" + path = "/5d9b30fd-f225-4615-9107-b1373afec070" + name = "interactive-jupyter-5d9b30fd-f225-4615-9107-b1373afec070-5lswkp" + int_session = InteractiveSession( + name=name, path=path, owner_id=sample_serial_workflow_in_db.owner_id, ) + sample_serial_workflow_in_db.sessions.append(int_session) session.add(sample_serial_workflow_in_db) session.commit() with app.test_client() as client: @@ -1371,8 +1377,7 @@ def test_close_interactive_session_not_opened( ) } with app.test_client() as client: - sample_serial_workflow_in_db.interactive_session = None - sample_serial_workflow_in_db.interactive_session_name = None + sample_serial_workflow_in_db.sessions = [] session.add(sample_serial_workflow_in_db) session.commit() res = client.post( diff --git a/tests/test_workflow_run_manager.py b/tests/test_workflow_run_manager.py index 519f6ce7..0ec4fed4 100644 --- a/tests/test_workflow_run_manager.py +++ b/tests/test_workflow_run_manager.py @@ -14,7 +14,7 @@ from kubernetes.client.rest import ApiException from mock import DEFAULT, Mock, patch from reana_commons.config import INTERACTIVE_SESSION_TYPES -from reana_db.models import WorkflowStatus +from reana_db.models import WorkflowStatus, InteractiveSession from reana_workflow_controller.errors import REANAInteractiveSessionError from reana_workflow_controller.workflow_run_manager import KubernetesWorkflowRunManager @@ -118,7 +118,7 @@ def test_stop_workflow_backend_only_kubernetes( assert not backend_job_ids -def test_interactive_session_closure(sample_serial_workflow_in_db): +def test_interactive_session_closure(sample_serial_workflow_in_db, session): """Test closure of an interactive sessions.""" mocked_k8s_client = Mock() workflow = sample_serial_workflow_in_db @@ -131,10 +131,13 @@ def test_interactive_session_closure(sample_serial_workflow_in_db): kwrm = KubernetesWorkflowRunManager(workflow) if len(INTERACTIVE_SESSION_TYPES): kwrm.start_interactive_session(INTERACTIVE_SESSION_TYPES[0]) - assert workflow.interactive_session_name - assert workflow.interactive_session - assert workflow.interactive_session_type - kwrm.stop_interactive_session() - assert workflow.interactive_session_name is None - assert workflow.interactive_session is None - assert workflow.interactive_session_type is None + + int_session = InteractiveSession.query.filter_by( + owner_id=workflow.owner_id, type_=INTERACTIVE_SESSION_TYPES[0], + ).first() + assert int_session.status == WorkflowStatus.created + kwrm.stop_interactive_session(int_session.id_) + int_session = InteractiveSession.query.filter_by( + owner_id=workflow.owner_id, type_=INTERACTIVE_SESSION_TYPES[0], + ).first() + assert int_session.status == WorkflowStatus.stopped