Adds /api/coord/* endpoints for real-time cross-session coordination: - coord_workflows: named units of work per project - coord_work_items: tasks within workflows with dependency chains - coord_session_locks: exclusive resource locks with auto-expiry (TTL) - coord_component_states: live component state per project (upsert) - coord_messages: cross-session messaging and broadcasts - /api/coord/status: cross-project snapshot endpoint Replaces PROJECT_STATE.md as the coordination layer for Claude sessions. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
88 lines
2.8 KiB
Python
88 lines
2.8 KiB
Python
"""Service layer for CoordWorkflow."""
|
|
|
|
from typing import Optional
|
|
from uuid import UUID
|
|
|
|
from fastapi import HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
|
|
from api.models.coord_workflow import CoordWorkflow
|
|
from api.schemas.coord_workflow import CoordWorkflowCreate, CoordWorkflowUpdate
|
|
|
|
|
|
def get_workflows(
|
|
db: Session,
|
|
project_key: Optional[str] = None,
|
|
status_filter: Optional[str] = None,
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
) -> tuple[list[CoordWorkflow], int]:
|
|
"""Return paginated workflows with optional filters."""
|
|
q = db.query(CoordWorkflow)
|
|
if project_key:
|
|
q = q.filter(CoordWorkflow.project_key == project_key)
|
|
if status_filter:
|
|
q = q.filter(CoordWorkflow.status == status_filter)
|
|
total = q.count()
|
|
workflows = q.order_by(CoordWorkflow.created_at.desc()).offset(skip).limit(limit).all()
|
|
return workflows, total
|
|
|
|
|
|
def get_workflow_by_id(db: Session, workflow_id: UUID) -> CoordWorkflow:
|
|
"""Return a single workflow or raise 404."""
|
|
workflow = db.query(CoordWorkflow).filter(CoordWorkflow.id == str(workflow_id)).first()
|
|
if not workflow:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail=f"Workflow {workflow_id} not found"
|
|
)
|
|
return workflow
|
|
|
|
|
|
def create_workflow(db: Session, data: CoordWorkflowCreate) -> CoordWorkflow:
|
|
"""Create and persist a new workflow."""
|
|
try:
|
|
workflow = CoordWorkflow(**data.model_dump())
|
|
db.add(workflow)
|
|
db.commit()
|
|
db.refresh(workflow)
|
|
return workflow
|
|
except Exception as e:
|
|
db.rollback()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to create workflow: {e}"
|
|
)
|
|
|
|
|
|
def update_workflow(db: Session, workflow_id: UUID, data: CoordWorkflowUpdate) -> CoordWorkflow:
|
|
"""Apply partial update to a workflow."""
|
|
workflow = get_workflow_by_id(db, workflow_id)
|
|
try:
|
|
for field, value in data.model_dump(exclude_unset=True).items():
|
|
setattr(workflow, field, value)
|
|
db.commit()
|
|
db.refresh(workflow)
|
|
return workflow
|
|
except Exception as e:
|
|
db.rollback()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to update workflow: {e}"
|
|
)
|
|
|
|
|
|
def delete_workflow(db: Session, workflow_id: UUID) -> dict:
|
|
"""Delete a workflow by ID."""
|
|
workflow = get_workflow_by_id(db, workflow_id)
|
|
try:
|
|
db.delete(workflow)
|
|
db.commit()
|
|
return {"message": "Workflow deleted", "workflow_id": str(workflow_id)}
|
|
except Exception as e:
|
|
db.rollback()
|
|
raise HTTPException(
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
|
|
detail=f"Failed to delete workflow: {e}"
|
|
)
|