""" Radio archive query server. Read-only FastAPI over the SQLite archive.db. Endpoints: GET / Landing page with search UI GET /api/episodes List all episodes (year, title, duration) GET /api/episodes/{id} Episode detail: intros + qa_pairs GET /api/episodes/{id}/transcript Chronologically merged segments + turns GET /api/search?q=...&kind=... FTS over segments and/or qa_pairs GET /api/qa List Q&A pairs (no search query, filterable) GET /api/audio/{id} Stream the episode MP3 (HTTP Range supported) GET /api/callers Top recurring caller_names GET /episode/{id} HTML transcript view with audio player Config via env: ARCHIVE_DB path to archive.db (default /data/archive.db) EPISODES_DIR path to mp3 tree (default /data/episodes) PORT listen port (default 8765) """ import html as _html import json import os import re import sqlite3 from contextlib import asynccontextmanager from pathlib import Path from typing import Iterator from fastapi import FastAPI, HTTPException, Query, Request from fastapi.responses import FileResponse, HTMLResponse, Response, StreamingResponse DB_PATH = os.environ.get("ARCHIVE_DB", "/data/archive.db") EPISODES_DIR = os.environ.get("EPISODES_DIR", "/data/episodes") PORT = int(os.environ.get("PORT", "8765")) def _connect() -> sqlite3.Connection: if not Path(DB_PATH).exists(): raise RuntimeError(f"Archive DB not found at {DB_PATH}") conn = sqlite3.connect(f"file:{DB_PATH}?mode=ro", uri=True, check_same_thread=False) conn.row_factory = sqlite3.Row return conn @asynccontextmanager async def lifespan(app: FastAPI): app.state.db = _connect() yield app.state.db.close() app = FastAPI(title="Computer Guru Radio Archive", lifespan=lifespan) def fts_escape(q: str) -> str: """Wrap each term in double quotes so FTS5 treats reserved chars literally.""" return " ".join(f'"{tok}"' for tok in q.split() if tok) # Excerpt extraction for Q&A texts. # # Whisper transcripts often start with disfluent run-on chatter that's a # leftover from the previous turn. We trim that prefix, take the first 300 # chars, and try to end on a sentence boundary so the excerpt reads cleanly. _EXCERPT_BODY = 300 # primary character budget _EXCERPT_LOOKAHEAD = 80 # extra chars allowed to find a sentence end _EXCERPT_LEAD_SCAN = 30 # window to look for a leading capital letter def _excerpt(text: str | None) -> str: """Return a short, readable excerpt suitable for browsing. Rules (intentionally simple — see spec): 1. Walk the leading prefix and skip to the first capital letter, but only within the first ~30 chars; otherwise keep the original start. 2. Take the first 300 chars. 3. If that cut lands mid-sentence, look up to 80 more chars ahead for the next .!? and end there. 4. Otherwise back up to the last word boundary and append "..." so we never display half a word. """ if not text: return "" s = text.strip() if not s: return "" # 1. trim disfluent leading run-on to the first capital letter lead_window = s[:_EXCERPT_LEAD_SCAN] cap_match = re.search(r"[A-Z]", lead_window) if cap_match and cap_match.start() > 0: s = s[cap_match.start():] if len(s) <= _EXCERPT_BODY: return s body = s[:_EXCERPT_BODY] # 3. if the body ends mid-sentence, look ahead for a terminator if body and body[-1] not in ".!?": ahead = s[_EXCERPT_BODY:_EXCERPT_BODY + _EXCERPT_LOOKAHEAD] m = re.search(r"[.!?]", ahead) if m: return body + ahead[: m.end()] # 4. back up to last whitespace and ellipsize cut = body.rfind(" ") if cut > 0: return body[:cut].rstrip(",;:- ") + "..." return body + "..." return body def _qa_search_excerpts(row: dict) -> dict: """Augment a search/qa row with question/answer excerpts. Excerpts are computed from the (un-highlighted) full text that lives next to the snippet in the row. This keeps the existing q_snippet/ a_snippet (with highlighting) working for back-compat and adds plain-text excerpts the UI can prefer. """ row["question_excerpt"] = _excerpt(row.pop("_question_text", None)) row["answer_excerpt"] = _excerpt(row.pop("_answer_text", None)) return row @app.get("/api/episodes") def list_episodes(year: int | None = None, limit: int = 1000): db: sqlite3.Connection = app.state.db sql = """ SELECT id, year, title, air_date, ROUND(duration_sec/60.0,1) AS minutes, (SELECT COUNT(*) FROM qa_pairs q WHERE q.episode_id = e.id) AS qa_count, (SELECT COUNT(*) FROM intros i WHERE i.episode_id = e.id) AS intro_count FROM episodes e """ params: list = [] if year is not None: sql += " WHERE year = ?" params.append(year) sql += " ORDER BY COALESCE(air_date, '9999') ASC, title ASC LIMIT ?" params.append(limit) rows = db.execute(sql, params).fetchall() return [dict(r) for r in rows] @app.get("/api/episodes/{episode_id}") def episode_detail(episode_id: int): db: sqlite3.Connection = app.state.db ep = db.execute("SELECT * FROM episodes WHERE id = ?", (episode_id,)).fetchone() if not ep: raise HTTPException(404, "episode not found") intros = db.execute( "SELECT name, role_hint, intro_time_sec, affiliation, fillin_for, source_text " "FROM intros WHERE episode_id = ? ORDER BY intro_time_sec", (episode_id,), ).fetchall() qa = db.execute( "SELECT id, question_start_sec, question_end_sec, " "answer_start_sec, answer_end_sec, " "question_text, answer_text, caller_name, caller_role, topic, topic_tags, " "usefulness_score, topic_class, is_banter " "FROM qa_pairs WHERE episode_id = ? ORDER BY question_start_sec", (episode_id,), ).fetchall() return { "episode": dict(ep), "intros": [dict(r) for r in intros], "qa_pairs": [ {**dict(r), "topic_tags": json.loads(r["topic_tags"] or "[]")} for r in qa ], } @app.get("/api/episodes/{episode_id}/transcript") def episode_transcript(episode_id: int): db: sqlite3.Connection = app.state.db ep = db.execute("SELECT id, title, year FROM episodes WHERE id = ?", (episode_id,)).fetchone() if not ep: raise HTTPException(404, "episode not found") segments = db.execute( "SELECT seg_idx, start_sec, end_sec, text FROM segments " "WHERE episode_id = ? ORDER BY seg_idx", (episode_id,), ).fetchall() turns = db.execute( "SELECT speaker, start_sec, end_sec, confidence FROM turns " "WHERE episode_id = ? ORDER BY start_sec", (episode_id,), ).fetchall() return { "episode": dict(ep), "segments": [dict(r) for r in segments], "turns": [dict(r) for r in turns], } @app.get("/api/search") def search( q: str = Query(..., min_length=2), kind: str = Query("both", pattern="^(both|segments|qa)$"), limit: int = Query(50, ge=1, le=500), min_score: int = Query(0, ge=0, le=5, description="Minimum usefulness_score for Q&A hits (0=no filter)"), exclude_banter: bool = Query(False, description="Drop Q&A rows where is_banter=1"), ): db: sqlite3.Connection = app.state.db fts_q = fts_escape(q) if not fts_q: return {"q": q, "segments": [], "qa": []} seg_results = [] qa_results = [] if kind in ("both", "segments"): seg_results = [ dict(r) for r in db.execute( """ SELECT e.id AS episode_id, e.year, e.title, e.air_date, s.start_sec, s.end_sec, snippet(segments_fts, 0, '', '', '...', 16) AS snippet, bm25(segments_fts) AS rank FROM segments_fts JOIN segments s ON s.id = segments_fts.rowid JOIN episodes e ON e.id = s.episode_id WHERE segments_fts MATCH ? ORDER BY rank LIMIT ? """, (fts_q, limit), ).fetchall() ] if kind in ("both", "qa"): # NULL is treated as "unscored, include" so unprocessed rows still # appear and old saved URLs keep working as the classifier rolls out. # Filters are applied as additional WHERE clauses on top of the FTS # MATCH; SQLite's planner can use idx_qa_usefulness once it's helpful. qa_clauses = ["qa_fts MATCH :q"] qa_params: dict[str, object] = {"q": fts_q, "limit": limit} if min_score > 0: qa_clauses.append( "(p.usefulness_score IS NULL OR p.usefulness_score >= :min_score)" ) qa_params["min_score"] = min_score if exclude_banter: qa_clauses.append("(p.is_banter IS NULL OR p.is_banter = 0)") qa_sql = f""" SELECT e.id AS episode_id, e.year, e.title, e.air_date, p.id AS qa_id, p.caller_name, p.question_start_sec, p.answer_start_sec, p.usefulness_score, p.topic_class, p.is_banter, p.question_text AS _question_text, p.answer_text AS _answer_text, snippet(qa_fts, 0, '', '', '...', 16) AS q_snippet, snippet(qa_fts, 1, '', '', '...', 16) AS a_snippet, bm25(qa_fts) AS rank FROM qa_fts JOIN qa_pairs p ON p.id = qa_fts.rowid JOIN episodes e ON e.id = p.episode_id WHERE {' AND '.join(qa_clauses)} ORDER BY rank LIMIT :limit """ qa_results = [ _qa_search_excerpts(dict(r)) for r in db.execute(qa_sql, qa_params).fetchall() ] return {"q": q, "segments": seg_results, "qa": qa_results} # Sort key whitelist so we can pass user input straight into ORDER BY. _QA_SORT_ORDERS: dict[str, str] = { "air_date_desc": "COALESCE(e.air_date, '0000') DESC, p.question_start_sec ASC", "air_date_asc": "COALESCE(e.air_date, '9999') ASC, p.question_start_sec ASC", "score_desc": "COALESCE(p.usefulness_score, 0) DESC, " "COALESCE(e.air_date, '0000') DESC, p.question_start_sec ASC", } @app.get("/api/qa") def list_qa( year: int | None = None, min_score: int = Query(0, ge=0, le=5), exclude_banter: bool = Query(False), topic_class: str | None = None, limit: int = Query(50, ge=1, le=200), offset: int = Query(0, ge=0), order: str = Query("air_date_desc"), ): """Browseable Q&A list — same column shape as /api/search Q&A hits.""" db: sqlite3.Connection = app.state.db if order not in _QA_SORT_ORDERS: raise HTTPException(400, f"unknown order: {order}") order_sql = _QA_SORT_ORDERS[order] where = ["1=1"] params: dict[str, object] = {} if year is not None: where.append("e.year = :year") params["year"] = year if min_score > 0: where.append("(p.usefulness_score IS NULL OR p.usefulness_score >= :min_score)") params["min_score"] = min_score if exclude_banter: where.append("(p.is_banter IS NULL OR p.is_banter = 0)") if topic_class: where.append("p.topic_class = :topic_class") params["topic_class"] = topic_class where_sql = " AND ".join(where) total = db.execute( f"""SELECT COUNT(*) FROM qa_pairs p JOIN episodes e ON e.id = p.episode_id WHERE {where_sql}""", params, ).fetchone()[0] params_pl = dict(params, limit=limit, offset=offset) rows = db.execute( f"""SELECT e.id AS episode_id, e.year, e.title, e.air_date, p.id AS qa_id, p.caller_name, p.question_start_sec, p.answer_start_sec, p.usefulness_score, p.topic_class, p.is_banter, p.question_text AS _question_text, p.answer_text AS _answer_text FROM qa_pairs p JOIN episodes e ON e.id = p.episode_id WHERE {where_sql} ORDER BY {order_sql} LIMIT :limit OFFSET :offset""", params_pl, ).fetchall() items = [_qa_search_excerpts(dict(r)) for r in rows] return {"total": total, "items": items} # --- Audio streaming with HTTP Range support ---------------------------- _AUDIO_CHUNK = 64 * 1024 def _resolve_audio_path(rel_path: str) -> Path | None: """Return the absolute Path to the MP3 if it exists, else None. rel_path is the value stored in episodes.rel_path (e.g. "2010/10 - October/10-02-10 HR 1.mp3"). We refuse anything that escapes the episodes root via .. so a malicious DB row cannot read arbitrary files. """ if not rel_path: return None base = Path(EPISODES_DIR).resolve() candidate = (base / rel_path).resolve() try: candidate.relative_to(base) except ValueError: return None if not candidate.is_file(): return None return candidate def _parse_range(header: str, file_size: int) -> tuple[int, int] | None: """Parse a single-range "bytes=START-END" header. Returns None if invalid.""" if not header or not header.startswith("bytes="): return None spec = header[len("bytes="):].strip() if "," in spec: # Multi-range — fall back to no-range (full file) for simplicity return None if "-" not in spec: return None start_s, end_s = spec.split("-", 1) try: if start_s == "": # suffix range: "-N" -> last N bytes length = int(end_s) if length <= 0: return None start = max(0, file_size - length) end = file_size - 1 else: start = int(start_s) end = int(end_s) if end_s else file_size - 1 except ValueError: return None if start < 0 or end < start or start >= file_size: return None end = min(end, file_size - 1) return start, end def _file_iter(path: Path, start: int, length: int, chunk: int = _AUDIO_CHUNK) -> Iterator[bytes]: remaining = length with open(path, "rb") as f: f.seek(start) while remaining > 0: data = f.read(min(chunk, remaining)) if not data: break remaining -= len(data) yield data @app.get("/api/audio/{episode_id}") def stream_audio(episode_id: int, request: Request): """Stream the episode's MP3 with HTTP Range support. Returns 404 if the episode doesn't exist or the file isn't on disk (Jupiter currently has no episodes/ tree — that's a clean 404). The audio element on the transcript page checks the response and hides itself on 404. """ db: sqlite3.Connection = app.state.db ep = db.execute("SELECT rel_path FROM episodes WHERE id = ?", (episode_id,)).fetchone() if not ep: raise HTTPException(404, "episode not found") path = _resolve_audio_path(ep["rel_path"]) if path is None: raise HTTPException(404, "audio file missing") file_size = path.stat().st_size range_header = request.headers.get("range") or request.headers.get("Range") rng = _parse_range(range_header, file_size) if range_header else None headers = { "Accept-Ranges": "bytes", "Cache-Control": "public, max-age=86400", "Content-Type": "audio/mpeg", } if rng is None: # Full content headers["Content-Length"] = str(file_size) return StreamingResponse( _file_iter(path, 0, file_size), status_code=200, headers=headers, media_type="audio/mpeg", ) start, end = rng length = end - start + 1 headers["Content-Length"] = str(length) headers["Content-Range"] = f"bytes {start}-{end}/{file_size}" return StreamingResponse( _file_iter(path, start, length), status_code=206, headers=headers, media_type="audio/mpeg", ) @app.get("/api/callers") def top_callers(limit: int = 50): db: sqlite3.Connection = app.state.db rows = db.execute( "SELECT caller_name, COUNT(*) AS pairs FROM qa_pairs " "WHERE caller_name IS NOT NULL " "GROUP BY caller_name ORDER BY pairs DESC LIMIT ?", (limit,), ).fetchall() return [dict(r) for r in rows] @app.get("/api/stats") def stats(): db: sqlite3.Connection = app.state.db counts = { t: db.execute(f"SELECT COUNT(*) FROM {t}").fetchone()[0] for t in ("episodes", "segments", "turns", "intros", "qa_pairs") } by_year = [ dict(r) for r in db.execute( "SELECT year, COUNT(*) AS episodes, " "ROUND(SUM(duration_sec)/3600.0, 1) AS hours " "FROM episodes GROUP BY year ORDER BY year" ).fetchall() ] return {"counts": counts, "by_year": by_year} @app.get("/api/db.sqlite") def download_db(): """Stream the read-only archive.db for offline laptop sync. Anyone who can reach /api/search can already read every transcript, so exposing the underlying SQLite file adds no meaningful disclosure. Sync side: curl -o archive.db :/api/db.sqlite """ if not Path(DB_PATH).exists(): raise HTTPException(404, "archive db not present") return FileResponse( DB_PATH, media_type="application/vnd.sqlite3", filename="archive.db", ) @app.get("/", response_class=HTMLResponse) def index(): return INDEX_HTML # --- Single-episode HTML transcript view -------------------------------- def _fmt_time(sec: float | None) -> str: if sec is None: return "" s = int(sec) return f"{s // 60}:{s % 60:02d}" def _episode_html(episode_id: int) -> str: db: sqlite3.Connection = app.state.db ep = db.execute("SELECT * FROM episodes WHERE id = ?", (episode_id,)).fetchone() if not ep: raise HTTPException(404, "episode not found") intros = db.execute( "SELECT id, name, role_hint, intro_time_sec FROM intros " "WHERE episode_id = ? ORDER BY intro_time_sec", (episode_id,), ).fetchall() qa = db.execute( "SELECT id, question_start_sec, question_end_sec, answer_start_sec, " " answer_end_sec, question_text, answer_text, caller_name, " " caller_role, usefulness_score, topic_class, is_banter " "FROM qa_pairs WHERE episode_id = ? ORDER BY question_start_sec", (episode_id,), ).fetchall() segments = db.execute( "SELECT seg_idx, start_sec, end_sec, text FROM segments " "WHERE episode_id = ? ORDER BY seg_idx", (episode_id,), ).fetchall() esc = _html.escape title = esc(ep["title"] or f"Episode {episode_id}") air = esc(ep["air_date"] or "") year = ep["year"] duration_min = round((ep["duration_sec"] or 0) / 60.0, 1) rel_path = esc(ep["rel_path"] or "") # Build qa lookup keyed by question_start so we can splice them into # the segment stream chronologically. qa_rows = [dict(r) for r in qa] qa_starts = sorted( (((r["question_start_sec"] or 0.0), r) for r in qa_rows), key=lambda x: x[0], ) # Right rail summary lists intro_items = [] for r in intros: t = _fmt_time(r["intro_time_sec"]) name = esc(r["name"] or "?") role = esc(r["role_hint"] or "") role_html = f' ({role})' if role else "" intro_items.append( f'
  • ' f'{t} · {name}{role_html}
  • ' ) intros_html = "\n".join(intro_items) or '
  • none
  • ' qa_items = [] for r in qa_rows: t = _fmt_time(r["question_start_sec"]) score = r["usefulness_score"] badge = ( f'{score}' if score is not None else "" ) topic = esc(r["topic_class"] or "") topic_html = f'{topic} ' if topic else "" caller = esc(r["caller_name"] or "") caller_html = f' · {caller}' if caller else "" first_q = _excerpt(r["question_text"] or "")[:80] teaser = esc(first_q) qa_items.append( f'
  • ' f'{t} {badge}{topic_html}{teaser}{caller_html}
  • ' ) qa_summary_html = "\n".join(qa_items) or '
  • none
  • ' # Build the chronological transcript body. We walk segments and, before # any segment whose start_sec >= a Q&A's question_start, we emit the # Q&A block. (Q&A blocks contain the full question/answer text already, # so segment text becomes context around them.) body_parts: list[str] = [] qa_iter = iter(qa_starts) next_qa: tuple[float, dict] | None = next(qa_iter, None) # Intros also get inline anchors so the right-rail jump links work intro_by_time = sorted( (((r["intro_time_sec"] or 0.0), r) for r in intros), key=lambda x: x[0], ) intro_iter = iter(intro_by_time) next_intro = next(intro_iter, None) def _flush_inline_at(t_seg: float) -> None: nonlocal next_intro, next_qa while next_intro and next_intro[0] <= t_seg: ir = next_intro[1] tlbl = _fmt_time(ir["intro_time_sec"]) name = esc(ir["name"] or "?") role = esc(ir["role_hint"] or "") role_html = f' ({role})' if role else "" body_parts.append( f'
    ' f'' f'{tlbl} intro: {name}{role_html}' f'
    ' ) next_intro = next(intro_iter, None) while next_qa and next_qa[0] <= t_seg: qr = next_qa[1] qstart = qr["question_start_sec"] or 0.0 astart = qr["answer_start_sec"] or qstart score = qr["usefulness_score"] badge = ( f'{score}' if score is not None else "" ) topic = esc(qr["topic_class"] or "") topic_html = f'{topic} ' if topic else "" caller = esc(qr["caller_name"] or "") caller_html = f' · {caller}' if caller else "" qbody = esc(qr["question_text"] or "") abody = esc(qr["answer_text"] or "") dim = " dim" if (score is not None and score <= 2) or qr["is_banter"] == 1 else "" body_parts.append( f'
    ' f'
    {badge}{topic_html}' f'{_fmt_time(qstart)}' f' Q&A{caller_html}' f'' f'
    ' f'
    Q: {qbody}
    ' f'
    ' f'{_fmt_time(astart)}' f' A: {abody}
    ' f'
    ' ) next_qa = next(qa_iter, None) for s in segments: t_seg = s["start_sec"] or 0.0 _flush_inline_at(t_seg) seg_text = esc(s["text"] or "").strip() if not seg_text: continue body_parts.append( f'

    ' f'{_fmt_time(t_seg)} ' f'{seg_text}

    ' ) # Flush any tail intros / Q&As after final segment _flush_inline_at(float("inf")) body_html = "\n".join(body_parts) or '

    no transcript

    ' qa_count = len(qa_rows) intro_count = len(intros) return EPISODE_HTML.format( title=title, episode_id=episode_id, year=year, air=air, duration_min=duration_min, rel_path=rel_path, qa_count=qa_count, intro_count=intro_count, intros_summary=intros_html, qa_summary=qa_summary_html, body=body_html, ) @app.get("/episode/{episode_id}", response_class=HTMLResponse) def episode_page(episode_id: int): return _episode_html(episode_id) INDEX_HTML = """ Computer Guru Radio Archive

    Computer Guru Radio Archive

    loading...
    """ # Single-episode transcript view. EPISODE_HTML = """ {title} · Computer Guru Radio Archive

    {title}

    {year} · {air} · {duration_min} min · {qa_count} Q&A · {intro_count} intros · « back to search
    {rel_path}
    {body}
    """ if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=PORT)