""" Pydantic schemas for Credential model. Request and response schemas for secure credential storage. """ from datetime import datetime from typing import Optional from uuid import UUID from pydantic import BaseModel, Field, field_validator from api.utils.crypto import decrypt_string class CredentialBase(BaseModel): """Base schema with shared Credential fields.""" client_id: Optional[UUID] = Field(None, description="Reference to client") service_id: Optional[UUID] = Field(None, description="Reference to service") infrastructure_id: Optional[UUID] = Field(None, description="Reference to infrastructure component") credential_type: str = Field(..., description="Type of credential: password, api_key, oauth, ssh_key, shared_secret, jwt, connection_string, certificate") service_name: str = Field(..., description="Display name for the service (e.g., 'Gitea Admin')") username: Optional[str] = Field(None, description="Username for authentication") client_id_oauth: Optional[str] = Field(None, description="OAuth client ID") tenant_id_oauth: Optional[str] = Field(None, description="OAuth tenant ID") public_key: Optional[str] = Field(None, description="SSH public key (not encrypted)") integration_code: Optional[str] = Field(None, description="Integration code for services like Autotask") external_url: Optional[str] = Field(None, description="External URL for the service") internal_url: Optional[str] = Field(None, description="Internal URL for the service") custom_port: Optional[int] = Field(None, description="Custom port number if applicable") role_description: Optional[str] = Field(None, description="Description of access level/role") requires_vpn: bool = Field(False, description="Whether VPN is required for access") requires_2fa: bool = Field(False, description="Whether 2FA is required") ssh_key_auth_enabled: bool = Field(False, description="Whether SSH key authentication is enabled") access_level: Optional[str] = Field(None, description="Description of access level") expires_at: Optional[datetime] = Field(None, description="When the credential expires") last_rotated_at: Optional[datetime] = Field(None, description="When the credential was last rotated") is_active: bool = Field(True, description="Whether the credential is currently active") class CredentialCreate(CredentialBase): """Schema for creating a new Credential.""" password: Optional[str] = Field(None, description="Plain text password (will be encrypted before storage)") api_key: Optional[str] = Field(None, description="Plain text API key (will be encrypted before storage)") client_secret: Optional[str] = Field(None, description="Plain text OAuth client secret (will be encrypted before storage)") token: Optional[str] = Field(None, description="Plain text bearer/access token (will be encrypted before storage)") connection_string: Optional[str] = Field(None, description="Plain text connection string (will be encrypted before storage)") class CredentialUpdate(BaseModel): """Schema for updating an existing Credential. All fields are optional.""" client_id: Optional[UUID] = Field(None, description="Reference to client") service_id: Optional[UUID] = Field(None, description="Reference to service") infrastructure_id: Optional[UUID] = Field(None, description="Reference to infrastructure component") credential_type: Optional[str] = Field(None, description="Type of credential") service_name: Optional[str] = Field(None, description="Display name for the service") username: Optional[str] = Field(None, description="Username for authentication") password: Optional[str] = Field(None, description="Plain text password (will be encrypted before storage)") api_key: Optional[str] = Field(None, description="Plain text API key (will be encrypted before storage)") client_id_oauth: Optional[str] = Field(None, description="OAuth client ID") client_secret: Optional[str] = Field(None, description="Plain text OAuth client secret (will be encrypted before storage)") tenant_id_oauth: Optional[str] = Field(None, description="OAuth tenant ID") public_key: Optional[str] = Field(None, description="SSH public key") token: Optional[str] = Field(None, description="Plain text bearer/access token (will be encrypted before storage)") connection_string: Optional[str] = Field(None, description="Plain text connection string (will be encrypted before storage)") integration_code: Optional[str] = Field(None, description="Integration code") external_url: Optional[str] = Field(None, description="External URL for the service") internal_url: Optional[str] = Field(None, description="Internal URL for the service") custom_port: Optional[int] = Field(None, description="Custom port number") role_description: Optional[str] = Field(None, description="Description of access level/role") requires_vpn: Optional[bool] = Field(None, description="Whether VPN is required") requires_2fa: Optional[bool] = Field(None, description="Whether 2FA is required") ssh_key_auth_enabled: Optional[bool] = Field(None, description="Whether SSH key authentication is enabled") access_level: Optional[str] = Field(None, description="Description of access level") expires_at: Optional[datetime] = Field(None, description="When the credential expires") last_rotated_at: Optional[datetime] = Field(None, description="When the credential was last rotated") is_active: Optional[bool] = Field(None, description="Whether the credential is active") class CredentialResponse(BaseModel): """Schema for Credential responses with ID and timestamps. Includes decrypted values.""" id: UUID = Field(..., description="Unique identifier for the credential") client_id: Optional[UUID] = Field(None, description="Reference to client") service_id: Optional[UUID] = Field(None, description="Reference to service") infrastructure_id: Optional[UUID] = Field(None, description="Reference to infrastructure component") credential_type: str = Field(..., description="Type of credential") service_name: str = Field(..., description="Display name for the service") username: Optional[str] = Field(None, description="Username for authentication") # Decrypted sensitive fields (computed from encrypted database fields) password: Optional[str] = Field(None, description="Decrypted password") api_key: Optional[str] = Field(None, description="Decrypted API key") client_secret: Optional[str] = Field(None, description="Decrypted OAuth client secret") token: Optional[str] = Field(None, description="Decrypted bearer/access token") connection_string: Optional[str] = Field(None, description="Decrypted connection string") # OAuth and other non-encrypted fields client_id_oauth: Optional[str] = Field(None, description="OAuth client ID") tenant_id_oauth: Optional[str] = Field(None, description="OAuth tenant ID") public_key: Optional[str] = Field(None, description="SSH public key") integration_code: Optional[str] = Field(None, description="Integration code") external_url: Optional[str] = Field(None, description="External URL for the service") internal_url: Optional[str] = Field(None, description="Internal URL for the service") custom_port: Optional[int] = Field(None, description="Custom port number") role_description: Optional[str] = Field(None, description="Description of access level/role") requires_vpn: bool = Field(..., description="Whether VPN is required") requires_2fa: bool = Field(..., description="Whether 2FA is required") ssh_key_auth_enabled: bool = Field(..., description="Whether SSH key authentication is enabled") access_level: Optional[str] = Field(None, description="Description of access level") expires_at: Optional[datetime] = Field(None, description="When the credential expires") last_rotated_at: Optional[datetime] = Field(None, description="When the credential was last rotated") is_active: bool = Field(..., description="Whether the credential is active") created_at: datetime = Field(..., description="Timestamp when the credential was created") updated_at: datetime = Field(..., description="Timestamp when the credential was last updated") model_config = {"from_attributes": True} @field_validator("password", mode="before") @classmethod def decrypt_password(cls, v): """Decrypt password_encrypted field from database.""" if v is None: return None if isinstance(v, bytes): # This is the encrypted bytes from password_encrypted field encrypted_str = v.decode('utf-8') return decrypt_string(encrypted_str, default=None) return v @field_validator("api_key", mode="before") @classmethod def decrypt_api_key(cls, v): """Decrypt api_key_encrypted field from database.""" if v is None: return None if isinstance(v, bytes): encrypted_str = v.decode('utf-8') return decrypt_string(encrypted_str, default=None) return v @field_validator("client_secret", mode="before") @classmethod def decrypt_client_secret(cls, v): """Decrypt client_secret_encrypted field from database.""" if v is None: return None if isinstance(v, bytes): encrypted_str = v.decode('utf-8') return decrypt_string(encrypted_str, default=None) return v @field_validator("token", mode="before") @classmethod def decrypt_token(cls, v): """Decrypt token_encrypted field from database.""" if v is None: return None if isinstance(v, bytes): encrypted_str = v.decode('utf-8') return decrypt_string(encrypted_str, default=None) return v @field_validator("connection_string", mode="before") @classmethod def decrypt_connection_string(cls, v): """Decrypt connection_string_encrypted field from database.""" if v is None: return None if isinstance(v, bytes): encrypted_str = v.decode('utf-8') return decrypt_string(encrypted_str, default=None) return v