"""
Credential scanner and importer for ClaudeTools context import system.

This module provides utilities to scan for credential files, parse structured
credential data from various formats, and import credentials into the database
with automatic encryption.

Security features:
- Automatic encryption using existing credential_service
- No plaintext credentials logged
- Audit trail for all imports
- Support for multiple credential file formats

Supported file formats:
- credentials.md (Markdown format with headers)
- .env (KEY=value format)
- passwords.txt (structured text format)
- Custom parsers for various formats
"""

import logging
import os
import re
from pathlib import Path
from typing import Dict, List, Optional

from sqlalchemy.orm import Session

from api.schemas.credential import CredentialCreate
from api.services.credential_service import create_credential

logger = logging.getLogger(__name__)

# Credential type detection patterns
API_KEY_PATTERNS = [
    r"^sk-[a-zA-Z0-9]{20,}",  # OpenAI-style
    r"^api_[a-zA-Z0-9]{20,}",  # API prefix
    r"^token[_-]?[a-zA-Z0-9]{20,}",  # Token prefix
    r"^ghp_[a-zA-Z0-9]{36,}",  # GitHub Personal Access Token
    r"^gho_[a-zA-Z0-9]{36,}",  # GitHub OAuth Token
    r"^xoxb-[a-zA-Z0-9-]+",  # Slack bot token
    r"^xoxp-[a-zA-Z0-9-]+",  # Slack user token
]

SSH_KEY_PATTERN = r"^-----BEGIN (RSA|OPENSSH|DSA|EC) PRIVATE KEY-----"

CONNECTION_STRING_PATTERNS = [
    r"^(mysql|postgresql|mongodb|redis|mssql)://",
    r"Server=.+;Database=.+;",
    r"Host=.+;Port=\d+;",
]

# Matches a private (RFC 1918) IPv4 prefix at the start of a host, optionally
# preceded by a URL scheme such as "http://" or "ssh://".
_INTERNAL_URL_PATTERN = re.compile(
    r'^(?:[a-zA-Z][a-zA-Z0-9+.\-]*://)?'
    r'(192\.168\.|10\.|172\.(1[6-9]|2[0-9]|3[01])\.)'
)

# Maps the lower-cased label of a "Label: value" line to the credential field
# it populates. "port" is handled separately because it needs int conversion.
_MARKDOWN_FIELD_ALIASES = {
    'username': 'username',
    'user': 'username',
    'login': 'username',
    'password': 'password',
    'pass': 'password',
    'pwd': 'password',
    'api key': 'api_key',
    'api_key': 'api_key',
    'apikey': 'api_key',
    'key': 'api_key',
    'token': 'token',
    'access token': 'token',
    'access_token': 'token',
    'bearer': 'token',
    'client secret': 'client_secret',
    'client_secret': 'client_secret',
    'secret': 'client_secret',
    'connection string': 'connection_string',
    'connection_string': 'connection_string',
    'conn_str': 'connection_string',
    'url': 'url',
    'host': 'url',
    'server': 'url',
    'address': 'url',
    'notes': 'notes',
    'note': 'notes',
    'description': 'notes',
    'desc': 'notes',
    'type': 'credential_type',
    'credential_type': 'credential_type',
    'kind': 'credential_type',
}


def scan_for_credential_files(base_path: str) -> List[str]:
    """
    Find all credential files in a directory tree.

    Searches for common credential file names including:
    - credentials.md, credentials.txt
    - passwords.txt, passwords.md
    - .env, .env.local, .env.production, .env.development, .env.staging
    - secrets.txt, secrets.md
    - auth.txt, auth.md

    Args:
        base_path: Root directory to search from (absolute or relative)

    Returns:
        List of absolute paths to credential files found. Empty if
        base_path does not exist or is not a directory.

    Example:
        ```python
        files = scan_for_credential_files("C:/Projects/MyApp")
        # Returns: ["C:/Projects/MyApp/credentials.md", "C:/Projects/MyApp/.env"]
        ```

    Security:
        - Does not read file contents during scan
        - Only returns file paths for manual review
        - Skips common exclusion patterns (node_modules, .git, etc.)
    """
    base_path_obj = Path(base_path)

    # Validate base path exists
    if not base_path_obj.exists():
        logger.warning(f"Base path does not exist: {base_path}")
        return []

    if not base_path_obj.is_dir():
        logger.warning(f"Base path is not a directory: {base_path}")
        return []

    # Exact file names to match (set for O(1) membership tests)
    file_patterns = {
        "credentials.md",
        "credentials.txt",
        "passwords.md",
        "passwords.txt",
        "secrets.md",
        "secrets.txt",
        "auth.md",
        "auth.txt",
        ".env",
        ".env.local",
        ".env.production",
        ".env.development",
        ".env.staging",
    }

    # Directories to exclude from search
    exclude_dirs = {
        ".git",
        ".svn",
        "node_modules",
        "venv",
        "__pycache__",
        ".venv",
        "dist",
        "build",
        ".pytest_cache",
        ".tox",
    }

    logger.info(f"Scanning for credential files in: {base_path}")

    # Walk from the absolute form of base_path so that every returned path is
    # absolute, as documented, even when the caller passes a relative path.
    search_root = os.path.abspath(base_path)

    credential_files = []
    for root, dirs, files in os.walk(search_root):
        # Prune excluded directories in place so os.walk does not descend
        # into them.
        dirs[:] = [d for d in dirs if d not in exclude_dirs]

        for filename in files:
            if filename in file_patterns:
                file_path = os.path.join(root, filename)
                credential_files.append(file_path)
                logger.info(f"Found credential file: {file_path}")

    logger.info(f"Scan complete. Found {len(credential_files)} credential file(s)")
    return credential_files


