Files
claudetools/api/utils/credential_scanner.py
Mike Swanson 390b10b32c Complete Phase 6: MSP Work Tracking with Context Recall System
Implements production-ready MSP platform with cross-machine persistent memory for Claude.

API Implementation:
- 130 REST API endpoints across 21 entities
- JWT authentication on all endpoints
- AES-256-GCM encryption for credentials
- Automatic audit logging
- Complete OpenAPI documentation

Database:
- 43 tables in MariaDB (172.16.3.20:3306)
- 42 SQLAlchemy models with modern 2.0 syntax
- Full Alembic migration system
- 99.1% CRUD test pass rate

Context Recall System (Phase 6):
- Cross-machine persistent memory via database
- Automatic context injection via Claude Code hooks
- Automatic context saving after task completion
- 90-95% token reduction with compression utilities
- Relevance scoring with time decay
- Tag-based semantic search
- One-command setup script

Security Features:
- JWT tokens with Argon2 password hashing
- AES-256-GCM encryption for all sensitive data
- Comprehensive audit trail for credentials
- HMAC tamper detection
- Secure configuration management

Test Results:
- Phase 3: 38/38 CRUD tests passing (100%)
- Phase 4: 34/35 core API tests passing (97.1%)
- Phase 5: 62/62 extended API tests passing (100%)
- Phase 6: 10/10 compression tests passing (100%)
- Overall: 144/145 tests passing (99.3%)

Documentation:
- Comprehensive architecture guides
- Setup automation scripts
- API documentation at /api/docs
- Complete test reports
- Troubleshooting guides

Project Status: 95% Complete (Production-Ready)
Phase 7 (optional work context APIs) remains for future enhancement.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-17 06:00:26 -07:00

598 lines
18 KiB
Python

"""
Credential scanner and importer for ClaudeTools context import system.
This module provides utilities to scan for credential files, parse structured
credential data from various formats, and import credentials into the database
with automatic encryption.
Security features:
- Automatic encryption using existing credential_service
- No plaintext credentials logged
- Audit trail for all imports
- Support for multiple credential file formats
Supported file formats:
- credentials.md (Markdown format with headers)
- .env (KEY=value format)
- passwords.txt (structured text format)
- Custom parsers for various formats
"""
import logging
import os
import re
from pathlib import Path
from typing import Dict, List, Optional
from sqlalchemy.orm import Session
from api.schemas.credential import CredentialCreate
from api.services.credential_service import create_credential
logger = logging.getLogger(__name__)
# Credential type detection patterns
# These regexes are consumed by _detect_credential_type(), which applies each one
# with re.match(), i.e. only at the beginning of the candidate value.

