Files
claudetools/projects/radio-show/audio-processor/classify_qa_quality.py
Mike Swanson 42688901f9 radio: Q/A usefulness classifier + min_score search filter (Track 1)
Adds an Ollama-based content quality classifier and exposes the
results via the search API. 1,407 existing Q/A pairs were scored
in 3.5h via qwen3:14b (1,405 succeeded, 2 failed).

Distribution: 37% scored 4-5 (useful), 41% scored 1-2 (banter/promo/
off-topic). 43% flagged as banter overall. Default-on filtering at
search time will hide ~half of the noise without losing any real
listener questions.

Files:
- new classify_qa_quality.py: walks qa_pairs, calls Ollama qwen3:14b
  per row, writes usefulness_score/topic_class/is_banter back to DB.
  Idempotent (--rebuild to reprocess), --smoke for sample check, --limit
  for partial runs. Detached run handles 1407 rows in ~3.5h on a 4090.
- server/main.py: /api/search accepts min_score (0-5) and exclude_banter
  query params. NULL scores are treated as "include" so unprocessed rows
  still appear. Episode detail endpoint includes the new fields in qa results.

Schema migration in import_to_sqlite.py was made by the same agent run
(visible on the live archive.db: usefulness_score / topic_class /
is_banter columns now exist on qa_pairs).

Local archive.db updated; Jupiter container has NOT been redeployed
yet — that is a separate manual step.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-29 17:32:41 -07:00

582 lines
19 KiB
Python

"""
Classify each row in qa_pairs for usefulness to listeners.
Walks the qa_pairs table and uses Ollama (qwen3:14b) to score each Q/A pair on:
- usefulness_score : 1..5 (5 = solid tech help, 1 = pure banter)
- topic_class : computer-help | banter | promo | off-topic | unclear
- is_banter : 0 or 1
Idempotent by default: only processes rows where usefulness_score IS NULL.
Pass --rebuild to re-classify every row, or --limit N to sample N rows.
Usage:
py classify_qa_quality.py [--db PATH] [--limit N] [--rebuild] [--smoke]
[--ollama URL] [--model NAME] [--batch N]
Environment:
No env vars required. Ollama is auto-discovered from localhost or the
GURU-BEAST-ROG Tailscale address.
"""
import argparse
import json
import sqlite3
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from rich.console import Console
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TaskProgressColumn,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
)
from rich.table import Table
# --- Paths -------------------------------------------------------------------
# Resolved relative to this script so the tool works from any working directory.
BASE = Path(__file__).parent
DEFAULT_DB = BASE / "archive-data" / "archive.db"

# --- Run defaults ------------------------------------------------------------
DEFAULT_MODEL = "qwen3:14b"
DEFAULT_BATCH_SIZE = 25  # rows accumulated between DB commits
DEFAULT_SMOKE_LIMIT = 10  # rows sampled by --smoke

# --- Ollama connectivity -----------------------------------------------------
# Candidate base URLs; find_ollama() probes them in the order listed here.
OLLAMA_HOSTS = (
    "http://localhost:11434",
    "http://100.92.127.64:11434",
)
HTTP_TIMEOUT_SECONDS = 120  # qwen3:14b can take a while on cold load
PING_TIMEOUT_SECONDS = 2

# --- Classification vocabulary and prompts -----------------------------------
# The only topic_class values accepted from the model.
VALID_TOPIC_CLASSES = {"computer-help", "banter", "promo", "off-topic", "unclear"}

# System message sent with every request; sentences are joined by one space.
SYSTEM_PROMPT = " ".join(
    (
        "You are a classifier for a tech radio show's Q&A archive.",
        "The host (Mike Swanson) takes calls about computer problems.",
        "Sometimes co-hosts or producers ask questions on-air too.",
        "Classify each Q/A pair on whether the answer is useful to listeners"
        " seeking computer help. Return strict JSON, no prose.",
    )
)