def parse_credential_file(file_path: str) -> List[Dict]:
    """
    Extract credentials from a file and return structured data.

    Supports multiple file formats:
    - Markdown (.md) - Parses headers and key-value pairs
    - Environment (.env) - Parses KEY=value format
    - Text (.txt) - Parses structured text with labels

    Files with any other extension are run through the Markdown parser.

    Args:
        file_path: Path to credential file

    Returns:
        List of credential dictionaries with keys:
        - service_name: Name of the service/system
        - credential_type: Type (password, api_key, oauth, etc.)
        - username: Username (if applicable)
        - password: Password value (if applicable)
        - api_key: API key value (if applicable)
        - token: Token value (if applicable)
        - connection_string: Connection string (if applicable)
        - notes: Additional notes/metadata

        Returns an empty list if the file is missing, is not a regular
        file, or cannot be read/parsed (the failure is logged).

    Example:
        ```python
        creds = parse_credential_file("C:/Projects/credentials.md")
        # Returns:
        # [
        #     {
        #         "service_name": "Gitea Admin",
        #         "credential_type": "password",
        #         "username": "admin",
        #         "password": "SecurePass123!"
        #     },
        #     ...
        # ]
        ```

    Security:
        - Returns plaintext credentials for encryption by import function
        - Never logs credential values
        - Validates file exists before reading
    """
    file_path_obj = Path(file_path)

    if not file_path_obj.exists():
        logger.error(f"Credential file not found: {file_path}")
        return []

    if not file_path_obj.is_file():
        logger.error(f"Path is not a file: {file_path}")
        return []

    logger.info(f"Parsing credential file: {file_path}")

    # Determine file type by extension
    file_ext = file_path_obj.suffix.lower()

    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        if file_ext == '.md':
            credentials = _parse_markdown_credentials(content)
        elif file_ext == '.env' or file_path_obj.name.startswith('.env'):
            # The name check catches ".env" itself and variants such as
            # ".env.local", whose suffix is not ".env".
            credentials = _parse_env_credentials(content)
        elif file_ext == '.txt':
            credentials = _parse_text_credentials(content)
        else:
            logger.warning(f"Unknown file type: {file_ext}, attempting markdown parser")
            credentials = _parse_markdown_credentials(content)

        logger.info(f"Parsed {len(credentials)} credential(s) from file")
        return credentials

    except Exception as e:
        # Best-effort boundary: one unreadable file must not abort a scan.
        logger.error(f"Failed to parse credential file: {str(e)}")
        return []


def _parse_markdown_credentials(content: str) -> List[Dict]:
    """
    Parse credentials from Markdown format.

    Each header of level two or deeper (``##``, ``###``, ...) starts a new
    credential; the "Label: value" lines that follow populate it. Lines
    starting with a single ``#`` (document titles or comments) are skipped,
    as are blank lines and key-value lines that appear before any header.

    Expected format:
        ```
        ## Service Name
        Username: user@example.com
        Password: secret123
        API Key: sk-1234567890
        Notes: Additional info

        ## Another Service
        ...
        ```

    Args:
        content: Full text of the credential file

    Returns:
        List of finalized credential dictionaries, one per header that has
        a non-empty service name.
    """
    credentials = []
    current_cred = None

    for raw_line in content.split('\n'):
        line = raw_line.strip()

        if not line:
            continue

        # Service header: close the credential in progress and start a new one
        if line.startswith('##'):
            if current_cred and current_cred.get('service_name'):
                credentials.append(_finalize_credential(current_cred))
            current_cred = {'service_name': line.lstrip('#').strip()}
            continue

        # A single '#' is a document title or comment, not a service header
        if line.startswith('#'):
            continue

        # Key-value pairs only count once a service header has been seen
        if ':' not in line or current_cred is None:
            continue

        key, value = line.split(':', 1)
        key = key.strip().lower()
        value = value.strip()

        if not value:
            continue

        if key == 'port':
            try:
                current_cred['custom_port'] = int(value)
            except ValueError:
                # Non-numeric port: ignore the line rather than fail the file
                pass
            continue

        field = _MARKDOWN_FIELD_ALIASES.get(key)
        if field is not None:
            current_cred[field] = value

    # Add last credential
    if current_cred and current_cred.get('service_name'):
        credentials.append(_finalize_credential(current_cred))

    return credentials


