"""Archive transcript index backed by SQLite FTS5.

Stores episodes, transcript segments (with speaker labels) and extracted
question/answer pairs, and exposes keyword / phrase search over them.
"""

import json
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator, Optional

try:
    from rich.console import Console
except ImportError:  # keep the index importable when rich is not installed
    class Console:
        """Plain-text stand-in used only when ``rich`` is unavailable."""

        def print(self, *objects, **_style):
            print(*objects)

console = Console()

# NOTE: the FOREIGN KEY clauses below are declarative only; SQLite enforces
# them only when ``PRAGMA foreign_keys`` is ON, which this module does not set.
#
# Both FTS tables are "external content" tables (content='...'): SQLite does
# not keep them in sync automatically, so every INSERT / DELETE / UPDATE on the
# content table needs a trigger that mirrors the change into the index.
DB_SCHEMA = """
CREATE TABLE IF NOT EXISTS episodes (
    id          INTEGER PRIMARY KEY,
    episode_id  TEXT UNIQUE NOT NULL,   -- e.g. "2016-s8e42"
    date        TEXT,                   -- "2016-03-15"
    audio_path  TEXT,                   -- absolute path to original MP3
    duration    REAL,
    hr          INTEGER                 -- 1 or 2 (for split episodes)
);

CREATE TABLE IF NOT EXISTS segments (
    id          INTEGER PRIMARY KEY,
    episode_id  TEXT NOT NULL,
    seg_index   INTEGER NOT NULL,
    start       REAL NOT NULL,
    end         REAL NOT NULL,
    speaker     TEXT,                   -- "HOST", "CALLER", "UNKNOWN", "COMMERCIAL"
    text        TEXT NOT NULL,
    FOREIGN KEY (episode_id) REFERENCES episodes(episode_id)
);

CREATE VIRTUAL TABLE IF NOT EXISTS segments_fts USING fts5(
    text,
    speaker UNINDEXED,
    episode_id UNINDEXED,
    seg_index UNINDEXED,
    content='segments',
    content_rowid='id'
);

CREATE TRIGGER IF NOT EXISTS segments_ai AFTER INSERT ON segments BEGIN
    INSERT INTO segments_fts(rowid, text, speaker, episode_id, seg_index)
    VALUES (new.id, new.text, new.speaker, new.episode_id, new.seg_index);
END;

-- Remove the old index entry when a segment row is deleted.
CREATE TRIGGER IF NOT EXISTS segments_ad AFTER DELETE ON segments BEGIN
    INSERT INTO segments_fts(segments_fts, rowid, text, speaker, episode_id, seg_index)
    VALUES ('delete', old.id, old.text, old.speaker, old.episode_id, old.seg_index);
END;

-- Re-index a segment row when it is updated.
CREATE TRIGGER IF NOT EXISTS segments_au AFTER UPDATE ON segments BEGIN
    INSERT INTO segments_fts(segments_fts, rowid, text, speaker, episode_id, seg_index)
    VALUES ('delete', old.id, old.text, old.speaker, old.episode_id, old.seg_index);
    INSERT INTO segments_fts(rowid, text, speaker, episode_id, seg_index)
    VALUES (new.id, new.text, new.speaker, new.episode_id, new.seg_index);
END;

CREATE TABLE IF NOT EXISTS qa_pairs (
    id              INTEGER PRIMARY KEY,
    episode_id      TEXT NOT NULL,
    question_start  REAL NOT NULL,
    question_end    REAL NOT NULL,
    answer_start    REAL NOT NULL,
    answer_end      REAL NOT NULL,
    question_text   TEXT NOT NULL,
    answer_text     TEXT NOT NULL,
    topic           TEXT,               -- Ollama-tagged topic
    topic_tags      TEXT,               -- JSON array of tags
    FOREIGN KEY (episode_id) REFERENCES episodes(episode_id)
);

CREATE VIRTUAL TABLE IF NOT EXISTS qa_fts USING fts5(
    question_text,
    answer_text,
    topic,
    episode_id UNINDEXED,
    content='qa_pairs',
    content_rowid='id'
);

CREATE TRIGGER IF NOT EXISTS qa_ai AFTER INSERT ON qa_pairs BEGIN
    INSERT INTO qa_fts(rowid, question_text, answer_text, topic, episode_id)
    VALUES (new.id, new.question_text, new.answer_text, new.topic, new.episode_id);
END;

-- Remove the old index entry when a Q&A row is deleted.
CREATE TRIGGER IF NOT EXISTS qa_ad AFTER DELETE ON qa_pairs BEGIN
    INSERT INTO qa_fts(qa_fts, rowid, question_text, answer_text, topic, episode_id)
    VALUES ('delete', old.id, old.question_text, old.answer_text, old.topic, old.episode_id);
END;

-- Re-index a Q&A row when it is updated.
CREATE TRIGGER IF NOT EXISTS qa_au AFTER UPDATE ON qa_pairs BEGIN
    INSERT INTO qa_fts(qa_fts, rowid, question_text, answer_text, topic, episode_id)
    VALUES ('delete', old.id, old.question_text, old.answer_text, old.topic, old.episode_id);
    INSERT INTO qa_fts(rowid, question_text, answer_text, topic, episode_id)
    VALUES (new.id, new.question_text, new.answer_text, new.topic, new.episode_id);
END;
"""


