""" Credential Audit Logs API router for ClaudeTools. This module defines all REST API endpoints for viewing credential audit logs (read-only). """ from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy.orm import Session from api.database import get_db from api.middleware.auth import get_current_user from api.schemas.credential_audit_log import CredentialAuditLogResponse from api.services import credential_audit_log_service # Create router with prefix and tags router = APIRouter() @router.get( "", response_model=dict, summary="List all credential audit logs", description="Retrieve a paginated list of all credential audit log entries", status_code=status.HTTP_200_OK, ) def list_credential_audit_logs( skip: int = Query( default=0, ge=0, description="Number of records to skip for pagination" ), limit: int = Query( default=100, ge=1, le=1000, description="Maximum number of records to return (max 1000)" ), db: Session = Depends(get_db), current_user: dict = Depends(get_current_user), ): """ List all credential audit logs with pagination. - **skip**: Number of logs to skip (default: 0) - **limit**: Maximum number of logs to return (default: 100, max: 1000) Returns a list of audit log entries with pagination metadata. Logs are ordered by timestamp descending (most recent first). **Note**: Audit logs are read-only and immutable. """ try: logs, total = credential_audit_log_service.get_credential_audit_logs(db, skip, limit) return { "total": total, "skip": skip, "limit": limit, "logs": [CredentialAuditLogResponse.model_validate(log) for log in logs] } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to retrieve credential audit logs: {str(e)}" ) @router.get( "/{log_id}", response_model=CredentialAuditLogResponse, summary="Get credential audit log by ID", description="Retrieve a single credential audit log entry by its unique identifier", status_code=status.HTTP_200_OK, ) def get_credential_audit_log( log_id: UUID, db: Session = Depends(get_db), current_user: dict = Depends(get_current_user), ): """ Get a specific credential audit log entry by ID. - **log_id**: UUID of the audit log entry to retrieve Returns the complete audit log details including action, user, timestamp, and context. """ log = credential_audit_log_service.get_credential_audit_log_by_id(db, log_id) return CredentialAuditLogResponse.model_validate(log) @router.get( "/by-credential/{credential_id}", response_model=dict, summary="Get audit logs for a credential", description="Retrieve all audit log entries for a specific credential", status_code=status.HTTP_200_OK, ) def get_credential_audit_logs_by_credential( credential_id: UUID, skip: int = Query(default=0, ge=0, description="Number of records to skip"), limit: int = Query(default=100, ge=1, le=1000, description="Maximum number of records to return"), db: Session = Depends(get_db), current_user: dict = Depends(get_current_user), ): """ Get all audit log entries for a specific credential. - **credential_id**: UUID of the credential - **skip**: Number of logs to skip (default: 0) - **limit**: Maximum number of logs to return (default: 100, max: 1000) Returns all operations performed on this credential including views, updates, and deletions. Logs are ordered by timestamp descending (most recent first). """ try: logs, total = credential_audit_log_service.get_credential_audit_logs_by_credential( db, credential_id, skip, limit ) return { "total": total, "skip": skip, "limit": limit, "credential_id": str(credential_id), "logs": [CredentialAuditLogResponse.model_validate(log) for log in logs] } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to retrieve credential audit logs: {str(e)}" ) @router.get( "/by-user/{user_id}", response_model=dict, summary="Get audit logs for a user", description="Retrieve all audit log entries for a specific user", status_code=status.HTTP_200_OK, ) def get_credential_audit_logs_by_user( user_id: str, skip: int = Query(default=0, ge=0, description="Number of records to skip"), limit: int = Query(default=100, ge=1, le=1000, description="Maximum number of records to return"), db: Session = Depends(get_db), current_user: dict = Depends(get_current_user), ): """ Get all audit log entries for a specific user. - **user_id**: User ID to filter by (JWT sub claim) - **skip**: Number of logs to skip (default: 0) - **limit**: Maximum number of logs to return (default: 100, max: 1000) Returns all credential operations performed by this user. Logs are ordered by timestamp descending (most recent first). """ try: logs, total = credential_audit_log_service.get_credential_audit_logs_by_user( db, user_id, skip, limit ) return { "total": total, "skip": skip, "limit": limit, "user_id": user_id, "logs": [CredentialAuditLogResponse.model_validate(log) for log in logs] } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to retrieve credential audit logs: {str(e)}" )