Files
claudetools/.claude/scripts/recover_session.py
Mike Swanson 59397e8de3 fix(recovery): never write recovered logs into a git submodule
compute_output_path now parses .gitmodules and, for a project scope whose
dir is a submodule (guru-rmm, guru-connect, youtube-sync-docker), falls
back to the MAIN repo root session-logs/ per convention. Non-submodule
projects (gururmm-agent, dataforth-dos) unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-01 19:15:11 -07:00

1260 lines
45 KiB
Python

#!/usr/bin/env python3
"""recover_session.py -- reconstruct a ClaudeTools session log from a Claude Code transcript.
Claude Code writes every session live to a transcript JSONL under
``~/.claude/projects/<slug>/<uuid>.jsonl`` (slug = the repo root path with ``/``,
``\\`` and ``:`` each replaced by ``-``). When a session crashes or is closed
before ``/save`` runs, the work is still fully recorded in that transcript. This
module distills a transcript back into a normal session log that follows the
``.claude/commands/save.md`` format.
Accuracy split (deliberate):
- Ollama drafts ONLY the prose sections (Session Summary, Key Decisions,
Problems Encountered, Pending / Incomplete Tasks). It never sees -- and never
emits -- commands, IPs, credentials, file paths, commit SHAs, or ticket IDs.
- Python extracts the high-value, accuracy-critical evidence verbatim
(Configuration Changes, Commands & Outputs, Reference Information,
Infrastructure & Servers, Credentials & Secrets).
If Ollama is unreachable the log is still produced -- the prose sections carry a
placeholder note and the verbatim evidence appendix (the important part) is
intact.
CLI:
recover_session.py --uuid <uuid> [--print | --auto | --json]
recover_session.py --latest [--print | --auto | --json]
recover_session.py --path <file> [--print | --auto | --json]
Importable API (the detector uses these):
iter_events(path) -> yields raw decoded JSON objects, in file order
parse_transcript(path) -> ParsedTranscript
classify(parsed) -> dict with substantive/saved/scope/... verdict
build_log(parsed, today=None) -> (markdown_str, meta_dict)
resolve(uuid=None, latest=False, path=None) -> Path
stdlib only; targets Python 3.11+.
"""
from __future__ import annotations
import argparse
import json
import re
import shutil
import subprocess
import sys
import urllib.error
import urllib.request
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
# --------------------------------------------------------------------------- #
# Constants
# --------------------------------------------------------------------------- #
# Tools that, when used, mark a session as "substantive" (it mutated something).
_MUTATING_TOOLS = {"Write", "Edit", "NotebookEdit"}
# Shell commands (Bash / PowerShell) that count as mutating actions. Matched
# case-insensitively as a search (not anchored) against the command string.
# NOTE(review): some entries are broad -- a bare ``ssh`` word or any ``/api/``
# substring is enough to count, whatever the command actually did. Confirm
# that this sensitivity is intended.
_MUTATING_CMD_PATTERNS = [
    r"git\s+(commit|push|add)\b",
    r"\bssh\b",
    r"\bschtasks\b",
    r"\bNew-Item\b",
    r"\bSet-Content\b",
    r"\bRemove-Item\b",
    r"\bOut-File\b",
    r"curl\b.*-X\s*(POST|PUT|DELETE|PATCH)",
    r"/api/",
    r"vault\.sh\b",
    r"Invoke-RestMethod\b.*-Method\s*(Post|Put|Delete)",
]
# Every pattern above joined into one alternation, so classifying a command
# takes a single regex search.
_MUTATING_CMD_RE = re.compile("|".join(_MUTATING_CMD_PATTERNS), re.IGNORECASE)
# Skills whose use implies real, mutating work was performed.
_MUTATING_SKILLS = {
    "syncro",
    "rmm",
    "remediation-tool",
    "mailbox",
    "forum-post",
    "syncro-emergency-billing",
}
# Skills / file markers that indicate the session WAS already saved.
_SAVE_SKILLS = {"save", "scc", "checkpoint"}
# Both separator spellings are listed so Windows-style paths are recognized.
_SESSION_LOG_MARKERS = ("session-logs/", "session-logs\\")
# Tool-result truncation budget: the default character limit of ``_truncate``.
_RESULT_TRUNC = 300
# Ollama digest budget: a digest longer than this many characters is cut down
# to its head and tail before being sent.
_DIGEST_CAP = 16000
# Commit footer (matches the repo's standard).
_COMMIT_FOOTER = "Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>"
# --------------------------------------------------------------------------- #
# Repo / path resolution
# --------------------------------------------------------------------------- #
def repo_root() -> Path:
    """Locate the ClaudeTools repository root.

    The ``claudetools_root`` entry of ``.claude/identity.json`` wins when it
    names a path that exists (this keeps the tool portable across machines).
    Otherwise the root is derived from this script's own location:
    ``<root>/.claude/scripts/recover_session.py`` -> two directories up.

    Returns:
        The configured root if usable, else the script-relative root.
    """
    script_relative = Path(__file__).resolve().parents[2]
    identity_file = script_relative / ".claude" / "identity.json"
    try:
        identity = json.loads(identity_file.read_text(encoding="utf-8"))
        configured = identity.get("claudetools_root")
        candidate = Path(configured) if configured else None
        if candidate is not None and candidate.exists():
            return candidate
    except (OSError, ValueError):
        # Missing, unreadable or malformed identity file -> use the fallback.
        pass
    return script_relative
def _identity() -> dict:
    """Load ``.claude/identity.json`` from the repo root.

    Returns:
        The decoded JSON object, or ``{}`` when the file is missing,
        unreadable, not valid JSON, or valid JSON that is not an object.
    """
    try:
        data = json.loads(
            (repo_root() / ".claude" / "identity.json").read_text(encoding="utf-8")
        )
    except (OSError, ValueError):
        return {}
    # Callers index the result with ``.get``; a list / string / number / null
    # document would crash them, so it is treated like a missing file.
    return data if isinstance(data, dict) else {}
# Cache for parsed .gitmodules paths (keyed by repo root path string).
_SUBMODULE_PATHS_CACHE: dict[str, frozenset[str]] = {}
# Matches `path = projects/foo` lines in .gitmodules (leading whitespace, any
# spacing around `=`). The captured value is the repo-relative submodule path.
_GITMODULES_PATH_RE = re.compile(r"^\s*path\s*=\s*(.+?)\s*$", re.MULTILINE)


def _submodule_paths() -> frozenset[str]:
    """Return the set of repo-relative submodule paths from ``.gitmodules``.

    Parses the ``path = ...`` lines with a regex scan (no git calls). Paths
    are normalized to forward slashes with no trailing slash so they can be
    compared against forward-slashed, repo-relative work paths.

    A missing or unreadable ``.gitmodules`` yields an empty set (no
    submodules). Undecodable bytes are replaced rather than raised on, so a
    stray non-UTF-8 byte cannot crash recovery or hide the remaining,
    decodable ``path`` entries from the submodule guard.

    The result is cached per repo root for the life of the process.
    """
    root = repo_root()
    key = str(root)
    cached = _SUBMODULE_PATHS_CACHE.get(key)
    if cached is not None:
        return cached

    try:
        text = (root / ".gitmodules").read_text(encoding="utf-8", errors="replace")
    except OSError:
        text = ""

    paths: set[str] = set()
    for m in _GITMODULES_PATH_RE.finditer(text):
        rel = m.group(1).strip().replace("\\", "/").rstrip("/")
        if rel:
            paths.add(rel)

    result = frozenset(paths)
    _SUBMODULE_PATHS_CACHE[key] = result
    return result