# Per-row user message. Filled with str.format(question=..., answer=...), so
# the literal braces of the JSON skeleton are doubled.
USER_TEMPLATE = """Question: {question}
Answer: {answer}
Return JSON:
{{
"usefulness_score": <1-5>,
"topic_class": <"computer-help" | "banter" | "promo" | "off-topic" | "unclear">,
"is_banter": <true|false>,
"reason": "<one short sentence>"
}}
Scoring guide:
5 = clear computer/tech question with substantive technical answer (Windows, Mac, hardware, software, network, security, etc.)
4 = useful tech topic but lighter answer or partially off-topic
3 = mixed / borderline / general advice
2 = mostly banter, joke, or aside; minimal useful content
1 = pure banter, social, ad-read, or unrelated chat
is_banter is true when the exchange is conversational filler, jokes, or social interaction without listener-actionable content - even if asked by a "caller". A producer asking a real tech question is NOT banter.
"""

# --- Prompt size caps --------------------------------------------------------
# The model only needs the flavor of the exchange, not every word, so the
# question is clipped to ~1.2 KB and the answer to ~2 KB before prompting.
# This keeps requests small and fast without losing classification signal.
MAX_QUESTION_CHARS = 1200
MAX_ANSWER_CHARS = 2000
# Module-wide Rich console; every status line, warning, rule, and table in
# this script is printed through it.
console = Console()
# ---------------------------------------------------------------------------
# Ollama discovery + chat
# ---------------------------------------------------------------------------
def find_ollama() -> str:
    """Probe the known Ollama hosts and return the first base URL that answers.

    Each entry of OLLAMA_HOSTS is tried in order (localhost first, then the
    Tailscale address) by requesting ``/api/tags`` with a timeout of
    PING_TIMEOUT_SECONDS. Any failure on a host just moves on to the next.

    Returns:
        The base URL (no trailing path) of the first host that responded.

    Raises:
        RuntimeError: if none of the hosts responded.
    """
    for candidate in OLLAMA_HOSTS:
        probe_url = candidate + "/api/tags"
        try:
            # Only reachability matters; the response body is never read.
            with urllib.request.urlopen(probe_url, timeout=PING_TIMEOUT_SECONDS):
                pass
        except Exception:
            # Best-effort probe: whatever went wrong, try the next host.
            continue
        else:
            return candidate

    tried = ", ".join(OLLAMA_HOSTS)
    raise RuntimeError("No Ollama instance reachable. Tried: " + tried)
