Files
claudetools/api/services/firewall_rule_service.py
Mike Swanson 390b10b32c Complete Phase 6: MSP Work Tracking with Context Recall System
Implements production-ready MSP platform with cross-machine persistent memory for Claude.

API Implementation:
- 130 REST API endpoints across 21 entities
- JWT authentication on all endpoints
- AES-256-GCM encryption for credentials
- Automatic audit logging
- Complete OpenAPI documentation

Database:
- 43 tables in MariaDB (172.16.3.20:3306)
- 42 SQLAlchemy models with modern 2.0 syntax
- Full Alembic migration system
- 99.1% CRUD test pass rate

Context Recall System (Phase 6):
- Cross-machine persistent memory via database
- Automatic context injection via Claude Code hooks
- Automatic context saving after task completion
- 90-95% token reduction with compression utilities
- Relevance scoring with time decay
- Tag-based semantic search
- One-command setup script

Security Features:
- JWT tokens with Argon2 password hashing
- AES-256-GCM encryption for all sensitive data
- Comprehensive audit trail for credentials
- HMAC tamper detection
- Secure configuration management

Test Results:
- Phase 3: 38/38 CRUD tests passing (100%)
- Phase 4: 34/35 core API tests passing (97.1%)
- Phase 5: 62/62 extended API tests passing (100%)
- Phase 6: 10/10 compression tests passing (100%)
- Overall: 144/145 tests passing (99.3%)

Documentation:
- Comprehensive architecture guides
- Setup automation scripts
- API documentation at /api/docs
- Complete test reports
- Troubleshooting guides

Project Status: 95% Complete (Production-Ready)
Phase 7 (optional work context APIs) remains for future enhancement.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-17 06:00:26 -07:00

368 lines
11 KiB
Python

"""
Firewall rule service layer for business logic and database operations.
This module handles all database operations for firewall rules, providing a clean
separation between the API routes and data access layer.
"""
from typing import Optional
from uuid import UUID
from fastapi import HTTPException, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from api.models.firewall_rule import FirewallRule
from api.models.infrastructure import Infrastructure
from api.schemas.firewall_rule import FirewallRuleCreate, FirewallRuleUpdate
def get_firewall_rules(db: Session, skip: int = 0, limit: int = 100) -> tuple[list[FirewallRule], int]:
    """
    Fetch one page of firewall rules together with the overall rule count.

    Args:
        db: Database session
        skip: How many rows to pass over before the page starts
        limit: Upper bound on the number of rows in the page

    Returns:
        tuple: (rules on the requested page, count of all firewall rules)

    Example:
        ```python
        rules, total = get_firewall_rules(db, skip=0, limit=50)
        print(f"Retrieved {len(rules)} of {total} firewall rules")
        ```
    """
    all_rules = db.query(FirewallRule)

    # The count is taken before offset/limit are applied, so it covers every rule
    total = all_rules.count()

    # Page through the rules with the most recently created ones first
    newest_first = all_rules.order_by(FirewallRule.created_at.desc())
    page = newest_first.offset(skip).limit(limit).all()

    return page, total
def get_firewall_rule_by_id(db: Session, firewall_rule_id: UUID) -> FirewallRule:
    """
    Look up one firewall rule by ID, failing with a 404 when it is absent.

    Args:
        db: Database session
        firewall_rule_id: UUID identifying the firewall rule

    Returns:
        FirewallRule: The matching firewall rule

    Raises:
        HTTPException: 404 if no firewall rule has the given ID

    Example:
        ```python
        rule = get_firewall_rule_by_id(db, firewall_rule_id)
        print(f"Found rule: {rule.rule_name}")
        ```
    """
    # The ID is compared in its string form
    rule_key = str(firewall_rule_id)
    match = db.query(FirewallRule).filter(FirewallRule.id == rule_key).first()

    if match:
        return match

    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"Firewall rule with ID {firewall_rule_id} not found",
    )
