Files
claudetools/api/services/coord_todo_service.py
Mike Swanson 4be89035cc feat(coord): add todos system with per-user/machine/project scoping
New coord_todos table and API endpoints (GET/POST/PUT/DELETE /api/coord/todos)
supporting manual and auto-created items, sub-tasks via parent_id, and inclusive
for_user/for_machine filters (OR-null) for sync/save display. sync.sh Phase 7
now shows pending todos grouped by project after every sync. CLAUDE.md documents
auto-creation behavior for unresolved follow-up. Web/email pricing doc updated:
block time rate clarified, INKY reference removed, dates updated.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 07:53:22 -07:00

171 lines
5.5 KiB
Python

"""Service layer for CoordTodo."""
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from api.models.coord_todo import CoordTodo
from api.schemas.coord_todo import CoordTodoCreate, CoordTodoUpdate
# Status values accepted by update_todo; any other value is rejected with HTTP 422.
_VALID_STATUSES = {"pending", "done", "cancelled"}
def get_todos(
    db: Session,
    project_key: Optional[str] = None,
    assigned_to_user: Optional[str] = None,
    assigned_to_machine: Optional[str] = None,
    for_user: Optional[str] = None,
    for_machine: Optional[str] = None,
    status_filter: str = "pending",
    include_subtasks: bool = True,
    skip: int = 0,
    limit: int = 100,
) -> list[CoordTodo]:
    """Return a flat, paginated list of to-do items with optional filters.

    All supplied filters are combined with AND.

    Args:
        db: Active database session.
        project_key: Exact match on the item's project key.
        assigned_to_user: Exact match on the assigned user (admin queries).
        assigned_to_machine: Exact match on the assigned machine (admin queries).
        for_user: Inclusive match - items assigned to this user OR unassigned.
            Used by sync/save to surface relevant pending work.
        for_machine: Inclusive match - items assigned to this machine OR
            unassigned.
        status_filter: Status to match exactly; the literal "all" disables
            status filtering.
        include_subtasks: When False, only top-level items
            (parent_id IS NULL) are returned.
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).

    Returns:
        Matching items ordered by creation time, oldest first.
    """
    from sqlalchemy import or_

    q = db.query(CoordTodo)

    if not include_subtasks:
        q = q.filter(CoordTodo.parent_id.is_(None))
    if project_key is not None:
        q = q.filter(CoordTodo.project_key == project_key)

    # Exact-match assignment filters.
    if assigned_to_user is not None:
        q = q.filter(CoordTodo.assigned_to_user == assigned_to_user)
    if assigned_to_machine is not None:
        q = q.filter(CoordTodo.assigned_to_machine == assigned_to_machine)

    # Inclusive filters: unassigned (NULL) items are visible to everyone.
    if for_user is not None:
        q = q.filter(
            or_(
                CoordTodo.assigned_to_user == for_user,
                CoordTodo.assigned_to_user.is_(None),
            )
        )
    if for_machine is not None:
        q = q.filter(
            or_(
                CoordTodo.assigned_to_machine == for_machine,
                CoordTodo.assigned_to_machine.is_(None),
            )
        )

    if status_filter != "all":
        q = q.filter(CoordTodo.status == status_filter)

    # created_at is not unique; without a tie-breaker, rows sharing a timestamp
    # can be returned in a different order on each query, which makes
    # OFFSET/LIMIT pages overlap or skip items. The primary key makes the
    # ordering total and therefore stable across pages.
    ordered = q.order_by(CoordTodo.created_at.asc(), CoordTodo.id.asc())
    return ordered.offset(skip).limit(limit).all()
def get_todo_by_id(db: Session, todo_id: UUID) -> CoordTodo:
    """Fetch one to-do item by id and attach its direct sub-tasks.

    The item's ``subtasks`` attribute is set to the list of rows whose
    ``parent_id`` equals this item's id, ordered oldest first.

    Raises:
        HTTPException: 404 when no item with the given id exists.
    """
    # Ids are compared in their string form.
    key = str(todo_id)

    record = db.query(CoordTodo).filter(CoordTodo.id == key).first()
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Todo {todo_id} not found"
        )

    children = db.query(CoordTodo).filter(CoordTodo.parent_id == key)
    record.subtasks = children.order_by(CoordTodo.created_at.asc()).all()
    return record
