Files
claudetools/projects/radio-show/audio-processor/src/indexer.py
Mike Swanson 79abef9dc9 radio: diarization pipeline fixes, benchmark setup, test episode set
- Fix voice_profiler threshold bug (HOST label overwrote Unknown unconditionally)
- Audio preload optimization: single ffmpeg per episode, 149.5x realtime on 5070 Ti
- WavLM threshold raised to 0.85 (Mike 0.90-0.99, callers 0.46-0.83)
- Promo/bumper filter: weighted signature scoring, 42->27 clean Q&A pairs
- Text-only Q&A fallback for episodes with no CALLER diarization labels
- TRANSFORMERS_OFFLINE=1 to skip HuggingFace freshness checks
- Add diarize_2018.py for targeted re-run + FTS5 rebuild
- Add benchmark.py + BENCH_SETUP.md for GURU-BEAST-ROG (RTX 4090) comparison
- Commit 9-episode training diarization.json outputs
- Session log: 2026-04-27-diarization-pipeline.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-27 13:20:40 -07:00

248 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Archive transcript index using SQLite FTS5.
Stores all transcript segments with speaker labels, searchable by keyword or phrase.
"""
import json
import sqlite3
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator
from rich.console import Console
# Shared rich console for this module (not referenced by the code visible below).
console = Console()
# SQL schema applied via executescript() when an ArchiveIndex is opened.
# Every statement uses IF NOT EXISTS, so re-running it on an existing
# database is a no-op.
#
# Layout:
#   episodes      one row per episode, keyed by the textual episode_id
#   segments      transcript segments, ordered per episode by seg_index
#   segments_fts  FTS5 external-content index over segments.text
#   qa_pairs      question/answer spans with optional topic metadata
#   qa_fts        FTS5 external-content index over question/answer/topic
#
# NOTE: only AFTER INSERT triggers are defined. Updates and deletes on
# segments / qa_pairs are not mirrored into the FTS tables by this schema.
DB_SCHEMA = """
CREATE TABLE IF NOT EXISTS episodes (
id INTEGER PRIMARY KEY,
episode_id TEXT UNIQUE NOT NULL, -- e.g. "2016-s8e42"
date TEXT, -- "2016-03-15"
audio_path TEXT, -- absolute path to original MP3
duration REAL,
hr INTEGER -- 1 or 2 (for split episodes)
);
CREATE TABLE IF NOT EXISTS segments (
id INTEGER PRIMARY KEY,
episode_id TEXT NOT NULL,
seg_index INTEGER NOT NULL,
start REAL NOT NULL,
end REAL NOT NULL,
speaker TEXT, -- "HOST", "CALLER", "UNKNOWN", "COMMERCIAL"
text TEXT NOT NULL,
FOREIGN KEY (episode_id) REFERENCES episodes(episode_id)
);
CREATE VIRTUAL TABLE IF NOT EXISTS segments_fts USING fts5(
text,
speaker UNINDEXED,
episode_id UNINDEXED,
seg_index UNINDEXED,
content='segments',
content_rowid='id'
);
CREATE TRIGGER IF NOT EXISTS segments_ai AFTER INSERT ON segments BEGIN
INSERT INTO segments_fts(rowid, text, speaker, episode_id, seg_index)
VALUES (new.id, new.text, new.speaker, new.episode_id, new.seg_index);
END;
CREATE TABLE IF NOT EXISTS qa_pairs (
id INTEGER PRIMARY KEY,
episode_id TEXT NOT NULL,
question_start REAL NOT NULL,
question_end REAL NOT NULL,
answer_start REAL NOT NULL,
answer_end REAL NOT NULL,
question_text TEXT NOT NULL,
answer_text TEXT NOT NULL,
topic TEXT, -- Ollama-tagged topic
topic_tags TEXT, -- JSON array of tags
FOREIGN KEY (episode_id) REFERENCES episodes(episode_id)
);
CREATE VIRTUAL TABLE IF NOT EXISTS qa_fts USING fts5(
question_text,
answer_text,
topic,
episode_id UNINDEXED,
content='qa_pairs',
content_rowid='id'
);
CREATE TRIGGER IF NOT EXISTS qa_ai AFTER INSERT ON qa_pairs BEGIN
INSERT INTO qa_fts(rowid, question_text, answer_text, topic, episode_id)
VALUES (new.id, new.question_text, new.answer_text, new.topic, new.episode_id);
END;
"""
@dataclass
class SearchResult:
    """One transcript segment returned by a full-text search.

    Attributes:
        episode_id: Identifier of the episode the segment belongs to.
        date: Episode date string as supplied by the index.
        start: Segment start offset, in seconds.
        end: Segment end offset, in seconds.
        speaker: Speaker label stored with the segment.
        text: Transcript text of the segment.
        audio_path: Path of the episode's audio file.
        score: Rank value reported by the search backend (default 0.0).
    """

    episode_id: str
    date: str
    start: float
    end: float
    speaker: str
    text: str
    audio_path: str
    score: float = 0.0

    @staticmethod
    def _clock(seconds: float) -> str:
        """Format *seconds* as ``M:SS``, or ``H:MM:SS`` once it reaches an hour."""
        # Fractions of a second are truncated, not rounded.
        minutes, secs = divmod(int(seconds), 60)
        hours, minutes = divmod(minutes, 60)
        if hours:
            return f"{hours}:{minutes:02d}:{secs:02d}"
        return f"{minutes}:{secs:02d}"

    def timestamp_str(self) -> str:
        """Return the segment's time range as ``start–end``.

        The two clock values are joined with an en dash; without a
        separator they would run together (e.g. ``1:051:10``).
        """
        return f"{self._clock(self.start)}–{self._clock(self.end)}"
@dataclass
class QAResult:
    """One question/answer pair returned by a Q&A search.

    All ``*_start`` / ``*_end`` fields are offsets in seconds into the
    episode audio referenced by ``audio_path``.
    """

    episode_id: str
    date: str
    question_start: float
    question_end: float
    answer_start: float
    answer_end: float
    question_text: str
    answer_text: str
    topic: str
    audio_path: str

    @staticmethod
    def _clock(seconds: float) -> str:
        """Format *seconds* as ``M:SS``, or ``H:MM:SS`` once it reaches an hour."""
        # Fractions of a second are truncated, not rounded.
        minutes, secs = divmod(int(seconds), 60)
        hours, minutes = divmod(minutes, 60)
        if hours:
            return f"{hours}:{minutes:02d}:{secs:02d}"
        return f"{minutes}:{secs:02d}"

    def clip_start(self, padding: float = 1.0) -> float:
        """Return the clip start: question start minus *padding*, floored at 0."""
        return max(0.0, self.question_start - padding)

    def clip_end(self, padding: float = 1.0) -> float:
        """Return the clip end: answer end plus *padding*."""
        return self.answer_end + padding

    def timestamp_str(self) -> str:
        """Return the span from question start to answer end as ``start–end``.

        The two clock values are joined with an en dash; without a
        separator they would run together (e.g. ``0:101:35``).
        """
        return f"{self._clock(self.question_start)}–{self._clock(self.answer_end)}"

    def duration(self) -> float:
        """Return the seconds between question start and answer end."""
        return self.answer_end - self.question_start
class ArchiveIndex:
def __init__(self, db_path: Path):
self.db_path = Path(db_path)
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._conn = sqlite3.connect(str(self.db_path))
self._conn.row_factory = sqlite3.Row
self._conn.executescript(DB_SCHEMA)
self._conn.commit()
def close(self):
self._conn.close()
def __enter__(self):
return self
def __exit__(self, *_):
self.close()
# ── Ingestion ──────────────────────────────────────────────────────────
def add_episode(self, episode_id: str, audio_path: Path,
date: str = None, duration: float = None, hr: int = None):
self._conn.execute(
"INSERT OR IGNORE INTO episodes (episode_id, date, audio_path, duration, hr) "
"VALUES (?, ?, ?, ?, ?)",
(episode_id, date, str(audio_path), duration, hr)
)
self._conn.commit()
def add_segments(self, episode_id: str, segments: list[dict]):
"""Add transcript segments. Each dict: {start, end, text, speaker}."""
existing = self._conn.execute(
"SELECT COUNT(*) FROM segments WHERE episode_id = ?", (episode_id,)
).fetchone()[0]
if existing:
return # already indexed
self._conn.executemany(
"INSERT INTO segments (episode_id, seg_index, start, end, speaker, text) "
"VALUES (?, ?, ?, ?, ?, ?)",
[
(episode_id, i, s["start"], s["end"],
s.get("speaker", "UNKNOWN"), s["text"])
for i, s in enumerate(segments)
]
)
self._conn.commit()
def add_qa_pair(self, episode_id: str, q_start: float, q_end: float,
a_start: float, a_end: float, question: str, answer: str,
topic: str = None, tags: list[str] = None):
self._conn.execute(
"INSERT INTO qa_pairs "
"(episode_id, question_start, question_end, answer_start, answer_end, "
"question_text, answer_text, topic, topic_tags) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(episode_id, q_start, q_end, a_start, a_end, question, answer,
topic, json.dumps(tags or []))
)
self._conn.commit()
# ── Search ─────────────────────────────────────────────────────────────
def search(self, query: str, speaker_filter: str = None,
limit: int = 20) -> list[SearchResult]:
"""Full-text search across all transcript segments."""
speaker_clause = ""
params = [query, limit]
if speaker_filter:
speaker_clause = "AND s.speaker = ?"
params.insert(1, speaker_filter)
rows = self._conn.execute(f"""
SELECT s.episode_id, e.date, s.start, s.end, s.speaker, s.text,
e.audio_path, rank
FROM segments_fts f
JOIN segments s ON s.id = f.rowid
JOIN episodes e ON e.episode_id = s.episode_id
WHERE segments_fts MATCH ?
{speaker_clause}
ORDER BY rank
LIMIT ?
""", params).fetchall()
return [SearchResult(
episode_id=r["episode_id"], date=r["date"] or r["episode_id"],
start=r["start"], end=r["end"], speaker=r["speaker"],
text=r["text"], audio_path=r["audio_path"], score=r["rank"]
) for r in rows]
def search_qa(self, query: str, limit: int = 20) -> list[QAResult]:
"""Search Q&A pairs — matches against question, answer, and topic."""
rows = self._conn.execute("""
SELECT q.episode_id, e.date, q.question_start, q.question_end,
q.answer_start, q.answer_end, q.question_text, q.answer_text,
q.topic, e.audio_path, rank
FROM qa_fts f
JOIN qa_pairs q ON q.id = f.rowid
JOIN episodes e ON e.episode_id = q.episode_id
WHERE qa_fts MATCH ?
ORDER BY rank
LIMIT ?
""", [query, limit]).fetchall()
return [QAResult(
episode_id=r["episode_id"], date=r["date"] or r["episode_id"],
question_start=r["question_start"], question_end=r["question_end"],
answer_start=r["answer_start"], answer_end=r["answer_end"],
question_text=r["question_text"], answer_text=r["answer_text"],
topic=r["topic"] or "", audio_path=r["audio_path"]
) for r in rows]
def stats(self) -> dict:
return {
"episodes": self._conn.execute("SELECT COUNT(*) FROM episodes").fetchone()[0],
"segments": self._conn.execute("SELECT COUNT(*) FROM segments").fetchone()[0],
"qa_pairs": self._conn.execute("SELECT COUNT(*) FROM qa_pairs").fetchone()[0],
}