radio: diarization pipeline fixes, benchmark setup, test episode set

- Fix voice_profiler threshold bug (HOST label overwrote Unknown unconditionally)
- Audio preload optimization: single ffmpeg per episode, 149.5x realtime on 5070 Ti
- WavLM threshold raised to 0.85 (Mike 0.90-0.99, callers 0.46-0.83)
- Promo/bumper filter: weighted signature scoring, 42->27 clean Q&A pairs
- Text-only Q&A fallback for episodes with no CALLER diarization labels
- TRANSFORMERS_OFFLINE=1 to skip HuggingFace freshness checks
- Add diarize_2018.py for targeted re-run + FTS5 rebuild
- Add benchmark.py + BENCH_SETUP.md for GURU-BEAST-ROG (RTX 4090) comparison
- Commit 9-episode training diarization.json outputs
- Session log: 2026-04-27-diarization-pipeline.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-27 13:20:10 -07:00
parent 206cd2f929
commit 79abef9dc9
21 changed files with 4720 additions and 202 deletions

View File

@@ -0,0 +1,372 @@
"""
Q&A pair extraction from diarized transcripts.
Identifies exchanges where a CALLER asks a question and the HOST answers.
Outputs structured Q&A pairs with timestamps for clip extraction and indexing.
"""
import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
from rich.console import Console
console = Console()
# Phrases that signal a caller is asking a question
QUESTION_SIGNALS = [
r"\?",
r"\bhow (do|can|should|would|does)\b",
r"\bwhat (is|are|should|can|do|does|about)\b",
r"\bwhy (is|are|does|do|would|should)\b",
r"\bis (it|there|this|that) (true|safe|possible|good|bad|worth)\b",
r"\bshould i\b",
r"\bcan you\b",
r"\bi (was wondering|wanted to ask|have a question)\b",
]
QUESTION_PATTERN = re.compile("|".join(QUESTION_SIGNALS), re.IGNORECASE)
# Minimum durations for a meaningful exchange
MIN_QUESTION_DURATION = 5.0 # seconds
MIN_ANSWER_DURATION = 15.0 # seconds
MAX_GAP_BETWEEN_QA = 30.0 # seconds between question end and answer start
# ── Promo / bumper filter ──────────────────────────────────────────────────
# Promos evolve across years but preserve signature phrases.
# Weight 2 = highly distinctive (one match sufficient to filter).
# Weight 1 = semi-generic (need 2+ to filter).
# A question turn with total score >= PROMO_SCORE_THRESHOLD is suppressed.
PROMO_SCORE_THRESHOLD = 2
_PROMO_SIGS: list[tuple[re.Pattern, int]] = [
# Highly distinctive — score 2 each
(re.compile(r"acquired a life of its own", re.I), 2),
(re.compile(r"simply desire a deeper", re.I), 2),
(re.compile(r"tame that beast", re.I), 2),
(re.compile(r"mike swanson will be back after", re.I), 2),
(re.compile(r"heaven forbid.{0,20}virus", re.I | re.DOTALL), 2),
(re.compile(r"mike swanson is answering all", re.I), 2),
# Semi-distinctive — score 1 each, need two to filter
(re.compile(r"\bcomputer running slow\b", re.I), 1),
(re.compile(r"\bafter these messages\b", re.I), 1),
(re.compile(r"\b790.?2040\b", re.I), 1),
(re.compile(r"\bgurushow\.com\b", re.I), 1),
(re.compile(r"\bcall in now\b", re.I), 1),
(re.compile(r"\bcomputer troubles\?", re.I), 1),
(re.compile(r"\bhardware installation\b", re.I), 1),
]
def _is_promo_or_bumper(text: str) -> bool:
"""Return True if text scores above threshold on show promo/bumper signatures."""
score = sum(w for pat, w in _PROMO_SIGS if pat.search(text))
return score >= PROMO_SCORE_THRESHOLD
@dataclass
class QAPair:
question_start: float
question_end: float
answer_start: float
answer_end: float
question_text: str
answer_text: str
topic: Optional[str] = None
topic_tags: list[str] = field(default_factory=list)
def to_dict(self) -> dict:
return {
"question_start": self.question_start,
"question_end": self.question_end,
"answer_start": self.answer_start,
"answer_end": self.answer_end,
"question_text": self.question_text,
"answer_text": self.answer_text,
"topic": self.topic,
"topic_tags": self.topic_tags,
}
def clip_start(self, padding: float = 1.5) -> float:
return max(0.0, self.question_start - padding)
def clip_end(self, padding: float = 1.5) -> float:
return self.answer_end + padding
def duration(self) -> float:
return self.answer_end - self.question_start
def extract_qa_pairs(diarized_segments: list[dict]) -> list[QAPair]:
"""
Extract caller Q&A pairs from diarized transcript segments.
Each segment dict: {start, end, text, speaker}
Speaker values: "HOST", "CALLER", "UNKNOWN"
"""
pairs = []
# Group consecutive segments by speaker into speaker turns
turns = _merge_consecutive_speaker_turns(diarized_segments)
# Check if diarization produced any non-HOST speakers
has_caller_labels = any(t["speaker"] in ("CALLER", "UNKNOWN") for t in turns)
if not has_caller_labels:
# Diarization labels are absent or unreliable — fall back to text-pattern detection
return _extract_qa_text_only(turns)
i = 0
while i < len(turns):
turn = turns[i]
# Look for a CALLER turn that looks like a question
if turn["speaker"] in ("CALLER", "UNKNOWN") and _looks_like_question(turn["text"]):
if _is_promo_or_bumper(turn["text"]):
i += 1
continue
q_duration = turn["end"] - turn["start"]
if q_duration < MIN_QUESTION_DURATION:
i += 1
continue
# Look ahead for HOST answer turn(s)
j = i + 1
answer_turns = []
while j < len(turns):
next_turn = turns[j]
gap = next_turn["start"] - turns[j - 1]["end"]
if gap > MAX_GAP_BETWEEN_QA and not answer_turns:
break # too big a gap before any answer
if next_turn["speaker"] == "HOST":
answer_turns.append(next_turn)
# Keep collecting consecutive HOST turns
j += 1
while j < len(turns) and turns[j]["speaker"] == "HOST":
answer_turns.append(turns[j])
j += 1
break
elif next_turn["speaker"] in ("CALLER", "UNKNOWN"):
# Another caller turn before host answered — skip this question
break
else:
j += 1
if answer_turns:
answer_text = " ".join(t["text"] for t in answer_turns)
answer_duration = answer_turns[-1]["end"] - answer_turns[0]["start"]
if answer_duration >= MIN_ANSWER_DURATION:
pairs.append(QAPair(
question_start=turn["start"],
question_end=turn["end"],
answer_start=answer_turns[0]["start"],
answer_end=answer_turns[-1]["end"],
question_text=turn["text"].strip(),
answer_text=answer_text.strip(),
))
i = j
continue
i += 1
return pairs
# Maximum duration for a question turn in text-only mode — avoids capturing monologues
_MAX_QUESTION_S_TEXT_MODE = 90.0
# Caller introduction phrases Mike uses before taking a call
_CALLER_INTRO = re.compile(
r"\b(let'?s go to|going to the phones?|you'?re on the air|on the air|"
r"first caller|next caller|caller from|go ahead|what'?s (your question|going on)|"
r"welcome to the show|thanks for calling|thank you for calling|"
r"our (first|next|last) (caller|call)|taking (a |your )?call)\b",
re.IGNORECASE,
)
def _extract_qa_text_only(turns: list[dict]) -> list[QAPair]:
"""
Q&A extraction when speaker labels are unavailable or all HOST.
Uses text patterns to identify question anchors. Works well for call-in
radio format where callers describe problems and the host answers at length.
Captures both genuine caller questions and Mike's own rhetorical Q&A segments.
"""
pairs = []
i = 0
while i < len(turns):
turn = turns[i]
q_duration = turn["end"] - turn["start"]
is_q_candidate = (
_looks_like_question(turn["text"])
and MIN_QUESTION_DURATION <= q_duration <= _MAX_QUESTION_S_TEXT_MODE
)
# Also treat segments immediately after a caller-intro phrase as candidates
if not is_q_candidate and i > 0:
prev_text = turns[i - 1]["text"]
if _CALLER_INTRO.search(prev_text) and q_duration >= MIN_QUESTION_DURATION:
is_q_candidate = True
if is_q_candidate and _is_promo_or_bumper(turn["text"]):
i += 1
continue
if is_q_candidate:
# Collect following segments as the answer until we hit another question
j = i + 1
answer_turns = []
while j < len(turns):
next_turn = turns[j]
gap = next_turn["start"] - turns[j - 1]["end"]
if gap > MAX_GAP_BETWEEN_QA and not answer_turns:
break
# Stop collecting if we hit another short question-pattern turn
if (
_looks_like_question(next_turn["text"])
and (next_turn["end"] - next_turn["start"]) <= _MAX_QUESTION_S_TEXT_MODE
and answer_turns
):
break
answer_turns.append(next_turn)
j += 1
# Stop once we have a substantial answer block
if answer_turns:
ans_dur = answer_turns[-1]["end"] - answer_turns[0]["start"]
if ans_dur >= MIN_ANSWER_DURATION * 3:
break
if answer_turns:
answer_text = " ".join(t["text"] for t in answer_turns)
answer_duration = answer_turns[-1]["end"] - answer_turns[0]["start"]
if answer_duration >= MIN_ANSWER_DURATION:
pairs.append(QAPair(
question_start=turn["start"],
question_end=turn["end"],
answer_start=answer_turns[0]["start"],
answer_end=answer_turns[-1]["end"],
question_text=turn["text"].strip(),
answer_text=answer_text.strip(),
))
i = j
continue
i += 1
return pairs
def tag_qa_pairs_with_ollama(pairs: list[QAPair], ollama_host: str = "http://localhost:11434",
model: str = "qwen3:14b") -> list[QAPair]:
"""Use Ollama to tag each Q&A pair with a topic and tags."""
try:
import ollama
client = ollama.Client(host=ollama_host)
except ImportError:
console.print("[yellow]ollama not installed — skipping topic tagging[/yellow]")
return pairs
for i, pair in enumerate(pairs):
console.print(f"[dim]Tagging Q&A {i+1}/{len(pairs)}...[/dim]")
try:
prompt = (
f"A radio show caller asked:\n\"{pair.question_text[:300]}\"\n\n"
f"The host answered:\n\"{pair.answer_text[:500]}\"\n\n"
"Respond with JSON only, no explanation:\n"
'{"topic": "short topic name (3-5 words)", "tags": ["tag1", "tag2", "tag3"]}'
)
resp = client.chat(
model=model,
messages=[{"role": "user", "content": prompt}],
options={"temperature": 0},
)
raw = resp["message"]["content"].strip()
# Extract JSON from response
start = raw.find("{")
end = raw.rfind("}") + 1
if start >= 0 and end > start:
data = json.loads(raw[start:end])
pair.topic = data.get("topic", "")
pair.topic_tags = data.get("tags", [])
except Exception as e:
console.print(f"[yellow]Tagging failed for pair {i+1}: {e}[/yellow]")
return pairs
def load_diarized_transcript(transcript_path: Path,
diarization_path: Optional[Path]) -> list[dict]:
"""
Merge transcript and diarization into speaker-labeled segments.
Falls back to HOST-only if no diarization available.
"""
with open(transcript_path) as f:
transcript = json.load(f)
segments = transcript["segments"]
if diarization_path is None or not diarization_path.exists():
return [
{"start": s["start"], "end": s["end"],
"text": s["text"], "speaker": "HOST"}
for s in segments
]
with open(diarization_path) as f:
diarization = json.load(f)
turns = diarization.get("turns", [])
def speaker_at(t: float) -> str:
"""Find which diarization turn covers time t."""
for turn in turns:
if turn["start"] <= t <= turn["end"]:
return turn["speaker"]
return "UNKNOWN"
return [
{"start": s["start"], "end": s["end"],
"text": s["text"],
"speaker": speaker_at((s["start"] + s["end"]) / 2)}
for s in segments
]
# ── Helpers ────────────────────────────────────────────────────────────────
def _looks_like_question(text: str) -> bool:
return bool(QUESTION_PATTERN.search(text))
def _merge_consecutive_speaker_turns(segments: list[dict]) -> list[dict]:
"""Merge adjacent segments from the same speaker into continuous turns."""
if not segments:
return []
turns = []
current = dict(segments[0])
for seg in segments[1:]:
if seg["speaker"] == current["speaker"]:
current["end"] = seg["end"]
current["text"] = current["text"].rstrip() + " " + seg["text"].lstrip()
else:
turns.append(current)
current = dict(seg)
turns.append(current)
return turns