""" Bulk Import API Router for ClaudeTools. Provides endpoints for bulk importing conversation contexts from Claude project folders. Scans .jsonl files, extracts context using the conversation_parser utility. """ import json from typing import Dict, List, Optional from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Query, status from sqlalchemy.orm import Session from api.database import get_db from api.middleware.auth import get_current_user from api.schemas.conversation_context import ConversationContextCreate from api.services import conversation_context_service from api.utils.conversation_parser import ( extract_context_from_conversation, parse_jsonl_conversation, scan_folder_for_conversations, ) # Create router router = APIRouter() @router.post( "/import-folder", response_model=dict, summary="Bulk import from Claude projects folder", description="Scan a folder for .jsonl conversation files and import them as contexts", status_code=status.HTTP_200_OK, ) async def import_claude_folder( folder_path: str = Query(..., description="Path to Claude projects folder"), dry_run: bool = Query(False, description="Preview import without saving to database"), project_id: Optional[UUID] = Query(None, description="Associate contexts with a specific project"), session_id: Optional[UUID] = Query(None, description="Associate contexts with a specific session"), db: Session = Depends(get_db), current_user: dict = Depends(get_current_user), ): """ Bulk import conversation contexts from a Claude projects folder. This endpoint: 1. Scans the folder for .jsonl conversation files 2. Parses each conversation file 3. Extracts context, decisions, and metadata 4. Saves contexts to database (unless dry_run=True) Args: folder_path: Path to the folder containing Claude project conversations dry_run: If True, preview import without saving (default: False) project_id: Optional project ID to associate all contexts with session_id: Optional session ID to associate all contexts with db: Database session current_user: Current authenticated user Returns: Dictionary with import results and statistics """ result = { "dry_run": dry_run, "folder_path": folder_path, "files_scanned": 0, "files_processed": 0, "contexts_created": 0, "errors": [], "contexts_preview": [], } try: # Step 1: Scan folder for conversation files conversation_files = scan_folder_for_conversations(folder_path) result["files_scanned"] = len(conversation_files) if not conversation_files: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"No .jsonl conversation files found in {folder_path}" ) # Step 2: Process each conversation file for file_path in conversation_files: try: # Parse conversation file using the new parser conversation = parse_jsonl_conversation(file_path) if not conversation.get("messages"): result["errors"].append({ "file": file_path, "error": "No messages found in file" }) continue # Extract context using the new parser context = extract_context_from_conversation(conversation) # Map context to database format context_title = context["raw_metadata"].get("title", f"Conversation: {conversation.get('file_paths', ['Unknown'])[0] if conversation.get('file_paths') else 'Unknown'}") # Build dense summary from compressed summary summary_parts = [] if context["summary"].get("summary"): summary_parts.append(context["summary"]["summary"]) # Add category information summary_parts.append(f"Category: {context['category']}") # Add key statistics metrics = context.get("metrics", {}) summary_parts.append( f"Messages: {metrics.get('message_count', 0)}, " f"Duration: {metrics.get('duration_seconds', 0)}s, " f"Quality: {metrics.get('quality_score', 0)}/10" ) dense_summary = "\n\n".join(summary_parts) # Map category to context_type category = context.get("category", "general") if category == "msp": context_type = "session_summary" elif category == "development": context_type = "project_state" else: context_type = "general_context" # Extract key decisions as JSON string decisions = context.get("decisions", []) key_decisions_json = json.dumps(decisions) if decisions else None # Extract tags as JSON string tags = context.get("tags", []) tags_json = json.dumps(tags) # Calculate relevance score from quality score quality_score = metrics.get("quality_score", 5.0) relevance_score = min(10.0, quality_score) # Build context create schema context_data = ConversationContextCreate( session_id=session_id, project_id=project_id, machine_id=None, context_type=context_type, title=context_title, dense_summary=dense_summary, key_decisions=key_decisions_json, current_state=None, tags=tags_json, relevance_score=relevance_score, ) # Preview context context_preview = { "file": file_path.split('\\')[-1] if '\\' in file_path else file_path.split('/')[-1], "title": context_title, "type": context_type, "category": category, "message_count": metrics.get("message_count", 0), "tags": tags[:5], # First 5 tags "relevance_score": relevance_score, "quality_score": quality_score, } result["contexts_preview"].append(context_preview) # Save to database (unless dry_run) if not dry_run: created_context = conversation_context_service.create_conversation_context( db, context_data ) result["contexts_created"] += 1 result["files_processed"] += 1 except Exception as e: result["errors"].append({ "file": file_path, "error": str(e) }) continue # Step 3: Generate summary result["summary"] = _generate_import_summary(result) return result except HTTPException: raise except FileNotFoundError as e: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=str(e) ) except Exception as e: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Import failed: {str(e)}" ) def _generate_import_summary(result: Dict) -> str: """ Generate human-readable summary of import results. Args: result: Import results dictionary Returns: Summary string """ summary_lines = [ f"Scanned {result['files_scanned']} files", f"Processed {result['files_processed']} successfully", ] if result["dry_run"]: summary_lines.append("DRY RUN - No changes saved to database") summary_lines.append(f"Would create {len(result['contexts_preview'])} contexts") else: summary_lines.append(f"Created {result['contexts_created']} contexts") if result["errors"]: summary_lines.append(f"Encountered {len(result['errors'])} errors") return " | ".join(summary_lines) @router.get( "/import-status", response_model=dict, summary="Check import system status", description="Get status of the bulk import system", status_code=status.HTTP_200_OK, ) async def get_import_status( current_user: dict = Depends(get_current_user), ): """ Get status information about the bulk import system. Returns: Dictionary with system status """ return { "status": "online", "features": { "conversation_parsing": True, "intelligent_categorization": True, "dry_run": True, }, "supported_formats": [".jsonl", ".json"], "categories": ["msp", "development", "general"], "version": "1.0.0", }