# Known API-key / token prefixes. Matched case-sensitively.
API_KEY_PATTERNS = [
r"^sk-[a-zA-Z0-9]{20,}", # OpenAI-style
r"^api_[a-zA-Z0-9]{20,}", # API prefix
r"^token[_-]?[a-zA-Z0-9]{20,}", # Token prefix
r"^ghp_[a-zA-Z0-9]{36,}", # GitHub Personal Access Token
r"^gho_[a-zA-Z0-9]{36,}", # GitHub OAuth Token
r"^xoxb-[a-zA-Z0-9-]+", # Slack bot token
r"^xoxp-[a-zA-Z0-9-]+", # Slack user token
]
# PEM header of a labelled private key (RSA, OPENSSH, DSA or EC).
# NOTE(review): an unlabelled "-----BEGIN PRIVATE KEY-----" header does not match.
SSH_KEY_PATTERN = r"^-----BEGIN (RSA|OPENSSH|DSA|EC) PRIVATE KEY-----"
# Connection strings: URL-style (scheme://...) and "Key=value;" style.
# The last two patterns carry no explicit "^" but are still anchored at the start
# of the value because they are applied with re.match().
CONNECTION_STRING_PATTERNS = [
r"^(mysql|postgresql|mongodb|redis|mssql)://",
r"Server=.+;Database=.+;",
r"Host=.+;Port=\d+;",
]
def scan_for_credential_files(base_path: str) -> List[str]:
    """
    Find all credential files in a directory tree.

    Only exact file-name matches are reported. The recognised names are:
    - credentials.md, credentials.txt
    - passwords.md, passwords.txt
    - secrets.md, secrets.txt
    - auth.md, auth.txt
    - .env, .env.local, .env.production, .env.development, .env.staging

    Args:
        base_path: Root directory to search from (absolute or relative).

    Returns:
        List of absolute paths to the credential files found, in a
        deterministic (name-sorted) traversal order. An empty list is
        returned when base_path does not exist or is not a directory.

    Example:
        ```python
        files = scan_for_credential_files("C:/Projects/MyApp")
        # Returns: ["C:/Projects/MyApp/.env", "C:/Projects/MyApp/credentials.md"]
        ```

    Security:
        - Does not read file contents during scan
        - Only returns file paths for manual review
        - Skips common exclusion patterns (node_modules, .git, etc.)
    """
    base_path_obj = Path(base_path)

    # Validate the starting point before walking anything.
    if not base_path_obj.exists():
        logger.warning(f"Base path does not exist: {base_path}")
        return []
    if not base_path_obj.is_dir():
        logger.warning(f"Base path is not a directory: {base_path}")
        return []

    # Exact file names to report (set membership is O(1) per file).
    file_patterns = frozenset({
        "credentials.md",
        "credentials.txt",
        "passwords.md",
        "passwords.txt",
        "secrets.md",
        "secrets.txt",
        "auth.md",
        "auth.txt",
        ".env",
        ".env.local",
        ".env.production",
        ".env.development",
        ".env.staging",
    })

    # Directory names that are never descended into.
    exclude_dirs = frozenset({
        ".git",
        ".svn",
        "node_modules",
        "venv",
        "__pycache__",
        ".venv",
        "dist",
        "build",
        ".pytest_cache",
        ".tox",
    })

    logger.info(f"Scanning for credential files in: {base_path}")

    credential_files: List[str] = []
    for root, dirs, files in os.walk(base_path):
        # Prune in place so os.walk skips excluded directories entirely;
        # sorting makes the traversal order independent of the filesystem.
        dirs[:] = sorted(d for d in dirs if d not in exclude_dirs)

        for filename in sorted(files):
            if filename not in file_patterns:
                continue
            # abspath honours the documented "absolute paths" contract even
            # when the caller passed a relative base_path.
            file_path = os.path.abspath(os.path.join(root, filename))
            credential_files.append(file_path)
            logger.info(f"Found credential file: {file_path}")

    logger.info(f"Scan complete. Found {len(credential_files)} credential file(s)")
    return credential_files
def parse_credential_file(file_path: str) -> List[Dict]:
    """
    Extract credentials from a file and return structured data.

    The parser is chosen from the file name:
    - ``.md`` suffix            -> Markdown parser (headers + key-value pairs)
    - ``.env`` suffix, or a name beginning with ``.env`` -> KEY=value parser
    - ``.txt`` suffix           -> structured-text parser
    - anything else             -> Markdown parser (with a warning)

    Args:
        file_path: Path to the credential file.

    Returns:
        List of credential dictionaries. Depending on the parser, entries may
        contain keys such as:
        - service_name: Name of the service/system
        - credential_type: Type (password, api_key, oauth, etc.)
        - username / password / api_key / token / connection_string
        - notes: Additional notes/metadata
        An empty list is returned when the path is missing, is not a regular
        file, or cannot be read/parsed.

    Example:
        ```python
        creds = parse_credential_file("C:/Projects/credentials.md")
        # Returns:
        # [
        #     {
        #         "service_name": "Gitea Admin",
        #         "credential_type": "password",
        #         "username": "admin",
        #         "password": "SecurePass123!"
        #     },
        #     ...
        # ]
        ```

    Security:
        - Returns plaintext credentials for encryption by import function
        - Never logs credential values
        - Validates file exists before reading
    """
    file_path_obj = Path(file_path)

    if not file_path_obj.exists():
        logger.error(f"Credential file not found: {file_path}")
        return []
    if not file_path_obj.is_file():
        logger.error(f"Path is not a file: {file_path}")
        return []

    logger.info(f"Parsing credential file: {file_path}")

    # Determine file type by extension
    file_ext = file_path_obj.suffix.lower()

    try:
        # 'utf-8-sig' reads plain UTF-8 unchanged but also drops a leading
        # byte-order mark. Without this, a BOM written by some Windows editors
        # stays attached to the first line, so a leading "## Service" header
        # (or the first .env key) would not be recognised.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            content = f.read()

        if file_ext == '.md':
            credentials = _parse_markdown_credentials(content)
        elif file_ext == '.env' or file_path_obj.name.startswith('.env'):
            credentials = _parse_env_credentials(content)
        elif file_ext == '.txt':
            credentials = _parse_text_credentials(content)
        else:
            logger.warning(f"Unknown file type: {file_ext}, attempting markdown parser")
            credentials = _parse_markdown_credentials(content)

        logger.info(f"Parsed {len(credentials)} credential(s) from file")
        return credentials
    except Exception as e:
        # Deliberate best-effort boundary: one unreadable or undecodable file
        # must not abort a multi-file scan, so report it and return nothing.
        logger.error(f"Failed to parse credential file: {str(e)}")
        return []