def _format_timestamp(seconds: float) -> str:
    """Render *seconds* as ``M:SS``, or ``H:MM:SS`` once it reaches an hour."""
    minutes, sec = divmod(int(seconds), 60)
    hours, minutes = divmod(minutes, 60)
    if hours:
        return f"{hours}:{minutes:02d}:{sec:02d}"
    return f"{minutes}:{sec:02d}"


def _quote_fts_phrase(query: str) -> str:
    """Wrap *query* as one FTS5 string literal so it is matched as a phrase.

    Embedded double quotes are doubled, which is how FTS5 escapes them.
    """
    return '"' + query.replace('"', '""') + '"'


@dataclass
class SearchResult:
    """One transcript segment returned by :meth:`ArchiveIndex.search`."""

    episode_id: str
    date: str
    start: float
    end: float
    speaker: str
    text: str
    audio_path: str
    score: float = 0.0  # FTS5 ``rank`` value of the match

    def timestamp_str(self) -> str:
        """Return the segment span as ``start–end`` in clock notation."""
        return f"{_format_timestamp(self.start)}–{_format_timestamp(self.end)}"


@dataclass
class QAResult:
    """One question/answer pair returned by :meth:`ArchiveIndex.search_qa`."""

    episode_id: str
    date: str
    question_start: float
    question_end: float
    answer_start: float
    answer_end: float
    question_text: str
    answer_text: str
    topic: str
    audio_path: str

    def clip_start(self, padding: float = 1.0) -> float:
        """Return the question start minus *padding*, clamped at 0.0."""
        return max(0.0, self.question_start - padding)

    def clip_end(self, padding: float = 1.0) -> float:
        """Return the answer end plus *padding*."""
        return self.answer_end + padding

    def timestamp_str(self) -> str:
        """Return the question-start to answer-end span in clock notation."""
        return (
            f"{_format_timestamp(self.question_start)}"
            f"–{_format_timestamp(self.answer_end)}"
        )

    def duration(self) -> float:
        """Return the seconds from question start to answer end."""
        return self.answer_end - self.question_start


