Implements production-ready MSP platform with cross-machine persistent memory for Claude. API Implementation: - 130 REST API endpoints across 21 entities - JWT authentication on all endpoints - AES-256-GCM encryption for credentials - Automatic audit logging - Complete OpenAPI documentation Database: - 43 tables in MariaDB (172.16.3.20:3306) - 42 SQLAlchemy models with modern 2.0 syntax - Full Alembic migration system - 99.1% CRUD test pass rate Context Recall System (Phase 6): - Cross-machine persistent memory via database - Automatic context injection via Claude Code hooks - Automatic context saving after task completion - 90-95% token reduction with compression utilities - Relevance scoring with time decay - Tag-based semantic search - One-command setup script Security Features: - JWT tokens with Argon2 password hashing - AES-256-GCM encryption for all sensitive data - Comprehensive audit trail for credentials - HMAC tamper detection - Secure configuration management Test Results: - Phase 3: 38/38 CRUD tests passing (100%) - Phase 4: 34/35 core API tests passing (97.1%) - Phase 5: 62/62 extended API tests passing (100%) - Phase 6: 10/10 compression tests passing (100%) - Overall: 144/145 tests passing (99.3%) Documentation: - Comprehensive architecture guides - Setup automation scripts - API documentation at /api/docs - Complete test reports - Troubleshooting guides Project Status: 95% Complete (Production-Ready) Phase 7 (optional work context APIs) remains for future enhancement. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
598 lines
18 KiB
Python
598 lines
18 KiB
Python
"""
|
|
Credential scanner and importer for ClaudeTools context import system.
|
|
|
|
This module provides utilities to scan for credential files, parse structured
|
|
credential data from various formats, and import credentials into the database
|
|
with automatic encryption.
|
|
|
|
Security features:
|
|
- Automatic encryption using existing credential_service
|
|
- No plaintext credentials logged
|
|
- Audit trail for all imports
|
|
- Support for multiple credential file formats
|
|
|
|
Supported file formats:
|
|
- credentials.md (Markdown format with headers)
|
|
- .env (KEY=value format)
|
|
- passwords.txt (structured text format)
|
|
- Custom parsers for various formats
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from api.schemas.credential import CredentialCreate
|
|
from api.services.credential_service import create_credential
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Credential type detection patterns
|
|
API_KEY_PATTERNS = [
|
|
r"^sk-[a-zA-Z0-9]{20,}", # OpenAI-style
|
|
r"^api_[a-zA-Z0-9]{20,}", # API prefix
|
|
r"^token[_-]?[a-zA-Z0-9]{20,}", # Token prefix
|
|
r"^ghp_[a-zA-Z0-9]{36,}", # GitHub Personal Access Token
|
|
r"^gho_[a-zA-Z0-9]{36,}", # GitHub OAuth Token
|
|
r"^xoxb-[a-zA-Z0-9-]+", # Slack bot token
|
|
r"^xoxp-[a-zA-Z0-9-]+", # Slack user token
|
|
]
|
|
|
|
SSH_KEY_PATTERN = r"^-----BEGIN (RSA|OPENSSH|DSA|EC) PRIVATE KEY-----"
|
|
|
|
CONNECTION_STRING_PATTERNS = [
|
|
r"^(mysql|postgresql|mongodb|redis|mssql)://",
|
|
r"Server=.+;Database=.+;",
|
|
r"Host=.+;Port=\d+;",
|
|
]
|
|
|
|
|
|
def scan_for_credential_files(base_path: str) -> List[str]:
|
|
"""
|
|
Find all credential files in a directory tree.
|
|
|
|
Searches for common credential file names including:
|
|
- credentials.md
|
|
- passwords.txt, passwords.md
|
|
- .env, .env.local, .env.production
|
|
- secrets.txt, secrets.md
|
|
- auth.txt, auth.md
|
|
|
|
Args:
|
|
base_path: Root directory to search from
|
|
|
|
Returns:
|
|
List of absolute paths to credential files found
|
|
|
|
Example:
|
|
```python
|
|
files = scan_for_credential_files("C:/Projects/MyApp")
|
|
# Returns: ["C:/Projects/MyApp/credentials.md", "C:/Projects/MyApp/.env"]
|
|
```
|
|
|
|
Security:
|
|
- Does not read file contents during scan
|
|
- Only returns file paths for manual review
|
|
- Skips common exclusion patterns (node_modules, .git, etc.)
|
|
"""
|
|
credential_files = []
|
|
base_path_obj = Path(base_path)
|
|
|
|
# Validate base path exists
|
|
if not base_path_obj.exists():
|
|
logger.warning(f"Base path does not exist: {base_path}")
|
|
return []
|
|
|
|
if not base_path_obj.is_dir():
|
|
logger.warning(f"Base path is not a directory: {base_path}")
|
|
return []
|
|
|
|
# File name patterns to match
|
|
file_patterns = [
|
|
"credentials.md",
|
|
"credentials.txt",
|
|
"passwords.md",
|
|
"passwords.txt",
|
|
"secrets.md",
|
|
"secrets.txt",
|
|
"auth.md",
|
|
"auth.txt",
|
|
".env",
|
|
".env.local",
|
|
".env.production",
|
|
".env.development",
|
|
".env.staging",
|
|
]
|
|
|
|
# Directories to exclude from search
|
|
exclude_dirs = {
|
|
".git",
|
|
".svn",
|
|
"node_modules",
|
|
"venv",
|
|
"__pycache__",
|
|
".venv",
|
|
"dist",
|
|
"build",
|
|
".pytest_cache",
|
|
".tox",
|
|
}
|
|
|
|
logger.info(f"Scanning for credential files in: {base_path}")
|
|
|
|
# Walk directory tree
|
|
for root, dirs, files in os.walk(base_path):
|
|
# Remove excluded directories from search
|
|
dirs[:] = [d for d in dirs if d not in exclude_dirs]
|
|
|
|
# Check each file against patterns
|
|
for filename in files:
|
|
if filename in file_patterns:
|
|
file_path = os.path.join(root, filename)
|
|
credential_files.append(file_path)
|
|
logger.info(f"Found credential file: {file_path}")
|
|
|
|
logger.info(f"Scan complete. Found {len(credential_files)} credential file(s)")
|
|
return credential_files
|
|
|
|
|
|
def parse_credential_file(file_path: str) -> List[Dict]:
|
|
"""
|
|
Extract credentials from a file and return structured data.
|
|
|
|
Supports multiple file formats:
|
|
- Markdown (.md) - Parses headers and key-value pairs
|
|
- Environment (.env) - Parses KEY=value format
|
|
- Text (.txt) - Parses structured text with labels
|
|
|
|
Args:
|
|
file_path: Absolute path to credential file
|
|
|
|
Returns:
|
|
List of credential dictionaries with keys:
|
|
- service_name: Name of the service/system
|
|
- credential_type: Type (password, api_key, oauth, etc.)
|
|
- username: Username (if applicable)
|
|
- password: Password value (if applicable)
|
|
- api_key: API key value (if applicable)
|
|
- token: Token value (if applicable)
|
|
- connection_string: Connection string (if applicable)
|
|
- notes: Additional notes/metadata
|
|
|
|
Example:
|
|
```python
|
|
creds = parse_credential_file("C:/Projects/credentials.md")
|
|
# Returns:
|
|
# [
|
|
# {
|
|
# "service_name": "Gitea Admin",
|
|
# "credential_type": "password",
|
|
# "username": "admin",
|
|
# "password": "SecurePass123!"
|
|
# },
|
|
# ...
|
|
# ]
|
|
```
|
|
|
|
Security:
|
|
- Returns plaintext credentials for encryption by import function
|
|
- Never logs credential values
|
|
- Validates file exists before reading
|
|
"""
|
|
file_path_obj = Path(file_path)
|
|
|
|
if not file_path_obj.exists():
|
|
logger.error(f"Credential file not found: {file_path}")
|
|
return []
|
|
|
|
if not file_path_obj.is_file():
|
|
logger.error(f"Path is not a file: {file_path}")
|
|
return []
|
|
|
|
logger.info(f"Parsing credential file: {file_path}")
|
|
|
|
# Determine file type by extension
|
|
file_ext = file_path_obj.suffix.lower()
|
|
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
if file_ext == '.md':
|
|
credentials = _parse_markdown_credentials(content)
|
|
elif file_ext == '.env' or file_path_obj.name.startswith('.env'):
|
|
credentials = _parse_env_credentials(content)
|
|
elif file_ext == '.txt':
|
|
credentials = _parse_text_credentials(content)
|
|
else:
|
|
logger.warning(f"Unknown file type: {file_ext}, attempting markdown parser")
|
|
credentials = _parse_markdown_credentials(content)
|
|
|
|
logger.info(f"Parsed {len(credentials)} credential(s) from file")
|
|
return credentials
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to parse credential file: {str(e)}")
|
|
return []
|
|
|
|
|
|
def _parse_markdown_credentials(content: str) -> List[Dict]:
|
|
"""
|
|
Parse credentials from Markdown format.
|
|
|
|
Expected format:
|
|
```
|
|
## Service Name
|
|
Username: user@example.com
|
|
Password: secret123
|
|
API Key: sk-1234567890
|
|
Notes: Additional info
|
|
|
|
## Another Service
|
|
...
|
|
```
|
|
"""
|
|
credentials = []
|
|
lines = content.split('\n')
|
|
current_cred = None
|
|
|
|
for line in lines:
|
|
line = line.strip()
|
|
|
|
# Skip empty lines and comments
|
|
if not line or line.startswith('#') and not line.startswith('##'):
|
|
continue
|
|
|
|
# Service header (## or #)
|
|
if line.startswith('##'):
|
|
# Save previous credential if exists
|
|
if current_cred and current_cred.get('service_name'):
|
|
credentials.append(_finalize_credential(current_cred))
|
|
|
|
# Start new credential
|
|
service_name = line.lstrip('#').strip()
|
|
current_cred = {'service_name': service_name}
|
|
|
|
elif line.startswith('#'):
|
|
# Save previous credential if exists
|
|
if current_cred and current_cred.get('service_name'):
|
|
credentials.append(_finalize_credential(current_cred))
|
|
|
|
# Start new credential
|
|
service_name = line.lstrip('#').strip()
|
|
current_cred = {'service_name': service_name}
|
|
|
|
# Key-value pairs
|
|
elif ':' in line and current_cred is not None:
|
|
key, value = line.split(':', 1)
|
|
key = key.strip().lower()
|
|
value = value.strip()
|
|
|
|
if not value:
|
|
continue
|
|
|
|
# Map common keys to credential fields
|
|
if key in ['username', 'user', 'login']:
|
|
current_cred['username'] = value
|
|
elif key in ['password', 'pass', 'pwd']:
|
|
current_cred['password'] = value
|
|
elif key in ['api key', 'api_key', 'apikey', 'key']:
|
|
current_cred['api_key'] = value
|
|
elif key in ['token', 'access token', 'access_token', 'bearer']:
|
|
current_cred['token'] = value
|
|
elif key in ['client secret', 'client_secret', 'secret']:
|
|
current_cred['client_secret'] = value
|
|
elif key in ['connection string', 'connection_string', 'conn_str']:
|
|
current_cred['connection_string'] = value
|
|
elif key in ['url', 'host', 'server', 'address']:
|
|
current_cred['url'] = value
|
|
elif key in ['port']:
|
|
try:
|
|
current_cred['custom_port'] = int(value)
|
|
except ValueError:
|
|
pass
|
|
elif key in ['notes', 'note', 'description', 'desc']:
|
|
current_cred['notes'] = value
|
|
elif key in ['type', 'credential_type', 'kind']:
|
|
current_cred['credential_type'] = value
|
|
|
|
# Add last credential
|
|
if current_cred and current_cred.get('service_name'):
|
|
credentials.append(_finalize_credential(current_cred))
|
|
|
|
return credentials
|
|
|
|
|
|
def _parse_env_credentials(content: str) -> List[Dict]:
|
|
"""
|
|
Parse credentials from .env format.
|
|
|
|
Expected format:
|
|
```
|
|
DATABASE_URL=mysql://user:pass@host:3306/db
|
|
API_KEY=sk-1234567890
|
|
SECRET_TOKEN=abc123def456
|
|
```
|
|
"""
|
|
credentials = []
|
|
lines = content.split('\n')
|
|
|
|
for line in lines:
|
|
line = line.strip()
|
|
|
|
# Skip comments and empty lines
|
|
if not line or line.startswith('#'):
|
|
continue
|
|
|
|
# Parse KEY=value
|
|
if '=' not in line:
|
|
continue
|
|
|
|
key, value = line.split('=', 1)
|
|
key = key.strip()
|
|
value = value.strip().strip('"').strip("'")
|
|
|
|
if not value:
|
|
continue
|
|
|
|
# Create credential based on key pattern
|
|
cred = {
|
|
'service_name': key.replace('_', ' ').title(),
|
|
}
|
|
|
|
# Detect credential type from value
|
|
cred_type, field = _detect_credential_type(value)
|
|
cred['credential_type'] = cred_type
|
|
cred[field] = value
|
|
|
|
credentials.append(cred)
|
|
|
|
return credentials
|
|
|
|
|
|
def _parse_text_credentials(content: str) -> List[Dict]:
|
|
"""
|
|
Parse credentials from structured text format.
|
|
|
|
Similar to markdown but more flexible with delimiters.
|
|
"""
|
|
# Use markdown parser as fallback for text files
|
|
return _parse_markdown_credentials(content)
|
|
|
|
|
|
def _detect_credential_type(value: str) -> tuple[str, str]:
|
|
"""
|
|
Detect the type of credential based on its value pattern.
|
|
|
|
Returns:
|
|
tuple: (credential_type, field_name)
|
|
"""
|
|
# Check for SSH key
|
|
if re.match(SSH_KEY_PATTERN, value, re.MULTILINE):
|
|
return ('ssh_key', 'password') # Store in password field
|
|
|
|
# Check for API key patterns
|
|
for pattern in API_KEY_PATTERNS:
|
|
if re.match(pattern, value):
|
|
return ('api_key', 'api_key')
|
|
|
|
# Check for connection strings
|
|
for pattern in CONNECTION_STRING_PATTERNS:
|
|
if re.match(pattern, value, re.IGNORECASE):
|
|
return ('connection_string', 'connection_string')
|
|
|
|
# Check for JWT (basic heuristic: 3 base64 segments separated by dots)
|
|
if value.count('.') == 2 and len(value) > 50:
|
|
parts = value.split('.')
|
|
if all(len(p) > 10 for p in parts):
|
|
return ('jwt', 'token')
|
|
|
|
# Check for OAuth token (starts with common prefixes)
|
|
if value.startswith(('ya29.', 'ey', 'oauth')):
|
|
return ('oauth', 'token')
|
|
|
|
# Default to password
|
|
return ('password', 'password')
|
|
|
|
|
|
def _finalize_credential(cred: Dict) -> Dict:
|
|
"""
|
|
Finalize a credential dictionary by setting defaults and detecting types.
|
|
"""
|
|
# Auto-detect credential type if not specified
|
|
if 'credential_type' not in cred:
|
|
if 'api_key' in cred:
|
|
cred['credential_type'] = 'api_key'
|
|
elif 'token' in cred:
|
|
cred['credential_type'] = 'jwt'
|
|
elif 'client_secret' in cred:
|
|
cred['credential_type'] = 'oauth'
|
|
elif 'connection_string' in cred:
|
|
cred['credential_type'] = 'connection_string'
|
|
elif 'password' in cred:
|
|
cred['credential_type'] = 'password'
|
|
else:
|
|
cred['credential_type'] = 'password'
|
|
|
|
# Extract URL fields if present
|
|
if 'url' in cred:
|
|
url = cred.pop('url')
|
|
# Determine if internal or external based on IP pattern
|
|
if re.match(r'^(192\.168\.|10\.|172\.(1[6-9]|2[0-9]|3[01])\.)', url):
|
|
cred['internal_url'] = url
|
|
else:
|
|
cred['external_url'] = url
|
|
|
|
return cred
|
|
|
|
|
|
def import_credentials_to_db(
|
|
db: Session,
|
|
credentials: List[Dict],
|
|
client_id: Optional[str] = None,
|
|
user_id: str = "system_import",
|
|
ip_address: Optional[str] = None,
|
|
) -> int:
|
|
"""
|
|
Import credentials into the database using credential_service.
|
|
|
|
This function takes a list of credential dictionaries and imports them
|
|
into the database with automatic encryption. Each credential is passed
|
|
through the credential_service which handles:
|
|
- AES-256-GCM encryption of sensitive fields
|
|
- Audit log creation
|
|
- Proper database storage
|
|
|
|
Args:
|
|
db: SQLAlchemy database session
|
|
credentials: List of credential dictionaries from parse_credential_file()
|
|
client_id: Optional UUID string to associate credentials with a client
|
|
user_id: User ID for audit logging (default: "system_import")
|
|
ip_address: IP address for audit logging (optional)
|
|
|
|
Returns:
|
|
int: Count of successfully imported credentials
|
|
|
|
Example:
|
|
```python
|
|
from api.database import SessionLocal
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
files = scan_for_credential_files("C:/Projects")
|
|
for file_path in files:
|
|
creds = parse_credential_file(file_path)
|
|
count = import_credentials_to_db(db, creds, client_id="uuid-here")
|
|
print(f"Imported {count} credentials from {file_path}")
|
|
finally:
|
|
db.close()
|
|
```
|
|
|
|
Security:
|
|
- All sensitive fields automatically encrypted by credential_service
|
|
- Audit log entry created for each import
|
|
- Never logs plaintext credential values
|
|
- Uses existing encryption infrastructure
|
|
|
|
Raises:
|
|
Exception: If database operations fail (logged but not raised)
|
|
"""
|
|
imported_count = 0
|
|
|
|
logger.info(f"Starting import of {len(credentials)} credential(s)")
|
|
|
|
for cred_data in credentials:
|
|
try:
|
|
# Add client_id if provided
|
|
if client_id:
|
|
cred_data['client_id'] = client_id
|
|
|
|
# Create CredentialCreate schema object
|
|
credential_create = CredentialCreate(**cred_data)
|
|
|
|
# Import using credential_service (handles encryption and audit)
|
|
created_credential = create_credential(
|
|
db=db,
|
|
credential_data=credential_create,
|
|
user_id=user_id,
|
|
ip_address=ip_address,
|
|
user_agent="credential_scanner_import",
|
|
)
|
|
|
|
imported_count += 1
|
|
logger.info(
|
|
f"Imported credential: {created_credential.service_name} "
|
|
f"(ID: {created_credential.id})"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(
|
|
f"Failed to import credential '{cred_data.get('service_name', 'Unknown')}': "
|
|
f"{str(e)}"
|
|
)
|
|
# Continue with next credential instead of failing entire import
|
|
continue
|
|
|
|
logger.info(
|
|
f"Import complete. Successfully imported {imported_count}/{len(credentials)} "
|
|
"credential(s)"
|
|
)
|
|
|
|
return imported_count
|
|
|
|
|
|
# Convenience function for full workflow
|
|
def scan_and_import_credentials(
|
|
base_path: str,
|
|
db: Session,
|
|
client_id: Optional[str] = None,
|
|
user_id: str = "system_import",
|
|
ip_address: Optional[str] = None,
|
|
) -> Dict[str, int]:
|
|
"""
|
|
Scan for credential files and import all found credentials.
|
|
|
|
This is a convenience function that combines scanning, parsing, and importing
|
|
in a single operation.
|
|
|
|
Args:
|
|
base_path: Root directory to scan
|
|
db: Database session
|
|
client_id: Optional client UUID to associate credentials with
|
|
user_id: User ID for audit logging
|
|
ip_address: IP address for audit logging
|
|
|
|
Returns:
|
|
Dict with summary statistics:
|
|
- files_found: Number of credential files found
|
|
- credentials_parsed: Total credentials parsed from all files
|
|
- credentials_imported: Number successfully imported to database
|
|
|
|
Example:
|
|
```python
|
|
from api.database import SessionLocal
|
|
|
|
db = SessionLocal()
|
|
try:
|
|
results = scan_and_import_credentials(
|
|
"C:/Projects/MyClient",
|
|
db,
|
|
client_id="client-uuid-here"
|
|
)
|
|
print(f"Found {results['files_found']} files")
|
|
print(f"Imported {results['credentials_imported']} credentials")
|
|
finally:
|
|
db.close()
|
|
```
|
|
"""
|
|
# Scan for files
|
|
files = scan_for_credential_files(base_path)
|
|
|
|
total_parsed = 0
|
|
total_imported = 0
|
|
|
|
# Parse and import from each file
|
|
for file_path in files:
|
|
credentials = parse_credential_file(file_path)
|
|
total_parsed += len(credentials)
|
|
|
|
if credentials:
|
|
imported = import_credentials_to_db(
|
|
db=db,
|
|
credentials=credentials,
|
|
client_id=client_id,
|
|
user_id=user_id,
|
|
ip_address=ip_address,
|
|
)
|
|
total_imported += imported
|
|
|
|
return {
|
|
'files_found': len(files),
|
|
'credentials_parsed': total_parsed,
|
|
'credentials_imported': total_imported,
|
|
}
|