Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: user and task_status filter + sort by date #2

Open
wants to merge 1 commit into
base: development
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions src/backend/app/tasks/tasks_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
#

import json
from typing import List
from typing import List, Optional

from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile, Query
from sqlalchemy.orm import Session
from sqlalchemy.sql import text

Expand All @@ -29,6 +29,7 @@
from ..projects import project_crud, project_schemas
from ..users import user_schemas
from . import tasks_crud, tasks_schemas
from ..db import db_models

router = APIRouter(
prefix="/tasks",
Expand All @@ -37,15 +38,46 @@
responses={404: {"description": "Not found"}},
)

from datetime import datetime

@router.get("/task-list", response_model=List[tasks_schemas.TaskOut])
async def read_task_list(
project_id: int,
limit: int = 1000,
db: Session = Depends(database.get_db),
user: Optional[str] = None,
status: Optional[str] = Query(None, description="Select Task Status", enum =[status.name for status in TaskStatus])
):
tasks = tasks_crud.get_tasks(db, project_id, limit)
if tasks:
user_ids = None
if user:
user_ids = [
user.id
for user in db.query(db_models.DbUser.id).filter(db_models.DbUser.username.ilike(f"%{user}%")).all()
]
if user_ids:
tasks = [
task
for task in tasks
if any(
user_id_for_task in user_ids
for user_id_for_task in [
task.lock_holder,
task.locked_by,
task.mapped_by,
task.mapper,
task.validated_by,
]
)
]
else:
raise HTTPException(status_code = 404, detail="User not found")
if status:
tasks = [task for task in tasks if task.task_status == TaskStatus[status.upper()]]

# Sorting tasks based on date
tasks = sorted(tasks, key=lambda x: max((entry.action_date for entry in x.task_history), default=datetime.min), reverse=True)
return tasks
else:
raise HTTPException(status_code=404, detail="Tasks not found")
Expand Down