#!/usr/bin/env python3
"""detect_orphaned_sessions.py -- find and auto-recover unsaved Claude Code sessions.

A session is "orphaned" when its transcript records substantive (mutating)
work but the session was never saved (no /save, /scc, or /checkpoint, and no
write into a session-logs/ path). This script scans the per-machine transcript
directory, classifies each idle transcript via the recover_session engine,
auto-builds a banner-marked recovery log for each orphan, records each fully
processed uuid in a machine-local ledger so it is never re-scanned (recovered
orphans are ledgered only after a successful commit + push), commits + pushes
the recovered logs, and posts an FYI to #bot-alerts.

Modes:
    (default)       full run: build logs, update ledger, commit, push, alert
    --dry-run       scan + print a report table; write/commit/alert nothing
    --idle-min N    minutes of mtime-idle before a transcript is eligible
                    (default 90)
    --max N         max orphan logs to build per run, oldest-first (default 25);
                    the remaining orphans are deferred to the next run
    --no-commit     build the logs, but skip git commit/push (skipped/error
                    verdicts are still ledgered; recovered uuids stay
                    unledgered so the next run re-attempts them)
    --no-alert      build + ledger + commit, but skip the Discord alert

The detector NEVER touches sync.sh; it does its own git add/commit/push so it
has no surprising side effects. Soft-fails on git/alert errors (work is already
saved to disk -- those are best-effort).

stdlib only; targets Python 3.11+.
"""
from __future__ import annotations

import argparse
import json
import shutil
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

# Import the shared engine (same directory).
sys.path.insert(0, str(Path(__file__).resolve().parent))
import recover_session as engine  # noqa: E402

# Ledger location, relative to the repo root reported by the engine.
LEDGER_REL = Path(".claude") / "state" / "recovered-sessions.json"


def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def ledger_path() -> Path:
    """Return the ledger path: ``engine.repo_root()`` joined with ``LEDGER_REL``."""
    return engine.repo_root() / LEDGER_REL


def load_ledger() -> dict:
    """Load the processed-uuid ledger from disk.

    Returns:
        The parsed ledger mapping, or an empty dict when the file is missing,
        unreadable, not valid JSON, or valid JSON that is not an object.
        Callers index the result by uuid and assign into it, so anything other
        than a dict would crash them later; it is rejected here instead.
    """
    p = ledger_path()
    try:
        data = json.loads(p.read_text(encoding="utf-8"))
    except FileNotFoundError:
        # First run on this machine: nothing processed yet.
        return {}
    except (OSError, ValueError) as e:
        # Best-effort: an unreadable/corrupt ledger must not block recovery,
        # but say so -- starting empty means every transcript is re-scanned.
        print(f"[WARNING] ledger {p} unreadable ({e}); starting with an empty ledger.",
              file=sys.stderr)
        return {}
    if not isinstance(data, dict):
        print(f"[WARNING] ledger {p} is not a JSON object; starting with an empty ledger.",
              file=sys.stderr)
        return {}
    return data


def save_ledger(ledger: dict) -> None:
    """Write ``ledger`` to disk as indented UTF-8 JSON, creating parent dirs.

    The payload is written to a sibling ``.tmp`` file and then renamed over
    the real ledger, so an interruption mid-write cannot leave a truncated
    ledger behind (which ``load_ledger`` would treat as empty).
    """
    p = ledger_path()
    p.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(ledger, indent=2, ensure_ascii=False) + "\n"
    tmp = p.with_name(p.name + ".tmp")
    tmp.write_text(payload, encoding="utf-8")
    tmp.replace(p)


def _scope_str(scope: dict) -> str:
    """Render a scope dict as ``general`` or ``<type>:<slug>`` for display."""
    t = scope.get("type", "general")
    if t == "general":
        return "general"
    return f"{t}:{scope.get('slug', '?')}"


