""" Context Compression Utilities for ClaudeTools Context Recall System Maximum information density, minimum token usage. All functions designed for efficient context summarization and injection. """ import re from datetime import datetime, timezone from typing import Any, Dict, List, Optional, Union from collections import defaultdict def compress_conversation_summary( conversation: Union[str, List[Dict[str, str]]] ) -> Dict[str, Any]: """ Compress conversation into dense JSON structure with key points. Args: conversation: Raw conversation text or message list [{role: str, content: str}, ...] or str Returns: Dense summary with phase, completed, in_progress, blockers, decisions, next Example: >>> msgs = [{"role": "user", "content": "Build auth system"}] >>> compress_conversation_summary(msgs) { "phase": "api_development", "completed": ["auth"], "in_progress": None, "blockers": [], "decisions": [], "next": [] } """ # Convert to text if list if isinstance(conversation, list): text = "\n".join([f"{msg.get('role', 'user')}: {msg.get('content', '')}" for msg in conversation]) else: text = conversation text_lower = text.lower() # Extract phase phase = "unknown" phase_keywords = { "api_development": ["api", "endpoint", "fastapi", "route"], "testing": ["test", "pytest", "unittest"], "deployment": ["deploy", "docker", "production"], "debugging": ["bug", "error", "fix", "debug"], "design": ["design", "architecture", "plan"], "integration": ["integrate", "connect", "third-party"] } for p, keywords in phase_keywords.items(): if any(kw in text_lower for kw in keywords): phase = p break # Extract completed tasks completed = [] completed_patterns = [ r"completed[:\s]+([^\n.]+)", r"finished[:\s]+([^\n.]+)", r"done[:\s]+([^\n.]+)", r"✓\s*([^\n.]+)", r"implemented[:\s]+([^\n.]+)" ] for pattern in completed_patterns: matches = re.findall(pattern, text_lower) completed.extend([m.strip()[:50] for m in matches]) # Extract in-progress in_progress = None in_progress_patterns = [ r"in[- ]progress[:\s]+([^\n.]+)", r"working on[:\s]+([^\n.]+)", r"currently[:\s]+([^\n.]+)" ] for pattern in in_progress_patterns: match = re.search(pattern, text_lower) if match: in_progress = match.group(1).strip()[:50] break # Extract blockers blockers = [] blocker_patterns = [ r"blocker[s]?[:\s]+([^\n.]+)", r"blocked[:\s]+([^\n.]+)", r"issue[s]?[:\s]+([^\n.]+)", r"problem[s]?[:\s]+([^\n.]+)" ] for pattern in blocker_patterns: matches = re.findall(pattern, text_lower) blockers.extend([m.strip()[:50] for m in matches]) # Extract decisions decisions = extract_key_decisions(text) # Extract next actions next_actions = [] next_patterns = [ r"next[:\s]+([^\n.]+)", r"todo[:\s]+([^\n.]+)", r"will[:\s]+([^\n.]+)" ] for pattern in next_patterns: matches = re.findall(pattern, text_lower) next_actions.extend([m.strip()[:50] for m in matches]) return { "phase": phase, "completed": list(set(completed))[:10], # Dedupe, limit "in_progress": in_progress, "blockers": list(set(blockers))[:5], "decisions": decisions[:5], "next": list(set(next_actions))[:10] } def create_context_snippet( content: str, snippet_type: str = "general", importance: int = 5 ) -> Dict[str, Any]: """ Create structured snippet with auto-extracted tags and relevance score. 
def create_context_snippet(
    content: str,
    snippet_type: str = "general",
    importance: int = 5
) -> Dict[str, Any]:
    """
    Create structured snippet with auto-extracted tags and relevance score.

    Args:
        content: Raw information (decision, pattern, lesson)
        snippet_type: Type of snippet (decision, pattern, lesson, state)
        importance: Manual importance 1-10, default 5

    Returns:
        Structured snippet with tags, relevance score, metadata

    Example:
        >>> create_context_snippet("Using FastAPI for async support", "decision")
        {
            "content": "Using FastAPI for async support",
            "type": "decision",
            "tags": ["decision", "fastapi", "api", "async"],
            "importance": 5,
            "relevance_score": 5.5,
            "created_at": "2026-01-16T...",
            "usage_count": 0,
            "last_used": None
        }
    """
    # Extract tags from content
    tags = extract_tags_from_text(content)

    # Add type-specific tag
    if snippet_type not in tags:
        tags.insert(0, snippet_type)

    now = datetime.now(timezone.utc).isoformat()

    snippet = {
        "content": content[:500],  # Limit content length
        "type": snippet_type,
        "tags": tags[:10],  # Limit tags
        "importance": max(1, min(10, importance)),  # Clamp 1-10
        "created_at": now,
        "usage_count": 0,
        "last_used": None
    }

    # Calculate initial relevance score
    snippet["relevance_score"] = calculate_relevance_score(snippet)

    return snippet


