Files
claudetools/api/services/task_service.py
Mike Swanson 390b10b32c Complete Phase 6: MSP Work Tracking with Context Recall System
Implements production-ready MSP platform with cross-machine persistent memory for Claude.

API Implementation:
- 130 REST API endpoints across 21 entities
- JWT authentication on all endpoints
- AES-256-GCM encryption for credentials
- Automatic audit logging
- Complete OpenAPI documentation

Database:
- 43 tables in MariaDB (172.16.3.20:3306)
- 42 SQLAlchemy models with modern 2.0 syntax
- Full Alembic migration system
- 99.1% CRUD test pass rate

Context Recall System (Phase 6):
- Cross-machine persistent memory via database
- Automatic context injection via Claude Code hooks
- Automatic context saving after task completion
- 90-95% token reduction with compression utilities
- Relevance scoring with time decay
- Tag-based semantic search
- One-command setup script

Security Features:
- JWT tokens with Argon2 password hashing
- AES-256-GCM encryption for all sensitive data
- Comprehensive audit trail for credentials
- HMAC tamper detection
- Secure configuration management

Test Results:
- Phase 3: 38/38 CRUD tests passing (100%)
- Phase 4: 34/35 core API tests passing (97.1%)
- Phase 5: 62/62 extended API tests passing (100%)
- Phase 6: 10/10 compression tests passing (100%)
- Overall: 144/145 tests passing (99.3%)

Documentation:
- Comprehensive architecture guides
- Setup automation scripts
- API documentation at /api/docs
- Complete test reports
- Troubleshooting guides

Project Status: 95% Complete (Production-Ready)
Phase 7 (optional work context APIs) remains for future enhancement.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-17 06:00:26 -07:00

450 lines
14 KiB
Python

"""
Task service layer for business logic and database operations.
This module handles all database operations for tasks, providing a clean
separation between the API routes and data access layer.
"""
from typing import Optional
from uuid import UUID
from fastapi import HTTPException, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from api.models.task import Task as TaskModel
from api.models.session import Session as SessionModel
from api.models.client import Client
from api.models.project import Project
from api.schemas.task import TaskCreate, TaskUpdate
def get_tasks(db: Session, skip: int = 0, limit: int = 100) -> tuple[list[TaskModel], int]:
    """
    Fetch one page of tasks together with the overall task count.

    Args:
        db: Database session
        skip: Number of leading records to skip (pagination offset)
        limit: Upper bound on the number of records returned

    Returns:
        tuple: (tasks on the requested page, total number of tasks)

    Example:
        ```python
        tasks, total = get_tasks(db, skip=0, limit=50)
        print(f"Retrieved {len(tasks)} of {total} tasks")
        ```
    """
    all_tasks = db.query(TaskModel)

    # Count first, on the unpaged query, so the total covers every task
    total_count = all_tasks.count()

    # Page through the rows in ascending task_order
    ordered = all_tasks.order_by(TaskModel.task_order.asc())
    page = ordered.offset(skip).limit(limit).all()

    return page, total_count
def get_task_by_id(db: Session, task_id: UUID) -> TaskModel:
    """
    Look up a single task by its ID.

    Args:
        db: Database session
        task_id: UUID of the task to fetch

    Returns:
        TaskModel: The matching task

    Raises:
        HTTPException: 404 if no task has this ID

    Example:
        ```python
        task = get_task_by_id(db, task_id)
        print(f"Found task: {task.title}")
        ```
    """
    # The ID is compared in its string form
    id_text = str(task_id)
    found = db.query(TaskModel).filter(TaskModel.id == id_text).first()

    if found:
        return found

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Task with ID {task_id} not found",
    )
def get_tasks_by_session(db: Session, session_id: UUID, skip: int = 0, limit: int = 100) -> tuple[list[TaskModel], int]:
    """
    Fetch one page of the tasks that belong to a given session.

    Args:
        db: Database session
        session_id: UUID of the session whose tasks are wanted
        skip: Number of leading records to skip (pagination offset)
        limit: Upper bound on the number of records returned

    Returns:
        tuple: (tasks on the requested page, total number of tasks for the session)

    Example:
        ```python
        tasks, total = get_tasks_by_session(db, session_id)
        print(f"Found {total} tasks for session")
        ```
    """
    # The session ID is compared in its string form
    session_key = str(session_id)
    session_tasks = db.query(TaskModel).filter(TaskModel.session_id == session_key)

    # Count first, on the unpaged query, so the total covers the whole session
    total_count = session_tasks.count()

    # Page through the rows in ascending task_order
    ordered = session_tasks.order_by(TaskModel.task_order.asc())
    page = ordered.offset(skip).limit(limit).all()

    return page, total_count