def _parse_markdown_credentials(content: str) -> List[Dict]:
"""
Parse credentials from Markdown format.
Expected format:
```
## Service Name
Username: user@example.com
Password: secret123
API Key: sk-1234567890
Notes: Additional info
## Another Service
...
```
"""
credentials = []
lines = content.split('\n')
current_cred = None
for line in lines:
line = line.strip()
# Skip empty lines and comments
if not line or line.startswith('#') and not line.startswith('##'):
continue
# Service header (## or #)
if line.startswith('##'):
# Save previous credential if exists
if current_cred and current_cred.get('service_name'):
credentials.append(_finalize_credential(current_cred))
# Start new credential
service_name = line.lstrip('#').strip()
current_cred = {'service_name': service_name}
elif line.startswith('#'):
# Save previous credential if exists
if current_cred and current_cred.get('service_name'):
credentials.append(_finalize_credential(current_cred))
# Start new credential
service_name = line.lstrip('#').strip()
current_cred = {'service_name': service_name}
# Key-value pairs
elif ':' in line and current_cred is not None:
key, value = line.split(':', 1)
key = key.strip().lower()
value = value.strip()
if not value:
continue
# Map common keys to credential fields
if key in ['username', 'user', 'login']:
current_cred['username'] = value
elif key in ['password', 'pass', 'pwd']:
current_cred['password'] = value
elif key in ['api key', 'api_key', 'apikey', 'key']:
current_cred['api_key'] = value
elif key in ['token', 'access token', 'access_token', 'bearer']:
current_cred['token'] = value
elif key in ['client secret', 'client_secret', 'secret']:
current_cred['client_secret'] = value
elif key in ['connection string', 'connection_string', 'conn_str']:
current_cred['connection_string'] = value
elif key in ['url', 'host', 'server', 'address']:
current_cred['url'] = value
elif key in ['port']:
try:
current_cred['custom_port'] = int(value)
except ValueError:
pass
elif key in ['notes', 'note', 'description', 'desc']:
current_cred['notes'] = value
elif key in ['type', 'credential_type', 'kind']:
current_cred['credential_type'] = value
# Add last credential
if current_cred and current_cred.get('service_name'):
credentials.append(_finalize_credential(current_cred))
return credentials
def _parse_env_credentials(content: str) -> List[Dict]:
"""
Parse credentials from .env format.
Expected format:
```
DATABASE_URL=mysql://user:pass@host:3306/db
API_KEY=sk-1234567890
SECRET_TOKEN=abc123def456
```
"""
credentials = []
lines = content.split('\n')
for line in lines:
line = line.strip()
# Skip comments and empty lines
if not line or line.startswith('#'):
continue
# Parse KEY=value
if '=' not in line:
continue
key, value = line.split('=', 1)
key = key.strip()
value = value.strip().strip('"').strip("'")
if not value:
continue
# Create credential based on key pattern
cred = {
'service_name': key.replace('_', ' ').title(),
}
# Detect credential type from value
cred_type, field = _detect_credential_type(value)
cred['credential_type'] = cred_type
cred[field] = value
credentials.append(cred)
return credentials
def _parse_text_credentials(content: str) -> List[Dict]:
    """
    Parse credentials from a plain-text (.txt) credential file.

    There is no text-specific grammar: the content is handed unchanged to the
    Markdown parser, so the same header and ``Key: value`` conventions apply.

    Args:
        content: Full text of the credential file.

    Returns:
        Whatever _parse_markdown_credentials() returns for the same content.
    """
    # Text files reuse the Markdown rules.
    parsed = _parse_markdown_credentials(content)
    return parsed