def _is_inside_submodule(target_dir: Path) -> bool:
    """Report whether ``target_dir`` is a submodule directory or lies below one.

    Both sides are compared as tuples of path components relative to the repo
    root, so ``projects/msp-tools/guru-rmm`` covers that directory and
    everything under it, while a sibling such as
    ``projects/msp-tools/guru-rmm-extra`` is not covered.

    Returns ``False`` when no submodules are registered, or when
    ``target_dir`` cannot be resolved or is outside the repo root.
    """
    submodules = _submodule_paths()
    if not submodules:
        return False

    root = repo_root()
    try:
        relative = target_dir.resolve().relative_to(root.resolve())
    except (OSError, ValueError):
        # Outside the repo (or unresolvable) -> cannot be one of its submodules.
        return False

    components = relative.parts
    for submodule in submodules:
        prefix = tuple(filter(None, submodule.split("/")))
        if prefix and components[: len(prefix)] == prefix:
            return True
    return False
def transcript_base_dir() -> Path:
    """Return ``~/.claude/projects/<slug>`` for this repo.

    The slug is the ``claudetools_root`` from identity.json (or, when that is
    absent, the resolved repo root) with every ``/``, ``\\`` and ``:``
    replaced by ``-``.
    """
    configured_root = _identity().get("claudetools_root")
    root_text = configured_root or str(repo_root())
    project_slug = re.sub(r"[/\\:]", "-", root_text)
    return Path.home() / ".claude" / "projects" / project_slug
def resolve(uuid: str | None = None, latest: bool = False, path: str | None = None) -> Path:
    """Turn one of the three selectors into a transcript file path.

    Precedence is ``path``, then ``latest``, then ``uuid``.

    Args:
        uuid: Transcript UUID; looked up as ``<uuid>.jsonl`` in the base dir.
        latest: Pick the most recently modified ``*.jsonl`` in the base dir.
        path: Explicit transcript file path.

    Raises:
        FileNotFoundError: The selected transcript does not exist, or
            ``latest`` was requested and the base dir holds no transcripts.
        ValueError: No selector was supplied.
    """
    if path:
        explicit = Path(path)
        if explicit.exists():
            return explicit
        raise FileNotFoundError(f"transcript not found: {explicit}")

    base = transcript_base_dir()

    if latest:
        newest = max(
            base.glob("*.jsonl"), key=lambda f: f.stat().st_mtime, default=None
        )
        if newest is None:
            raise FileNotFoundError(f"no transcripts in {base}")
        return newest

    if not uuid:
        raise ValueError("one of uuid / latest / path is required")

    by_uuid = base / f"{uuid}.jsonl"
    if by_uuid.exists():
        return by_uuid
    raise FileNotFoundError(f"transcript not found: {by_uuid}")
# --------------------------------------------------------------------------- #
# Parsing
# --------------------------------------------------------------------------- #
_SYSTEM_REMINDER_RE = re.compile(
r"<system-reminder>.*?</system-reminder>", re.IGNORECASE | re.DOTALL
)
# Long base64-ish blobs (data URLs and bare runs). Replace with a placeholder.
_DATAURL_RE = re.compile(r"data:[^;]+;base64,[A-Za-z0-9+/=\s]+", re.IGNORECASE)
_BASE64_RUN_RE = re.compile(r"[A-Za-z0-9+/]{200,}={0,2}")
def _strip_noise(text: str) -> str:
if not text:
return ""
text = _SYSTEM_REMINDER_RE.sub("", text)
text = _DATAURL_RE.sub("[base64 blob omitted]", text)
text = _BASE64_RUN_RE.sub("[base64 blob omitted]", text)
return text
def _truncate(text: str, limit: int = _RESULT_TRUNC) -> str:
    """Strip ``text`` and cap it at ``limit`` characters.

    Text that fits is returned stripped and otherwise unchanged. Longer text
    is cut to ``limit`` characters, right-stripped, and suffixed with
    `` ... [truncated]``. Falsy input yields ``""``.
    """
    cleaned = text.strip() if text else ""
    if len(cleaned) > limit:
        return cleaned[:limit].rstrip() + " ... [truncated]"
    return cleaned
def _flatten_content(content) -> str:
"""Flatten a message-content value (str, or list of blocks) to plain text."""
if content is None:
return ""
if isinstance(content, str):
return content
parts: list[str] = []
if isinstance(content, list):
for blk in content:
if isinstance(blk, str):
parts.append(blk)
elif isinstance(blk, dict):
if blk.get("type") == "text" and isinstance(blk.get("text"), str):
parts.append(blk["text"])
elif "text" in blk and isinstance(blk["text"], str):
parts.append(blk["text"])
return "\n".join(parts)
def _concise_args(name: str, inp: dict) -> str:
"""One-line, human-readable summary of a tool_use input."""
if not isinstance(inp, dict):
return ""
if name in ("Bash", "PowerShell"):
cmd = inp.get("command", "")
return _truncate(cmd.replace("\n", " "), 200)
if name in ("Write", "Edit", "NotebookEdit"):
return inp.get("file_path") or inp.get("notebook_path") or ""
if name == "Read":
return inp.get("file_path", "")
if name in ("Glob", "Grep"):
bits = []
if inp.get("pattern"):
bits.append(f"pattern={inp['pattern']}")
if inp.get("path"):
bits.append(f"path={inp['path']}")
if inp.get("glob"):
bits.append(f"glob={inp['glob']}")
return " ".join(bits)
if name == "Skill":
skill = inp.get("skill", "")
args = _truncate(str(inp.get("args", "")).replace("\n", " "), 160)
return f"{skill}: {args}" if args else skill
if name in ("WebFetch", "WebSearch"):
return _truncate(str(inp.get("url") or inp.get("query") or ""), 160)
# generic
return _truncate(json.dumps(inp, ensure_ascii=False), 160)
@dataclass
class Event:
    # One entry of the main-timeline event list built by parse_transcript.
    # Fields that do not apply to an event's kind keep their "" default.
    kind: str  # "human" | "assistant_text" | "tool_use" | "tool_result"
    text: str = ""  # prompt / assistant text / (truncated) tool output
    name: str = ""  # tool name (tool_use)
    args: str = ""  # concise args (tool_use)
    file_path: str = ""  # for Write/Edit/NotebookEdit
    skill: str = ""  # for Skill tool_use
    command: str = ""  # raw command for Bash/PowerShell tool_use
    timestamp: str = ""  # timestamp of the source transcript line, "" if none