def _parse_env_credentials(content: str) -> List[Dict]:
    """
    Parse credentials from .env format.

    Every non-comment ``KEY=value`` line with a non-empty value becomes one
    credential. The service name is derived from the key (underscores become
    spaces, title-cased) and the credential type is guessed from the value.

    Expected format:
        ```
        DATABASE_URL=mysql://user:pass@host:3306/db
        API_KEY=sk-1234567890
        SECRET_TOKEN=abc123def456
        ```

    Args:
        content: Full text of the .env file

    Returns:
        List of credential dictionaries, in file order.
    """
    credentials = []

    for raw_line in content.split('\n'):
        line = raw_line.strip()

        # Skip comments and empty lines
        if not line or line.startswith('#'):
            continue

        # Parse KEY=value
        if '=' not in line:
            continue

        key, value = line.split('=', 1)
        key = key.strip()
        # Drop surrounding whitespace and quote characters
        value = value.strip().strip('"').strip("'")

        if not value:
            continue

        # Detect credential type from value and store it in the matching field
        cred_type, field = _detect_credential_type(value)
        credentials.append({
            'service_name': key.replace('_', ' ').title(),
            'credential_type': cred_type,
            field: value,
        })

    return credentials


def _parse_text_credentials(content: str) -> List[Dict]:
    """
    Parse credentials from structured text format.

    Text files currently share the Markdown parser, so they must use the
    same "## Header" / "Label: value" layout.

    Args:
        content: Full text of the credential file

    Returns:
        List of finalized credential dictionaries.
    """
    return _parse_markdown_credentials(content)


def _detect_credential_type(value: str) -> tuple[str, str]:
    """
    Detect the type of credential based on its value pattern.

    Checks are applied in order: SSH private key, known API key prefixes,
    connection strings, JWT-shaped values, OAuth-style prefixes. Anything
    else is treated as a password.

    Args:
        value: The raw credential value

    Returns:
        tuple: (credential_type, field_name) where field_name is the
        dictionary key the value should be stored under.
    """
    # Check for SSH key
    if re.match(SSH_KEY_PATTERN, value, re.MULTILINE):
        return ('ssh_key', 'password')  # Store in password field

    # Check for API key patterns
    if any(re.match(pattern, value) for pattern in API_KEY_PATTERNS):
        return ('api_key', 'api_key')

    # Check for connection strings
    if any(
        re.match(pattern, value, re.IGNORECASE)
        for pattern in CONNECTION_STRING_PATTERNS
    ):
        return ('connection_string', 'connection_string')

    # Check for JWT (basic heuristic: 3 segments separated by dots)
    if value.count('.') == 2 and len(value) > 50:
        if all(len(part) > 10 for part in value.split('.')):
            return ('jwt', 'token')

    # Check for OAuth token (starts with common prefixes)
    if value.startswith(('ya29.', 'ey', 'oauth')):
        return ('oauth', 'token')

    # Default to password
    return ('password', 'password')


def _finalize_credential(cred: Dict) -> Dict:
    """
    Finalize a credential dictionary by setting defaults and detecting types.

    If no credential_type was given, one is inferred from which secret field
    is present. A generic 'url' entry is replaced by 'internal_url' when it
    points at a private IPv4 range (192.168.x.x, 10.x.x.x, 172.16-31.x.x),
    with or without a leading URL scheme, and by 'external_url' otherwise.

    Args:
        cred: Credential dictionary being built; modified in place

    Returns:
        The same dictionary, for convenience.
    """
    # Auto-detect credential type if not specified; first matching field wins
    if 'credential_type' not in cred:
        if 'api_key' in cred:
            cred['credential_type'] = 'api_key'
        elif 'token' in cred:
            cred['credential_type'] = 'jwt'
        elif 'client_secret' in cred:
            cred['credential_type'] = 'oauth'
        elif 'connection_string' in cred:
            cred['credential_type'] = 'connection_string'
        else:
            cred['credential_type'] = 'password'

    # Extract URL fields if present
    if 'url' in cred:
        url = cred.pop('url')
        # The pattern tolerates a scheme prefix so that values such as
        # "http://192.168.1.5:3000" are still recognised as internal.
        if _INTERNAL_URL_PATTERN.match(url):
            cred['internal_url'] = url
        else:
            cred['external_url'] = url

    return cred


