Skip to content

Commit

Permalink
Added Workspace Task Analytics Cron
Browse files Browse the repository at this point in the history
  • Loading branch information
Shanks0465 committed Dec 27, 2024
1 parent 0aed468 commit 2c096f2
Show file tree
Hide file tree
Showing 4 changed files with 411 additions and 34 deletions.
5 changes: 5 additions & 0 deletions backend/shoonya_backend/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,18 @@
"schedule": crontab(minute=0, hour=0), # every mid night
},
"fetchTaskCounts": {"task": "fetchTaskCounts", "schedule": crontab(minute="*/10")},
"fetchWorkspaceTaskCounts": {
"task": "fetchWorkspaceTaskCounts",
"schedule": crontab(minute="*/10"),
},
}


@worker_ready.connect
def at_start(sender, **k):
with sender.app.connection() as conn:
sender.app.send_task("fetchTaskCounts", connection=conn)
sender.app.send_task("fetchWorkspaceTaskCounts", connection=conn)


# Celery Task related settings
Expand Down
347 changes: 347 additions & 0 deletions backend/user_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,3 +558,350 @@ def fetch_task_counts():
)
final_result_for_all__types[pjt_type] = formatted_result
upsert_stat("task_count", org, final_result_for_all__types)


def fetch_workspace_task_counts():