@dataclass
class ParsedTranscript:
    # Result of parse_transcript: the ordered events plus transcript metadata.
    path: Path  # transcript file that was parsed
    uuid: str  # file stem of ``path``
    events: list[Event] = field(default_factory=list)  # main timeline, in file order
    first_ts: str = ""  # first string timestamp seen in the file
    last_ts: str = ""  # last string timestamp seen in the file
    cwd: str = ""  # first non-empty ``cwd`` value seen
    git_branch: str = ""  # first non-empty ``gitBranch`` value seen
    ai_title: str = ""  # most recent non-blank ``ai-title`` metadata value
    raw_text: str = ""  # whole-transcript concatenation for regex sweeps
    mtime: float = 0.0  # file modification time; 0.0 if stat failed
def iter_events(path: str | Path):
    """Yield the decoded JSON value of each transcript line, in file order.

    Blank lines are ignored and lines that are not valid JSON are skipped
    silently: a crash can leave a partial final line, which is exactly the
    situation this module exists to recover from. Undecodable bytes are
    replaced instead of raising.
    """
    with Path(path).open("r", encoding="utf-8", errors="replace") as handle:
        for raw_line in handle:
            candidate = raw_line.strip()
            if not candidate:
                continue
            try:
                record = json.loads(candidate)
            except ValueError:
                continue
            yield record
def parse_transcript(path: str | Path) -> ParsedTranscript:
    """Parse a transcript into an ordered Event list plus metadata.

    Metadata (timestamps, ``cwd``, ``gitBranch``, ``ai-title``) is collected
    from every JSON-object line. Events come only from main-timeline lines
    (``isSidechain`` lines are skipped):

    - ``assistant`` lines yield ``assistant_text`` and ``tool_use`` events;
    - ``user`` lines with string content yield a ``human`` event;
    - ``user`` lines with list content yield ``tool_result`` events.

    Structurally unexpected values (a non-object ``message`` or tool
    ``input``, non-string text / timestamp / cwd) are skipped or treated as
    empty rather than raised on, so one odd line cannot abort the recovery of
    an otherwise readable transcript.

    Returns:
        A populated ``ParsedTranscript``. ``raw_text`` is the newline-joined
        text of all events (tool results capped at 1000 characters each).
    """
    p = Path(path)
    parsed = ParsedTranscript(path=p, uuid=p.stem)
    try:
        parsed.mtime = p.stat().st_mtime
    except OSError:
        parsed.mtime = 0.0

    raw_chunks: list[str] = []
    for obj in iter_events(p):
        if not isinstance(obj, dict):
            continue
        t = obj.get("type")

        raw_ts = obj.get("timestamp")
        ts = raw_ts if isinstance(raw_ts, str) else ""
        if ts:
            if not parsed.first_ts:
                parsed.first_ts = ts
            parsed.last_ts = ts

        cwd = obj.get("cwd")
        if isinstance(cwd, str) and cwd and not parsed.cwd:
            parsed.cwd = cwd
        branch = obj.get("gitBranch")
        if isinstance(branch, str) and branch and not parsed.git_branch:
            parsed.git_branch = branch

        # ai-title metadata -- usable title hint
        if t == "ai-title":
            title = obj.get("aiTitle")
            if isinstance(title, str) and title.strip():
                parsed.ai_title = title.strip()
            continue

        # Skip subagent / sidechain lines for the main timeline.
        if obj.get("isSidechain"):
            continue

        message = obj.get("message")
        content = message.get("content") if isinstance(message, dict) else None

        if t == "assistant":
            if not isinstance(content, list):
                continue
            for blk in content:
                if not isinstance(blk, dict):
                    continue
                btype = blk.get("type")
                if btype == "text":
                    raw_text = blk.get("text")
                    if not isinstance(raw_text, str):
                        continue
                    txt = _strip_noise(raw_text).strip()
                    if txt:
                        parsed.events.append(
                            Event(kind="assistant_text", text=txt, timestamp=ts)
                        )
                        raw_chunks.append(txt)
                elif btype == "tool_use":
                    name = blk.get("name", "")
                    inp = blk.get("input")
                    if not isinstance(inp, dict):
                        inp = {}
                    ev = Event(
                        kind="tool_use",
                        name=name,
                        args=_concise_args(name, inp),
                        timestamp=ts,
                    )
                    if name in _MUTATING_TOOLS:
                        target = inp.get("file_path") or inp.get("notebook_path")
                        ev.file_path = target if isinstance(target, str) else ""
                    if name == "Skill":
                        skill = inp.get("skill")
                        ev.skill = skill if isinstance(skill, str) else ""
                    if name in ("Bash", "PowerShell"):
                        command = inp.get("command")
                        ev.command = command if isinstance(command, str) else ""
                    parsed.events.append(ev)
                    raw_chunks.append(f"{name} {ev.args}")
        elif t == "user":
            if isinstance(content, str):
                # A real human-typed prompt.
                txt = _strip_noise(content).strip()
                if txt:
                    parsed.events.append(Event(kind="human", text=txt, timestamp=ts))
                    raw_chunks.append(txt)
            elif isinstance(content, list):
                # tool_result blocks (tool output -- NOT a human prompt).
                for blk in content:
                    if not isinstance(blk, dict):
                        continue
                    if blk.get("type") != "tool_result":
                        continue
                    body = _strip_noise(_flatten_content(blk.get("content")))
                    if body.strip():
                        parsed.events.append(
                            Event(
                                kind="tool_result",
                                text=_truncate(body),
                                timestamp=ts,
                            )
                        )
                        raw_chunks.append(body[:1000])
        # other metadata types (mode, permission-mode, system, attachment,
        # file-history-snapshot, queue-operation, last-prompt) -> skipped.

    parsed.raw_text = "\n".join(raw_chunks)
    return parsed
# --------------------------------------------------------------------------- #
# Classification
# --------------------------------------------------------------------------- #
def _is_mutating_command(cmd: str) -> bool:
    """True when a non-empty shell command matches any mutating pattern."""
    if not cmd:
        return False
    return _MUTATING_CMD_RE.search(cmd) is not None
