""" Classify each row in qa_pairs for usefulness to listeners. Walks the qa_pairs table and uses Ollama (qwen3:14b) to score each Q/A pair on: - usefulness_score : 1..5 (5 = solid tech help, 1 = pure banter) - topic_class : computer-help | banter | promo | off-topic | unclear - is_banter : 0 or 1 Idempotent by default: only processes rows where usefulness_score IS NULL. Pass --rebuild to re-classify every row, or --limit N to sample N rows. Usage: py classify_qa_quality.py [--db PATH] [--limit N] [--rebuild] [--smoke] [--ollama URL] [--model NAME] [--batch N] Environment: No env vars required. Ollama is auto-discovered from localhost or the GURU-BEAST-ROG Tailscale address. """ import argparse import json import sqlite3 import sys import time import urllib.error import urllib.request from pathlib import Path from rich.console import Console from rich.progress import ( BarColumn, MofNCompleteColumn, Progress, SpinnerColumn, TaskProgressColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn, ) from rich.table import Table BASE = Path(__file__).parent DEFAULT_DB = BASE / "archive-data" / "archive.db" DEFAULT_MODEL = "qwen3:14b" DEFAULT_BATCH_SIZE = 25 DEFAULT_SMOKE_LIMIT = 10 OLLAMA_HOSTS = ("http://localhost:11434", "http://100.92.127.64:11434") HTTP_TIMEOUT_SECONDS = 120 # qwen3:14b can take a while on cold load PING_TIMEOUT_SECONDS = 2 VALID_TOPIC_CLASSES = { "computer-help", "banter", "promo", "off-topic", "unclear", } SYSTEM_PROMPT = ( "You are a classifier for a tech radio show's Q&A archive. " "The host (Mike Swanson) takes calls about computer problems. " "Sometimes co-hosts or producers ask questions on-air too. " "Classify each Q/A pair on whether the answer is useful to listeners " "seeking computer help. Return strict JSON, no prose." ) USER_TEMPLATE = """Question: {question} Answer: {answer} Return JSON: {{ "usefulness_score": <1-5>, "topic_class": <"computer-help" | "banter" | "promo" | "off-topic" | "unclear">, "is_banter": , "reason": "" }} Scoring guide: 5 = clear computer/tech question with substantive technical answer (Windows, Mac, hardware, software, network, security, etc.) 4 = useful tech topic but lighter answer or partially off-topic 3 = mixed / borderline / general advice 2 = mostly banter, joke, or aside; minimal useful content 1 = pure banter, social, ad-read, or unrelated chat is_banter is true when the exchange is conversational filler, jokes, or social interaction without listener-actionable content - even if asked by a "caller". A producer asking a real tech question is NOT banter. """ # Truncate Q/A text we send to the model. The classifier cares about the # nature of the exchange, not exhaustive content - so capping at ~1.2 KB Q + # ~2 KB A keeps prompts small and fast without losing classification signal. MAX_QUESTION_CHARS = 1200 MAX_ANSWER_CHARS = 2000 console = Console() # --------------------------------------------------------------------------- # Ollama discovery + chat # --------------------------------------------------------------------------- def find_ollama() -> str: """Return the first reachable Ollama base URL. Mirrors the project convention: try localhost, then the Tailscale address of the workstation that's expected to run the model. Raises if neither answers within PING_TIMEOUT_SECONDS. """ for url in OLLAMA_HOSTS: try: with urllib.request.urlopen(url + "/api/tags", timeout=PING_TIMEOUT_SECONDS): return url except Exception: continue raise RuntimeError( "No Ollama instance reachable. Tried: " + ", ".join(OLLAMA_HOSTS) ) def ollama_chat(base_url: str, model: str, messages: list[dict]) -> str: """POST to /api/chat and return the assistant message content. Uses temperature=0 + format='json' so the model is biased toward returning valid JSON. format='json' is an Ollama-native flag that constrains output to the JSON grammar. """ body = json.dumps( { "model": model, "messages": messages, "stream": False, "format": "json", "options": {"temperature": 0}, } ).encode("utf-8") req = urllib.request.Request( base_url + "/api/chat", data=body, headers={"Content-Type": "application/json"}, method="POST", ) with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT_SECONDS) as resp: payload = json.loads(resp.read().decode("utf-8")) return payload["message"]["content"] # --------------------------------------------------------------------------- # Response parsing # --------------------------------------------------------------------------- def _extract_json_object(raw: str) -> dict: """Locate the first '{...}' span in the response and json.loads it. qwen3 with format='json' usually returns clean JSON, but tolerate stray whitespace, code fences, or thinking tags by clipping to the outermost braces before parsing. """ start = raw.find("{") end = raw.rfind("}") if start < 0 or end <= start: raise ValueError("no JSON object found in model response") return json.loads(raw[start : end + 1]) def _normalize(parsed: dict) -> dict: """Validate + coerce a parsed response into the canonical shape. Returns a dict with keys: usefulness_score (int 1..5), topic_class (str from VALID_TOPIC_CLASSES), is_banter (0|1), reason (str). Raises ValueError for any out-of-range or missing field. """ score = parsed.get("usefulness_score") if isinstance(score, str): score = score.strip() if score.isdigit(): score = int(score) if not isinstance(score, int) or not (1 <= score <= 5): raise ValueError(f"usefulness_score out of range: {parsed.get('usefulness_score')!r}") topic_class = parsed.get("topic_class") if not isinstance(topic_class, str): raise ValueError(f"topic_class not a string: {topic_class!r}") topic_class = topic_class.strip().lower() if topic_class not in VALID_TOPIC_CLASSES: raise ValueError(f"topic_class not recognized: {topic_class!r}") is_banter_raw = parsed.get("is_banter") if isinstance(is_banter_raw, bool): is_banter = 1 if is_banter_raw else 0 elif isinstance(is_banter_raw, (int, float)): is_banter = 1 if int(is_banter_raw) else 0 elif isinstance(is_banter_raw, str): v = is_banter_raw.strip().lower() if v in ("true", "yes", "1"): is_banter = 1 elif v in ("false", "no", "0"): is_banter = 0 else: raise ValueError(f"is_banter not boolean-coercible: {is_banter_raw!r}") else: raise ValueError(f"is_banter missing or unrecognized: {is_banter_raw!r}") reason = parsed.get("reason") or "" if not isinstance(reason, str): reason = str(reason) return { "usefulness_score": score, "topic_class": topic_class, "is_banter": is_banter, "reason": reason.strip(), } def classify_one( base_url: str, model: str, question: str, answer: str, ) -> dict | None: """Classify a single Q/A pair. Returns normalized dict or None on failure. Retries once on JSON-parse failure with a corrective follow-up message. Network/HTTP errors propagate to the caller (treated as transient). """ q = (question or "")[:MAX_QUESTION_CHARS] a = (answer or "")[:MAX_ANSWER_CHARS] user_msg = USER_TEMPLATE.format(question=q, answer=a) messages = [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_msg}, ] raw = ollama_chat(base_url, model, messages) try: return _normalize(_extract_json_object(raw)) except (ValueError, json.JSONDecodeError) as first_err: # One retry: tell the model its previous reply was invalid and ask # for a clean JSON object. messages.append({"role": "assistant", "content": raw}) messages.append( { "role": "user", "content": ( "Your last reply wasn't valid JSON in the required shape. " "Return ONLY a JSON object with keys: usefulness_score (1-5 integer), " 'topic_class (one of "computer-help","banter","promo","off-topic","unclear"), ' "is_banter (true/false), reason (string)." ), } ) try: raw2 = ollama_chat(base_url, model, messages) return _normalize(_extract_json_object(raw2)) except (ValueError, json.JSONDecodeError) as second_err: console.print( f"[yellow][WARNING][/yellow] JSON parse failed twice " f"(first: {first_err}; second: {second_err}); skipping row" ) return None # --------------------------------------------------------------------------- # DB helpers # --------------------------------------------------------------------------- def fetch_rows( conn: sqlite3.Connection, rebuild: bool, limit: int | None, ) -> list[sqlite3.Row]: """Pull rows that need classification, in deterministic id order.""" sql = ( "SELECT id, question_text, answer_text, caller_name " "FROM qa_pairs" ) if not rebuild: sql += " WHERE usefulness_score IS NULL" sql += " ORDER BY id" if limit is not None and limit > 0: sql += f" LIMIT {int(limit)}" return conn.execute(sql).fetchall() def commit_batch(conn: sqlite3.Connection, updates: list[tuple]) -> None: """Flush a batch of (score, topic_class, is_banter, id) tuples.""" if not updates: return with conn: conn.executemany( "UPDATE qa_pairs " "SET usefulness_score = ?, topic_class = ?, is_banter = ? " "WHERE id = ?", updates, ) def summarize(conn: sqlite3.Connection) -> None: """Print final distribution tables to stdout.""" total = conn.execute("SELECT COUNT(*) FROM qa_pairs").fetchone()[0] scored = conn.execute( "SELECT COUNT(*) FROM qa_pairs WHERE usefulness_score IS NOT NULL" ).fetchone()[0] console.print() console.print( f"[bold]Coverage:[/bold] {scored} / {total} rows scored " f"({100 * scored / total:.1f}%)" ) by_class = conn.execute( "SELECT topic_class, COUNT(*) AS n FROM qa_pairs " "WHERE topic_class IS NOT NULL GROUP BY topic_class ORDER BY n DESC" ).fetchall() if by_class: t = Table(title="Distribution by topic_class", show_lines=False) t.add_column("topic_class") t.add_column("count", justify="right") t.add_column("share", justify="right") for row in by_class: share = 100 * row[1] / scored if scored else 0 t.add_row(row[0], str(row[1]), f"{share:.1f}%") console.print(t) by_score = conn.execute( "SELECT usefulness_score, COUNT(*) AS n FROM qa_pairs " "WHERE usefulness_score IS NOT NULL GROUP BY usefulness_score " "ORDER BY usefulness_score DESC" ).fetchall() if by_score: t = Table(title="Distribution by usefulness_score", show_lines=False) t.add_column("score", justify="right") t.add_column("count", justify="right") t.add_column("share", justify="right") for row in by_score: share = 100 * row[1] / scored if scored else 0 t.add_row(str(row[0]), str(row[1]), f"{share:.1f}%") console.print(t) banter_count = conn.execute( "SELECT COUNT(*) FROM qa_pairs WHERE is_banter = 1" ).fetchone()[0] console.print( f"[bold]Flagged as banter:[/bold] {banter_count} " f"({100 * banter_count / scored:.1f}% of scored)" if scored else "[bold]Flagged as banter:[/bold] 0" ) # --------------------------------------------------------------------------- # Modes: smoke, full # --------------------------------------------------------------------------- def run_smoke( conn: sqlite3.Connection, base_url: str, model: str, sample_size: int, ) -> None: """Print classifications for the first N rows. No DB writes.""" rows = conn.execute( "SELECT id, question_text, answer_text, caller_name " "FROM qa_pairs ORDER BY id LIMIT ?", (sample_size,), ).fetchall() console.print( f"[bold]Smoke test:[/bold] classifying first {len(rows)} rows " f"(no DB writes)\n" ) for row in rows: qid, q_text, a_text, caller = row console.rule(f"[bold]Row {qid}[/bold] caller={caller or '-'}") q_excerpt = (q_text or "").strip().replace("\n", " ")[:240] a_excerpt = (a_text or "").strip().replace("\n", " ")[:240] console.print(f"[cyan]Q:[/cyan] {q_excerpt}") console.print(f"[cyan]A:[/cyan] {a_excerpt}") try: t0 = time.monotonic() result = classify_one(base_url, model, q_text or "", a_text or "") dt = time.monotonic() - t0 except Exception as e: console.print(f"[red][ERROR][/red] {e}") continue if result is None: console.print("[yellow][WARNING][/yellow] classifier returned no result") continue console.print( f"[green]score={result['usefulness_score']}[/green] " f"class=[magenta]{result['topic_class']}[/magenta] " f"is_banter={result['is_banter']} " f"({dt:.1f}s)" ) console.print(f" reason: {result['reason']}") def run_full( conn: sqlite3.Connection, base_url: str, model: str, rebuild: bool, limit: int | None, batch_size: int, ) -> tuple[int, int, int]: """Walk all eligible rows, classify, persist in batches. Returns (n_processed, n_skipped, n_failed). """ rows = fetch_rows(conn, rebuild=rebuild, limit=limit) if not rows: console.print("[green][OK][/green] no rows to process - everything is already scored") return (0, 0, 0) console.print( f"[bold]Classifying {len(rows)} rows[/bold] via {model} at {base_url} " f"(batch={batch_size}, rebuild={rebuild})" ) pending: list[tuple] = [] n_processed = 0 n_failed = 0 progress = Progress( SpinnerColumn(), TextColumn("[bold blue]classify[/bold blue]"), BarColumn(bar_width=40), MofNCompleteColumn(), TaskProgressColumn(), TimeElapsedColumn(), TimeRemainingColumn(), TextColumn("{task.fields[stat]}"), console=console, ) with progress: task = progress.add_task("rows", total=len(rows), stat="") for row in rows: qid = row["id"] try: result = classify_one( base_url, model, row["question_text"] or "", row["answer_text"] or "", ) except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError) as e: console.print(f"[red][ERROR][/red] row {qid}: network failure: {e}") result = None except Exception as e: console.print( f"[red][ERROR][/red] row {qid}: unexpected {type(e).__name__}: {e}" ) result = None if result is None: n_failed += 1 else: pending.append( ( result["usefulness_score"], result["topic_class"], result["is_banter"], qid, ) ) n_processed += 1 if len(pending) >= batch_size: commit_batch(conn, pending) pending.clear() progress.update( task, advance=1, stat=f"ok={n_processed} fail={n_failed}", ) # Flush remainder. commit_batch(conn, pending) pending.clear() return (n_processed, 0, n_failed) # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main() -> int: ap = argparse.ArgumentParser(description=__doc__) ap.add_argument("--db", default=str(DEFAULT_DB), help="Path to archive.db") ap.add_argument( "--model", default=DEFAULT_MODEL, help=f"Ollama model name (default: {DEFAULT_MODEL})", ) ap.add_argument( "--ollama", default=None, help="Ollama base URL (default: auto-discover localhost / Tailscale)", ) ap.add_argument( "--limit", type=int, default=None, help="Process at most N rows (sampling mode)", ) ap.add_argument( "--rebuild", action="store_true", help="Re-classify rows that already have a usefulness_score", ) ap.add_argument( "--smoke", action="store_true", help=f"Print {DEFAULT_SMOKE_LIMIT}-row sample without writing to DB", ) ap.add_argument( "--batch", type=int, default=DEFAULT_BATCH_SIZE, help=f"Commit every N rows (default {DEFAULT_BATCH_SIZE})", ) args = ap.parse_args() db_path = Path(args.db) if not db_path.exists(): console.print(f"[red][ERROR][/red] DB not found: {db_path}") return 1 try: base_url = args.ollama or find_ollama() except RuntimeError as e: console.print(f"[red][ERROR][/red] {e}") return 1 console.print(f"[INFO] Ollama: {base_url} model: {args.model}") # For smoke mode we open RO; full mode needs RW. if args.smoke: conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) conn.row_factory = sqlite3.Row try: run_smoke(conn, base_url, args.model, DEFAULT_SMOKE_LIMIT) finally: conn.close() return 0 # Confirm migration has run before we try to write. conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row cols = {row[1] for row in conn.execute("PRAGMA table_info(qa_pairs)").fetchall()} missing = {"usefulness_score", "topic_class", "is_banter"} - cols if missing: console.print( f"[red][ERROR][/red] qa_pairs is missing columns: {sorted(missing)}. " "Run import_to_sqlite.py first (it auto-migrates)." ) conn.close() return 1 t0 = time.monotonic() try: n_ok, _n_skip, n_fail = run_full( conn, base_url, args.model, rebuild=args.rebuild, limit=args.limit, batch_size=args.batch, ) finally: elapsed = time.monotonic() - t0 summarize(conn) conn.close() console.print( f"\n[bold]Done in {elapsed:.1f}s[/bold] " f"processed={n_ok} failed={n_fail}" ) if n_fail > 0: return 2 return 0 if __name__ == "__main__": sys.exit(main())