""" Credential audit log service layer for business logic and database operations. This module handles read-only operations for credential audit logs. """ from uuid import UUID from fastapi import HTTPException, status from sqlalchemy.orm import Session from api.models.credential_audit_log import CredentialAuditLog def get_credential_audit_logs(db: Session, skip: int = 0, limit: int = 100) -> tuple[list[CredentialAuditLog], int]: """ Retrieve a paginated list of credential audit logs. Args: db: Database session skip: Number of records to skip (for pagination) limit: Maximum number of records to return Returns: tuple: (list of audit logs, total count) Example: ```python logs, total = get_credential_audit_logs(db, skip=0, limit=50) print(f"Retrieved {len(logs)} of {total} audit logs") ``` """ # Get total count total = db.query(CredentialAuditLog).count() # Get paginated results, ordered by timestamp descending (newest first) logs = ( db.query(CredentialAuditLog) .order_by(CredentialAuditLog.timestamp.desc()) .offset(skip) .limit(limit) .all() ) return logs, total def get_credential_audit_log_by_id(db: Session, log_id: UUID) -> CredentialAuditLog: """ Retrieve a single credential audit log by its ID. Args: db: Database session log_id: UUID of the audit log to retrieve Returns: CredentialAuditLog: The audit log object Raises: HTTPException: 404 if audit log not found Example: ```python log = get_credential_audit_log_by_id(db, log_id) print(f"Found audit log: {log.action} by {log.user_id}") ``` """ log = db.query(CredentialAuditLog).filter(CredentialAuditLog.id == str(log_id)).first() if not log: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Credential audit log with ID {log_id} not found" ) return log def get_credential_audit_logs_by_credential( db: Session, credential_id: UUID, skip: int = 0, limit: int = 100 ) -> tuple[list[CredentialAuditLog], int]: """ Retrieve audit logs for a specific credential. Args: db: Database session credential_id: UUID of the credential skip: Number of records to skip limit: Maximum number of records to return Returns: tuple: (list of audit logs, total count) Example: ```python logs, total = get_credential_audit_logs_by_credential(db, credential_id, skip=0, limit=50) print(f"Credential has {total} audit log entries") ``` """ # Get total count for this credential total = ( db.query(CredentialAuditLog) .filter(CredentialAuditLog.credential_id == str(credential_id)) .count() ) # Get paginated results logs = ( db.query(CredentialAuditLog) .filter(CredentialAuditLog.credential_id == str(credential_id)) .order_by(CredentialAuditLog.timestamp.desc()) .offset(skip) .limit(limit) .all() ) return logs, total def get_credential_audit_logs_by_user( db: Session, user_id: str, skip: int = 0, limit: int = 100 ) -> tuple[list[CredentialAuditLog], int]: """ Retrieve audit logs for a specific user. Args: db: Database session user_id: User ID to filter by skip: Number of records to skip limit: Maximum number of records to return Returns: tuple: (list of audit logs, total count) Example: ```python logs, total = get_credential_audit_logs_by_user(db, "user123", skip=0, limit=50) print(f"User has {total} audit log entries") ``` """ # Get total count for this user total = ( db.query(CredentialAuditLog) .filter(CredentialAuditLog.user_id == user_id) .count() ) # Get paginated results logs = ( db.query(CredentialAuditLog) .filter(CredentialAuditLog.user_id == user_id) .order_by(CredentialAuditLog.timestamp.desc()) .offset(skip) .limit(limit) .all() ) return logs, total