#!/usr/bin/env python3 """recover_session.py -- reconstruct a ClaudeTools session log from a Claude Code transcript. Claude Code writes every session live to a transcript JSONL under ``~/.claude/projects//.jsonl`` (slug = the repo root path with ``/``, ``\\`` and ``:`` each replaced by ``-``). When a session crashes or is closed before ``/save`` runs, the work is still fully recorded in that transcript. This module distills a transcript back into a normal session log that follows the ``.claude/commands/save.md`` format. Accuracy split (deliberate): - Ollama drafts ONLY the prose sections (Session Summary, Key Decisions, Problems Encountered, Pending / Incomplete Tasks). It never sees -- and never emits -- commands, IPs, credentials, file paths, commit SHAs, or ticket IDs. - Python extracts the high-value, accuracy-critical evidence verbatim (Configuration Changes, Commands & Outputs, Reference Information, Infrastructure & Servers, Credentials & Secrets). If Ollama is unreachable the log is still produced -- the prose sections carry a placeholder note and the verbatim evidence appendix (the important part) is intact. CLI: recover_session.py --uuid [--print | --auto | --json] recover_session.py --latest [--print | --auto | --json] recover_session.py --path [--print | --auto | --json] Importable API (the detector uses these): iter_events(path) -> yields raw decoded JSON objects, in file order parse_transcript(path) -> ParsedTranscript classify(parsed) -> dict with substantive/saved/scope/... verdict build_log(parsed, today=None) -> (markdown_str, meta_dict) resolve(uuid=None, latest=False, path=None) -> Path stdlib only; targets Python 3.11+. 
""" from __future__ import annotations import argparse import json import re import shutil import subprocess import sys import urllib.error import urllib.request from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path # --------------------------------------------------------------------------- # # Constants # --------------------------------------------------------------------------- # # Tools that, when used, mark a session as "substantive" (it mutated something). _MUTATING_TOOLS = {"Write", "Edit", "NotebookEdit"} # Shell commands (Bash / PowerShell) that count as mutating actions. Matched # case-insensitively as a search (not anchored) against the command string. _MUTATING_CMD_PATTERNS = [ r"git\s+(commit|push|add)\b", r"\bssh\b", r"\bschtasks\b", r"\bNew-Item\b", r"\bSet-Content\b", r"\bRemove-Item\b", r"\bOut-File\b", r"curl\b.*-X\s*(POST|PUT|DELETE|PATCH)", r"/api/", r"vault\.sh\b", r"Invoke-RestMethod\b.*-Method\s*(Post|Put|Delete)", ] _MUTATING_CMD_RE = re.compile("|".join(_MUTATING_CMD_PATTERNS), re.IGNORECASE) # Skills whose use implies real, mutating work was performed. _MUTATING_SKILLS = { "syncro", "rmm", "remediation-tool", "mailbox", "forum-post", "syncro-emergency-billing", } # Skills / file markers that indicate the session WAS already saved. _SAVE_SKILLS = {"save", "scc", "checkpoint"} _SESSION_LOG_MARKERS = ("session-logs/", "session-logs\\") # Tool-result truncation budget. _RESULT_TRUNC = 300 # Ollama digest budget. _DIGEST_CAP = 16000 # Commit footer (matches the repo's standard). _COMMIT_FOOTER = "Co-Authored-By: Claude Opus 4.8 (1M context) " # --------------------------------------------------------------------------- # # Repo / path resolution # --------------------------------------------------------------------------- # def repo_root() -> Path: """Return the ClaudeTools repo root. 
Prefer ``claudetools_root`` from ``.claude/identity.json`` (portable across machines); fall back to two levels up from this script (``.../.claude/scripts/`` -> repo root). """ here = Path(__file__).resolve() fallback = here.parents[2] # .../.claude/scripts/recover_session.py -> repo root id_path = fallback / ".claude" / "identity.json" try: data = json.loads(id_path.read_text(encoding="utf-8")) root = data.get("claudetools_root") if root: p = Path(root) if p.exists(): return p except (OSError, ValueError): pass return fallback def _identity() -> dict: try: return json.loads( (repo_root() / ".claude" / "identity.json").read_text(encoding="utf-8") ) except (OSError, ValueError): return {} def transcript_base_dir() -> Path: """Compute ``~/.claude/projects/`` from identity's claudetools_root.""" root = _identity().get("claudetools_root") or str(repo_root()) slug = re.sub(r"[/\\:]", "-", root) return Path.home() / ".claude" / "projects" / slug def resolve(uuid: str | None = None, latest: bool = False, path: str | None = None) -> Path: """Resolve the transcript file path from one of the three selectors.""" if path: p = Path(path) if not p.exists(): raise FileNotFoundError(f"transcript not found: {p}") return p base = transcript_base_dir() if latest: candidates = sorted( base.glob("*.jsonl"), key=lambda f: f.stat().st_mtime, reverse=True ) if not candidates: raise FileNotFoundError(f"no transcripts in {base}") return candidates[0] if uuid: p = base / f"{uuid}.jsonl" if not p.exists(): raise FileNotFoundError(f"transcript not found: {p}") return p raise ValueError("one of uuid / latest / path is required") # --------------------------------------------------------------------------- # # Parsing # --------------------------------------------------------------------------- # _SYSTEM_REMINDER_RE = re.compile( r".*?", re.IGNORECASE | re.DOTALL ) # Long base64-ish blobs (data URLs and bare runs). Replace with a placeholder. 
_DATAURL_RE = re.compile(r"data:[^;]+;base64,[A-Za-z0-9+/=\s]+", re.IGNORECASE)
_BASE64_RUN_RE = re.compile(r"[A-Za-z0-9+/]{200,}={0,2}")


