claudetools/api/utils/context_compression.py
Mike Swanson fce1345a40 [Fix] Remove all emoji violations from code files
- Replaced emojis with ASCII text markers ([OK], [ERROR], [WARNING], etc.)
- Fixed 38+ violations across 20 files (7 Python, 6 shell scripts, 6 hooks, 1 API)
- All modified files pass syntax verification
- Conforms to CODING_GUIDELINES.md NO EMOJIS rule

Details:
- Python test files: check_record_counts.py, test_*.py (31 fixes)
- API utils: context_compression.py regex pattern updated
- Shell scripts: setup/test/install/upgrade scripts (64+ fixes)
- Hook scripts: task-complete, user-prompt-submit, sync-contexts (10 fixes)

Verification: All files pass syntax checks (python -m py_compile, bash -n)
Report: FIXES_APPLIED.md contains complete change log

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-17 13:06:33 -07:00


"""
Context Compression Utilities for ClaudeTools Context Recall System
Maximum information density, minimum token usage.
All functions designed for efficient context summarization and injection.
"""
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Union
from collections import defaultdict
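
# Typical flow (illustrative): compress_conversation_summary() and
# compress_project_state() produce dense dicts; create_context_snippet()
# wraps individual facts with auto-extracted tags and a relevance score;
# merge_contexts() dedupes summaries across sessions; format_for_injection()
# renders the result as token-efficient markdown for prompt injection.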


def compress_conversation_summary(
conversation: Union[str, List[Dict[str, str]]]
) -> Dict[str, Any]:
"""
Compress conversation into dense JSON structure with key points.
Args:
conversation: Raw conversation text or message list
[{role: str, content: str}, ...] or str
Returns:
Dense summary with phase, completed, in_progress, blockers, decisions, next
Example:
>>> msgs = [{"role": "user", "content": "Completed: auth. Build the API endpoints"}]
>>> compress_conversation_summary(msgs)
{
"phase": "api_development",
"completed": ["auth"],
"in_progress": None,
"blockers": [],
"decisions": [],
"next": []
}
"""
# Convert to text if list
if isinstance(conversation, list):
text = "\n".join([f"{msg.get('role', 'user')}: {msg.get('content', '')}"
for msg in conversation])
else:
text = conversation
text_lower = text.lower()
# Extract phase
phase = "unknown"
phase_keywords = {
"api_development": ["api", "endpoint", "fastapi", "route"],
"testing": ["test", "pytest", "unittest"],
"deployment": ["deploy", "docker", "production"],
"debugging": ["bug", "error", "fix", "debug"],
"design": ["design", "architecture", "plan"],
"integration": ["integrate", "connect", "third-party"]
}
for p, keywords in phase_keywords.items():
if any(kw in text_lower for kw in keywords):
phase = p
break
# Extract completed tasks
completed = []
completed_patterns = [
r"completed[:\s]+([^\n.]+)",
r"finished[:\s]+([^\n.]+)",
r"done[:\s]+([^\n.]+)",
r"\[OK\]\s*([^\n.]+)",
r"\[PASS\]\s*([^\n.]+)",
r"implemented[:\s]+([^\n.]+)"
]
for pattern in completed_patterns:
matches = re.findall(pattern, text_lower)
completed.extend([m.strip()[:50] for m in matches])
# Extract in-progress
in_progress = None
in_progress_patterns = [
r"in[- ]progress[:\s]+([^\n.]+)",
r"working on[:\s]+([^\n.]+)",
r"currently[:\s]+([^\n.]+)"
]
for pattern in in_progress_patterns:
match = re.search(pattern, text_lower)
if match:
in_progress = match.group(1).strip()[:50]
break
# Extract blockers
blockers = []
blocker_patterns = [
r"blocker[s]?[:\s]+([^\n.]+)",
r"blocked[:\s]+([^\n.]+)",
r"issue[s]?[:\s]+([^\n.]+)",
r"problem[s]?[:\s]+([^\n.]+)"
]
for pattern in blocker_patterns:
matches = re.findall(pattern, text_lower)
blockers.extend([m.strip()[:50] for m in matches])
# Extract decisions
decisions = extract_key_decisions(text)
# Extract next actions
next_actions = []
next_patterns = [
r"next[:\s]+([^\n.]+)",
r"todo[:\s]+([^\n.]+)",
r"will[:\s]+([^\n.]+)"
]
for pattern in next_patterns:
matches = re.findall(pattern, text_lower)
next_actions.extend([m.strip()[:50] for m in matches])
return {
"phase": phase,
"completed": list(dict.fromkeys(completed))[:10],  # Dedupe (order-preserving), limit
"in_progress": in_progress,
"blockers": list(dict.fromkeys(blockers))[:5],
"decisions": decisions[:5],
"next": list(dict.fromkeys(next_actions))[:10]
}


def create_context_snippet(
content: str,
snippet_type: str = "general",
importance: int = 5
) -> Dict[str, Any]:
"""
Create structured snippet with auto-extracted tags and relevance score.
Args:
content: Raw information (decision, pattern, lesson)
snippet_type: Type of snippet (decision, pattern, lesson, state)
importance: Manual importance 1-10, default 5
Returns:
Structured snippet with tags, relevance score, metadata
Example:
>>> create_context_snippet("Using FastAPI for async support", "decision")
{
"content": "Using FastAPI for async support",
"type": "decision",
"tags": ["decision", "fastapi", "api", "async"],
"importance": 5,
"relevance_score": 5.5,
"created_at": "2026-01-16T...",
"usage_count": 0,
"last_used": None
}
"""
# Extract tags from content
tags = extract_tags_from_text(content)
# Add type-specific tag
if snippet_type not in tags:
tags.insert(0, snippet_type)
now = datetime.now(timezone.utc).isoformat()
snippet = {
"content": content[:500], # Limit content length
"type": snippet_type,
"tags": tags[:10], # Limit tags
"importance": max(1, min(10, importance)), # Clamp 1-10
"created_at": now,
"usage_count": 0,
"last_used": None
}
# Calculate initial relevance score
snippet["relevance_score"] = calculate_relevance_score(snippet)
return snippet


def compress_project_state(
project_details: Dict[str, Any],
current_work: str,
files_changed: Optional[List[str]] = None
) -> Dict[str, Any]:
"""
Compress project state into dense summary.
Args:
project_details: Dict with name, description, phase, etc.
current_work: Description of current work
files_changed: List of file paths that changed
Returns:
Dense project state with phase, progress, blockers, next actions
Example:
>>> compress_project_state(
... {"name": "ClaudeTools", "phase": "api_dev"},
... "Building auth endpoints",
... ["api/auth.py"]
... )
{
"project": "ClaudeTools",
"phase": "api_dev",
"progress": 0,
"current": "Building auth endpoints",
"files": [{"path": "api/auth.py", "type": "api"}],
"blockers": [],
"next": []
}
"""
files_changed = files_changed or []
state = {
"project": project_details.get("name", "unknown")[:50],
"phase": project_details.get("phase", "unknown")[:30],
"progress": project_details.get("progress_pct", 0),
"current": current_work[:200], # Compress description
"files": compress_file_changes(files_changed),
"blockers": project_details.get("blockers", [])[:5],
"next": project_details.get("next_actions", [])[:10]
}
return state


def extract_key_decisions(text: str) -> List[Dict[str, str]]:
"""
Extract key decisions from conversation text.
Args:
text: Conversation text or work description
Returns:
Array of decision objects with decision, rationale, impact, timestamp
Example (matching runs on lowercased text, so extracted strings are lowercase):
>>> extract_key_decisions("Decided to use FastAPI for async support")
[{
"decision": "to use fastapi",
"rationale": "async support",
"impact": "medium",
"timestamp": "2026-01-16T..."
}]
"""
decisions = []
text_lower = text.lower()
# Decision patterns
patterns = [
r"decid(?:ed|e)[:\s]+([^.\n]+?)(?:because|for|due to)[:\s]+([^.\n]+)",
r"chose[:\s]+([^.\n]+?)(?:because|for|due to)[:\s]+([^.\n]+)",
r"using[:\s]+([^.\n]+?)(?:because|for|due to)[:\s]+([^.\n]+)",
r"will use[:\s]+([^.\n]+?)(?:because|for|due to)[:\s]+([^.\n]+)"
]
for pattern in patterns:
matches = re.findall(pattern, text_lower)
for match in matches:
decision = match[0].strip()[:100]
rationale = match[1].strip()[:100]
# Estimate impact based on keywords
impact = "low"
high_impact_keywords = ["architecture", "database", "framework", "major"]
medium_impact_keywords = ["api", "endpoint", "feature", "integration"]
if any(kw in decision.lower() or kw in rationale.lower()
for kw in high_impact_keywords):
impact = "high"
elif any(kw in decision.lower() or kw in rationale.lower()
for kw in medium_impact_keywords):
impact = "medium"
decisions.append({
"decision": decision,
"rationale": rationale,
"impact": impact,
"timestamp": datetime.now(timezone.utc).isoformat()
})
return decisions


def calculate_relevance_score(
snippet: Dict[str, Any],
current_time: Optional[datetime] = None
) -> float:
"""
Calculate relevance score based on age, usage, tags, importance.
Args:
snippet: Snippet metadata with created_at, usage_count, importance, tags
current_time: Optional current time for testing, defaults to now
Returns:
Float score 0.0-10.0 (higher = more relevant)
Example (scored three days after creation, so the age penalty is 0.3):
>>> snippet = {
...     "created_at": "2026-01-16T12:00:00Z",
...     "usage_count": 5,
...     "importance": 8,
...     "tags": ["critical", "fastapi"]
... }
>>> calculate_relevance_score(snippet, datetime(2026, 1, 19, 12, 0, tzinfo=timezone.utc))
9.2  # 8.0 importance - 0.3 age + 1.0 usage boost + 0.5 "critical" tag boost
"""
if current_time is None:
current_time = datetime.now(timezone.utc)
# Parse created_at
try:
created_at = datetime.fromisoformat(snippet["created_at"].replace("Z", "+00:00"))
except (ValueError, KeyError):
created_at = current_time
# Base score from importance (0-10)
score = float(snippet.get("importance", 5))
# Time decay - lose 0.1 points per day, max -2.0
age_days = (current_time - created_at).total_seconds() / 86400
time_penalty = min(2.0, age_days * 0.1)
score -= time_penalty
# Usage boost - add 0.2 per use, max +2.0
usage_count = snippet.get("usage_count", 0)
usage_boost = min(2.0, usage_count * 0.2)
score += usage_boost
# Tag boost for important tags
important_tags = {"critical", "blocker", "decision", "architecture",
"security", "performance", "bug"}
tags = set(snippet.get("tags", []))
tag_boost = len(tags & important_tags) * 0.5 # 0.5 per important tag
score += tag_boost
# Recency boost if used recently
last_used = snippet.get("last_used")
if last_used:
try:
last_used_dt = datetime.fromisoformat(last_used.replace("Z", "+00:00"))
hours_since_use = (current_time - last_used_dt).total_seconds() / 3600
if hours_since_use < 24: # Used in last 24h
score += 1.0
except (ValueError, AttributeError):
pass
# Clamp to 0.0-10.0
return max(0.0, min(10.0, score))


def merge_contexts(contexts: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
Merge multiple context objects into single deduplicated context.
Args:
contexts: List of context objects to merge
Returns:
Single merged context with deduplicated, most recent info
Example:
>>> ctx1 = {"phase": "api_dev", "completed": ["auth"]}
>>> ctx2 = {"phase": "api_dev", "completed": ["auth", "crud"]}
>>> merge_contexts([ctx1, ctx2])
{"phase": "api_dev", "completed": ["auth", "crud"], ...}
"""
if not contexts:
return {}
merged = {
"phase": None,
"completed": [],
"in_progress": None,
"blockers": [],
"decisions": [],
"next": [],
"files": [],
"tags": []
}
# Collect all items
completed_set = set()
blocker_set = set()
next_set = set()
files_set = set()
tags_set = set()
decisions_list = []
for ctx in contexts:
# First non-empty phase wins; pass contexts newest-first to keep
# the most recent phase
if ctx.get("phase") and not merged["phase"]:
merged["phase"] = ctx["phase"]
# in_progress is overwritten each pass, so the last non-empty one wins
if ctx.get("in_progress"):
merged["in_progress"] = ctx["in_progress"]
# Collect completed
for item in ctx.get("completed", []):
if isinstance(item, str):
completed_set.add(item)
# Collect blockers
for item in ctx.get("blockers", []):
if isinstance(item, str):
blocker_set.add(item)
# Collect next actions
for item in ctx.get("next", []):
if isinstance(item, str):
next_set.add(item)
# Collect files
for item in ctx.get("files", []):
if isinstance(item, str):
files_set.add(item)
elif isinstance(item, dict) and "path" in item:
files_set.add(item["path"])
# Collect tags
for item in ctx.get("tags", []):
if isinstance(item, str):
tags_set.add(item)
# Collect decisions (keep all with timestamps)
for decision in ctx.get("decisions", []):
if isinstance(decision, dict):
decisions_list.append(decision)
# Sort decisions by timestamp (most recent first)
decisions_list.sort(
key=lambda d: d.get("timestamp", ""),
reverse=True
)
merged["completed"] = sorted(list(completed_set))[:20]
merged["blockers"] = sorted(list(blocker_set))[:10]
merged["next"] = sorted(list(next_set))[:20]
merged["files"] = sorted(list(files_set))[:30]
merged["tags"] = sorted(list(tags_set))[:20]
merged["decisions"] = decisions_list[:10]
return merged


def format_for_injection(
contexts: List[Dict[str, Any]],
max_tokens: int = 1000
) -> str:
"""
Format context objects for token-efficient prompt injection.
Args:
contexts: List of context objects from database (sorted by relevance)
max_tokens: Approximate max tokens to use (rough estimate)
Returns:
Token-efficient markdown string for Claude prompt
Example:
>>> contexts = [{"content": "Use FastAPI", "tags": ["api"]}]
>>> format_for_injection(contexts)
"## Context Recall\\n\\n**Generals:**\\n- Use FastAPI [api]\\n\\n*1 contexts loaded*\\n"
"""
if not contexts:
return ""
lines = ["## Context Recall\n"]
# Estimate ~4 chars per token
max_chars = max_tokens * 4
current_chars = len(lines[0])
# Group by type
by_type = defaultdict(list)
for ctx in contexts:
ctx_type = ctx.get("type", "general")
by_type[ctx_type].append(ctx)
# Priority order for types
type_priority = ["blocker", "decision", "state", "pattern", "lesson", "general"]
for ctx_type in type_priority:
if ctx_type not in by_type:
continue
# Add type header
header = f"\n**{ctx_type.title()}s:**\n"
if current_chars + len(header) > max_chars:
break
lines.append(header)
current_chars += len(header)
# Add contexts of this type
for ctx in by_type[ctx_type][:5]: # Max 5 per type
content = ctx.get("content", "")
tags = ctx.get("tags", [])
# Format with tags
tag_str = f" [{', '.join(tags[:3])}]" if tags else ""
line = f"- {content[:150]}{tag_str}\n"
if current_chars + len(line) > max_chars:
break
lines.append(line)
current_chars += len(line)
# Add summary stats
summary = f"\n*{len(contexts)} contexts loaded*\n"
if current_chars + len(summary) <= max_chars:
lines.append(summary)
return "".join(lines)


def extract_tags_from_text(text: str) -> List[str]:
"""
Auto-detect relevant tags from text content.
Args:
text: Content to extract tags from
Returns:
List of detected tags (technologies, patterns, categories)
Example:
>>> extract_tags_from_text("Using FastAPI with PostgreSQL")
["fastapi", "postgresql", "api", "database"]
"""
text_lower = text.lower()
tags = []
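# NOTE: matching below is substring-based, so short keys can fire inside
# longer words (e.g. "api" also matches "fastapi"); acceptable here since
# tags are advisory hints rather than strict classifications.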
# Technology keywords
tech_keywords = {
"fastapi": ["fastapi"],
"postgresql": ["postgresql", "postgres", "psql"],
"sqlalchemy": ["sqlalchemy", "orm"],
"alembic": ["alembic", "migration"],
"docker": ["docker", "container"],
"redis": ["redis", "cache"],
"nginx": ["nginx", "reverse proxy"],
"python": ["python", "py"],
"javascript": ["javascript", "js", "node"],
"typescript": ["typescript", "ts"],
"react": ["react", "jsx"],
"vue": ["vue"],
"api": ["api", "endpoint", "rest"],
"database": ["database", "db", "sql"],
"auth": ["auth", "authentication", "authorization"],
"security": ["security", "encryption", "secure"],
"testing": ["test", "pytest", "unittest"],
"deployment": ["deploy", "deployment", "production"]
}
for tag, keywords in tech_keywords.items():
if any(kw in text_lower for kw in keywords):
tags.append(tag)
# Pattern keywords
pattern_keywords = {
"async": ["async", "asynchronous", "await"],
"crud": ["crud", "create", "read", "update", "delete"],
"middleware": ["middleware"],
"dependency-injection": ["dependency injection", "depends"],
"error-handling": ["error", "exception", "try", "catch"],
"validation": ["validation", "validate", "pydantic"],
"optimization": ["optimize", "performance", "speed"],
"refactor": ["refactor", "refactoring", "cleanup"]
}
for tag, keywords in pattern_keywords.items():
if any(kw in text_lower for kw in keywords):
tags.append(tag)
# Category keywords
category_keywords = {
"critical": ["critical", "urgent", "important"],
"blocker": ["blocker", "blocked", "blocking"],
"bug": ["bug", "error", "issue", "problem"],
"feature": ["feature", "enhancement", "add"],
"architecture": ["architecture", "design", "structure"],
"integration": ["integration", "integrate", "connect"]
}
for tag, keywords in category_keywords.items():
if any(kw in text_lower for kw in keywords):
tags.append(tag)
# Deduplicate and return
return list(dict.fromkeys(tags)) # Preserves order


def compress_file_changes(file_paths: List[str]) -> List[Dict[str, str]]:
"""
Compress file change list into brief summaries.
Args:
file_paths: List of file paths that changed
Returns:
Compressed summary with path and inferred change type
Example:
>>> compress_file_changes(["api/auth.py", "tests/test_auth.py"])
[
{"path": "api/auth.py", "type": "api"},
{"path": "tests/test_auth.py", "type": "test"}
]
"""
compressed = []
for path in file_paths[:50]: # Limit to 50 files
# Infer change type from path
change_type = "other"
path_lower = path.lower()
if "test" in path_lower:
change_type = "test"
elif any(ext in path_lower for ext in [".py", ".js", ".ts", ".go", ".java"]):
if "migration" in path_lower:
change_type = "migration"
elif "config" in path_lower or path_lower.endswith((".yaml", ".yml", ".json", ".toml")):
change_type = "config"
elif "model" in path_lower or "schema" in path_lower:
change_type = "schema"
elif "api" in path_lower or "endpoint" in path_lower or "route" in path_lower:
change_type = "api"
else:
change_type = "impl"
elif path_lower.endswith((".md", ".txt", ".rst")):
change_type = "doc"
elif "docker" in path_lower or "deploy" in path_lower:
change_type = "infra"
compressed.append({
"path": path,
"type": change_type
})
return compressed
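

# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the public API: the sample strings
# below are invented for demonstration; real callers pass live conversation
# and project data. Output kept ASCII-only per the CODING_GUIDELINES.md
# NO EMOJIS rule.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import json

    # Conversation -> dense summary
    msgs = [
        {"role": "user", "content": "Completed: auth. Build the API endpoints"},
        {"role": "assistant", "content": "Working on: CRUD routes. Blocked: missing DB credentials"},
    ]
    summary = compress_conversation_summary(msgs)
    print("[OK] summary:", json.dumps(summary, indent=2))

    # Single fact -> tagged, scored snippet
    snippet = create_context_snippet("Using FastAPI for async support", "decision")
    print("[OK] snippet score:", snippet["relevance_score"])

    # Snippets -> token-efficient prompt markdown
    print(format_for_injection([snippet]))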