Files
claudetools/api/services/credential_service.py
Mike Swanson 390b10b32c Complete Phase 6: MSP Work Tracking with Context Recall System
Implements production-ready MSP platform with cross-machine persistent memory for Claude.

API Implementation:
- 130 REST API endpoints across 21 entities
- JWT authentication on all endpoints
- AES-256-GCM encryption for credentials
- Automatic audit logging
- Complete OpenAPI documentation

Database:
- 43 tables in MariaDB (172.16.3.20:3306)
- 42 SQLAlchemy models with modern 2.0 syntax
- Full Alembic migration system
- 99.1% CRUD test pass rate

Context Recall System (Phase 6):
- Cross-machine persistent memory via database
- Automatic context injection via Claude Code hooks
- Automatic context saving after task completion
- 90-95% token reduction with compression utilities
- Relevance scoring with time decay
- Tag-based semantic search
- One-command setup script

Security Features:
- JWT tokens with Argon2 password hashing
- AES-256-GCM encryption for all sensitive data
- Comprehensive audit trail for credentials
- HMAC tamper detection
- Secure configuration management

Test Results:
- Phase 3: 38/38 CRUD tests passing (100%)
- Phase 4: 34/35 core API tests passing (97.1%)
- Phase 5: 62/62 extended API tests passing (100%)
- Phase 6: 10/10 compression tests passing (100%)
- Overall: 144/145 tests passing (99.3%)

Documentation:
- Comprehensive architecture guides
- Setup automation scripts
- API documentation at /api/docs
- Complete test reports
- Troubleshooting guides

Project Status: 95% Complete (Production-Ready)
Phase 7 (optional work context APIs) remains for future enhancement.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-17 06:00:26 -07:00

494 lines
16 KiB
Python