def _strip_noise(text: str) -> str:
    """Remove system-reminder matches and base64 blobs from ``text``.

    Falsy input yields ``""``. Blobs are replaced by a fixed placeholder so the
    reader can still see that something was there.
    """
    if not text:
        return ""
    text = _SYSTEM_REMINDER_RE.sub("", text)
    text = _DATAURL_RE.sub("[base64 blob omitted]", text)
    text = _BASE64_RUN_RE.sub("[base64 blob omitted]", text)
    return text


def _truncate(text: str, limit: int = _RESULT_TRUNC) -> str:
    """Strip ``text`` and cut it to ``limit`` characters, marking the cut."""
    text = (text or "").strip()
    if len(text) <= limit:
        return text
    return text[:limit].rstrip() + " ... [truncated]"


def _as_text(value) -> str:
    """Coerce a decoded-JSON value to ``str``; ``None`` becomes ``""``."""
    if isinstance(value, str):
        return value
    return "" if value is None else str(value)


def _flatten_content(content) -> str:
    """Flatten a message-content value (str, or list of blocks) to plain text.

    List items contribute when they are strings or dicts carrying a string
    ``text`` field; everything else is ignored. Parts are newline-joined.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    parts: list[str] = []
    for blk in content:
        if isinstance(blk, str):
            parts.append(blk)
        elif isinstance(blk, dict) and isinstance(blk.get("text"), str):
            parts.append(blk["text"])
    return "\n".join(parts)


def _concise_args(name: str, inp: dict) -> str:
    """One-line, human-readable summary of a tool_use input.

    Values are coerced to text first, so a malformed input (``null`` or a
    non-string where a string is expected) degrades to an empty/printable
    summary instead of raising.
    """
    if not isinstance(inp, dict):
        return ""
    if name in ("Bash", "PowerShell"):
        return _truncate(_as_text(inp.get("command")).replace("\n", " "), 200)
    if name in ("Write", "Edit", "NotebookEdit"):
        return _as_text(inp.get("file_path") or inp.get("notebook_path"))
    if name == "Read":
        return _as_text(inp.get("file_path"))
    if name in ("Glob", "Grep"):
        bits = [f"{key}={inp[key]}" for key in ("pattern", "path", "glob") if inp.get(key)]
        return " ".join(bits)
    if name == "Skill":
        skill = _as_text(inp.get("skill"))
        args = _truncate(_as_text(inp.get("args")).replace("\n", " "), 160)
        return f"{skill}: {args}" if args else skill
    if name in ("WebFetch", "WebSearch"):
        return _truncate(_as_text(inp.get("url") or inp.get("query")), 160)
    # generic
    return _truncate(json.dumps(inp, ensure_ascii=False), 160)


@dataclass
class Event:
    kind: str  # "human" | "assistant_text" | "tool_use" | "tool_result"
    text: str = ""
    name: str = ""  # tool name (tool_use)
    args: str = ""  # concise args (tool_use)
    file_path: str = ""  # for Write/Edit/NotebookEdit
    skill: str = ""  # for Skill tool_use
    command: str = ""  # raw command for Bash/PowerShell tool_use
    timestamp: str = ""


@dataclass
class ParsedTranscript:
    path: Path
    uuid: str
    events: list[Event] = field(default_factory=list)
    first_ts: str = ""
    last_ts: str = ""
    cwd: str = ""
    git_branch: str = ""
    ai_title: str = ""
    raw_text: str = ""  # whole-transcript concatenation for regex sweeps
    mtime: float = 0.0


def iter_events(path: str | Path):
    """Yield raw decoded JSON objects from a transcript, in file order.

    Malformed lines are skipped silently (transcripts can have partial last
    lines after a crash -- exactly the case we exist to recover from).
    """
    p = Path(path)
    with p.open("r", encoding="utf-8", errors="replace") as fh:
        for line in fh:
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except ValueError:
                continue


def parse_transcript(path: str | Path) -> ParsedTranscript:
    """Parse a transcript into an ordered Event list plus metadata.

    Metadata (timestamps, cwd, git branch, ai-title) is collected from every
    line, including sidechain lines; only the event timeline skips sidechains.
    A user message whose content is a plain string is a human prompt; list
    content is scanned for ``tool_result`` blocks only. Fields with an
    unexpected JSON type are ignored rather than allowed to raise.
    """
    p = Path(path)
    parsed = ParsedTranscript(path=p, uuid=p.stem)
    try:
        parsed.mtime = p.stat().st_mtime
    except OSError:
        parsed.mtime = 0.0

    raw_chunks: list[str] = []
    for obj in iter_events(p):
        if not isinstance(obj, dict):
            continue
        t = obj.get("type")

        ts = obj.get("timestamp")
        stamp = ts if isinstance(ts, str) else ""
        if stamp:
            if not parsed.first_ts:
                parsed.first_ts = stamp
            parsed.last_ts = stamp

        cwd = obj.get("cwd")
        if isinstance(cwd, str) and cwd and not parsed.cwd:
            parsed.cwd = cwd
        branch = obj.get("gitBranch")
        if isinstance(branch, str) and branch and not parsed.git_branch:
            parsed.git_branch = branch

        # ai-title metadata -- usable title hint
        if t == "ai-title":
            title = obj.get("aiTitle")
            if isinstance(title, str) and title.strip():
                parsed.ai_title = title.strip()
            continue

        # Skip subagent / sidechain lines for the main timeline.
        if obj.get("isSidechain"):
            continue

        msg = obj.get("message")
        if not isinstance(msg, dict):
            msg = {}

        if t == "assistant":
            content = msg.get("content", [])
            if not isinstance(content, list):
                continue
            for blk in content:
                if not isinstance(blk, dict):
                    continue
                btype = blk.get("type")
                if btype == "text":
                    raw = blk.get("text")
                    txt = _strip_noise(raw).strip() if isinstance(raw, str) else ""
                    if txt:
                        parsed.events.append(
                            Event(kind="assistant_text", text=txt, timestamp=stamp)
                        )
                        raw_chunks.append(txt)
                elif btype == "tool_use":
                    name = _as_text(blk.get("name"))
                    inp = blk.get("input")
                    if not isinstance(inp, dict):
                        inp = {}
                    ev = Event(
                        kind="tool_use",
                        name=name,
                        args=_concise_args(name, inp),
                        timestamp=stamp,
                    )
                    if name in _MUTATING_TOOLS:
                        ev.file_path = _as_text(
                            inp.get("file_path") or inp.get("notebook_path")
                        )
                    if name == "Skill":
                        ev.skill = _as_text(inp.get("skill"))
                    if name in ("Bash", "PowerShell"):
                        ev.command = _as_text(inp.get("command"))
                    parsed.events.append(ev)
                    raw_chunks.append(f"{name} {ev.args}")

        elif t == "user":
            content = msg.get("content")
            if isinstance(content, str):
                # A real human-typed prompt.
                txt = _strip_noise(content).strip()
                if txt:
                    parsed.events.append(Event(kind="human", text=txt, timestamp=stamp))
                    raw_chunks.append(txt)
            elif isinstance(content, list):
                # tool_result blocks (tool output -- NOT a human prompt).
                for blk in content:
                    if not isinstance(blk, dict):
                        continue
                    if blk.get("type") == "tool_result":
                        body = _strip_noise(_flatten_content(blk.get("content")))
                        if body.strip():
                            parsed.events.append(
                                Event(
                                    kind="tool_result",
                                    text=_truncate(body),
                                    timestamp=stamp,
                                )
                            )
                            # Keep more of the body for the regex sweeps than
                            # the event itself shows.
                            raw_chunks.append(body[:1000])
        # other metadata types (mode, permission-mode, system, attachment,
        # file-history-snapshot, queue-operation, last-prompt) -> skipped.

    parsed.raw_text = "\n".join(raw_chunks)
    return parsed


# --------------------------------------------------------------------------- #
# Classification
# --------------------------------------------------------------------------- #

def _is_mutating_command(cmd: str) -> bool:
    """True when ``cmd`` is non-empty and matches any mutating-command pattern."""
    return bool(cmd) and bool(_MUTATING_CMD_RE.search(cmd))


# Project dirs that are NOT real session-log homes for client/project work:
# transcript/conversation archives, and generic catch-all buckets that should
# never become a session scope. A transcript that would otherwise classify to
# one of these must fall through to a real project (or to general).
_ARCHIVE_DIR_RE = re.compile(r"-conversation-logs$", re.IGNORECASE)
_GENERIC_PROJECT_NAMES = {"internal", "scripts", "toolkit", "utilities"}


def _is_valid_project_slug(name: str) -> bool:
    """A valid project scope is a real work dir that could sensibly own a
    ``session-logs/`` subdir -- not a transcript archive or a generic bucket.
    """
    if _ARCHIVE_DIR_RE.search(name):
        return False
    if name.lower() in _GENERIC_PROJECT_NAMES:
        return False
    return True


def _known_slugs() -> tuple[set[str], set[str]]:
    """Return (client_slugs, project_slugs) discovered from the repo layout.

    Clients come from ``clients/*`` directories (names starting with ``_`` are
    skipped) and ``wiki/clients/*.md`` stems. Projects come from ``projects/*``
    and ``projects/msp-tools/*`` directories.

    Project slugs exclude transcript archives (``*-conversation-logs``) and the
    generic catch-all names (``internal``, ``scripts``, ``toolkit``,
    ``utilities``) -- see ``_is_valid_project_slug``.
    """
    root = repo_root()
    clients: set[str] = set()
    projects: set[str] = set()

    clients_dir = root / "clients"
    if clients_dir.is_dir():
        clients.update(
            d.name
            for d in clients_dir.iterdir()
            if d.is_dir() and not d.name.startswith("_")
        )
    wiki_clients = root / "wiki" / "clients"
    if wiki_clients.is_dir():
        clients.update(f.stem for f in wiki_clients.glob("*.md"))

    proj_dir = root / "projects"
    for parent in (proj_dir, proj_dir / "msp-tools"):
        if parent.is_dir():
            projects.update(
                d.name
                for d in parent.iterdir()
                if d.is_dir() and _is_valid_project_slug(d.name)
            )
    return clients, projects


def _slug_to_words(slug: str) -> list[str]:
    """Split a slug into matchable word tokens, dropping trivial ones."""
    return [w for w in re.split(r"[-_]+", slug.lower()) if len(w) >= 4]


def classify(parsed: ParsedTranscript) -> dict:
    """Classify a parsed transcript.

    Returns a dict with: substantive (bool), saved (bool), scope (dict),
    title (str), human_prompt_count (int), mutating_actions (list[str]).
    """
    substantive = False
    saved = False
    mutating_actions: list[str] = []
    human_prompt_count = 0

    for ev in parsed.events:
        if ev.kind == "human":
            human_prompt_count += 1
            continue
        if ev.kind != "tool_use":
            continue
        name = ev.name

        # saved? -- save/scc/checkpoint skill, or a write into session-logs/
        if name == "Skill" and ev.skill in _SAVE_SKILLS:
            saved = True
        if name in _MUTATING_TOOLS and ev.file_path:
            # Compare with separators normalised so either marker spelling hits.
            fp = ev.file_path.replace("\\", "/")
            if any(m.replace("\\", "/") in fp for m in _SESSION_LOG_MARKERS):
                saved = True

        # substantive?
        if name in _MUTATING_TOOLS:
            substantive = True
            mutating_actions.append(f"{name} {ev.file_path}".strip())
        elif name in ("Bash", "PowerShell"):
            if _is_mutating_command(ev.command):
                substantive = True
                mutating_actions.append(
                    f"{name}: {_truncate(ev.command.replace(chr(10), ' '), 120)}"
                )
        elif name == "Skill" and ev.skill in _MUTATING_SKILLS:
            substantive = True
            mutating_actions.append(f"Skill: {ev.skill}")

    return {
        "substantive": substantive,
        "saved": saved,
        "scope": _classify_scope(parsed),
        "title": _derive_title(parsed),
        "human_prompt_count": human_prompt_count,
        "mutating_actions": mutating_actions,
    }


def _classify_scope(parsed: ParsedTranscript) -> dict:
    """Decide client / project / general scope. Conservative: ambiguous -> general.

    Order of evidence: an unambiguous cwd hint (cwd inside ``clients/<slug>``
    or ``projects/<slug>``) wins outright; otherwise the best-scoring client
    and project compete on text-frequency scores and the winner must clear a
    minimum score and dominate the other category.

    Returns ``{"type": "client"|"project", "slug": ...}`` or
    ``{"type": "general"}``.
    """
    clients, projects = _known_slugs()
    haystack = "\n".join(
        [parsed.raw_text or "", parsed.cwd or "", parsed.git_branch or ""]
    ).lower()

    def score(slug: str) -> int:
        needle = slug.lower()
        words = _slug_to_words(slug)
        if not words:
            # very short slug -- only count whole-slug hits to avoid noise
            return haystack.count(needle)
        # Whole-slug hits weigh 3x; each occurrence of each word token
        # (4+ chars) adds 1. Nothing is required to be present -- a slug with
        # no hits simply scores 0.
        total = haystack.count(needle) * 3
        for w in words:
            total += haystack.count(w)
        return total

    # Iterate in sorted order so that ties resolve the same way on every run
    # (set order varies with hash randomisation).
    client_scores = {s: score(s) for s in sorted(clients)}
    project_scores = {s: score(s) for s in sorted(projects)}
    best_client = max(client_scores.items(), key=lambda kv: kv[1], default=(None, 0))
    best_project = max(project_scores.items(), key=lambda kv: kv[1], default=(None, 0))

    # cwd-based project hint (strong signal): cwd inside projects/<slug>/...
    # cwd is lower-cased for matching, so look slugs up case-insensitively and
    # hand back the directory's real spelling.
    cwd_norm = (parsed.cwd or "").replace("\\", "/").lower()
    projects_by_lower = {s.lower(): s for s in projects}
    clients_by_lower = {s.lower(): s for s in clients}

    m = re.search(r"/projects/(?:msp-tools/)?([a-z0-9._-]+)", cwd_norm)
    cwd_project = projects_by_lower.get(m.group(1)) if m else None
    m = re.search(r"/clients/([a-z0-9._-]+)", cwd_norm)
    cwd_client = clients_by_lower.get(m.group(1)) if m else None

    # Minimum confidence thresholds -- be conservative.
    CLIENT_MIN = 4
    PROJECT_MIN = 4
    DOMINANCE = 2  # winner must beat runner-up by this factor to count

    # cwd hints win if present and unambiguous.
    if cwd_client and not cwd_project:
        return {"type": "client", "slug": cwd_client}
    if cwd_project and not cwd_client:
        return {"type": "project", "slug": cwd_project}

    c_slug, c_score = best_client
    p_slug, p_score = best_project

    # Determine the dominant category.
    client_ok = c_slug and c_score >= CLIENT_MIN
    project_ok = p_slug and p_score >= PROJECT_MIN

    if client_ok and (not project_ok or c_score >= p_score * DOMINANCE):
        return {"type": "client", "slug": c_slug}
    if project_ok and (not client_ok or p_score >= c_score * DOMINANCE):
        return {"type": "project", "slug": p_slug}
    return {"type": "general"}


def _derive_title(parsed: ParsedTranscript) -> str:
    """Pick a title: the ai-title if present, else the first human prompt's
    first line (whitespace-collapsed, cut to 70 chars), else a fixed default.
    """
    if parsed.ai_title:
        return parsed.ai_title
    # first human prompt -> first line / first 70 chars
    for ev in parsed.events:
        if ev.kind == "human" and ev.text.strip():
            line = ev.text.strip().splitlines()[0]
            line = re.sub(r"\s+", " ", line).strip()
            return _truncate(line, 70)
    return "recovered session"


def _topic_slug(title: str) -> str:
    """Turn a title into a filename-safe slug of at most 48 characters.

    Non-alphanumeric runs become single hyphens; an empty result becomes
    ``"session"``.
    """
    slug = re.sub(r"[^a-z0-9]+", "-", (title or "").lower()).strip("-")
    slug = re.sub(r"-{2,}", "-", slug)
    return (slug or "session")[:48].strip("-") or "session"


# --------------------------------------------------------------------------- #
# Evidence extraction (verbatim -- Python only)
# --------------------------------------------------------------------------- #

_RE_IP = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
_RE_URL = re.compile(r"https?://[^\s\"'<>)\]]+")

# A dotted run
with 5+ numeric components is a version string, never an IP # (e.g. "1.2.3.4.5"). Used to reject dotted-quad matches that are a sub-span of # a longer version. _RE_DOTTED_VERSION = re.compile(r"\d+(?:\.\d+){4,}") # Version context that immediately precedes a dotted-quad marks it as a version, # not an IP -- e.g. "version 1.9.158.0", "build 6.5.60.172", "v1.2.3.4". The # trailing optional separators ("v"/space/colon/equals/parens) sit between the # keyword and the number. A trailing bare "v"/"V" alone also counts. _RE_VERSION_CONTEXT = re.compile( r"(?:\b(?:version|ver|build|rev|revision|release|agent|firmware|fw|" r"v)\b\s*[:=]?\s*v?|[vV])$", re.IGNORECASE, ) def _iter_real_ips(text: str): """Yield dotted-quads from ``text`` that are plausibly real IPv4 addresses. Rejects version-like strings via a deliberately small rule set: - any octet outside 0-255 (e.g. "1.9.158.300" is not an IP), - a match that is a sub-span of a longer dotted version with 5+ components (e.g. the "1.2.3.4" inside "1.2.3.4.5"), - a match preceded by a version marker -- a bare ``v``/``V`` (e.g. "v1.2.3.4") or a version keyword like "version"/"build" immediately before it (e.g. "version 1.9.158.0", "build 6.5.60.172"). Order-preserving; de-duplication is the caller's job. """ if not text: return # Spans covered by a 5+-component dotted version -> not IPs. version_spans = [m.span() for m in _RE_DOTTED_VERSION.finditer(text)] for m in _RE_IP.finditer(text): octets = m.group(0).split(".") if any(not (0 <= int(o) <= 255) for o in octets): continue start = m.start() # Reject if this match sits inside a longer dotted version. if any(vs <= start and m.end() <= ve for vs, ve in version_spans): continue # Reject if immediately preceded by version context. 
if _RE_VERSION_CONTEXT.search(text[:start]): continue yield m.group(0) _RE_TICKET = re.compile(r"#\d{4,}") _RE_COMMIT = re.compile(r"(?:\bcommit\b[^0-9a-f]{0,12})([0-9a-f]{7,40})\b", re.IGNORECASE) _RE_UUID = re.compile( r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", re.IGNORECASE ) _RE_HOST = re.compile(r"\b(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z]{2,}\b", re.IGNORECASE) def _dedup(seq): seen = set() out = [] for x in seq: if x not in seen: seen.add(x) out.append(x) return out def extract_config_changes(parsed: ParsedTranscript) -> list[tuple[str, str]]: """Return [(path, 'created'|'modified'), ...] deduped (first verb wins).""" seen: dict[str, str] = {} for ev in parsed.events: if ev.kind == "tool_use" and ev.name in _MUTATING_TOOLS and ev.file_path: verb = "created" if ev.name == "Write" else "modified" if ev.file_path not in seen: seen[ev.file_path] = verb return list(seen.items()) def extract_commands(parsed: ParsedTranscript) -> list[tuple[str, str]]: """Return [(command, truncated_result), ...] for mutating shell calls. The result is the next tool_result event following the command in timeline order (best-effort association). 
""" out: list[tuple[str, str]] = [] events = parsed.events for i, ev in enumerate(events): if ev.kind == "tool_use" and ev.name in ("Bash", "PowerShell") and _is_mutating_command(ev.command): result = "" for j in range(i + 1, min(i + 4, len(events))): if events[j].kind == "tool_result": result = events[j].text break out.append((ev.command.strip(), result)) return out def extract_reference(parsed: ParsedTranscript) -> dict: text = parsed.raw_text or "" ips = _dedup(_iter_real_ips(text)) urls = _dedup(_RE_URL.findall(text)) tickets = _dedup(_RE_TICKET.findall(text)) commits = _dedup(m for m in _RE_COMMIT.findall(text)) uuids = _dedup(_RE_UUID.findall(text)) # coord message ids = uuids appearing near the word "message" msg_ids = _dedup( m.group(1) for m in re.finditer( r"message[^0-9a-f]{0,24}([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})", text, re.IGNORECASE, ) ) return { "ips": ips, "urls": urls, "tickets": tickets, "commits": commits, "uuids": uuids, "coord_message_ids": msg_ids, } def extract_infra(parsed: ParsedTranscript, ref: dict) -> dict: text = parsed.raw_text or "" ips = ref.get("ips", []) # Hostnames: dotted names that are not pure IPs and look infra-ish. 
hosts = [] for h in _RE_HOST.findall(text): if _RE_IP.fullmatch(h): continue hosts.append(h.lower()) hosts = _dedup(hosts) return {"ips": ips, "hosts": hosts} # --------------------------------------------------------------------------- # # Ollama prose # --------------------------------------------------------------------------- # def _ollama_config() -> tuple[str, str]: d = _identity() o = d.get("ollama", {}) if isinstance(d.get("ollama"), dict) else {} endpoint = o.get("endpoint") or o.get("fallback") or "http://localhost:11434" model = o.get("prose_model") or "qwen3:14b" return endpoint, model _THINK_RE = re.compile(r".*?", re.IGNORECASE | re.DOTALL) def _build_digest(parsed: ParsedTranscript) -> str: """A bounded narrative digest for Ollama: human prompts in full, assistant text, and tool-call one-liners. Big tool_result bodies are dropped here. """ lines: list[str] = [] for ev in parsed.events: if ev.kind == "human": lines.append(f"USER: {ev.text}") elif ev.kind == "assistant_text": lines.append(f"ASSISTANT: {ev.text}") elif ev.kind == "tool_use": lines.append(f"[tool: {ev.name} {ev.args}]") # tool_result intentionally omitted from the digest digest = "\n".join(lines) if len(digest) > _DIGEST_CAP: # keep the head and tail -- start framing + final state matter most head = digest[: _DIGEST_CAP * 2 // 3] tail = digest[-(_DIGEST_CAP // 3):] digest = head + "\n...[middle elided for length]...\n" + tail return digest _PROSE_PROMPT = """You are writing the prose sections of an engineering session log, reconstructed from a work-session transcript. Write in plain past tense, technical, concise, NO emojis, NO filler. Output EXACTLY these four markdown sections, with these exact headers, and nothing else (no preamble, no closing remarks): ## Session Summary (2-4 paragraphs: what was accomplished, in what order, and why.) ## Key Decisions (bullet list of non-obvious decisions and their rationale; "- none" if none.) 
## Problems Encountered (bullet list of problems hit and how each was resolved; "- none" if none.) ## Pending / Incomplete Tasks (bullet list of what is left, blockers, next steps; "- none" if none.) CRITICAL: Do NOT invent or restate specific commands, IP addresses, credentials, file paths, commit hashes, or ticket numbers -- those are recorded separately and verbatim. Describe the work at a conceptual level only. TRANSCRIPT DIGEST: """ def ollama_prose(parsed: ParsedTranscript, timeout: int = 120) -> dict | None: """Ask Ollama for the four prose sections. Returns a dict of header->body, or None if Ollama is unreachable / errored. """ endpoint, model = _ollama_config() digest = _build_digest(parsed) prompt = _PROSE_PROMPT + digest body = json.dumps( { "model": model, "messages": [{"role": "user", "content": prompt}], "stream": False, "think": False, } ).encode("utf-8") req = urllib.request.Request( endpoint.rstrip("/") + "/api/chat", data=body, headers={"Content-Type": "application/json"}, ) try: with urllib.request.urlopen(req, timeout=timeout) as resp: payload = json.loads(resp.read().decode("utf-8")) except (urllib.error.URLError, OSError, ValueError, TimeoutError): return None content = "" try: content = payload["message"]["content"] except (KeyError, TypeError): return None content = _THINK_RE.sub("", content or "").strip() if not content: return None return _split_prose_sections(content) _PROSE_HEADERS = [ "Session Summary", "Key Decisions", "Problems Encountered", "Pending / Incomplete Tasks", ] def _split_prose_sections(text: str) -> dict: """Parse the four ## sections out of Ollama's output; tolerate missing ones.""" out: dict[str, str] = {} # Build an alternation matching any of our known headers (allow minor # whitespace variance). 
header_alt = "|".join(re.escape(h) for h in _PROSE_HEADERS) pattern = re.compile(rf"^##\s*({header_alt})\s*$", re.IGNORECASE | re.MULTILINE) matches = list(pattern.finditer(text)) for idx, m in enumerate(matches): header = m.group(1) # canonicalize header capitalization to our known form canon = next((h for h in _PROSE_HEADERS if h.lower() == header.lower()), header) start = m.end() end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text) out[canon] = text[start:end].strip() return out # --------------------------------------------------------------------------- # # whoami block # --------------------------------------------------------------------------- # def whoami_block() -> str: """Run whoami-block.sh and return its stdout. Falls back to a placeholder.""" script = repo_root() / ".claude" / "scripts" / "whoami-block.sh" bash = shutil.which("bash") if script.exists() and bash: try: res = subprocess.run( [bash, str(script)], capture_output=True, text=True, timeout=30, cwd=str(repo_root()), ) if res.returncode == 0 and res.stdout.strip(): return res.stdout.rstrip("\n") except (OSError, subprocess.SubprocessError): pass # Fallback: build a minimal block from identity.json directly (also taken # when 'bash' is absent from a restricted scheduler PATH). 
d = _identity() full = d.get("full_name") or d.get("user", "unknown") user = d.get("user", "unknown") machine = d.get("machine", "unknown") role = d.get("role", "") lines = ["## User", f"- **User:** {full} ({user})", f"- **Machine:** {machine}"] if role: lines.append(f"- **Role:** {role}") lines.append("- **[WARNING]** whoami-block.sh unavailable; rendered from identity.json directly.") return "\n".join(lines) # --------------------------------------------------------------------------- # # Path computation # --------------------------------------------------------------------------- # def _first_ts_date(parsed: ParsedTranscript) -> str: ts = parsed.first_ts if ts: try: dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) return dt.date().isoformat() except ValueError: pass # fall back to mtime if parsed.mtime: return datetime.fromtimestamp(parsed.mtime, tz=timezone.utc).date().isoformat() return datetime.now().date().isoformat() def compute_output_path(parsed: ParsedTranscript, scope: dict, title: str) -> Path: root = repo_root() date = _first_ts_date(parsed) topic = _topic_slug(title) fname = f"{date}-recovered-{topic}.md" if scope.get("type") == "client": base = root / "clients" / scope["slug"] / "session-logs" elif scope.get("type") == "project": slug = scope["slug"] # msp-tools sub-projects keep their session logs under the sub-project proj_dir = root / "projects" / slug if not proj_dir.exists(): msp_dir = root / "projects" / "msp-tools" / slug if msp_dir.exists(): proj_dir = msp_dir base = proj_dir / "session-logs" else: base = root / "session-logs" target = base / fname if target.exists(): short = parsed.uuid[:8] target = base / f"{date}-recovered-{topic}-{short}.md" return target # --------------------------------------------------------------------------- # # Markdown assembly # --------------------------------------------------------------------------- # def _fmt_prose_section(header: str, prose: dict | None, fallback: str) -> str: body = "" if prose: 
body = prose.get(header, "").strip() if not body: body = fallback return f"## {header}\n\n{body}\n" def build_log(parsed: ParsedTranscript, today: str | None = None) -> tuple[str, dict]: """Assemble the full reconstructed markdown log. Returns (markdown, meta).""" today = today or datetime.now().date().isoformat() verdict = classify(parsed) scope = verdict["scope"] title = verdict["title"] prose = ollama_prose(parsed) ollama_ok = prose is not None placeholder = ( "_[INFO] Ollama was unreachable during recovery; this prose section was " "not drafted. Reconstruct it from the verbatim evidence below, or re-run " "`/recover` once Ollama is available._" ) config_changes = extract_config_changes(parsed) commands = extract_commands(parsed) ref = extract_reference(parsed) infra = extract_infra(parsed, ref) out_path = compute_output_path(parsed, scope, title) date = _first_ts_date(parsed) lines: list[str] = [] # Title lines.append(f"# [RECOVERED] {title}") lines.append("") # Banner banner = ( f"> **[RECOVERED -- UNVERIFIED]** Auto-reconstructed from transcript " f"{parsed.uuid} ({parsed.first_ts or '?'} .. {parsed.last_ts or '?'}) on " f"{today}. Prose sections are Ollama-drafted from the transcript and may " f"be imprecise; the Commands/Config/Reference sections are extracted " f"verbatim. Review and correct, then remove this banner." 
) lines.append(banner) lines.append("") # User block lines.append(whoami_block()) lines.append("") # Prose sections (Ollama) -- in save.md order lines.append(_fmt_prose_section("Session Summary", prose, placeholder)) lines.append(_fmt_prose_section("Key Decisions", prose, placeholder if not ollama_ok else "- none recorded")) lines.append(_fmt_prose_section("Problems Encountered", prose, placeholder if not ollama_ok else "- none recorded")) # Configuration Changes (verbatim) lines.append("## Configuration Changes") lines.append("") lines.append("_Machine-extracted verbatim from the transcript (file targets of Write/Edit/NotebookEdit)._") lines.append("") if config_changes: for fp, verb in config_changes: lines.append(f"- [{verb}] `{fp}`") else: lines.append("- none detected") lines.append("") # Credentials & Secrets lines.append("## Credentials & Secrets") lines.append("") lines.append("_Machine-extracted; review carefully -- secrets are not auto-harvested from transcripts._") lines.append("") lines.append("- none detected (verify against the Commands & Outputs section)") lines.append("") # Infrastructure & Servers (verbatim regex) lines.append("## Infrastructure & Servers") lines.append("") lines.append("_Machine-extracted verbatim (IP / hostname regex hits across the whole transcript)._") lines.append("") if infra["ips"] or infra["hosts"]: if infra["ips"]: lines.append("- **IPs:** " + ", ".join(f"`{x}`" for x in infra["ips"][:40])) if infra["hosts"]: lines.append("- **Hosts:** " + ", ".join(f"`{x}`" for x in infra["hosts"][:40])) else: lines.append("- none detected (verify)") lines.append("") # Commands & Outputs (verbatim) lines.append("## Commands & Outputs") lines.append("") lines.append("_Machine-extracted verbatim: mutating Bash/PowerShell commands with truncated output._") lines.append("") if commands: for cmd, result in commands: lines.append("```") lines.append(cmd) lines.append("```") if result: lines.append(f"Output: {result}") lines.append("") else: 
lines.append("- none detected") lines.append("") # Pending / Incomplete Tasks (Ollama) lines.append(_fmt_prose_section("Pending / Incomplete Tasks", prose, placeholder if not ollama_ok else "- none recorded")) # Reference Information (verbatim) lines.append("## Reference Information") lines.append("") lines.append("_Machine-extracted verbatim from the whole transcript via regex. Treat as leads, not gospel; deduped._") lines.append("") any_ref = False if ref["commits"]: any_ref = True lines.append("- **Commit SHAs:** " + ", ".join(f"`{x}`" for x in ref["commits"][:40])) if ref["urls"]: any_ref = True lines.append("- **URLs:** " + ", ".join(ref["urls"][:40])) if ref["ips"]: any_ref = True lines.append("- **IPs:** " + ", ".join(f"`{x}`" for x in ref["ips"][:40])) if ref["tickets"]: any_ref = True lines.append("- **Ticket numbers:** " + ", ".join(ref["tickets"][:40])) if ref["coord_message_ids"]: any_ref = True lines.append("- **Coord message ids:** " + ", ".join(f"`{x}`" for x in ref["coord_message_ids"][:40])) if not any_ref: lines.append("- none detected") lines.append("") markdown = "\n".join(lines).rstrip() + "\n" meta = { "uuid": parsed.uuid, "path_would_be": str(out_path), "substantive": verdict["substantive"], "saved": verdict["saved"], "scope": scope, "title": title, "first_ts": parsed.first_ts, "last_ts": parsed.last_ts, "mtime": parsed.mtime, "human_prompt_count": verdict["human_prompt_count"], "mutating_actions": verdict["mutating_actions"], "date": date, "ollama_ok": ollama_ok, } return markdown, meta # --------------------------------------------------------------------------- # # CLI # --------------------------------------------------------------------------- # def _metadata_only(parsed: ParsedTranscript) -> dict: """Cheap metadata JSON without invoking Ollama or assembling markdown.""" verdict = classify(parsed) scope = verdict["scope"] title = verdict["title"] out_path = compute_output_path(parsed, scope, title) return { "uuid": parsed.uuid, 
"path_would_be": str(out_path), "substantive": verdict["substantive"], "saved": verdict["saved"], "scope": scope, "title": title, "first_ts": parsed.first_ts, "last_ts": parsed.last_ts, "mtime": parsed.mtime, "human_prompt_count": verdict["human_prompt_count"], "mutating_actions": verdict["mutating_actions"], "date": _first_ts_date(parsed), } def main(argv: list[str] | None = None) -> int: # On Windows the console defaults to cp1252; transcripts (and Ollama prose) # routinely contain characters outside that codepage. Force UTF-8 stdout so # --print / --json never crash on an un-encodable glyph. try: sys.stdout.reconfigure(encoding="utf-8", errors="replace") except (AttributeError, ValueError): pass parser = argparse.ArgumentParser( description="Reconstruct a ClaudeTools session log from a Claude Code transcript." ) sel = parser.add_mutually_exclusive_group(required=True) sel.add_argument("--uuid", help="transcript uuid (filename without .jsonl)") sel.add_argument("--latest", action="store_true", help="newest transcript by mtime") sel.add_argument("--path", help="explicit path to a transcript .jsonl") out = parser.add_mutually_exclusive_group() out.add_argument("--print", dest="do_print", action="store_true", help="write markdown to stdout (default)") out.add_argument("--auto", action="store_true", help="write the log to the computed path; print one-line JSON") out.add_argument("--json", dest="do_json", action="store_true", help="print metadata JSON only; write nothing") args = parser.parse_args(argv) try: path = resolve(uuid=args.uuid, latest=args.latest, path=args.path) except (FileNotFoundError, ValueError) as e: print(f"[ERROR] {e}", file=sys.stderr) return 2 parsed = parse_transcript(path) if args.do_json: print(json.dumps(_metadata_only(parsed), ensure_ascii=False)) return 0 markdown, meta = build_log(parsed) if args.auto: out_path = Path(meta["path_would_be"]) out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(markdown, encoding="utf-8") 
print( json.dumps( { "written": str(out_path), "scope": meta["scope"], "uuid": meta["uuid"], "date": meta["date"], }, ensure_ascii=False, ) ) return 0 # default / --print sys.stdout.write(markdown) return 0 if __name__ == "__main__": raise SystemExit(main())