Adds an Ollama-based content-quality classifier and exposes the results via the search API. 1,407 existing Q/A pairs were scored in 3.5h via qwen3:14b (1,405 succeeded, 2 failed). Distribution: 37% scored 4-5 (useful), 41% scored 1-2 (banter/promo/off-topic); 43% were flagged as banter overall. Default-on filtering at search time will hide roughly half of the noise without losing any real listener questions. Files: - new classify_qa_quality.py: walks qa_pairs, calls Ollama qwen3:14b per row, and writes usefulness_score/topic_class/is_banter back to the DB. Idempotent (--rebuild to reprocess), --smoke for a sample check, --limit for partial runs. A detached run handles 1,407 rows in ~3.5h on a 4090. - server/main.py: /api/search accepts min_score (0-5) and exclude_banter query params. NULL scores are treated as "include", so unprocessed rows still appear. The episode detail endpoint includes the new fields in its Q/A results. The schema migration in import_to_sqlite.py was made by the same agent run (visible on the live archive.db: the usefulness_score / topic_class / is_banter columns now exist on qa_pairs). Local archive.db updated; the Jupiter container has NOT been redeployed yet — that is a separate manual step. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
582 lines
19 KiB
Python
582 lines
19 KiB
Python
"""
Classify each row in qa_pairs for usefulness to listeners.

Walks the qa_pairs table and uses Ollama (qwen3:14b) to score each Q/A pair on:
  - usefulness_score : 1..5 (5 = solid tech help, 1 = pure banter)
  - topic_class      : computer-help | banter | promo | off-topic | unclear
  - is_banter        : 0 or 1

Idempotent by default: only processes rows where usefulness_score IS NULL.
Pass --rebuild to re-classify every row, or --limit N to sample N rows.
Rows that fail to classify are not written, so they keep their previous
values (NULL on a first pass) and a plain re-run retries them.
Results are committed every --batch N rows. --smoke prints classifications
for the first 10 rows and writes nothing to the DB.

Usage:
  py classify_qa_quality.py [--db PATH] [--limit N] [--rebuild] [--smoke]
                            [--ollama URL] [--model NAME] [--batch N]

Exit status:
  0 on success, 1 on a setup error (e.g. DB not found, no reachable Ollama,
  qa_pairs missing the classifier columns), 2 if any row failed to classify.

Environment:
  No env vars required. Ollama is auto-discovered from localhost or the
  GURU-BEAST-ROG Tailscale address.
"""
|
|
import argparse
|
|
import json
|
|
import sqlite3
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
from rich.console import Console
|
|
from rich.progress import (
|
|
BarColumn,
|
|
MofNCompleteColumn,
|
|
Progress,
|
|
SpinnerColumn,
|
|
TaskProgressColumn,
|
|
TextColumn,
|
|
TimeElapsedColumn,
|
|
TimeRemainingColumn,
|
|
)
|
|
from rich.table import Table
|
|
|
|
BASE = Path(__file__).parent
DEFAULT_DB = BASE / "archive-data" / "archive.db"
DEFAULT_MODEL = "qwen3:14b"
DEFAULT_BATCH_SIZE = 25  # rows buffered between DB commits (--batch default)
DEFAULT_SMOKE_LIMIT = 10  # rows printed by --smoke
# Candidate Ollama endpoints, probed in this order: the local daemon first,
# then the model workstation's Tailscale address.
OLLAMA_HOSTS = ("http://localhost:11434", "http://100.92.127.64:11434")
HTTP_TIMEOUT_SECONDS = 120  # qwen3:14b can take a while on cold load
PING_TIMEOUT_SECONDS = 2  # per-host budget for the /api/tags reachability probe

# Closed label set for topic_class. A reply using any other label fails
# validation, which triggers the corrective retry. A frozenset, so this
# module-level whitelist cannot be mutated by accident at runtime.
VALID_TOPIC_CLASSES = frozenset(
    {
        "computer-help",
        "banter",
        "promo",
        "off-topic",
        "unclear",
    }
)