def ollama_chat(base_url: str, model: str, messages: list[dict]) -> str:
    """Send one non-streaming chat request to Ollama and return the reply text.

    The request asks for ``format="json"`` (Ollama's flag that constrains the
    output to JSON) and ``temperature=0`` so the model is biased toward
    returning valid JSON.

    Args:
        base_url: Ollama base URL; ``/api/chat`` is appended to it.
        model: Name of the Ollama model to query.
        messages: Chat history as a list of ``{"role": ..., "content": ...}``.

    Returns:
        The ``message.content`` string from the JSON response.

    Network and HTTP errors from urllib are not caught here; the request is
    bounded by HTTP_TIMEOUT_SECONDS.
    """
    request_payload = {
        "model": model,
        "messages": messages,
        "stream": False,
        "format": "json",
        "options": {"temperature": 0},
    }
    request = urllib.request.Request(
        base_url + "/api/chat",
        data=json.dumps(request_payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=HTTP_TIMEOUT_SECONDS) as response:
        raw_bytes = response.read()

    reply = json.loads(raw_bytes.decode("utf-8"))
    return reply["message"]["content"]
# ---------------------------------------------------------------------------
# Response parsing
# ---------------------------------------------------------------------------
def _extract_json_object(raw: str) -> dict:
"""Locate the first '{...}' span in the response and json.loads it.
qwen3 with format='json' usually returns clean JSON, but tolerate stray
whitespace, code fences, or thinking tags by clipping to the outermost
braces before parsing.
"""
start = raw.find("{")
end = raw.rfind("}")
if start < 0 or end <= start:
raise ValueError("no JSON object found in model response")
return json.loads(raw[start : end + 1])
def _normalize(parsed: dict) -> dict:
"""Validate + coerce a parsed response into the canonical shape.
Returns a dict with keys: usefulness_score (int 1..5), topic_class (str
from VALID_TOPIC_CLASSES), is_banter (0|1), reason (str).
Raises ValueError for any out-of-range or missing field.
"""
score = parsed.get("usefulness_score")
if isinstance(score, str):
score = score.strip()
if score.isdigit():
score = int(score)
if not isinstance(score, int) or not (1 <= score <= 5):
raise ValueError(f"usefulness_score out of range: {parsed.get('usefulness_score')!r}")
topic_class = parsed.get("topic_class")
if not isinstance(topic_class, str):
raise ValueError(f"topic_class not a string: {topic_class!r}")
topic_class = topic_class.strip().lower()
if topic_class not in VALID_TOPIC_CLASSES:
raise ValueError(f"topic_class not recognized: {topic_class!r}")
is_banter_raw = parsed.get("is_banter")
if isinstance(is_banter_raw, bool):
is_banter = 1 if is_banter_raw else 0
elif isinstance(is_banter_raw, (int, float)):
is_banter = 1 if int(is_banter_raw) else 0
elif isinstance(is_banter_raw, str):
v = is_banter_raw.strip().lower()
if v in ("true", "yes", "1"):
is_banter = 1
elif v in ("false", "no", "0"):
is_banter = 0
else:
raise ValueError(f"is_banter not boolean-coercible: {is_banter_raw!r}")
else:
raise ValueError(f"is_banter missing or unrecognized: {is_banter_raw!r}")
reason = parsed.get("reason") or ""
if not isinstance(reason, str):
reason = str(reason)
return {
"usefulness_score": score,
"topic_class": topic_class,
"is_banter": is_banter,
"reason": reason.strip(),
}
def classify_one(
    base_url: str,
    model: str,
    question: str,
    answer: str,
) -> dict | None:
    """Ask the model to classify one Q/A pair.

    The question and answer are clipped to MAX_QUESTION_CHARS and
    MAX_ANSWER_CHARS before prompting. If the first reply cannot be parsed and
    normalized, the model gets exactly one corrective follow-up.

    Returns:
        The normalized classification dict, or None when both replies were
        unusable (a warning is printed in that case).

    Network/HTTP errors from ollama_chat are not handled here and propagate
    to the caller.
    """
    prompt = USER_TEMPLATE.format(
        question=(question or "")[:MAX_QUESTION_CHARS],
        answer=(answer or "")[:MAX_ANSWER_CHARS],
    )
    conversation = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]

    first_reply = ollama_chat(base_url, model, conversation)
    try:
        return _normalize(_extract_json_object(first_reply))
    except (ValueError, json.JSONDecodeError) as exc:
        # Bind to a second name: the `as` target is unbound when the
        # handler exits, and the error is still needed for the warning.
        first_err = exc

    # Single retry: replay the bad reply and spell out the required shape.
    correction = (
        "Your last reply wasn't valid JSON in the required shape. "
        "Return ONLY a JSON object with keys: usefulness_score (1-5 integer), "
        'topic_class (one of "computer-help","banter","promo","off-topic","unclear"), '
        "is_banter (true/false), reason (string)."
    )
    conversation.extend(
        [
            {"role": "assistant", "content": first_reply},
            {"role": "user", "content": correction},
        ]
    )
    try:
        second_reply = ollama_chat(base_url, model, conversation)
        return _normalize(_extract_json_object(second_reply))
    except (ValueError, json.JSONDecodeError) as second_err:
        console.print(
            f"[yellow][WARNING][/yellow] JSON parse failed twice "
            f"(first: {first_err}; second: {second_err}); skipping row"
        )
    return None
# ---------------------------------------------------------------------------
# DB helpers
# ---------------------------------------------------------------------------
def fetch_rows(
conn: sqlite3.Connection,
rebuild: bool,
limit: int | None,
) -> list[sqlite3.Row]:
"""Pull rows that need classification, in deterministic id order."""
sql = (
"SELECT id, question_text, answer_text, caller_name "
"FROM qa_pairs"
)
if not rebuild:
sql += " WHERE usefulness_score IS NULL"
sql += " ORDER BY id"
if limit is not None and limit > 0:
sql += f" LIMIT {int(limit)}"
return conn.execute(sql).fetchall()
def commit_batch(conn: sqlite3.Connection, updates: list[tuple]) -> None:
    """Write a batch of classification results back to qa_pairs.

    Args:
        conn: Open read/write connection to the archive database.
        updates: Tuples of ``(usefulness_score, topic_class, is_banter, id)``.
            An empty list is a no-op.
    """
    if updates:
        statement = (
            "UPDATE qa_pairs "
            "SET usefulness_score = ?, topic_class = ?, is_banter = ? "
            "WHERE id = ?"
        )
        # Using the connection as a context manager commits the whole batch
        # on success and rolls it back if executemany raises.
        with conn:
            conn.executemany(statement, updates)