# Project dirs that are NOT real session-log homes for client/project work:
# transcript/conversation archives, and generic catch-all buckets that should
# never become a session scope. A transcript that would otherwise classify to
# one of these must fall through to a real project (or to general).
_ARCHIVE_DIR_RE = re.compile(r"-conversation-logs$", re.IGNORECASE)
_GENERIC_PROJECT_NAMES = {"internal", "scripts", "toolkit", "utilities"}
def _is_valid_project_slug(name: str) -> bool:
"""A valid project scope is a real work dir that could sensibly own a
``session-logs/`` subdir -- not a transcript archive or a generic bucket.
"""
if _ARCHIVE_DIR_RE.search(name):
return False
if name.lower() in _GENERIC_PROJECT_NAMES:
return False
return True
def _known_slugs() -> tuple[set[str], set[str]]:
    """Discover ``(client_slugs, project_slugs)`` from the repo layout.

    Clients are the sub-directories of ``clients/`` whose name does not start
    with ``_``, plus the stems of ``wiki/clients/*.md``. Projects are the
    sub-directories of ``projects/`` and of ``projects/msp-tools/`` that pass
    ``_is_valid_project_slug`` (no ``*-conversation-logs`` archives, none of
    the generic names ``internal``, ``scripts``, ``toolkit``, ``utilities``).
    """
    root = repo_root()

    clients: set[str] = set()
    clients_dir = root / "clients"
    if clients_dir.is_dir():
        clients.update(
            entry.name
            for entry in clients_dir.iterdir()
            if entry.is_dir() and not entry.name.startswith("_")
        )
    wiki_clients = root / "wiki" / "clients"
    if wiki_clients.is_dir():
        clients.update(page.stem for page in wiki_clients.glob("*.md"))

    projects: set[str] = set()
    proj_dir = root / "projects"
    for container in (proj_dir, proj_dir / "msp-tools"):
        if container.is_dir():
            projects.update(
                entry.name
                for entry in container.iterdir()
                if entry.is_dir() and _is_valid_project_slug(entry.name)
            )
    return clients, projects
def _slug_to_words(slug: str) -> list[str]:
"""Split a slug into matchable word tokens, dropping trivial ones."""
words = [w for w in re.split(r"[-_]+", slug.lower()) if len(w) >= 4]
return words
def classify(parsed: ParsedTranscript) -> dict:
    """Produce the recovery verdict for a parsed transcript.

    Returns a dict with:
        substantive (bool): at least one mutating tool / command / skill ran.
        saved (bool): a save-type skill ran, or a file tool wrote into a
            ``session-logs`` directory.
        scope (dict): result of ``_classify_scope``.
        title (str): result of ``_derive_title``.
        human_prompt_count (int): number of human-typed prompts.
        mutating_actions (list[str]): one label per mutating action, in order.
    """
    saved = False
    substantive = False
    actions: list[str] = []
    prompts = 0

    for ev in parsed.events:
        if ev.kind == "human":
            prompts += 1
            continue
        if ev.kind != "tool_use":
            continue

        tool = ev.name
        writes_file = tool in _MUTATING_TOOLS

        # Already saved? -- save/scc/checkpoint skill, or a write into session-logs/
        if tool == "Skill" and ev.skill in _SAVE_SKILLS:
            saved = True
        if writes_file and ev.file_path:
            normalized = ev.file_path.replace("\\", "/")
            if any(
                marker.replace("\\", "/") in normalized
                for marker in _SESSION_LOG_MARKERS
            ):
                saved = True

        # Substantive? -- work out the action label, if this event is one.
        action = None
        if writes_file:
            action = f"{tool} {ev.file_path}".strip()
        elif tool in ("Bash", "PowerShell"):
            if _is_mutating_command(ev.command):
                one_line = ev.command.replace(chr(10), " ")
                action = f"{tool}: {_truncate(one_line, 120)}"
        elif tool == "Skill" and ev.skill in _MUTATING_SKILLS:
            action = f"Skill: {ev.skill}"

        if action is not None:
            substantive = True
            actions.append(action)

    scope = _classify_scope(parsed)
    title = _derive_title(parsed)
    return {
        "substantive": substantive,
        "saved": saved,
        "scope": scope,
        "title": title,
        "human_prompt_count": prompts,
        "mutating_actions": actions,
    }
def _classify_scope(parsed: ParsedTranscript) -> dict:
    """Decide client / project / general scope. Conservative: ambiguous -> general.

    Two signals are used, in priority order:

    1. The session ``cwd``: a path inside ``clients/<slug>`` or
       ``projects/[msp-tools/]<slug>`` for a known slug wins outright, unless
       both a client and a project are hinted (then it is ambiguous). The
       comparison is case-insensitive, and the slug is returned with the
       directory's real casing.
    2. Mention frequency in the transcript text, cwd and git branch. The best
       client and the best project each need a minimum score, and when both
       qualify one must out-score the other by the dominance factor.

    Returns:
        ``{"type": "client", "slug": ...}``, ``{"type": "project", "slug":
        ...}`` or ``{"type": "general"}``.
    """
    clients, projects = _known_slugs()
    haystack = "\n".join(
        [parsed.raw_text or "", parsed.cwd or "", parsed.git_branch or ""]
    ).lower()

    def score(slug: str) -> int:
        needle = slug.lower()
        words = _slug_to_words(slug)
        if not words:
            # very short slug -- only count whole-slug hits to avoid noise
            return haystack.count(needle)
        # Whole-slug hits are weighted x3; every occurrence of each word token
        # adds 1 on top (no requirement that all tokens are present).
        return haystack.count(needle) * 3 + sum(haystack.count(w) for w in words)

    # Iterate slugs in sorted order: set order depends on per-process string
    # hashing, so without this a tie could pick a different winner on each run.
    client_scores = {s: score(s) for s in sorted(clients)}
    project_scores = {s: score(s) for s in sorted(projects)}
    best_client = max(client_scores.items(), key=lambda kv: kv[1], default=(None, 0))
    best_project = max(project_scores.items(), key=lambda kv: kv[1], default=(None, 0))

    # cwd-based hint (strong signal): cwd inside projects/<x>/... or clients/<x>/...
    # The cwd is lower-cased for matching, so look slugs up by lower-cased name
    # and map back to the directory's actual spelling.
    cwd_norm = (parsed.cwd or "").replace("\\", "/").lower()
    projects_by_lower = {s.lower(): s for s in sorted(projects)}
    clients_by_lower = {s.lower(): s for s in sorted(clients)}

    m = re.search(r"/projects/(?:msp-tools/)?([a-z0-9._-]+)", cwd_norm)
    cwd_project = projects_by_lower.get(m.group(1)) if m else None
    m = re.search(r"/clients/([a-z0-9._-]+)", cwd_norm)
    cwd_client = clients_by_lower.get(m.group(1)) if m else None

    # Minimum confidence thresholds -- be conservative.
    CLIENT_MIN = 4
    PROJECT_MIN = 4
    DOMINANCE = 2  # when both categories qualify, winner needs this multiple of the other

    # cwd hints win if present and unambiguous.
    if cwd_client and not cwd_project:
        return {"type": "client", "slug": cwd_client}
    if cwd_project and not cwd_client:
        return {"type": "project", "slug": cwd_project}

    c_slug, c_score = best_client
    p_slug, p_score = best_project
    client_ok = c_slug and c_score >= CLIENT_MIN
    project_ok = p_slug and p_score >= PROJECT_MIN
    if client_ok and (not project_ok or c_score >= p_score * DOMINANCE):
        return {"type": "client", "slug": c_slug}
    if project_ok and (not client_ok or p_score >= c_score * DOMINANCE):
        return {"type": "project", "slug": p_slug}
    return {"type": "general"}
