Skip to content

Commit

Permalink
models: refactoring according to interactive sessions db models change
Browse files Browse the repository at this point in the history
  • Loading branch information
audrium committed Oct 9, 2020
1 parent 2173e2c commit 84ce25b
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 49 deletions.
24 changes: 16 additions & 8 deletions reana_workflow_controller/rest/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@

from flask import Blueprint, jsonify, request
from reana_db.database import Session
from reana_db.models import User, Workflow, WorkflowStatus
from reana_db.models import (
User,
Workflow,
WorkflowStatus,
InteractiveSession,
WorkflowSession,
)
from reana_db.utils import _get_workflow_with_uuid_or_name

from reana_workflow_controller.config import (
Expand Down Expand Up @@ -207,9 +213,13 @@ def get_workflows(paginate=None): # noqa
if search:
query = query.filter(Workflow.name.ilike("%{}%".format(search)))
if status_list:
<<<<<<< HEAD
workflow_status = [
WorkflowStatus[status] for status in status_list.split(",")
]
=======
workflow_status = [RunStatus[status] for status in status_list.split(",")]
>>>>>>> 518e94d... models: refacor on recent interactive session model changes
query = query.filter(Workflow.status.in_(workflow_status))
if sort not in ["asc", "desc"]:
sort = "desc"
Expand All @@ -226,14 +236,12 @@ def get_workflows(paginate=None): # noqa
"size": "-",
}
if type_ == "interactive":
if (
not workflow.interactive_session
or not workflow.interactive_session_name
or not workflow.interactive_session_type
):
int_session = workflow.sessions.first()
if not int_session:
continue
workflow_response["session_type"] = workflow.interactive_session_type
workflow_response["session_uri"] = workflow.interactive_session
workflow_response["session_type"] = int_session.type_.name
workflow_response["session_uri"] = int_session.path
workflow_response["session_status"] = int_session.status.name
if verbose:
try:
disk_usage_info = workflow.get_workspace_disk_usage(
Expand Down
6 changes: 4 additions & 2 deletions reana_workflow_controller/rest/workflows_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from flask import Blueprint, jsonify, request
from reana_commons.config import INTERACTIVE_SESSION_TYPES
from reana_db.utils import _get_workflow_with_uuid_or_name
from reana_db.models import WorkflowSession

from reana_workflow_controller.workflow_run_manager import KubernetesWorkflowRunManager

Expand Down Expand Up @@ -203,7 +204,8 @@ def close_interactive_session(workflow_id_or_name): # noqa
user_uuid = request.args["user"]
workflow = None
workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid)
if workflow.interactive_session_name is None:
int_session = workflow.sessions.first()
if not int_session:
return (
jsonify(
{
Expand All @@ -215,7 +217,7 @@ def close_interactive_session(workflow_id_or_name): # noqa
404,
)
kwrm = KubernetesWorkflowRunManager(workflow)
kwrm.stop_interactive_session()
kwrm.stop_interactive_session(int_session.id_)
return jsonify({"message": "The interactive session has been closed"}), 200

except (KeyError, ValueError) as e:
Expand Down
73 changes: 51 additions & 22 deletions reana_workflow_controller/workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,18 @@
)
from reana_db.config import SQLALCHEMY_DATABASE_URI
from reana_db.database import Session
from reana_db.models import Job, JobStatus
from reana_db.models import (
Job,
JobStatus,
InteractiveSession,
<<<<<<< HEAD
UserWorkflowSession,
WorkflowStatus,
=======
WorkflowSession,
RunStatus,
>>>>>>> 518e94d... models: refacor on recent interactive session model changes
)

from reana_workflow_controller.errors import (
REANAInteractiveSessionError,
Expand Down Expand Up @@ -276,10 +287,7 @@ def start_interactive_session(self, interactive_session_type, **kwargs):
)
)
access_path = self._generate_interactive_workflow_path()
self.workflow.interactive_session_type = interactive_session_type
self.workflow.interactive_session = access_path
workflow_run_name = self._workflow_run_name_generator("session")
self.workflow.interactive_session_name = workflow_run_name
kubernetes_objects = build_interactive_k8s_objects[
interactive_session_type
](
Expand Down Expand Up @@ -314,29 +322,50 @@ def start_interactive_session(self, interactive_session_type, **kwargs):
"Unkown error while starting interactive workflow run:\n{}".format(e)
)
finally:
if not action_completed:
self.workflow.interactive_session = None
if kubernetes_objects:
delete_k8s_objects_if_exist(
kubernetes_objects, REANA_RUNTIME_KUBERNETES_NAMESPACE
)

if not action_completed and kubernetes_objects:
return delete_k8s_objects_if_exist(
kubernetes_objects, REANA_RUNTIME_KUBERNETES_NAMESPACE
)
int_session = InteractiveSession(
name=workflow_run_name,
path=access_path,
type_=interactive_session_type,
owner_id=self.workflow.owner_id,
)
self.workflow.sessions.append(int_session)
current_db_sessions = Session.object_session(self.workflow)
current_db_sessions.add(self.workflow)
current_db_sessions.commit()