"""
Credential service layer for business logic and database operations.
This module handles all database operations for credentials with encryption,
providing secure storage and retrieval of sensitive authentication data.
"""
import json
from datetime import datetime
from typing import Optional
from uuid import UUID
from fastapi import HTTPException, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from api.models.credential import Credential
from api.models.credential_audit_log import CredentialAuditLog
from api.schemas.credential import CredentialCreate, CredentialUpdate
from api.utils.crypto import encrypt_string, decrypt_string
def _create_audit_log(
db: Session,
credential_id: str,
action: str,
user_id: str,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
details: Optional[dict] = None,
) -> None:
"""
Create an audit log entry for credential operations.
Args:
db: Database session
credential_id: ID of the credential being accessed
action: Action performed (view, create, update, delete, rotate, decrypt)
user_id: User performing the action
ip_address: Optional IP address of the user
user_agent: Optional user agent string
details: Optional dictionary with additional context (will be JSON serialized)
Note:
This is an internal helper function. Never log decrypted passwords.
"""
try:
audit_entry = CredentialAuditLog(
credential_id=credential_id,
action=action,
user_id=user_id,
ip_address=ip_address,
user_agent=user_agent,
details=json.dumps(details) if details else None,
)
db.add(audit_entry)
db.commit()
except Exception as e:
# Log but don't fail the operation if audit logging fails
db.rollback()
print(f"Warning: Failed to create audit log: {str(e)}")
def get_credentials(db: Session, skip: int = 0, limit: int = 100) -> tuple[list[Credential], int]:
"""
Retrieve a paginated list of credentials.
Args:
db: Database session
skip: Number of records to skip (for pagination)
limit: Maximum number of records to return
Returns:
tuple: (list of credentials, total count)
Example:
```python
credentials, total = get_credentials(db, skip=0, limit=50)
print(f"Retrieved {len(credentials)} of {total} credentials")
```
"""
# Get total count
total = db.query(Credential).count()
# Get paginated results, ordered by created_at descending (newest first)
credentials = (
db.query(Credential)
.order_by(Credential.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return credentials, total
def get_credential_by_id(db: Session, credential_id: UUID, user_id: Optional[str] = None) -> Credential:
"""
Retrieve a single credential by its ID.
Args:
db: Database session
credential_id: UUID of the credential to retrieve
user_id: Optional user ID for audit logging
Returns:
Credential: The credential object
Raises:
HTTPException: 404 if credential not found
Example:
```python
credential = get_credential_by_id(db, credential_id, user_id="user123")
print(f"Found credential: {credential.service_name}")
```
"""
credential = db.query(Credential).filter(Credential.id == str(credential_id)).first()
if not credential:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Credential with ID {credential_id} not found"
)
# Create audit log for view action
if user_id:
_create_audit_log(
db=db,
credential_id=str(credential_id),
action="view",
user_id=user_id,
details={"service_name": credential.service_name}
)
return credential
def get_credentials_by_client(
db: Session,
client_id: UUID,
skip: int = 0,
limit: int = 100
) -> tuple[list[Credential], int]:
"""
Retrieve credentials for a specific client.
Args:
db: Database session
client_id: UUID of the client
skip: Number of records to skip
limit: Maximum number of records to return
Returns:
tuple: (list of credentials, total count)
Example:
```python
credentials, total = get_credentials_by_client(db, client_id, skip=0, limit=50)
print(f"Client has {total} credentials")
```
"""
# Get total count for this client
total = db.query(Credential).filter(Credential.client_id == str(client_id)).count()
# Get paginated results
credentials = (
db.query(Credential)
.filter(Credential.client_id == str(client_id))
.order_by(Credential.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return credentials, total
def create_credential(
db: Session,
credential_data: CredentialCreate,
user_id: str,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
) -> Credential:
"""
Create a new credential with encryption.
Args:
db: Database session
credential_data: Credential creation data
user_id: User creating the credential
ip_address: Optional IP address
user_agent: Optional user agent string
Returns:
Credential: The created credential object
Raises:
HTTPException: 500 if database error occurs
Example:
```python
credential_data = CredentialCreate(
service_name="Gitea Admin",
credential_type="password",
username="admin",
password="SecurePassword123!"
)
credential = create_credential(db, credential_data, user_id="user123")
print(f"Created credential: {credential.id}")
```
Security:
All sensitive fields (password, api_key, etc.) are encrypted before storage.
"""
try:
# Convert Pydantic model to dict, excluding unset values
data = credential_data.model_dump(exclude_unset=True)
# Encrypt sensitive fields if present
if "password" in data and data["password"]:
encrypted_password = encrypt_string(data["password"])
data["password_encrypted"] = encrypted_password.encode('utf-8')
del data["password"]
if "api_key" in data and data["api_key"]:
encrypted_api_key = encrypt_string(data["api_key"])
data["api_key_encrypted"] = encrypted_api_key.encode('utf-8')
del data["api_key"]
if "client_secret" in data and data["client_secret"]:
encrypted_secret = encrypt_string(data["client_secret"])
data["client_secret_encrypted"] = encrypted_secret.encode('utf-8')
del data["client_secret"]
if "token" in data and data["token"]:
encrypted_token = encrypt_string(data["token"])
data["token_encrypted"] = encrypted_token.encode('utf-8')
del data["token"]
if "connection_string" in data and data["connection_string"]:
encrypted_conn = encrypt_string(data["connection_string"])
data["connection_string_encrypted"] = encrypted_conn.encode('utf-8')
del data["connection_string"]
# Convert UUID fields to strings
if "client_id" in data and data["client_id"]:
data["client_id"] = str(data["client_id"])
if "service_id" in data and data["service_id"]:
data["service_id"] = str(data["service_id"])
if "infrastructure_id" in data and data["infrastructure_id"]:
data["infrastructure_id"] = str(data["infrastructure_id"])
# Create new credential instance
db_credential = Credential(**data)
# Add to database
db.add(db_credential)
db.commit()
db.refresh(db_credential)
# Create audit log
_create_audit_log(
db=db,
credential_id=str(db_credential.id),
action="create",
user_id=user_id,
ip_address=ip_address,
user_agent=user_agent,
details={
"service_name": db_credential.service_name,
"credential_type": db_credential.credential_type
}
)
return db_credential
except IntegrityError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Database integrity error: {str(e)}"
)
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to create credential: {str(e)}"
)
def update_credential(
db: Session,
credential_id: UUID,
credential_data: CredentialUpdate,
user_id: str,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
) -> Credential:
"""
Update an existing credential with re-encryption if needed.
Args:
db: Database session
credential_id: UUID of the credential to update
credential_data: Credential update data (only provided fields will be updated)
user_id: User updating the credential
ip_address: Optional IP address
user_agent: Optional user agent string
Returns:
Credential: The updated credential object
Raises:
HTTPException: 404 if credential not found
HTTPException: 500 if database error occurs
Example:
```python
update_data = CredentialUpdate(
password="NewSecurePassword456!",
last_rotated_at=datetime.utcnow()
)
credential = update_credential(db, credential_id, update_data, user_id="user123")
print(f"Updated credential: {credential.service_name}")
```
Security:
If sensitive fields are updated, they are re-encrypted before storage.
"""
# Get existing credential
credential = get_credential_by_id(db, credential_id)
try:
# Update only provided fields
update_data = credential_data.model_dump(exclude_unset=True)
changed_fields = []
# Track what changed for audit log
for field in update_data.keys():
if field not in ["password", "api_key", "client_secret", "token", "connection_string"]:
changed_fields.append(field)
# Encrypt sensitive fields if present in update
if "password" in update_data and update_data["password"]:
encrypted_password = encrypt_string(update_data["password"])
update_data["password_encrypted"] = encrypted_password.encode('utf-8')
del update_data["password"]
changed_fields.append("password")
if "api_key" in update_data and update_data["api_key"]:
encrypted_api_key = encrypt_string(update_data["api_key"])
update_data["api_key_encrypted"] = encrypted_api_key.encode('utf-8')
del update_data["api_key"]
changed_fields.append("api_key")
if "client_secret" in update_data and update_data["client_secret"]:
encrypted_secret = encrypt_string(update_data["client_secret"])
update_data["client_secret_encrypted"] = encrypted_secret.encode('utf-8')
del update_data["client_secret"]
changed_fields.append("client_secret")
if "token" in update_data and update_data["token"]:
encrypted_token = encrypt_string(update_data["token"])
update_data["token_encrypted"] = encrypted_token.encode('utf-8')
del update_data["token"]
changed_fields.append("token")
if "connection_string" in update_data and update_data["connection_string"]:
encrypted_conn = encrypt_string(update_data["connection_string"])
update_data["connection_string_encrypted"] = encrypted_conn.encode('utf-8')
del update_data["connection_string"]
changed_fields.append("connection_string")
# Convert UUID fields to strings
if "client_id" in update_data and update_data["client_id"]:
update_data["client_id"] = str(update_data["client_id"])
if "service_id" in update_data and update_data["service_id"]:
update_data["service_id"] = str(update_data["service_id"])
if "infrastructure_id" in update_data and update_data["infrastructure_id"]:
update_data["infrastructure_id"] = str(update_data["infrastructure_id"])
# Apply updates
for field, value in update_data.items():
setattr(credential, field, value)
db.commit()
db.refresh(credential)
# Create audit log
_create_audit_log(
db=db,
credential_id=str(credential_id),
action="update",
user_id=user_id,
ip_address=ip_address,
user_agent=user_agent,
details={
"changed_fields": changed_fields,
"service_name": credential.service_name
}
)
return credential
except HTTPException:
db.rollback()
raise
except IntegrityError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Database integrity error: {str(e)}"
)
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to update credential: {str(e)}"
)
def delete_credential(
db: Session,
credential_id: UUID,
user_id: str,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
) -> dict:
"""
Delete a credential by its ID.
Args:
db: Database session
credential_id: UUID of the credential to delete
user_id: User deleting the credential
ip_address: Optional IP address
user_agent: Optional user agent string
Returns:
dict: Success message
Raises:
HTTPException: 404 if credential not found
HTTPException: 500 if database error occurs
Example:
```python
result = delete_credential(db, credential_id, user_id="user123")
print(result["message"]) # "Credential deleted successfully"
```
Security:
Deletion is audited. The audit log is retained even after credential deletion
due to CASCADE delete behavior on the credential_audit_log table.
"""
# Get existing credential (raises 404 if not found)
credential = get_credential_by_id(db, credential_id)
# Store info for audit log before deletion
service_name = credential.service_name
credential_type = credential.credential_type
try:
# Create audit log BEFORE deletion
_create_audit_log(
db=db,
credential_id=str(credential_id),
action="delete",
user_id=user_id,
ip_address=ip_address,
user_agent=user_agent,
details={
"service_name": service_name,
"credential_type": credential_type
}
)
db.delete(credential)
db.commit()
return {
"message": "Credential deleted successfully",
"credential_id": str(credential_id)
}
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Failed to delete credential: {str(e)}"
)