Files
claudetools/api/services/coord_message_service.py
Mike Swanson 63975284f4 feat: agent coordination system (workflows, locks, components, messages)
Adds /api/coord/* endpoints for real-time cross-session coordination:
- coord_workflows: named units of work per project
- coord_work_items: tasks within workflows with dependency chains
- coord_session_locks: exclusive resource locks with auto-expiry (TTL)
- coord_component_states: live component state per project (upsert)
- coord_messages: cross-session messaging and broadcasts
- /api/coord/status: cross-project snapshot endpoint

Replaces PROJECT_STATE.md as the coordination layer for Claude sessions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-12 08:25:33 -07:00

97 lines
2.9 KiB
Python

"""Service layer for CoordMessage."""
from datetime import datetime, timezone
from typing import Optional
from uuid import UUID
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from api.models.coord_message import CoordMessage
from api.schemas.coord_message import CoordMessageCreate
def get_messages(
    db: Session,
    to_session: Optional[str] = None,
    unread_only: bool = False,
    skip: int = 0,
    limit: int = 100,
) -> tuple[list[CoordMessage], int]:
    """Fetch one page of messages together with the total match count.

    Args:
        db: Database session used to run the queries.
        to_session: When truthy, keep only messages addressed to this session.
        unread_only: When true, keep only messages whose ``read_at`` is NULL.
        skip: Number of rows to skip from the start of the ordered results.
        limit: Maximum number of rows to return.

    Returns:
        A ``(messages, total)`` tuple. ``messages`` is ordered newest first
        by ``created_at``; ``total`` counts every row matching the filters,
        regardless of ``skip`` and ``limit``.
    """
    # Gather the optional filters first, then apply them one at a time.
    conditions = []
    if to_session:
        conditions.append(CoordMessage.to_session == to_session)
    if unread_only:
        conditions.append(CoordMessage.read_at.is_(None))

    query = db.query(CoordMessage)
    for condition in conditions:
        query = query.filter(condition)

    # Count before pagination so the total reflects the whole filtered set.
    total = query.count()

    newest_first = query.order_by(CoordMessage.created_at.desc())
    page = newest_first.offset(skip).limit(limit)
    return page.all(), total
def get_unread_count(db: Session, session_id: str) -> int:
    """Count the messages sent to ``session_id`` that have no ``read_at`` value."""
    query = db.query(CoordMessage)
    addressed_here = CoordMessage.to_session == session_id
    still_unread = CoordMessage.read_at.is_(None)
    return query.filter(addressed_here, still_unread).count()
def get_message_by_id(db: Session, message_id: UUID) -> CoordMessage:
    """Look up one message by its ID.

    The UUID is compared against the ``id`` column in its string form.

    Raises:
        HTTPException: 404 when no message with that ID exists.
    """
    lookup = db.query(CoordMessage).filter(CoordMessage.id == str(message_id))
    found = lookup.first()
    if found:
        return found

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Message {message_id} not found"
    )
def send_message(db: Session, data: CoordMessageCreate) -> CoordMessage:
    """Persist a new message and return the stored row.

    Args:
        db: Database session used for the insert.
        data: Validated payload; the fields returned by ``data.model_dump()``
            are passed to ``CoordMessage`` as keyword arguments.

    Returns:
        The newly created ``CoordMessage``, refreshed from the database
        after the commit.

    Raises:
        HTTPException: 500 if building, adding, committing, or refreshing the
            message fails. The transaction is rolled back first and the
            original exception is attached as the cause.
    """
    try:
        msg = CoordMessage(**data.model_dump())
        db.add(msg)
        db.commit()
        db.refresh(msg)
        return msg
    except Exception as e:
        # Undo the partial transaction so the session stays usable.
        db.rollback()
        # NOTE(review): the raw exception text is returned to the client in
        # ``detail``; confirm this cannot expose SQL or other internals.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to send message: {e}"
        ) from e
def mark_read(db: Session, message_id: UUID) -> CoordMessage:
    """Mark a message as read at the current time.

    Args:
        db: Database session used for the lookup and update.
        message_id: ID of the message to mark.

    Returns:
        The updated ``CoordMessage``, refreshed from the database after the
        commit.

    Raises:
        HTTPException: 404 (from ``get_message_by_id``) when the message does
            not exist; 500 if the update fails, with the original exception
            attached as the cause.
    """
    # The lookup runs outside the try block so its 404 propagates unchanged
    # instead of being rewrapped as a 500.
    msg = get_message_by_id(db, message_id)
    try:
        # Current UTC time with tzinfo stripped, i.e. a naive datetime.
        # NOTE(review): presumably the column stores naive UTC - confirm.
        msg.read_at = datetime.now(timezone.utc).replace(tzinfo=None)
        db.commit()
        db.refresh(msg)
        return msg
    except Exception as e:
        # Undo the partial transaction so the session stays usable.
        db.rollback()
        # NOTE(review): the raw exception text is returned to the client in
        # ``detail``; confirm this cannot expose SQL or other internals.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to mark message read: {e}"
        ) from e
def delete_message(db: Session, message_id: UUID) -> dict:
    """Delete a message by ID.

    Args:
        db: Database session used for the lookup and delete.
        message_id: ID of the message to remove.

    Returns:
        A dict with a ``"message"`` confirmation string and the deleted
        ``"message_id"`` as a string.

    Raises:
        HTTPException: 404 (from ``get_message_by_id``) when the message does
            not exist; 500 if the delete fails, with the original exception
            attached as the cause.
    """
    # The lookup runs outside the try block so its 404 propagates unchanged
    # instead of being rewrapped as a 500.
    msg = get_message_by_id(db, message_id)
    try:
        db.delete(msg)
        db.commit()
        return {"message": "Message deleted", "message_id": str(message_id)}
    except Exception as e:
        # Undo the partial transaction so the session stays usable.
        db.rollback()
        # NOTE(review): the raw exception text is returned to the client in
        # ``detail``; confirm this cannot expose SQL or other internals.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete message: {e}"
        ) from e