def scan(idle_min: int, ledger: dict) -> tuple[list[dict], list[dict]]:
    """Scan transcripts. Returns (eligible, recoverable):

    eligible    -- every transcript that is past idle and not already in ledger
                   (each a dict with parsed metadata + verdict fields)
    recoverable -- the subset that are orphans (substantive and not saved)

    Both lists are sorted oldest-first. A transcript the engine fails to
    parse/classify is reported on stderr and skipped (it is NOT added to the
    ledger here, so a later run will try it again) rather than aborting the
    whole scan.
    """
    base = engine.transcript_base_dir()
    now = datetime.now().timestamp()
    idle_secs = idle_min * 60
    eligible: list[dict] = []
    recoverable: list[dict] = []
    if not base.is_dir():
        return eligible, recoverable

    for jf in sorted(base.glob("*.jsonl")):
        uuid = jf.stem
        try:
            mtime = jf.stat().st_mtime
        except OSError:
            # File vanished or is unreadable between glob and stat.
            continue
        # Skip recently-active sessions.
        if (now - mtime) < idle_secs:
            continue
        # Skip anything already processed.
        if uuid in ledger:
            continue
        try:
            parsed = engine.parse_transcript(jf)
            verdict = engine.classify(parsed)
            # would-write path (metadata-cheap; no Ollama)
            would_write = str(
                engine.compute_output_path(parsed, verdict["scope"], verdict["title"])
            )
        except Exception as e:  # noqa: BLE001 -- one bad transcript must not abort the scan
            print(f"[WARNING] failed to classify {uuid[:8]}: {e}", file=sys.stderr)
            continue
        orphan = bool(verdict["substantive"] and not verdict["saved"])
        rec = {
            "uuid": uuid,
            "path": jf,
            "mtime": mtime,
            "substantive": verdict["substantive"],
            "saved": verdict["saved"],
            "orphan": orphan,
            "scope": verdict["scope"],
            "title": verdict["title"],
            "parsed": parsed,
            "would_write": would_write,
        }
        eligible.append(rec)
        if orphan:
            recoverable.append(rec)

    # Process OLDEST-FIRST so a capped run drains the longest-waiting orphans
    # first. Prefer the transcript's first_ts when available; fall back to mtime.
    def _age_key(r: dict) -> float:
        parsed = r.get("parsed")
        ts = getattr(parsed, "first_ts", None) if parsed else None
        if ts:
            try:
                return datetime.fromisoformat(ts.replace("Z", "+00:00")).timestamp()
            except (AttributeError, TypeError, ValueError):
                # Missing/odd-typed/unparseable timestamp: use mtime instead.
                pass
        return r.get("mtime", 0.0)

    eligible.sort(key=_age_key)
    recoverable.sort(key=_age_key)
    return eligible, recoverable


def print_dry_run_table(eligible: list[dict]) -> None:
    """Print a column-aligned report of ``eligible`` records to stdout.

    Each row shows the 8-char uuid prefix, local mtime, the substantive /
    saved / orphan flags, the scope, and the path a recovery log would be
    written to. A summary line with the orphan count follows the table.
    """
    if not eligible:
        print("[INFO] No eligible (past-idle, unprocessed) transcripts found.")
        return
    headers = ["uuid", "mtime", "subst", "saved", "orphan", "scope", "would-write-path"]
    rows = []
    for r in eligible:
        mt = datetime.fromtimestamp(r["mtime"]).strftime("%Y-%m-%d %H:%M")
        rows.append(
            [
                r["uuid"][:8],
                mt,
                "yes" if r["substantive"] else "no",
                "yes" if r["saved"] else "no",
                "YES" if r["orphan"] else "no",
                _scope_str(r["scope"]),
                r["would_write"],
            ]
        )
    # Each column is as wide as its longest cell (header included).
    widths = [len(h) for h in headers]
    for row in rows:
        for i, cell in enumerate(row):
            widths[i] = max(widths[i], len(str(cell)))
    fmt = " ".join("{:<" + str(w) + "}" for w in widths)
    print(fmt.format(*headers))
    print(fmt.format(*["-" * w for w in widths]))
    for row in rows:
        print(fmt.format(*[str(c) for c in row]))
    n_orphan = sum(1 for r in eligible if r["orphan"])
    print()
    print(f"[INFO] {len(eligible)} eligible, {n_orphan} orphan(s) would be recovered.")