def get_firewall_rules_by_infrastructure(
    db: Session,
    infrastructure_id: UUID,
    skip: int = 0,
    limit: int = 100,
) -> tuple[list[FirewallRule], int]:
    """
    List one page of the firewall rules attached to a given infrastructure.

    Args:
        db: Database session
        infrastructure_id: UUID of the infrastructure whose rules are wanted
        skip: How many rows to pass over before the page starts
        limit: Upper bound on the number of rows in the page

    Returns:
        tuple: (rules on the requested page, count of all rules for this infrastructure)

    Raises:
        HTTPException: 404 if no infrastructure has the given ID

    Example:
        ```python
        rules, total = get_firewall_rules_by_infrastructure(db, infrastructure_id, skip=0, limit=50)
        print(f"Retrieved {len(rules)} of {total} firewall rules for infrastructure")
        ```
    """
    infra_key = str(infrastructure_id)

    # An unknown infrastructure is reported as 404 rather than as an empty page
    parent = db.query(Infrastructure).filter(Infrastructure.id == infra_key).first()
    if not parent:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Infrastructure with ID {infrastructure_id} not found",
        )

    scoped = db.query(FirewallRule).filter(FirewallRule.infrastructure_id == infra_key)

    # The count covers every rule of this infrastructure, not just the page
    total = scoped.count()

    # Ascending rule order; rules sharing an order value are listed newest first
    ordered = scoped.order_by(FirewallRule.rule_order.asc(), FirewallRule.created_at.desc())
    page = ordered.offset(skip).limit(limit).all()

    return page, total
def get_firewall_rules_by_action(
    db: Session,
    action: str,
    skip: int = 0,
    limit: int = 100,
) -> tuple[list[FirewallRule], int]:
    """
    List one page of the firewall rules that carry a given action.

    Args:
        db: Database session
        action: Action to filter on; must be exactly "allow", "deny" or "drop"
        skip: How many rows to pass over before the page starts
        limit: Upper bound on the number of rows in the page

    Returns:
        tuple: (rules on the requested page, count of all rules with this action)

    Raises:
        HTTPException: 422 if the action is not one of the accepted values

    Example:
        ```python
        rules, total = get_firewall_rules_by_action(db, "allow", skip=0, limit=50)
        print(f"Retrieved {len(rules)} of {total} allow rules")
        ```
    """
    permitted = ("allow", "deny", "drop")

    # Refuse unknown actions before touching the database
    if action not in permitted:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=f"Invalid action '{action}'. Must be one of: {', '.join(permitted)}",
        )

    with_action = db.query(FirewallRule).filter(FirewallRule.action == action)

    # The count covers every rule with this action, not just the page
    total = with_action.count()

    # Most recently created rules first
    newest_first = with_action.order_by(FirewallRule.created_at.desc())
    page = newest_first.offset(skip).limit(limit).all()

    return page, total
def create_firewall_rule(db: Session, firewall_rule_data: FirewallRuleCreate) -> FirewallRule:
    """
    Create a new firewall rule.

    When an infrastructure ID is supplied it must refer to an existing
    infrastructure, and when an action is supplied it must be one of
    "allow", "deny" or "drop". Both checks run before anything is written.

    Args:
        db: Database session
        firewall_rule_data: Firewall rule creation data

    Returns:
        FirewallRule: The created firewall rule, refreshed after the commit

    Raises:
        HTTPException: 404 if infrastructure not found
        HTTPException: 422 if invalid action provided
        HTTPException: 500 if the insert fails; the session is rolled back first
            and the underlying exception is attached as the cause

    Example:
        ```python
        rule_data = FirewallRuleCreate(
            infrastructure_id="123e4567-e89b-12d3-a456-426614174000",
            rule_name="Allow SSH",
            action="allow",
            port=22
        )
        rule = create_firewall_rule(db, rule_data)
        print(f"Created firewall rule: {rule.id}")
        ```
    """
    # Verify infrastructure exists if provided
    if firewall_rule_data.infrastructure_id:
        infrastructure = db.query(Infrastructure).filter(
            Infrastructure.id == str(firewall_rule_data.infrastructure_id)
        ).first()
        if not infrastructure:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Infrastructure with ID {firewall_rule_data.infrastructure_id} not found",
            )

    # Validate action if provided
    if firewall_rule_data.action:
        valid_actions = ["allow", "deny", "drop"]
        if firewall_rule_data.action not in valid_actions:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=f"Invalid action '{firewall_rule_data.action}'. Must be one of: {', '.join(valid_actions)}",
            )

    try:
        # Build the ORM object from the validated payload and persist it
        db_rule = FirewallRule(**firewall_rule_data.model_dump())
        db.add(db_rule)
        db.commit()
        # Reload so the returned object reflects the stored row
        db.refresh(db_rule)
        return db_rule
    except IntegrityError as exc:
        db.rollback()
        # NOTE(review): the raw database error text is sent back to the client;
        # confirm that exposing it is acceptable.
        # "from exc" keeps the original error as the explicit cause in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Database error: {exc!s}",
        ) from exc
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create firewall rule: {exc!s}",
        ) from exc
