"""
Radio archive query server. Read-only FastAPI over the SQLite archive.db.

Endpoints:
  GET /                                Landing page with search UI
  GET /api/episodes                    List all episodes (year, title, duration)
  GET /api/episodes/{id}               Episode detail: intros + qa_pairs
  GET /api/episodes/{id}/transcript    Segments + speaker turns (separate lists)
  GET /api/search?q=...&kind=...       FTS over segments and/or qa_pairs
  GET /api/callers                     Top recurring caller_names
  GET /api/stats                       Row counts per table + per-year totals
  GET /api/db.sqlite                   Download the archive.db file itself

Config via env:
  ARCHIVE_DB    path to archive.db (default /data/archive.db)
  PORT          listen port (default 8765)
"""

import json
import os
import sqlite3
from contextlib import asynccontextmanager
from pathlib import Path

from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import FileResponse, HTMLResponse

DB_PATH = os.environ.get("ARCHIVE_DB", "/data/archive.db")
PORT = int(os.environ.get("PORT", "8765"))


def _connect() -> sqlite3.Connection:
    """Open the archive database read-only and return the connection.

    Rows are returned as ``sqlite3.Row`` so handlers can convert them with
    ``dict(row)``. ``check_same_thread=False`` lets the single connection
    created at startup be used from whichever thread serves a request.

    Raises:
        RuntimeError: if no file exists at ``DB_PATH``.
    """
    db_file = Path(DB_PATH)
    if not db_file.exists():
        raise RuntimeError(f"Archive DB not found at {DB_PATH}")
    # Build the URI with as_uri() so characters that are special in a URI
    # ('?', '#', '%', ...) inside the path are percent-encoded instead of
    # being misread as the start of the query string or fragment.
    uri = f"{db_file.absolute().as_uri()}?mode=ro"
    conn = sqlite3.connect(uri, uri=True, check_same_thread=False)
    conn.row_factory = sqlite3.Row
    return conn


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the shared DB connection at startup and close it at shutdown."""
    app.state.db = _connect()
    try:
        yield
    finally:
        # Close even when an exception is thrown into the generator.
        app.state.db.close()


app = FastAPI(title="Computer Guru Radio Archive", lifespan=lifespan)


def fts_escape(q: str) -> str:
    """Turn free-form user input into a safe FTS5 MATCH expression.

    Each whitespace-separated term is wrapped in double quotes so FTS5 treats
    reserved characters literally. A double quote inside a term is doubled
    (``"`` -> ``""``), which is how FTS5 escapes a quote within a string;
    without that, input such as ``foo"bar`` yields a malformed query and the
    MATCH raises ``sqlite3.OperationalError``.

    Returns an empty string when *q* contains no terms.
    """
    return " ".join('"' + tok.replace('"', '""') + '"' for tok in q.split())


@app.get("/api/episodes")
def list_episodes(year: int | None = None, limit: int = 1000):
    """List episodes with their Q&A and intro counts, optionally for one year.

    Episodes are ordered by air date (rows without one sort last), then title.
    """
    db: sqlite3.Connection = app.state.db
    sql = """
        SELECT id, year, title, air_date,
               ROUND(duration_sec/60.0,1) AS minutes,
               (SELECT COUNT(*) FROM qa_pairs q WHERE q.episode_id = e.id) AS qa_count,
               (SELECT COUNT(*) FROM intros i WHERE i.episode_id = e.id) AS intro_count
        FROM episodes e
    """
    params: list = []
    if year is not None:
        sql += " WHERE year = ?"
        params.append(year)
    # '9999' sorts after any real date string, pushing NULL air_date to the end.
    sql += " ORDER BY COALESCE(air_date, '9999') ASC, title ASC LIMIT ?"
    params.append(limit)
    rows = db.execute(sql, params).fetchall()
    return [dict(r) for r in rows]


@app.get("/api/episodes/{episode_id}")
def episode_detail(episode_id: int):
    """Return one episode row plus its intros and Q&A pairs.

    ``topic_tags`` is stored as JSON text and is decoded to a list here; a
    NULL/empty value becomes ``[]``. Responds 404 if the episode is unknown.
    """
    db: sqlite3.Connection = app.state.db
    ep = db.execute("SELECT * FROM episodes WHERE id = ?", (episode_id,)).fetchone()
    if not ep:
        raise HTTPException(404, "episode not found")
    intros = db.execute(
        "SELECT name, role_hint, intro_time_sec, affiliation, fillin_for, source_text "
        "FROM intros WHERE episode_id = ? ORDER BY intro_time_sec",
        (episode_id,),
    ).fetchall()
    qa = db.execute(
        "SELECT id, question_start_sec, question_end_sec, "
        "answer_start_sec, answer_end_sec, "
        "question_text, answer_text, caller_name, caller_role, topic, topic_tags, "
        "usefulness_score, topic_class, is_banter "
        "FROM qa_pairs WHERE episode_id = ? ORDER BY question_start_sec",
        (episode_id,),
    ).fetchall()
    return {
        "episode": dict(ep),
        "intros": [dict(r) for r in intros],
        "qa_pairs": [
            {**dict(r), "topic_tags": json.loads(r["topic_tags"] or "[]")}
            for r in qa
        ],
    }


@app.get("/api/episodes/{episode_id}/transcript")
def episode_transcript(episode_id: int):
    """Return an episode's transcript segments and speaker turns.

    The two lists are returned separately (segments ordered by ``seg_idx``,
    turns by ``start_sec``); merging them is left to the client.
    Responds 404 if the episode is unknown.
    """
    db: sqlite3.Connection = app.state.db
    ep = db.execute(
        "SELECT id, title, year FROM episodes WHERE id = ?", (episode_id,)
    ).fetchone()
    if not ep:
        raise HTTPException(404, "episode not found")
    segments = db.execute(
        "SELECT seg_idx, start_sec, end_sec, text FROM segments "
        "WHERE episode_id = ? ORDER BY seg_idx",
        (episode_id,),
    ).fetchall()
    turns = db.execute(
        "SELECT speaker, start_sec, end_sec, confidence FROM turns "
        "WHERE episode_id = ? ORDER BY start_sec",
        (episode_id,),
    ).fetchall()
    return {
        "episode": dict(ep),
        "segments": [dict(r) for r in segments],
        "turns": [dict(r) for r in turns],
    }


@app.get("/api/search")
def search(
    q: str = Query(..., min_length=2),
    kind: str = Query("both", pattern="^(both|segments|qa)$"),
    limit: int = Query(50, ge=1, le=500),
    min_score: int = Query(
        0,
        ge=0,
        le=5,
        description="Minimum usefulness_score for Q&A hits (0=no filter)",
    ),
    exclude_banter: bool = Query(False, description="Drop Q&A rows where is_banter=1"),
):
    """Full-text search over transcript segments and/or Q&A pairs.

    ``kind`` selects which index is searched; ``limit`` applies to each result
    list independently. ``min_score`` and ``exclude_banter`` only affect the
    Q&A results. Results are ordered by bm25 rank.
    """
    db: sqlite3.Connection = app.state.db
    fts_q = fts_escape(q)
    if not fts_q:
        # Whitespace-only input passes min_length but has no searchable terms.
        return {"q": q, "segments": [], "qa": []}

    seg_results = []
    qa_results = []

    if kind in ("both", "segments"):
        seg_results = [
            dict(r)
            for r in db.execute(
                """
                SELECT e.id AS episode_id, e.year, e.title, e.air_date,
                       s.start_sec, s.end_sec,
                       snippet(segments_fts, 0, '', '', '...', 16) AS snippet,
                       bm25(segments_fts) AS rank
                FROM segments_fts
                JOIN segments s ON s.id = segments_fts.rowid
                JOIN episodes e ON e.id = s.episode_id
                WHERE segments_fts MATCH ?
                ORDER BY rank
                LIMIT ?
                """,
                (fts_q, limit),
            ).fetchall()
        ]

    if kind in ("both", "qa"):
        # NULL is treated as "unscored, include" so unprocessed rows still
        # appear and old saved URLs keep working as the classifier rolls out.
        # Filters are applied as additional WHERE clauses on top of the FTS
        # MATCH; SQLite's planner can use idx_qa_usefulness once it's helpful.
        qa_clauses = ["qa_fts MATCH :q"]
        qa_params: dict[str, object] = {"q": fts_q, "limit": limit}
        if min_score > 0:
            qa_clauses.append(
                "(p.usefulness_score IS NULL OR p.usefulness_score >= :min_score)"
            )
            qa_params["min_score"] = min_score
        if exclude_banter:
            qa_clauses.append("(p.is_banter IS NULL OR p.is_banter = 0)")
        # Only fixed clause strings are interpolated; all user-supplied values
        # travel as bound parameters.
        qa_sql = f"""
            SELECT e.id AS episode_id, e.year, e.title, e.air_date,
                   p.id AS qa_id, p.caller_name,
                   p.question_start_sec, p.answer_start_sec,
                   p.usefulness_score, p.topic_class, p.is_banter,
                   snippet(qa_fts, 0, '', '', '...', 16) AS q_snippet,
                   snippet(qa_fts, 1, '', '', '...', 16) AS a_snippet,
                   bm25(qa_fts) AS rank
            FROM qa_fts
            JOIN qa_pairs p ON p.id = qa_fts.rowid
            JOIN episodes e ON e.id = p.episode_id
            WHERE {' AND '.join(qa_clauses)}
            ORDER BY rank
            LIMIT :limit
        """
        qa_results = [dict(r) for r in db.execute(qa_sql, qa_params).fetchall()]

    return {"q": q, "segments": seg_results, "qa": qa_results}


@app.get("/api/callers")
def top_callers(limit: int = 50):
    """Return caller names ranked by how many Q&A pairs they appear in."""
    db: sqlite3.Connection = app.state.db
    rows = db.execute(
        "SELECT caller_name, COUNT(*) AS pairs FROM qa_pairs "
        "WHERE caller_name IS NOT NULL "
        "GROUP BY caller_name ORDER BY pairs DESC LIMIT ?",
        (limit,),
    ).fetchall()
    return [dict(r) for r in rows]


@app.get("/api/stats")
def stats():
    """Return row counts for the main tables and per-year episode totals."""
    db: sqlite3.Connection = app.state.db
    # Table names come from this fixed tuple, never from the request, so
    # interpolating them into the SQL text is safe.
    counts = {
        t: db.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0]
        for t in ("episodes", "segments", "turns", "intros", "qa_pairs")
    }
    by_year = [
        dict(r)
        for r in db.execute(
            "SELECT year, COUNT(*) AS episodes, "
            "ROUND(SUM(duration_sec)/3600.0, 1) AS hours "
            "FROM episodes GROUP BY year ORDER BY year"
        ).fetchall()
    ]
    return {"counts": counts, "by_year": by_year}


@app.get("/api/db.sqlite")
def download_db():
    """Stream the read-only archive.db for offline laptop sync.

    Anyone who can reach /api/search can already read every transcript, so
    exposing the underlying SQLite file adds no meaningful disclosure.

    Sync side:
        curl -o archive.db <host>:<port>/api/db.sqlite
    """
    if not Path(DB_PATH).exists():
        raise HTTPException(404, "archive db not present")
    return FileResponse(
        DB_PATH,
        media_type="application/vnd.sqlite3",
        filename="archive.db",
    )


@app.get("/", response_class=HTMLResponse)
def index():
    """Serve the static landing page."""
    return INDEX_HTML


INDEX_HTML = """ Computer Guru Radio Archive

Computer Guru Radio Archive

...
"""


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=PORT)