diff --git a/projects/radio-show/audio-processor/classify_qa_quality.py b/projects/radio-show/audio-processor/classify_qa_quality.py new file mode 100644 index 0000000..405dad2 --- /dev/null +++ b/projects/radio-show/audio-processor/classify_qa_quality.py @@ -0,0 +1,581 @@ +""" +Classify each row in qa_pairs for usefulness to listeners. + +Walks the qa_pairs table and uses Ollama (qwen3:14b) to score each Q/A pair on: + - usefulness_score : 1..5 (5 = solid tech help, 1 = pure banter) + - topic_class : computer-help | banter | promo | off-topic | unclear + - is_banter : 0 or 1 + +Idempotent by default: only processes rows where usefulness_score IS NULL. +Pass --rebuild to re-classify every row, or --limit N to sample N rows. + +Usage: + py classify_qa_quality.py [--db PATH] [--limit N] [--rebuild] [--smoke] + [--ollama URL] [--model NAME] [--batch N] + +Environment: + No env vars required. Ollama is auto-discovered from localhost or the + GURU-BEAST-ROG Tailscale address. +""" +import argparse +import json +import sqlite3 +import sys +import time +import urllib.error +import urllib.request +from pathlib import Path + +from rich.console import Console +from rich.progress import ( + BarColumn, + MofNCompleteColumn, + Progress, + SpinnerColumn, + TaskProgressColumn, + TextColumn, + TimeElapsedColumn, + TimeRemainingColumn, +) +from rich.table import Table + +BASE = Path(__file__).parent +DEFAULT_DB = BASE / "archive-data" / "archive.db" +DEFAULT_MODEL = "qwen3:14b" +DEFAULT_BATCH_SIZE = 25 +DEFAULT_SMOKE_LIMIT = 10 +OLLAMA_HOSTS = ("http://localhost:11434", "http://100.92.127.64:11434") +HTTP_TIMEOUT_SECONDS = 120 # qwen3:14b can take a while on cold load +PING_TIMEOUT_SECONDS = 2 + +VALID_TOPIC_CLASSES = { + "computer-help", + "banter", + "promo", + "off-topic", + "unclear", +} + +SYSTEM_PROMPT = ( + "You are a classifier for a tech radio show's Q&A archive. " + "The host (Mike Swanson) takes calls about computer problems. " + "Sometimes co-hosts or producers ask questions on-air too. " + "Classify each Q/A pair on whether the answer is useful to listeners " + "seeking computer help. Return strict JSON, no prose." +) + +USER_TEMPLATE = """Question: {question} +Answer: {answer} + +Return JSON: +{{ + "usefulness_score": <1-5>, + "topic_class": <"computer-help" | "banter" | "promo" | "off-topic" | "unclear">, + "is_banter": , + "reason": "" +}} + +Scoring guide: +5 = clear computer/tech question with substantive technical answer (Windows, Mac, hardware, software, network, security, etc.) +4 = useful tech topic but lighter answer or partially off-topic +3 = mixed / borderline / general advice +2 = mostly banter, joke, or aside; minimal useful content +1 = pure banter, social, ad-read, or unrelated chat + +is_banter is true when the exchange is conversational filler, jokes, or social interaction without listener-actionable content - even if asked by a "caller". A producer asking a real tech question is NOT banter. +""" + +# Truncate Q/A text we send to the model. The classifier cares about the +# nature of the exchange, not exhaustive content - so capping at ~1.2 KB Q + +# ~2 KB A keeps prompts small and fast without losing classification signal. +MAX_QUESTION_CHARS = 1200 +MAX_ANSWER_CHARS = 2000 + +console = Console() + + +# --------------------------------------------------------------------------- +# Ollama discovery + chat +# --------------------------------------------------------------------------- + +def find_ollama() -> str: + """Return the first reachable Ollama base URL. + + Mirrors the project convention: try localhost, then the Tailscale address + of the workstation that's expected to run the model. Raises if neither + answers within PING_TIMEOUT_SECONDS. + """ + for url in OLLAMA_HOSTS: + try: + with urllib.request.urlopen(url + "/api/tags", timeout=PING_TIMEOUT_SECONDS): + return url + except Exception: + continue + raise RuntimeError( + "No Ollama instance reachable. Tried: " + ", ".join(OLLAMA_HOSTS) + ) + + +def ollama_chat(base_url: str, model: str, messages: list[dict]) -> str: + """POST to /api/chat and return the assistant message content. + + Uses temperature=0 + format='json' so the model is biased toward returning + valid JSON. format='json' is an Ollama-native flag that constrains output + to the JSON grammar. + """ + body = json.dumps( + { + "model": model, + "messages": messages, + "stream": False, + "format": "json", + "options": {"temperature": 0}, + } + ).encode("utf-8") + req = urllib.request.Request( + base_url + "/api/chat", + data=body, + headers={"Content-Type": "application/json"}, + method="POST", + ) + with urllib.request.urlopen(req, timeout=HTTP_TIMEOUT_SECONDS) as resp: + payload = json.loads(resp.read().decode("utf-8")) + return payload["message"]["content"] + + +# --------------------------------------------------------------------------- +# Response parsing +# --------------------------------------------------------------------------- + +def _extract_json_object(raw: str) -> dict: + """Locate the first '{...}' span in the response and json.loads it. + + qwen3 with format='json' usually returns clean JSON, but tolerate stray + whitespace, code fences, or thinking tags by clipping to the outermost + braces before parsing. + """ + start = raw.find("{") + end = raw.rfind("}") + if start < 0 or end <= start: + raise ValueError("no JSON object found in model response") + return json.loads(raw[start : end + 1]) + + +def _normalize(parsed: dict) -> dict: + """Validate + coerce a parsed response into the canonical shape. + + Returns a dict with keys: usefulness_score (int 1..5), topic_class (str + from VALID_TOPIC_CLASSES), is_banter (0|1), reason (str). + Raises ValueError for any out-of-range or missing field. + """ + score = parsed.get("usefulness_score") + if isinstance(score, str): + score = score.strip() + if score.isdigit(): + score = int(score) + if not isinstance(score, int) or not (1 <= score <= 5): + raise ValueError(f"usefulness_score out of range: {parsed.get('usefulness_score')!r}") + + topic_class = parsed.get("topic_class") + if not isinstance(topic_class, str): + raise ValueError(f"topic_class not a string: {topic_class!r}") + topic_class = topic_class.strip().lower() + if topic_class not in VALID_TOPIC_CLASSES: + raise ValueError(f"topic_class not recognized: {topic_class!r}") + + is_banter_raw = parsed.get("is_banter") + if isinstance(is_banter_raw, bool): + is_banter = 1 if is_banter_raw else 0 + elif isinstance(is_banter_raw, (int, float)): + is_banter = 1 if int(is_banter_raw) else 0 + elif isinstance(is_banter_raw, str): + v = is_banter_raw.strip().lower() + if v in ("true", "yes", "1"): + is_banter = 1 + elif v in ("false", "no", "0"): + is_banter = 0 + else: + raise ValueError(f"is_banter not boolean-coercible: {is_banter_raw!r}") + else: + raise ValueError(f"is_banter missing or unrecognized: {is_banter_raw!r}") + + reason = parsed.get("reason") or "" + if not isinstance(reason, str): + reason = str(reason) + return { + "usefulness_score": score, + "topic_class": topic_class, + "is_banter": is_banter, + "reason": reason.strip(), + } + + +def classify_one( + base_url: str, + model: str, + question: str, + answer: str, +) -> dict | None: + """Classify a single Q/A pair. Returns normalized dict or None on failure. + + Retries once on JSON-parse failure with a corrective follow-up message. + Network/HTTP errors propagate to the caller (treated as transient). + """ + q = (question or "")[:MAX_QUESTION_CHARS] + a = (answer or "")[:MAX_ANSWER_CHARS] + user_msg = USER_TEMPLATE.format(question=q, answer=a) + messages = [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": user_msg}, + ] + raw = ollama_chat(base_url, model, messages) + try: + return _normalize(_extract_json_object(raw)) + except (ValueError, json.JSONDecodeError) as first_err: + # One retry: tell the model its previous reply was invalid and ask + # for a clean JSON object. + messages.append({"role": "assistant", "content": raw}) + messages.append( + { + "role": "user", + "content": ( + "Your last reply wasn't valid JSON in the required shape. " + "Return ONLY a JSON object with keys: usefulness_score (1-5 integer), " + 'topic_class (one of "computer-help","banter","promo","off-topic","unclear"), ' + "is_banter (true/false), reason (string)." + ), + } + ) + try: + raw2 = ollama_chat(base_url, model, messages) + return _normalize(_extract_json_object(raw2)) + except (ValueError, json.JSONDecodeError) as second_err: + console.print( + f"[yellow][WARNING][/yellow] JSON parse failed twice " + f"(first: {first_err}; second: {second_err}); skipping row" + ) + return None + + +# --------------------------------------------------------------------------- +# DB helpers +# --------------------------------------------------------------------------- + +def fetch_rows( + conn: sqlite3.Connection, + rebuild: bool, + limit: int | None, +) -> list[sqlite3.Row]: + """Pull rows that need classification, in deterministic id order.""" + sql = ( + "SELECT id, question_text, answer_text, caller_name " + "FROM qa_pairs" + ) + if not rebuild: + sql += " WHERE usefulness_score IS NULL" + sql += " ORDER BY id" + if limit is not None and limit > 0: + sql += f" LIMIT {int(limit)}" + return conn.execute(sql).fetchall() + + +def commit_batch(conn: sqlite3.Connection, updates: list[tuple]) -> None: + """Flush a batch of (score, topic_class, is_banter, id) tuples.""" + if not updates: + return + with conn: + conn.executemany( + "UPDATE qa_pairs " + "SET usefulness_score = ?, topic_class = ?, is_banter = ? " + "WHERE id = ?", + updates, + ) + + +def summarize(conn: sqlite3.Connection) -> None: + """Print final distribution tables to stdout.""" + total = conn.execute("SELECT COUNT(*) FROM qa_pairs").fetchone()[0] + scored = conn.execute( + "SELECT COUNT(*) FROM qa_pairs WHERE usefulness_score IS NOT NULL" + ).fetchone()[0] + console.print() + console.print( + f"[bold]Coverage:[/bold] {scored} / {total} rows scored " + f"({100 * scored / total:.1f}%)" + ) + + by_class = conn.execute( + "SELECT topic_class, COUNT(*) AS n FROM qa_pairs " + "WHERE topic_class IS NOT NULL GROUP BY topic_class ORDER BY n DESC" + ).fetchall() + if by_class: + t = Table(title="Distribution by topic_class", show_lines=False) + t.add_column("topic_class") + t.add_column("count", justify="right") + t.add_column("share", justify="right") + for row in by_class: + share = 100 * row[1] / scored if scored else 0 + t.add_row(row[0], str(row[1]), f"{share:.1f}%") + console.print(t) + + by_score = conn.execute( + "SELECT usefulness_score, COUNT(*) AS n FROM qa_pairs " + "WHERE usefulness_score IS NOT NULL GROUP BY usefulness_score " + "ORDER BY usefulness_score DESC" + ).fetchall() + if by_score: + t = Table(title="Distribution by usefulness_score", show_lines=False) + t.add_column("score", justify="right") + t.add_column("count", justify="right") + t.add_column("share", justify="right") + for row in by_score: + share = 100 * row[1] / scored if scored else 0 + t.add_row(str(row[0]), str(row[1]), f"{share:.1f}%") + console.print(t) + + banter_count = conn.execute( + "SELECT COUNT(*) FROM qa_pairs WHERE is_banter = 1" + ).fetchone()[0] + console.print( + f"[bold]Flagged as banter:[/bold] {banter_count} " + f"({100 * banter_count / scored:.1f}% of scored)" + if scored + else "[bold]Flagged as banter:[/bold] 0" + ) + + +# --------------------------------------------------------------------------- +# Modes: smoke, full +# --------------------------------------------------------------------------- + +def run_smoke( + conn: sqlite3.Connection, + base_url: str, + model: str, + sample_size: int, +) -> None: + """Print classifications for the first N rows. No DB writes.""" + rows = conn.execute( + "SELECT id, question_text, answer_text, caller_name " + "FROM qa_pairs ORDER BY id LIMIT ?", + (sample_size,), + ).fetchall() + console.print( + f"[bold]Smoke test:[/bold] classifying first {len(rows)} rows " + f"(no DB writes)\n" + ) + for row in rows: + qid, q_text, a_text, caller = row + console.rule(f"[bold]Row {qid}[/bold] caller={caller or '-'}") + q_excerpt = (q_text or "").strip().replace("\n", " ")[:240] + a_excerpt = (a_text or "").strip().replace("\n", " ")[:240] + console.print(f"[cyan]Q:[/cyan] {q_excerpt}") + console.print(f"[cyan]A:[/cyan] {a_excerpt}") + try: + t0 = time.monotonic() + result = classify_one(base_url, model, q_text or "", a_text or "") + dt = time.monotonic() - t0 + except Exception as e: + console.print(f"[red][ERROR][/red] {e}") + continue + if result is None: + console.print("[yellow][WARNING][/yellow] classifier returned no result") + continue + console.print( + f"[green]score={result['usefulness_score']}[/green] " + f"class=[magenta]{result['topic_class']}[/magenta] " + f"is_banter={result['is_banter']} " + f"({dt:.1f}s)" + ) + console.print(f" reason: {result['reason']}") + + +def run_full( + conn: sqlite3.Connection, + base_url: str, + model: str, + rebuild: bool, + limit: int | None, + batch_size: int, +) -> tuple[int, int, int]: + """Walk all eligible rows, classify, persist in batches. + + Returns (n_processed, n_skipped, n_failed). + """ + rows = fetch_rows(conn, rebuild=rebuild, limit=limit) + if not rows: + console.print("[green][OK][/green] no rows to process - everything is already scored") + return (0, 0, 0) + + console.print( + f"[bold]Classifying {len(rows)} rows[/bold] via {model} at {base_url} " + f"(batch={batch_size}, rebuild={rebuild})" + ) + + pending: list[tuple] = [] + n_processed = 0 + n_failed = 0 + + progress = Progress( + SpinnerColumn(), + TextColumn("[bold blue]classify[/bold blue]"), + BarColumn(bar_width=40), + MofNCompleteColumn(), + TaskProgressColumn(), + TimeElapsedColumn(), + TimeRemainingColumn(), + TextColumn("{task.fields[stat]}"), + console=console, + ) + with progress: + task = progress.add_task("rows", total=len(rows), stat="") + for row in rows: + qid = row["id"] + try: + result = classify_one( + base_url, + model, + row["question_text"] or "", + row["answer_text"] or "", + ) + except (urllib.error.URLError, urllib.error.HTTPError, TimeoutError) as e: + console.print(f"[red][ERROR][/red] row {qid}: network failure: {e}") + result = None + except Exception as e: + console.print( + f"[red][ERROR][/red] row {qid}: unexpected {type(e).__name__}: {e}" + ) + result = None + + if result is None: + n_failed += 1 + else: + pending.append( + ( + result["usefulness_score"], + result["topic_class"], + result["is_banter"], + qid, + ) + ) + n_processed += 1 + + if len(pending) >= batch_size: + commit_batch(conn, pending) + pending.clear() + + progress.update( + task, + advance=1, + stat=f"ok={n_processed} fail={n_failed}", + ) + + # Flush remainder. + commit_batch(conn, pending) + pending.clear() + + return (n_processed, 0, n_failed) + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--db", default=str(DEFAULT_DB), help="Path to archive.db") + ap.add_argument( + "--model", + default=DEFAULT_MODEL, + help=f"Ollama model name (default: {DEFAULT_MODEL})", + ) + ap.add_argument( + "--ollama", + default=None, + help="Ollama base URL (default: auto-discover localhost / Tailscale)", + ) + ap.add_argument( + "--limit", + type=int, + default=None, + help="Process at most N rows (sampling mode)", + ) + ap.add_argument( + "--rebuild", + action="store_true", + help="Re-classify rows that already have a usefulness_score", + ) + ap.add_argument( + "--smoke", + action="store_true", + help=f"Print {DEFAULT_SMOKE_LIMIT}-row sample without writing to DB", + ) + ap.add_argument( + "--batch", + type=int, + default=DEFAULT_BATCH_SIZE, + help=f"Commit every N rows (default {DEFAULT_BATCH_SIZE})", + ) + args = ap.parse_args() + + db_path = Path(args.db) + if not db_path.exists(): + console.print(f"[red][ERROR][/red] DB not found: {db_path}") + return 1 + + try: + base_url = args.ollama or find_ollama() + except RuntimeError as e: + console.print(f"[red][ERROR][/red] {e}") + return 1 + console.print(f"[INFO] Ollama: {base_url} model: {args.model}") + + # For smoke mode we open RO; full mode needs RW. + if args.smoke: + conn = sqlite3.connect(f"file:{db_path}?mode=ro", uri=True) + conn.row_factory = sqlite3.Row + try: + run_smoke(conn, base_url, args.model, DEFAULT_SMOKE_LIMIT) + finally: + conn.close() + return 0 + + # Confirm migration has run before we try to write. + conn = sqlite3.connect(db_path) + conn.row_factory = sqlite3.Row + cols = {row[1] for row in conn.execute("PRAGMA table_info(qa_pairs)").fetchall()} + missing = {"usefulness_score", "topic_class", "is_banter"} - cols + if missing: + console.print( + f"[red][ERROR][/red] qa_pairs is missing columns: {sorted(missing)}. " + "Run import_to_sqlite.py first (it auto-migrates)." + ) + conn.close() + return 1 + + t0 = time.monotonic() + try: + n_ok, _n_skip, n_fail = run_full( + conn, + base_url, + args.model, + rebuild=args.rebuild, + limit=args.limit, + batch_size=args.batch, + ) + finally: + elapsed = time.monotonic() - t0 + summarize(conn) + conn.close() + + console.print( + f"\n[bold]Done in {elapsed:.1f}s[/bold] " + f"processed={n_ok} failed={n_fail}" + ) + if n_fail > 0: + return 2 + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/projects/radio-show/audio-processor/server/main.py b/projects/radio-show/audio-processor/server/main.py index 7cd71d8..502ae67 100644 --- a/projects/radio-show/audio-processor/server/main.py +++ b/projects/radio-show/audio-processor/server/main.py @@ -82,7 +82,8 @@ def episode_detail(episode_id: int): qa = db.execute( "SELECT id, question_start_sec, question_end_sec, " "answer_start_sec, answer_end_sec, " - "question_text, answer_text, caller_name, caller_role, topic, topic_tags " + "question_text, answer_text, caller_name, caller_role, topic, topic_tags, " + "usefulness_score, topic_class, is_banter " "FROM qa_pairs WHERE episode_id = ? ORDER BY question_start_sec", (episode_id,), ).fetchall() @@ -123,6 +124,10 @@ def search( q: str = Query(..., min_length=2), kind: str = Query("both", pattern="^(both|segments|qa)$"), limit: int = Query(50, ge=1, le=500), + min_score: int = Query(0, ge=0, le=5, + description="Minimum usefulness_score for Q&A hits (0=no filter)"), + exclude_banter: bool = Query(False, + description="Drop Q&A rows where is_banter=1"), ): db: sqlite3.Connection = app.state.db fts_q = fts_escape(q) @@ -151,24 +156,35 @@ def search( ] if kind in ("both", "qa"): - qa_results = [ - dict(r) for r in db.execute( - """ - SELECT e.id AS episode_id, e.year, e.title, e.air_date, - p.id AS qa_id, p.caller_name, - p.question_start_sec, p.answer_start_sec, - snippet(qa_fts, 0, '', '', '...', 16) AS q_snippet, - snippet(qa_fts, 1, '', '', '...', 16) AS a_snippet, - bm25(qa_fts) AS rank - FROM qa_fts - JOIN qa_pairs p ON p.id = qa_fts.rowid - JOIN episodes e ON e.id = p.episode_id - WHERE qa_fts MATCH ? - ORDER BY rank LIMIT ? - """, - (fts_q, limit), - ).fetchall() - ] + # NULL is treated as "unscored, include" so unprocessed rows still + # appear and old saved URLs keep working as the classifier rolls out. + # Filters are applied as additional WHERE clauses on top of the FTS + # MATCH; SQLite's planner can use idx_qa_usefulness once it's helpful. + qa_clauses = ["qa_fts MATCH :q"] + qa_params: dict[str, object] = {"q": fts_q, "limit": limit} + if min_score > 0: + qa_clauses.append( + "(p.usefulness_score IS NULL OR p.usefulness_score >= :min_score)" + ) + qa_params["min_score"] = min_score + if exclude_banter: + qa_clauses.append("(p.is_banter IS NULL OR p.is_banter = 0)") + + qa_sql = f""" + SELECT e.id AS episode_id, e.year, e.title, e.air_date, + p.id AS qa_id, p.caller_name, + p.question_start_sec, p.answer_start_sec, + p.usefulness_score, p.topic_class, p.is_banter, + snippet(qa_fts, 0, '', '', '...', 16) AS q_snippet, + snippet(qa_fts, 1, '', '', '...', 16) AS a_snippet, + bm25(qa_fts) AS rank + FROM qa_fts + JOIN qa_pairs p ON p.id = qa_fts.rowid + JOIN episodes e ON e.id = p.episode_id + WHERE {' AND '.join(qa_clauses)} + ORDER BY rank LIMIT :limit + """ + qa_results = [dict(r) for r in db.execute(qa_sql, qa_params).fetchall()] return {"q": q, "segments": seg_results, "qa": qa_results}