""" Credentials API router for ClaudeTools. This module defines all REST API endpoints for managing credentials with encryption. """ from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from sqlalchemy.orm import Session from api.database import get_db from api.middleware.auth import get_current_user from api.schemas.credential import ( CredentialCreate, CredentialResponse, CredentialUpdate, ) from api.services import credential_service # Create router with prefix and tags router = APIRouter() def _get_user_context(request: Request, current_user: dict) -> dict: """Extract user context for audit logging.""" return { "user_id": current_user.get("sub", "unknown"), "ip_address": request.client.host if request.client else None, "user_agent": request.headers.get("user-agent"), } @router.get( "", response_model=dict, summary="List all credentials", description="Retrieve a paginated list of all credentials (decrypted for authenticated users)", status_code=status.HTTP_200_OK, ) def list_credentials( skip: int = Query( default=0, ge=0, description="Number of records to skip for pagination" ), limit: int = Query( default=100, ge=1, le=1000, description="Maximum number of records to return (max 1000)" ), db: Session = Depends(get_db), current_user: dict = Depends(get_current_user), ): """ List all credentials with pagination. - **skip**: Number of credentials to skip (default: 0) - **limit**: Maximum number of credentials to return (default: 100, max: 1000) Returns a list of credentials with pagination metadata. Sensitive fields are decrypted and returned to authenticated users. **Security Note**: This endpoint returns decrypted passwords and keys. Ensure proper authentication and authorization before calling. """ try: credentials, total = credential_service.get_credentials(db, skip, limit) # Convert to response models with decryption response_credentials = [] for cred in credentials: # Map encrypted fields to decrypted field names for the response schema cred_dict = { "id": cred.id, "client_id": cred.client_id, "service_id": cred.service_id, "infrastructure_id": cred.infrastructure_id, "credential_type": cred.credential_type, "service_name": cred.service_name, "username": cred.username, "password": cred.password_encrypted, # Will be decrypted by validator "api_key": cred.api_key_encrypted, # Will be decrypted by validator "client_id_oauth": cred.client_id_oauth, "client_secret": cred.client_secret_encrypted, # Will be decrypted by validator "tenant_id_oauth": cred.tenant_id_oauth, "public_key": cred.public_key, "token": cred.token_encrypted, # Will be decrypted by validator "connection_string": cred.connection_string_encrypted, # Will be decrypted by validator "integration_code": cred.integration_code, "external_url": cred.external_url, "internal_url": cred.internal_url, "custom_port": cred.custom_port, "role_description": cred.role_description, "requires_vpn": cred.requires_vpn, "requires_2fa": cred.requires_2fa, "ssh_key_auth_enabled": cred.ssh_key_auth_enabled, "access_level": cred.access_level, "expires_at": cred.expires_at, "last_rotated_at": cred.last_rotated_at, "is_active": cred.is_active, "created_at": cred.created_at, "updated_at": cred.updated_at, } response_credentials.append(CredentialResponse(**cred_dict)) return { "total": total, "skip": skip, "limit": limit, "credentials": response_credentials } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to retrieve credentials: {str(e)}" ) @router.get( "/{credential_id}", response_model=CredentialResponse, summary="Get credential by ID", description="Retrieve a single credential by its unique identifier (decrypted)", status_code=status.HTTP_200_OK, ) def get_credential( credential_id: UUID, request: Request, db: Session = Depends(get_db), current_user: dict = Depends(get_current_user), ): """ Get a specific credential by ID. - **credential_id**: UUID of the credential to retrieve Returns the complete credential details with decrypted sensitive fields. This action is logged in the audit trail. **Security Note**: This endpoint returns decrypted passwords and keys. """ user_ctx = _get_user_context(request, current_user) credential = credential_service.get_credential_by_id(db, credential_id, user_id=user_ctx["user_id"]) # Map encrypted fields to decrypted field names cred_dict = { "id": credential.id, "client_id": credential.client_id, "service_id": credential.service_id, "infrastructure_id": credential.infrastructure_id, "credential_type": credential.credential_type, "service_name": credential.service_name, "username": credential.username, "password": credential.password_encrypted, "api_key": credential.api_key_encrypted, "client_id_oauth": credential.client_id_oauth, "client_secret": credential.client_secret_encrypted, "tenant_id_oauth": credential.tenant_id_oauth, "public_key": credential.public_key, "token": credential.token_encrypted, "connection_string": credential.connection_string_encrypted, "integration_code": credential.integration_code, "external_url": credential.external_url, "internal_url": credential.internal_url, "custom_port": credential.custom_port, "role_description": credential.role_description, "requires_vpn": credential.requires_vpn, "requires_2fa": credential.requires_2fa, "ssh_key_auth_enabled": credential.ssh_key_auth_enabled, "access_level": credential.access_level, "expires_at": credential.expires_at, "last_rotated_at": credential.last_rotated_at, "is_active": credential.is_active, "created_at": credential.created_at, "updated_at": credential.updated_at, } return CredentialResponse(**cred_dict) @router.post( "", response_model=CredentialResponse, summary="Create new credential", description="Create a new credential with encryption of sensitive fields", status_code=status.HTTP_201_CREATED, ) def create_credential( credential_data: CredentialCreate, request: Request, db: Session = Depends(get_db), current_user: dict = Depends(get_current_user), ): """ Create a new credential. Sensitive fields (password, api_key, client_secret, token, connection_string) are automatically encrypted before storage. This action is logged in the audit trail. Requires a valid JWT token with appropriate permissions. **Security Note**: Plaintext credentials are never logged or stored unencrypted. """ user_ctx = _get_user_context(request, current_user) credential = credential_service.create_credential( db, credential_data, user_id=user_ctx["user_id"], ip_address=user_ctx["ip_address"], user_agent=user_ctx["user_agent"], ) # Map encrypted fields to decrypted field names cred_dict = { "id": credential.id, "client_id": credential.client_id, "service_id": credential.service_id, "infrastructure_id": credential.infrastructure_id, "credential_type": credential.credential_type, "service_name": credential.service_name, "username": credential.username, "password": credential.password_encrypted, "api_key": credential.api_key_encrypted, "client_id_oauth": credential.client_id_oauth, "client_secret": credential.client_secret_encrypted, "tenant_id_oauth": credential.tenant_id_oauth, "public_key": credential.public_key, "token": credential.token_encrypted, "connection_string": credential.connection_string_encrypted, "integration_code": credential.integration_code, "external_url": credential.external_url, "internal_url": credential.internal_url, "custom_port": credential.custom_port, "role_description": credential.role_description, "requires_vpn": credential.requires_vpn, "requires_2fa": credential.requires_2fa, "ssh_key_auth_enabled": credential.ssh_key_auth_enabled, "access_level": credential.access_level, "expires_at": credential.expires_at, "last_rotated_at": credential.last_rotated_at, "is_active": credential.is_active, "created_at": credential.created_at, "updated_at": credential.updated_at, } return CredentialResponse(**cred_dict) @router.put( "/{credential_id}", response_model=CredentialResponse, summary="Update credential", description="Update an existing credential's details with re-encryption if needed", status_code=status.HTTP_200_OK, ) def update_credential( credential_id: UUID, credential_data: CredentialUpdate, request: Request, db: Session = Depends(get_db), current_user: dict = Depends(get_current_user), ): """ Update an existing credential. - **credential_id**: UUID of the credential to update Only provided fields will be updated. All fields are optional. If sensitive fields are updated, they are re-encrypted. This action is logged. **Security Note**: Updated credentials are re-encrypted before storage. """ user_ctx = _get_user_context(request, current_user) credential = credential_service.update_credential( db, credential_id, credential_data, user_id=user_ctx["user_id"], ip_address=user_ctx["ip_address"], user_agent=user_ctx["user_agent"], ) # Map encrypted fields to decrypted field names cred_dict = { "id": credential.id, "client_id": credential.client_id, "service_id": credential.service_id, "infrastructure_id": credential.infrastructure_id, "credential_type": credential.credential_type, "service_name": credential.service_name, "username": credential.username, "password": credential.password_encrypted, "api_key": credential.api_key_encrypted, "client_id_oauth": credential.client_id_oauth, "client_secret": credential.client_secret_encrypted, "tenant_id_oauth": credential.tenant_id_oauth, "public_key": credential.public_key, "token": credential.token_encrypted, "connection_string": credential.connection_string_encrypted, "integration_code": credential.integration_code, "external_url": credential.external_url, "internal_url": credential.internal_url, "custom_port": credential.custom_port, "role_description": credential.role_description, "requires_vpn": credential.requires_vpn, "requires_2fa": credential.requires_2fa, "ssh_key_auth_enabled": credential.ssh_key_auth_enabled, "access_level": credential.access_level, "expires_at": credential.expires_at, "last_rotated_at": credential.last_rotated_at, "is_active": credential.is_active, "created_at": credential.created_at, "updated_at": credential.updated_at, } return CredentialResponse(**cred_dict) @router.delete( "/{credential_id}", response_model=dict, summary="Delete credential", description="Delete a credential by its ID", status_code=status.HTTP_200_OK, ) def delete_credential( credential_id: UUID, request: Request, db: Session = Depends(get_db), current_user: dict = Depends(get_current_user), ): """ Delete a credential. - **credential_id**: UUID of the credential to delete This is a permanent operation and cannot be undone. The deletion is logged in the audit trail. **Security Note**: Audit logs are retained after credential deletion. """ user_ctx = _get_user_context(request, current_user) return credential_service.delete_credential( db, credential_id, user_id=user_ctx["user_id"], ip_address=user_ctx["ip_address"], user_agent=user_ctx["user_agent"], ) @router.get( "/by-client/{client_id}", response_model=dict, summary="Get credentials by client", description="Retrieve all credentials for a specific client", status_code=status.HTTP_200_OK, ) def get_credentials_by_client( client_id: UUID, skip: int = Query(default=0, ge=0, description="Number of records to skip"), limit: int = Query(default=100, ge=1, le=1000, description="Maximum number of records to return"), db: Session = Depends(get_db), current_user: dict = Depends(get_current_user), ): """ Get all credentials associated with a specific client. - **client_id**: UUID of the client - **skip**: Number of credentials to skip (default: 0) - **limit**: Maximum number of credentials to return (default: 100, max: 1000) Returns credentials with decrypted sensitive fields. """ try: credentials, total = credential_service.get_credentials_by_client(db, client_id, skip, limit) # Convert to response models with decryption response_credentials = [] for cred in credentials: cred_dict = { "id": cred.id, "client_id": cred.client_id, "service_id": cred.service_id, "infrastructure_id": cred.infrastructure_id, "credential_type": cred.credential_type, "service_name": cred.service_name, "username": cred.username, "password": cred.password_encrypted, "api_key": cred.api_key_encrypted, "client_id_oauth": cred.client_id_oauth, "client_secret": cred.client_secret_encrypted, "tenant_id_oauth": cred.tenant_id_oauth, "public_key": cred.public_key, "token": cred.token_encrypted, "connection_string": cred.connection_string_encrypted, "integration_code": cred.integration_code, "external_url": cred.external_url, "internal_url": cred.internal_url, "custom_port": cred.custom_port, "role_description": cred.role_description, "requires_vpn": cred.requires_vpn, "requires_2fa": cred.requires_2fa, "ssh_key_auth_enabled": cred.ssh_key_auth_enabled, "access_level": cred.access_level, "expires_at": cred.expires_at, "last_rotated_at": cred.last_rotated_at, "is_active": cred.is_active, "created_at": cred.created_at, "updated_at": cred.updated_at, } response_credentials.append(CredentialResponse(**cred_dict)) return { "total": total, "skip": skip, "limit": limit, "client_id": str(client_id), "credentials": response_credentials } except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Failed to retrieve credentials for client: {str(e)}" )