def get_tasks_by_status(db: Session, status_filter: str, skip: int = 0, limit: int = 100) -> tuple[list[TaskModel], int]:
    """
    Fetch one page of the tasks that have a given status.

    Args:
        db: Database session
        status_filter: Status to match (pending, in_progress, blocked, completed, cancelled)
        skip: Number of leading records to skip (pagination offset)
        limit: Upper bound on the number of records returned

    Returns:
        tuple: (tasks on the requested page, total number of tasks with that status)

    Example:
        ```python
        tasks, total = get_tasks_by_status(db, "in_progress")
        print(f"Found {total} in-progress tasks")
        ```
    """
    matching = db.query(TaskModel).filter(TaskModel.status == status_filter)

    # Count first, on the unpaged query, so the total covers every match
    total_count = matching.count()

    # Page through the rows in ascending task_order
    ordered = matching.order_by(TaskModel.task_order.asc())
    page = ordered.offset(skip).limit(limit).all()

    return page, total_count
# (column name, model holding the referenced row, label used in 404 messages).
# The order matters: it is both the order in which references are validated
# and the order in which an IntegrityError message is scanned for a match.
_TASK_REFERENCES = (
    ("session_id", SessionModel, "Session"),
    ("client_id", Client, "Client"),
    ("project_id", Project, "Project"),
    ("parent_task_id", TaskModel, "Parent task"),
)

# (check-constraint name, 422 detail returned when that constraint is violated).
_TASK_CHECK_CONSTRAINT_DETAILS = (
    (
        "ck_tasks_type",
        "Invalid task_type. Must be one of: implementation, research, review, "
        "deployment, testing, documentation, bugfix, analysis",
    ),
    (
        "ck_tasks_status",
        "Invalid status. Must be one of: pending, in_progress, blocked, completed, cancelled",
    ),
    (
        "ck_tasks_complexity",
        "Invalid estimated_complexity. Must be one of: trivial, simple, moderate, "
        "complex, very_complex",
    ),
)


def _ensure_task_references_exist(db: Session, get_value) -> None:
    """
    Verify that every foreign key supplied for a task points at an existing row.

    Args:
        db: Database session
        get_value: Callable taking a column name from ``_TASK_REFERENCES`` and
            returning the supplied ID, or a falsy value when none was supplied

    Raises:
        HTTPException: 404 for the first supplied ID that has no matching row
    """
    for field, model, label in _TASK_REFERENCES:
        record_id = get_value(field)
        if not record_id:
            # Nothing supplied for this reference, so there is nothing to check
            continue
        exists = db.query(model).filter(model.id == str(record_id)).first()
        if not exists:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"{label} with ID {record_id} not found",
            )


def _task_integrity_error_to_http(exc: IntegrityError, describe_fk) -> HTTPException:
    """
    Translate an IntegrityError raised while saving a task into an HTTPException.

    The driver-level message (``exc.orig``) is scanned for a known column or
    check-constraint name; the first match decides the response.

    Args:
        exc: The IntegrityError raised by the commit
        describe_fk: Callable taking a column name and returning the 422 detail
            to use when that column appears in the error message

    Returns:
        HTTPException: 422 for a recognised column/constraint, otherwise 500
    """
    db_message = str(exc.orig)

    for field, _model, _label in _TASK_REFERENCES:
        if field in db_message:
            return HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=describe_fk(field),
            )

    for constraint, detail in _TASK_CHECK_CONSTRAINT_DETAILS:
        if constraint in db_message:
            return HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=detail,
            )

    # NOTE(review): str(exc) on a SQLAlchemy DBAPI error usually embeds the SQL
    # statement and its bound parameters, so this detail may expose schema/data
    # to API clients. The text is kept for backward compatibility -- consider
    # logging it server-side and returning a generic message instead.
    return HTTPException(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        detail=f"Database error: {str(exc)}",
    )