def _derive_title(parsed: ParsedTranscript) -> str:
if parsed.ai_title:
return parsed.ai_title
# first human prompt -> first sentence / first 70 chars
for ev in parsed.events:
if ev.kind == "human" and ev.text.strip():
line = ev.text.strip().splitlines()[0]
line = re.sub(r"\s+", " ", line).strip()
return _truncate(line, 70)
return "recovered session"
def _topic_slug(title: str) -> str:
slug = re.sub(r"[^a-z0-9]+", "-", (title or "").lower()).strip("-")
slug = re.sub(r"-{2,}", "-", slug)
return (slug or "session")[:48].strip("-") or "session"
# --------------------------------------------------------------------------- #
# Evidence extraction (verbatim -- Python only)
# --------------------------------------------------------------------------- #
_RE_IP = re.compile(r"\b(?:\d{1,3}\.){3}\d{1,3}\b")
_RE_URL = re.compile(r"https?://[^\s\"'<>)\]]+")
# A dotted run with 5+ numeric components is a version string, never an IP
# (e.g. "1.2.3.4.5"). Used to reject dotted-quad matches that are a sub-span of
# a longer version.
_RE_DOTTED_VERSION = re.compile(r"\d+(?:\.\d+){4,}")
# Version context that immediately precedes a dotted-quad marks it as a version,
# not an IP -- e.g. "version 1.9.158.0", "build 6.5.60.172", "v1.2.3.4". The
# trailing optional separators ("v"/space/colon/equals/parens) sit between the
# keyword and the number. A trailing bare "v"/"V" alone also counts.
_RE_VERSION_CONTEXT = re.compile(
r"(?:\b(?:version|ver|build|rev|revision|release|agent|firmware|fw|"
r"v)\b\s*[:=]?\s*v?|[vV])$",
re.IGNORECASE,
)
def _iter_real_ips(text: str):
"""Yield dotted-quads from ``text`` that are plausibly real IPv4 addresses.
Rejects version-like strings via a deliberately small rule set:
- any octet outside 0-255 (e.g. "1.9.158.300" is not an IP),
- a match that is a sub-span of a longer dotted version with 5+ components
(e.g. the "1.2.3.4" inside "1.2.3.4.5"),
- a match preceded by a version marker -- a bare ``v``/``V`` (e.g.
"v1.2.3.4") or a version keyword like "version"/"build" immediately
before it (e.g. "version 1.9.158.0", "build 6.5.60.172").
Order-preserving; de-duplication is the caller's job.
"""
if not text:
return
# Spans covered by a 5+-component dotted version -> not IPs.
version_spans = [m.span() for m in _RE_DOTTED_VERSION.finditer(text)]
for m in _RE_IP.finditer(text):
octets = m.group(0).split(".")
if any(not (0 <= int(o) <= 255) for o in octets):
continue
start = m.start()
# Reject if this match sits inside a longer dotted version.
if any(vs <= start and m.end() <= ve for vs, ve in version_spans):
continue
# Reject if immediately preceded by version context.
if _RE_VERSION_CONTEXT.search(text[:start]):
continue
yield m.group(0)
# Ticket reference: "#" followed by four or more digits.
_RE_TICKET = re.compile(r"#\d{4,}")
# Commit SHA: 7-40 hex characters within 12 non-hex characters after the word
# "commit". Only the hash itself is captured.
_RE_COMMIT = re.compile(r"(?:\bcommit\b[^0-9a-f]{0,12})([0-9a-f]{7,40})\b", re.IGNORECASE)
# Canonical 8-4-4-4-12 hexadecimal UUID.
_RE_UUID = re.compile(
    r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b", re.IGNORECASE
)
# Dotted hostname: one or more labels followed by an alphabetic suffix of at
# least two letters.
_RE_HOST = re.compile(r"\b(?:[a-z0-9](?:[a-z0-9-]{0,61}[a-z0-9])?\.)+[a-z]{2,}\b", re.IGNORECASE)
def _dedup(seq):
seen = set()
out = []
for x in seq:
if x not in seen:
seen.add(x)
out.append(x)
return out
def extract_config_changes(parsed: ParsedTranscript) -> list[tuple[str, str]]:
    """List the files touched by file-writing tools.

    Returns ``[(path, verb), ...]`` in first-seen order, one entry per path.
    The verb is ``created`` for ``Write`` and ``modified`` for the other
    file-writing tools; when a path is touched more than once the verb of its
    first occurrence is kept.
    """
    verbs: dict[str, str] = {}
    for ev in parsed.events:
        if ev.kind != "tool_use" or ev.name not in _MUTATING_TOOLS or not ev.file_path:
            continue
        verbs.setdefault(ev.file_path, "created" if ev.name == "Write" else "modified")
    return list(verbs.items())
def extract_commands(parsed: ParsedTranscript) -> list[tuple[str, str]]:
    """Pair each mutating shell command with its (truncated) output.

    Returns ``[(command, result), ...]`` in timeline order. The result is the
    text of the first ``tool_result`` among the three events that follow the
    command, or ``""`` if none is found there (best-effort association).
    """
    events = parsed.events
    pairs: list[tuple[str, str]] = []
    for idx, ev in enumerate(events):
        is_shell_call = ev.kind == "tool_use" and ev.name in ("Bash", "PowerShell")
        if not (is_shell_call and _is_mutating_command(ev.command)):
            continue
        following = events[idx + 1 : idx + 4]
        output = next(
            (nxt.text for nxt in following if nxt.kind == "tool_result"), ""
        )
        pairs.append((ev.command.strip(), output))
    return pairs
def extract_reference(parsed: ParsedTranscript) -> dict:
    """Sweep the transcript text for verbatim reference identifiers.

    Returns a dict of de-duplicated, first-seen-ordered lists under the keys
    ``ips``, ``urls``, ``tickets``, ``commits``, ``uuids`` and
    ``coord_message_ids`` (UUIDs that appear within 24 non-hex characters
    after the word "message").
    """
    text = parsed.raw_text or ""
    message_uuid_re = re.compile(
        r"message[^0-9a-f]{0,24}([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})",
        re.IGNORECASE,
    )
    return {
        "ips": _dedup(_iter_real_ips(text)),
        "urls": _dedup(_RE_URL.findall(text)),
        "tickets": _dedup(_RE_TICKET.findall(text)),
        "commits": _dedup(_RE_COMMIT.findall(text)),
        "uuids": _dedup(_RE_UUID.findall(text)),
        "coord_message_ids": _dedup(
            hit.group(1) for hit in message_uuid_re.finditer(text)
        ),
    }
def extract_infra(parsed: ParsedTranscript, ref: dict) -> dict:
    """Collect infrastructure identifiers: IPs and dotted hostnames.

    ``ips`` is taken from ``ref`` (the ``extract_reference`` result).
    ``hosts`` holds every dotted name found in the transcript text that is not
    a pure IP, lower-cased and de-duplicated in first-seen order.
    """
    candidates = (
        name
        for name in _RE_HOST.findall(parsed.raw_text or "")
        if not _RE_IP.fullmatch(name)
    )
    return {
        "ips": ref.get("ips", []),
        "hosts": _dedup(name.lower() for name in candidates),
    }