class ArchiveIndex:
    """SQLite-backed index of episodes, transcript segments and Q&A pairs.

    Usable as a context manager; the connection is closed on exit.
    """

    def __init__(self, db_path: Path):
        """Open (creating if needed) the database at *db_path* and its schema.

        The parent directory of *db_path* is created when missing.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = sqlite3.connect(str(self.db_path))
        try:
            self._conn.row_factory = sqlite3.Row
            self._conn.executescript(DB_SCHEMA)
            self._conn.commit()
        except Exception:
            # Do not leak the handle when schema creation fails
            # (for example an SQLite build without FTS5).
            self._conn.close()
            raise

    def close(self):
        """Close the underlying SQLite connection."""
        self._conn.close()

    def __enter__(self):
        return self

    def __exit__(self, *_):
        self.close()

    # ── Ingestion ──────────────────────────────────────────────────────────

    def add_episode(self, episode_id: str, audio_path: Path,
                    date: Optional[str] = None,
                    duration: Optional[float] = None,
                    hr: Optional[int] = None):
        """Register an episode; an existing *episode_id* is left untouched."""
        # ``with`` commits on success and rolls back if the statement raises.
        with self._conn:
            self._conn.execute(
                "INSERT OR IGNORE INTO episodes (episode_id, date, audio_path, duration, hr) "
                "VALUES (?, ?, ?, ?, ?)",
                (episode_id, date, str(audio_path), duration, hr),
            )

    def add_segments(self, episode_id: str, segments: list[dict]):
        """Add transcript segments. Each dict: {start, end, text, speaker}.

        ``speaker`` is optional and defaults to ``"UNKNOWN"``. Nothing is
        written when the episode already has at least one segment stored.
        """
        existing = self._conn.execute(
            "SELECT COUNT(*) FROM segments WHERE episode_id = ?", (episode_id,)
        ).fetchone()[0]
        if existing:
            return  # already indexed

        rows = [
            (episode_id, i, s["start"], s["end"],
             s.get("speaker", "UNKNOWN"), s["text"])
            for i, s in enumerate(segments)
        ]
        # One transaction for the whole batch: a failure part-way through
        # rolls back instead of leaving a partially indexed episode behind.
        with self._conn:
            self._conn.executemany(
                "INSERT INTO segments (episode_id, seg_index, start, end, speaker, text) "
                "VALUES (?, ?, ?, ?, ?, ?)",
                rows,
            )

    def add_qa_pair(self, episode_id: str,
                    q_start: float, q_end: float,
                    a_start: float, a_end: float,
                    question: str, answer: str,
                    topic: Optional[str] = None,
                    tags: Optional[list[str]] = None):
        """Store one question/answer pair; *tags* is saved as a JSON array."""
        with self._conn:
            self._conn.execute(
                "INSERT INTO qa_pairs "
                "(episode_id, question_start, question_end, answer_start, answer_end, "
                "question_text, answer_text, topic, topic_tags) "
                "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (episode_id, q_start, q_end, a_start, a_end,
                 question, answer, topic, json.dumps(tags or [])),
            )

    # ── Search ─────────────────────────────────────────────────────────────

    def _match_rows(self, sql: str, query: str, tail_params: list) -> list:
        """Run *sql* with *query* bound to its first ``?`` (the MATCH term).

        *query* is first tried as FTS5 query syntax. If SQLite rejects it
        (apostrophes, ``?``, stray operators, ...), it is retried once as a
        quoted literal phrase; an error from the retry propagates.
        """
        try:
            return self._conn.execute(sql, [query, *tail_params]).fetchall()
        except sqlite3.OperationalError:
            phrase = _quote_fts_phrase(query)
            return self._conn.execute(sql, [phrase, *tail_params]).fetchall()

    def search(self, query: str, speaker_filter: Optional[str] = None,
               limit: int = 20) -> list[SearchResult]:
        """Full-text search across all transcript segments.

        Args:
            query: FTS5 query; text that is not valid FTS5 syntax is
                searched as a literal phrase instead of raising.
            speaker_filter: when given, only segments with exactly this
                speaker label are returned.
            limit: maximum number of rows.

        Returns:
            Matches ordered by FTS5 rank; ``[]`` for a blank query.
        """
        if not query or not query.strip():
            return []

        speaker_clause = ""
        tail_params = [limit]
        if speaker_filter:
            speaker_clause = "AND s.speaker = ?"
            tail_params = [speaker_filter, limit]

        # Only the fixed clause text above is interpolated; values are bound.
        sql = f"""
            SELECT s.episode_id, e.date, s.start, s.end, s.speaker, s.text,
                   e.audio_path, rank
            FROM segments_fts f
            JOIN segments s ON s.id = f.rowid
            JOIN episodes e ON e.episode_id = s.episode_id
            WHERE segments_fts MATCH ?
            {speaker_clause}
            ORDER BY rank
            LIMIT ?
        """
        rows = self._match_rows(sql, query, tail_params)

        return [
            SearchResult(
                episode_id=r["episode_id"],
                date=r["date"] or r["episode_id"],  # fall back to the id
                start=r["start"],
                end=r["end"],
                speaker=r["speaker"],
                text=r["text"],
                audio_path=r["audio_path"],
                score=r["rank"],
            )
            for r in rows
        ]

    def search_qa(self, query: str, limit: int = 20) -> list[QAResult]:
        """Search Q&A pairs — matches against question, answer, and topic.

        Text that is not valid FTS5 syntax is searched as a literal phrase;
        a blank query returns ``[]``.
        """
        if not query or not query.strip():
            return []

        sql = """
            SELECT q.episode_id, e.date,
                   q.question_start, q.question_end,
                   q.answer_start, q.answer_end,
                   q.question_text, q.answer_text, q.topic,
                   e.audio_path, rank
            FROM qa_fts f
            JOIN qa_pairs q ON q.id = f.rowid
            JOIN episodes e ON e.episode_id = q.episode_id
            WHERE qa_fts MATCH ?
            ORDER BY rank
            LIMIT ?
        """
        rows = self._match_rows(sql, query, [limit])

        return [
            QAResult(
                episode_id=r["episode_id"],
                date=r["date"] or r["episode_id"],  # fall back to the id
                question_start=r["question_start"],
                question_end=r["question_end"],
                answer_start=r["answer_start"],
                answer_end=r["answer_end"],
                question_text=r["question_text"],
                answer_text=r["answer_text"],
                topic=r["topic"] or "",
                audio_path=r["audio_path"],
            )
            for r in rows
        ]

    def stats(self) -> dict:
        """Return row counts for episodes, segments and Q&A pairs."""
        def count(table: str) -> int:
            return self._conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]

        return {
            "episodes": count("episodes"),
            "segments": count("segments"),
            "qa_pairs": count("qa_pairs"),
        }