""" Task service layer for business logic and database operations. This module handles all database operations for tasks, providing a clean separation between the API routes and data access layer. """ from typing import Optional from uuid import UUID from fastapi import HTTPException, status from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from api.models.task import Task as TaskModel from api.models.session import Session as SessionModel from api.models.client import Client from api.models.project import Project from api.schemas.task import TaskCreate, TaskUpdate def get_tasks(db: Session, skip: int = 0, limit: int = 100) -> tuple[list[TaskModel], int]: """ Retrieve a paginated list of tasks. Args: db: Database session skip: Number of records to skip (for pagination) limit: Maximum number of records to return Returns: tuple: (list of tasks, total count) Example: ```python tasks, total = get_tasks(db, skip=0, limit=50) print(f"Retrieved {len(tasks)} of {total} tasks") ``` """ # Get total count total = db.query(TaskModel).count() # Get paginated results, ordered by task_order ascending tasks = ( db.query(TaskModel) .order_by(TaskModel.task_order.asc()) .offset(skip) .limit(limit) .all() ) return tasks, total def get_task_by_id(db: Session, task_id: UUID) -> TaskModel: """ Retrieve a single task by its ID. Args: db: Database session task_id: UUID of the task to retrieve Returns: TaskModel: The task object Raises: HTTPException: 404 if task not found Example: ```python task = get_task_by_id(db, task_id) print(f"Found task: {task.title}") ``` """ task = db.query(TaskModel).filter(TaskModel.id == str(task_id)).first() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Task with ID {task_id} not found" ) return task def get_tasks_by_session(db: Session, session_id: UUID, skip: int = 0, limit: int = 100) -> tuple[list[TaskModel], int]: """ Retrieve tasks for a specific session. Args: db: Database session session_id: UUID of the session skip: Number of records to skip (for pagination) limit: Maximum number of records to return Returns: tuple: (list of tasks, total count) Example: ```python tasks, total = get_tasks_by_session(db, session_id) print(f"Found {total} tasks for session") ``` """ # Get total count total = db.query(TaskModel).filter(TaskModel.session_id == str(session_id)).count() # Get paginated results tasks = ( db.query(TaskModel) .filter(TaskModel.session_id == str(session_id)) .order_by(TaskModel.task_order.asc()) .offset(skip) .limit(limit) .all() ) return tasks, total def get_tasks_by_status(db: Session, status_filter: str, skip: int = 0, limit: int = 100) -> tuple[list[TaskModel], int]: """ Retrieve tasks by status. Args: db: Database session status_filter: Status to filter by (pending, in_progress, blocked, completed, cancelled) skip: Number of records to skip (for pagination) limit: Maximum number of records to return Returns: tuple: (list of tasks, total count) Example: ```python tasks, total = get_tasks_by_status(db, "in_progress") print(f"Found {total} in-progress tasks") ``` """ # Get total count total = db.query(TaskModel).filter(TaskModel.status == status_filter).count() # Get paginated results tasks = ( db.query(TaskModel) .filter(TaskModel.status == status_filter) .order_by(TaskModel.task_order.asc()) .offset(skip) .limit(limit) .all() ) return tasks, total def create_task(db: Session, task_data: TaskCreate) -> TaskModel: """ Create a new task. Args: db: Database session task_data: Task creation data Returns: TaskModel: The created task object Raises: HTTPException: 404 if referenced session, client, or project not found HTTPException: 422 if validation fails HTTPException: 500 if database error occurs Example: ```python task_data = TaskCreate( title="Implement authentication", task_order=1, status="pending", session_id="123e4567-e89b-12d3-a456-426614174000" ) task = create_task(db, task_data) print(f"Created task: {task.id}") ``` """ try: # Validate foreign keys if provided if task_data.session_id: session = db.query(SessionModel).filter(SessionModel.id == str(task_data.session_id)).first() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Session with ID {task_data.session_id} not found" ) if task_data.client_id: client = db.query(Client).filter(Client.id == str(task_data.client_id)).first() if not client: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Client with ID {task_data.client_id} not found" ) if task_data.project_id: project = db.query(Project).filter(Project.id == str(task_data.project_id)).first() if not project: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project with ID {task_data.project_id} not found" ) if task_data.parent_task_id: parent_task = db.query(TaskModel).filter(TaskModel.id == str(task_data.parent_task_id)).first() if not parent_task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Parent task with ID {task_data.parent_task_id} not found" ) # Create new task instance db_task = TaskModel(**task_data.model_dump()) # Add to database db.add(db_task) db.commit() db.refresh(db_task) return db_task except HTTPException: db.rollback() raise except IntegrityError as e: db.rollback() # Handle foreign key constraint violations if "session_id" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid session_id: {task_data.session_id}" ) elif "client_id" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid client_id: {task_data.client_id}" ) elif "project_id" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid project_id: {task_data.project_id}" ) elif "parent_task_id" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid parent_task_id: {task_data.parent_task_id}" ) elif "ck_tasks_type" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid task_type. Must be one of: implementation, research, review, deployment, testing, documentation, bugfix, analysis" ) elif "ck_tasks_status" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid status. Must be one of: pending, in_progress, blocked, completed, cancelled" ) elif "ck_tasks_complexity" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid estimated_complexity. Must be one of: trivial, simple, moderate, complex, very_complex" ) else: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Database error: {str(e)}" ) except Exception as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to create task: {str(e)}" ) def update_task(db: Session, task_id: UUID, task_data: TaskUpdate) -> TaskModel: """ Update an existing task. Args: db: Database session task_id: UUID of the task to update task_data: Task update data (only provided fields will be updated) Returns: TaskModel: The updated task object Raises: HTTPException: 404 if task, session, client, or project not found HTTPException: 422 if validation fails HTTPException: 500 if database error occurs Example: ```python update_data = TaskUpdate( status="completed", completed_at=datetime.now() ) task = update_task(db, task_id, update_data) print(f"Updated task: {task.title}") ``` """ # Get existing task task = get_task_by_id(db, task_id) try: # Update only provided fields update_data = task_data.model_dump(exclude_unset=True) # Validate foreign keys if being updated if "session_id" in update_data and update_data["session_id"]: session = db.query(SessionModel).filter(SessionModel.id == str(update_data["session_id"])).first() if not session: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Session with ID {update_data['session_id']} not found" ) if "client_id" in update_data and update_data["client_id"]: client = db.query(Client).filter(Client.id == str(update_data["client_id"])).first() if not client: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Client with ID {update_data['client_id']} not found" ) if "project_id" in update_data and update_data["project_id"]: project = db.query(Project).filter(Project.id == str(update_data["project_id"])).first() if not project: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Project with ID {update_data['project_id']} not found" ) if "parent_task_id" in update_data and update_data["parent_task_id"]: parent_task = db.query(TaskModel).filter(TaskModel.id == str(update_data["parent_task_id"])).first() if not parent_task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Parent task with ID {update_data['parent_task_id']} not found" ) # Apply updates for field, value in update_data.items(): setattr(task, field, value) db.commit() db.refresh(task) return task except HTTPException: db.rollback() raise except IntegrityError as e: db.rollback() if "session_id" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid session_id" ) elif "client_id" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid client_id" ) elif "project_id" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid project_id" ) elif "parent_task_id" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Invalid parent_task_id" ) elif "ck_tasks_type" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid task_type. Must be one of: implementation, research, review, deployment, testing, documentation, bugfix, analysis" ) elif "ck_tasks_status" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid status. Must be one of: pending, in_progress, blocked, completed, cancelled" ) elif "ck_tasks_complexity" in str(e.orig): raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid estimated_complexity. Must be one of: trivial, simple, moderate, complex, very_complex" ) else: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Database error: {str(e)}" ) except Exception as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to update task: {str(e)}" ) def delete_task(db: Session, task_id: UUID) -> dict: """ Delete a task by its ID. Args: db: Database session task_id: UUID of the task to delete Returns: dict: Success message Raises: HTTPException: 404 if task not found HTTPException: 500 if database error occurs Example: ```python result = delete_task(db, task_id) print(result["message"]) # "Task deleted successfully" ``` """ # Get existing task (raises 404 if not found) task = get_task_by_id(db, task_id) try: db.delete(task) db.commit() return { "message": "Task deleted successfully", "task_id": str(task_id) } except Exception as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to delete task: {str(e)}" )