SYSTEM_PROMPT = (
    "You are a classifier for a tech radio show's Q&A archive. "
    "The host (Mike Swanson) takes calls about computer problems. "
    "Sometimes co-hosts or producers ask questions on-air too. "
    "Classify each Q/A pair on whether the answer is useful to listeners "
    "seeking computer help. Return strict JSON, no prose."
)

# Per-row user prompt, filled via str.format(question=..., answer=...).
# Literal braces of the JSON skeleton are therefore doubled ({{ / }}).
USER_TEMPLATE = """Question: {question}
Answer: {answer}

Return JSON:
{{
"usefulness_score": <1-5>,
"topic_class": <"computer-help" | "banter" | "promo" | "off-topic" | "unclear">,
"is_banter": <true|false>,
"reason": "<one short sentence>"
}}

Scoring guide:
5 = clear computer/tech question with substantive technical answer (Windows, Mac, hardware, software, network, security, etc.)
4 = useful tech topic but lighter answer or partially off-topic
3 = mixed / borderline / general advice
2 = mostly banter, joke, or aside; minimal useful content
1 = pure banter, social, ad-read, or unrelated chat

is_banter is true when the exchange is conversational filler, jokes, or social interaction without listener-actionable content - even if asked by a "caller". A producer asking a real tech question is NOT banter.
"""

# Truncate the Q/A text sent to the model. The classifier cares about the
# nature of the exchange, not exhaustive content, so capping the question at
# 1,200 characters and the answer at 2,000 characters (character counts, not
# bytes) keeps prompts small and fast without losing classification signal.
MAX_QUESTION_CHARS = 1200
MAX_ANSWER_CHARS = 2000
|
|
|
|
# Module-wide rich console; status lines, warnings, the progress bar and the
# summary tables in this script are all written through it.
console = Console()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Ollama discovery + chat
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def find_ollama() -> str:
    """Probe OLLAMA_HOSTS in order and return the first base URL that answers.

    A host counts as reachable when GET <host>/api/tags completes within
    PING_TIMEOUT_SECONDS. Any exception while probing (refused connection,
    timeout, HTTP error status, ...) simply moves on to the next candidate.

    Raises:
        RuntimeError: if none of the candidates responded.
    """
    for candidate in OLLAMA_HOSTS:
        probe_url = candidate + "/api/tags"
        try:
            with urllib.request.urlopen(probe_url, timeout=PING_TIMEOUT_SECONDS):
                pass
        except Exception:  # best-effort probe: any failure means "try the next host"
            continue
        return candidate

    tried = ", ".join(OLLAMA_HOSTS)
    raise RuntimeError("No Ollama instance reachable. Tried: " + tried)