def _detect_credential_type(value: str) -> tuple[str, str]:
    """
    Classify a raw secret value by its shape.

    Checks run in a fixed order and the first hit wins:
    private-key header, known API-key prefixes, connection strings,
    JWT-like three-segment values, OAuth-style prefixes, then plain password.

    Args:
        value: The raw credential value.

    Returns:
        tuple: (credential_type, field_name), where field_name is the
        dictionary key the value should be stored under.
    """
    # Private key material is kept in the password field.
    if re.match(SSH_KEY_PATTERN, value, re.MULTILINE):
        return ('ssh_key', 'password')

    # Known API key prefixes (case-sensitive).
    if any(re.match(api_pattern, value) for api_pattern in API_KEY_PATTERNS):
        return ('api_key', 'api_key')

    # Connection strings (case-insensitive).
    if any(
        re.match(conn_pattern, value, re.IGNORECASE)
        for conn_pattern in CONNECTION_STRING_PATTERNS
    ):
        return ('connection_string', 'connection_string')

    # JWT heuristic: exactly three dot-separated segments, each longer than
    # 10 characters, in a value longer than 50 characters overall.
    segments = value.split('.')
    looks_like_jwt = (
        len(segments) == 3
        and len(value) > 50
        and all(len(segment) > 10 for segment in segments)
    )
    if looks_like_jwt:
        return ('jwt', 'token')

    # OAuth tokens are recognised by a handful of common prefixes.
    oauth_prefixes = ('ya29.', 'ey', 'oauth')
    if value.startswith(oauth_prefixes):
        return ('oauth', 'token')

    # Nothing matched: treat the value as a plain password.
    return ('password', 'password')