# --------------------------------------------------------------------------- #
# Ollama prose
# --------------------------------------------------------------------------- #
def _ollama_config() -> tuple[str, str]:
    """Return ``(endpoint, model)`` for the prose-drafting Ollama call.

    Values come from the ``ollama`` object of identity.json: the endpoint is
    ``endpoint``, else ``fallback``, else ``http://localhost:11434``; the
    model is ``prose_model``, else ``qwen3:14b``. A missing or non-object
    ``ollama`` entry yields the defaults.
    """
    section = _identity().get("ollama")
    if not isinstance(section, dict):
        section = {}
    endpoint = (
        section.get("endpoint") or section.get("fallback") or "http://localhost:11434"
    )
    model = section.get("prose_model") or "qwen3:14b"
    return endpoint, model
# Matches a complete ``<think>...</think>`` span (any case, across lines);
# used to drop such spans from the model's reply before it is parsed.
_THINK_RE = re.compile(r"<think>.*?</think>", re.IGNORECASE | re.DOTALL)
def _build_digest(parsed: ParsedTranscript) -> str:
    """Render a bounded narrative digest of the session for Ollama.

    Human prompts and assistant text are included in full and each tool call
    becomes a one-line marker; tool_result bodies are left out on purpose.
    A digest longer than ``_DIGEST_CAP`` keeps its first two thirds and last
    third of the budget, joined by an elision marker.
    """
    rendered: list[str] = []
    for ev in parsed.events:
        kind = ev.kind
        if kind == "tool_use":
            rendered.append(f"[tool: {ev.name} {ev.args}]")
        elif kind == "human":
            rendered.append(f"USER: {ev.text}")
        elif kind == "assistant_text":
            rendered.append(f"ASSISTANT: {ev.text}")

    digest = "\n".join(rendered)
    if len(digest) <= _DIGEST_CAP:
        return digest

    # Keep the head and tail -- the opening framing and the final state
    # matter most.
    head_len = _DIGEST_CAP * 2 // 3
    tail_len = _DIGEST_CAP // 3
    return digest[:head_len] + "\n...[middle elided for length]...\n" + digest[-tail_len:]
