diff --git a/api/genie/migrations/0003_notebookobject_retrycount.py b/api/genie/migrations/0003_notebookobject_retrycount.py
new file mode 100644
index 00000000..1761f3c5
--- /dev/null
+++ b/api/genie/migrations/0003_notebookobject_retrycount.py
@@ -0,0 +1,18 @@
+# Generated by Django 3.2.4 on 2021-11-26 03:01
+
+from django.db import migrations, models
+
+
+# Schema migration: adds the integer field ``retryCount`` (default 0) to the ``notebookobject`` model.
+class Migration(migrations.Migration):
+
+ # Applied after genie migration 0002_runstatus_updatetimestamp.
+ dependencies = [
+ ('genie', '0002_runstatus_updatetimestamp'),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name='notebookobject',
+ name='retryCount',
+ field=models.IntegerField(default=0),
+ ),
+ ]
diff --git a/api/genie/migrations/0004_runstatus_retryremaining.py b/api/genie/migrations/0004_runstatus_retryremaining.py
new file mode 100644
index 00000000..e102b503
--- /dev/null
+++ b/api/genie/migrations/0004_runstatus_retryremaining.py
@@ -0,0 +1,18 @@
+# Generated by Django 3.2.4 on 2021-11-26 11:43
+
+from django.db import migrations, models
+
+
+# Schema migration: adds the integer field ``retryRemaining`` (default 0) to the ``runstatus`` model.
+class Migration(migrations.Migration):
+
+ # Applied after genie migration 0003_notebookobject_retrycount.
+ dependencies = [
+ ('genie', '0003_notebookobject_retrycount'),
+ ]
+
+ operations = [
+ migrations.AddField(
+ model_name='runstatus',
+ name='retryRemaining',
+ field=models.IntegerField(default=0),
+ ),
+ ]
diff --git a/api/genie/migrations/0005_alter_notebookobject_notebooktemplate.py b/api/genie/migrations/0005_alter_notebookobject_notebooktemplate.py
new file mode 100644
index 00000000..09a356a1
--- /dev/null
+++ b/api/genie/migrations/0005_alter_notebookobject_notebooktemplate.py
@@ -0,0 +1,19 @@
+# Generated by Django 3.2.4 on 2021-11-26 15:52
+
+from django.db import migrations, models
+import django.db.models.deletion
+
+
+# Schema migration: redefines ``notebookobject.notebookTemplate`` as a foreign key to
+# ``genie.notebooktemplate`` that allows NULL and blank values (on_delete stays CASCADE).
+class Migration(migrations.Migration):
+
+ # Applied after genie migration 0004_runstatus_retryremaining.
+ dependencies = [
+ ('genie', '0004_runstatus_retryremaining'),
+ ]
+
+ operations = [
+ migrations.AlterField(
+ model_name='notebookobject',
+ name='notebookTemplate',
+ field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='genie.notebooktemplate'),
+ ),
+ ]
diff --git a/api/genie/models.py b/api/genie/models.py
index bfa25199..acdd16c1 100644
--- a/api/genie/models.py
+++ b/api/genie/models.py
@@ -35,6 +35,7 @@ class RunStatus(models.Model):
workflowRun = models.ForeignKey(WorkflowRun, null=True, blank=True, on_delete=models.SET_NULL)
taskId = models.CharField(max_length=200, default="")
zeppelinServerId = models.CharField(max_length=200, default="")
+ retryRemaining = models.IntegerField(default=0)
# Connection Models
@@ -85,8 +86,9 @@ class CustomSchedule(CrontabSchedule):
class NotebookObject(models.Model):
notebookZeppelinId = models.CharField(max_length=10)
connection = models.ForeignKey(Connection, on_delete=models.CASCADE, blank=True, null=True)
- notebookTemplate = models.ForeignKey(NotebookTemplate, on_delete=models.CASCADE)
+ notebookTemplate = models.ForeignKey(NotebookTemplate, on_delete=models.CASCADE, blank=True, null=True)
defaultPayload = models.JSONField(default={})
+ retryCount = models.IntegerField(default=0)
signals.pre_delete.connect(PeriodicTasks.changed, sender=NotebookJob)
diff --git a/api/genie/services/notebookJobs.py b/api/genie/services/notebookJobs.py
index 8987e787..c8d3ab57 100644
--- a/api/genie/services/notebookJobs.py
+++ b/api/genie/services/notebookJobs.py
@@ -62,8 +62,10 @@ def getNotebooks(offset: int = 0, limit: int = None , searchQuery: str = None, s
for notebook in notebooks:
notebook["name"] = notebook["path"]
notebookObj = next((notebookObj for notebookObj in notebookObjects if notebookObj.notebookZeppelinId == notebook["id"]), False)
+ notebook["retryCount"] = 0
if notebookObj:
notebook["notebookObjId"] = notebookObj.id
+ notebook["retryCount"] = notebookObj.retryCount
notebookJob = next((notebookJob for notebookJob in notebookJobs if notebookJob.notebookId == notebook["id"]), False)
if notebookJob:
notebook["isScheduled"] = True
@@ -301,6 +303,18 @@ def addNotebookJob(notebookId: str, scheduleId: int):
res.update(True, "NotebookJob added successfully", None)
return res
+    @staticmethod
+    def updateNotebookRetryCount(notebookId: str, retryCount: int):
+        """
+        Service to set the retry count of a notebook
+        Creates the NotebookObject for the notebook if it does not exist yet
+        :param notebookId: Zeppelin ID of the notebook whose retry count is being set
+        :param retryCount: Number of retries to store; negative values are stored as 0
+        """
+        res = ApiResponse()
+        # The retry logic keeps retrying while the remaining count is non-zero and only
+        # ever decrements it, so a negative count would re-queue the notebook forever.
+        retryCount = max(int(retryCount), 0)
+        NotebookObject.objects.update_or_create(notebookZeppelinId=notebookId, defaults={"retryCount": retryCount})
+        res.update(True, "Notebook retry count added successfully", None)
+        return res
+
@staticmethod
def deleteNotebookJob(notebookId: int):
"""
@@ -318,11 +332,22 @@ def runNotebookJob(notebookId: str):
Service to run notebook job
"""
res = ApiResponse("Error in running notebook")
- runStatus = RunStatus.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Manual")
+ retryRemaining = NotebookJobServices.__getRetryRemaining(notebookId)
+ runStatus = RunStatus.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Manual", retryRemaining=retryRemaining)
runNotebookJobTask.delay(notebookId=notebookId, runStatusId=runStatus.id, runType="Manual")
res.update(True, "Notebook triggered successfully", None)
return res
+    @staticmethod
+    def __getRetryRemaining(notebookId: str):
+        """
+        Get retry count set for given notebook in notebook object
+        :param notebookId: Zeppelin ID of the notebook
+        :return: retryCount of the notebook's NotebookObject, or 0 if it has none
+        """
+        # first() fetches the row in a single query; count() followed by [0] needed two
+        notebookObject = NotebookObject.objects.filter(notebookZeppelinId=notebookId).first()
+        return notebookObject.retryCount if notebookObject is not None else 0
+
@staticmethod
def stopNotebookJob(notebookId: str):
"""
diff --git a/api/genie/tasks.py b/api/genie/tasks.py
index a8caaf68..444f5f57 100644
--- a/api/genie/tasks.py
+++ b/api/genie/tasks.py
@@ -7,7 +7,7 @@
from celery import shared_task
from django.conf import settings
-from genie.models import RunStatus, NOTEBOOK_STATUS_SUCCESS, NOTEBOOK_STATUS_ERROR, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_FINISHED, NOTEBOOK_STATUS_ABORT, NOTEBOOK_STATUS_QUEUED
+from genie.models import RunStatus, NotebookObject, NOTEBOOK_STATUS_SUCCESS, NOTEBOOK_STATUS_ERROR, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_FINISHED, NOTEBOOK_STATUS_ABORT, NOTEBOOK_STATUS_QUEUED
from system.services import NotificationServices
from utils.zeppelinAPI import ZeppelinAPI
from utils.kubernetesAPI import Kubernetes
@@ -49,6 +49,9 @@ def runNotebookJob(notebookId: str, runStatusId: int = None, runType: str = "Sch
__evaluateScaleDownZeppelin()
except Exception as ex:
logger.error(f"Error occured in notebook {notebookId}. Error: {str(ex)}")
+ if runStatus.retryRemaining:
+ __retryNotebook(runStatus)
+
runStatus.status = NOTEBOOK_STATUS_ERROR
runStatus.message = str(ex)
runStatus.endTimestamp = dt.datetime.now()
@@ -56,6 +59,9 @@ def runNotebookJob(notebookId: str, runStatusId: int = None, runType: str = "Sch
NotificationServices.notify(notebookName=notebookName, isSuccess=False, message=str(ex))
else:
logger.error(f"Error occured in notebook {notebookId}. Error: Failed to trigger notebook job")
+ if runStatus.retryRemaining:
+ __retryNotebook(runStatus)
+
runStatus.status=NOTEBOOK_STATUS_ERROR
runStatus.message = "Failed running notebook"
runStatus.endTimestamp = dt.datetime.now()
@@ -63,12 +69,29 @@ def runNotebookJob(notebookId: str, runStatusId: int = None, runType: str = "Sch
except Exception as ex:
logger.error(f"Error occured in notebook {notebookId}. Error: {str(ex)}")
+
+ if runStatus.retryRemaining:
+ __retryNotebook(runStatus)
+
runStatus.status=NOTEBOOK_STATUS_ERROR
runStatus.message = str(ex)
runStatus.endTimestamp = dt.datetime.now()
runStatus.save()
NotificationServices.notify(notebookName=notebookName if notebookName else notebookId, isSuccess=False, message=str(ex))
+def __retryNotebook(runStatus):
+ """
+ Queue one more run of the notebook of the given runStatus
+ Creates a new QUEUED RunStatus that copies notebookId, runType and workflowRun from
+ the given runStatus with retryRemaining reduced by one, dispatches runNotebookJob
+ for it and stores the id of the dispatched task on the new RunStatus
+ :param runStatus: RunStatus of the run that is being retried
+ :return: id of the newly created RunStatus
+ """
+ newRunStatus = RunStatus.objects.create(
+ notebookId=runStatus.notebookId, status=NOTEBOOK_STATUS_QUEUED, runType=runStatus.runType, workflowRun_id=runStatus.workflowRun_id, retryRemaining=runStatus.retryRemaining-1
+ )
+ response = runNotebookJob.delay(notebookId=newRunStatus.notebookId, runStatusId=newRunStatus.id)
+ newRunStatus.taskId = response.id
+ newRunStatus.save()
+ return newRunStatus.id
+
+
def __allocateZeppelinServer(runStatus: RunStatus):
"""
Creates or allocates a zeppelin server to run the notebook on
@@ -116,7 +139,8 @@ def __getOrCreateRunStatus(runStatusId: int, notebookId: str, runType: str, task
Gets or creates a notebook run status object
"""
if not runStatusId:
- runStatus = RunStatus.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_RUNNING, runType=runType, taskId=taskId)
+ retryRemaining=__getRetryRemaining(notebookId)
+ runStatus = RunStatus.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_RUNNING, runType=runType, taskId=taskId, retryRemaining=retryRemaining)
else:
runStatus = RunStatus.objects.get(id=runStatusId)
runStatus.startTimestamp = dt.datetime.now()
@@ -125,6 +149,15 @@ def __getOrCreateRunStatus(runStatusId: int, notebookId: str, runType: str, task
runStatus.save()
return runStatus
+def __getRetryRemaining(notebookId: str):
+    """
+    Get retry count set for given notebook in notebook object
+    :param notebookId: Zeppelin ID of the notebook
+    :return: retryCount of the notebook's NotebookObject, or 0 if it has none
+    """
+    # first() fetches the row in a single query; count() followed by [0] needed two
+    notebookObject = NotebookObject.objects.filter(notebookZeppelinId=notebookId).first()
+    return notebookObject.retryCount if notebookObject is not None else 0
+
def __checkIfNotebookRunning(notebookId: str, zeppelin: ZeppelinAPI):
"""
Checks if notebook is running and returns tuple of isNotebookRunning, notebookName
@@ -182,11 +215,15 @@ def __setNotebookStatus(response, runStatus: RunStatus):
notebookName = response.get("name", "")
for paragraph in paragraphs:
if paragraph.get("status") != "FINISHED":
+ if paragraph.get("status") != "ABORT" and runStatus.retryRemaining:
+ __retryNotebook(runStatus)
runStatus.status=NOTEBOOK_STATUS_ABORT if paragraph.get("status") == "ABORT" else NOTEBOOK_STATUS_ERROR
runStatus.endTimestamp = dt.datetime.now()
runStatus.save()
NotificationServices.notify(notebookName=notebookName, isSuccess=False, message=paragraph.get("title", "") + " " + paragraph.get("id","") + " failed")
return
+ if not response and runStatus.retryRemaining:
+ __retryNotebook(runStatus)
runStatus.status=NOTEBOOK_STATUS_SUCCESS if response else NOTEBOOK_STATUS_ERROR
runStatus.endTimestamp = dt.datetime.now()
runStatus.save()
diff --git a/api/genie/tests/test_views_notebookJobs.py b/api/genie/tests/test_views_notebookJobs.py
index ef200274..fdc7f33a 100644
--- a/api/genie/tests/test_views_notebookJobs.py
+++ b/api/genie/tests/test_views_notebookJobs.py
@@ -19,7 +19,7 @@ def test_getNotebooks(client, populate_seed_data, mocker):
response = client.get(path, content_type="application/json")
assert response.status_code == 200
assert response.data['data']["count"] == 1
- assert response.data['data']["notebooks"] == [{'path': 'notebook', 'id': 'BX976MDDE', 'name': 'notebook', 'isScheduled': False, 'assignedWorkflow': []}]
+ assert response.data['data']["notebooks"] == [{'path': 'notebook', 'id': 'BX976MDDE', 'name': 'notebook', 'isScheduled': False, 'assignedWorkflow': [], 'retryCount': 0}]
@pytest.mark.django_db
@@ -104,3 +104,25 @@ def test_notebookJob(client, populate_seed_data, mocker):
assert response.status_code == 200
assert response.data['success'] == True
+
+@pytest.mark.django_db
+def test_updateNotebookRetryCount(client, populate_seed_data, mocker):
+    """Posting a retryCount for a notebook stores it and the notebooks listing returns it"""
+    # Named differently from test_notebookJob above: a second function with that name
+    # would replace the first one in the module and the earlier test would never run.
+    path = reverse('notebooksJobView')
+    data = {"notebookId": "BX976MDDE", "retryCount": 4}
+    response = client.post(path, data=data, content_type="application/json")
+    assert response.status_code == 200
+    assert response.data['success'] == True
+
+    # The notebooks listing must now report the stored retry count
+    path = reverse('notebooks', kwargs={"offset": 0})
+    mocker.patch("utils.zeppelinAPI.ZeppelinAPI.getAllNotebooks", return_value = [{"path": "notebook", "id": "BX976MDDE"}])
+    response = client.get(path, content_type="application/json")
+    assert response.status_code == 200
+    assert response.data['data']["count"] == 1
+    assert response.data['data']["notebooks"][0]["retryCount"] == 4
+
+
+# write test case for update retryCount view
+
+# write test case
+
diff --git a/api/genie/views.py b/api/genie/views.py
index 4a9055ba..c7e6eb82 100644
--- a/api/genie/views.py
+++ b/api/genie/views.py
@@ -96,8 +96,13 @@ def get(self, request, notebookId=None):
def post(self, request):
notebookId = request.data["notebookId"]
- scheduleId = request.data["scheduleId"]
- res = NotebookJobServices.addNotebookJob(notebookId=notebookId, scheduleId=scheduleId)
+ scheduleId = request.data.get("scheduleId")
+ retryCount = request.data.get("retryCount", None)
+
+ if retryCount == None:
+ res = NotebookJobServices.addNotebookJob(notebookId=notebookId, scheduleId=scheduleId)
+ else:
+ res = NotebookJobServices.updateNotebookRetryCount(notebookId=notebookId, retryCount=retryCount)
return Response(res.json())
def delete(self, request, notebookId=None):
diff --git a/api/workflows/taskUtils.py b/api/workflows/taskUtils.py
index 984e6094..a7075ee3 100644
--- a/api/workflows/taskUtils.py
+++ b/api/workflows/taskUtils.py
@@ -15,7 +15,7 @@
from utils.zeppelinAPI import Zeppelin
from genie.tasks import runNotebookJob as runNotebookJobTask
-from genie.models import NOTEBOOK_STATUS_QUEUED, RunStatus, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_SUCCESS
+from genie.models import NotebookObject, NOTEBOOK_STATUS_QUEUED, RunStatus, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_SUCCESS
# Get an instance of a logger
logger = logging.getLogger(__name__)
@@ -37,7 +37,7 @@ def runWorkflow(workflowId: int, taskId: str, workflowRunId: int = None):
workflowRun = TaskUtils.__getOrCreateWorkflowRun(workflowId, taskId, workflowRunId)
notebookRunStatusIds = TaskUtils.__runNotebookJobsFromList(notebookIds, workflowRun.id)
workflowStatus = polling.poll(
- lambda: TaskUtils.__checkGivenRunStatuses(notebookRunStatusIds),
+ lambda: TaskUtils.__checkGivenRunStatuses(workflowRun.id),
check_success= lambda x: x != "RUNNING",
step=3,
timeout=3600*6,
@@ -59,8 +59,9 @@ def __runNotebookJobsFromList(notebookIds: List[int], workflowRunId: int):
"""
notebookRunStatusIds = []
for notebookId in notebookIds:
+ retryRemaining = TaskUtils.__getRetryRemaining(notebookId)
runStatus = RunStatus.objects.create(
- notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Workflow", workflowRun_id=workflowRunId
+ notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Workflow", workflowRun_id=workflowRunId, retryRemaining=retryRemaining
)
response = runNotebookJobTask.delay(notebookId=notebookId, runStatusId=runStatus.id)
runStatus.taskId = response.id
@@ -68,6 +69,17 @@ def __runNotebookJobsFromList(notebookIds: List[int], workflowRunId: int):
notebookRunStatusIds.append(runStatus.id)
time.sleep(0.2) # Sleep for 200ms to make sure zeppelin server has been allocated to previous notebook
return notebookRunStatusIds
+
+
+    @staticmethod
+    def __getRetryRemaining(notebookId: str):
+        """
+        Get retry count set for given notebook in notebook object
+        :param notebookId: Zeppelin ID of the notebook
+        :return: retryCount of the notebook's NotebookObject, or 0 if it has none
+        """
+        # first() fetches the row in a single query; count() followed by [0] needed two
+        notebookObject = NotebookObject.objects.filter(notebookZeppelinId=notebookId).first()
+        return notebookObject.retryCount if notebookObject is not None else 0
+
@staticmethod
def __getNotebookIdsInWorkflow(workflowId: int):
@@ -98,14 +110,14 @@ def __getOrCreateWorkflowRun(workflowId: int, taskId: str, workflowRunId: int =
return workflowRun
@staticmethod
- def __checkGivenRunStatuses(notebookRunStatusIds: List[int]):
+ def __checkGivenRunStatuses(workflowRunId: int):
"""
Check if given runStatuses are status is SUCCESS
"""
- runningAndQueuedNotebookCount = RunStatus.objects.filter(id__in=notebookRunStatusIds).exclude(status=NOTEBOOK_STATUS_RUNNING).exclude(status=NOTEBOOK_STATUS_QUEUED).count()
- if (len(notebookRunStatusIds) == runningAndQueuedNotebookCount):
- successfulNotebookCount = RunStatus.objects.filter(id__in=notebookRunStatusIds, status=NOTEBOOK_STATUS_SUCCESS).count()
- logger.info(f"Batch completed. Successfull Notebooks : {str(successfulNotebookCount)}. Notebooks in batch: {str(len(notebookRunStatusIds))}")
- logger.info(f"Notebook Run Status Ids: {str(notebookRunStatusIds)}")
- return (len(notebookRunStatusIds) == successfulNotebookCount)
+ runningAndQueuedNotebookCount = RunStatus.objects.filter(workflowRun_id=workflowRunId).filter(status__in=[NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_QUEUED]).count()
+ if not runningAndQueuedNotebookCount:
+ successfulNotebookCount = RunStatus.objects.filter(workflowRun_id=workflowRunId, status=NOTEBOOK_STATUS_SUCCESS).count()
+ logger.info(f"Batch completed. Successfull Notebooks : {str(successfulNotebookCount)}.")
+ # logger.info(f"Notebook Run Status Ids: {str(notebookRunStatusIds)}")
+ return RunStatus.objects.filter(workflowRun_id=workflowRunId).exclude(status=NOTEBOOK_STATUS_SUCCESS).filter(retryRemaining=0).count() == 0
return "RUNNING"
diff --git a/ui/src/components/Notebooks/NotebookTable.js b/ui/src/components/Notebooks/NotebookTable.js
index 18e0ca17..17c4b4e4 100644
--- a/ui/src/components/Notebooks/NotebookTable.js
+++ b/ui/src/components/Notebooks/NotebookTable.js
@@ -14,9 +14,10 @@ import {
Drawer,
Popconfirm,
Menu,
- Dropdown
+ Dropdown,
+ InputNumber
} from "antd";
-import { MoreOutlined, PlayCircleOutlined, UnorderedListOutlined, StopOutlined, FileTextOutlined, DeleteOutlined, CopyOutlined, CloseOutlined, EditOutlined } from '@ant-design/icons';
+import { MoreOutlined, PlayCircleOutlined, UnorderedListOutlined, StopOutlined, FileTextOutlined, DeleteOutlined, CopyOutlined, CloseCircleOutlined, CloseOutlined, EditOutlined } from '@ant-design/icons';
import NotebookRunLogs from "./NotebookRunLogs.js"
import AddNotebook from "./AddNotebook.js"
import EditNotebook from "./EditNotebook.js"
@@ -124,7 +125,7 @@ export default function NotebookTable() {
if(response.success){
message.success(response.message)
}
- else{
+ else {
message.error(response.message)
}
setSelectedNotebook(null)
@@ -135,6 +136,19 @@ export default function NotebookTable() {
}
}
+ // Saves a new retry count for a notebook through notebookService, reports the
+ // response message via message.success / message.error, then clears the selected
+ // notebook and calls getNotebooks with the offset of the current page.
+ const editNotebookRetryCount = async (notebookId, newRetryCount) => {
+ if (newRetryCount == null) return;
+ // `== null` above matches both null and undefined, so nothing is sent when no value was given
+ const response = await notebookService.updateNotebookRetryCount(notebookId, newRetryCount);
+ if(response.success){
+ message.success(response.message)
+ } else {
+ message.error(response.message)
+ }
+ setSelectedNotebook(null)
+ getNotebooks((currentPage -1)*limit)
+ }
+
const handleTableChange = (event, filter, sorter) => {
setSorter({columnKey: sorter.columnKey, order: sorter.order})
setFilter(filter)
@@ -345,6 +359,34 @@ export default function NotebookTable() {
)
}
},
+ {
+ title: "Retry Count",
+ dataIndex: "retryCount",
+ key: "retryCount",
+ align: "right",
+ width: "10%",
+ sorter: ()=>{},
+ sortOrder: sorter.columnKey === 'retryCount' && sorter.order,
+ ellipsis: true,
+ render: (retryCount, notebook) => {
+ return (
+ <>
+ {
+ selectedNotebook === notebook.id ?
+
+ editNotebookRetryCount(notebook.id, val)} />
+ setSelectedNotebook(null)} />
+
+ :
+
+ {retryCount + " "}
+ false} className={style.linkText} onClick={()=>setSelectedNotebook(notebook.id)}>
+
+ }
+ >
+ );
+ }
+ },
{
title: "Latest Run",
dataIndex: "lastRun",
diff --git a/ui/src/services/notebooks.js b/ui/src/services/notebooks.js
index 21ef869b..fdcdab59 100644
--- a/ui/src/services/notebooks.js
+++ b/ui/src/services/notebooks.js
@@ -78,6 +78,11 @@ class NotebookService {
return response
}
+  async updateNotebookRetryCount(notebookId, retryCount){
+    // POST the notebook id and its retry count to the notebook job endpoint
+    // and hand the API response back to the caller unchanged.
+    const payload = {notebookId, retryCount}
+    return await apiService.post("genie/notebookjob/", payload)
+  }
+
async getTimezones(){
const response = await apiService.get("genie/timezones/")
if(response.success === true)