def _existing_recovered_for_uuid(out_dir: Path, uuid: str) -> Path | None: """Return a prior recovered log for THIS uuid in ``out_dir``, if one exists. The tool's own collision filename embeds the 8-char uuid prefix as a trailing ``-recovered-...-.md`` suffix (see ``compute_output_path``). Matching on that prefix lets a re-run overwrite its OWN prior draft for the same uuid in place -- the one safe overwrite -- instead of minting a second suffixed copy. Only files that are clearly recovered drafts (``-recovered-`` in the name AND ending in ``-.md``) are considered. A genuine non-recovered human log will never match, so its suffix protection is preserved. """ if not out_dir.is_dir(): return None short = uuid[:8] suffix = f"-{short}.md" for f in out_dir.glob(f"*-recovered-*{suffix}"): if f.is_file() and f.name.endswith(suffix): return f return None def recover_one(rec: dict) -> str: """Build + write the recovery log for one orphan. Returns the written path. Idempotent per-uuid: if a prior recovered draft for THIS uuid already exists in the target directory (a run that died after writing but before the ledger was updated), overwrite that same file in place rather than creating a new suffixed copy. Never overwrites a non-recovered human log. 
""" parsed = rec["parsed"] markdown, meta = engine.build_log(parsed) out_path = Path(meta["path_would_be"]) prior = _existing_recovered_for_uuid(out_path.parent, rec["uuid"]) if prior is not None: out_path = prior out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(markdown, encoding="utf-8") rec["written"] = str(out_path) rec["date"] = meta["date"] return str(out_path) def git(*args: str) -> subprocess.CompletedProcess: return subprocess.run( ["git", *args], cwd=str(engine.repo_root()), capture_output=True, text=True, timeout=120, ) def _current_branch() -> str: """Return the current git branch name, or empty string if undeterminable.""" res = git("rev-parse", "--abbrev-ref", "HEAD") if res.returncode == 0: name = res.stdout.strip() if name and name != "HEAD": return name return "" def commit_and_push(written_paths: list[str], count: int) -> bool: """Stage only the recovered logs, commit, push. Soft-fail on errors. NEVER stages the ledger -- it is machine-local and correctly gitignored; appending it to ``git add`` aborts the whole add (exit 1) and stages nothing. Returns True only when BOTH the commit AND the push succeed. On any failure returns False so the caller knows not to mark these uuids ``recovered`` (the next run must re-attempt them). """ root = engine.repo_root() rel_paths = [] for p in written_paths: try: rel_paths.append(str(Path(p).resolve().relative_to(root))) except ValueError: rel_paths.append(p) add = git("add", "--", *rel_paths) if add.returncode != 0: print(f"[WARNING] git add failed; logs are on disk but uncommitted: {add.stderr.strip()}", file=sys.stderr) return False msg = ( f"chore: auto-recover {count} unsaved session log(s)\n\n" f"{engine._COMMIT_FOOTER}" ) commit = git("commit", "-m", msg) if commit.returncode != 0: # Nothing to commit, or hook failure -- soft-fail. 
print(f"[WARNING] git commit returned non-zero: {commit.stdout.strip()} {commit.stderr.strip()}", file=sys.stderr) return False print(f"[OK] committed {count} recovered log(s).") branch = _current_branch() if branch: push = git("push", "origin", branch) else: push = git("push") if push.returncode != 0: target = f"origin {branch}" if branch else "origin" print( f"[WARNING] git push to {target} failed (commit is local): {push.stderr.strip()}", file=sys.stderr, ) return False print(f"[OK] pushed to origin{(' ' + branch) if branch else ''}.") return True def post_alert(recovered: list[dict]) -> None: """Post an FYI to #bot-alerts via post-bot-alert.sh. Soft-fail.""" script = engine.repo_root() / ".claude" / "scripts" / "post-bot-alert.sh" if not script.exists(): print("[WARNING] post-bot-alert.sh not found; alert skipped.", file=sys.stderr) return bash = shutil.which("bash") if not bash: print( "[WARNING] 'bash' not found on PATH (restricted scheduler env?); " "#bot-alerts FYI skipped. Recovered logs are already committed.", file=sys.stderr, ) return lines = [ f"[INFO] Auto-recovered {len(recovered)} unsaved session log(s) -- " f"already saved to the repo; FYI, please review and remove the UNVERIFIED banner:" ] for r in recovered: lines.append( f"- {r['uuid'][:8]} | {r.get('date', '?')} | {_scope_str(r['scope'])} | {r.get('written', '?')}" ) message = "\n".join(lines) try: res = subprocess.run( [bash, str(script), message, "bot"], cwd=str(engine.repo_root()), capture_output=True, text=True, timeout=30, ) out = (res.stdout or "").strip() or (res.stderr or "").strip() if out: print(out) except (OSError, subprocess.SubprocessError) as e: print(f"[WARNING] alert post failed: {e}", file=sys.stderr) def main(argv: list[str] | None = None) -> int: # Force UTF-8 stdout (Windows console defaults to cp1252; titles/paths in # the dry-run table can contain characters outside that codepage). 
try: sys.stdout.reconfigure(encoding="utf-8", errors="replace") except (AttributeError, ValueError): pass parser = argparse.ArgumentParser( description="Detect and auto-recover unsaved Claude Code sessions." ) parser.add_argument("--dry-run", action="store_true", help="scan + print report; no writes/commit/alert") parser.add_argument("--idle-min", type=int, default=90, help="minutes of mtime-idle before eligible (default 90)") parser.add_argument("--max", type=int, default=25, dest="max_recover", help="max orphan logs to build per run, oldest-first (default 25)") parser.add_argument("--no-commit", action="store_true", help="skip git commit/push") parser.add_argument("--no-alert", action="store_true", help="skip the Discord alert") args = parser.parse_args(argv) # Respect the ledger in both modes (dry-run still skips already-processed). ledger = load_ledger() eligible, recoverable = scan(args.idle_min, ledger) if args.dry_run: print_dry_run_table(eligible) return 0 if not eligible: print("[INFO] No eligible transcripts to process.") return 0 written_paths: list[str] = [] recovered_recs: list[dict] = [] deferred = 0 built = 0 for rec in eligible: uuid = rec["uuid"] if rec["orphan"]: # Cap actual log-builds per run (oldest-first). Remaining orphans are # left OUT of the ledger so the next run re-attempts them. if built >= args.max_recover: deferred += 1 continue try: path = recover_one(rec) except Exception as e: # noqa: BLE001 -- never let one bad transcript abort the run print(f"[WARNING] failed to recover {uuid[:8]}: {e}", file=sys.stderr) # No on-disk artifact -> safe to mark immediately. ledger[uuid] = {"verdict": "error", "at": _now_iso(), "path": None, "error": str(e)} continue built += 1 written_paths.append(path) recovered_recs.append(rec) print(f"[OK] recovered {uuid[:8]} -> {path}") elif rec["saved"]: # No on-disk artifact -> safe to mark immediately. 
ledger[uuid] = {"verdict": "skipped-saved", "at": _now_iso(), "path": None} else: ledger[uuid] = {"verdict": "skipped-trivial", "at": _now_iso(), "path": None} if deferred: print(f"[INFO] {deferred} more orphan(s) deferred to next run (--max {args.max_recover}).") # Persist the skipped/error verdicts now (they have no artifact, so they are # safe regardless of the commit/push outcome below). save_ledger(ledger) if not recovered_recs: print("[INFO] No orphans recovered (all eligible sessions were saved or trivial).") return 0 if not args.no_commit: pushed = commit_and_push(written_paths, len(recovered_recs)) if pushed: # H1: only mark uuids 'recovered' AFTER a successful commit+push, so a # push failure leaves them out of the ledger for the next run to retry. for rec in recovered_recs: ledger[rec["uuid"]] = { "verdict": "recovered", "at": _now_iso(), "path": rec.get("written"), } save_ledger(ledger) else: print( "[WARNING] commit/push did not succeed; recovered uuids left UNLEDGERED " "so the next run re-attempts them (logs are on disk).", file=sys.stderr, ) else: print("[INFO] --no-commit set; recovered logs left unstaged and UNLEDGERED (next run will re-attempt).") if not args.no_alert: post_alert(recovered_recs) else: print("[INFO] --no-alert set; Discord alert skipped.") return 0 if __name__ == "__main__": raise SystemExit(main())