def stop_interactive_session(self):
def stop_interactive_session(self, interactive_session_id):
"""Stop an interactive workflow run."""
delete_k8s_ingress_object(
ingress_name=self.workflow.interactive_session_name,
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
)
self.workflow.interactive_session_name = None
self.workflow.interactive_session = None
self.workflow.interactive_session_type = None
current_db_sessions = Session.object_session(self.workflow)
current_db_sessions.add(self.workflow)
current_db_sessions.commit()
int_session = InteractiveSession.query.filter_by(
id_=interactive_session_id
).first()

if not int_session:
raise REANAInteractiveSessionError(
"Interactive session for workflow {} does not exist.".format(
self.workflow.name
)
)
action_completed = True
try:
delete_k8s_ingress_object(
ingress_name=int_session.name,
namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
)
except Exception as e:
action_completed = False
raise REANAInteractiveSessionError(
"Unkown error while stopping interactive session:\n{}".format(e)
)
finally:
if action_completed:
int_session.status = WorkflowStatus.stopped
current_db_sessions = Session.object_session(self.workflow)
current_db_sessions.add(int_session)
current_db_sessions.commit()

def stop_batch_workflow_run(self):
"""Stop a batch workflow run along with all its dependent jobs."""
Expand Down
21 changes: 13 additions & 8 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
import mock
import pytest
from flask import url_for
from reana_db.models import Job, JobCache, Workflow, WorkflowStatus
from reana_db.models import (
Job,
JobCache,
Workflow,
WorkflowStatus,
InteractiveSession,
)
from werkzeug.utils import secure_filename

from reana_workflow_controller.rest.utils import (
Expand Down Expand Up @@ -1338,12 +1344,12 @@ def test_close_interactive_session(
):
"""Test close an interactive session."""
expected_data = {"message": "The interactive session has been closed"}
sample_serial_workflow_in_db.interactive_session = (
"/5d9b30fd-f225-4615-9107-b1373afec070"
)
sample_serial_workflow_in_db.interactive_session_name = (
"interactive-jupyter-5d9b30fd-f225-4615-9107-b1373afec070-5lswkp"
path = "/5d9b30fd-f225-4615-9107-b1373afec070"
name = "interactive-jupyter-5d9b30fd-f225-4615-9107-b1373afec070-5lswkp"
int_session = InteractiveSession(
name=name, path=path, owner_id=sample_serial_workflow_in_db.owner_id,
)
sample_serial_workflow_in_db.sessions.append(int_session)
session.add(sample_serial_workflow_in_db)
session.commit()
with app.test_client() as client:
Expand Down Expand Up @@ -1371,8 +1377,7 @@ def test_close_interactive_session_not_opened(
)
}
with app.test_client() as client:
sample_serial_workflow_in_db.interactive_session = None
sample_serial_workflow_in_db.interactive_session_name = None
sample_serial_workflow_in_db.sessions = []
session.add(sample_serial_workflow_in_db)
session.commit()
res = client.post(
Expand Down
21 changes: 12 additions & 9 deletions tests/test_workflow_run_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from kubernetes.client.rest import ApiException
from mock import DEFAULT, Mock, patch
from reana_commons.config import INTERACTIVE_SESSION_TYPES
from reana_db.models import WorkflowStatus
from reana_db.models import WorkflowStatus, InteractiveSession

from reana_workflow_controller.errors import REANAInteractiveSessionError
from reana_workflow_controller.workflow_run_manager import KubernetesWorkflowRunManager
Expand Down Expand Up @@ -118,7 +118,7 @@ def test_stop_workflow_backend_only_kubernetes(
assert not backend_job_ids


def test_interactive_session_closure(sample_serial_workflow_in_db):
def test_interactive_session_closure(sample_serial_workflow_in_db, session):
"""Test closure of an interactive sessions."""
mocked_k8s_client = Mock()
workflow = sample_serial_workflow_in_db
Expand All @@ -131,10 +131,13 @@ def test_interactive_session_closure(sample_serial_workflow_in_db):
kwrm = KubernetesWorkflowRunManager(workflow)
if len(INTERACTIVE_SESSION_TYPES):
kwrm.start_interactive_session(INTERACTIVE_SESSION_TYPES[0])
assert workflow.interactive_session_name
assert workflow.interactive_session
assert workflow.interactive_session_type
kwrm.stop_interactive_session()
assert workflow.interactive_session_name is None
assert workflow.interactive_session is None
assert workflow.interactive_session_type is None

int_session = InteractiveSession.query.filter_by(
owner_id=workflow.owner_id, type_=INTERACTIVE_SESSION_TYPES[0],
).first()
assert int_session.status == WorkflowStatus.created
kwrm.stop_interactive_session(int_session.id_)
int_session = InteractiveSession.query.filter_by(
owner_id=workflow.owner_id, type_=INTERACTIVE_SESSION_TYPES[0],
).first()
assert int_session.status == WorkflowStatus.stopped

0 comments on commit 84ce25b

Please sign in to comment.