def compress_project_state(
    project_details: Dict[str, Any],
    current_work: str,
    files_changed: Optional[List[str]] = None
) -> Dict[str, Any]:
    """
    Compress project state into dense summary.

    Args:
        project_details: Dict with name, description, phase, etc.
        current_work: Description of current work
        files_changed: List of file paths that changed

    Returns:
        Dense project state with phase, progress, blockers, next actions

    Example:
        >>> compress_project_state(
        ...     {"name": "ClaudeTools", "phase": "api_dev"},
        ...     "Building auth endpoints",
        ...     ["api/auth.py"]
        ... )
        {
            "project": "ClaudeTools",
            "phase": "api_dev",
            "progress": 0,
            "current": "Building auth endpoints",
            "files": [{"path": "api/auth.py", "type": "api"}],
            "blockers": [],
            "next": []
        }
    """
    files_changed = files_changed or []

    state = {
        "project": project_details.get("name", "unknown")[:50],
        "phase": project_details.get("phase", "unknown")[:30],
        "progress": project_details.get("progress_pct", 0),
        "current": current_work[:200],  # Compress description
        "files": compress_file_changes(files_changed),
        "blockers": project_details.get("blockers", [])[:5],
        "next": project_details.get("next_actions", [])[:10]
    }

    return state
}] """ decisions = [] text_lower = text.lower() # Decision patterns patterns = [ r"decid(?:ed|e)[:\s]+([^.\n]+?)(?:because|for|due to)[:\s]+([^.\n]+)", r"chose[:\s]+([^.\n]+?)(?:because|for|due to)[:\s]+([^.\n]+)", r"using[:\s]+([^.\n]+?)(?:because|for|due to)[:\s]+([^.\n]+)", r"will use[:\s]+([^.\n]+?)(?:because|for|due to)[:\s]+([^.\n]+)" ] for pattern in patterns: matches = re.findall(pattern, text_lower) for match in matches: decision = match[0].strip()[:100] rationale = match[1].strip()[:100] # Estimate impact based on keywords impact = "low" high_impact_keywords = ["architecture", "database", "framework", "major"] medium_impact_keywords = ["api", "endpoint", "feature", "integration"] if any(kw in decision.lower() or kw in rationale.lower() for kw in high_impact_keywords): impact = "high" elif any(kw in decision.lower() or kw in rationale.lower() for kw in medium_impact_keywords): impact = "medium" decisions.append({ "decision": decision, "rationale": rationale, "impact": impact, "timestamp": datetime.now(timezone.utc).isoformat() }) return decisions def calculate_relevance_score( snippet: Dict[str, Any], current_time: Optional[datetime] = None ) -> float: """ Calculate relevance score based on age, usage, tags, importance. Args: snippet: Snippet metadata with created_at, usage_count, importance, tags current_time: Optional current time for testing, defaults to now Returns: Float score 0.0-10.0 (higher = more relevant) Example: >>> snippet = { ... "created_at": "2026-01-16T12:00:00Z", ... "usage_count": 5, ... "importance": 8, ... "tags": ["critical", "fastapi"] ... } >>> calculate_relevance_score(snippet) 9.2 """ if current_time is None: current_time = datetime.now(timezone.utc) # Parse created_at try: created_at = datetime.fromisoformat(snippet["created_at"].replace("Z", "+00:00")) except (ValueError, KeyError): created_at = current_time # Base score from importance (0-10) score = float(snippet.get("importance", 5)) # Time decay - lose 0.1 points per day, max -2.0 age_days = (current_time - created_at).total_seconds() / 86400 time_penalty = min(2.0, age_days * 0.1) score -= time_penalty # Usage boost - add 0.2 per use, max +2.0 usage_count = snippet.get("usage_count", 0) usage_boost = min(2.0, usage_count * 0.2) score += usage_boost # Tag boost for important tags important_tags = {"critical", "blocker", "decision", "architecture", "security", "performance", "bug"} tags = set(snippet.get("tags", [])) tag_boost = len(tags & important_tags) * 0.5 # 0.5 per important tag score += tag_boost # Recency boost if used recently last_used = snippet.get("last_used") if last_used: try: last_used_dt = datetime.fromisoformat(last_used.replace("Z", "+00:00")) hours_since_use = (current_time - last_used_dt).total_seconds() / 3600 if hours_since_use < 24: # Used in last 24h score += 1.0 except (ValueError, AttributeError): pass # Clamp to 0.0-10.0 return max(0.0, min(10.0, score)) def merge_contexts(contexts: List[Dict[str, Any]]) -> Dict[str, Any]: """ Merge multiple context objects into single deduplicated context. 
def merge_contexts(contexts: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Merge multiple context objects into single deduplicated context.

    Args:
        contexts: List of context objects to merge

    Returns:
        Single merged context with deduplicated, most recent info

    Example:
        >>> ctx1 = {"phase": "api_dev", "completed": ["auth"]}
        >>> ctx2 = {"phase": "api_dev", "completed": ["auth", "crud"]}
        >>> merge_contexts([ctx1, ctx2])
        {"phase": "api_dev", "completed": ["auth", "crud"], ...}
    """
    if not contexts:
        return {}

    merged = {
        "phase": None,
        "completed": [],
        "in_progress": None,
        "blockers": [],
        "decisions": [],
        "next": [],
        "files": [],
        "tags": []
    }

    # Collect all items
    completed_set = set()
    blocker_set = set()
    next_set = set()
    files_set = set()
    tags_set = set()
    decisions_list = []

    for ctx in contexts:
        # Keep the first non-empty phase seen
        if ctx.get("phase") and not merged["phase"]:
            merged["phase"] = ctx["phase"]

        # Keep the last non-empty in_progress seen
        if ctx.get("in_progress"):
            merged["in_progress"] = ctx["in_progress"]

        # Collect completed
        for item in ctx.get("completed", []):
            if isinstance(item, str):
                completed_set.add(item)

        # Collect blockers
        for item in ctx.get("blockers", []):
            if isinstance(item, str):
                blocker_set.add(item)

        # Collect next actions
        for item in ctx.get("next", []):
            if isinstance(item, str):
                next_set.add(item)

        # Collect files
        for item in ctx.get("files", []):
            if isinstance(item, str):
                files_set.add(item)
            elif isinstance(item, dict) and "path" in item:
                files_set.add(item["path"])

        # Collect tags
        for item in ctx.get("tags", []):
            if isinstance(item, str):
                tags_set.add(item)

        # Collect decisions (keep all with timestamps)
        for decision in ctx.get("decisions", []):
            if isinstance(decision, dict):
                decisions_list.append(decision)

    # Sort decisions by timestamp (most recent first)
    decisions_list.sort(
        key=lambda d: d.get("timestamp", ""),
        reverse=True
    )

    merged["completed"] = sorted(list(completed_set))[:20]
    merged["blockers"] = sorted(list(blocker_set))[:10]
    merged["next"] = sorted(list(next_set))[:20]
    merged["files"] = sorted(list(files_set))[:30]
    merged["tags"] = sorted(list(tags_set))[:20]
    merged["decisions"] = decisions_list[:10]

    return merged
def format_for_injection(
    contexts: List[Dict[str, Any]],
    max_tokens: int = 1000
) -> str:
    """
    Format context objects for token-efficient prompt injection.

    Args:
        contexts: List of context objects from database (sorted by relevance)
        max_tokens: Approximate max tokens to use (rough estimate)

    Returns:
        Token-efficient markdown string for Claude prompt

    Example:
        >>> contexts = [{"content": "Use FastAPI", "tags": ["api"]}]
        >>> format_for_injection(contexts)
        "## Context Recall\\n\\n**Generals:**\\n- Use FastAPI [api]\\n\\n*1 contexts loaded*\\n"
    """
    if not contexts:
        return ""

    lines = ["## Context Recall\n"]

    # Estimate ~4 chars per token
    max_chars = max_tokens * 4
    current_chars = len(lines[0])

    # Group by type
    by_type = defaultdict(list)
    for ctx in contexts:
        ctx_type = ctx.get("type", "general")
        by_type[ctx_type].append(ctx)

    # Priority order for types
    type_priority = ["blocker", "decision", "state", "pattern", "lesson", "general"]

    for ctx_type in type_priority:
        if ctx_type not in by_type:
            continue

        # Add type header
        header = f"\n**{ctx_type.title()}s:**\n"
        if current_chars + len(header) > max_chars:
            break

        lines.append(header)
        current_chars += len(header)

        # Add contexts of this type
        for ctx in by_type[ctx_type][:5]:  # Max 5 per type
            content = ctx.get("content", "")
            tags = ctx.get("tags", [])

            # Format with tags
            tag_str = f" [{', '.join(tags[:3])}]" if tags else ""
            line = f"- {content[:150]}{tag_str}\n"

            if current_chars + len(line) > max_chars:
                break

            lines.append(line)
            current_chars += len(line)

    # Add summary stats
    summary = f"\n*{len(contexts)} contexts loaded*\n"
    if current_chars + len(summary) <= max_chars:
        lines.append(summary)

    return "".join(lines)


def extract_tags_from_text(text: str) -> List[str]:
    """
    Auto-detect relevant tags from text content.

    Args:
        text: Content to extract tags from

    Returns:
        List of detected tags (technologies, patterns, categories)

    Example:
        >>> extract_tags_from_text("Using FastAPI with PostgreSQL")
        ["fastapi", "postgresql", "api", "database"]
    """
    text_lower = text.lower()
    tags = []

    # Technology keywords
    tech_keywords = {
        "fastapi": ["fastapi"],
        "postgresql": ["postgresql", "postgres", "psql"],
        "sqlalchemy": ["sqlalchemy", "orm"],
        "alembic": ["alembic", "migration"],
        "docker": ["docker", "container"],
        "redis": ["redis", "cache"],
        "nginx": ["nginx", "reverse proxy"],
        "python": ["python", "py"],
        "javascript": ["javascript", "js", "node"],
        "typescript": ["typescript", "ts"],
        "react": ["react", "jsx"],
        "vue": ["vue"],
        "api": ["api", "endpoint", "rest"],
        "database": ["database", "db", "sql"],
        "auth": ["auth", "authentication", "authorization"],
        "security": ["security", "encryption", "secure"],
        "testing": ["test", "pytest", "unittest"],
        "deployment": ["deploy", "deployment", "production"]
    }

    for tag, keywords in tech_keywords.items():
        if any(kw in text_lower for kw in keywords):
            tags.append(tag)

    # Pattern keywords
    pattern_keywords = {
        "async": ["async", "asynchronous", "await"],
        "crud": ["crud", "create", "read", "update", "delete"],
        "middleware": ["middleware"],
        "dependency-injection": ["dependency injection", "depends"],
        "error-handling": ["error", "exception", "try", "catch"],
        "validation": ["validation", "validate", "pydantic"],
        "optimization": ["optimize", "performance", "speed"],
        "refactor": ["refactor", "refactoring", "cleanup"]
    }

    for tag, keywords in pattern_keywords.items():
        if any(kw in text_lower for kw in keywords):
            tags.append(tag)

    # Category keywords
    category_keywords = {
        "critical": ["critical", "urgent", "important"],
        "blocker": ["blocker", "blocked", "blocking"],
        "bug": ["bug", "error", "issue", "problem"],
        "feature": ["feature", "enhancement", "add"],
        "architecture": ["architecture", "design", "structure"],
        "integration": ["integration", "integrate", "connect"]
    }

    for tag, keywords in category_keywords.items():
        if any(kw in text_lower for kw in keywords):
            tags.append(tag)

    # Deduplicate and return
    return list(dict.fromkeys(tags))  # Preserves order
"connect"] } for tag, keywords in category_keywords.items(): if any(kw in text_lower for kw in keywords): tags.append(tag) # Deduplicate and return return list(dict.fromkeys(tags)) # Preserves order def compress_file_changes(file_paths: List[str]) -> List[Dict[str, str]]: """ Compress file change list into brief summaries. Args: file_paths: List of file paths that changed Returns: Compressed summary with path and inferred change type Example: >>> compress_file_changes(["api/auth.py", "tests/test_auth.py"]) [ {"path": "api/auth.py", "type": "impl"}, {"path": "tests/test_auth.py", "type": "test"} ] """ compressed = [] for path in file_paths[:50]: # Limit to 50 files # Infer change type from path change_type = "other" path_lower = path.lower() if "test" in path_lower: change_type = "test" elif any(ext in path_lower for ext in [".py", ".js", ".ts", ".go", ".java"]): if "migration" in path_lower: change_type = "migration" elif "config" in path_lower or path_lower.endswith((".yaml", ".yml", ".json", ".toml")): change_type = "config" elif "model" in path_lower or "schema" in path_lower: change_type = "schema" elif "api" in path_lower or "endpoint" in path_lower or "route" in path_lower: change_type = "api" else: change_type = "impl" elif path_lower.endswith((".md", ".txt", ".rst")): change_type = "doc" elif "docker" in path_lower or "deploy" in path_lower: change_type = "infra" compressed.append({ "path": path, "type": change_type }) return compressed