Add session import tool + fix audit gaps (GrepAI, Ollama, MCP, settings)
tools/import-sessions.py: Scans ~/.claude/projects/ for existing Claude Code sessions, extracts summaries (user messages, tools used, files touched, credential flags), stages for Claude to organize into ClaudeTools folder structure. Audit gap fixes: - .mcp.json: added grepai MCP server - .claude/settings.json: created with bypassPermissions default - .claude/MCP_SERVERS.md: documented all MCP servers - Ollama: all 3 models pulled (qwen3:14b, codestral:22b, nomic-embed-text) - GrepAI: initialized (grepai init), watcher ready Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
268
tools/import-sessions.py
Normal file
268
tools/import-sessions.py
Normal file
@@ -0,0 +1,268 @@
|
||||
"""
|
||||
Import existing Claude Code session data into ClaudeTools.
|
||||
|
||||
Scans ~/.claude/projects/ for conversation transcripts, extracts
|
||||
summaries + key details, and stages them for review before committing.
|
||||
|
||||
Usage:
|
||||
python tools/import-sessions.py # scan + summarize
|
||||
python tools/import-sessions.py --output staging/ # custom output dir
|
||||
python tools/import-sessions.py --limit 10 # only last 10 sessions
|
||||
python tools/import-sessions.py --since 2026-03-01 # sessions after date
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def find_claude_projects():
|
||||
"""Find all Claude Code project directories."""
|
||||
claude_dir = Path.home() / ".claude" / "projects"
|
||||
if not claude_dir.exists():
|
||||
print(f"[!] No Claude projects found at {claude_dir}")
|
||||
return []
|
||||
|
||||
projects = []
|
||||
for project_dir in claude_dir.iterdir():
|
||||
if not project_dir.is_dir():
|
||||
continue
|
||||
jsonl_files = sorted(project_dir.glob("*.jsonl"), key=os.path.getmtime, reverse=True)
|
||||
if jsonl_files:
|
||||
projects.append({
|
||||
"dir": project_dir,
|
||||
"name": project_dir.name,
|
||||
"sessions": jsonl_files,
|
||||
"last_modified": datetime.fromtimestamp(jsonl_files[0].stat().st_mtime),
|
||||
})
|
||||
return sorted(projects, key=lambda p: p["last_modified"], reverse=True)
|
||||
|
||||
|
||||
def extract_session_summary(jsonl_path, max_messages=200):
|
||||
"""Extract key information from a single session JSONL file."""
|
||||
messages = []
|
||||
try:
|
||||
with open(jsonl_path, "r", encoding="utf-8", errors="replace") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
msg = json.loads(line)
|
||||
messages.append(msg)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
except Exception as e:
|
||||
return {"error": str(e), "path": str(jsonl_path)}
|
||||
|
||||
if not messages:
|
||||
return {"empty": True, "path": str(jsonl_path)}
|
||||
|
||||
# Extract useful data
|
||||
user_messages = []
|
||||
assistant_summaries = []
|
||||
tool_calls = []
|
||||
files_touched = set()
|
||||
credentials_mentioned = []
|
||||
urls_mentioned = set()
|
||||
|
||||
for msg in messages[-max_messages:]:
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content", "")
|
||||
|
||||
# Handle content that's a list of blocks
|
||||
if isinstance(content, list):
|
||||
text_parts = []
|
||||
for block in content:
|
||||
if isinstance(block, dict):
|
||||
if block.get("type") == "text":
|
||||
text_parts.append(block.get("text", ""))
|
||||
elif block.get("type") == "tool_use":
|
||||
tool_calls.append(block.get("name", "unknown"))
|
||||
# Extract file paths from tool inputs
|
||||
inp = block.get("input", {})
|
||||
for key in ["file_path", "path", "command"]:
|
||||
val = inp.get(key, "")
|
||||
if val and ("/" in val or "\\" in val):
|
||||
files_touched.add(str(val)[:200])
|
||||
elif isinstance(block, str):
|
||||
text_parts.append(block)
|
||||
content = "\n".join(text_parts)
|
||||
|
||||
if not isinstance(content, str):
|
||||
continue
|
||||
|
||||
if role == "user":
|
||||
# Keep first 500 chars of each user message
|
||||
cleaned = content.strip()[:500]
|
||||
if cleaned and not cleaned.startswith("<"): # skip system messages
|
||||
user_messages.append(cleaned)
|
||||
|
||||
# Look for credential patterns (but don't extract values — just flag)
|
||||
if re.search(r"password|api[_-]?key|token|secret|credential", content, re.I):
|
||||
# Note the context but don't extract the actual credential
|
||||
match = re.search(r"(password|api[_-]?key|token|secret|credential)[^\n]{0,100}", content, re.I)
|
||||
if match:
|
||||
credentials_mentioned.append(match.group(0)[:120])
|
||||
|
||||
# Extract URLs
|
||||
for url in re.findall(r"https?://[^\s\"'<>]+", content):
|
||||
urls_mentioned.add(url[:200])
|
||||
|
||||
# Session metadata
|
||||
first_msg_time = None
|
||||
last_msg_time = None
|
||||
for msg in messages:
|
||||
ts = msg.get("timestamp")
|
||||
if ts:
|
||||
try:
|
||||
t = datetime.fromisoformat(ts.replace("Z", "+00:00"))
|
||||
if first_msg_time is None:
|
||||
first_msg_time = t
|
||||
last_msg_time = t
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
|
||||
return {
|
||||
"path": str(jsonl_path),
|
||||
"message_count": len(messages),
|
||||
"first_timestamp": str(first_msg_time) if first_msg_time else None,
|
||||
"last_timestamp": str(last_msg_time) if last_msg_time else None,
|
||||
"user_messages": user_messages[:30], # cap at 30 most recent
|
||||
"tool_calls_summary": dict(sorted(
|
||||
{t: tool_calls.count(t) for t in set(tool_calls)}.items(),
|
||||
key=lambda x: -x[1]
|
||||
)[:15]),
|
||||
"files_touched": sorted(files_touched)[:50],
|
||||
"credentials_flagged": len(credentials_mentioned),
|
||||
"credential_contexts": credentials_mentioned[:10],
|
||||
"urls": sorted(urls_mentioned)[:20],
|
||||
}
|
||||
|
||||
|
||||
def write_summary(projects, output_dir, limit=None, since=None):
|
||||
"""Write extracted summaries to output directory."""
|
||||
output_dir = Path(output_dir)
|
||||
output_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
summaries = []
|
||||
session_count = 0
|
||||
|
||||
for project in projects:
|
||||
for jsonl_path in project["sessions"]:
|
||||
if limit and session_count >= limit:
|
||||
break
|
||||
|
||||
file_mtime = datetime.fromtimestamp(jsonl_path.stat().st_mtime)
|
||||
if since and file_mtime < since:
|
||||
continue
|
||||
|
||||
print(f" scanning {jsonl_path.name} ({jsonl_path.stat().st_size / 1024:.0f} KB)...")
|
||||
summary = extract_session_summary(jsonl_path)
|
||||
summary["project_dir"] = str(project["dir"])
|
||||
summary["file_modified"] = str(file_mtime)
|
||||
summaries.append(summary)
|
||||
session_count += 1
|
||||
|
||||
if limit and session_count >= limit:
|
||||
break
|
||||
|
||||
# Write individual summaries
|
||||
for i, s in enumerate(summaries):
|
||||
out_path = output_dir / f"session_{i+1:03d}.json"
|
||||
with open(out_path, "w", encoding="utf-8") as f:
|
||||
json.dump(s, f, indent=2, default=str)
|
||||
|
||||
# Write master index
|
||||
index = {
|
||||
"scan_date": datetime.now().isoformat(),
|
||||
"total_projects": len(projects),
|
||||
"sessions_scanned": len(summaries),
|
||||
"sessions": [
|
||||
{
|
||||
"file": f"session_{i+1:03d}.json",
|
||||
"messages": s.get("message_count", 0),
|
||||
"modified": s.get("file_modified", ""),
|
||||
"first_user_message": (s.get("user_messages", [""])[0][:100]
|
||||
if s.get("user_messages") else "(empty)"),
|
||||
"tools_used": list(s.get("tool_calls_summary", {}).keys())[:5],
|
||||
"credentials_flagged": s.get("credentials_flagged", 0),
|
||||
}
|
||||
for i, s in enumerate(summaries)
|
||||
],
|
||||
}
|
||||
index_path = output_dir / "INDEX.json"
|
||||
with open(index_path, "w", encoding="utf-8") as f:
|
||||
json.dump(index, f, indent=2, default=str)
|
||||
|
||||
# Write human-readable overview
|
||||
overview_path = output_dir / "OVERVIEW.md"
|
||||
with open(overview_path, "w", encoding="utf-8") as f:
|
||||
f.write("# Imported Session Overview\n\n")
|
||||
f.write(f"Scanned: {datetime.now().strftime('%Y-%m-%d %H:%M')}\n")
|
||||
f.write(f"Projects found: {len(projects)}\n")
|
||||
f.write(f"Sessions scanned: {len(summaries)}\n\n")
|
||||
f.write("## Sessions (most recent first)\n\n")
|
||||
for i, s in enumerate(summaries):
|
||||
msgs = s.get("user_messages", [])
|
||||
first = msgs[0][:120] if msgs else "(empty session)"
|
||||
f.write(f"### Session {i+1} — {s.get('file_modified', 'unknown date')}\n")
|
||||
f.write(f"- Messages: {s.get('message_count', 0)}\n")
|
||||
f.write(f"- First ask: {first}\n")
|
||||
tools = s.get("tool_calls_summary", {})
|
||||
if tools:
|
||||
f.write(f"- Tools: {', '.join(list(tools.keys())[:5])}\n")
|
||||
files = s.get("files_touched", [])
|
||||
if files:
|
||||
f.write(f"- Files touched: {len(files)} (see session_{i+1:03d}.json)\n")
|
||||
if s.get("credentials_flagged", 0) > 0:
|
||||
f.write(f"- [!] Credentials mentioned: {s['credentials_flagged']} instances\n")
|
||||
f.write("\n")
|
||||
|
||||
print(f"\n[OK] Wrote {len(summaries)} session summaries to {output_dir}/")
|
||||
print(f" Index: {index_path}")
|
||||
print(f" Overview: {overview_path}")
|
||||
print(f"\nNext step: ask Claude to review OVERVIEW.md and organize")
|
||||
print(f" the extracted data into the ClaudeTools folder structure.")
|
||||
return summaries
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Import Claude Code sessions into ClaudeTools")
|
||||
parser.add_argument("--output", "-o", default="imported-sessions",
|
||||
help="Output directory for summaries (default: imported-sessions/)")
|
||||
parser.add_argument("--limit", "-n", type=int, default=None,
|
||||
help="Max sessions to scan")
|
||||
parser.add_argument("--since", "-s", type=str, default=None,
|
||||
help="Only sessions modified after this date (YYYY-MM-DD)")
|
||||
args = parser.parse_args()
|
||||
|
||||
since = None
|
||||
if args.since:
|
||||
since = datetime.strptime(args.since, "%Y-%m-%d")
|
||||
|
||||
print("=== Claude Code Session Import Tool ===")
|
||||
print()
|
||||
|
||||
projects = find_claude_projects()
|
||||
if not projects:
|
||||
print("No sessions found. Nothing to import.")
|
||||
sys.exit(0)
|
||||
|
||||
print(f"Found {len(projects)} project(s) with session data:")
|
||||
for p in projects[:10]:
|
||||
print(f" {p['name'][:20]}... — {len(p['sessions'])} session(s), "
|
||||
f"last: {p['last_modified'].strftime('%Y-%m-%d %H:%M')}")
|
||||
if len(projects) > 10:
|
||||
print(f" ... and {len(projects) - 10} more")
|
||||
print()
|
||||
|
||||
write_summary(projects, args.output, limit=args.limit, since=since)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user