Reconstructs session logs from Claude Code transcripts when a session crashes or is closed before /save. Two entry points: - /recover <uuid|latest> : manual, Claude-reviewed reconstruction - detect_orphaned_sessions.py : scheduled scan that auto-builds logs for substantive, unsaved, not-yet-recovered transcripts (banner-marked RECOVERED-UNVERIFIED), commits them, and posts a #bot-alerts FYI. recover_session.py is the shared engine: Python extracts the verbatim command/config/reference timeline; Ollama drafts prose-only narrative. Machine-local ledger (.claude/state/) prevents reprocessing. Reviewed: git add scoped to own files, ledger written only after successful push, per-uuid idempotency, --max cap for unattended runs. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1188 lines
42 KiB
Python
1188 lines
42 KiB
Python
#!/usr/bin/env python3
|
|
"""recover_session.py -- reconstruct a ClaudeTools session log from a Claude Code transcript.
|
|
|
|
Claude Code writes every session live to a transcript JSONL under
|
|
``~/.claude/projects/<slug>/<uuid>.jsonl`` (slug = the repo root path with ``/``,
|
|
``\\`` and ``:`` each replaced by ``-``). When a session crashes or is closed
|
|
before ``/save`` runs, the work is still fully recorded in that transcript. This
|
|
module distills a transcript back into a normal session log that follows the
|
|
``.claude/commands/save.md`` format.
|
|
|
|
Accuracy split (deliberate):
|
|
- Ollama drafts ONLY the prose sections (Session Summary, Key Decisions,
|
|
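Problems Encountered, Pending / Incomplete Tasks). It is instructed never to
emit commands, IPs, credentials, file paths, commit SHAs, or ticket IDs. Note
that the digest it receives still contains tool-call one-liners and message
text, so such values can reach the configured Ollama endpoint.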
|
|
- Python extracts the high-value, accuracy-critical evidence verbatim
|
|
(Configuration Changes, Commands & Outputs, Reference Information,
|
|
Infrastructure & Servers, Credentials & Secrets).
|
|
|
|
If Ollama is unreachable the log is still produced -- the prose sections carry a
|
|
placeholder note and the verbatim evidence appendix (the important part) is
|
|
intact.
|
|
|
|
CLI:
|
|
recover_session.py --uuid <uuid> [--print | --auto | --json]
|
|
recover_session.py --latest [--print | --auto | --json]
|
|
recover_session.py --path <file> [--print | --auto | --json]
|
|
|
|
Importable API (the detector uses these):
|
|
iter_events(path) -> yields raw decoded JSON objects, in file order
|
|
parse_transcript(path) -> ParsedTranscript
|
|
classify(parsed) -> dict with substantive/saved/scope/... verdict
|
|
build_log(parsed, today=None) -> (markdown_str, meta_dict)
|
|
resolve(uuid=None, latest=False, path=None) -> Path
|
|
|
|
stdlib only; targets Python 3.11+.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import urllib.error
|
|
import urllib.request
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Constants
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
# Tool names whose mere use marks a session as "substantive" (it wrote or
# edited something).
_MUTATING_TOOLS = {"Write", "Edit", "NotebookEdit"}

# Shell-command fragments (Bash / PowerShell) that count as mutating actions.
# They are OR-ed into a single pattern and applied with ``search`` -- i.e.
# unanchored and case-insensitive -- against the raw command string.
_MUTATING_CMD_PATTERNS = [
    r"git\s+(commit|push|add)\b",
    r"\bssh\b",
    r"\bschtasks\b",
    r"\bNew-Item\b",
    r"\bSet-Content\b",
    r"\bRemove-Item\b",
    r"\bOut-File\b",
    r"curl\b.*-X\s*(POST|PUT|DELETE|PATCH)",
    r"/api/",
    r"vault\.sh\b",
    # Same verb set as the curl pattern above (PATCH was previously missing).
    r"Invoke-RestMethod\b.*-Method\s*(Post|Put|Delete|Patch)",
]
_MUTATING_CMD_RE = re.compile("|".join(_MUTATING_CMD_PATTERNS), re.IGNORECASE)

# Skills whose use implies real, mutating work was performed.
_MUTATING_SKILLS = {
    "syncro",
    "rmm",
    "remediation-tool",
    "mailbox",
    "forum-post",
    "syncro-emergency-billing",
}

# Skills / path fragments indicating the session WAS already saved.
_SAVE_SKILLS = {"save", "scc", "checkpoint"}
_SESSION_LOG_MARKERS = ("session-logs/", "session-logs\\")

# Maximum characters of a tool result kept before truncation.
_RESULT_TRUNC = 300

# Maximum characters of transcript digest sent to Ollama.
_DIGEST_CAP = 16000

# Commit footer (matches the repo's standard).
_COMMIT_FOOTER = "Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Repo / path resolution
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def repo_root() -> Path:
    """Return the ClaudeTools repo root.

    Prefers ``claudetools_root`` from ``.claude/identity.json`` when that value
    is a non-empty string naming an existing path; otherwise falls back to two
    directory levels above this script's directory
    (``.../.claude/scripts/<script>`` -> repo root).

    A missing, unreadable, malformed, or non-object identity file never
    raises -- the fallback is returned instead.
    """
    script = Path(__file__).resolve()
    fallback = script.parents[2]  # scripts/ -> .claude/ -> repo root
    id_path = fallback / ".claude" / "identity.json"
    try:
        data = json.loads(id_path.read_text(encoding="utf-8"))
        # identity.json may be valid JSON without being an object (e.g. a
        # list); only a dict can carry the key we want.
        root = data.get("claudetools_root") if isinstance(data, dict) else None
        if isinstance(root, str) and root:
            candidate = Path(root)
            if candidate.exists():
                return candidate
    except (OSError, ValueError):
        pass
    return fallback
|
|
|
|
|
|
def _identity() -> dict:
    """Load ``.claude/identity.json`` from the repo root.

    Returns the decoded object, or ``{}`` when the file is missing,
    unreadable, not valid JSON, or valid JSON that is not an object (callers
    use ``.get`` on the result, so anything but a dict would crash them).
    """
    try:
        raw = (repo_root() / ".claude" / "identity.json").read_text(encoding="utf-8")
        data = json.loads(raw)
    except (OSError, ValueError):
        return {}
    return data if isinstance(data, dict) else {}
|
|
|
|
|
|
def transcript_base_dir() -> Path:
    """Return ``~/.claude/projects/<slug>`` for this repo.

    The slug is identity's ``claudetools_root`` (or, when that is unset, the
    resolved repo root) with every ``/``, ``\\`` and ``:`` replaced by ``-``.
    """
    configured = _identity().get("claudetools_root")
    root_text = configured if configured else str(repo_root())
    slug = re.sub(r"[/\\:]", "-", root_text)
    return Path.home().joinpath(".claude", "projects", slug)
|
|
|
|
|
|
def resolve(uuid: str | None = None, latest: bool = False, path: str | None = None) -> Path:
    """Turn one of the three selectors into a transcript path.

    Precedence is ``path``, then ``latest`` (most recently modified
    ``*.jsonl`` in the transcript base dir), then ``uuid``
    (``<base>/<uuid>.jsonl``).

    Raises:
        FileNotFoundError: the selected transcript (or any transcript, for
            ``latest``) does not exist.
        ValueError: no selector was supplied.
    """
    if path:
        explicit = Path(path)
        if explicit.exists():
            return explicit
        raise FileNotFoundError(f"transcript not found: {explicit}")

    base = transcript_base_dir()

    if latest:
        newest = max(
            base.glob("*.jsonl"), key=lambda f: f.stat().st_mtime, default=None
        )
        if newest is None:
            raise FileNotFoundError(f"no transcripts in {base}")
        return newest

    if not uuid:
        raise ValueError("one of uuid / latest / path is required")

    wanted = base / f"{uuid}.jsonl"
    if wanted.exists():
        return wanted
    raise FileNotFoundError(f"transcript not found: {wanted}")
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Parsing
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
# Harness-injected <system-reminder> blocks are noise, not session content.
_SYSTEM_REMINDER_RE = re.compile(
    r"<system-reminder>.*?</system-reminder>", re.IGNORECASE | re.DOTALL
)
# Base64 data URLs. The payload class deliberately excludes whitespace: with
# whitespace allowed, the match ran on through any following prose made of
# letters, digits and spaces and silently deleted it. Optional MIME parameters
# (e.g. ``;charset=utf-8``) before ``;base64,`` are accepted. Line-wrapped
# payload continuations are left to _BASE64_RUN_RE when long enough.
_DATAURL_RE = re.compile(
    r"data:[^\s;,]*(?:;[^\s;,]+)*;base64,[A-Za-z0-9+/]+={0,2}", re.IGNORECASE
)
# Bare runs of 200+ base64-alphabet characters (plus optional padding).
_BASE64_RUN_RE = re.compile(r"[A-Za-z0-9+/]{200,}={0,2}")
|
|
|
|
|
|
def _strip_noise(text: str) -> str:
    """Remove system-reminder blocks and replace base64 blobs with a marker.

    Falsy input (``None`` or ``""``) yields ``""``.
    """
    if not text:
        return ""
    cleaned = _SYSTEM_REMINDER_RE.sub("", text)
    # Data URLs first, then any remaining bare base64 runs.
    for blob_re in (_DATAURL_RE, _BASE64_RUN_RE):
        cleaned = blob_re.sub("[base64 blob omitted]", cleaned)
    return cleaned
|
|
|
|
|
|
def _truncate(text: str, limit: int = _RESULT_TRUNC) -> str:
    """Strip ``text`` and cap it at ``limit`` characters.

    When the stripped text is longer than ``limit`` it is cut at ``limit``,
    right-stripped again, and suffixed with ``" ... [truncated]"``. Falsy
    input is treated as the empty string.
    """
    stripped = (text or "").strip()
    if len(stripped) > limit:
        return stripped[:limit].rstrip() + " ... [truncated]"
    return stripped
|
|
|
|
|
|
def _flatten_content(content) -> str:
    """Flatten a message-content value to plain text.

    A string is returned unchanged. A list contributes each bare-string item
    and each dict item whose ``"text"`` value is a string, joined by
    newlines. Anything else (``None``, a dict, a number, ...) yields ``""``.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""

    pieces: list[str] = []
    for block in content:
        if isinstance(block, str):
            pieces.append(block)
        elif isinstance(block, dict):
            inner = block.get("text")
            if isinstance(inner, str):
                pieces.append(inner)
    return "\n".join(pieces)
|
|
|
|
|
|
def _concise_args(name: str, inp: dict) -> str:
    """Return a one-line, human-readable summary of a tool_use input.

    Args:
        name: tool name (``"Bash"``, ``"Write"``, ``"Skill"``, ...).
        inp: the tool_use ``input`` object; a non-dict yields ``""``.

    Always returns a ``str``. Transcript fields that are JSON ``null`` or not
    strings are tolerated: null/empty values count as absent and other
    non-string values are stringified, rather than raising or leaking
    ``None`` into the summary.
    """
    if not isinstance(inp, dict):
        return ""

    def text_of(*keys: str) -> str:
        """First truthy value among ``keys`` as a string, else ``""``."""
        for key in keys:
            value = inp.get(key)
            if value:
                return value if isinstance(value, str) else str(value)
        return ""

    if name in ("Bash", "PowerShell"):
        return _truncate(text_of("command").replace("\n", " "), 200)
    if name in ("Write", "Edit", "NotebookEdit"):
        return text_of("file_path", "notebook_path")
    if name == "Read":
        return text_of("file_path")
    if name in ("Glob", "Grep"):
        return " ".join(
            f"{key}={inp[key]}" for key in ("pattern", "path", "glob") if inp.get(key)
        )
    if name == "Skill":
        skill = text_of("skill")
        raw_args = inp.get("args")
        # A JSON null must not render as the literal text "None".
        args_text = "" if raw_args is None else str(raw_args)
        args = _truncate(args_text.replace("\n", " "), 160)
        return f"{skill}: {args}" if args else skill
    if name in ("WebFetch", "WebSearch"):
        return _truncate(text_of("url", "query"), 160)
    # Any other tool: compact JSON of the whole input.
    return _truncate(json.dumps(inp, ensure_ascii=False), 160)
|
|
|
|
|
|
@dataclass
class Event:
    """One entry in the reconstructed main-session timeline."""

    # Discriminator: "human" | "assistant_text" | "tool_use" | "tool_result".
    kind: str
    # Message text / tool output (unused for tool_use).
    text: str = ""
    # tool_use only: the tool's name.
    name: str = ""
    # tool_use only: concise one-line rendering of the tool input.
    args: str = ""
    # Target path for Write / Edit / NotebookEdit calls.
    file_path: str = ""
    # Skill name for Skill calls.
    skill: str = ""
    # Raw command string for Bash / PowerShell calls.
    command: str = ""
    # Timestamp string copied from the transcript line ("" if absent).
    timestamp: str = ""
|
|
|
|
|
|
@dataclass
class ParsedTranscript:
    """A parsed transcript: ordered events plus file-level metadata."""

    # Transcript file and its stem (the session uuid).
    path: Path
    uuid: str
    # Main-timeline events in file order.
    events: list[Event] = field(default_factory=list)
    # First / last timestamp strings seen in the file.
    first_ts: str = ""
    last_ts: str = ""
    # First non-empty cwd / gitBranch values seen.
    cwd: str = ""
    git_branch: str = ""
    # Title hint taken from an "ai-title" line, when present.
    ai_title: str = ""
    # Whole-transcript concatenation used for regex sweeps.
    raw_text: str = ""
    # File modification time (0.0 when stat failed).
    mtime: float = 0.0
|
|
|
|
|
|
def iter_events(path: str | Path):
    """Yield each decoded JSON value from a transcript, in file order.

    Blank lines and lines that fail to decode are skipped without comment --
    a crash can leave a partial final line, which is exactly the situation
    this module exists to recover from. Undecodable bytes are replaced rather
    than raising.
    """
    with Path(path).open("r", encoding="utf-8", errors="replace") as handle:
        for raw_line in handle:
            candidate = raw_line.strip()
            if candidate:
                try:
                    decoded = json.loads(candidate)
                except ValueError:
                    continue
                yield decoded
|
|
|
|
|
|
def _str_or_empty(value) -> str:
    """Return ``value`` when it is a string, otherwise ``""``."""
    return value if isinstance(value, str) else ""


def _assistant_entries(message: dict, stamp: str) -> list[tuple[Event, str]]:
    """Build ``(event, raw_text_chunk)`` pairs for one assistant message."""
    content = message.get("content", [])
    if not isinstance(content, list):
        return []

    entries: list[tuple[Event, str]] = []
    for blk in content:
        if not isinstance(blk, dict):
            continue
        btype = blk.get("type")
        if btype == "text":
            txt = _strip_noise(_str_or_empty(blk.get("text"))).strip()
            if txt:
                entries.append(
                    (Event(kind="assistant_text", text=txt, timestamp=stamp), txt)
                )
        elif btype == "tool_use":
            name = _str_or_empty(blk.get("name"))
            inp = blk.get("input")
            if not isinstance(inp, dict):
                inp = {}
            ev = Event(
                kind="tool_use",
                name=name,
                args=_concise_args(name, inp),
                timestamp=stamp,
            )
            if name in _MUTATING_TOOLS:
                ev.file_path = _str_or_empty(inp.get("file_path")) or _str_or_empty(
                    inp.get("notebook_path")
                )
            if name == "Skill":
                ev.skill = _str_or_empty(inp.get("skill"))
            if name in ("Bash", "PowerShell"):
                ev.command = _str_or_empty(inp.get("command"))
            entries.append((ev, f"{name} {ev.args}"))
    return entries


def _user_entries(message: dict, stamp: str) -> list[tuple[Event, str]]:
    """Build ``(event, raw_text_chunk)`` pairs for one user message.

    String content is a human-typed prompt. List content is scanned for
    ``tool_result`` blocks only (tool output, not a human prompt); other
    block types in a list are ignored.
    """
    content = message.get("content")
    if isinstance(content, str):
        txt = _strip_noise(content).strip()
        if not txt:
            return []
        return [(Event(kind="human", text=txt, timestamp=stamp), txt)]

    entries: list[tuple[Event, str]] = []
    if isinstance(content, list):
        for blk in content:
            if not isinstance(blk, dict) or blk.get("type") != "tool_result":
                continue
            body = _strip_noise(_flatten_content(blk.get("content")))
            if body.strip():
                # The event keeps a short excerpt; the regex-sweep text keeps
                # a longer (1000-char) prefix of the same body.
                entries.append(
                    (
                        Event(kind="tool_result", text=_truncate(body), timestamp=stamp),
                        body[:1000],
                    )
                )
    return entries


def parse_transcript(path: str | Path) -> ParsedTranscript:
    """Parse a transcript into an ordered Event list plus metadata.

    Metadata (first/last timestamp, first cwd, first gitBranch, ai-title) is
    collected from every JSON-object line, including sidechain lines. Events
    are built only from non-sidechain ``assistant`` and ``user`` lines; every
    other line type is skipped. ``raw_text`` is the newline-joined
    concatenation of the per-event text chunks.

    Unexpectedly shaped fields (a non-dict ``message`` or tool ``input``,
    non-string ``text`` / ``name`` / ``cwd`` / ``command`` values, ...) are
    treated as absent instead of raising, so one odd line cannot abort the
    recovery of the whole transcript.
    """
    p = Path(path)
    parsed = ParsedTranscript(path=p, uuid=p.stem)
    try:
        parsed.mtime = p.stat().st_mtime
    except OSError:
        parsed.mtime = 0.0

    raw_chunks: list[str] = []

    for obj in iter_events(p):
        if not isinstance(obj, dict):
            continue

        ts = obj.get("timestamp")
        if isinstance(ts, str):
            if not parsed.first_ts:
                parsed.first_ts = ts
            parsed.last_ts = ts
            stamp = ts
        else:
            stamp = ""

        cwd = obj.get("cwd")
        if not parsed.cwd and isinstance(cwd, str) and cwd:
            parsed.cwd = cwd
        branch = obj.get("gitBranch")
        if not parsed.git_branch and isinstance(branch, str) and branch:
            parsed.git_branch = branch

        line_type = obj.get("type")

        # ai-title metadata -- usable title hint; carries no timeline event.
        if line_type == "ai-title":
            title = obj.get("aiTitle")
            if isinstance(title, str) and title.strip():
                parsed.ai_title = title.strip()
            continue

        # Subagent / sidechain lines are kept out of the main timeline.
        if obj.get("isSidechain"):
            continue

        message = obj.get("message")
        if not isinstance(message, dict):
            message = {}

        if line_type == "assistant":
            entries = _assistant_entries(message, stamp)
        elif line_type == "user":
            entries = _user_entries(message, stamp)
        else:
            # Other line types carry no timeline content.
            continue

        for ev, chunk in entries:
            parsed.events.append(ev)
            raw_chunks.append(chunk)

    parsed.raw_text = "\n".join(raw_chunks)
    return parsed
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Classification
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def _is_mutating_command(cmd: str) -> bool:
    """True when ``cmd`` is non-empty and matches any mutating-command pattern."""
    if not cmd:
        return False
    return _MUTATING_CMD_RE.search(cmd) is not None
|
|
|
|
|
|
# Directories under projects/ that must never become a session scope:
#   * transcript / conversation archives (names ending "-conversation-logs"),
#   * generic catch-all buckets.
# A transcript that would otherwise classify to one of these has to fall
# through to a real project (or to general).
_ARCHIVE_DIR_RE = re.compile(r"-conversation-logs$", re.IGNORECASE)
_GENERIC_PROJECT_NAMES = {
    "internal",
    "scripts",
    "toolkit",
    "utilities",
}
|
|
|
|
|
|
def _is_valid_project_slug(name: str) -> bool:
    """Whether ``name`` can serve as a project scope.

    Rejects transcript-archive directories (``*-conversation-logs``) and the
    generic catch-all names, compared case-insensitively; accepts everything
    else.
    """
    is_archive = _ARCHIVE_DIR_RE.search(name) is not None
    is_generic = name.lower() in _GENERIC_PROJECT_NAMES
    return not (is_archive or is_generic)
|
|
|
|
|
|
def _known_slugs() -> tuple[set[str], set[str]]:
    """Discover ``(client_slugs, project_slugs)`` from the repo layout.

    Clients: sub-directories of ``clients/`` whose names do not start with
    ``_``, plus the stems of ``wiki/clients/*.md``.
    Projects: sub-directories of ``projects/`` and of ``projects/msp-tools/``
    that pass ``_is_valid_project_slug``.
    """
    root = repo_root()
    clients: set[str] = set()
    projects: set[str] = set()

    clients_dir = root / "clients"
    if clients_dir.is_dir():
        clients.update(
            entry.name
            for entry in clients_dir.iterdir()
            if entry.is_dir() and not entry.name.startswith("_")
        )

    wiki_clients = root / "wiki" / "clients"
    if wiki_clients.is_dir():
        clients.update(page.stem for page in wiki_clients.glob("*.md"))

    proj_dir = root / "projects"
    # msp-tools sub-projects are scanned alongside the top-level ones.
    for parent in (proj_dir, proj_dir / "msp-tools"):
        if parent.is_dir():
            projects.update(
                entry.name
                for entry in parent.iterdir()
                if entry.is_dir() and _is_valid_project_slug(entry.name)
            )

    return clients, projects
|
|
|
|
|
|
def _slug_to_words(slug: str) -> list[str]:
    """Lower-case ``slug``, split it on runs of ``-``/``_``, and keep only
    the tokens of four or more characters."""
    tokens = re.split(r"[-_]+", slug.lower())
    return [token for token in tokens if len(token) >= 4]
|
|
|
|
|
|
def classify(parsed: ParsedTranscript) -> dict:
    """Classify a parsed transcript.

    Returns a dict with keys ``substantive`` (bool), ``saved`` (bool),
    ``scope`` (dict), ``title`` (str), ``human_prompt_count`` (int) and
    ``mutating_actions`` (list[str]).

    ``saved`` is set by a save/scc/checkpoint Skill call or by a mutating
    tool writing under ``session-logs/``. ``substantive`` is set by any
    mutating tool, mutating shell command, or mutating skill.
    """
    saved = False
    human_prompt_count = 0
    mutating_actions: list[str] = []

    for ev in parsed.events:
        if ev.kind == "human":
            human_prompt_count += 1
        if ev.kind != "tool_use":
            continue

        tool = ev.name

        if tool == "Skill" and ev.skill in _SAVE_SKILLS:
            saved = True

        if tool in _MUTATING_TOOLS:
            if ev.file_path:
                forward = ev.file_path.replace("\\", "/")
                for marker in _SESSION_LOG_MARKERS:
                    if marker.replace("\\", "/") in forward or marker in ev.file_path:
                        saved = True
                        break
            mutating_actions.append(f"{tool} {ev.file_path}".strip())
        elif tool in ("Bash", "PowerShell"):
            if _is_mutating_command(ev.command):
                one_line = ev.command.replace("\n", " ")
                mutating_actions.append(f"{tool}: {_truncate(one_line, 120)}")
        elif tool == "Skill" and ev.skill in _MUTATING_SKILLS:
            mutating_actions.append(f"Skill: {ev.skill}")

    # Every branch that marks the session substantive also records an action.
    substantive = bool(mutating_actions)

    scope = _classify_scope(parsed)
    title = _derive_title(parsed)

    return {
        "substantive": substantive,
        "saved": saved,
        "scope": scope,
        "title": title,
        "human_prompt_count": human_prompt_count,
        "mutating_actions": mutating_actions,
    }
|
|
|
|
|
|
def _classify_scope(parsed: ParsedTranscript) -> dict:
    """Decide client / project / general scope. Conservative: ambiguous -> general.

    Returns ``{"type": "client"|"project", "slug": <slug>}`` or
    ``{"type": "general"}``.

    Decision order:
      1. A cwd inside ``clients/<slug>`` or ``projects/[msp-tools/]<slug>``
         wins when exactly one of the two hints is present. Slugs are matched
         case-insensitively and returned in their on-disk spelling.
      2. Otherwise the best-scoring client and best-scoring project are
         compared: a category wins when its score reaches the minimum and the
         other category either misses its minimum or is beaten by the
         dominance factor.
      3. Anything else is ``general``.

    Slugs are scored in sorted order, so equal scores resolve the same way on
    every run instead of following set iteration order.
    """
    clients, projects = _known_slugs()

    haystack = "\n".join(
        [parsed.raw_text or "", parsed.cwd or "", parsed.git_branch or ""]
    ).lower()

    def score(slug: str) -> int:
        needle = slug.lower()
        words = _slug_to_words(slug)
        if not words:
            # No token of 4+ chars -- only whole-slug hits count, to avoid noise.
            return haystack.count(needle)
        # Whole-slug occurrences weigh 3x; each 4+-char token adds its own
        # (substring) occurrence count. No token is individually required.
        return haystack.count(needle) * 3 + sum(haystack.count(w) for w in words)

    def best(slugs: set[str]) -> tuple:
        return max(
            ((slug, score(slug)) for slug in sorted(slugs)),
            key=lambda pair: pair[1],
            default=(None, 0),
        )

    c_slug, c_score = best(clients)
    p_slug, p_score = best(projects)

    # cwd-based hints (strong signal). The cwd is lower-cased for matching, so
    # look slugs up through lower-cased keys; comparing against the raw sets
    # would never match a directory whose name contains upper-case letters.
    cwd_norm = (parsed.cwd or "").replace("\\", "/").lower()
    projects_by_lower = {slug.lower(): slug for slug in sorted(projects)}
    clients_by_lower = {slug.lower(): slug for slug in sorted(clients)}

    m = re.search(r"/projects/(?:msp-tools/)?([a-z0-9._-]+)", cwd_norm)
    cwd_project = projects_by_lower.get(m.group(1)) if m else None
    m = re.search(r"/clients/([a-z0-9._-]+)", cwd_norm)
    cwd_client = clients_by_lower.get(m.group(1)) if m else None

    # Minimum confidence thresholds -- be conservative.
    CLIENT_MIN = 4
    PROJECT_MIN = 4
    # The winning category must out-score the other category's best slug by
    # this factor whenever both reach their minimum.
    DOMINANCE = 2

    # cwd hints win if present and unambiguous.
    if cwd_client and not cwd_project:
        return {"type": "client", "slug": cwd_client}
    if cwd_project and not cwd_client:
        return {"type": "project", "slug": cwd_project}

    client_ok = bool(c_slug) and c_score >= CLIENT_MIN
    project_ok = bool(p_slug) and p_score >= PROJECT_MIN

    if client_ok and (not project_ok or c_score >= p_score * DOMINANCE):
        return {"type": "client", "slug": c_slug}
    if project_ok and (not client_ok or p_score >= c_score * DOMINANCE):
        return {"type": "project", "slug": p_slug}

    return {"type": "general"}
|
|
|
|
|
|
def _derive_title(parsed: ParsedTranscript) -> str:
    """Pick a title for the recovered log.

    Uses the transcript's ai-title when set; otherwise the first line of the
    first non-blank human prompt, whitespace-collapsed and capped at 70
    characters; otherwise the literal ``"recovered session"``.
    """
    if parsed.ai_title:
        return parsed.ai_title

    first_prompt = next(
        (
            ev.text.strip()
            for ev in parsed.events
            if ev.kind == "human" and ev.text.strip()
        ),
        None,
    )
    if first_prompt is None:
        return "recovered session"

    headline = re.sub(r"\s+", " ", first_prompt.splitlines()[0]).strip()
    return _truncate(headline, 70)
|
|
|
|
|
|
def _topic_slug(title: str) -> str:
    """Turn a title into a filename-safe slug.

    Lower-cases, replaces each run of non ``[a-z0-9]`` characters with one
    ``-``, trims dashes, caps the result at 48 characters, and falls back to
    ``"session"`` when nothing usable remains.
    """
    lowered = (title or "").lower()
    dashed = re.sub(r"[^a-z0-9]+", "-", lowered).strip("-")
    dashed = re.sub(r"-{2,}", "-", dashed)
    if not dashed:
        dashed = "session"
    clipped = dashed[:48].strip("-")
    return clipped if clipped else "session"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Evidence extraction (verbatim -- Python only)
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
_RE_IP = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
_RE_URL = re.compile(r"https?://[^\s\"'<>)\]]+")
# A dotted run with 5+ numeric components is a version string, never an IP
# (e.g. "1.2.3.4.5"). Used to reject dotted-quad matches that are a sub-span of
# a longer version.
_RE_DOTTED_VERSION = re.compile(r"\d+(?:\.\d+){4,}")
# Version context that immediately precedes a dotted-quad marks it as a version,
# not an IP -- e.g. "version 1.9.158.0", "build: 6.5.60.172". Between the
# keyword and the number the pattern allows optional whitespace, one optional
# ":" or "=", and an optional "v". A trailing bare "v"/"V" alone also counts.
_RE_VERSION_CONTEXT = re.compile(
    r"(?:\b(?:version|ver|build|rev|revision|release|agent|firmware|fw|"
    r"v)\b\s*[:=]?\s*v?|[vV])$",
    re.IGNORECASE,
)
# How many characters before a dotted-quad are inspected for version context.
# The longest keyword plus separators is well under this; bounding the window
# keeps the scan linear in the text length instead of re-scanning the whole
# prefix for every match.
_VERSION_CONTEXT_WINDOW = 64


def _iter_real_ips(text: str):
    """Yield dotted-quads from ``text`` that are plausibly real IPv4 addresses.

    Rejects version-like strings via a deliberately small rule set:
      - any octet above 255 (e.g. "1.9.158.300" is not an IP),
      - a match that is a sub-span of a longer dotted version with 5+
        components (e.g. the "1.2.3.4" inside "1.2.3.4.5"),
      - a match immediately preceded by version context such as "version ",
        "build: " or a bare "v" (looked for within the last
        ``_VERSION_CONTEXT_WINDOW`` characters before the match).

    Order-preserving; de-duplication is the caller's job. Falsy ``text``
    yields nothing.
    """
    if not text:
        return
    # Spans covered by a 5+-component dotted version -> not IPs.
    version_spans = [m.span() for m in _RE_DOTTED_VERSION.finditer(text)]
    for m in _RE_IP.finditer(text):
        quad = m.group(0)
        if any(int(octet) > 255 for octet in quad.split(".")):
            continue
        start, end = m.span()
        if any(vs <= start and end <= ve for vs, ve in version_spans):
            continue
        # pos/endpos search: no prefix copy, "$" anchors at ``start``, and
        # ``\b`` still sees the real character before the window.
        window_start = max(0, start - _VERSION_CONTEXT_WINDOW)
        if _RE_VERSION_CONTEXT.search(text, window_start, start):
            continue
        yield quad
|
|
# Ticket references: "#" followed by four or more digits.
_RE_TICKET = re.compile(r"#\d{4,}")
# A 7-40 character hex run that follows the word "commit" within at most 12
# non-hex characters; only the hex run is captured.
_RE_COMMIT = re.compile(
    r"(?:\bcommit\b[^0-9a-f]{0,12})([0-9a-f]{7,40})\b",
    re.IGNORECASE,
)
# Canonical 8-4-4-4-12 hex UUIDs.
_RE_UUID = re.compile(
    r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b",
    re.IGNORECASE,
)
# Dotted names whose final label is alphabetic (two or more letters).
_RE_HOST = re.compile(
    r"\b(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z]{2,}\b",
    re.IGNORECASE,
)
|
|
|
|
|
|
def _dedup(seq):
    """Return the items of ``seq`` as a list, first occurrences only, in
    their original order. Items must be hashable."""
    # dict keys keep insertion order and ignore repeats.
    return list(dict.fromkeys(seq))
|
|
|
|
|
|
def extract_config_changes(parsed: ParsedTranscript) -> list[tuple[str, str]]:
    """Return ``[(path, verb), ...]`` for files touched by mutating tools.

    ``verb`` is ``"created"`` for a Write call and ``"modified"`` for any
    other mutating tool. Each path appears once, in first-seen order, with
    the verb of its first appearance.
    """
    first_verb: dict[str, str] = {}
    for ev in parsed.events:
        if ev.kind != "tool_use" or ev.name not in _MUTATING_TOOLS or not ev.file_path:
            continue
        # setdefault keeps the verb from the first time the path was seen.
        first_verb.setdefault(
            ev.file_path, "created" if ev.name == "Write" else "modified"
        )
    return list(first_verb.items())
|
|
|
|
|
|
def extract_commands(parsed: ParsedTranscript) -> list[tuple[str, str]]:
    """Return ``[(command, truncated_result), ...]`` for mutating shell calls.

    The result paired with a command is the text of the first ``tool_result``
    event among the three events that follow it in the timeline, or ``""``
    when none of them is a result (best-effort association).
    """
    events = parsed.events
    pairs: list[tuple[str, str]] = []
    for idx, ev in enumerate(events):
        if ev.kind != "tool_use" or ev.name not in ("Bash", "PowerShell"):
            continue
        if not _is_mutating_command(ev.command):
            continue
        lookahead = events[idx + 1 : idx + 4]
        output = next(
            (later.text for later in lookahead if later.kind == "tool_result"), ""
        )
        pairs.append((ev.command.strip(), output))
    return pairs
|
|
|
|
|
|
def extract_reference(parsed: ParsedTranscript) -> dict:
    """Sweep the transcript text for reference identifiers.

    Returns a dict of de-duplicated, first-seen-ordered lists under the keys
    ``ips``, ``urls``, ``tickets``, ``commits``, ``uuids`` and
    ``coord_message_ids`` (UUIDs that follow the word "message" within 24
    non-hex characters).

    URLs have trailing sentence / markdown punctuation (``. , ; : ! ? ` *``)
    removed: the URL pattern stops only at whitespace, quotes and a few
    brackets, so a URL ending a sentence or wrapped in backticks would
    otherwise be recorded with that character attached.
    """
    text = parsed.raw_text or ""

    trimmed_urls = (raw.rstrip(".,;:!?`*") for raw in _RE_URL.findall(text))

    msg_ids = _dedup(
        m.group(1)
        for m in re.finditer(
            r"message[^0-9a-f]{0,24}([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})",
            text,
            re.IGNORECASE,
        )
    )

    return {
        "ips": _dedup(_iter_real_ips(text)),
        "urls": _dedup(url for url in trimmed_urls if url),
        "tickets": _dedup(_RE_TICKET.findall(text)),
        "commits": _dedup(_RE_COMMIT.findall(text)),
        "uuids": _dedup(_RE_UUID.findall(text)),
        "coord_message_ids": msg_ids,
    }
|
|
|
|
|
|
def extract_infra(parsed: ParsedTranscript, ref: dict) -> dict:
    """Return ``{"ips": [...], "hosts": [...]}`` for the infrastructure section.

    ``ips`` is taken from ``ref`` as-is. ``hosts`` is every dotted-name match
    in the transcript text, lower-cased and de-duplicated in first-seen
    order. The host pattern requires an alphabetic final label, so a bare
    IPv4 address can never match it and no separate IP filter is needed.
    """
    text = parsed.raw_text or ""
    hosts = _dedup(name.lower() for name in _RE_HOST.findall(text))
    return {"ips": ref.get("ips", []), "hosts": hosts}
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Ollama prose
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def _ollama_config() -> tuple[str, str]:
    """Return ``(endpoint, model)`` from identity's ``ollama`` section.

    Endpoint: ``endpoint``, else ``fallback``, else ``http://localhost:11434``.
    Model: ``prose_model``, else ``qwen3:14b``. A missing or non-dict
    ``ollama`` section yields both defaults.
    """
    section = _identity().get("ollama")
    if not isinstance(section, dict):
        section = {}
    endpoint = section.get("endpoint") or section.get("fallback") or "http://localhost:11434"
    model = section.get("prose_model") or "qwen3:14b"
    return endpoint, model
|
|
|
|
|
|
# <think>...</think> spans (any case, possibly multi-line) that are stripped
# from the model's reply before it is parsed.
_THINK_RE = re.compile(
    r"<think>.*?</think>",
    re.DOTALL | re.IGNORECASE,
)
|
|
|
|
|
|
def _build_digest(parsed: ParsedTranscript) -> str:
    """Build the bounded narrative digest sent to Ollama.

    Human prompts and assistant text are included in full and tool calls as
    one-liners; tool_result bodies are left out. When the digest exceeds
    ``_DIGEST_CAP`` characters, the first two thirds and the last third of
    the cap are kept around an elision marker.
    """

    def render(ev: Event) -> str | None:
        if ev.kind == "human":
            return f"USER: {ev.text}"
        if ev.kind == "assistant_text":
            return f"ASSISTANT: {ev.text}"
        if ev.kind == "tool_use":
            return f"[tool: {ev.name} {ev.args}]"
        return None  # tool_result intentionally omitted

    rendered = (render(ev) for ev in parsed.events)
    digest = "\n".join(line for line in rendered if line is not None)

    if len(digest) <= _DIGEST_CAP:
        return digest

    # Start framing + final state matter most, so keep head and tail.
    head_len = _DIGEST_CAP * 2 // 3
    tail_len = _DIGEST_CAP // 3
    return digest[:head_len] + "\n...[middle elided for length]...\n" + digest[-tail_len:]
|
|
|
|
|
|
# Instruction preamble for the prose model. The transcript digest is appended
# directly after the final "TRANSCRIPT DIGEST:" line.
_PROSE_PROMPT = """You are writing the prose sections of an engineering session log, reconstructed from a work-session transcript. Write in plain past tense, technical, concise, NO emojis, NO filler.

Output EXACTLY these four markdown sections, with these exact headers, and nothing else (no preamble, no closing remarks):

## Session Summary
(2-4 paragraphs: what was accomplished, in what order, and why.)

## Key Decisions
(bullet list of non-obvious decisions and their rationale; "- none" if none.)

## Problems Encountered
(bullet list of problems hit and how each was resolved; "- none" if none.)

## Pending / Incomplete Tasks
(bullet list of what is left, blockers, next steps; "- none" if none.)

CRITICAL: Do NOT invent or restate specific commands, IP addresses, credentials, file paths, commit hashes, or ticket numbers -- those are recorded separately and verbatim. Describe the work at a conceptual level only.

TRANSCRIPT DIGEST:
"""
|
|
|
|
|
|
def ollama_prose(parsed: ParsedTranscript, timeout: int = 120) -> dict | None:
    """Ask Ollama for the four prose sections.

    Posts the prose prompt plus the transcript digest to
    ``<endpoint>/api/chat`` (non-streaming, ``think`` disabled).

    Args:
        parsed: the transcript to summarise.
        timeout: seconds allowed for the HTTP call.

    Returns:
        A dict mapping section header -> body, or ``None`` when the request
        cannot be built or completed, the reply is not the expected JSON
        shape, or the reply text is empty after ``<think>`` removal. Never
        raises for a network/protocol failure, so the caller can always fall
        back to placeholder prose.
    """
    # Local import: HTTPException (e.g. IncompleteRead while reading the body)
    # is not an OSError subclass and would otherwise escape the handler below.
    import http.client

    endpoint, model = _ollama_config()
    prompt = _PROSE_PROMPT + _build_digest(parsed)
    body = json.dumps(
        {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "stream": False,
            "think": False,
        }
    ).encode("utf-8")

    try:
        # Request() itself raises ValueError for an endpoint without a usable
        # scheme, so it belongs inside the guarded region too.
        req = urllib.request.Request(
            endpoint.rstrip("/") + "/api/chat",
            data=body,
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
    except (
        urllib.error.URLError,
        http.client.HTTPException,
        OSError,
        ValueError,
        TimeoutError,
    ):
        return None

    try:
        content = payload["message"]["content"]
    except (KeyError, TypeError):
        return None
    if not isinstance(content, str):
        return None

    content = _THINK_RE.sub("", content).strip()
    if not content:
        return None
    return _split_prose_sections(content)
|
|
|
|
|
|
# The four prose section titles, in output order and canonical spelling.
_PROSE_HEADERS = [
    "Session Summary",
    "Key Decisions",
    "Problems Encountered",
    "Pending / Incomplete Tasks",
]


def _split_prose_sections(text: str) -> dict:
    """Extract the known ``## <header>`` sections from the model's output.

    Headers are matched case-insensitively on their own line and reported
    under their canonical spelling. A section's body is everything up to the
    next known header (or the end of the text), stripped. Missing sections
    are simply absent; text before the first known header is discarded.
    """
    canonical = {header.lower(): header for header in _PROSE_HEADERS}
    alternation = "|".join(re.escape(header) for header in _PROSE_HEADERS)
    header_re = re.compile(rf"^##\s*({alternation})\s*$", re.IGNORECASE | re.MULTILINE)

    hits = list(header_re.finditer(text))
    # Each section ends where the next header starts; the last runs to EOF.
    stops = [hit.start() for hit in hits[1:]] + [len(text)]

    sections: dict[str, str] = {}
    for hit, stop in zip(hits, stops):
        title = hit.group(1)
        sections[canonical.get(title.lower(), title)] = text[hit.end():stop].strip()
    return sections
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# whoami block
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def whoami_block() -> str:
    """Return the ``## User`` block for the log header.

    Runs ``.claude/scripts/whoami-block.sh`` through ``bash`` (30 s timeout,
    cwd = repo root) and returns its stdout when the script exits 0 with
    non-blank output. In every other case -- script or ``bash`` missing,
    launch failure, timeout, non-zero exit, blank output -- a minimal block is
    rendered from identity.json, ending with a warning line.

    Output is decoded as UTF-8 with undecodable bytes replaced, so a locale
    whose default codec cannot decode the script's output cannot raise here.
    """
    root = repo_root()
    script = root / ".claude" / "scripts" / "whoami-block.sh"
    bash = shutil.which("bash")
    if script.exists() and bash:
        try:
            res = subprocess.run(
                [bash, str(script)],
                capture_output=True,
                text=True,
                encoding="utf-8",
                errors="replace",
                timeout=30,
                cwd=str(root),
            )
        except (OSError, subprocess.SubprocessError):
            res = None
        if res is not None and res.returncode == 0 and res.stdout.strip():
            return res.stdout.rstrip("\n")

    # Fallback: build a minimal block from identity.json directly (also taken
    # when 'bash' is absent from a restricted scheduler PATH).
    d = _identity()
    full = d.get("full_name") or d.get("user", "unknown")
    user = d.get("user", "unknown")
    machine = d.get("machine", "unknown")
    role = d.get("role", "")
    lines = ["## User", f"- **User:** {full} ({user})", f"- **Machine:** {machine}"]
    if role:
        lines.append(f"- **Role:** {role}")
    lines.append("- **[WARNING]** whoami-block.sh unavailable; rendered from identity.json directly.")
    return "\n".join(lines)
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Path computation
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def _first_ts_date(parsed: ParsedTranscript) -> str:
    """Pick the ISO calendar date (``YYYY-MM-DD``) used to date a recovered log.

    Sources are tried in order: the transcript's first timestamp, then the
    transcript mtime (interpreted as UTC), then the current local date.
    """
    stamp = parsed.first_ts
    if stamp:
        # "Z" is rewritten to an explicit "+00:00" offset before parsing.
        iso_text = stamp.replace("Z", "+00:00")
        try:
            return datetime.fromisoformat(iso_text).date().isoformat()
        except ValueError:
            pass  # unparseable timestamp: move on to the next source

    modified = parsed.mtime
    if modified:
        return datetime.fromtimestamp(modified, tz=timezone.utc).date().isoformat()

    return datetime.now().date().isoformat()
|
|
|
|
|
|
def compute_output_path(parsed: ParsedTranscript, scope: dict, title: str) -> Path:
    """Work out where the recovered log for *parsed* would be written.

    The directory depends on ``scope["type"]``:

    - ``"client"``  -> ``clients/<slug>/session-logs``
    - ``"project"`` -> ``projects/<slug>/session-logs``, or
      ``projects/msp-tools/<slug>/session-logs`` when only the latter
      project directory exists
    - anything else -> ``session-logs`` at the repo root

    The file is named ``<date>-recovered-<topic>.md``; if that file already
    exists, the first 8 characters of the transcript uuid are appended to
    the stem. Nothing is created on disk here.
    """
    root = repo_root()
    date = _first_ts_date(parsed)
    topic = _topic_slug(title)

    scope_type = scope.get("type")
    if scope_type == "client":
        log_dir = root / "clients" / scope["slug"] / "session-logs"
    elif scope_type == "project":
        slug = scope["slug"]
        # msp-tools sub-projects keep their session logs under the sub-project
        project_dir = root / "projects" / slug
        nested_dir = root / "projects" / "msp-tools" / slug
        if not project_dir.exists() and nested_dir.exists():
            project_dir = nested_dir
        log_dir = project_dir / "session-logs"
    else:
        log_dir = root / "session-logs"

    candidate = log_dir / f"{date}-recovered-{topic}.md"
    if not candidate.exists():
        return candidate
    # Name already taken: disambiguate with a short uuid suffix.
    return log_dir / f"{date}-recovered-{topic}-{parsed.uuid[:8]}.md"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Markdown assembly
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def _fmt_prose_section(header: str, prose: dict | None, fallback: str) -> str:
    """Render one ``## <header>`` markdown section.

    The body is the stripped text stored under *header* in *prose*; when
    *prose* is empty/``None`` or that text is blank, *fallback* is used.
    """
    drafted = prose.get(header, "").strip() if prose else ""
    return f"## {header}\n\n{drafted or fallback}\n"
|
|
|
|
|
|
def _fence_for(code: str) -> str:
    """Return a backtick fence strictly longer than any backtick run in *code*."""
    longest = run = 0
    for ch in code:
        run = run + 1 if ch == "`" else 0
        if run > longest:
            longest = run
    return "`" * max(3, longest + 1)


def build_log(parsed: ParsedTranscript, today: str | None = None) -> tuple[str, dict]:
    """Assemble the full reconstructed markdown log. Returns (markdown, meta).

    Args:
        parsed: The parsed transcript to reconstruct from.
        today: ISO date shown in the banner as the recovery date; defaults
            to the current local date.

    Returns:
        ``(markdown, meta)``. ``markdown`` is the complete log text ending in
        exactly one newline. ``meta`` holds the transcript uuid, scope,
        title, timestamps and classification counters, the path the log
        would be written to (``path_would_be``), the log date, and whether
        Ollama produced prose (``ollama_ok``).
    """
    today = today or datetime.now().date().isoformat()
    verdict = classify(parsed)
    scope = verdict["scope"]
    title = verdict["title"]

    prose = ollama_prose(parsed)
    ollama_ok = prose is not None
    placeholder = (
        "_[INFO] Ollama was unreachable during recovery; this prose section was "
        "not drafted. Reconstruct it from the verbatim evidence below, or re-run "
        "`/recover` once Ollama is available._"
    )
    # List-style prose sections: "none recorded" when Ollama answered but left
    # the section empty, the unreachable-placeholder otherwise.
    empty_list = "- none recorded" if ollama_ok else placeholder

    config_changes = extract_config_changes(parsed)
    commands = extract_commands(parsed)
    ref = extract_reference(parsed)
    infra = extract_infra(parsed, ref)

    out_path = compute_output_path(parsed, scope, title)
    date = _first_ts_date(parsed)

    lines: list[str] = []

    # Title
    lines += [f"# [RECOVERED] {title}", ""]

    # Banner
    banner = (
        f"> **[RECOVERED -- UNVERIFIED]** Auto-reconstructed from transcript "
        f"{parsed.uuid} ({parsed.first_ts or '?'} .. {parsed.last_ts or '?'}) on "
        f"{today}. Prose sections are Ollama-drafted from the transcript and may "
        "be imprecise; the Commands/Config/Reference sections are extracted "
        "verbatim. Review and correct, then remove this banner."
    )
    lines += [banner, ""]

    # User block
    lines += [whoami_block(), ""]

    # Prose sections (Ollama) -- in save.md order. Each rendered section ends
    # with its own newline, which becomes the blank separator after joining.
    lines.append(_fmt_prose_section("Session Summary", prose, placeholder))
    lines.append(_fmt_prose_section("Key Decisions", prose, empty_list))
    lines.append(_fmt_prose_section("Problems Encountered", prose, empty_list))

    # Configuration Changes (verbatim)
    lines += [
        "## Configuration Changes",
        "",
        "_Machine-extracted verbatim from the transcript (file targets of Write/Edit/NotebookEdit)._",
        "",
    ]
    if config_changes:
        lines.extend(f"- [{verb}] `{fp}`" for fp, verb in config_changes)
    else:
        lines.append("- none detected")
    lines.append("")

    # Credentials & Secrets
    lines += [
        "## Credentials & Secrets",
        "",
        "_Machine-extracted; review carefully -- secrets are not auto-harvested from transcripts._",
        "",
        "- none detected (verify against the Commands & Outputs section)",
        "",
    ]

    # Infrastructure & Servers (verbatim regex)
    lines += [
        "## Infrastructure & Servers",
        "",
        "_Machine-extracted verbatim (IP / hostname regex hits across the whole transcript)._",
        "",
    ]
    if infra["ips"] or infra["hosts"]:
        if infra["ips"]:
            lines.append("- **IPs:** " + ", ".join(f"`{x}`" for x in infra["ips"][:40]))
        if infra["hosts"]:
            lines.append("- **Hosts:** " + ", ".join(f"`{x}`" for x in infra["hosts"][:40]))
    else:
        lines.append("- none detected (verify)")
    lines.append("")

    # Commands & Outputs (verbatim)
    lines += [
        "## Commands & Outputs",
        "",
        "_Machine-extracted verbatim: mutating Bash/PowerShell commands with truncated output._",
        "",
    ]
    if commands:
        for cmd, result in commands:
            # A command that itself contains ``` would close a fixed
            # three-backtick fence early and corrupt the verbatim section,
            # so the fence is sized to the command.
            fence = _fence_for(cmd)
            lines += [fence, cmd, fence]
            if result:
                lines.append(f"Output: {result}")
            lines.append("")
    else:
        lines.append("- none detected")
    lines.append("")

    # Pending / Incomplete Tasks (Ollama)
    lines.append(_fmt_prose_section("Pending / Incomplete Tasks", prose, empty_list))

    # Reference Information (verbatim)
    lines += [
        "## Reference Information",
        "",
        "_Machine-extracted verbatim from the whole transcript via regex. Treat as leads, not gospel; deduped._",
        "",
    ]
    # (label, key in ref, wrap each value in backticks?) -- order is the
    # order the bullets appear in the log.
    ref_rows = (
        ("Commit SHAs", "commits", True),
        ("URLs", "urls", False),
        ("IPs", "ips", True),
        ("Ticket numbers", "tickets", False),
        ("Coord message ids", "coord_message_ids", True),
    )
    any_ref = False
    for label, key, as_code in ref_rows:
        values = ref[key]
        if not values:
            continue
        any_ref = True
        shown = values[:40]  # cap each bullet at 40 entries
        rendered = (f"`{x}`" for x in shown) if as_code else shown
        lines.append(f"- **{label}:** " + ", ".join(rendered))
    if not any_ref:
        lines.append("- none detected")
    lines.append("")

    markdown = "\n".join(lines).rstrip() + "\n"

    meta = {
        "uuid": parsed.uuid,
        "path_would_be": str(out_path),
        "substantive": verdict["substantive"],
        "saved": verdict["saved"],
        "scope": scope,
        "title": title,
        "first_ts": parsed.first_ts,
        "last_ts": parsed.last_ts,
        "mtime": parsed.mtime,
        "human_prompt_count": verdict["human_prompt_count"],
        "mutating_actions": verdict["mutating_actions"],
        "date": date,
        "ollama_ok": ollama_ok,
    }
    return markdown, meta
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# CLI
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def _metadata_only(parsed: ParsedTranscript) -> dict:
    """Describe a transcript without drafting prose or building markdown.

    Classifies *parsed*, computes the path its recovered log would be
    written to, and returns those facts together with the transcript's
    timestamps and the log date as a plain dict.
    """
    verdict = classify(parsed)
    scope, title = verdict["scope"], verdict["title"]
    out_path = compute_output_path(parsed, scope, title)

    # Keys are inserted in the order they appear in the emitted JSON.
    meta: dict = {"uuid": parsed.uuid, "path_would_be": str(out_path)}
    for key in ("substantive", "saved"):
        meta[key] = verdict[key]
    meta["scope"] = scope
    meta["title"] = title
    for attr in ("first_ts", "last_ts", "mtime"):
        meta[attr] = getattr(parsed, attr)
    for key in ("human_prompt_count", "mutating_actions"):
        meta[key] = verdict[key]
    meta["date"] = _first_ts_date(parsed)
    return meta
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point; returns the process exit status.

    Exactly one transcript selector (``--uuid``, ``--latest`` or ``--path``)
    is required. The output mode is one of ``--print`` (the default: markdown
    to stdout), ``--auto`` (write the log to its computed path and print a
    one-line JSON summary) or ``--json`` (print metadata JSON, write
    nothing). Returns 2 when the transcript cannot be resolved, otherwise 0.
    """
    # The Windows console defaults to cp1252, while transcripts (and Ollama
    # prose) routinely contain characters outside that codepage. Switching
    # stdout to UTF-8 keeps --print / --json from crashing on such glyphs.
    try:
        sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    except (AttributeError, ValueError):
        pass

    parser = argparse.ArgumentParser(
        description="Reconstruct a ClaudeTools session log from a Claude Code transcript."
    )

    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--uuid", help="transcript uuid (filename without .jsonl)")
    source.add_argument("--latest", action="store_true", help="newest transcript by mtime")
    source.add_argument("--path", help="explicit path to a transcript .jsonl")

    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--print", dest="do_print", action="store_true", help="write markdown to stdout (default)")
    mode.add_argument("--auto", action="store_true", help="write the log to the computed path; print one-line JSON")
    mode.add_argument("--json", dest="do_json", action="store_true", help="print metadata JSON only; write nothing")

    args = parser.parse_args(argv)

    try:
        transcript_path = resolve(uuid=args.uuid, latest=args.latest, path=args.path)
    except (FileNotFoundError, ValueError) as exc:
        print(f"[ERROR] {exc}", file=sys.stderr)
        return 2

    parsed = parse_transcript(transcript_path)

    # --json: metadata only, no markdown assembly.
    if args.do_json:
        print(json.dumps(_metadata_only(parsed), ensure_ascii=False))
        return 0

    markdown, meta = build_log(parsed)

    # default / --print
    if not args.auto:
        sys.stdout.write(markdown)
        return 0

    # --auto: persist the log, then report where it went.
    destination = Path(meta["path_would_be"])
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(markdown, encoding="utf-8")
    summary = {
        "written": str(destination),
        "scope": meta["scope"],
        "uuid": meta["uuid"],
        "date": meta["date"],
    }
    print(json.dumps(summary, ensure_ascii=False))
    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|