def create_todo(db: Session, todo_in: CoordTodoCreate) -> CoordTodo:
    """Persist a new to-do item and return it with an empty ``subtasks`` list.

    If the payload carries a ``status`` value it is validated against the same
    set of statuses that updates accept, and an item created directly as
    'done' gets ``completed_at`` stamped with the current UTC time (naive),
    matching what an update to 'done' does.

    Raises:
        HTTPException: 404 when ``parent_id`` is given but no such item exists.
        HTTPException: 422 when a supplied ``status`` is not a valid status.
        HTTPException: 500 when the insert fails; the session is rolled back.
    """
    if todo_in.parent_id is not None:
        parent = db.query(CoordTodo).filter(CoordTodo.id == str(todo_in.parent_id)).first()
        if not parent:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Parent todo {todo_in.parent_id} not found"
            )

    data = todo_in.model_dump()

    # Ids are stored and compared as strings, so normalise the UUID here.
    if data.get("parent_id") is not None:
        data["parent_id"] = str(data["parent_id"])

    # Keep creation consistent with updates: an unknown status would otherwise
    # be stored and never match any status filter. Guarded with .get() so this
    # is a no-op if the create schema does not expose a status field.
    new_status = data.get("status")
    if new_status is not None and new_status not in _VALID_STATUSES:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Invalid status '{new_status}'; must be one of {sorted(_VALID_STATUSES)}"
        )
    if new_status == "done" and data.get("completed_at") is None:
        # Same naive-UTC convention as the update path.
        data["completed_at"] = datetime.now(timezone.utc).replace(tzinfo=None)

    try:
        todo = CoordTodo(**data)
        db.add(todo)
        db.commit()
        db.refresh(todo)
        # A freshly created item cannot have children yet.
        todo.subtasks = []
        return todo
    except Exception as e:
        db.rollback()
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create todo: {e}"
        ) from e
def update_todo(db: Session, todo_id: UUID, todo_in: CoordTodoUpdate) -> CoordTodo:
    """Apply a partial update to a to-do item and return it with sub-tasks.

    Only fields explicitly set on ``todo_in`` are written.

    When status changes to 'done', completed_at is set to the current UTC time
    (stored naive) unless it already has a value.
    When status changes away from 'done', completed_at is cleared.

    A non-null ``parent_id`` in the patch is normalised to a string and checked
    the same way creation checks it, so an item cannot be re-parented onto a
    missing item or onto itself.

    Raises:
        HTTPException: 404 when the item, or a newly supplied parent, does not
            exist.
        HTTPException: 422 when ``status`` is invalid or the item would become
            its own parent.
        HTTPException: 500 when the commit fails; the session is rolled back.
    """
    todo = get_todo_by_id(db, todo_id)
    patch = todo_in.model_dump(exclude_unset=True)

    if "status" in patch and patch["status"] not in _VALID_STATUSES:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Invalid status '{patch['status']}'; must be one of {sorted(_VALID_STATUSES)}"
        )

    # Mirror create_todo's parent handling. Guarded with .get() so this is a
    # no-op if the update schema does not expose parent_id.
    if patch.get("parent_id") is not None:
        parent_key = str(patch["parent_id"])
        if parent_key == str(todo_id):
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Todo {todo_id} cannot be its own parent"
            )
        parent = db.query(CoordTodo).filter(CoordTodo.id == parent_key).first()
        if not parent:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Parent todo {patch['parent_id']} not found"
            )
        # Ids are stored and compared as strings.
        patch["parent_id"] = parent_key

    for field, value in patch.items():
        setattr(todo, field, value)

    # Runs after the field loop so an explicit completed_at in the patch is
    # respected when moving to 'done', and always cleared otherwise.
    if "status" in patch:
        if patch["status"] == "done" and todo.completed_at is None:
            todo.completed_at = datetime.now(timezone.utc).replace(tzinfo=None)
        elif patch["status"] != "done":
            todo.completed_at = None

    try:
        db.commit()
        db.refresh(todo)
        # Re-attach the direct children for the response.
        todo.subtasks = (
            db.query(CoordTodo)
            .filter(CoordTodo.parent_id == str(todo_id))
            .order_by(CoordTodo.created_at.asc())
            .all()
        )
        return todo
    except Exception as e:
        db.rollback()
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update todo: {e}"
        ) from e
def delete_todo(db: Session, todo_id: UUID) -> dict:
    """Delete a to-do item together with every item nested beneath it.

    Creation accepts any existing item as a parent, so sub-tasks may themselves
    have sub-tasks. The whole subtree is therefore collected level by level and
    removed deepest-first, so that no row is deleted while another row still
    references it through parent_id. This satisfies FK constraints on databases
    that don't cascade the delete themselves.

    Returns:
        A dict with a confirmation ``message`` and the deleted ``todo_id``.

    Raises:
        HTTPException: 404 when the item does not exist.
        HTTPException: 500 when the delete fails; the session is rolled back.
    """
    todo = get_todo_by_id(db, todo_id)
    root_key = str(todo_id)
    try:
        # Breadth-first walk: levels[0] holds direct children, levels[1]
        # grandchildren, and so on. `seen` guards against parent_id cycles.
        levels = []
        seen = {root_key}
        frontier = [root_key]
        while frontier:
            rows = (
                db.query(CoordTodo.id)
                .filter(CoordTodo.parent_id.in_(frontier))
                .all()
            )
            frontier = []
            for row in rows:
                child_key = str(row[0])
                if child_key not in seen:
                    seen.add(child_key)
                    frontier.append(child_key)
            if frontier:
                levels.append(frontier)

        # Deepest level first so each delete leaves no dangling references.
        for level in reversed(levels):
            db.query(CoordTodo).filter(CoordTodo.id.in_(level)).delete(synchronize_session=False)

        db.delete(todo)
        db.commit()
        return {"message": "Todo deleted", "todo_id": str(todo_id)}
    except Exception as e:
        db.rollback()
        # Chain the cause so the original traceback is preserved in logs.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete todo: {e}"
        ) from e