def import_credentials_to_db(
    db: Session,
    credentials: List[Dict],
    client_id: Optional[str] = None,
    user_id: str = "system_import",
    ip_address: Optional[str] = None,
) -> int:
    """
    Import credentials into the database using credential_service.

    This function takes a list of credential dictionaries and imports them
    into the database with automatic encryption. Each credential is passed
    through the credential_service which handles:
    - AES-256-GCM encryption of sensitive fields
    - Audit log creation
    - Proper database storage

    Args:
        db: SQLAlchemy database session
        credentials: List of credential dictionaries from parse_credential_file()
        client_id: Optional UUID string to associate credentials with a client
        user_id: User ID for audit logging (default: "system_import")
        ip_address: IP address for audit logging (optional)

    Returns:
        int: Count of successfully imported credentials

    Example:
        ```python
        from api.database import SessionLocal

        db = SessionLocal()
        try:
            files = scan_for_credential_files("C:/Projects")
            for file_path in files:
                creds = parse_credential_file(file_path)
                count = import_credentials_to_db(db, creds, client_id="uuid-here")
                print(f"Imported {count} credentials from {file_path}")
        finally:
            db.close()
        ```

    Security:
        - All sensitive fields automatically encrypted by credential_service
        - Audit log entry created for each import
        - Never logs plaintext credential values
        - Uses existing encryption infrastructure

    Error handling:
        Failures for an individual credential (schema validation or
        database errors) are logged and skipped; they are not raised, and
        the remaining credentials are still processed.
    """
    imported_count = 0

    logger.info(f"Starting import of {len(credentials)} credential(s)")

    for cred_data in credentials:
        try:
            # Work on a copy so the caller's dictionaries are left untouched
            payload = dict(cred_data)
            if client_id:
                payload['client_id'] = client_id

            # Create CredentialCreate schema object
            credential_create = CredentialCreate(**payload)

            # Import using credential_service (handles encryption and audit)
            created_credential = create_credential(
                db=db,
                credential_data=credential_create,
                user_id=user_id,
                ip_address=ip_address,
                user_agent="credential_scanner_import",
            )

            imported_count += 1
            logger.info(
                f"Imported credential: {created_credential.service_name} "
                f"(ID: {created_credential.id})"
            )

        except Exception as e:
            # Log only the exception class: validation and database error
            # messages can echo the rejected input, which here would be
            # plaintext secrets.
            # NOTE(review): if create_credential can leave the session in a
            # failed state, later iterations may fail too - confirm whether
            # a rollback is required here.
            logger.error(
                f"Failed to import credential '{cred_data.get('service_name', 'Unknown')}': "
                f"{type(e).__name__}"
            )
            # Continue with next credential instead of failing entire import
            continue

    logger.info(
        f"Import complete. Successfully imported {imported_count}/{len(credentials)} "
        "credential(s)"
    )

    return imported_count


# Convenience function for full workflow
def scan_and_import_credentials(
    base_path: str,
    db: Session,
    client_id: Optional[str] = None,
    user_id: str = "system_import",
    ip_address: Optional[str] = None,
) -> Dict[str, int]:
    """
    Scan for credential files and import all found credentials.

    This is a convenience function that combines scanning, parsing, and
    importing in a single operation.

    Args:
        base_path: Root directory to scan
        db: Database session
        client_id: Optional client UUID to associate credentials with
        user_id: User ID for audit logging
        ip_address: IP address for audit logging

    Returns:
        Dict with summary statistics:
        - files_found: Number of credential files found
        - credentials_parsed: Total credentials parsed from all files
        - credentials_imported: Number successfully imported to database

    Example:
        ```python
        from api.database import SessionLocal

        db = SessionLocal()
        try:
            results = scan_and_import_credentials(
                "C:/Projects/MyClient",
                db,
                client_id="client-uuid-here"
            )
            print(f"Found {results['files_found']} files")
            print(f"Imported {results['credentials_imported']} credentials")
        finally:
            db.close()
        ```
    """
    # Scan for files
    files = scan_for_credential_files(base_path)

    total_parsed = 0
    total_imported = 0

    # Parse and import from each file
    for file_path in files:
        credentials = parse_credential_file(file_path)
        total_parsed += len(credentials)

        # Skip the import call (and its log lines) for files with nothing parsed
        if credentials:
            total_imported += import_credentials_to_db(
                db=db,
                credentials=credentials,
                client_id=client_id,
                user_id=user_id,
                ip_address=ip_address,
            )

    return {
        'files_found': len(files),
        'credentials_parsed': total_parsed,
        'credentials_imported': total_imported,
    }