diff --git a/api/genie/migrations/0003_notebookobject_retrycount.py b/api/genie/migrations/0003_notebookobject_retrycount.py new file mode 100644 index 00000000..1761f3c5 --- /dev/null +++ b/api/genie/migrations/0003_notebookobject_retrycount.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2.4 on 2021-11-26 03:01 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('genie', '0002_runstatus_updatetimestamp'), + ] + + operations = [ + migrations.AddField( + model_name='notebookobject', + name='retryCount', + field=models.IntegerField(default=0), + ), + ] diff --git a/api/genie/migrations/0004_runstatus_retryremaining.py b/api/genie/migrations/0004_runstatus_retryremaining.py new file mode 100644 index 00000000..e102b503 --- /dev/null +++ b/api/genie/migrations/0004_runstatus_retryremaining.py @@ -0,0 +1,18 @@ +# Generated by Django 3.2.4 on 2021-11-26 11:43 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('genie', '0003_notebookobject_retrycount'), + ] + + operations = [ + migrations.AddField( + model_name='runstatus', + name='retryRemaining', + field=models.IntegerField(default=0), + ), + ] diff --git a/api/genie/migrations/0005_alter_notebookobject_notebooktemplate.py b/api/genie/migrations/0005_alter_notebookobject_notebooktemplate.py new file mode 100644 index 00000000..09a356a1 --- /dev/null +++ b/api/genie/migrations/0005_alter_notebookobject_notebooktemplate.py @@ -0,0 +1,19 @@ +# Generated by Django 3.2.4 on 2021-11-26 15:52 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ('genie', '0004_runstatus_retryremaining'), + ] + + operations = [ + migrations.AlterField( + model_name='notebookobject', + name='notebookTemplate', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='genie.notebooktemplate'), + ), + ] diff --git a/api/genie/models.py b/api/genie/models.py index bfa25199..acdd16c1 100644 --- a/api/genie/models.py +++ b/api/genie/models.py @@ -35,6 +35,7 @@ class RunStatus(models.Model): workflowRun = models.ForeignKey(WorkflowRun, null=True, blank=True, on_delete=models.SET_NULL) taskId = models.CharField(max_length=200, default="") zeppelinServerId = models.CharField(max_length=200, default="") + retryRemaining = models.IntegerField(default=0) # Connection Models @@ -85,8 +86,9 @@ class CustomSchedule(CrontabSchedule): class NotebookObject(models.Model): notebookZeppelinId = models.CharField(max_length=10) connection = models.ForeignKey(Connection, on_delete=models.CASCADE, blank=True, null=True) - notebookTemplate = models.ForeignKey(NotebookTemplate, on_delete=models.CASCADE) + notebookTemplate = models.ForeignKey(NotebookTemplate, on_delete=models.CASCADE, blank=True, null=True) defaultPayload = models.JSONField(default={}) + retryCount = models.IntegerField(default=0) signals.pre_delete.connect(PeriodicTasks.changed, sender=NotebookJob) diff --git a/api/genie/services/notebookJobs.py b/api/genie/services/notebookJobs.py index 8987e787..c8d3ab57 100644 --- a/api/genie/services/notebookJobs.py +++ b/api/genie/services/notebookJobs.py @@ -62,8 +62,10 @@ def getNotebooks(offset: int = 0, limit: int = None , searchQuery: str = None, s for notebook in notebooks: notebook["name"] = notebook["path"] notebookObj = next((notebookObj for notebookObj in notebookObjects if notebookObj.notebookZeppelinId == notebook["id"]), False) + notebook["retryCount"] = 0 if notebookObj: notebook["notebookObjId"] = notebookObj.id + notebook["retryCount"] = notebookObj.retryCount notebookJob = next((notebookJob for notebookJob in notebookJobs if notebookJob.notebookId == notebook["id"]), False) if notebookJob: notebook["isScheduled"] = True @@ -301,6 +303,18 @@ def addNotebookJob(notebookId: str, scheduleId: int): res.update(True, "NotebookJob added successfully", None) return res + @staticmethod + def updateNotebookRetryCount(notebookId: str, retryCount: int): + """ + Service to add a new NotebookJob + :param notebookId: ID of the notebook for which to create job + :param scheduleId: ID of schedule + """ + res = ApiResponse() + notebookObject = NotebookObject.objects.update_or_create(notebookZeppelinId=notebookId, defaults={"retryCount":retryCount}) + res.update(True, "Notebook retry count added successfully", None) + return res + @staticmethod def deleteNotebookJob(notebookId: int): """ @@ -318,11 +332,22 @@ def runNotebookJob(notebookId: str): Service to run notebook job """ res = ApiResponse("Error in running notebook") - runStatus = RunStatus.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Manual") + retryRemaining = NotebookJobServices.__getRetryRemaining(notebookId) + runStatus = RunStatus.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Manual", retryRemaining=retryRemaining) runNotebookJobTask.delay(notebookId=notebookId, runStatusId=runStatus.id, runType="Manual") res.update(True, "Notebook triggered successfully", None) return res + @staticmethod + def __getRetryRemaining(notebookId: int): + """ + Get retry count set for given notebook in notebook object + """ + notebookObjects = NotebookObject.objects.filter(notebookZeppelinId=notebookId) + if notebookObjects.count(): + return notebookObjects[0].retryCount + return 0 + @staticmethod def stopNotebookJob(notebookId: str): """ diff --git a/api/genie/tasks.py b/api/genie/tasks.py index a8caaf68..444f5f57 100644 --- a/api/genie/tasks.py +++ b/api/genie/tasks.py @@ -7,7 +7,7 @@ from celery import shared_task from django.conf import settings -from genie.models import RunStatus, NOTEBOOK_STATUS_SUCCESS, NOTEBOOK_STATUS_ERROR, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_FINISHED, NOTEBOOK_STATUS_ABORT, NOTEBOOK_STATUS_QUEUED +from genie.models import RunStatus, NotebookObject, NOTEBOOK_STATUS_SUCCESS, NOTEBOOK_STATUS_ERROR, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_FINISHED, NOTEBOOK_STATUS_ABORT, NOTEBOOK_STATUS_QUEUED from system.services import NotificationServices from utils.zeppelinAPI import ZeppelinAPI from utils.kubernetesAPI import Kubernetes @@ -49,6 +49,9 @@ def runNotebookJob(notebookId: str, runStatusId: int = None, runType: str = "Sch __evaluateScaleDownZeppelin() except Exception as ex: logger.error(f"Error occured in notebook {notebookId}. Error: {str(ex)}") + if runStatus.retryRemaining: + __retryNotebook(runStatus) + runStatus.status = NOTEBOOK_STATUS_ERROR runStatus.message = str(ex) runStatus.endTimestamp = dt.datetime.now() @@ -56,6 +59,9 @@ def runNotebookJob(notebookId: str, runStatusId: int = None, runType: str = "Sch NotificationServices.notify(notebookName=notebookName, isSuccess=False, message=str(ex)) else: logger.error(f"Error occured in notebook {notebookId}. Error: Failed to trigger notebook job") + if runStatus.retryRemaining: + __retryNotebook(runStatus) + runStatus.status=NOTEBOOK_STATUS_ERROR runStatus.message = "Failed running notebook" runStatus.endTimestamp = dt.datetime.now() @@ -63,12 +69,29 @@ def runNotebookJob(notebookId: str, runStatusId: int = None, runType: str = "Sch except Exception as ex: logger.error(f"Error occured in notebook {notebookId}. Error: {str(ex)}") + + if runStatus.retryRemaining: + __retryNotebook(runStatus) + runStatus.status=NOTEBOOK_STATUS_ERROR runStatus.message = str(ex) runStatus.endTimestamp = dt.datetime.now() runStatus.save() NotificationServices.notify(notebookName=notebookName if notebookName else notebookId, isSuccess=False, message=str(ex)) +def __retryNotebook(runStatus): + """ + Sets up job + """ + newRunStatus = RunStatus.objects.create( + notebookId=runStatus.notebookId, status=NOTEBOOK_STATUS_QUEUED, runType=runStatus.runType, workflowRun_id=runStatus.workflowRun_id, retryRemaining=runStatus.retryRemaining-1 + ) + response = runNotebookJob.delay(notebookId=newRunStatus.notebookId, runStatusId=newRunStatus.id) + newRunStatus.taskId = response.id + newRunStatus.save() + return newRunStatus.id + + def __allocateZeppelinServer(runStatus: RunStatus): """ Creates or allocates a zeppelin server to run the notebook on @@ -116,7 +139,8 @@ def __getOrCreateRunStatus(runStatusId: int, notebookId: str, runType: str, task Gets or creates a notebook run status object """ if not runStatusId: - runStatus = RunStatus.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_RUNNING, runType=runType, taskId=taskId) + retryRemaining=__getRetryRemaining(notebookId) + runStatus = RunStatus.objects.create(notebookId=notebookId, status=NOTEBOOK_STATUS_RUNNING, runType=runType, taskId=taskId, retryRemaining=retryRemaining) else: runStatus = RunStatus.objects.get(id=runStatusId) runStatus.startTimestamp = dt.datetime.now() @@ -125,6 +149,15 @@ def __getOrCreateRunStatus(runStatusId: int, notebookId: str, runType: str, task runStatus.save() return runStatus +def __getRetryRemaining(notebookId: int): + """ + Get retry count set for given notebook in notebook object + """ + notebookObjects = NotebookObject.objects.filter(notebookZeppelinId=notebookId) + if notebookObjects.count(): + return notebookObjects[0].retryCount + return 0 + def __checkIfNotebookRunning(notebookId: str, zeppelin: ZeppelinAPI): """ Checks if notebook is running and returns tuple of isNotebookRunning, notebookName @@ -182,11 +215,15 @@ def __setNotebookStatus(response, runStatus: RunStatus): notebookName = response.get("name", "") for paragraph in paragraphs: if paragraph.get("status") != "FINISHED": + if paragraph.get("status") != "ABORT" and runStatus.retryRemaining: + __retryNotebook(runStatus) runStatus.status=NOTEBOOK_STATUS_ABORT if paragraph.get("status") == "ABORT" else NOTEBOOK_STATUS_ERROR runStatus.endTimestamp = dt.datetime.now() runStatus.save() NotificationServices.notify(notebookName=notebookName, isSuccess=False, message=paragraph.get("title", "") + " " + paragraph.get("id","") + " failed") return + if not response and runStatus.retryRemaining: + __retryNotebook(runStatus) runStatus.status=NOTEBOOK_STATUS_SUCCESS if response else NOTEBOOK_STATUS_ERROR runStatus.endTimestamp = dt.datetime.now() runStatus.save() diff --git a/api/genie/tests/test_views_notebookJobs.py b/api/genie/tests/test_views_notebookJobs.py index ef200274..fdc7f33a 100644 --- a/api/genie/tests/test_views_notebookJobs.py +++ b/api/genie/tests/test_views_notebookJobs.py @@ -19,7 +19,7 @@ def test_getNotebooks(client, populate_seed_data, mocker): response = client.get(path, content_type="application/json") assert response.status_code == 200 assert response.data['data']["count"] == 1 - assert response.data['data']["notebooks"] == [{'path': 'notebook', 'id': 'BX976MDDE', 'name': 'notebook', 'isScheduled': False, 'assignedWorkflow': []}] + assert response.data['data']["notebooks"] == [{'path': 'notebook', 'id': 'BX976MDDE', 'name': 'notebook', 'isScheduled': False, 'assignedWorkflow': [], 'retryCount': 0}] @pytest.mark.django_db @@ -104,3 +104,25 @@ def test_notebookJob(client, populate_seed_data, mocker): assert response.status_code == 200 assert response.data['success'] == True + +@pytest.mark.django_db +def test_notebookJob(client, populate_seed_data, mocker): + path = reverse('notebooksJobView') + data = {"notebookId": "BX976MDDE", "retryCount": 4} + response = client.post(path, data=data, content_type="application/json") + assert response.status_code == 200 + assert response.data['success'] == True + + # Test if it was updated + path = reverse('notebooks', kwargs={"offset": 0}) + mocker.patch("utils.zeppelinAPI.ZeppelinAPI.getAllNotebooks", return_value = [{"path": "notebook", "id": "BX976MDDE"}]) + response = client.get(path, content_type="application/json") + assert response.status_code == 200 + assert response.data['data']["count"] == 1 + assert response.data['data']["notebooks"][0]["retryCount"] == 4 + + +# write test case for update retryCount view + +# write test case + diff --git a/api/genie/views.py b/api/genie/views.py index 4a9055ba..c7e6eb82 100644 --- a/api/genie/views.py +++ b/api/genie/views.py @@ -96,8 +96,13 @@ def get(self, request, notebookId=None): def post(self, request): notebookId = request.data["notebookId"] - scheduleId = request.data["scheduleId"] - res = NotebookJobServices.addNotebookJob(notebookId=notebookId, scheduleId=scheduleId) + scheduleId = request.data.get("scheduleId") + retryCount = request.data.get("retryCount", None) + + if retryCount == None: + res = NotebookJobServices.addNotebookJob(notebookId=notebookId, scheduleId=scheduleId) + else: + res = NotebookJobServices.updateNotebookRetryCount(notebookId=notebookId, retryCount=retryCount) return Response(res.json()) def delete(self, request, notebookId=None): diff --git a/api/workflows/taskUtils.py b/api/workflows/taskUtils.py index 984e6094..a7075ee3 100644 --- a/api/workflows/taskUtils.py +++ b/api/workflows/taskUtils.py @@ -15,7 +15,7 @@ from utils.zeppelinAPI import Zeppelin from genie.tasks import runNotebookJob as runNotebookJobTask -from genie.models import NOTEBOOK_STATUS_QUEUED, RunStatus, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_SUCCESS +from genie.models import NotebookObject, NOTEBOOK_STATUS_QUEUED, RunStatus, NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_SUCCESS # Get an instance of a logger logger = logging.getLogger(__name__) @@ -37,7 +37,7 @@ def runWorkflow(workflowId: int, taskId: str, workflowRunId: int = None): workflowRun = TaskUtils.__getOrCreateWorkflowRun(workflowId, taskId, workflowRunId) notebookRunStatusIds = TaskUtils.__runNotebookJobsFromList(notebookIds, workflowRun.id) workflowStatus = polling.poll( - lambda: TaskUtils.__checkGivenRunStatuses(notebookRunStatusIds), + lambda: TaskUtils.__checkGivenRunStatuses(workflowRun.id), check_success= lambda x: x != "RUNNING", step=3, timeout=3600*6, @@ -59,8 +59,9 @@ def __runNotebookJobsFromList(notebookIds: List[int], workflowRunId: int): """ notebookRunStatusIds = [] for notebookId in notebookIds: + retryRemaining = TaskUtils.__getRetryRemaining(notebookId) runStatus = RunStatus.objects.create( - notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Workflow", workflowRun_id=workflowRunId + notebookId=notebookId, status=NOTEBOOK_STATUS_QUEUED, runType="Workflow", workflowRun_id=workflowRunId, retryRemaining=retryRemaining ) response = runNotebookJobTask.delay(notebookId=notebookId, runStatusId=runStatus.id) runStatus.taskId = response.id @@ -68,6 +69,17 @@ def __runNotebookJobsFromList(notebookIds: List[int], workflowRunId: int): notebookRunStatusIds.append(runStatus.id) time.sleep(0.2) # Sleep for 200ms to make sure zeppelin server has been allocated to previous notebook return notebookRunStatusIds + + + def __getRetryRemaining(notebookId: int): + """ + Get retry count set for given notebook in notebook object + """ + notebookObjects = NotebookObject.objects.filter(notebookZeppelinId=notebookId) + if notebookObjects.count(): + return notebookObjects[0].retryCount + return 0 + @staticmethod def __getNotebookIdsInWorkflow(workflowId: int): @@ -98,14 +110,14 @@ def __getOrCreateWorkflowRun(workflowId: int, taskId: str, workflowRunId: int = return workflowRun @staticmethod - def __checkGivenRunStatuses(notebookRunStatusIds: List[int]): + def __checkGivenRunStatuses(workflowRunId: int): """ Check if given runStatuses are status is SUCCESS """ - runningAndQueuedNotebookCount = RunStatus.objects.filter(id__in=notebookRunStatusIds).exclude(status=NOTEBOOK_STATUS_RUNNING).exclude(status=NOTEBOOK_STATUS_QUEUED).count() - if (len(notebookRunStatusIds) == runningAndQueuedNotebookCount): - successfulNotebookCount = RunStatus.objects.filter(id__in=notebookRunStatusIds, status=NOTEBOOK_STATUS_SUCCESS).count() - logger.info(f"Batch completed. Successfull Notebooks : {str(successfulNotebookCount)}. Notebooks in batch: {str(len(notebookRunStatusIds))}") - logger.info(f"Notebook Run Status Ids: {str(notebookRunStatusIds)}") - return (len(notebookRunStatusIds) == successfulNotebookCount) + runningAndQueuedNotebookCount = RunStatus.objects.filter(workflowRun_id=workflowRunId).filter(status__in=[NOTEBOOK_STATUS_RUNNING, NOTEBOOK_STATUS_QUEUED]).count() + if not runningAndQueuedNotebookCount: + successfulNotebookCount = RunStatus.objects.filter(workflowRun_id=workflowRunId, status=NOTEBOOK_STATUS_SUCCESS).count() + logger.info(f"Batch completed. Successfull Notebooks : {str(successfulNotebookCount)}.") + # logger.info(f"Notebook Run Status Ids: {str(notebookRunStatusIds)}") + return RunStatus.objects.filter(workflowRun_id=workflowRunId).exclude(status=NOTEBOOK_STATUS_SUCCESS).filter(retryRemaining=0).count() == 0 return "RUNNING" diff --git a/ui/src/components/Notebooks/NotebookTable.js b/ui/src/components/Notebooks/NotebookTable.js index 18e0ca17..17c4b4e4 100644 --- a/ui/src/components/Notebooks/NotebookTable.js +++ b/ui/src/components/Notebooks/NotebookTable.js @@ -14,9 +14,10 @@ import { Drawer, Popconfirm, Menu, - Dropdown + Dropdown, + InputNumber } from "antd"; -import { MoreOutlined, PlayCircleOutlined, UnorderedListOutlined, StopOutlined, FileTextOutlined, DeleteOutlined, CopyOutlined, CloseOutlined, EditOutlined } from '@ant-design/icons'; +import { MoreOutlined, PlayCircleOutlined, UnorderedListOutlined, StopOutlined, FileTextOutlined, DeleteOutlined, CopyOutlined, CloseCircleOutlined, CloseOutlined, EditOutlined } from '@ant-design/icons'; import NotebookRunLogs from "./NotebookRunLogs.js" import AddNotebook from "./AddNotebook.js" import EditNotebook from "./EditNotebook.js" @@ -124,7 +125,7 @@ export default function NotebookTable() { if(response.success){ message.success(response.message) } - else{ + else { message.error(response.message) } setSelectedNotebook(null) @@ -135,6 +136,19 @@ export default function NotebookTable() { } } + const editNotebookRetryCount = async (notebookId, newRetryCount) => { + if (newRetryCount == null) return; + // console.log(notebookId, newRetryCount) + const response = await notebookService.updateNotebookRetryCount(notebookId, newRetryCount); + if(response.success){ + message.success(response.message) + } else { + message.error(response.message) + } + setSelectedNotebook(null) + getNotebooks((currentPage -1)*limit) + } + const handleTableChange = (event, filter, sorter) => { setSorter({columnKey: sorter.columnKey, order: sorter.order}) setFilter(filter) @@ -345,6 +359,34 @@ export default function NotebookTable() { ) } }, + { + title: "Retry Count", + dataIndex: "retryCount", + key: "retryCount", + align: "right", + width: "10%", + sorter: ()=>{}, + sortOrder: sorter.columnKey === 'retryCount' && sorter.order, + ellipsis: true, + render: (retryCount, notebook) => { + return ( + <> + { + selectedNotebook === notebook.id ? + + editNotebookRetryCount(notebook.id, val)} /> + setSelectedNotebook(null)} /> + + : + + {retryCount + " "} + false} className={style.linkText} onClick={()=>setSelectedNotebook(notebook.id)}> + + } + + ); + } + }, { title: "Latest Run", dataIndex: "lastRun", diff --git a/ui/src/services/notebooks.js b/ui/src/services/notebooks.js index 21ef869b..fdcdab59 100644 --- a/ui/src/services/notebooks.js +++ b/ui/src/services/notebooks.js @@ -78,6 +78,11 @@ class NotebookService { return response } + async updateNotebookRetryCount(notebookId, retryCount){ + const response = await apiService.post("genie/notebookjob/", {notebookId: notebookId, retryCount: retryCount}) + return response + } + async getTimezones(){ const response = await apiService.get("genie/timezones/") if(response.success === true)