def summarize(conn: sqlite3.Connection) -> None:
    """Print coverage and distribution tables for the classified rows.

    Reads qa_pairs and prints, via the shared console:
    - how many rows have a usefulness_score out of the total,
    - a table of counts per topic_class,
    - a table of counts per usefulness_score,
    - how many rows are flagged is_banter = 1.

    Shares in the tables are relative to the number of scored rows. An empty
    qa_pairs table is reported as 0.0% coverage instead of raising
    ZeroDivisionError, which matters because main() calls this from a
    ``finally`` block.

    Args:
        conn: Open connection to the archive database.
    """

    def pct(count: int, whole: int) -> float:
        # Percentage that degrades to 0 when there is nothing to divide by.
        return 100 * count / whole if whole else 0

    total = conn.execute("SELECT COUNT(*) FROM qa_pairs").fetchone()[0]
    scored = conn.execute(
        "SELECT COUNT(*) FROM qa_pairs WHERE usefulness_score IS NOT NULL"
    ).fetchone()[0]

    console.print()
    console.print(
        f"[bold]Coverage:[/bold] {scored} / {total} rows scored "
        f"({pct(scored, total):.1f}%)"
    )

    by_class = conn.execute(
        "SELECT topic_class, COUNT(*) AS n FROM qa_pairs "
        "WHERE topic_class IS NOT NULL GROUP BY topic_class ORDER BY n DESC"
    ).fetchall()
    if by_class:
        t = Table(title="Distribution by topic_class", show_lines=False)
        t.add_column("topic_class")
        t.add_column("count", justify="right")
        t.add_column("share", justify="right")
        for row in by_class:
            t.add_row(row[0], str(row[1]), f"{pct(row[1], scored):.1f}%")
        console.print(t)

    by_score = conn.execute(
        "SELECT usefulness_score, COUNT(*) AS n FROM qa_pairs "
        "WHERE usefulness_score IS NOT NULL GROUP BY usefulness_score "
        "ORDER BY usefulness_score DESC"
    ).fetchall()
    if by_score:
        t = Table(title="Distribution by usefulness_score", show_lines=False)
        t.add_column("score", justify="right")
        t.add_column("count", justify="right")
        t.add_column("share", justify="right")
        for row in by_score:
            t.add_row(str(row[0]), str(row[1]), f"{pct(row[1], scored):.1f}%")
        console.print(t)

    banter_count = conn.execute(
        "SELECT COUNT(*) FROM qa_pairs WHERE is_banter = 1"
    ).fetchone()[0]
    if scored:
        console.print(
            f"[bold]Flagged as banter:[/bold] {banter_count} "
            f"({100 * banter_count / scored:.1f}% of scored)"
        )
    else:
        console.print("[bold]Flagged as banter:[/bold] 0")