# Instruction preamble sent to Ollama; the transcript digest is appended
# directly after the final "TRANSCRIPT DIGEST:" line. The four section headers
# requested here are the ones the reply parser looks for.
_PROSE_PROMPT = """You are writing the prose sections of an engineering session log, reconstructed from a work-session transcript. Write in plain past tense, technical, concise, NO emojis, NO filler.
Output EXACTLY these four markdown sections, with these exact headers, and nothing else (no preamble, no closing remarks):
## Session Summary
(2-4 paragraphs: what was accomplished, in what order, and why.)
## Key Decisions
(bullet list of non-obvious decisions and their rationale; "- none" if none.)
## Problems Encountered
(bullet list of problems hit and how each was resolved; "- none" if none.)
## Pending / Incomplete Tasks
(bullet list of what is left, blockers, next steps; "- none" if none.)
CRITICAL: Do NOT invent or restate specific commands, IP addresses, credentials, file paths, commit hashes, or ticket numbers -- those are recorded separately and verbatim. Describe the work at a conceptual level only.
TRANSCRIPT DIGEST:
"""
def ollama_prose(parsed: ParsedTranscript, timeout: int = 120) -> dict | None:
    """Ask Ollama to draft the four prose sections.

    Args:
        parsed: The parsed transcript to summarize.
        timeout: Seconds to wait for the HTTP request.

    Returns:
        A dict of header -> body (possibly missing sections the model did not
        emit), or ``None`` whenever a usable reply could not be obtained:
        Ollama unreachable, a malformed endpoint, a transport or protocol
        error mid-response, a non-JSON reply, or a reply with no text content.
        This function never raises for those cases, so log recovery always
        proceeds with the verbatim evidence.
    """
    # Local import: only this function needs the HTTP protocol exceptions.
    import http.client

    endpoint, model = _ollama_config()
    prompt = _PROSE_PROMPT + _build_digest(parsed)
    body = json.dumps(
        {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "stream": False,
            "think": False,
        }
    ).encode("utf-8")

    try:
        # Request() itself raises ValueError for an endpoint without a valid
        # URL scheme, so it must sit inside the guarded region too.
        req = urllib.request.Request(
            str(endpoint).rstrip("/") + "/api/chat",
            data=body,
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
    except (
        urllib.error.URLError,
        http.client.HTTPException,  # e.g. IncompleteRead; not an OSError
        OSError,  # includes timeouts and connection resets
        ValueError,  # bad URL, undecodable bytes, invalid JSON
    ):
        return None

    try:
        content = payload["message"]["content"]
    except (KeyError, TypeError, IndexError):
        return None
    if not isinstance(content, str):
        return None

    content = _THINK_RE.sub("", content).strip()
    if not content:
        return None
    return _split_prose_sections(content)
_PROSE_HEADERS = [
"Session Summary",
"Key Decisions",
"Problems Encountered",
"Pending / Incomplete Tasks",
]
def _split_prose_sections(text: str) -> dict:
"""Parse the four ## sections out of Ollama's output; tolerate missing ones."""
out: dict[str, str] = {}
# Build an alternation matching any of our known headers (allow minor
# whitespace variance).
header_alt = "|".join(re.escape(h) for h in _PROSE_HEADERS)
pattern = re.compile(rf"^##\s*({header_alt})\s*$", re.IGNORECASE | re.MULTILINE)
matches = list(pattern.finditer(text))
for idx, m in enumerate(matches):
header = m.group(1)
# canonicalize header capitalization to our known form
canon = next((h for h in _PROSE_HEADERS if h.lower() == header.lower()), header)
start = m.end()
end = matches[idx + 1].start() if idx + 1 < len(matches) else len(text)
out[canon] = text[start:end].strip()
return out
# --------------------------------------------------------------------------- #
# whoami block
# --------------------------------------------------------------------------- #
def whoami_block() -> str:
    """Return the ``## User`` block for the log.

    Runs ``.claude/scripts/whoami-block.sh`` through ``bash`` (30 s timeout,
    cwd = repo root) and returns its stdout when it exits 0 with output.
    Otherwise -- script missing, ``bash`` not on PATH (e.g. a restricted
    scheduler PATH), non-zero exit, empty output, or a launch error -- a
    minimal block is rendered from identity.json with a warning line.
    """
    root = repo_root()
    script = root / ".claude" / "scripts" / "whoami-block.sh"
    bash = shutil.which("bash")

    if script.exists() and bash:
        try:
            completed = subprocess.run(
                [bash, str(script)],
                capture_output=True,
                text=True,
                timeout=30,
                cwd=str(root),
            )
        except (OSError, subprocess.SubprocessError):
            completed = None
        if (
            completed is not None
            and completed.returncode == 0
            and completed.stdout.strip()
        ):
            return completed.stdout.rstrip("\n")

    # Fallback: describe the user straight from identity.json.
    identity = _identity()
    user = identity.get("user", "unknown")
    full_name = identity.get("full_name") or identity.get("user", "unknown")
    machine = identity.get("machine", "unknown")
    role = identity.get("role", "")

    block = [
        "## User",
        f"- **User:** {full_name} ({user})",
        f"- **Machine:** {machine}",
    ]
    if role:
        block.append(f"- **Role:** {role}")
    block.append(
        "- **[WARNING]** whoami-block.sh unavailable; rendered from identity.json directly."
    )
    return "\n".join(block)
# --------------------------------------------------------------------------- #
# Path computation
# --------------------------------------------------------------------------- #
def _first_ts_date(parsed: ParsedTranscript) -> str:
ts = parsed.first_ts
if ts:
try:
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
return dt.date().isoformat()
except ValueError:
pass
# fall back to mtime
if parsed.mtime:
return datetime.fromtimestamp(parsed.mtime, tz=timezone.utc).date().isoformat()
return datetime.now().date().isoformat()
def compute_output_path(parsed: ParsedTranscript, scope: dict, title: str) -> Path:
    """Choose where the recovered log should be written.

    Directory by scope:
        client  -> ``clients/<slug>/session-logs``
        project -> ``<project dir>/session-logs``, where the project dir is
                   ``projects/<slug>`` or, if that does not exist but it does,
                   ``projects/msp-tools/<slug>``
        other   -> ``session-logs`` at the repo root

    A project dir that is (or is inside) a git submodule is never written to:
    its working tree must stay clean, and repo convention keeps those logs in
    the main repo's root ``session-logs`` instead.

    The file is ``<date>-recovered-<topic>.md``; if that already exists the
    first eight characters of the transcript UUID are appended.
    """
    root = repo_root()
    date = _first_ts_date(parsed)
    topic = _topic_slug(title)

    scope_type = scope.get("type")
    logs_dir = root / "session-logs"
    if scope_type == "client":
        logs_dir = root / "clients" / scope["slug"] / "session-logs"
    elif scope_type == "project":
        slug = scope["slug"]
        project_dir = root / "projects" / slug
        if not project_dir.exists():
            nested = root / "projects" / "msp-tools" / slug
            if nested.exists():
                project_dir = nested
        if not _is_inside_submodule(project_dir):
            logs_dir = project_dir / "session-logs"

    candidate = logs_dir / f"{date}-recovered-{topic}.md"
    if not candidate.exists():
        return candidate
    return logs_dir / f"{date}-recovered-{topic}-{parsed.uuid[:8]}.md"
# --------------------------------------------------------------------------- #
# Markdown assembly
# --------------------------------------------------------------------------- #
def _fmt_prose_section(header: str, prose: dict | None, fallback: str) -> str:
    """Render one ``## <header>`` prose section of the recovered log.

    Args:
        header: Section title; also the key looked up in *prose*.
        prose: Mapping of section title to drafted text, or ``None``/empty
            when no prose is available.
        fallback: Text used when *prose* has no usable entry for *header*.

    Returns:
        ``"## <header>\\n\\n<body>\\n"`` where the body is the stripped prose
        entry, or *fallback* when the entry is missing, ``None`` or blank.
    """
    value = prose.get(header) if prose else None
    # A key that is present but None (or holds a non-string) must not crash
    # the whole recovery: None counts as missing, anything else is stringified.
    body = "" if value is None else str(value).strip()
    return f"## {header}\n\n{body or fallback}\n"
def _backtick_fence(text: str) -> str:
    """Return a backtick code fence that *text* cannot terminate early.

    A fenced block is closed by a run of backticks at least as long as the
    opening fence, so the fence is made one backtick longer than the longest
    run found in *text* (and never shorter than the usual three).
    """
    longest = run = 0
    for ch in text:
        run = run + 1 if ch == "`" else 0
        longest = max(longest, run)
    return "`" * max(3, longest + 1)


def build_log(parsed: ParsedTranscript, today: str | None = None) -> tuple[str, dict]:
    """Assemble the full reconstructed markdown log. Returns (markdown, meta).

    Args:
        parsed: The parsed transcript to reconstruct a log from.
        today: ISO date shown in the recovery banner; defaults to the current
            local date.

    Returns:
        A ``(markdown, meta)`` tuple. ``markdown`` is the complete log text
        ending in a single newline. ``meta`` carries the classification
        verdict fields, the transcript timestamps, the path the log would be
        written to (``path_would_be``), the log ``date`` and ``ollama_ok``.

    The prose sections come from ``ollama_prose``; when it returns ``None``
    they carry a placeholder note instead. All other sections are filled from
    the ``extract_*`` helpers.
    """
    today = today or datetime.now().date().isoformat()
    verdict = classify(parsed)
    scope = verdict["scope"]
    title = verdict["title"]

    prose = ollama_prose(parsed)
    # None is treated as "no prose could be drafted" (see placeholder text).
    ollama_ok = prose is not None
    placeholder = (
        "_[INFO] Ollama was unreachable during recovery; this prose section was "
        "not drafted. Reconstruct it from the verbatim evidence below, or re-run "
        "`/recover` once Ollama is available._"
    )
    # Optional sections say "none recorded" when prose exists but is silent
    # on them, and show the placeholder only when there is no prose at all.
    optional_fallback = "- none recorded" if ollama_ok else placeholder

    config_changes = extract_config_changes(parsed)
    commands = extract_commands(parsed)
    ref = extract_reference(parsed)
    infra = extract_infra(parsed, ref)

    out_path = compute_output_path(parsed, scope, title)
    date = _first_ts_date(parsed)

    lines: list[str] = []

    # Title
    lines.append(f"# [RECOVERED] {title}")
    lines.append("")

    # Banner
    banner = (
        f"> **[RECOVERED -- UNVERIFIED]** Auto-reconstructed from transcript "
        f"{parsed.uuid} ({parsed.first_ts or '?'} .. {parsed.last_ts or '?'}) on "
        f"{today}. Prose sections are Ollama-drafted from the transcript and may "
        f"be imprecise; the Commands/Config/Reference sections are extracted "
        f"verbatim. Review and correct, then remove this banner."
    )
    lines.append(banner)
    lines.append("")

    # User block
    lines.append(whoami_block())
    lines.append("")

    # Prose sections (Ollama) -- in save.md order
    lines.append(_fmt_prose_section("Session Summary", prose, placeholder))
    lines.append(_fmt_prose_section("Key Decisions", prose, optional_fallback))
    lines.append(_fmt_prose_section("Problems Encountered", prose, optional_fallback))

    # Configuration Changes (verbatim)
    lines.append("## Configuration Changes")
    lines.append("")
    lines.append("_Machine-extracted verbatim from the transcript (file targets of Write/Edit/NotebookEdit)._")
    lines.append("")
    if config_changes:
        for fp, verb in config_changes:
            lines.append(f"- [{verb}] `{fp}`")
    else:
        lines.append("- none detected")
    lines.append("")

    # Credentials & Secrets
    lines.append("## Credentials & Secrets")
    lines.append("")
    lines.append("_Machine-extracted; review carefully -- secrets are not auto-harvested from transcripts._")
    lines.append("")
    lines.append("- none detected (verify against the Commands & Outputs section)")
    lines.append("")

    # Infrastructure & Servers (verbatim regex)
    lines.append("## Infrastructure & Servers")
    lines.append("")
    lines.append("_Machine-extracted verbatim (IP / hostname regex hits across the whole transcript)._")
    lines.append("")
    if infra["ips"] or infra["hosts"]:
        if infra["ips"]:
            lines.append("- **IPs:** " + ", ".join(f"`{x}`" for x in infra["ips"][:40]))
        if infra["hosts"]:
            lines.append("- **Hosts:** " + ", ".join(f"`{x}`" for x in infra["hosts"][:40]))
    else:
        lines.append("- none detected (verify)")
    lines.append("")

    # Commands & Outputs (verbatim)
    lines.append("## Commands & Outputs")
    lines.append("")
    lines.append("_Machine-extracted verbatim: mutating Bash/PowerShell commands with truncated output._")
    lines.append("")
    if commands:
        for cmd, result in commands:
            # A command may itself contain ``` (e.g. a heredoc that writes
            # Markdown); size the fence so it cannot be closed prematurely.
            fence = _backtick_fence(cmd)
            lines.append(fence)
            lines.append(cmd)
            lines.append(fence)
            if result:
                lines.append(f"Output: {result}")
            lines.append("")
    else:
        lines.append("- none detected")
        lines.append("")

    # Pending / Incomplete Tasks (Ollama)
    lines.append(_fmt_prose_section("Pending / Incomplete Tasks", prose, optional_fallback))

    # Reference Information (verbatim)
    lines.append("## Reference Information")
    lines.append("")
    lines.append("_Machine-extracted verbatim from the whole transcript via regex. Treat as leads, not gospel; deduped._")
    lines.append("")
    any_ref = False
    if ref["commits"]:
        any_ref = True
        lines.append("- **Commit SHAs:** " + ", ".join(f"`{x}`" for x in ref["commits"][:40]))
    if ref["urls"]:
        any_ref = True
        lines.append("- **URLs:** " + ", ".join(ref["urls"][:40]))
    if ref["ips"]:
        any_ref = True
        lines.append("- **IPs:** " + ", ".join(f"`{x}`" for x in ref["ips"][:40]))
    if ref["tickets"]:
        any_ref = True
        lines.append("- **Ticket numbers:** " + ", ".join(ref["tickets"][:40]))
    if ref["coord_message_ids"]:
        any_ref = True
        lines.append("- **Coord message ids:** " + ", ".join(f"`{x}`" for x in ref["coord_message_ids"][:40]))
    if not any_ref:
        lines.append("- none detected")
    lines.append("")

    # Collapse trailing blank lines to exactly one terminating newline.
    markdown = "\n".join(lines).rstrip() + "\n"

    meta = {
        "uuid": parsed.uuid,
        "path_would_be": str(out_path),
        "substantive": verdict["substantive"],
        "saved": verdict["saved"],
        "scope": scope,
        "title": title,
        "first_ts": parsed.first_ts,
        "last_ts": parsed.last_ts,
        "mtime": parsed.mtime,
        "human_prompt_count": verdict["human_prompt_count"],
        "mutating_actions": verdict["mutating_actions"],
        "date": date,
        "ollama_ok": ollama_ok,
    }
    return markdown, meta
# --------------------------------------------------------------------------- #
# CLI
# --------------------------------------------------------------------------- #
def _metadata_only(parsed: ParsedTranscript) -> dict:
    """Build the metadata dict for *parsed* without drafting prose or markdown.

    Only ``classify`` and ``compute_output_path`` are consulted, so this is
    the cheap path behind ``--json``. The keys match the ``meta`` returned by
    ``build_log`` except that ``ollama_ok`` is absent.
    """
    verdict = classify(parsed)
    scope, title = verdict["scope"], verdict["title"]
    destination = compute_output_path(parsed, scope, title)

    # Insertion order is the order the keys appear in the emitted JSON.
    meta: dict = {"uuid": parsed.uuid, "path_would_be": str(destination)}
    for key in ("substantive", "saved"):
        meta[key] = verdict[key]
    meta["scope"] = scope
    meta["title"] = title
    meta["first_ts"] = parsed.first_ts
    meta["last_ts"] = parsed.last_ts
    meta["mtime"] = parsed.mtime
    for key in ("human_prompt_count", "mutating_actions"):
        meta[key] = verdict[key]
    meta["date"] = _first_ts_date(parsed)
    return meta
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point; returns the process exit status.

    Exactly one transcript selector (``--uuid``, ``--latest`` or ``--path``)
    is required. At most one output mode may be given:

    - ``--print`` (default): write the reconstructed markdown to stdout.
    - ``--auto``: write the log to its computed path and print a one-line
      JSON receipt.
    - ``--json``: print metadata JSON only; nothing is written.

    Returns 0 on success and 2 when the transcript cannot be resolved.
    Invalid arguments exit through argparse.
    """
    # The Windows console defaults to cp1252, and transcripts (and Ollama
    # prose) routinely contain characters outside that codepage. Switching
    # stdout to UTF-8 keeps --print / --json from crashing on such a glyph.
    try:
        sys.stdout.reconfigure(encoding="utf-8", errors="replace")
    except (AttributeError, ValueError):
        pass

    cli = argparse.ArgumentParser(
        description="Reconstruct a ClaudeTools session log from a Claude Code transcript."
    )

    source = cli.add_mutually_exclusive_group(required=True)
    source.add_argument("--uuid", help="transcript uuid (filename without .jsonl)")
    source.add_argument("--latest", action="store_true", help="newest transcript by mtime")
    source.add_argument("--path", help="explicit path to a transcript .jsonl")

    mode = cli.add_mutually_exclusive_group()
    mode.add_argument("--print", dest="do_print", action="store_true", help="write markdown to stdout (default)")
    mode.add_argument("--auto", action="store_true", help="write the log to the computed path; print one-line JSON")
    mode.add_argument("--json", dest="do_json", action="store_true", help="print metadata JSON only; write nothing")

    opts = cli.parse_args(argv)

    try:
        transcript = resolve(uuid=opts.uuid, latest=opts.latest, path=opts.path)
    except (FileNotFoundError, ValueError) as err:
        print(f"[ERROR] {err}", file=sys.stderr)
        return 2

    parsed = parse_transcript(transcript)

    if opts.do_json:
        print(json.dumps(_metadata_only(parsed), ensure_ascii=False))
        return 0

    markdown, meta = build_log(parsed)

    if not opts.auto:
        # default / --print
        sys.stdout.write(markdown)
        return 0

    destination = Path(meta["path_would_be"])
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(markdown, encoding="utf-8")
    receipt = {
        "written": str(destination),
        "scope": meta["scope"],
        "uuid": meta["uuid"],
        "date": meta["date"],
    }
    print(json.dumps(receipt, ensure_ascii=False))
    return 0
# Script entry point: hand main()'s return value to the interpreter as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())