def _finalize_credential(cred: Dict) -> Dict:
"""
Finalize a credential dictionary by setting defaults and detecting types.
"""
# Auto-detect credential type if not specified
if 'credential_type' not in cred:
if 'api_key' in cred:
cred['credential_type'] = 'api_key'
elif 'token' in cred:
cred['credential_type'] = 'jwt'
elif 'client_secret' in cred:
cred['credential_type'] = 'oauth'
elif 'connection_string' in cred:
cred['credential_type'] = 'connection_string'
elif 'password' in cred:
cred['credential_type'] = 'password'
else:
cred['credential_type'] = 'password'
# Extract URL fields if present
if 'url' in cred:
url = cred.pop('url')
# Determine if internal or external based on IP pattern
if re.match(r'^(192\.168\.|10\.|172\.(1[6-9]|2[0-9]|3[01])\.)', url):
cred['internal_url'] = url
else:
cred['external_url'] = url
return cred
def import_credentials_to_db(
    db: Session,
    credentials: List[Dict],
    client_id: Optional[str] = None,
    user_id: str = "system_import",
    ip_address: Optional[str] = None,
) -> int:
    """
    Import credentials into the database using credential_service.

    Each credential dictionary is validated through the CredentialCreate
    schema and handed to create_credential(), which is responsible for
    encryption, audit logging and storage. A credential that fails is logged
    and skipped; the remaining credentials are still processed.

    Args:
        db: SQLAlchemy database session
        credentials: List of credential dictionaries from parse_credential_file().
            The dictionaries are not modified.
        client_id: Optional UUID string to associate credentials with a client
        user_id: User ID for audit logging (default: "system_import")
        ip_address: IP address for audit logging (optional)

    Returns:
        int: Count of successfully imported credentials

    Example:
        ```python
        from api.database import SessionLocal

        db = SessionLocal()
        try:
            files = scan_for_credential_files("C:/Projects")
            for file_path in files:
                creds = parse_credential_file(file_path)
                count = import_credentials_to_db(db, creds, client_id="uuid-here")
                print(f"Imported {count} credentials from {file_path}")
        finally:
            db.close()
        ```

    Security:
        - Sensitive fields are encrypted by credential_service
        - Never logs plaintext credential values: on failure only the service
          name and the exception class are logged, never the exception message
        - Uses existing encryption infrastructure

    Raises:
        Nothing for per-credential failures: validation and database errors
        are logged and the credential is skipped.
    """
    imported_count = 0
    logger.info(f"Starting import of {len(credentials)} credential(s)")

    for cred_data in credentials:
        try:
            # Work on a copy so the caller's dictionaries are left untouched.
            payload = dict(cred_data)
            if client_id:
                payload['client_id'] = client_id

            # Create CredentialCreate schema object
            credential_create = CredentialCreate(**payload)

            # Import using credential_service (handles encryption and audit)
            created_credential = create_credential(
                db=db,
                credential_data=credential_create,
                user_id=user_id,
                ip_address=ip_address,
                user_agent="credential_scanner_import",
            )
            imported_count += 1
            logger.info(
                f"Imported credential: {created_credential.service_name} "
                f"(ID: {created_credential.id})"
            )
        except Exception as e:
            # Log only the exception class. Validation error messages can echo
            # the rejected input (i.e. the secret), and DB errors can echo bound
            # parameters, so str(e) must never reach the log.
            # NOTE(review): assumes CredentialCreate is a Pydantic model - confirm.
            logger.error(
                f"Failed to import credential '{cred_data.get('service_name', 'Unknown')}': "
                f"{type(e).__name__}"
            )
            # Continue with next credential instead of failing entire import
            # NOTE(review): if create_credential() leaves the session in a failed
            # state, a db.rollback() may be needed here - confirm against the
            # service's own error handling.
            continue

    logger.info(
        f"Import complete. Successfully imported {imported_count}/{len(credentials)} "
        "credential(s)"
    )
    return imported_count
# One-call wrapper around the scan -> parse -> import pipeline
def scan_and_import_credentials(
    base_path: str,
    db: Session,
    client_id: Optional[str] = None,
    user_id: str = "system_import",
    ip_address: Optional[str] = None,
) -> Dict[str, int]:
    """
    Scan a directory tree for credential files and import everything found.

    Runs scan_for_credential_files(), then for each file runs
    parse_credential_file() followed by import_credentials_to_db(), and
    accumulates the counts.

    Args:
        base_path: Root directory to scan
        db: Database session
        client_id: Optional client UUID to associate credentials with
        user_id: User ID for audit logging
        ip_address: IP address for audit logging

    Returns:
        Dict with summary statistics:
        - files_found: Number of credential files found
        - credentials_parsed: Total credentials parsed from all files
        - credentials_imported: Number successfully imported to database

    Example:
        ```python
        from api.database import SessionLocal

        db = SessionLocal()
        try:
            results = scan_and_import_credentials(
                "C:/Projects/MyClient",
                db,
                client_id="client-uuid-here"
            )
            print(f"Found {results['files_found']} files")
            print(f"Imported {results['credentials_imported']} credentials")
        finally:
            db.close()
        ```
    """
    discovered_files = scan_for_credential_files(base_path)

    parsed_count = 0
    imported_count = 0

    for credential_path in discovered_files:
        parsed = parse_credential_file(credential_path)
        parsed_count += len(parsed)

        # Nothing parsed from this file: skip the import call.
        if not parsed:
            continue

        imported_count += import_credentials_to_db(
            db=db,
            credentials=parsed,
            client_id=client_id,
            user_id=user_id,
            ip_address=ip_address,
        )

    summary = {
        'files_found': len(discovered_files),
        'credentials_parsed': parsed_count,
        'credentials_imported': imported_count,
    }
    return summary