def create_task(db: Session, task_data: TaskCreate) -> TaskModel:
    """
    Create a new task.

    Any session, client, project, or parent task referenced by ``task_data``
    must already exist. The session is rolled back on every failure path.

    Args:
        db: Database session
        task_data: Task creation data

    Returns:
        TaskModel: The created task object, refreshed after the commit

    Raises:
        HTTPException: 404 if referenced session, client, project, or parent task not found
        HTTPException: 422 if a foreign key or check constraint is violated
        HTTPException: 500 if any other database or unexpected error occurs

    Example:
        ```python
        task_data = TaskCreate(
            title="Implement authentication",
            task_order=1,
            status="pending",
            session_id="123e4567-e89b-12d3-a456-426614174000"
        )
        task = create_task(db, task_data)
        print(f"Created task: {task.id}")
        ```
    """
    try:
        # Validate foreign keys if provided
        _ensure_task_references_exist(db, lambda field: getattr(task_data, field))

        # Create and persist the new task
        db_task = TaskModel(**task_data.model_dump())
        db.add(db_task)
        db.commit()
        db.refresh(db_task)
        return db_task
    except HTTPException:
        # Our own 404s: undo any pending work and pass them through untouched
        db.rollback()
        raise
    except IntegrityError as e:
        db.rollback()
        raise _task_integrity_error_to_http(
            e,
            lambda field: f"Invalid {field}: {getattr(task_data, field)}",
        ) from e
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create task: {str(e)}",
        ) from e


def update_task(db: Session, task_id: UUID, task_data: TaskUpdate) -> TaskModel:
    """
    Update an existing task.

    Only fields explicitly set on ``task_data`` are applied. Any session,
    client, project, or parent task being assigned must already exist, and a
    task may not be made its own parent. The session is rolled back on every
    failure path.

    Args:
        db: Database session
        task_id: UUID of the task to update
        task_data: Task update data (only provided fields will be updated)

    Returns:
        TaskModel: The updated task object, refreshed after the commit

    Raises:
        HTTPException: 404 if task, session, client, project, or parent task not found
        HTTPException: 422 if the task is set as its own parent, or a foreign
            key or check constraint is violated
        HTTPException: 500 if any other database or unexpected error occurs

    Example:
        ```python
        update_data = TaskUpdate(
            status="completed",
            completed_at=datetime.now()
        )
        task = update_task(db, task_id, update_data)
        print(f"Updated task: {task.title}")
        ```
    """
    # Get existing task (raises 404 if not found)
    task = get_task_by_id(db, task_id)

    try:
        # Update only provided fields
        update_data = task_data.model_dump(exclude_unset=True)

        # Validate foreign keys if being updated
        _ensure_task_references_exist(db, update_data.get)

        # The existence check above trivially passes when the new parent is the
        # task itself, so a self-reference has to be rejected explicitly.
        new_parent_id = update_data.get("parent_task_id")
        if new_parent_id and str(new_parent_id).lower() == str(task_id).lower():
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="A task cannot be its own parent",
            )

        # Apply updates
        for field, value in update_data.items():
            setattr(task, field, value)

        db.commit()
        db.refresh(task)
        return task
    except HTTPException:
        # Our own 404/422s: undo any pending work and pass them through untouched
        db.rollback()
        raise
    except IntegrityError as e:
        db.rollback()
        raise _task_integrity_error_to_http(
            e,
            lambda field: f"Invalid {field}",
        ) from e
    except Exception as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update task: {str(e)}",
        ) from e
def delete_task(db: Session, task_id: UUID) -> dict:
    """
    Remove the task with the given ID.

    Args:
        db: Database session
        task_id: UUID of the task to remove

    Returns:
        dict: Confirmation message plus the deleted task's ID as a string

    Raises:
        HTTPException: 404 if no task has this ID
        HTTPException: 500 if the delete or commit fails

    Example:
        ```python
        result = delete_task(db, task_id)
        print(result["message"])  # "Task deleted successfully"
        ```
    """
    # The lookup raises 404 itself when the task does not exist
    doomed = get_task_by_id(db, task_id)

    try:
        db.delete(doomed)
        db.commit()
    except Exception as exc:
        # Leave the session clean before reporting the failure
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete task: {str(exc)}",
        )

    return {
        "message": "Task deleted successfully",
        "task_id": str(task_id),
    }