|
|
|
|
|
|
def ollama_chat(base_url: str, model: str, messages: list[dict]) -> str:
    """Send one non-streaming chat request to Ollama and return the reply text.

    The request pins sampling to temperature 0 and sets format='json', Ollama's
    native switch for constraining output to the JSON grammar, so replies are
    biased toward parseable JSON. Transport failures raised by
    urllib.request.urlopen (URLError / HTTPError, or a timeout after
    HTTP_TIMEOUT_SECONDS) propagate unchanged. A response body without
    message.content raises KeyError.
    """
    request_body = {
        "model": model,
        "messages": messages,
        "stream": False,
        "format": "json",
        "options": {"temperature": 0},
    }
    chat_request = urllib.request.Request(
        base_url + "/api/chat",
        data=json.dumps(request_body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(chat_request, timeout=HTTP_TIMEOUT_SECONDS) as response:
        raw_bytes = response.read()

    reply = json.loads(raw_bytes.decode("utf-8"))
    return reply["message"]["content"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Response parsing
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _extract_json_object(raw: str) -> dict:
    """Return the first JSON object embedded in a model reply.

    qwen3 with format='json' usually returns clean JSON, but a reply may carry
    a leading <think>...</think> reasoning block, code fences, or stray text.
    Simply clipping from the first '{' to the last '}' fails whenever that
    surrounding text contains braces of its own. Instead:

      1. A leading <think>...</think> block is dropped.
      2. Each '{' is tried in turn with JSONDecoder.raw_decode, which stops at
         the end of the first complete value. Trailing text, even text
         containing '}', is therefore ignored.

    Raises:
        ValueError: if no position in the reply starts a decodable JSON object.
    """
    think_open, think_close = "<think>", "</think>"

    text = raw.lstrip()
    if text.startswith(think_open):
        close_at = text.find(think_close)
        if close_at >= 0:
            text = text[close_at + len(think_close):]

    decoder = json.JSONDecoder()
    last_error: json.JSONDecodeError | None = None
    pos = text.find("{")
    while pos >= 0:
        try:
            obj, _end = decoder.raw_decode(text[pos:])
        except json.JSONDecodeError as exc:
            last_error = exc
        else:
            # A value that starts with '{' always decodes to a dict.
            return obj
        pos = text.find("{", pos + 1)

    raise ValueError("no JSON object found in model response") from last_error
|
|
|
|
|
|
def _coerce_score(value: object) -> int:
    """Coerce a raw usefulness_score to an int in 1..5, or raise ValueError."""
    candidate = value
    if isinstance(candidate, str):
        text = candidate.strip()
        try:
            candidate = float(text)
        except ValueError:
            candidate = text
    # Models sometimes emit 4.0 / "4.0"; accept integral floats, reject 4.5,
    # nan and inf (is_integer() is False for the last two).
    if isinstance(candidate, float) and candidate.is_integer():
        candidate = int(candidate)
    # bool is a subclass of int, so reject it explicitly; otherwise a JSON
    # `true` would silently be stored as score 1.
    if (
        isinstance(candidate, bool)
        or not isinstance(candidate, int)
        or not 1 <= candidate <= 5
    ):
        raise ValueError(f"usefulness_score out of range: {value!r}")
    return candidate


def _coerce_flag(value: object) -> int:
    """Coerce a raw is_banter value to 0/1, or raise ValueError."""
    if isinstance(value, bool):
        return 1 if value else 0
    if isinstance(value, (int, float)):
        try:
            return 1 if int(value) else 0
        except (OverflowError, ValueError) as exc:
            # int() of inf/nan (which Python's json.loads accepts by default)
            # must surface as ValueError so the caller's retry path handles it.
            raise ValueError(f"is_banter not boolean-coercible: {value!r}") from exc
    if isinstance(value, str):
        lowered = value.strip().lower()
        if lowered in ("true", "yes", "1"):
            return 1
        if lowered in ("false", "no", "0"):
            return 0
        raise ValueError(f"is_banter not boolean-coercible: {value!r}")
    raise ValueError(f"is_banter missing or unrecognized: {value!r}")


def _normalize(parsed: dict) -> dict:
    """Validate + coerce a parsed model response into the canonical shape.

    Fields are checked in this order: usefulness_score, topic_class, is_banter.

    Returns:
        dict with keys usefulness_score (int 1..5), topic_class (str from
        VALID_TOPIC_CLASSES, stripped and lower-cased), is_banter (0|1) and
        reason (str, stripped; "" when absent).

    Raises:
        ValueError: for a non-object payload or any missing / out-of-range field.
    """
    if not isinstance(parsed, dict):
        raise ValueError(f"expected a JSON object, got {type(parsed).__name__}")

    score = _coerce_score(parsed.get("usefulness_score"))

    topic_class = parsed.get("topic_class")
    if not isinstance(topic_class, str):
        raise ValueError(f"topic_class not a string: {topic_class!r}")
    topic_class = topic_class.strip().lower()
    if topic_class not in VALID_TOPIC_CLASSES:
        raise ValueError(f"topic_class not recognized: {topic_class!r}")

    is_banter = _coerce_flag(parsed.get("is_banter"))

    reason = parsed.get("reason") or ""
    if not isinstance(reason, str):
        reason = str(reason)
    return {
        "usefulness_score": score,
        "topic_class": topic_class,
        "is_banter": is_banter,
        "reason": reason.strip(),
    }
|
|
|
|
|
|
def classify_one(
    base_url: str,
    model: str,
    question: str,
    answer: str,
) -> dict | None:
    """Ask the model to grade one Q/A pair; return the normalized verdict or None.

    The question and answer are clipped to MAX_QUESTION_CHARS and
    MAX_ANSWER_CHARS before being substituted into USER_TEMPLATE. If the first
    reply cannot be parsed or fails validation, that reply is echoed back with
    a corrective instruction and the model gets exactly one more attempt. A
    second failure is logged and reported as None. Network/HTTP errors
    propagate to the caller, which treats them as transient.
    """
    prompt = USER_TEMPLATE.format(
        question=(question or "")[:MAX_QUESTION_CHARS],
        answer=(answer or "")[:MAX_ANSWER_CHARS],
    )
    conversation = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": prompt},
    ]

    first_reply = ollama_chat(base_url, model, conversation)
    try:
        return _normalize(_extract_json_object(first_reply))
    except ValueError as first_err:  # json.JSONDecodeError subclasses ValueError
        # Single corrective retry: show the model its own bad reply and restate
        # the required shape.
        conversation += [
            {"role": "assistant", "content": first_reply},
            {
                "role": "user",
                "content": (
                    "Your last reply wasn't valid JSON in the required shape. "
                    "Return ONLY a JSON object with keys: usefulness_score (1-5 integer), "
                    'topic_class (one of "computer-help","banter","promo","off-topic","unclear"), '
                    "is_banter (true/false), reason (string)."
                ),
            },
        ]
        try:
            second_reply = ollama_chat(base_url, model, conversation)
            return _normalize(_extract_json_object(second_reply))
        except ValueError as second_err:
            console.print(
                f"[yellow][WARNING][/yellow] JSON parse failed twice "
                f"(first: {first_err}; second: {second_err}); skipping row"
            )
            return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DB helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def fetch_rows(
|
|
conn: sqlite3.Connection,
|
|
rebuild: bool,
|
|
limit: int | None,
|
|
) -> list[sqlite3.Row]:
|
|
"""Pull rows that need classification, in deterministic id order."""
|
|
sql = (
|
|
"SELECT id, question_text, answer_text, caller_name "
|
|
"FROM qa_pairs"
|
|
)
|
|
if not rebuild:
|
|
sql += " WHERE usefulness_score IS NULL"
|
|
sql += " ORDER BY id"
|
|
if limit is not None and limit > 0:
|
|
sql += f" LIMIT {int(limit)}"
|
|
return conn.execute(sql).fetchall()
|
|
|
|
|
|
def commit_batch(conn: sqlite3.Connection, updates: list[tuple]) -> None:
    """Write classifier results to qa_pairs in a single transaction.

    Each tuple in `updates` is (usefulness_score, topic_class, is_banter, id).
    An empty list is a no-op. Otherwise the connection's context manager
    commits on success and rolls back if executemany raises.
    """
    if updates:
        statement = (
            "UPDATE qa_pairs "
            "SET usefulness_score = ?, topic_class = ?, is_banter = ? "
            "WHERE id = ?"
        )
        with conn:
            conn.executemany(statement, updates)
|
|
|
|
|
|
def _share(count: int, denominator: int) -> float:
    """Return `count` as a percentage of `denominator`, or 0.0 if it is 0."""
    return 100 * count / denominator if denominator else 0.0


def _distribution_table(
    title: str,
    key_header: str,
    rows: list[sqlite3.Row],
    scored: int,
    key_justify: str = "left",
) -> "Table":
    """Build a (key, count, share-of-scored) rich Table from GROUP BY rows."""
    table = Table(title=title, show_lines=False)
    table.add_column(key_header, justify=key_justify)
    table.add_column("count", justify="right")
    table.add_column("share", justify="right")
    for key, count in rows:
        table.add_row(str(key), str(count), f"{_share(count, scored):.1f}%")
    return table


def summarize(conn: sqlite3.Connection) -> None:
    """Print coverage and distribution statistics for qa_pairs.

    Shows:
      - how many rows have a usefulness_score;
      - breakdowns by topic_class and by score, with shares relative to the
        scored rows;
      - how many rows are flagged is_banter = 1.

    Safe on an empty table: percentages fall back to 0.0 instead of raising
    ZeroDivisionError. main() calls this from a ``finally`` block, so a crash
    here would also mask the run's real outcome.
    """
    total = conn.execute("SELECT COUNT(*) FROM qa_pairs").fetchone()[0]
    scored = conn.execute(
        "SELECT COUNT(*) FROM qa_pairs WHERE usefulness_score IS NOT NULL"
    ).fetchone()[0]
    console.print()
    console.print(
        f"[bold]Coverage:[/bold] {scored} / {total} rows scored "
        f"({_share(scored, total):.1f}%)"
    )

    by_class = conn.execute(
        "SELECT topic_class, COUNT(*) AS n FROM qa_pairs "
        "WHERE topic_class IS NOT NULL GROUP BY topic_class ORDER BY n DESC"
    ).fetchall()
    if by_class:
        console.print(
            _distribution_table(
                "Distribution by topic_class", "topic_class", by_class, scored
            )
        )

    by_score = conn.execute(
        "SELECT usefulness_score, COUNT(*) AS n FROM qa_pairs "
        "WHERE usefulness_score IS NOT NULL GROUP BY usefulness_score "
        "ORDER BY usefulness_score DESC"
    ).fetchall()
    if by_score:
        console.print(
            _distribution_table(
                "Distribution by usefulness_score",
                "score",
                by_score,
                scored,
                key_justify="right",
            )
        )

    banter_count = conn.execute(
        "SELECT COUNT(*) FROM qa_pairs WHERE is_banter = 1"
    ).fetchone()[0]
    if scored:
        console.print(
            f"[bold]Flagged as banter:[/bold] {banter_count} "
            f"({_share(banter_count, scored):.1f}% of scored)"
        )
    else:
        console.print("[bold]Flagged as banter:[/bold] 0")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Modes: smoke, full
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def run_smoke(
    conn: sqlite3.Connection,
    base_url: str,
    model: str,
    sample_size: int,
) -> None:
    """Classify the first `sample_size` rows (by id) and print each verdict.

    Read-only preview: nothing is written to the DB. For each row it prints a
    240-character excerpt of the question and answer, then the model's score,
    topic_class, is_banter, elapsed time and reason. A per-row failure is
    printed and the loop moves on to the next row.

    Transcript excerpts, caller names, model reasons and error text are passed
    through rich.markup.escape() before being interpolated into markup.
    Unescaped, a bracketed lowercase word such as "[music]" is treated as a
    style tag instead of being printed, and a stray "[/...]" makes rich raise
    MarkupError and abort the run.
    """
    from rich.markup import escape  # local import: only this preview needs it

    rows = conn.execute(
        "SELECT id, question_text, answer_text, caller_name "
        "FROM qa_pairs ORDER BY id LIMIT ?",
        (sample_size,),
    ).fetchall()
    console.print(
        f"[bold]Smoke test:[/bold] classifying first {len(rows)} rows "
        f"(no DB writes)\n"
    )

    for qid, q_text, a_text, caller in rows:
        console.rule(f"[bold]Row {qid}[/bold] caller={escape(str(caller or '-'))}")
        q_excerpt = (q_text or "").strip().replace("\n", " ")[:240]
        a_excerpt = (a_text or "").strip().replace("\n", " ")[:240]
        console.print(f"[cyan]Q:[/cyan] {escape(q_excerpt)}")
        console.print(f"[cyan]A:[/cyan] {escape(a_excerpt)}")
        try:
            t0 = time.monotonic()
            result = classify_one(base_url, model, q_text or "", a_text or "")
            dt = time.monotonic() - t0
        except Exception as e:  # smoke mode: report and keep sampling
            console.print(f"[red][ERROR][/red] {escape(str(e))}")
            continue
        if result is None:
            console.print("[yellow][WARNING][/yellow] classifier returned no result")
            continue
        console.print(
            f"[green]score={result['usefulness_score']}[/green] "
            f"class=[magenta]{result['topic_class']}[/magenta] "
            f"is_banter={result['is_banter']} "
            f"({dt:.1f}s)"
        )
        console.print(f" reason: {escape(result['reason'])}")
|
|
|
|
|
|
def run_full(
    conn: sqlite3.Connection,
    base_url: str,
    model: str,
    rebuild: bool,
    limit: int | None,
    batch_size: int,
) -> tuple[int, int, int]:
    """Classify every eligible qa_pairs row and persist the results in batches.

    Rows come from fetch_rows(). Each is sent through classify_one(), and
    successful verdicts are buffered and written via commit_batch() once
    `batch_size` of them are pending. Rows that fail (network error,
    unparseable reply, unexpected exception) are counted and not written, so
    they keep their previous values (NULL on a first pass, which the next
    non-rebuild run retries).

    The final flush happens in a ``finally`` clause. If the run is interrupted
    (Ctrl-C) or aborts on an unexpected error, verdicts already obtained are
    still written rather than discarded with the in-memory buffer.

    Returns:
        (n_processed, n_skipped, n_failed); n_skipped is always 0.
    """
    rows = fetch_rows(conn, rebuild=rebuild, limit=limit)
    if not rows:
        console.print("[green][OK][/green] no rows to process - everything is already scored")
        return (0, 0, 0)

    console.print(
        f"[bold]Classifying {len(rows)} rows[/bold] via {model} at {base_url} "
        f"(batch={batch_size}, rebuild={rebuild})"
    )

    pending: list[tuple] = []
    n_processed = 0
    n_failed = 0

    progress = Progress(
        SpinnerColumn(),
        TextColumn("[bold blue]classify[/bold blue]"),
        BarColumn(bar_width=40),
        MofNCompleteColumn(),
        TaskProgressColumn(),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
        TextColumn("{task.fields[stat]}"),
        console=console,
    )
    try:
        with progress:
            task = progress.add_task("rows", total=len(rows), stat="")
            for row in rows:
                qid = row["id"]
                try:
                    result = classify_one(
                        base_url,
                        model,
                        row["question_text"] or "",
                        row["answer_text"] or "",
                    )
                except (urllib.error.URLError, TimeoutError, ConnectionError) as e:
                    # URLError also covers HTTPError (its subclass); ConnectionError
                    # covers sockets dropped mid-response.
                    console.print(f"[red][ERROR][/red] row {qid}: network failure: {e}")
                    result = None
                except Exception as e:
                    console.print(
                        f"[red][ERROR][/red] row {qid}: unexpected {type(e).__name__}: {e}"
                    )
                    result = None

                if result is None:
                    n_failed += 1
                else:
                    pending.append(
                        (
                            result["usefulness_score"],
                            result["topic_class"],
                            result["is_banter"],
                            qid,
                        )
                    )
                    n_processed += 1

                if len(pending) >= batch_size:
                    commit_batch(conn, pending)
                    pending.clear()

                progress.update(
                    task,
                    advance=1,
                    stat=f"ok={n_processed} fail={n_failed}",
                )
    finally:
        # Flush the remainder even on KeyboardInterrupt or an unexpected error,
        # so a long run never throws away already-classified rows.
        commit_batch(conn, pending)
        pending.clear()

    return (n_processed, 0, n_failed)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser; the module docstring doubles as the --help text."""
    ap = argparse.ArgumentParser(description=__doc__)
    ap.add_argument("--db", default=str(DEFAULT_DB), help="Path to archive.db")
    ap.add_argument(
        "--model",
        default=DEFAULT_MODEL,
        help=f"Ollama model name (default: {DEFAULT_MODEL})",
    )
    ap.add_argument(
        "--ollama",
        default=None,
        help="Ollama base URL (default: auto-discover localhost / Tailscale)",
    )
    ap.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Process at most N rows (sampling mode)",
    )
    ap.add_argument(
        "--rebuild",
        action="store_true",
        help="Re-classify rows that already have a usefulness_score",
    )
    ap.add_argument(
        "--smoke",
        action="store_true",
        help=f"Print {DEFAULT_SMOKE_LIMIT}-row sample without writing to DB",
    )
    ap.add_argument(
        "--batch",
        type=int,
        default=DEFAULT_BATCH_SIZE,
        help=f"Commit every N rows (default {DEFAULT_BATCH_SIZE})",
    )
    return ap


def main() -> int:
    """Command-line entry point; returns the process exit status.

    Exit codes:
        0 - success.
        1 - setup/argument error: non-positive --limit, DB not found, no
            reachable Ollama, or qa_pairs missing the classifier columns.
        2 - the run finished but at least one row failed to classify.
    """
    args = _build_parser().parse_args()

    # fetch_rows() treats a non-positive LIMIT as "no limit", so without this
    # check `--limit 0` would silently classify the entire table (hours of GPU).
    if args.limit is not None and args.limit < 1:
        console.print(f"[red][ERROR][/red] --limit must be >= 1 (got {args.limit})")
        return 1

    db_path = Path(args.db)
    if not db_path.exists():
        console.print(f"[red][ERROR][/red] DB not found: {db_path}")
        return 1

    try:
        base_url = args.ollama or find_ollama()
    except RuntimeError as e:
        console.print(f"[red][ERROR][/red] {e}")
        return 1
    console.print(f"[INFO] Ollama: {base_url} model: {args.model}")

    if args.smoke:
        # Smoke mode never writes, so open read-only. Path.as_uri() yields an
        # absolute, percent-encoded file: URI, so spaces, '#', '?' or '%' in
        # the path are not misread as URI syntax by SQLite.
        ro_uri = db_path.resolve().as_uri() + "?mode=ro"
        conn = sqlite3.connect(ro_uri, uri=True)
        conn.row_factory = sqlite3.Row
        try:
            run_smoke(conn, base_url, args.model, DEFAULT_SMOKE_LIMIT)
        finally:
            conn.close()
        return 0

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        # Confirm the schema migration has run before we try to write.
        cols = {row[1] for row in conn.execute("PRAGMA table_info(qa_pairs)").fetchall()}
        missing = {"usefulness_score", "topic_class", "is_banter"} - cols
        if missing:
            console.print(
                f"[red][ERROR][/red] qa_pairs is missing columns: {sorted(missing)}. "
                "Run import_to_sqlite.py first (it auto-migrates)."
            )
            return 1

        t0 = time.monotonic()
        try:
            n_ok, _n_skip, n_fail = run_full(
                conn,
                base_url,
                args.model,
                rebuild=args.rebuild,
                limit=args.limit,
                batch_size=args.batch,
            )
        finally:
            # Report coverage even when the run is interrupted part-way.
            elapsed = time.monotonic() - t0
            summarize(conn)
    finally:
        conn.close()

    console.print(
        f"\n[bold]Done in {elapsed:.1f}s[/bold] "
        f"processed={n_ok} failed={n_fail}"
    )
    return 2 if n_fail > 0 else 0
|
|
|
|
|
|
if __name__ == "__main__":
    # main() returns the exit status; SystemExit hands it to the interpreter.
    raise SystemExit(main())
|