""" Security incident service layer for business logic and database operations. This module handles all database operations for security incidents. """ from typing import Optional from uuid import UUID from fastapi import HTTPException, status from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from api.models.security_incident import SecurityIncident from api.schemas.security_incident import SecurityIncidentCreate, SecurityIncidentUpdate def get_security_incidents(db: Session, skip: int = 0, limit: int = 100) -> tuple[list[SecurityIncident], int]: """ Retrieve a paginated list of security incidents. Args: db: Database session skip: Number of records to skip (for pagination) limit: Maximum number of records to return Returns: tuple: (list of security incidents, total count) Example: ```python incidents, total = get_security_incidents(db, skip=0, limit=50) print(f"Retrieved {len(incidents)} of {total} security incidents") ``` """ # Get total count total = db.query(SecurityIncident).count() # Get paginated results, ordered by incident_date descending (most recent first) incidents = ( db.query(SecurityIncident) .order_by(SecurityIncident.incident_date.desc()) .offset(skip) .limit(limit) .all() ) return incidents, total def get_security_incident_by_id(db: Session, incident_id: UUID) -> SecurityIncident: """ Retrieve a single security incident by its ID. Args: db: Database session incident_id: UUID of the security incident to retrieve Returns: SecurityIncident: The security incident object Raises: HTTPException: 404 if security incident not found Example: ```python incident = get_security_incident_by_id(db, incident_id) print(f"Found incident: {incident.incident_type} - {incident.severity}") ``` """ incident = db.query(SecurityIncident).filter(SecurityIncident.id == str(incident_id)).first() if not incident: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Security incident with ID {incident_id} not found" ) return incident def get_security_incidents_by_client( db: Session, client_id: UUID, skip: int = 0, limit: int = 100 ) -> tuple[list[SecurityIncident], int]: """ Retrieve security incidents for a specific client. Args: db: Database session client_id: UUID of the client skip: Number of records to skip limit: Maximum number of records to return Returns: tuple: (list of security incidents, total count) Example: ```python incidents, total = get_security_incidents_by_client(db, client_id, skip=0, limit=50) print(f"Client has {total} security incidents") ``` """ # Get total count for this client total = db.query(SecurityIncident).filter(SecurityIncident.client_id == str(client_id)).count() # Get paginated results incidents = ( db.query(SecurityIncident) .filter(SecurityIncident.client_id == str(client_id)) .order_by(SecurityIncident.incident_date.desc()) .offset(skip) .limit(limit) .all() ) return incidents, total def get_security_incidents_by_status( db: Session, status_filter: str, skip: int = 0, limit: int = 100 ) -> tuple[list[SecurityIncident], int]: """ Retrieve security incidents by status. Args: db: Database session status_filter: Status to filter by (investigating, contained, resolved, monitoring) skip: Number of records to skip limit: Maximum number of records to return Returns: tuple: (list of security incidents, total count) Example: ```python incidents, total = get_security_incidents_by_status(db, "investigating", skip=0, limit=50) print(f"Found {total} incidents under investigation") ``` """ # Get total count for this status total = db.query(SecurityIncident).filter(SecurityIncident.status == status_filter).count() # Get paginated results incidents = ( db.query(SecurityIncident) .filter(SecurityIncident.status == status_filter) .order_by(SecurityIncident.incident_date.desc()) .offset(skip) .limit(limit) .all() ) return incidents, total def create_security_incident(db: Session, incident_data: SecurityIncidentCreate) -> SecurityIncident: """ Create a new security incident. Args: db: Database session incident_data: Security incident creation data Returns: SecurityIncident: The created security incident object Raises: HTTPException: 500 if database error occurs Example: ```python incident_data = SecurityIncidentCreate( client_id="client-uuid", incident_type="malware", incident_date=datetime.utcnow(), severity="high", description="Malware detected on workstation", status="investigating" ) incident = create_security_incident(db, incident_data) print(f"Created incident: {incident.id}") ``` """ try: # Convert Pydantic model to dict data = incident_data.model_dump() # Convert UUID fields to strings if data.get("client_id"): data["client_id"] = str(data["client_id"]) if data.get("service_id"): data["service_id"] = str(data["service_id"]) if data.get("infrastructure_id"): data["infrastructure_id"] = str(data["infrastructure_id"]) # Create new security incident instance db_incident = SecurityIncident(**data) # Add to database db.add(db_incident) db.commit() db.refresh(db_incident) return db_incident except IntegrityError as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Database integrity error: {str(e)}" ) except Exception as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to create security incident: {str(e)}" ) def update_security_incident( db: Session, incident_id: UUID, incident_data: SecurityIncidentUpdate ) -> SecurityIncident: """ Update an existing security incident. Args: db: Database session incident_id: UUID of the security incident to update incident_data: Security incident update data (only provided fields will be updated) Returns: SecurityIncident: The updated security incident object Raises: HTTPException: 404 if security incident not found HTTPException: 500 if database error occurs Example: ```python update_data = SecurityIncidentUpdate( status="contained", remediation_steps="Malware removed, system scanned clean" ) incident = update_security_incident(db, incident_id, update_data) print(f"Updated incident: {incident.status}") ``` """ # Get existing security incident incident = get_security_incident_by_id(db, incident_id) try: # Update only provided fields update_data = incident_data.model_dump(exclude_unset=True) # Convert UUID fields to strings if "client_id" in update_data and update_data["client_id"]: update_data["client_id"] = str(update_data["client_id"]) if "service_id" in update_data and update_data["service_id"]: update_data["service_id"] = str(update_data["service_id"]) if "infrastructure_id" in update_data and update_data["infrastructure_id"]: update_data["infrastructure_id"] = str(update_data["infrastructure_id"]) # Apply updates for field, value in update_data.items(): setattr(incident, field, value) db.commit() db.refresh(incident) return incident except HTTPException: db.rollback() raise except IntegrityError as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Database integrity error: {str(e)}" ) except Exception as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to update security incident: {str(e)}" ) def delete_security_incident(db: Session, incident_id: UUID) -> dict: """ Delete a security incident by its ID. Args: db: Database session incident_id: UUID of the security incident to delete Returns: dict: Success message Raises: HTTPException: 404 if security incident not found HTTPException: 500 if database error occurs Example: ```python result = delete_security_incident(db, incident_id) print(result["message"]) # "Security incident deleted successfully" ``` """ # Get existing security incident (raises 404 if not found) incident = get_security_incident_by_id(db, incident_id) try: db.delete(incident) db.commit() return { "message": "Security incident deleted successfully", "incident_id": str(incident_id) } except Exception as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to delete security incident: {str(e)}" )