Files
claudetools/.claude/skills/errorlog-dream/scripts/errorlog_dream.py
Mike Swanson 2937b00ebf sync: auto-sync from GURU-5070 at 2026-07-01 15:49:56
Author: Mike Swanson
Machine: GURU-5070
Timestamp: 2026-07-01 15:49:56
2026-07-01 15:50:54 -07:00

348 lines
13 KiB
Python

#!/usr/bin/env python
"""errorlog-dream: lint the fleet error log (errorlog.md).
Read-only by default. Parses the canonical entry format
YYYY-MM-DD | MACHINE | skill/context | [type] message [ctx: k=v ...] (xN)
and reports the patterns the log exists to surface: which contexts generate
the most failures, which documented rules keep getting violated (repeat ref=
citations), which identical failures recur across days (noise clusters that
need a skill-side fix, not more logging), resolved entries, machine-name
drift, and entries old enough to archive.
The single mutating mode, --apply-archive, moves entries older than --days
(default 60) into errorlog-archive/YYYY-MM.md. Everything judgment-shaped
stays in the PROPOSED section for the operator, mirroring memory-dream.
"""
import argparse
import io
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta, timezone
MARKER = "<!-- Append entries below this line -->"
ENTRY_RE = re.compile(
r"^(\d{4}-\d{2}-\d{2}) \| ([^|]+?) \| ([^|]+?) \| (.*)$"
)
TYPE_RE = re.compile(r"^\[(correction|friction|[a-z-]+)\]\s+")
CTX_RE = re.compile(r"\[ctx: ([^\]]*)\]")
REF_RE = re.compile(r"ref=([A-Za-z0-9_./#-]+)")
COUNT_RE = re.compile(r" \(x(\d+)\)\s*$")
RESOLVED_RE = re.compile(r"\[RESOLVED[^\]]*\]", re.IGNORECASE)
def find_root():
env = os.environ.get("CLAUDETOOLS_ROOT")
if env and os.path.isdir(env):
return env
here = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
idf = os.path.join(here, ".claude", "identity.json")
if os.path.isfile(idf):
try:
with io.open(idf, encoding="utf-8") as fh:
root = json.load(fh).get("claudetools_root")
if root and os.path.isdir(root):
return root
except Exception:
pass
return here
class Entry(object):
__slots__ = ("date", "machine", "skill", "msg", "etype", "ctx", "refs",
"count", "resolved", "lines", "raw_first")
def __init__(self, date, machine, skill, msg, lines):
self.date = date
self.machine = machine.strip()
self.skill = skill.strip()
self.lines = lines # verbatim block lines (for archiving)
self.raw_first = lines[0]
m = COUNT_RE.search(msg)
self.count = int(m.group(1)) if m else 1
msg = COUNT_RE.sub("", msg)
t = TYPE_RE.match(msg)
self.etype = t.group(1) if t else "exec"
cm = CTX_RE.search(msg)
self.ctx = cm.group(1) if cm else ""
self.refs = REF_RE.findall(msg)
self.resolved = bool(RESOLVED_RE.search(" ".join(lines)))
self.msg = msg
@property
def context_group(self):
return self.skill.split("/", 1)[0].strip()
def norm_msg(self):
"""Message with volatile tokens (ids, numbers, hex, paths' digits)
collapsed, for grouping recurring failures across days."""
m = TYPE_RE.sub("", self.msg)
m = CTX_RE.sub("", m)
m = re.sub(r"[0-9a-fA-F]{8}-[0-9a-fA-F-]{27,}", "<uuid>", m)
m = re.sub(r"\b[0-9a-fA-F]{12,}\b", "<hex>", m)
m = re.sub(r"\d+", "<n>", m)
return re.sub(r"\s+", " ", m).strip().lower()
def parse_log(path):
"""Return (header_lines, entries, unparsed_blocks, trailing_map).
Blocks are runs of consecutive non-blank lines after the marker. A block
whose first line matches ENTRY_RE is an Entry (continuation lines belong
to it -- the pre-helper era wrote multi-line entries by hand); anything
else is an unparsed block, reported but never touched.
"""
with io.open(path, encoding="utf-8") as fh:
lines = fh.read().splitlines()
header, rest, seen_marker = [], [], False
for ln in lines:
(rest if seen_marker else header).append(ln)
if not seen_marker and ln.strip() == MARKER:
seen_marker = True
if not seen_marker:
rest, header = header, []
blocks, cur = [], []
for ln in rest:
if ln.strip():
cur.append(ln)
elif cur:
blocks.append(cur)
cur = []
if cur:
blocks.append(cur)
entries, unparsed = [], []
for b in blocks:
m = ENTRY_RE.match(b[0])
if m:
entries.append(Entry(m.group(1), m.group(2), m.group(3), m.group(4), b))
else:
unparsed.append(b)
return header, entries, unparsed
def analyze(entries, unparsed, root, archive_days, today):
r = {}
weight = lambda es: sum(e.count for e in es)
r["total"] = len(entries)
r["weighted"] = weight(entries)
r["unparsed"] = len(unparsed)
if entries:
r["span"] = (min(e.date for e in entries), max(e.date for e in entries))
by_type = defaultdict(int)
for e in entries:
by_type[e.etype] += 1
r["by_type"] = dict(by_type)
ctxs = defaultdict(list)
for e in entries:
ctxs[e.context_group].append(e)
r["top_contexts"] = sorted(
((c, weight(es), len(es)) for c, es in ctxs.items()),
key=lambda t: -t[1])[:15]
# repeat ref= citations: >=2 means a documented rule/memory is not sticking
refs = defaultdict(list)
for e in entries:
for ref in e.refs:
refs[ref].append(e)
mem_dir = os.path.join(root, ".claude", "memory")
rep = []
for ref, es in sorted(refs.items(), key=lambda kv: -len(kv[1])):
if len(es) < 2:
continue
base = ref.split("#", 1)[0].split("/")[-1]
cand = base if base.endswith(".md") else base + ".md"
exists = os.path.isfile(os.path.join(mem_dir, cand))
rep.append((ref, len(es), exists, sorted({e.date for e in es})[-3:]))
r["repeat_refs"] = rep
# noise clusters: same machine+skill+normalized message on >=3 distinct
# days (the helper's (xN) dedup already collapses same-day repeats)
clusters = defaultdict(list)
for e in entries:
clusters[(e.machine, e.skill, e.norm_msg())].append(e)
noise = []
for (mach, skill, norm), es in clusters.items():
days = sorted({e.date for e in es})
if len(days) >= 3 or weight(es) >= 5:
noise.append((mach, skill, norm[:110], weight(es), len(days)))
r["noise"] = sorted(noise, key=lambda t: -t[3])[:15]
r["resolved"] = [e for e in entries if e.resolved]
machines = defaultdict(set)
for e in entries:
machines[e.machine.lower()].add(e.machine)
r["machine_drift"] = {k: sorted(v) for k, v in machines.items() if len(v) > 1}
r["by_machine"] = sorted(
((m, weight(es)) for m, es in
((m, [e for e in entries if e.machine.lower() == m]) for m in machines)),
key=lambda t: -t[1])
cutoff = (today - timedelta(days=archive_days)).strftime("%Y-%m-%d")
r["cutoff"] = cutoff
r["archive"] = [e for e in entries if e.date < cutoff]
return r
def render(r, archive_days):
L = []
add = L.append
add("# errorlog-dream report")
add("")
add("## SUMMARY")
span = r.get("span")
add("- entries: %d parsed (%d weighted with (xN) counters), %d unparsed legacy block(s)"
% (r["total"], r["weighted"], r["unparsed"]))
if span:
add("- span: %s .. %s" % span)
add("- by type: " + ", ".join("%s=%d" % kv for kv in sorted(r["by_type"].items())))
add("- by machine: " + ", ".join("%s=%d" % kv for kv in r["by_machine"]))
add("")
add("## TOP CONTEXTS (weighted)")
for c, w, n in r["top_contexts"]:
add("- %-22s %4d (%d entries)" % (c, w, n))
add("")
add("## REPEAT REFS -- documented rules that are NOT sticking")
if r["repeat_refs"]:
for ref, n, exists, dates in r["repeat_refs"]:
add("- ref=%s cited %dx (last: %s) -- memory file %s"
% (ref, n, ", ".join(dates), "exists" if exists else "NOT FOUND"))
else:
add("- none")
add("")
add("## NOISE CLUSTERS -- identical failures recurring across days")
if r["noise"]:
for mach, skill, norm, w, days in r["noise"]:
add("- %s | %s | %dx over %d day(s): %s" % (mach, skill, w, days, norm))
else:
add("- none")
add("")
add("## RESOLVED entries (archive candidates regardless of age)")
for e in r["resolved"]:
add("- %s | %s | %s" % (e.date, e.machine, e.skill))
if not r["resolved"]:
add("- none")
add("")
add("## MACHINE-NAME DRIFT")
if r["machine_drift"]:
for k, variants in sorted(r["machine_drift"].items()):
add("- %s spelled %s -- normalize identity.json .machine on the odd one out"
% (k, " / ".join(variants)))
else:
add("- none")
add("")
add("## ARCHIVE CANDIDATES (older than %d days, cutoff %s)" % (archive_days, r["cutoff"]))
add("- %d entr%s -- run --apply-archive to move them to errorlog-archive/YYYY-MM.md"
% (len(r["archive"]), "y" if len(r["archive"]) == 1 else "ies"))
add("")
add("## PROPOSED (needs human approval)")
for ref, n, exists, dates in r["repeat_refs"]:
add("- [STRENGTHEN?] ref=%s keeps repeating (%dx)%s -- the prose rule failed; "
"add a mechanical guard (hook/wrapper/preflight) or rewrite the memory"
% (ref, n, "" if exists else " (and the cited memory file is MISSING)"))
for mach, skill, norm, w, days in r["noise"]:
add("- [SUPPRESS?] %s/%s fails identically %dx over %d days -- fix the skill "
"(backoff, expected-condition filter, or health-gate), don't keep logging it"
% (mach, skill, w, days))
for e in r["resolved"]:
add("- [ARCHIVE?] resolved entry %s | %s | %s can move to the archive now"
% (e.date, e.machine, e.skill))
if not (r["repeat_refs"] or r["noise"] or r["resolved"]):
add("- nothing to propose")
add("")
return "\n".join(L)
def apply_archive(log_path, root, header, entries, unparsed, cutoff_entries):
"""Move cutoff_entries' blocks into errorlog-archive/YYYY-MM.md (append,
newest-first order preserved as-is) and rewrite errorlog.md without them.
Unparsed blocks are never moved."""
arch_dir = os.path.join(root, "errorlog-archive")
if not os.path.isdir(arch_dir):
os.makedirs(arch_dir)
by_month = defaultdict(list)
for e in cutoff_entries:
by_month[e.date[:7]].append(e)
for month, es in sorted(by_month.items()):
p = os.path.join(arch_dir, "%s.md" % month)
new = not os.path.isfile(p)
with io.open(p, "a", encoding="utf-8", newline="\n") as fh:
if new:
fh.write("# Error Log archive -- %s\n\nMoved out of errorlog.md by "
"errorlog-dream --apply-archive.\n" % month)
for e in es:
fh.write("\n" + "\n".join(e.lines) + "\n")
print("[OK] archived %d entr%s -> errorlog-archive/%s.md"
% (len(es), "y" if len(es) == 1 else "ies", month))
keep_ids = {id(e) for e in entries} - {id(e) for e in cutoff_entries}
out = list(header)
for e in entries:
if id(e) in keep_ids:
out.append("")
out.extend(e.lines)
for b in unparsed:
out.append("")
out.extend(b)
out.append("")
with io.open(log_path, "w", encoding="utf-8", newline="\n") as fh:
fh.write("\n".join(out))
print("[OK] errorlog.md rewritten: %d entries kept, %d archived, %d unparsed block(s) untouched"
% (len(keep_ids), len(cutoff_entries), len(unparsed)))
def main(argv=None):
ap = argparse.ArgumentParser(description="lint errorlog.md")
ap.add_argument("--days", type=int, default=60,
help="archive-candidate age threshold (default 60)")
ap.add_argument("--apply-archive", action="store_true",
help="move entries older than --days to errorlog-archive/")
ap.add_argument("--no-file", action="store_true",
help="print report to stdout only")
ap.add_argument("--report-file", default=None)
ap.add_argument("--log", default=None, help="path to errorlog.md (for tests)")
ap.add_argument("--root", default=None, help="repo root override (for tests)")
args = ap.parse_args(argv)
root = args.root or find_root()
log_path = args.log or os.path.join(root, "errorlog.md")
if not os.path.isfile(log_path):
print("[ERROR] %s not found" % log_path, file=sys.stderr)
return 2
header, entries, unparsed = parse_log(log_path)
today = datetime.now(timezone.utc)
r = analyze(entries, unparsed, root, args.days, today)
report = render(r, args.days)
print(report)
if not args.no_file:
rp = args.report_file
if not rp:
rdir = os.path.join(root, "errorlog-archive", "_reports")
if not os.path.isdir(rdir):
os.makedirs(rdir)
rp = os.path.join(rdir, today.strftime("%Y-%m-%d-%H%M") + "-dream.md")
with io.open(rp, "w", encoding="utf-8", newline="\n") as fh:
fh.write(report + "\n")
print("[OK] report written: %s" % os.path.relpath(rp, root))
if args.apply_archive:
if r["archive"]:
apply_archive(log_path, root, header, entries, unparsed, r["archive"])
else:
print("[OK] nothing old enough to archive (cutoff %s)" % r["cutoff"])
return 0
if __name__ == "__main__":
raise SystemExit(main())