# ---------------------------------------------------------------------------
# Modes: smoke, full
# ---------------------------------------------------------------------------
def run_smoke(
    conn: sqlite3.Connection,
    base_url: str,
    model: str,
    sample_size: int,
) -> None:
    """Classify the first ``sample_size`` rows and print the results.

    Nothing is written to the database. For each row a 240-character excerpt
    of the question and answer is shown, followed by the classification and
    how long the model took. A failure on one row is reported and the loop
    moves on to the next row.

    Transcript text, caller names, model-written reasons and exception
    messages are escaped before printing. Rich would otherwise interpret
    bracketed text such as ``[music]`` as markup and hide it, or raise
    MarkupError on a stray closing tag like ``[/x]``.

    Args:
        conn: Open connection to the archive database.
        base_url: Ollama base URL.
        model: Ollama model name.
        sample_size: Number of rows (lowest ids first) to classify.
    """
    # Function-scope import keeps the module's import block untouched.
    from rich.markup import escape

    rows = conn.execute(
        "SELECT id, question_text, answer_text, caller_name "
        "FROM qa_pairs ORDER BY id LIMIT ?",
        (sample_size,),
    ).fetchall()
    console.print(
        f"[bold]Smoke test:[/bold] classifying first {len(rows)} rows "
        f"(no DB writes)\n"
    )
    for row in rows:
        qid, q_text, a_text, caller = row
        console.rule(f"[bold]Row {qid}[/bold] caller={escape(str(caller or '-'))}")
        q_excerpt = (q_text or "").strip().replace("\n", " ")[:240]
        a_excerpt = (a_text or "").strip().replace("\n", " ")[:240]
        console.print(f"[cyan]Q:[/cyan] {escape(q_excerpt)}")
        console.print(f"[cyan]A:[/cyan] {escape(a_excerpt)}")
        try:
            t0 = time.monotonic()
            result = classify_one(base_url, model, q_text or "", a_text or "")
            dt = time.monotonic() - t0
        except Exception as e:
            # Smoke mode is a diagnostic: show the error and keep sampling.
            console.print(f"[red][ERROR][/red] {escape(str(e))}")
            continue
        if result is None:
            console.print("[yellow][WARNING][/yellow] classifier returned no result")
            continue
        console.print(
            f"[green]score={result['usefulness_score']}[/green] "
            f"class=[magenta]{result['topic_class']}[/magenta] "
            f"is_banter={result['is_banter']} "
            f"({dt:.1f}s)"
        )
        console.print(f" reason: {escape(result['reason'])}")
