radio show: co-host voice profile, Q&A extraction fixes, archive index

- Build Tom (co-host) voice profile (44 embeddings, 0.698 similarity to Mike)
- diarizer.py: add CO-HOST speaker label for cohost-role profiles
- voice_profiler.py: emit "Cohost: <name>" label for cohost role
- qa_extractor.py: overlap resolution at load time (midpoint boundary split),
  4s CALLER-preference threshold, turn-based caller-intro lookback (2 HOST turns),
  _preceded_by_caller_intro() helper, _PHONE_GREETING pattern,
  751-1041 + "we'll get your problem solved" promo signatures
- benchmark.py: use src.transcriber.transcribe with batch_size=16
- add index_test_episodes.py and build_cohost_profile.py scripts
- add .gitignore (exclude episodes, transcripts, *.db, .venv)
- session log: 2026-04-27-qa-extraction-cohost-indexing.md

Result: 2016-s8e43 drops from 12 false-positive Q&A pairs to 2 real caller pairs.
archive.db: 6 episodes, 762 segments, 10 Q&A pairs, FTS5 search verified.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-27 14:41:04 -07:00
parent 79abef9dc9
commit e9ac607500
55 changed files with 649 additions and 100 deletions

View File

@@ -202,6 +202,8 @@ def diarize(audio_path: str | Path,
label = seg.speaker_label.split(" (")[0] # strip confidence score
if label.startswith("Host:") or label.startswith("Host "):
speaker = "HOST"
elif label.startswith("Cohost:"):
speaker = "CO-HOST"
elif label == "[error]":
speaker = "UNKNOWN"
else:

View File

@@ -53,10 +53,12 @@ _PROMO_SIGS: list[tuple[re.Pattern, int]] = [
(re.compile(r"\bcomputer running slow\b", re.I), 1),
(re.compile(r"\bafter these messages\b", re.I), 1),
(re.compile(r"\b790.?2040\b", re.I), 1),
(re.compile(r"\b751.?1041\b", re.I), 1),
(re.compile(r"\bgurushow\.com\b", re.I), 1),
(re.compile(r"\bcall in now\b", re.I), 1),
(re.compile(r"\bcomputer troubles\?", re.I), 1),
(re.compile(r"\bhardware installation\b", re.I), 1),
(re.compile(r"we.?ll get your problem solved", re.I), 1),
]
@@ -127,10 +129,19 @@ def extract_qa_pairs(diarized_segments: list[dict]) -> list[QAPair]:
if _is_promo_or_bumper(turn["text"]):
i += 1
continue
# Skip the opening 90s — real callers never call before the show starts
if turn["start"] < 90:
i += 1
continue
q_duration = turn["end"] - turn["start"]
if q_duration < MIN_QUESTION_DURATION:
i += 1
continue
# Require caller-intro context: host must have introduced the call, OR
# the caller opens with a phone greeting ("hello", "hi", "hey")
if not _preceded_by_caller_intro(turns, i) and not _PHONE_GREETING.match(turn["text"].strip()):
i += 1
continue
# Look ahead for HOST answer turn(s)
j = i + 1
@@ -329,25 +340,71 @@ def load_diarized_transcript(transcript_path: Path,
with open(diarization_path) as f:
diarization = json.load(f)
turns = diarization.get("turns", [])
raw_turns = diarization.get("turns", [])
def speaker_at(t: float) -> str:
"""Find which diarization turn covers time t."""
# Resolve overlapping boundaries left by the sliding-window diarizer:
# place each transition at the midpoint of the overlap region.
resolved: list[dict] = []
for turn in sorted(raw_turns, key=lambda t: t["start"]):
if not resolved:
resolved.append(dict(turn))
continue
prev = resolved[-1]
if turn["start"] < prev["end"]:
mid = (turn["start"] + prev["end"]) / 2
prev["end"] = mid
resolved.append({**turn, "start": mid})
else:
resolved.append(dict(turn))
turns = resolved
# Minimum CALLER coverage to label a transcript segment as CALLER.
# Batch transcription produces ~25s segments; caller windows are 10s.
# Require 4s of CALLER overlap so brief HOST-edge segments aren't over-claimed.
_CALLER_MIN_S = 4.0
def speaker_for_segment(seg_start: float, seg_end: float) -> str:
caller_cov = 0.0
coverage: dict[str, float] = {}
for turn in turns:
if turn["start"] <= t <= turn["end"]:
return turn["speaker"]
return "UNKNOWN"
overlap = min(seg_end, turn["end"]) - max(seg_start, turn["start"])
if overlap <= 0:
continue
coverage[turn["speaker"]] = coverage.get(turn["speaker"], 0) + overlap
if turn["speaker"] == "CALLER":
caller_cov += overlap
if not coverage:
return "UNKNOWN"
if caller_cov >= _CALLER_MIN_S:
return "CALLER"
return max(coverage, key=coverage.__getitem__)
return [
{"start": s["start"], "end": s["end"],
"text": s["text"],
"speaker": speaker_at((s["start"] + s["end"]) / 2)}
"speaker": speaker_for_segment(s["start"], s["end"])}
for s in segments
]
# ── Helpers ────────────────────────────────────────────────────────────────
# Matches a phone-style greeting at the very start of a string: "hello", "hi",
# "hey", or "good morning/afternoon/evening", case-insensitively. The trailing
# \b requires the greeting to end on a word boundary, so longer words that
# merely begin with these letters (e.g. "history") do not match.
_PHONE_GREETING = re.compile(r"^(hello|hi|hey|good (morning|afternoon|evening))\b", re.IGNORECASE)
def _preceded_by_caller_intro(turns: list[dict], idx: int, max_host_turns: int = 2) -> bool:
"""Return True if a preceding HOST turn (within max_host_turns HOST turns) contains a caller-intro phrase."""
host_count = 0
for j in range(idx - 1, -1, -1):
if turns[j]["speaker"] == "HOST":
if _CALLER_INTRO.search(turns[j]["text"]):
return True
host_count += 1
if host_count >= max_host_turns:
break
return False
def _looks_like_question(text: str) -> bool:
    """Report whether QUESTION_PATTERN finds a match anywhere in *text*."""
    found = QUESTION_PATTERN.search(text)
    return bool(found)

View File

@@ -113,61 +113,60 @@ def _format_srt_time(seconds: float) -> str:
def transcribe(audio_path: str | Path, model_size: str = "large-v3",
language: str = "en", device: str = "cuda") -> Transcript:
"""Transcribe an audio file using faster-whisper."""
from faster_whisper import WhisperModel
language: str = "en", device: str = "cuda",
batch_size: int = 16) -> Transcript:
"""Transcribe an audio file using faster-whisper.
Uses BatchedInferencePipeline + int8_float16 + VAD for archive/batch work.
Word timestamps are skipped in batch mode (not needed for segment-level search).
Pass batch_size=0 to fall back to sequential WhisperModel with word timestamps.
"""
from faster_whisper import WhisperModel, BatchedInferencePipeline
audio_path = Path(audio_path)
use_batched = batch_size > 0
console.print(f"[bold]Transcribing:[/bold] {audio_path.name}")
console.print(f"[dim]Model: {model_size}, Device: {device}[/dim]")
model = WhisperModel(model_size, device=device, compute_type="float16")
segments_raw, info = model.transcribe(
str(audio_path),
language=language,
word_timestamps=True,
vad_filter=True,
vad_parameters=dict(
min_silence_duration_ms=500,
speech_pad_ms=200,
),
console.print(
f"[dim]Model: {model_size} | "
f"{'batched x' + str(batch_size) + ' int8_float16' if use_batched else 'sequential float16'} | "
f"Device: {device}[/dim]"
)
console.print(f"[dim]Detected language: {info.language} "
f"(probability: {info.language_probability:.2f})[/dim]")
console.print(f"[dim]Duration: {info.duration:.1f}s "
f"({info.duration / 60:.1f} min)[/dim]")
if use_batched:
base_model = WhisperModel(model_size, device=device, compute_type="int8_float16")
model = BatchedInferencePipeline(model=base_model)
segments_raw, info = model.transcribe(
str(audio_path),
language=language,
batch_size=batch_size,
)
else:
model = WhisperModel(model_size, device=device, compute_type="float16")
segments_raw, info = model.transcribe(
str(audio_path),
language=language,
word_timestamps=True,
vad_filter=True,
vad_parameters=dict(min_silence_duration_ms=500, speech_pad_ms=200),
)
console.print(f"[dim]Duration: {info.duration:.1f}s ({info.duration / 60:.1f} min)[/dim]")
segments = []
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("{task.completed} segments"),
TimeElapsedColumn(),
console=console,
) as progress:
task = progress.add_task("Transcribing...", total=None)
for i, seg in enumerate(segments_raw):
for i, seg in enumerate(segments_raw):
words = []
if not use_batched:
words = [
TranscriptWord(
word=w.word,
start=w.start,
end=w.end,
probability=w.probability,
)
TranscriptWord(word=w.word, start=w.start,
end=w.end, probability=w.probability)
for w in (seg.words or [])
]
segments.append(TranscriptSegment(
id=i,
text=seg.text,
start=seg.start,
end=seg.end,
words=words,
))
progress.update(task, completed=i + 1)
segments.append(TranscriptSegment(
id=i, text=seg.text, start=seg.start, end=seg.end, words=words,
))
if i % 50 == 0:
console.print(f"[dim] {i} segments... ({seg.end:.0f}s)[/dim]")
console.print(f"[green]Transcription complete: {len(segments)} segments[/green]")

View File

@@ -319,8 +319,11 @@ class VoiceProfiler:
best_match = name
if best_score >= threshold:
if best_match and self.profiles[best_match].role == "host":
role = self.profiles[best_match].role if best_match else "unknown"
if role == "host":
label = f"Host: {best_match}"
elif role == "cohost":
label = f"Cohost: {best_match}"
else:
label = best_match
else: