""" Firewall rule service layer for business logic and database operations. This module handles all database operations for firewall rules, providing a clean separation between the API routes and data access layer. """ from typing import Optional from uuid import UUID from fastapi import HTTPException, status from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from api.models.firewall_rule import FirewallRule from api.models.infrastructure import Infrastructure from api.schemas.firewall_rule import FirewallRuleCreate, FirewallRuleUpdate def get_firewall_rules(db: Session, skip: int = 0, limit: int = 100) -> tuple[list[FirewallRule], int]: """ Retrieve a paginated list of firewall rules. Args: db: Database session skip: Number of records to skip (for pagination) limit: Maximum number of records to return Returns: tuple: (list of firewall rules, total count) Example: ```python rules, total = get_firewall_rules(db, skip=0, limit=50) print(f"Retrieved {len(rules)} of {total} firewall rules") ``` """ # Get total count total = db.query(FirewallRule).count() # Get paginated results, ordered by created_at descending (newest first) rules = ( db.query(FirewallRule) .order_by(FirewallRule.created_at.desc()) .offset(skip) .limit(limit) .all() ) return rules, total def get_firewall_rule_by_id(db: Session, firewall_rule_id: UUID) -> FirewallRule: """ Retrieve a single firewall rule by its ID. Args: db: Database session firewall_rule_id: UUID of the firewall rule to retrieve Returns: FirewallRule: The firewall rule object Raises: HTTPException: 404 if firewall rule not found Example: ```python rule = get_firewall_rule_by_id(db, firewall_rule_id) print(f"Found rule: {rule.rule_name}") ``` """ rule = db.query(FirewallRule).filter(FirewallRule.id == str(firewall_rule_id)).first() if not rule: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Firewall rule with ID {firewall_rule_id} not found" ) return rule def get_firewall_rules_by_infrastructure(db: Session, infrastructure_id: UUID, skip: int = 0, limit: int = 100) -> tuple[list[FirewallRule], int]: """ Retrieve firewall rules belonging to a specific infrastructure. Args: db: Database session infrastructure_id: UUID of the infrastructure skip: Number of records to skip (for pagination) limit: Maximum number of records to return Returns: tuple: (list of firewall rules, total count for this infrastructure) Raises: HTTPException: 404 if infrastructure not found Example: ```python rules, total = get_firewall_rules_by_infrastructure(db, infrastructure_id, skip=0, limit=50) print(f"Retrieved {len(rules)} of {total} firewall rules for infrastructure") ``` """ # Verify infrastructure exists infrastructure = db.query(Infrastructure).filter(Infrastructure.id == str(infrastructure_id)).first() if not infrastructure: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Infrastructure with ID {infrastructure_id} not found" ) # Get total count for this infrastructure total = db.query(FirewallRule).filter(FirewallRule.infrastructure_id == str(infrastructure_id)).count() # Get paginated results rules = ( db.query(FirewallRule) .filter(FirewallRule.infrastructure_id == str(infrastructure_id)) .order_by(FirewallRule.rule_order.asc(), FirewallRule.created_at.desc()) .offset(skip) .limit(limit) .all() ) return rules, total def get_firewall_rules_by_action(db: Session, action: str, skip: int = 0, limit: int = 100) -> tuple[list[FirewallRule], int]: """ Retrieve firewall rules by action type (allow, deny, drop). Args: db: Database session action: Action type to filter by (allow, deny, drop) skip: Number of records to skip (for pagination) limit: Maximum number of records to return Returns: tuple: (list of firewall rules, total count for this action) Raises: HTTPException: 422 if invalid action provided Example: ```python rules, total = get_firewall_rules_by_action(db, "allow", skip=0, limit=50) print(f"Retrieved {len(rules)} of {total} allow rules") ``` """ # Validate action valid_actions = ["allow", "deny", "drop"] if action not in valid_actions: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid action '{action}'. Must be one of: {', '.join(valid_actions)}" ) # Get total count for this action total = db.query(FirewallRule).filter(FirewallRule.action == action).count() # Get paginated results rules = ( db.query(FirewallRule) .filter(FirewallRule.action == action) .order_by(FirewallRule.created_at.desc()) .offset(skip) .limit(limit) .all() ) return rules, total def create_firewall_rule(db: Session, firewall_rule_data: FirewallRuleCreate) -> FirewallRule: """ Create a new firewall rule. Args: db: Database session firewall_rule_data: Firewall rule creation data Returns: FirewallRule: The created firewall rule object Raises: HTTPException: 404 if infrastructure not found HTTPException: 422 if invalid action provided HTTPException: 500 if database error occurs Example: ```python rule_data = FirewallRuleCreate( infrastructure_id="123e4567-e89b-12d3-a456-426614174000", rule_name="Allow SSH", action="allow", port=22 ) rule = create_firewall_rule(db, rule_data) print(f"Created firewall rule: {rule.id}") ``` """ # Verify infrastructure exists if provided if firewall_rule_data.infrastructure_id: infrastructure = db.query(Infrastructure).filter( Infrastructure.id == str(firewall_rule_data.infrastructure_id) ).first() if not infrastructure: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Infrastructure with ID {firewall_rule_data.infrastructure_id} not found" ) # Validate action if provided if firewall_rule_data.action: valid_actions = ["allow", "deny", "drop"] if firewall_rule_data.action not in valid_actions: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid action '{firewall_rule_data.action}'. Must be one of: {', '.join(valid_actions)}" ) try: # Create new firewall rule instance db_rule = FirewallRule(**firewall_rule_data.model_dump()) # Add to database db.add(db_rule) db.commit() db.refresh(db_rule) return db_rule except IntegrityError as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Database error: {str(e)}" ) except Exception as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to create firewall rule: {str(e)}" ) def update_firewall_rule(db: Session, firewall_rule_id: UUID, firewall_rule_data: FirewallRuleUpdate) -> FirewallRule: """ Update an existing firewall rule. Args: db: Database session firewall_rule_id: UUID of the firewall rule to update firewall_rule_data: Firewall rule update data (only provided fields will be updated) Returns: FirewallRule: The updated firewall rule object Raises: HTTPException: 404 if firewall rule or infrastructure not found HTTPException: 422 if invalid action provided HTTPException: 500 if database error occurs Example: ```python update_data = FirewallRuleUpdate( rule_name="Allow SSH - Updated", action="deny" ) rule = update_firewall_rule(db, firewall_rule_id, update_data) print(f"Updated firewall rule: {rule.rule_name}") ``` """ # Get existing firewall rule rule = get_firewall_rule_by_id(db, firewall_rule_id) try: # Update only provided fields update_data = firewall_rule_data.model_dump(exclude_unset=True) # If updating infrastructure_id, verify new infrastructure exists if "infrastructure_id" in update_data and update_data["infrastructure_id"]: infrastructure = db.query(Infrastructure).filter( Infrastructure.id == str(update_data["infrastructure_id"]) ).first() if not infrastructure: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Infrastructure with ID {update_data['infrastructure_id']} not found" ) # Validate action if provided if "action" in update_data and update_data["action"]: valid_actions = ["allow", "deny", "drop"] if update_data["action"] not in valid_actions: raise HTTPException( status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=f"Invalid action '{update_data['action']}'. Must be one of: {', '.join(valid_actions)}" ) # Apply updates for field, value in update_data.items(): setattr(rule, field, value) db.commit() db.refresh(rule) return rule except HTTPException: db.rollback() raise except IntegrityError as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Database error: {str(e)}" ) except Exception as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to update firewall rule: {str(e)}" ) def delete_firewall_rule(db: Session, firewall_rule_id: UUID) -> dict: """ Delete a firewall rule by its ID. Args: db: Database session firewall_rule_id: UUID of the firewall rule to delete Returns: dict: Success message Raises: HTTPException: 404 if firewall rule not found HTTPException: 500 if database error occurs Example: ```python result = delete_firewall_rule(db, firewall_rule_id) print(result["message"]) # "Firewall rule deleted successfully" ``` """ # Get existing firewall rule (raises 404 if not found) rule = get_firewall_rule_by_id(db, firewall_rule_id) try: db.delete(rule) db.commit() return { "message": "Firewall rule deleted successfully", "firewall_rule_id": str(firewall_rule_id) } except Exception as e: db.rollback() raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to delete firewall rule: {str(e)}" )