From 63975284f447122f678a13c441a23df4527e13f0 Mon Sep 17 00:00:00 2001 From: Mike Swanson Date: Tue, 12 May 2026 08:25:33 -0700 Subject: [PATCH] feat: agent coordination system (workflows, locks, components, messages) Adds /api/coord/* endpoints for real-time cross-session coordination: - coord_workflows: named units of work per project - coord_work_items: tasks within workflows with dependency chains - coord_session_locks: exclusive resource locks with auto-expiry (TTL) - coord_component_states: live component state per project (upsert) - coord_messages: cross-session messaging and broadcasts - /api/coord/status: cross-project snapshot endpoint Replaces PROJECT_STATE.md as the coordination layer for Claude sessions. Co-Authored-By: Claude Sonnet 4.6 --- api/main.py | 18 +++ api/models/__init__.py | 10 ++ api/models/coord_component_state.py | 57 ++++++++ api/models/coord_message.py | 56 ++++++++ api/models/coord_session_lock.py | 64 +++++++++ api/models/coord_work_item.py | 87 ++++++++++++ api/models/coord_workflow.py | 61 +++++++++ api/routers/coord_components.py | 38 ++++++ api/routers/coord_locks.py | 81 +++++++++++ api/routers/coord_messages.py | 77 +++++++++++ api/routers/coord_status.py | 46 +++++++ api/routers/coord_work_items.py | 86 ++++++++++++ api/routers/coord_workflows.py | 83 ++++++++++++ api/schemas/coord_component_state.py | 30 ++++ api/schemas/coord_message.py | 39 ++++++ api/schemas/coord_session_lock.py | 41 ++++++ api/schemas/coord_work_item.py | 53 ++++++++ api/schemas/coord_workflow.py | 42 ++++++ api/services/coord_component_service.py | 63 +++++++++ api/services/coord_lock_service.py | 128 ++++++++++++++++++ api/services/coord_message_service.py | 96 +++++++++++++ api/services/coord_work_item_service.py | 100 ++++++++++++++ api/services/coord_workflow_service.py | 87 ++++++++++++ ...0260512_120000_coord_agent_coordination.py | 122 +++++++++++++++++ 24 files changed, 1565 insertions(+) create mode 100644 api/models/coord_component_state.py create mode 100644 api/models/coord_message.py create mode 100644 api/models/coord_session_lock.py create mode 100644 api/models/coord_work_item.py create mode 100644 api/models/coord_workflow.py create mode 100644 api/routers/coord_components.py create mode 100644 api/routers/coord_locks.py create mode 100644 api/routers/coord_messages.py create mode 100644 api/routers/coord_status.py create mode 100644 api/routers/coord_work_items.py create mode 100644 api/routers/coord_workflows.py create mode 100644 api/schemas/coord_component_state.py create mode 100644 api/schemas/coord_message.py create mode 100644 api/schemas/coord_session_lock.py create mode 100644 api/schemas/coord_work_item.py create mode 100644 api/schemas/coord_workflow.py create mode 100644 api/services/coord_component_service.py create mode 100644 api/services/coord_lock_service.py create mode 100644 api/services/coord_message_service.py create mode 100644 api/services/coord_work_item_service.py create mode 100644 api/services/coord_workflow_service.py create mode 100644 migrations/versions/20260512_120000_coord_agent_coordination.py diff --git a/api/main.py b/api/main.py index cd194e2..1794dca 100644 --- a/api/main.py +++ b/api/main.py @@ -39,6 +39,16 @@ from api.routers import ( gravityzone, ) +# Import coordination routers +from api.routers import ( + coord_workflows, + coord_work_items, + coord_locks, + coord_components, + coord_messages, + coord_status, +) + # Import middleware from api.middleware.error_handler import register_exception_handlers @@ -136,6 +146,14 @@ app.include_router(admin_quotes.router, prefix="/api/admin/quotes", tags=["Admin app.include_router(ticktick.router, prefix="/api/ticktick", tags=["TickTick"]) app.include_router(gravityzone.router, prefix="/api/gravityzone", tags=["GravityZone"]) +# Agent coordination +app.include_router(coord_workflows.router, prefix="/api/coord/workflows", tags=["Coordination"]) +app.include_router(coord_work_items.router, prefix="/api/coord/work-items", tags=["Coordination"]) +app.include_router(coord_locks.router, prefix="/api/coord/locks", tags=["Coordination"]) +app.include_router(coord_components.router, prefix="/api/coord/components", tags=["Coordination"]) +app.include_router(coord_messages.router, prefix="/api/coord/messages", tags=["Coordination"]) +app.include_router(coord_status.router, prefix="/api/coord/status", tags=["Coordination"]) + if __name__ == "__main__": import uvicorn diff --git a/api/models/__init__.py b/api/models/__init__.py index 7c508dd..ce390c7 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -5,6 +5,11 @@ This package contains all database models and their base classes. """ from api.models.api_audit_log import ApiAuditLog +from api.models.coord_workflow import CoordWorkflow +from api.models.coord_work_item import CoordWorkItem +from api.models.coord_session_lock import CoordSessionLock +from api.models.coord_component_state import CoordComponentState +from api.models.coord_message import CoordMessage from api.models.backup_log import BackupLog from api.models.base import Base, TimestampMixin, UUIDMixin from api.models.billable_time import BillableTime @@ -47,6 +52,11 @@ from api.models.work_item_tag import WorkItemTag __all__ = [ "ApiAuditLog", + "CoordWorkflow", + "CoordWorkItem", + "CoordSessionLock", + "CoordComponentState", + "CoordMessage", "BackupLog", "Base", "BillableTime", diff --git a/api/models/coord_component_state.py b/api/models/coord_component_state.py new file mode 100644 index 0000000..fa78d78 --- /dev/null +++ b/api/models/coord_component_state.py @@ -0,0 +1,57 @@ +"""Coordination component state model.""" + +from typing import Optional + +from sqlalchemy import PrimaryKeyConstraint, String, Text +from sqlalchemy.orm import Mapped, mapped_column + +from .base import Base, TimestampMixin + + +class CoordComponentState(Base, TimestampMixin): + """Current state of a named component within a project.""" + + __tablename__ = "coord_component_states" + + project_key: Mapped[str] = mapped_column( + String(200), + nullable=False, + primary_key=True, + doc="Project namespace" + ) + + component: Mapped[str] = mapped_column( + String(200), + nullable=False, + primary_key=True, + doc="Component name, e.g. 'server', 'agent', 'dashboard', 'database'" + ) + + state: Mapped[str] = mapped_column( + String(50), + nullable=False, + doc="State: deployed, building, stable, broken, unknown" + ) + + version: Mapped[Optional[str]] = mapped_column( + String(100), + doc="Version string or git SHA" + ) + + notes: Mapped[Optional[str]] = mapped_column( + Text, + doc="Freeform notes about current state" + ) + + updated_by: Mapped[str] = mapped_column( + String(200), + nullable=False, + doc="Session that last updated this record" + ) + + __table_args__ = ( + PrimaryKeyConstraint("project_key", "component", name="pk_coord_component_states"), + ) + + def __repr__(self) -> str: + return f"" diff --git a/api/models/coord_message.py b/api/models/coord_message.py new file mode 100644 index 0000000..63d8ea3 --- /dev/null +++ b/api/models/coord_message.py @@ -0,0 +1,56 @@ +"""Coordination inter-session message model.""" + +from datetime import datetime +from typing import Optional + +from sqlalchemy import DateTime, Index, String, Text +from sqlalchemy.orm import Mapped, mapped_column + +from .base import Base, TimestampMixin, UUIDMixin + + +class CoordMessage(Base, UUIDMixin, TimestampMixin): + """A message sent from one session to another (or broadcast).""" + + __tablename__ = "coord_messages" + + from_session: Mapped[str] = mapped_column( + String(200), + nullable=False, + doc="Sending session, e.g. 'DESKTOP-0O8A1RL/Claude'" + ) + + to_session: Mapped[Optional[str]] = mapped_column( + String(200), + doc="Recipient session; NULL means broadcast to all" + ) + + project_key: Mapped[Optional[str]] = mapped_column( + String(200), + doc="Optional project context for the message" + ) + + subject: Mapped[str] = mapped_column( + String(500), + nullable=False, + doc="Message subject line" + ) + + body: Mapped[str] = mapped_column( + Text, + nullable=False, + doc="Message body, markdown ok" + ) + + read_at: Mapped[Optional[datetime]] = mapped_column( + DateTime, + doc="NULL means unread" + ) + + __table_args__ = ( + Index("idx_coord_messages_to_read", "to_session", "read_at"), + Index("idx_coord_messages_from", "from_session"), + ) + + def __repr__(self) -> str: + return f"" diff --git a/api/models/coord_session_lock.py b/api/models/coord_session_lock.py new file mode 100644 index 0000000..e155cbf --- /dev/null +++ b/api/models/coord_session_lock.py @@ -0,0 +1,64 @@ +"""Coordination session lock model.""" + +from datetime import datetime +from typing import Optional + +from sqlalchemy import DateTime, Index, String, Text +from sqlalchemy.orm import Mapped, mapped_column +from sqlalchemy.sql import func + +from .base import Base, TimestampMixin, UUIDMixin + + +class CoordSessionLock(Base, UUIDMixin, TimestampMixin): + """An exclusive lock held by a session on a resource path.""" + + __tablename__ = "coord_session_locks" + + project_key: Mapped[str] = mapped_column( + String(200), + nullable=False, + doc="Project namespace this lock applies to" + ) + + session_id: Mapped[str] = mapped_column( + String(200), + nullable=False, + doc="Session holding the lock, e.g. 'DESKTOP-0O8A1RL/Claude'" + ) + + resource: Mapped[str] = mapped_column( + String(500), + nullable=False, + doc="Resource path being locked, e.g. 'server/src/', 'migrations/'" + ) + + description: Mapped[Optional[str]] = mapped_column( + Text, + doc="Why this lock was acquired" + ) + + acquired_at: Mapped[datetime] = mapped_column( + DateTime, + nullable=False, + server_default=func.now(), + doc="When the lock was claimed" + ) + + expires_at: Mapped[Optional[datetime]] = mapped_column( + DateTime, + doc="NULL means no expiry; otherwise the lock expires at this time" + ) + + released_at: Mapped[Optional[datetime]] = mapped_column( + DateTime, + doc="NULL means still held; set when lock is explicitly released" + ) + + __table_args__ = ( + Index("idx_coord_locks_project_resource", "project_key", "resource"), + Index("idx_coord_locks_session", "session_id"), + ) + + def __repr__(self) -> str: + return f"" diff --git a/api/models/coord_work_item.py b/api/models/coord_work_item.py new file mode 100644 index 0000000..5dcd77b --- /dev/null +++ b/api/models/coord_work_item.py @@ -0,0 +1,87 @@ +"""Coordination work item model.""" + +from datetime import datetime +from typing import Optional + +from sqlalchemy import CHAR, CheckConstraint, ForeignKey, Index, Integer, String, Text, DateTime +from sqlalchemy.orm import Mapped, mapped_column + +from .base import Base, TimestampMixin, UUIDMixin + + +class CoordWorkItem(Base, UUIDMixin, TimestampMixin): + """A discrete task within a coordination workflow.""" + + __tablename__ = "coord_work_items" + + workflow_id: Mapped[str] = mapped_column( + CHAR(36), + ForeignKey("coord_workflows.id", ondelete="CASCADE"), + nullable=False, + doc="Parent workflow" + ) + + project_key: Mapped[str] = mapped_column( + String(200), + nullable=False, + doc="Denormalized project key for filtering without join" + ) + + title: Mapped[str] = mapped_column( + String(500), + nullable=False, + doc="Short title" + ) + + description: Mapped[Optional[str]] = mapped_column( + Text, + doc="Full description, markdown ok — store design specs, schemas, etc." + ) + + status: Mapped[str] = mapped_column( + String(20), + nullable=False, + default="pending", + doc="Status: pending, in_progress, blocked, completed, cancelled" + ) + + priority: Mapped[int] = mapped_column( + Integer, + nullable=False, + default=0, + doc="Higher value = more urgent" + ) + + assigned_session: Mapped[Optional[str]] = mapped_column( + String(200), + doc="Session currently working this item" + ) + + depends_on_id: Mapped[Optional[str]] = mapped_column( + CHAR(36), + ForeignKey("coord_work_items.id", ondelete="SET NULL"), + doc="Blocking predecessor item" + ) + + started_at: Mapped[Optional[datetime]] = mapped_column( + DateTime, + doc="When work began" + ) + + completed_at: Mapped[Optional[datetime]] = mapped_column( + DateTime, + doc="When item reached a terminal state" + ) + + __table_args__ = ( + CheckConstraint( + "status IN ('pending', 'in_progress', 'blocked', 'completed', 'cancelled')", + name="ck_coord_work_items_status" + ), + Index("idx_coord_work_items_workflow", "workflow_id"), + Index("idx_coord_work_items_project_status", "project_key", "status"), + Index("idx_coord_work_items_assigned", "assigned_session"), + ) + + def __repr__(self) -> str: + return f"" diff --git a/api/models/coord_workflow.py b/api/models/coord_workflow.py new file mode 100644 index 0000000..999624c --- /dev/null +++ b/api/models/coord_workflow.py @@ -0,0 +1,61 @@ +"""Coordination workflow model.""" + +from datetime import datetime +from typing import Optional + +from sqlalchemy import CHAR, CheckConstraint, Index, String, Text, DateTime +from sqlalchemy.orm import Mapped, mapped_column + +from .base import Base, TimestampMixin, UUIDMixin + + +class CoordWorkflow(Base, UUIDMixin, TimestampMixin): + """A named unit of work spanning one or more sessions.""" + + __tablename__ = "coord_workflows" + + project_key: Mapped[str] = mapped_column( + String(200), + nullable=False, + doc="Project namespace slug, e.g. 'gururmm', 'client/acme-corp'" + ) + + name: Mapped[str] = mapped_column( + String(200), + nullable=False, + doc="Short identifier, e.g. 'discovery-feature'" + ) + + description: Mapped[Optional[str]] = mapped_column( + Text, + doc="Freeform description, markdown ok" + ) + + status: Mapped[str] = mapped_column( + String(20), + nullable=False, + default="planning", + doc="Status: planning, active, blocked, completed, cancelled" + ) + + created_by: Mapped[str] = mapped_column( + String(200), + nullable=False, + doc="Session that created this workflow, e.g. 'DESKTOP-0O8A1RL/Claude'" + ) + + completed_at: Mapped[Optional[datetime]] = mapped_column( + DateTime, + doc="When the workflow reached a terminal state" + ) + + __table_args__ = ( + CheckConstraint( + "status IN ('planning', 'active', 'blocked', 'completed', 'cancelled')", + name="ck_coord_workflows_status" + ), + Index("idx_coord_workflows_project_status", "project_key", "status"), + ) + + def __repr__(self) -> str: + return f"" diff --git a/api/routers/coord_components.py b/api/routers/coord_components.py new file mode 100644 index 0000000..ba52225 --- /dev/null +++ b/api/routers/coord_components.py @@ -0,0 +1,38 @@ +"""Coordination component states router.""" + +from fastapi import APIRouter, Depends, Query, status +from sqlalchemy.orm import Session + +from api.database import get_db +from api.middleware.auth import get_current_user +from api.schemas.coord_component_state import CoordComponentStateUpsert, CoordComponentStateResponse +from api.services import coord_component_service + +router = APIRouter() + + +@router.get("", response_model=dict, status_code=status.HTTP_200_OK) +def list_component_states( + project_key: str | None = Query(default=None), + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """List all component states, optionally filtered by project.""" + states = coord_component_service.get_component_states(db, project_key=project_key) + return { + "total": len(states), + "states": [CoordComponentStateResponse.model_validate(s) for s in states], + } + + +@router.put("/{project_key}/{component}", response_model=CoordComponentStateResponse, status_code=status.HTTP_200_OK) +def upsert_component_state( + project_key: str, + component: str, + data: CoordComponentStateUpsert, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Create or update the state of a component within a project.""" + state = coord_component_service.upsert_component_state(db, project_key, component, data) + return CoordComponentStateResponse.model_validate(state) diff --git a/api/routers/coord_locks.py b/api/routers/coord_locks.py new file mode 100644 index 0000000..52c4c7c --- /dev/null +++ b/api/routers/coord_locks.py @@ -0,0 +1,81 @@ +"""Coordination session locks router.""" + +from uuid import UUID + +from fastapi import APIRouter, Depends, Query, status +from sqlalchemy.orm import Session + +from api.database import get_db +from api.middleware.auth import get_current_user +from api.schemas.coord_session_lock import CoordSessionLockCreate, CoordSessionLockResponse +from api.services import coord_lock_service + +router = APIRouter() + + +@router.get("", response_model=dict, status_code=status.HTTP_200_OK) +def list_active_locks( + project_key: str | None = Query(default=None), + session_id: str | None = Query(default=None), + skip: int = Query(default=0, ge=0), + limit: int = Query(default=100, ge=1, le=1000), + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """List currently active locks with optional filters.""" + locks, total = coord_lock_service.get_active_locks( + db, project_key=project_key, session_id=session_id, skip=skip, limit=limit + ) + return { + "total": total, + "skip": skip, + "limit": limit, + "locks": [CoordSessionLockResponse.model_validate(l) for l in locks], + } + + +@router.get("/check", response_model=dict, status_code=status.HTTP_200_OK) +def check_resource_locked( + project_key: str = Query(...), + resource: str = Query(...), + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Check whether a resource is currently locked.""" + lock = coord_lock_service.check_resource_locked(db, project_key, resource) + if lock: + return {"locked": True, "lock": CoordSessionLockResponse.model_validate(lock)} + return {"locked": False} + + +@router.post("", response_model=CoordSessionLockResponse, status_code=status.HTTP_201_CREATED) +def claim_lock( + data: CoordSessionLockCreate, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Claim a resource lock for a session.""" + lock = coord_lock_service.claim_lock(db, data) + return CoordSessionLockResponse.model_validate(lock) + + +@router.delete("", response_model=dict, status_code=status.HTTP_200_OK) +def release_all_session_locks( + session_id: str = Query(..., description="Release all active locks held by this session"), + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Release all active locks for a session (call on session end).""" + return coord_lock_service.release_all_session_locks(db, session_id) + + +@router.delete("/{lock_id}", response_model=CoordSessionLockResponse, status_code=status.HTTP_200_OK) +def release_lock( + lock_id: UUID, + session_id: str = Query(..., description="Must match the session that claimed the lock"), + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Release a specific lock by ID.""" + lock = coord_lock_service.release_lock(db, lock_id, session_id) + return CoordSessionLockResponse.model_validate(lock) diff --git a/api/routers/coord_messages.py b/api/routers/coord_messages.py new file mode 100644 index 0000000..2cc3489 --- /dev/null +++ b/api/routers/coord_messages.py @@ -0,0 +1,77 @@ +"""Coordination inter-session messages router.""" + +from uuid import UUID + +from fastapi import APIRouter, Depends, Query, status +from sqlalchemy.orm import Session + +from api.database import get_db +from api.middleware.auth import get_current_user +from api.schemas.coord_message import CoordMessageCreate, CoordMessageResponse +from api.services import coord_message_service + +router = APIRouter() + + +@router.get("", response_model=dict, status_code=status.HTTP_200_OK) +def list_messages( + to_session: str | None = Query(default=None), + unread_only: bool = Query(default=False), + skip: int = Query(default=0, ge=0), + limit: int = Query(default=100, ge=1, le=1000), + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """List messages with optional filters.""" + messages, total = coord_message_service.get_messages( + db, to_session=to_session, unread_only=unread_only, skip=skip, limit=limit + ) + return { + "total": total, + "skip": skip, + "limit": limit, + "messages": [CoordMessageResponse.model_validate(m) for m in messages], + } + + +@router.get("/unread-count", response_model=dict, status_code=status.HTTP_200_OK) +def get_unread_count( + session_id: str = Query(...), + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Return the count of unread messages for a session.""" + count = coord_message_service.get_unread_count(db, session_id) + return {"count": count} + + +@router.post("", response_model=CoordMessageResponse, status_code=status.HTTP_201_CREATED) +def send_message( + data: CoordMessageCreate, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Send a message to a session or broadcast.""" + msg = coord_message_service.send_message(db, data) + return CoordMessageResponse.model_validate(msg) + + +@router.put("/{message_id}/read", response_model=CoordMessageResponse, status_code=status.HTTP_200_OK) +def mark_message_read( + message_id: UUID, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Mark a message as read.""" + msg = coord_message_service.mark_read(db, message_id) + return CoordMessageResponse.model_validate(msg) + + +@router.delete("/{message_id}", response_model=dict, status_code=status.HTTP_200_OK) +def delete_message( + message_id: UUID, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Delete a message.""" + return coord_message_service.delete_message(db, message_id) diff --git a/api/routers/coord_status.py b/api/routers/coord_status.py new file mode 100644 index 0000000..9ac1994 --- /dev/null +++ b/api/routers/coord_status.py @@ -0,0 +1,46 @@ +"""Coordination status overview router.""" + +from fastapi import APIRouter, Depends, status +from sqlalchemy.orm import Session + +from api.database import get_db +from api.middleware.auth import get_current_user +from api.schemas.coord_session_lock import CoordSessionLockResponse +from api.schemas.coord_workflow import CoordWorkflowResponse +from api.schemas.coord_component_state import CoordComponentStateResponse +from api.services import coord_lock_service, coord_workflow_service, coord_component_service, coord_message_service + +router = APIRouter() + + +@router.get("", response_model=dict, status_code=status.HTTP_200_OK) +def get_coordination_status( + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Return a cross-project snapshot: active locks, in-progress workflows, component states, unread message counts.""" + active_locks, lock_total = coord_lock_service.get_active_locks(db, limit=200) + active_workflows, wf_total = coord_workflow_service.get_workflows(db, status_filter="active", limit=200) + component_states = coord_component_service.get_component_states(db) + + # Group active locks by project + locks_by_project: dict = {} + for lock in active_locks: + locks_by_project.setdefault(lock.project_key, []).append( + CoordSessionLockResponse.model_validate(lock) + ) + + # Group component states by project + components_by_project: dict = {} + for state in component_states: + components_by_project.setdefault(state.project_key, []).append( + CoordComponentStateResponse.model_validate(state) + ) + + return { + "active_lock_count": lock_total, + "active_workflow_count": wf_total, + "locks_by_project": {k: v for k, v in locks_by_project.items()}, + "active_workflows": [CoordWorkflowResponse.model_validate(w) for w in active_workflows], + "components_by_project": {k: v for k, v in components_by_project.items()}, + } diff --git a/api/routers/coord_work_items.py b/api/routers/coord_work_items.py new file mode 100644 index 0000000..a67cb30 --- /dev/null +++ b/api/routers/coord_work_items.py @@ -0,0 +1,86 @@ +"""Coordination work items router.""" + +from uuid import UUID + +from fastapi import APIRouter, Depends, Query, status +from sqlalchemy.orm import Session + +from api.database import get_db +from api.middleware.auth import get_current_user +from api.schemas.coord_work_item import CoordWorkItemCreate, CoordWorkItemResponse, CoordWorkItemUpdate +from api.services import coord_work_item_service + +router = APIRouter() + + +@router.get("", response_model=dict, status_code=status.HTTP_200_OK) +def list_work_items( + workflow_id: str | None = Query(default=None), + project_key: str | None = Query(default=None), + status_filter: str | None = Query(default=None), + assigned_session: str | None = Query(default=None), + skip: int = Query(default=0, ge=0), + limit: int = Query(default=100, ge=1, le=1000), + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """List work items with optional filters.""" + items, total = coord_work_item_service.get_work_items( + db, + workflow_id=workflow_id, + project_key=project_key, + status_filter=status_filter, + assigned_session=assigned_session, + skip=skip, + limit=limit, + ) + return { + "total": total, + "skip": skip, + "limit": limit, + "work_items": [CoordWorkItemResponse.model_validate(i) for i in items], + } + + +@router.post("", response_model=CoordWorkItemResponse, status_code=status.HTTP_201_CREATED) +def create_work_item( + data: CoordWorkItemCreate, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Create a new work item within a workflow.""" + item = coord_work_item_service.create_work_item(db, data) + return CoordWorkItemResponse.model_validate(item) + + +@router.get("/{item_id}", response_model=CoordWorkItemResponse, status_code=status.HTTP_200_OK) +def get_work_item( + item_id: UUID, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Get a work item by ID.""" + item = coord_work_item_service.get_work_item_by_id(db, item_id) + return CoordWorkItemResponse.model_validate(item) + + +@router.put("/{item_id}", response_model=CoordWorkItemResponse, status_code=status.HTTP_200_OK) +def update_work_item( + item_id: UUID, + data: CoordWorkItemUpdate, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Update a work item.""" + item = coord_work_item_service.update_work_item(db, item_id, data) + return CoordWorkItemResponse.model_validate(item) + + +@router.delete("/{item_id}", response_model=dict, status_code=status.HTTP_200_OK) +def delete_work_item( + item_id: UUID, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Delete a work item.""" + return coord_work_item_service.delete_work_item(db, item_id) diff --git a/api/routers/coord_workflows.py b/api/routers/coord_workflows.py new file mode 100644 index 0000000..4fe4001 --- /dev/null +++ b/api/routers/coord_workflows.py @@ -0,0 +1,83 @@ +"""Coordination workflows router.""" + +from uuid import UUID + +from fastapi import APIRouter, Depends, Query, status +from sqlalchemy.orm import Session + +from api.database import get_db +from api.middleware.auth import get_current_user +from api.schemas.coord_workflow import CoordWorkflowCreate, CoordWorkflowResponse, CoordWorkflowUpdate +from api.schemas.coord_work_item import CoordWorkItemResponse +from api.services import coord_workflow_service, coord_work_item_service + +router = APIRouter() + + +@router.get("", response_model=dict, status_code=status.HTTP_200_OK) +def list_workflows( + project_key: str | None = Query(default=None), + status_filter: str | None = Query(default=None), + skip: int = Query(default=0, ge=0), + limit: int = Query(default=100, ge=1, le=1000), + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """List workflows with optional filters.""" + workflows, total = coord_workflow_service.get_workflows( + db, project_key=project_key, status_filter=status_filter, skip=skip, limit=limit + ) + return { + "total": total, + "skip": skip, + "limit": limit, + "workflows": [CoordWorkflowResponse.model_validate(w) for w in workflows], + } + + +@router.post("", response_model=CoordWorkflowResponse, status_code=status.HTTP_201_CREATED) +def create_workflow( + data: CoordWorkflowCreate, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Create a new coordination workflow.""" + workflow = coord_workflow_service.create_workflow(db, data) + return CoordWorkflowResponse.model_validate(workflow) + + +@router.get("/{workflow_id}", response_model=dict, status_code=status.HTTP_200_OK) +def get_workflow( + workflow_id: UUID, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Get a workflow by ID including its work items.""" + workflow = coord_workflow_service.get_workflow_by_id(db, workflow_id) + items, _ = coord_work_item_service.get_work_items(db, workflow_id=str(workflow_id)) + return { + "workflow": CoordWorkflowResponse.model_validate(workflow), + "work_items": [CoordWorkItemResponse.model_validate(i) for i in items], + } + + +@router.put("/{workflow_id}", response_model=CoordWorkflowResponse, status_code=status.HTTP_200_OK) +def update_workflow( + workflow_id: UUID, + data: CoordWorkflowUpdate, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Update a workflow.""" + workflow = coord_workflow_service.update_workflow(db, workflow_id, data) + return CoordWorkflowResponse.model_validate(workflow) + + +@router.delete("/{workflow_id}", response_model=dict, status_code=status.HTTP_200_OK) +def delete_workflow( + workflow_id: UUID, + db: Session = Depends(get_db), + current_user: dict = Depends(get_current_user), +): + """Delete a workflow and its work items (cascade).""" + return coord_workflow_service.delete_workflow(db, workflow_id) diff --git a/api/schemas/coord_component_state.py b/api/schemas/coord_component_state.py new file mode 100644 index 0000000..1c5ec48 --- /dev/null +++ b/api/schemas/coord_component_state.py @@ -0,0 +1,30 @@ +"""Pydantic schemas for CoordComponentState.""" + +from datetime import datetime +from typing import Optional + +from pydantic import BaseModel, Field + + +class CoordComponentStateUpsert(BaseModel): + """Input schema for upserting a component state.""" + + state: str = Field(..., description="State: deployed, building, stable, broken, unknown", max_length=50) + version: Optional[str] = Field(None, description="Version string or git SHA", max_length=100) + notes: Optional[str] = Field(None, description="Freeform notes") + updated_by: str = Field(..., description="Session performing this update", max_length=200) + + +class CoordComponentStateResponse(BaseModel): + """Output schema for a component state.""" + + project_key: str + component: str + state: str + version: Optional[str] + notes: Optional[str] + updated_by: str + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} diff --git a/api/schemas/coord_message.py b/api/schemas/coord_message.py new file mode 100644 index 0000000..3ef2e99 --- /dev/null +++ b/api/schemas/coord_message.py @@ -0,0 +1,39 @@ +"""Pydantic schemas for CoordMessage.""" + +from datetime import datetime +from typing import Optional +from uuid import UUID + +from pydantic import BaseModel, Field + + +class CoordMessageCreate(BaseModel): + """Input schema for sending a message.""" + + from_session: str = Field(..., description="Sending session identifier", max_length=200) + to_session: Optional[str] = Field(None, description="Recipient session; NULL = broadcast", max_length=200) + project_key: Optional[str] = Field(None, description="Optional project context", max_length=200) + subject: str = Field(..., description="Message subject", max_length=500) + body: str = Field(..., description="Message body, markdown ok") + + +class CoordMessageUpdate(BaseModel): + """Input schema for updating a message (mark read, etc.).""" + + read_at: Optional[datetime] = None + + +class CoordMessageResponse(BaseModel): + """Output schema for a message.""" + + id: UUID + from_session: str + to_session: Optional[str] + project_key: Optional[str] + subject: str + body: str + read_at: Optional[datetime] + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} diff --git a/api/schemas/coord_session_lock.py b/api/schemas/coord_session_lock.py new file mode 100644 index 0000000..5b9a132 --- /dev/null +++ b/api/schemas/coord_session_lock.py @@ -0,0 +1,41 @@ +"""Pydantic schemas for CoordSessionLock.""" + +from datetime import datetime +from typing import Optional +from uuid import UUID + +from pydantic import BaseModel, Field + + +class CoordSessionLockCreate(BaseModel): + """Input schema for claiming a lock.""" + + project_key: str = Field(..., description="Project namespace", max_length=200) + session_id: str = Field(..., description="Session claiming the lock", max_length=200) + resource: str = Field(..., description="Resource path being locked, e.g. 'server/src/'", max_length=500) + description: Optional[str] = Field(None, description="Why this lock is needed") + ttl_hours: float = Field(2.0, description="Lock lifetime in hours; 0 = no expiry", ge=0) + + +class CoordSessionLockUpdate(BaseModel): + """Input schema for updating a lock (rarely needed directly).""" + + description: Optional[str] = None + expires_at: Optional[datetime] = None + + +class CoordSessionLockResponse(BaseModel): + """Output schema for a lock.""" + + id: UUID + project_key: str + session_id: str + resource: str + description: Optional[str] + acquired_at: datetime + expires_at: Optional[datetime] + released_at: Optional[datetime] + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} diff --git a/api/schemas/coord_work_item.py b/api/schemas/coord_work_item.py new file mode 100644 index 0000000..cbf8a66 --- /dev/null +++ b/api/schemas/coord_work_item.py @@ -0,0 +1,53 @@ +"""Pydantic schemas for CoordWorkItem.""" + +from datetime import datetime +from typing import Optional +from uuid import UUID + +from pydantic import BaseModel, Field + + +class CoordWorkItemCreate(BaseModel): + """Input schema for creating a work item.""" + + workflow_id: str = Field(..., description="Parent workflow UUID", max_length=36) + project_key: str = Field(..., description="Project namespace slug", max_length=200) + title: str = Field(..., description="Short title", max_length=500) + description: Optional[str] = Field(None, description="Full description, markdown ok") + status: str = Field("pending", description="Status: pending, in_progress, blocked, completed, cancelled") + priority: int = Field(0, description="Higher value = more urgent") + assigned_session: Optional[str] = Field(None, max_length=200) + depends_on_id: Optional[str] = Field(None, description="Blocking predecessor item UUID", max_length=36) + + +class CoordWorkItemUpdate(BaseModel): + """Input schema for updating a work item. All fields optional.""" + + title: Optional[str] = Field(None, max_length=500) + description: Optional[str] = None + status: Optional[str] = Field(None, description="Status: pending, in_progress, blocked, completed, cancelled") + priority: Optional[int] = None + assigned_session: Optional[str] = Field(None, max_length=200) + depends_on_id: Optional[str] = Field(None, max_length=36) + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + + +class CoordWorkItemResponse(BaseModel): + """Output schema for a work item.""" + + id: UUID + workflow_id: str + project_key: str + title: str + description: Optional[str] + status: str + priority: int + assigned_session: Optional[str] + depends_on_id: Optional[str] + started_at: Optional[datetime] + completed_at: Optional[datetime] + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} diff --git a/api/schemas/coord_workflow.py b/api/schemas/coord_workflow.py new file mode 100644 index 0000000..6eb91b4 --- /dev/null +++ b/api/schemas/coord_workflow.py @@ -0,0 +1,42 @@ +"""Pydantic schemas for CoordWorkflow.""" + +from datetime import datetime +from typing import Optional +from uuid import UUID + +from pydantic import BaseModel, Field + + +class CoordWorkflowCreate(BaseModel): + """Input schema for creating a workflow.""" + + project_key: str = Field(..., description="Project namespace slug", max_length=200) + name: str = Field(..., description="Short workflow identifier", max_length=200) + description: Optional[str] = Field(None, description="Freeform description, markdown ok") + status: str = Field("planning", description="Status: planning, active, blocked, completed, cancelled") + created_by: str = Field(..., description="Creating session identifier", max_length=200) + + +class CoordWorkflowUpdate(BaseModel): + """Input schema for updating a workflow. All fields optional.""" + + name: Optional[str] = Field(None, max_length=200) + description: Optional[str] = None + status: Optional[str] = Field(None, description="Status: planning, active, blocked, completed, cancelled") + completed_at: Optional[datetime] = None + + +class CoordWorkflowResponse(BaseModel): + """Output schema for a workflow.""" + + id: UUID + project_key: str + name: str + description: Optional[str] + status: str + created_by: str + completed_at: Optional[datetime] + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} diff --git a/api/services/coord_component_service.py b/api/services/coord_component_service.py new file mode 100644 index 0000000..e900109 --- /dev/null +++ b/api/services/coord_component_service.py @@ -0,0 +1,63 @@ +"""Service layer for CoordComponentState.""" + +from typing import Optional + +from fastapi import HTTPException, status +from sqlalchemy.dialects.mysql import insert +from sqlalchemy.orm import Session + +from api.models.coord_component_state import CoordComponentState +from api.schemas.coord_component_state import CoordComponentStateUpsert + + +def get_component_states( + db: Session, + project_key: Optional[str] = None, +) -> list[CoordComponentState]: + """Return all component states, optionally filtered by project.""" + q = db.query(CoordComponentState) + if project_key: + q = q.filter(CoordComponentState.project_key == project_key) + return q.order_by(CoordComponentState.project_key, CoordComponentState.component).all() + + +def upsert_component_state( + db: Session, + project_key: str, + component: str, + data: CoordComponentStateUpsert, +) -> CoordComponentState: + """Insert or update a component state using MariaDB ON DUPLICATE KEY UPDATE.""" + try: + stmt = insert(CoordComponentState).values( + project_key=project_key, + component=component, + state=data.state, + version=data.version, + notes=data.notes, + updated_by=data.updated_by, + ) + stmt = stmt.on_duplicate_key_update( + state=stmt.inserted.state, + version=stmt.inserted.version, + notes=stmt.inserted.notes, + updated_by=stmt.inserted.updated_by, + ) + db.execute(stmt) + db.commit() + + record = ( + db.query(CoordComponentState) + .filter( + CoordComponentState.project_key == project_key, + CoordComponentState.component == component, + ) + .first() + ) + return record + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to upsert component state: {e}" + ) diff --git a/api/services/coord_lock_service.py b/api/services/coord_lock_service.py new file mode 100644 index 0000000..80803b9 --- /dev/null +++ b/api/services/coord_lock_service.py @@ -0,0 +1,128 @@ +"""Service layer for CoordSessionLock.""" + +from datetime import datetime, timedelta, timezone +from typing import Optional +from uuid import UUID + +from fastapi import HTTPException, status +from sqlalchemy import and_, or_ +from sqlalchemy.orm import Session + +from api.models.coord_session_lock import CoordSessionLock +from api.schemas.coord_session_lock import CoordSessionLockCreate + + +def _active_filter(q): + """Apply the 'lock is currently active' predicate to a query.""" + now = datetime.now(timezone.utc).replace(tzinfo=None) + return q.filter( + CoordSessionLock.released_at.is_(None), + or_( + CoordSessionLock.expires_at.is_(None), + CoordSessionLock.expires_at > now, + ), + ) + + +def get_active_locks( + db: Session, + project_key: Optional[str] = None, + session_id: Optional[str] = None, + skip: int = 0, + limit: int = 100, +) -> tuple[list[CoordSessionLock], int]: + """Return currently active locks with optional filters.""" + q = db.query(CoordSessionLock) + if project_key: + q = q.filter(CoordSessionLock.project_key == project_key) + if session_id: + q = q.filter(CoordSessionLock.session_id == session_id) + q = _active_filter(q) + total = q.count() + locks = q.order_by(CoordSessionLock.acquired_at.desc()).offset(skip).limit(limit).all() + return locks, total + + +def check_resource_locked( + db: Session, project_key: str, resource: str +) -> Optional[CoordSessionLock]: + """Return the active lock on a resource, or None if unlocked.""" + q = db.query(CoordSessionLock).filter( + CoordSessionLock.project_key == project_key, + CoordSessionLock.resource == resource, + ) + return _active_filter(q).first() + + +def claim_lock(db: Session, data: CoordSessionLockCreate) -> CoordSessionLock: + """Claim a resource lock, computing expires_at from ttl_hours.""" + expires_at: Optional[datetime] = None + if data.ttl_hours > 0: + expires_at = datetime.now(timezone.utc).replace(tzinfo=None) + timedelta(hours=data.ttl_hours) + + try: + lock = CoordSessionLock( + project_key=data.project_key, + session_id=data.session_id, + resource=data.resource, + description=data.description, + acquired_at=datetime.now(timezone.utc).replace(tzinfo=None), + expires_at=expires_at, + released_at=None, + ) + db.add(lock) + db.commit() + db.refresh(lock) + return lock + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to claim lock: {e}" + ) + + +def release_lock(db: Session, lock_id: UUID, session_id: str) -> CoordSessionLock: + """Release a specific lock; only the owning session may release it.""" + lock = db.query(CoordSessionLock).filter(CoordSessionLock.id == str(lock_id)).first() + if not lock: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Lock {lock_id} not found" + ) + if lock.session_id != session_id: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Only the session that claimed this lock may release it" + ) + try: + lock.released_at = datetime.now(timezone.utc).replace(tzinfo=None) + db.commit() + db.refresh(lock) + return lock + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to release lock: {e}" + ) + + +def release_all_session_locks(db: Session, session_id: str) -> dict: + """Release all active locks held by a session (cleanup on session end).""" + now = datetime.now(timezone.utc).replace(tzinfo=None) + try: + q = db.query(CoordSessionLock).filter( + CoordSessionLock.session_id == session_id, + CoordSessionLock.released_at.is_(None), + ) + count = q.count() + q.update({"released_at": now}, synchronize_session=False) + db.commit() + return {"message": f"Released {count} lock(s) for session '{session_id}'", "count": count} + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to release session locks: {e}" + ) diff --git a/api/services/coord_message_service.py b/api/services/coord_message_service.py new file mode 100644 index 0000000..8a34a2a --- /dev/null +++ b/api/services/coord_message_service.py @@ -0,0 +1,96 @@ +"""Service layer for CoordMessage.""" + +from datetime import datetime, timezone +from typing import Optional +from uuid import UUID + +from fastapi import HTTPException, status +from sqlalchemy.orm import Session + +from api.models.coord_message import CoordMessage +from api.schemas.coord_message import CoordMessageCreate + + +def get_messages( + db: Session, + to_session: Optional[str] = None, + unread_only: bool = False, + skip: int = 0, + limit: int = 100, +) -> tuple[list[CoordMessage], int]: + """Return paginated messages with optional filters.""" + q = db.query(CoordMessage) + if to_session: + q = q.filter(CoordMessage.to_session == to_session) + if unread_only: + q = q.filter(CoordMessage.read_at.is_(None)) + total = q.count() + messages = q.order_by(CoordMessage.created_at.desc()).offset(skip).limit(limit).all() + return messages, total + + +def get_unread_count(db: Session, session_id: str) -> int: + """Return the number of unread messages addressed to a session.""" + return ( + db.query(CoordMessage) + .filter(CoordMessage.to_session == session_id, CoordMessage.read_at.is_(None)) + .count() + ) + + +def get_message_by_id(db: Session, message_id: UUID) -> CoordMessage: + """Return a single message or raise 404.""" + msg = db.query(CoordMessage).filter(CoordMessage.id == str(message_id)).first() + if not msg: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Message {message_id} not found" + ) + return msg + + +def send_message(db: Session, data: CoordMessageCreate) -> CoordMessage: + """Persist a new message.""" + try: + msg = CoordMessage(**data.model_dump()) + db.add(msg) + db.commit() + db.refresh(msg) + return msg + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to send message: {e}" + ) + + +def mark_read(db: Session, message_id: UUID) -> CoordMessage: + """Mark a message as read at the current time.""" + msg = get_message_by_id(db, message_id) + try: + msg.read_at = datetime.now(timezone.utc).replace(tzinfo=None) + db.commit() + db.refresh(msg) + return msg + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to mark message read: {e}" + ) + + +def delete_message(db: Session, message_id: UUID) -> dict: + """Delete a message by ID.""" + msg = get_message_by_id(db, message_id) + try: + db.delete(msg) + db.commit() + return {"message": "Message deleted", "message_id": str(message_id)} + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to delete message: {e}" + ) diff --git a/api/services/coord_work_item_service.py b/api/services/coord_work_item_service.py new file mode 100644 index 0000000..162e71a --- /dev/null +++ b/api/services/coord_work_item_service.py @@ -0,0 +1,100 @@ +"""Service layer for CoordWorkItem.""" + +from typing import Optional +from uuid import UUID + +from fastapi import HTTPException, status +from sqlalchemy.orm import Session + +from api.models.coord_work_item import CoordWorkItem +from api.models.coord_workflow import CoordWorkflow +from api.schemas.coord_work_item import CoordWorkItemCreate, CoordWorkItemUpdate + + +def get_work_items( + db: Session, + workflow_id: Optional[str] = None, + project_key: Optional[str] = None, + status_filter: Optional[str] = None, + assigned_session: Optional[str] = None, + skip: int = 0, + limit: int = 100, +) -> tuple[list[CoordWorkItem], int]: + """Return paginated work items with optional filters.""" + q = db.query(CoordWorkItem) + if workflow_id: + q = q.filter(CoordWorkItem.workflow_id == workflow_id) + if project_key: + q = q.filter(CoordWorkItem.project_key == project_key) + if status_filter: + q = q.filter(CoordWorkItem.status == status_filter) + if assigned_session: + q = q.filter(CoordWorkItem.assigned_session == assigned_session) + total = q.count() + items = q.order_by(CoordWorkItem.priority.desc(), CoordWorkItem.created_at.asc()).offset(skip).limit(limit).all() + return items, total + + +def get_work_item_by_id(db: Session, item_id: UUID) -> CoordWorkItem: + """Return a single work item or raise 404.""" + item = db.query(CoordWorkItem).filter(CoordWorkItem.id == str(item_id)).first() + if not item: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Work item {item_id} not found" + ) + return item + + +def create_work_item(db: Session, data: CoordWorkItemCreate) -> CoordWorkItem: + """Create and persist a new work item, validating the parent workflow exists.""" + workflow = db.query(CoordWorkflow).filter(CoordWorkflow.id == data.workflow_id).first() + if not workflow: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Workflow {data.workflow_id} not found" + ) + try: + item = CoordWorkItem(**data.model_dump()) + db.add(item) + db.commit() + db.refresh(item) + return item + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to create work item: {e}" + ) + + +def update_work_item(db: Session, item_id: UUID, data: CoordWorkItemUpdate) -> CoordWorkItem: + """Apply partial update to a work item.""" + item = get_work_item_by_id(db, item_id) + try: + for field, value in data.model_dump(exclude_unset=True).items(): + setattr(item, field, value) + db.commit() + db.refresh(item) + return item + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to update work item: {e}" + ) + + +def delete_work_item(db: Session, item_id: UUID) -> dict: + """Delete a work item by ID.""" + item = get_work_item_by_id(db, item_id) + try: + db.delete(item) + db.commit() + return {"message": "Work item deleted", "item_id": str(item_id)} + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to delete work item: {e}" + ) diff --git a/api/services/coord_workflow_service.py b/api/services/coord_workflow_service.py new file mode 100644 index 0000000..4fdb560 --- /dev/null +++ b/api/services/coord_workflow_service.py @@ -0,0 +1,87 @@ +"""Service layer for CoordWorkflow.""" + +from typing import Optional +from uuid import UUID + +from fastapi import HTTPException, status +from sqlalchemy.orm import Session + +from api.models.coord_workflow import CoordWorkflow +from api.schemas.coord_workflow import CoordWorkflowCreate, CoordWorkflowUpdate + + +def get_workflows( + db: Session, + project_key: Optional[str] = None, + status_filter: Optional[str] = None, + skip: int = 0, + limit: int = 100, +) -> tuple[list[CoordWorkflow], int]: + """Return paginated workflows with optional filters.""" + q = db.query(CoordWorkflow) + if project_key: + q = q.filter(CoordWorkflow.project_key == project_key) + if status_filter: + q = q.filter(CoordWorkflow.status == status_filter) + total = q.count() + workflows = q.order_by(CoordWorkflow.created_at.desc()).offset(skip).limit(limit).all() + return workflows, total + + +def get_workflow_by_id(db: Session, workflow_id: UUID) -> CoordWorkflow: + """Return a single workflow or raise 404.""" + workflow = db.query(CoordWorkflow).filter(CoordWorkflow.id == str(workflow_id)).first() + if not workflow: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Workflow {workflow_id} not found" + ) + return workflow + + +def create_workflow(db: Session, data: CoordWorkflowCreate) -> CoordWorkflow: + """Create and persist a new workflow.""" + try: + workflow = CoordWorkflow(**data.model_dump()) + db.add(workflow) + db.commit() + db.refresh(workflow) + return workflow + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to create workflow: {e}" + ) + + +def update_workflow(db: Session, workflow_id: UUID, data: CoordWorkflowUpdate) -> CoordWorkflow: + """Apply partial update to a workflow.""" + workflow = get_workflow_by_id(db, workflow_id) + try: + for field, value in data.model_dump(exclude_unset=True).items(): + setattr(workflow, field, value) + db.commit() + db.refresh(workflow) + return workflow + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to update workflow: {e}" + ) + + +def delete_workflow(db: Session, workflow_id: UUID) -> dict: + """Delete a workflow by ID.""" + workflow = get_workflow_by_id(db, workflow_id) + try: + db.delete(workflow) + db.commit() + return {"message": "Workflow deleted", "workflow_id": str(workflow_id)} + except Exception as e: + db.rollback() + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"Failed to delete workflow: {e}" + ) diff --git a/migrations/versions/20260512_120000_coord_agent_coordination.py b/migrations/versions/20260512_120000_coord_agent_coordination.py new file mode 100644 index 0000000..9100c48 --- /dev/null +++ b/migrations/versions/20260512_120000_coord_agent_coordination.py @@ -0,0 +1,122 @@ +"""coord_agent_coordination + +Revision ID: 20260512_120000 +Revises: 20260309_074038 +Create Date: 2026-05-12 12:00:00 + +Creates the agent coordination tables: +- coord_workflows: Named units of work spanning one or more sessions +- coord_work_items: Discrete tasks within a workflow +- coord_session_locks: Exclusive resource locks claimed by sessions +- coord_component_states: Current state of named components per project (composite PK) +- coord_messages: Inter-session messages and broadcasts +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +revision: str = "20260512_120000" +down_revision: Union[str, None] = "20260309_074038" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create all coordination tables.""" + + op.create_table( + "coord_workflows", + sa.Column("id", sa.CHAR(36), primary_key=True), + sa.Column("project_key", sa.String(200), nullable=False), + sa.Column("name", sa.String(200), nullable=False), + sa.Column("description", sa.Text(), nullable=True), + sa.Column("status", sa.String(20), nullable=False, server_default="planning"), + sa.Column("created_by", sa.String(200), nullable=False), + sa.Column("completed_at", sa.DateTime(), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=False, server_default=sa.text("CURRENT_TIMESTAMP")), + sa.Column("updated_at", sa.DateTime(), nullable=False, server_default=sa.text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP")), + sa.CheckConstraint( + "status IN ('planning', 'active', 'blocked', 'completed', 'cancelled')", + name="ck_coord_workflows_status", + ), + ) + op.create_index("idx_coord_workflows_project_status", "coord_workflows", ["project_key", "status"]) + + op.create_table( + "coord_work_items", + sa.Column("id", sa.CHAR(36), primary_key=True), + sa.Column("workflow_id", sa.CHAR(36), sa.ForeignKey("coord_workflows.id", ondelete="CASCADE"), nullable=False), + sa.Column("project_key", sa.String(200), nullable=False), + sa.Column("title", sa.String(500), nullable=False), + sa.Column("description", sa.Text(), nullable=True), + sa.Column("status", sa.String(20), nullable=False, server_default="pending"), + sa.Column("priority", sa.Integer(), nullable=False, server_default="0"), + sa.Column("assigned_session", sa.String(200), nullable=True), + sa.Column("depends_on_id", sa.CHAR(36), sa.ForeignKey("coord_work_items.id", ondelete="SET NULL"), nullable=True), + sa.Column("started_at", sa.DateTime(), nullable=True), + sa.Column("completed_at", sa.DateTime(), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=False, server_default=sa.text("CURRENT_TIMESTAMP")), + sa.Column("updated_at", sa.DateTime(), nullable=False, server_default=sa.text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP")), + sa.CheckConstraint( + "status IN ('pending', 'in_progress', 'blocked', 'completed', 'cancelled')", + name="ck_coord_work_items_status", + ), + ) + op.create_index("idx_coord_work_items_workflow", "coord_work_items", ["workflow_id"]) + op.create_index("idx_coord_work_items_project_status", "coord_work_items", ["project_key", "status"]) + op.create_index("idx_coord_work_items_assigned", "coord_work_items", ["assigned_session"]) + + op.create_table( + "coord_session_locks", + sa.Column("id", sa.CHAR(36), primary_key=True), + sa.Column("project_key", sa.String(200), nullable=False), + sa.Column("session_id", sa.String(200), nullable=False), + sa.Column("resource", sa.String(500), nullable=False), + sa.Column("description", sa.Text(), nullable=True), + sa.Column("acquired_at", sa.DateTime(), nullable=False, server_default=sa.text("CURRENT_TIMESTAMP")), + sa.Column("expires_at", sa.DateTime(), nullable=True), + sa.Column("released_at", sa.DateTime(), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=False, server_default=sa.text("CURRENT_TIMESTAMP")), + sa.Column("updated_at", sa.DateTime(), nullable=False, server_default=sa.text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP")), + ) + op.create_index("idx_coord_locks_project_resource", "coord_session_locks", ["project_key", "resource"]) + op.create_index("idx_coord_locks_session", "coord_session_locks", ["session_id"]) + + op.create_table( + "coord_component_states", + sa.Column("project_key", sa.String(200), nullable=False), + sa.Column("component", sa.String(200), nullable=False), + sa.Column("state", sa.String(50), nullable=False), + sa.Column("version", sa.String(100), nullable=True), + sa.Column("notes", sa.Text(), nullable=True), + sa.Column("updated_by", sa.String(200), nullable=False), + sa.Column("created_at", sa.DateTime(), nullable=False, server_default=sa.text("CURRENT_TIMESTAMP")), + sa.Column("updated_at", sa.DateTime(), nullable=False, server_default=sa.text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP")), + sa.PrimaryKeyConstraint("project_key", "component", name="pk_coord_component_states"), + ) + + op.create_table( + "coord_messages", + sa.Column("id", sa.CHAR(36), primary_key=True), + sa.Column("from_session", sa.String(200), nullable=False), + sa.Column("to_session", sa.String(200), nullable=True), + sa.Column("project_key", sa.String(200), nullable=True), + sa.Column("subject", sa.String(500), nullable=False), + sa.Column("body", sa.Text(), nullable=False), + sa.Column("read_at", sa.DateTime(), nullable=True), + sa.Column("created_at", sa.DateTime(), nullable=False, server_default=sa.text("CURRENT_TIMESTAMP")), + sa.Column("updated_at", sa.DateTime(), nullable=False, server_default=sa.text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP")), + ) + op.create_index("idx_coord_messages_to_read", "coord_messages", ["to_session", "read_at"]) + op.create_index("idx_coord_messages_from", "coord_messages", ["from_session"]) + + +def downgrade() -> None: + """Drop all coordination tables in reverse dependency order.""" + op.drop_table("coord_messages") + op.drop_table("coord_component_states") + op.drop_table("coord_session_locks") + op.drop_table("coord_work_items") + op.drop_table("coord_workflows")