def run_full(
    conn: sqlite3.Connection,
    base_url: str,
    model: str,
    rebuild: bool,
    limit: int | None,
    batch_size: int,
) -> tuple[int, int, int]:
    """Classify every eligible row and persist the results in batches.

    Rows come from fetch_rows(); each is classified with classify_one() and
    queued. The queue is written with commit_batch() whenever it reaches
    ``batch_size`` rows. The remainder is flushed in a ``finally`` block, so
    rows already classified are still saved if the run is interrupted
    (Ctrl-C) or an unexpected error escapes the loop.

    A row whose classification fails (network error, unexpected exception, or
    unusable model output) is counted as failed and left untouched in the DB,
    so a later non-rebuild run picks it up again.

    Args:
        conn: Open read/write connection whose rows support access by column
            name (``row["id"]``).
        base_url: Ollama base URL.
        model: Ollama model name.
        rebuild: Re-classify rows that already have a score.
        limit: Optional cap on the number of rows to process.
        batch_size: Number of results to accumulate between commits.

    Returns:
        ``(n_processed, n_skipped, n_failed)``; n_skipped is always 0.
    """
    rows = fetch_rows(conn, rebuild=rebuild, limit=limit)
    if not rows:
        console.print("[green][OK][/green] no rows to process - everything is already scored")
        return (0, 0, 0)
    console.print(
        f"[bold]Classifying {len(rows)} rows[/bold] via {model} at {base_url} "
        f"(batch={batch_size}, rebuild={rebuild})"
    )
    pending: list[tuple] = []
    n_processed = 0
    n_failed = 0
    progress = Progress(
        SpinnerColumn(),
        TextColumn("[bold blue]classify[/bold blue]"),
        BarColumn(bar_width=40),
        MofNCompleteColumn(),
        TaskProgressColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
        TextColumn("{task.fields[stat]}"),
        console=console,
    )
    with progress:
        task = progress.add_task("rows", total=len(rows), stat="")
        try:
            for row in rows:
                qid = row["id"]
                try:
                    result = classify_one(
                        base_url,
                        model,
                        row["question_text"] or "",
                        row["answer_text"] or "",
                    )
                except (urllib.error.URLError, TimeoutError) as e:
                    # HTTPError is a subclass of URLError, so it lands here too.
                    console.print(f"[red][ERROR][/red] row {qid}: network failure: {e}")
                    result = None
                except Exception as e:
                    # One bad row must not abort a multi-hour run.
                    console.print(
                        f"[red][ERROR][/red] row {qid}: unexpected {type(e).__name__}: {e}"
                    )
                    result = None
                if result is None:
                    n_failed += 1
                else:
                    pending.append(
                        (
                            result["usefulness_score"],
                            result["topic_class"],
                            result["is_banter"],
                            qid,
                        )
                    )
                    n_processed += 1
                if len(pending) >= batch_size:
                    commit_batch(conn, pending)
                    pending.clear()
                progress.update(
                    task,
                    advance=1,
                    stat=f"ok={n_processed} fail={n_failed}",
                )
        finally:
            # Flush the remainder, including when the loop is interrupted.
            commit_batch(conn, pending)
            pending.clear()
    return (n_processed, 0, n_failed)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> int:
    """Parse the command line and run the classifier in smoke or full mode.

    Smoke mode (``--smoke``) opens the database read-only and prints a
    DEFAULT_SMOKE_LIMIT-row sample. Full mode checks that the qa_pairs
    columns it writes exist, classifies the eligible rows, and prints a
    summary even if the run fails part-way. The database connection is
    closed on every exit path, including when the summary itself raises.

    Returns:
        Process exit code: 0 on success; 1 when the database file is missing,
        no Ollama instance is reachable, or qa_pairs lacks the required
        columns; 2 when at least one row failed to classify.
    """
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("--db", default=str(DEFAULT_DB), help="Path to archive.db")
    ap.add_argument(
        "--model",
        default=DEFAULT_MODEL,
        help=f"Ollama model name (default: {DEFAULT_MODEL})",
    )
    ap.add_argument(
        "--ollama",
        default=None,
        help="Ollama base URL (default: auto-discover localhost / Tailscale)",
    )
    ap.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Process at most N rows (sampling mode)",
    )
    ap.add_argument(
        "--rebuild",
        action="store_true",
        help="Re-classify rows that already have a usefulness_score",
    )
    ap.add_argument(
        "--smoke",
        action="store_true",
        help=f"Print {DEFAULT_SMOKE_LIMIT}-row sample without writing to DB",
    )
    ap.add_argument(
        "--batch",
        type=int,
        default=DEFAULT_BATCH_SIZE,
        help=f"Commit every N rows (default {DEFAULT_BATCH_SIZE})",
    )
    args = ap.parse_args()

    db_path = Path(args.db)
    if not db_path.exists():
        console.print(f"[red][ERROR][/red] DB not found: {db_path}")
        return 1

    try:
        # An explicit --ollama URL skips discovery entirely.
        base_url = args.ollama or find_ollama()
    except RuntimeError as e:
        console.print(f"[red][ERROR][/red] {e}")
        return 1
    console.print(f"[INFO] Ollama: {base_url} model: {args.model}")

    # For smoke mode we open RO; full mode needs RW.
    if args.smoke:
        conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True)
        conn.row_factory = sqlite3.Row
        try:
            run_smoke(conn, base_url, args.model, DEFAULT_SMOKE_LIMIT)
        finally:
            conn.close()
        return 0

    conn = sqlite3.connect(db_path)
    try:
        conn.row_factory = sqlite3.Row
        # Confirm migration has run before we try to write.
        cols = {row[1] for row in conn.execute("PRAGMA table_info(qa_pairs)").fetchall()}
        missing = {"usefulness_score", "topic_class", "is_banter"} - cols
        if missing:
            console.print(
                f"[red][ERROR][/red] qa_pairs is missing columns: {sorted(missing)}. "
                "Run import_to_sqlite.py first (it auto-migrates)."
            )
            return 1

        t0 = time.monotonic()
        try:
            n_ok, _n_skip, n_fail = run_full(
                conn,
                base_url,
                args.model,
                rebuild=args.rebuild,
                limit=args.limit,
                batch_size=args.batch,
            )
        finally:
            # Show what is in the DB even when the run was cut short.
            elapsed = time.monotonic() - t0
            summarize(conn)
    finally:
        # Outer guard: close the connection even if summarize() raises.
        conn.close()

    console.print(
        f"\n[bold]Done in {elapsed:.1f}s[/bold] "
        f"processed={n_ok} failed={n_fail}"
    )
    if n_fail > 0:
        return 2
    return 0
if __name__ == "__main__":
sys.exit(main())