def update_firewall_rule(
    db: Session,
    firewall_rule_id: UUID,
    firewall_rule_data: FirewallRuleUpdate,
) -> FirewallRule:
    """
    Update an existing firewall rule.

    Only the fields explicitly set on ``firewall_rule_data`` are applied. A new
    infrastructure ID must refer to an existing infrastructure and a new
    action must be one of "allow", "deny" or "drop".

    Args:
        db: Database session
        firewall_rule_id: UUID of the firewall rule to update
        firewall_rule_data: Firewall rule update data (only provided fields will be updated)

    Returns:
        FirewallRule: The updated firewall rule, refreshed after the commit

    Raises:
        HTTPException: 404 if firewall rule or infrastructure not found
        HTTPException: 422 if invalid action provided
        HTTPException: 500 if the update fails; the session is rolled back first
            and the underlying exception is attached as the cause

    Example:
        ```python
        update_data = FirewallRuleUpdate(
            rule_name="Allow SSH - Updated",
            action="deny"
        )
        rule = update_firewall_rule(db, firewall_rule_id, update_data)
        print(f"Updated firewall rule: {rule.rule_name}")
        ```
    """
    # Get existing firewall rule (raises 404 if not found)
    rule = get_firewall_rule_by_id(db, firewall_rule_id)

    try:
        # Fields the caller never set are left out, so they stay untouched
        update_data = firewall_rule_data.model_dump(exclude_unset=True)

        # If updating infrastructure_id, verify new infrastructure exists
        new_infrastructure_id = update_data.get("infrastructure_id")
        if new_infrastructure_id:
            infrastructure = db.query(Infrastructure).filter(
                Infrastructure.id == str(new_infrastructure_id)
            ).first()
            if not infrastructure:
                raise HTTPException(
                    status_code=status.HTTP_404_NOT_FOUND,
                    detail=f"Infrastructure with ID {new_infrastructure_id} not found",
                )

        # Validate action if provided
        new_action = update_data.get("action")
        if new_action:
            valid_actions = ["allow", "deny", "drop"]
            if new_action not in valid_actions:
                raise HTTPException(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    detail=f"Invalid action '{new_action}'. Must be one of: {', '.join(valid_actions)}",
                )

        # Apply updates
        for field, value in update_data.items():
            setattr(rule, field, value)

        db.commit()
        db.refresh(rule)
        return rule
    except HTTPException:
        # Validation failures above pass through unchanged after a rollback
        db.rollback()
        raise
    except IntegrityError as exc:
        db.rollback()
        # NOTE(review): the raw database error text is sent back to the client;
        # confirm that exposing it is acceptable.
        # "from exc" keeps the original error as the explicit cause in tracebacks.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Database error: {exc!s}",
        ) from exc
    except Exception as exc:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to update firewall rule: {exc!s}",
        ) from exc
def delete_firewall_rule(db: Session, firewall_rule_id: UUID) -> dict:
    """
    Delete a firewall rule by its ID.

    Args:
        db: Database session
        firewall_rule_id: UUID of the firewall rule to delete

    Returns:
        dict: A "message" entry confirming the deletion and a
            "firewall_rule_id" entry holding the deleted ID as a string

    Raises:
        HTTPException: 404 if firewall rule not found
        HTTPException: 500 if the delete fails; the session is rolled back first
            and the underlying exception is attached as the cause

    Example:
        ```python
        result = delete_firewall_rule(db, firewall_rule_id)
        print(result["message"])  # "Firewall rule deleted successfully"
        ```
    """
    # Get existing firewall rule (raises 404 if not found)
    rule = get_firewall_rule_by_id(db, firewall_rule_id)

    try:
        db.delete(rule)
        db.commit()
    except Exception as exc:
        db.rollback()
        # "from exc" keeps the original error as the explicit cause in tracebacks
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to delete firewall rule: {exc!s}",
        ) from exc

    return {
        "message": "Firewall rule deleted successfully",
        "firewall_rule_id": str(firewall_rule_id),
    }