sql_query = """
WITH
ANNOTATION_TASKS (LANGUAGE, PROJECT_TYPE, WORKSPACE_ID, COUNT) AS (
SELECT
PJT.TGT_LANGUAGE,
PJT.PROJECT_TYPE,
PJT.WORKSPACE_ID_ID,
COUNT(TSK.ID)
FROM
TASKS_TASK AS TSK,
PROJECTS_PROJECT AS PJT
WHERE
TSK.PROJECT_ID_ID = PJT.ID
AND TSK.TASK_STATUS IN ('annotated', 'reviewed', 'super_checked')
GROUP BY
PJT.TGT_LANGUAGE,
PJT.PROJECT_TYPE,
PJT.WORKSPACE_ID_ID
),
REVIEWER_TASKS (LANGUAGE, PROJECT_TYPE, WORKSPACE_ID, COUNT) AS (
SELECT
PJT.TGT_LANGUAGE,
PJT.PROJECT_TYPE,
PJT.WORKSPACE_ID_ID,
COUNT(TSK.ID)
FROM
TASKS_TASK AS TSK,
PROJECTS_PROJECT AS PJT
WHERE
TSK.PROJECT_ID_ID = PJT.ID
AND TSK.TASK_STATUS IN ('reviewed', 'super_checked')
AND PJT.PROJECT_STAGE IN (2, 3)
GROUP BY
PJT.TGT_LANGUAGE,
PJT.PROJECT_TYPE,
PJT.WORKSPACE_ID_ID
),
SUPERCHECKER_TASKS (LANGUAGE, PROJECT_TYPE, WORKSPACE_ID, COUNT) AS (
SELECT
PJT.TGT_LANGUAGE,
PJT.PROJECT_TYPE,
PJT.WORKSPACE_ID_ID,
COUNT(TSK.ID)
FROM
TASKS_TASK AS TSK,
PROJECTS_PROJECT AS PJT
WHERE
TSK.PROJECT_ID_ID = PJT.ID
AND TSK.TASK_STATUS IN ('super_checked')
AND PJT.PROJECT_STAGE IN (3)
GROUP BY
PJT.TGT_LANGUAGE,
PJT.PROJECT_TYPE,
PJT.WORKSPACE_ID_ID
),
ANNOTATION_TASKS_EXPORTED (LANGUAGE, PROJECT_TYPE, WORKSPACE_ID, COUNT) AS (
SELECT
PJT.TGT_LANGUAGE,
PJT.PROJECT_TYPE,
PJT.WORKSPACE_ID_ID,
COUNT(TSK.ID)
FROM
TASKS_TASK AS TSK,
PROJECTS_PROJECT AS PJT
WHERE
TSK.PROJECT_ID_ID = PJT.ID
AND TSK.TASK_STATUS IN ('exported')
AND PJT.PROJECT_STAGE IN (1)
GROUP BY
PJT.TGT_LANGUAGE,
PJT.PROJECT_TYPE,
PJT.WORKSPACE_ID_ID
),
REVIEWER_TASKS_EXPORTED (LANGUAGE, PROJECT_TYPE, WORKSPACE_ID, COUNT) AS (
SELECT
PJT.TGT_LANGUAGE,
PJT.PROJECT_TYPE,
PJT.WORKSPACE_ID_ID,
COUNT(TSK.ID)
FROM
TASKS_TASK AS TSK,
PROJECTS_PROJECT AS PJT
WHERE
TSK.PROJECT_ID_ID = PJT.ID
AND TSK.TASK_STATUS IN ('exported')
AND PJT.PROJECT_STAGE IN (2)
GROUP BY
PJT.TGT_LANGUAGE,
PJT.PROJECT_TYPE,
PJT.WORKSPACE_ID_ID
),
SUPERCHECK_TASKS_EXPORTED (LANGUAGE, PROJECT_TYPE, WORKSPACE_ID, COUNT) AS (
SELECT
PJT.TGT_LANGUAGE,
PJT.PROJECT_TYPE,
PJT.WORKSPACE_ID_ID,
COUNT(TSK.ID)
FROM
TASKS_TASK AS TSK,
PROJECTS_PROJECT AS PJT
WHERE
TSK.PROJECT_ID_ID = PJT.ID
AND TSK.TASK_STATUS IN ('exported')
AND PJT.PROJECT_STAGE IN (3)
GROUP BY
PJT.TGT_LANGUAGE,
PJT.PROJECT_TYPE,
PJT.WORKSPACE_ID_ID
),
REVIEWER_TASKS_COUNT (LANGUAGE, PROJECT_TYPE, WORKSPACE_ID, COUNT, TAG) AS (
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
SUM(COUNT) AS TASK_COUNT,
'rew'
FROM
(
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
COUNT
FROM
REVIEWER_TASKS
UNION ALL
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
COUNT
FROM
REVIEWER_TASKS_EXPORTED
UNION ALL
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
COUNT
FROM
SUPERCHECK_TASKS_EXPORTED
) AS MERGED_TABLES
GROUP BY
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID
),
ANNOTATION_TASKS_COUNT (LANGUAGE, PROJECT_TYPE, WORKSPACE_ID, COUNT, TAG) AS (
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
SUM(COUNT) AS TASK_COUNT,
'ann'
FROM
(
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
COUNT
FROM
ANNOTATION_TASKS
UNION ALL
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
COUNT
FROM
ANNOTATION_TASKS_EXPORTED
UNION ALL
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
COUNT
FROM
REVIEWER_TASKS_EXPORTED
UNION ALL
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
COUNT
FROM
SUPERCHECK_TASKS_EXPORTED
) AS MERGED_TABLES
GROUP BY
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID
),
SUPERCHECK_TASKS_COUNT (LANGUAGE, PROJECT_TYPE, WORKSPACE_ID, COUNT, TAG) AS (
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
SUM(COUNT) AS TASK_COUNT,
'sup'
FROM
(
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
COUNT
FROM
SUPERCHECKER_TASKS
UNION ALL
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
COUNT
FROM
SUPERCHECK_TASKS_EXPORTED
) AS MERGED_TABLES
GROUP BY
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID
),
CUMULATIVE_TASK_COUNTS (LANGUAGE, PROJECT_TYPE, WORKSPACE_ID, COUNT, TAG) AS (
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
COUNT,
TAG
FROM
ANNOTATION_TASKS_COUNT
UNION ALL
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
COUNT,
TAG
FROM
REVIEWER_TASKS_COUNT
UNION ALL
SELECT
LANGUAGE,
PROJECT_TYPE,
WORKSPACE_ID,
COUNT,
TAG
FROM
SUPERCHECK_TASKS_COUNT
),
WORKSPACE_COUNTS (
WORKSPACE_ID,
LANGUAGE,
PROJECT_TYPE,
ANNOTATION_COUNT,
REVIEWER_COUNT,
SUPERCHECKER_COUNT
) AS (
SELECT
CTC.WORKSPACE_ID,
COALESCE(CTC.LANGUAGE, 'Others'),
CTC.PROJECT_TYPE,
SUM(
CASE
WHEN TAG = 'ann' THEN CTC.COUNT
ELSE 0
END
) AS ANNOTATION_COUNT,
SUM(
CASE
WHEN TAG = 'rew' THEN CTC.COUNT
ELSE 0
END
) AS REVIEWER_COUNT,
SUM(
CASE
WHEN TAG = 'sup' THEN CTC.COUNT
ELSE 0
END
) AS SUPERCHECKER_COUNT
FROM
CUMULATIVE_TASK_COUNTS AS CTC
JOIN WORKSPACES_WORKSPACE AS WSP ON WSP.ID = CTC.WORKSPACE_ID
GROUP BY
CTC.LANGUAGE,
CTC.PROJECT_TYPE,
CTC.WORKSPACE_ID,
WSP.WORKSPACE_NAME
),
AGGREGATED_DATA (WORKSPACE_ID, PROJECT_TYPE, PROJECT_DATA) AS (
SELECT
WORKSPACE_ID,
PROJECT_TYPE,
JSON_AGG(
JSON_BUILD_OBJECT(
'language',
LANGUAGE,
'ann_cumulative_tasks_count',
ANNOTATION_COUNT,
'rew_cumulative_tasks_count',
REVIEWER_COUNT,
'sup_cumulative_tasks_count',
SUPERCHECKER_COUNT
)
) AS PROJECT_DATA
FROM
WORKSPACE_COUNTS
GROUP BY
PROJECT_TYPE,
WORKSPACE_ID
),
WORKSPACE_TASK_COUNTS (WORKSPACE_ID, ORGANIZATION_ID, RESULT) AS (
SELECT
ADT.WORKSPACE_ID,
WSP.ORGANIZATION_ID,
JSON_OBJECT_AGG(ADT.PROJECT_TYPE, ADT.PROJECT_DATA) AS RESULT
FROM
AGGREGATED_DATA AS ADT
JOIN WORKSPACES_WORKSPACE AS WSP ON WSP.ID = ADT.WORKSPACE_ID
GROUP BY
ADT.WORKSPACE_ID,
WSP.ORGANIZATION_ID
)
SELECT
ORGANIZATION_ID,
JSONB_OBJECT_AGG(WORKSPACE_ID, RESULT) as workspace_task_counts
FROM
WORKSPACE_TASK_COUNTS
GROUP BY
ORGANIZATION_ID
"""
with connection.cursor() as cursor:

cursor.execute(sql=sql_query)
result = cursor.fetchall()
for org_id, workspace_task_counts in result:
workspace_task_counts = json.loads(workspace_task_counts)
upsert_stat(
stat_type="workspace_task_counts",
org_id=org_id,
result=workspace_task_counts,
)
12 changes: 11 additions & 1 deletion backend/users/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
from django.core.mail import send_mail
from celery.schedules import crontab
from shoonya_backend.celery import celery_app
from user_reports import calculate_reports, fetch_task_counts
from user_reports import (
calculate_reports,
fetch_task_counts,
fetch_workspace_task_counts,
)
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)
Expand All @@ -18,3 +22,9 @@ def send_mail_task():
def fetchTaskCounts():
fetch_task_counts()
logger.info("Completed Task Count Update")


@shared_task(name="fetchWorkspaceTaskCounts")
def fetchWorkspaceTaskCounts():
fetch_workspace_task_counts()
logger.info("Completed Workspace Task Count Update")
Loading

0 comments on commit 2c096f2

Please sign in to comment.