""" Conversation Transcript Parser and Intelligent Categorizer for ClaudeTools Parses conversation files from Claude Desktop/Code sessions and categorizes them into MSP Work, Development, or General categories with intelligent context extraction. """ import json import os import re from datetime import datetime, timezone from pathlib import Path from typing import Any, Dict, List, Optional, Union try: from .context_compression import ( compress_conversation_summary, extract_key_decisions, extract_tags_from_text, ) except ImportError: # Fallback for standalone execution from context_compression import ( compress_conversation_summary, extract_key_decisions, extract_tags_from_text, ) def parse_jsonl_conversation(file_path: str) -> Dict[str, Any]: """ Parse .jsonl conversation file and return structured conversation data. Supports both .jsonl (line-delimited JSON) and .json formats. Extracts messages, timestamps, file paths, tool calls, and metadata. Args: file_path: Path to .jsonl or .json conversation file Returns: Dict with structure: { "messages": [{"role": str, "content": str, "timestamp": str}, ...], "metadata": {"title": str, "model": str, "created_at": str, ...}, "file_paths": [str, ...], "tool_calls": [{"tool": str, "count": int}, ...], "duration_seconds": int, "message_count": int } Example: >>> data = parse_jsonl_conversation("/path/to/conversation.jsonl") >>> data["message_count"] 15 >>> data["metadata"]["title"] "Build authentication system" """ if not os.path.exists(file_path): raise FileNotFoundError(f"Conversation file not found: {file_path}") messages = [] metadata = {} file_paths = set() tool_calls = {} file_ext = os.path.splitext(file_path)[1].lower() try: if file_ext == ".jsonl": # Parse line-delimited JSON with open(file_path, "r", encoding="utf-8") as f: for line_num, line in enumerate(f, 1): line = line.strip() if not line: continue try: entry = json.loads(line) _process_conversation_entry( entry, messages, metadata, file_paths, tool_calls ) except json.JSONDecodeError as e: print(f"Warning: Invalid JSON on line {line_num}: {e}") continue elif file_ext == ".json": # Parse regular JSON file with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) # Handle different JSON structures if isinstance(data, dict): # Single conversation object _process_conversation_entry( data, messages, metadata, file_paths, tool_calls ) # Check for nested messages array if "messages" in data and isinstance(data["messages"], list): for msg in data["messages"]: _process_conversation_entry( msg, messages, metadata, file_paths, tool_calls ) elif isinstance(data, list): # Array of message objects for entry in data: _process_conversation_entry( entry, messages, metadata, file_paths, tool_calls ) else: raise ValueError(f"Unsupported file format: {file_ext}") except Exception as e: raise ValueError(f"Failed to parse conversation file: {e}") # Calculate duration duration_seconds = 0 if messages and len(messages) >= 2: try: first_ts = _parse_timestamp(messages[0].get("timestamp")) last_ts = _parse_timestamp(messages[-1].get("timestamp")) if first_ts and last_ts: duration_seconds = int((last_ts - first_ts).total_seconds()) except Exception: pass # Sort tool calls by count tool_calls_list = [ {"tool": tool, "count": count} for tool, count in sorted( tool_calls.items(), key=lambda x: x[1], reverse=True ) ] return { "messages": messages, "metadata": metadata, "file_paths": sorted(list(file_paths)), "tool_calls": tool_calls_list[:10], # Top 10 tools "duration_seconds": duration_seconds, 
"message_count": len(messages) } def _process_conversation_entry( entry: Dict[str, Any], messages: List[Dict], metadata: Dict, file_paths: set, tool_calls: Dict[str, int] ) -> None: """ Process a single conversation entry and extract relevant data. Internal helper function to parse different JSON structures. """ # Extract metadata fields metadata_fields = [ "title", "model", "sessionId", "cwd", "createdAt", "lastActivityAt", "isArchived", "conversation_id" ] for field in metadata_fields: if field in entry and field not in metadata: metadata[field] = entry[field] # Extract message content role = entry.get("role") or entry.get("sender") or "unknown" content = entry.get("content") or entry.get("text") or entry.get("message") or "" timestamp = entry.get("timestamp") or entry.get("createdAt") or entry.get("time") if content and isinstance(content, str) and len(content.strip()) > 0: messages.append({ "role": role, "content": content.strip(), "timestamp": timestamp }) # Extract file paths from content _extract_file_paths_from_text(content, file_paths) # Extract tool calls _extract_tool_calls_from_text(content, tool_calls) # Check for nested content structures if "parts" in entry and isinstance(entry["parts"], list): for part in entry["parts"]: if isinstance(part, dict): _process_conversation_entry( part, messages, metadata, file_paths, tool_calls ) # Check for tool use in structured format if "tool_use" in entry: tool_name = entry["tool_use"].get("name") or entry["tool_use"].get("tool") if tool_name: tool_calls[tool_name] = tool_calls.get(tool_name, 0) + 1 def _extract_file_paths_from_text(text: str, file_paths: set) -> None: """Extract file paths from text content.""" # Match common file path patterns patterns = [ r'["\']([a-zA-Z]:[/\\](?:[^"\'<>|\r\n]+))["\']', # Windows absolute r'["\'](/[^"\'<>|\r\n]+)["\']', # Unix absolute r'["\'](\./[^"\'<>|\r\n]+)["\']', # Relative r'["\'](\.\./[^"\'<>|\r\n]+)["\']', # Parent relative r'file_path["\s:=]+["\']([^"\']+)["\']', # file_path parameter r'(?:api|src|tests?|migrations?)/[a-z0-9_/]+\.(?:py|js|ts|json|yaml|yml)', # Code paths ] for pattern in patterns: matches = re.findall(pattern, text, re.IGNORECASE) for match in matches: # Clean and validate path = match.strip() if len(path) > 3 and not path.startswith("http"): file_paths.add(path) def _extract_tool_calls_from_text(text: str, tool_calls: Dict[str, int]) -> None: """Extract tool usage from text content.""" # Match tool invocation patterns patterns = [ r'', # XML-style tool calls r'Tool: (\w+)', # Explicit tool mentions r'Using (\w+) tool', # Natural language tool mentions r'Called? 


def _parse_timestamp(timestamp: Union[str, int, float, None]) -> Optional[datetime]:
    """Parse various timestamp formats to datetime object."""
    if timestamp is None:
        return None

    try:
        # Unix timestamp (milliseconds)
        if isinstance(timestamp, (int, float)):
            if timestamp > 10000000000:  # Milliseconds
                return datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc)
            else:  # Seconds
                return datetime.fromtimestamp(timestamp, tz=timezone.utc)

        # ISO format string
        if isinstance(timestamp, str):
            # Try ISO format with Z
            if timestamp.endswith("Z"):
                return datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
            # Try ISO format
            return datetime.fromisoformat(timestamp)
    except Exception:
        pass

    return None


def categorize_conversation(messages: List[Dict[str, str]]) -> str:
    """
    Analyze conversation content and classify as 'msp', 'development', or 'general'.

    Uses keyword analysis to determine the primary category of the conversation.

    Args:
        messages: List of message dicts with 'role' and 'content' keys

    Returns:
        Category string: 'msp', 'development', or 'general'

    Example:
        >>> messages = [{"role": "user", "content": "Fix client firewall issue"}]
        >>> categorize_conversation(messages)
        'msp'
        >>> messages = [{"role": "user", "content": "Build API endpoint"}]
        >>> categorize_conversation(messages)
        'development'
    """
    # Combine all message content
    full_text = " ".join([msg.get("content", "") for msg in messages])
    text_lower = full_text.lower()

    # Category keywords with weights
    msp_keywords = {
        # Client/customer terms
        "client": 3, "customer": 3, "site": 2, "tenant": 2,
        # Infrastructure
        "infrastructure": 3, "server": 2, "network": 2, "firewall": 3,
        "dns": 2, "vpn": 2, "router": 2, "switch": 2, "backup": 2,
        # Services
        "support": 2, "ticket": 3, "incident": 2, "outage": 3,
        "billable": 3, "invoice": 2, "billing": 2,
        # Microsoft/cloud services
        "365": 2, "office365": 2, "azure": 2, "exchange": 2,
        "sharepoint": 2, "teams": 2, "intune": 2, "entra": 2,
        # Security
        "phishing": 2, "breach": 3, "compromise": 3, "vulnerability": 2,
        # MSP specific
        "msp": 4, "managed service": 4, "service desk": 3,
        "rds": 2, "terminal server": 2, "citrix": 2,
    }

    dev_keywords = {
        # API/Backend
        "api": 3, "endpoint": 3, "route": 2, "fastapi": 4, "flask": 3,
        "rest": 2, "graphql": 2, "webhook": 2,
        # Database
        "database": 3, "migration": 3, "alembic": 3, "sqlalchemy": 3,
        "postgresql": 3, "mysql": 2, "redis": 2, "mongodb": 2,
        # Code
        "implement": 2, "refactor": 2, "debug": 2, "test": 2,
        "pytest": 3, "unittest": 2, "code": 2, "function": 2,
        "class": 2, "module": 2, "package": 2,
        # Development
        "feature": 2, "bug": 2, "commit": 2, "pull request": 2,
        "repository": 2, "github": 2, "git": 2,
        # Frontend
        "react": 3, "vue": 3, "component": 2, "frontend": 2,
        "ui": 2, "ux": 2, "design": 1,
        # Tools
        "docker": 2, "container": 2, "kubernetes": 2, "ci/cd": 2,
        "deployment": 2, "pipeline": 2,
    }

    # Count weighted keyword matches
    msp_score = sum(
        weight for keyword, weight in msp_keywords.items()
        if keyword in text_lower
    )
    dev_score = sum(
        weight for keyword, weight in dev_keywords.items()
        if keyword in text_lower
    )

    # Additional heuristics
    # Check for code patterns (increases dev score)
    code_patterns = [
        r'def \w+\(',  # Python function
        r'class \w+[:\(]',  # Python class
        r'async def ',  # Async function
        r'import \w+',  # Import statement
        r'from \w+ import',  # From import
        r'```(?:python|javascript|typescript|sql)',  # Code blocks
        r'\.py|\.js|\.ts|\.go|\.java',  # File extensions
    ]
    for pattern in code_patterns:
        if re.search(pattern, full_text, re.IGNORECASE):
            dev_score += 2

    # Check for MSP ticket/incident patterns
    ticket_patterns = [
        r'ticket[:\s#]+\d+',
        r'incident[:\s#]+\d+',
        r'case[:\s#]+\d+',
        r'user reported',
        r'customer reported',
    ]
    for pattern in ticket_patterns:
        if re.search(pattern, text_lower):
            msp_score += 3

    # Decision logic
    threshold = 5  # Minimum score to be confident

    if msp_score >= threshold and msp_score > dev_score:
        return "msp"
    elif dev_score >= threshold and dev_score > msp_score:
        return "development"
    else:
        return "general"
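
# Rough sense of the scoring above (hand-worked, approximate): for the text
# "Client reported an outage, the firewall is blocking the VPN, ticket #4821",
# msp_score picks up client(3) + outage(3) + firewall(3) + vpn(2) + ticket(3)
# plus 3 for the ticket-number pattern, well past the threshold of 5, while
# dev_score stays at 0, so the conversation is categorized as "msp".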


def extract_context_from_conversation(conversation: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extract dense context suitable for database storage.

    Combines message content, categorization, and compression to create
    a rich context object ready for database insertion.

    Args:
        conversation: Parsed conversation dict from parse_jsonl_conversation()

    Returns:
        Compressed context dict with:
        {
            "category": str,
            "summary": Dict (from compress_conversation_summary),
            "tags": List[str],
            "decisions": List[Dict],
            "key_files": List[str],
            "key_tools": List[str],
            "metrics": Dict,
            "raw_metadata": Dict
        }

    Example:
        >>> conversation = parse_jsonl_conversation("/path/to/file.jsonl")
        >>> context = extract_context_from_conversation(conversation)
        >>> context["category"]
        'development'
        >>> context["tags"]
        ['api', 'fastapi', 'database', 'migration']
    """
    messages = conversation.get("messages", [])
    metadata = conversation.get("metadata", {})

    # Categorize conversation
    category = categorize_conversation(messages)

    # Compress conversation using existing utility
    summary = compress_conversation_summary(messages)

    # Extract full text for tag and decision extraction
    full_text = " ".join([msg.get("content", "") for msg in messages])

    # Extract tags
    tags = extract_tags_from_text(full_text)

    # Add category as a tag
    if category not in tags:
        tags.insert(0, category)

    # Extract decisions
    decisions = extract_key_decisions(full_text)

    # Get key file paths (most mentioned)
    file_paths = conversation.get("file_paths", [])
    key_files = file_paths[:20]  # Limit to top 20

    # Get key tools (most used)
    tool_calls = conversation.get("tool_calls", [])
    key_tools = [tool["tool"] for tool in tool_calls[:10]]

    # Calculate metrics
    metrics = {
        "message_count": conversation.get("message_count", 0),
        "duration_seconds": conversation.get("duration_seconds", 0),
        "file_count": len(file_paths),
        "tool_count": len(tool_calls),
        "decision_count": len(decisions),
    }

    # Calculate conversation quality score (0-10)
    quality_score = min(10, (
        min(5, len(messages) / 2) +  # More messages = higher quality
        min(2, len(decisions)) +  # Decisions indicate depth
        min(2, len(file_paths) / 5) +  # Files indicate concrete work
        (1 if metrics["duration_seconds"] > 300 else 0)  # >5min sessions
    ))
    metrics["quality_score"] = round(quality_score, 1)

    return {
        "category": category,
        "summary": summary,
        "tags": tags[:20],  # Limit tags
        "decisions": decisions[:10],  # Limit decisions
        "key_files": key_files,
        "key_tools": key_tools,
        "metrics": metrics,
        "raw_metadata": metadata
    }
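
# Quality-score arithmetic above, worked on assumed round numbers: a session
# with 12 messages, 1 extracted decision, 10 file paths, and a 600-second
# duration scores min(5, 6) + min(2, 1) + min(2, 2) + 1 = 9.0 out of 10.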


def scan_folder_for_conversations(base_path: str) -> List[str]:
    """
    Recursively find all conversation files (.jsonl and .json) in a directory.

    Args:
        base_path: Root directory to start scanning

    Returns:
        List of absolute file paths to conversation files

    Example:
        >>> files = scan_folder_for_conversations("/path/to/conversations")
        >>> len(files)
        42
        >>> files[0]
        '/path/to/conversations/session1/messages.jsonl'
    """
    if not os.path.exists(base_path):
        raise FileNotFoundError(f"Base path does not exist: {base_path}")

    conversation_files = []

    # Use pathlib for cross-platform path handling
    base = Path(base_path)

    # Find all .jsonl and .json files recursively
    for ext in ["*.jsonl", "*.json"]:
        for file_path in base.rglob(ext):
            # Skip config files and settings
            filename = file_path.name.lower()
            if filename in ["config.json", "settings.json", "settings.local.json"]:
                continue

            # Skip common non-conversation JSON files
            skip_patterns = [
                "package.json", "tsconfig.json", "webpack.json",
                "manifest.json", ".vscode", "node_modules"
            ]
            if any(pattern in str(file_path).lower() for pattern in skip_patterns):
                continue

            conversation_files.append(str(file_path.resolve()))

    return sorted(conversation_files)


def batch_process_conversations(
    base_path: str,
    output_callback: Optional[Callable] = None
) -> List[Dict[str, Any]]:
    """
    Scan folder and process all conversations into extracted contexts.

    Convenience function that combines scanning and extraction.

    Args:
        base_path: Root directory to scan
        output_callback: Optional callback function(file_path, context) for progress

    Returns:
        List of extracted context dicts

    Example:
        >>> def progress(path, ctx):
        ...     print(f"Processed: {path} -> {ctx['category']}")
        >>> contexts = batch_process_conversations("/path", progress)
        Processed: /path/session1.jsonl -> development
        Processed: /path/session2.jsonl -> msp
        >>> len(contexts)
        2
    """
    files = scan_folder_for_conversations(base_path)
    contexts = []

    for file_path in files:
        try:
            conversation = parse_jsonl_conversation(file_path)
            context = extract_context_from_conversation(conversation)

            # Add source file path to context
            context["source_file"] = file_path
            contexts.append(context)

            if output_callback:
                output_callback(file_path, context)

        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            continue

    return contexts


# Utility function for quick testing
def summarize_conversation_file(file_path: str) -> str:
    """
    Quick summary of a conversation file for CLI/debugging.

    Args:
        file_path: Path to conversation file

    Returns:
        Human-readable summary string
    """
    try:
        conversation = parse_jsonl_conversation(file_path)
        context = extract_context_from_conversation(conversation)

        title = context["raw_metadata"].get("title", "Untitled")
        category = context["category"]
        msg_count = context["metrics"]["message_count"]
        duration = context["metrics"]["duration_seconds"]
        tags = ", ".join(context["tags"][:5])

        summary = f"""
Conversation: {title}
Category: {category}
Messages: {msg_count}
Duration: {duration}s ({duration // 60}m)
Tags: {tags}
Quality: {context["metrics"]["quality_score"]}/10
""".strip()

        return summary

    except Exception as e:
        return f"Error: {e}"


if __name__ == "__main__":
    # Quick test if run directly
    import sys

    if len(sys.argv) > 1:
        file_path = sys.argv[1]
        print(summarize_conversation_file(file_path))
    else:
        print("Usage: python conversation_parser.py <conversation_file>")
        print("\nExample:")
        print("  python conversation_parser.py /path/to/conversation.jsonl")
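
# Folder-wide usage sketch (assumed workflow, not wired into the CLI above):
#
#   from conversation_parser import batch_process_conversations
#   contexts = batch_process_conversations("/path/to/conversations")
#   for ctx in contexts:
#       print(ctx["source_file"], "->", ctx["category"], ctx["metrics"]["quality_score"])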