radio: attach caller names to Q&A pairs from transcript intros

QAPair gets caller_name and caller_role fields populated by a new
attach_caller_names(pairs, transcript_segments) helper. For each pair,
finds the active opening intro at the question_start time (8s forward
tolerance, no backward limit — a caller's call can run for 10+ minutes
and the intro happens once at the start) and attaches the speaker name.

Validation on 9-episode test set:
  19/19 Q&A pairs (100%) now have caller names attached.

Examples of corrections from oracle attribution:
  2018-s10e18 @ 73:36  Christopher (was misattributed to "Tara")
  2015-s7e19 @ 35:45   William     (was misattributed to "Tara")
  2010-05-08-hr1       Jackie x3, Bruce
  2012-03-10-hr1       Adam x2
  2016-s8e43           John, Doug
  2017-s9e30           Tom, Denise x3, Charlie

speaker_oracle.py: adds speaker_at(time, intros) helper used both by the
existing resolve_speakers() and the new caller-name attachment. Also
adds the "let's fit/bring/put X in/on" intro pattern variant (caught
Charlie at 70:21 in 2017-s9e30 that "talk to X" missed).

download_full_archive.py: SSH keepalive every 30s + per-file retry-on-
failure (up to 3 attempts with reconnect). Earlier run hung on a dead
connection at file 109 of 589 with no recovery; restarted run is now
running at ~10 MB/s vs ~2-3 MB/s before.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-04-27 16:55:31 -07:00
parent 1b574caba4
commit 488bf5849e
4 changed files with 100 additions and 17 deletions

View File

@@ -17,7 +17,7 @@ ensure_cuda_libs()
import torch
from src.config import load_config
from src.diarizer import diarize, VoiceProfileStore
from src.qa_extractor import load_diarized_transcript, extract_qa_pairs
from src.qa_extractor import load_diarized_transcript, extract_qa_pairs, attach_caller_names
from rich.console import Console
from rich.table import Table
@@ -180,8 +180,16 @@ for ep, transcript_path, audio_dur, _ in trans_results:
diarization_path = trans_ep_dir / "diarization.json"
segments = load_diarized_transcript(transcript_path, diarization_path)
pairs = extract_qa_pairs(segments)
# Attach caller names from transcript intros
with open(transcript_path) as f:
td = _json.load(f)
attach_caller_names(pairs, td.get("segments", []))
named = sum(1 for p in pairs if p.caller_name)
name_str = ", ".join(p.caller_name for p in pairs if p.caller_name) or ""
qa_rows.append((ep.stem, len(pairs)))
console.print(f" {ep.stem}: {len(pairs)} Q&A pairs")
console.print(f" {ep.stem}: {len(pairs)} pairs ({named} named: {name_str})")
# ── Summary ────────────────────────────────────────────────────────────────

View File

@@ -22,12 +22,22 @@ LOCAL_ROOT.mkdir(parents=True, exist_ok=True)
REMOTE_ROOT = "/home/gurushow/public_html/archive"
YEARS = ["2010", "2011", "2012", "2014", "2015", "2016", "2017", "2018"]
def connect():
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect("172.16.3.10", username="root", password=password,
look_for_keys=False, allow_agent=False,
timeout=30, banner_timeout=30, auth_timeout=30)
transport = c.get_transport()
if transport is not None:
transport.set_keepalive(30) # send keepalive every 30s
s = c.open_sftp()
s.get_channel().settimeout(120) # per-operation timeout
return c, s
print(f"Connecting to 172.16.3.10...", flush=True)
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect("172.16.3.10", username="root", password=password,
look_for_keys=False, allow_agent=False, timeout=30)
sftp = client.open_sftp()
client, sftp = connect()
print("Connected.", flush=True)
@@ -75,16 +85,32 @@ for year in YEARS:
size_mb = remote_size / 1024 / 1024
print(f" [{downloaded_files + 1:3d}] {rel} ({size_mb:.1f} MB)...", end="", flush=True)
t0 = time.monotonic()
try:
sftp.get(remote, str(local))
elapsed = time.monotonic() - t0
mbps = size_mb / elapsed if elapsed > 0 else 0
print(f" done ({elapsed:.1f}s, {mbps:.1f} MB/s)", flush=True)
downloaded_files += 1
downloaded_bytes += remote_size
except Exception as e:
print(f" FAILED: {e}", flush=True)
errors.append(f"get {remote}: {e}")
attempt = 0
while True:
attempt += 1
try:
sftp.get(remote, str(local))
elapsed = time.monotonic() - t0
mbps = size_mb / elapsed if elapsed > 0 else 0
print(f" done ({elapsed:.1f}s, {mbps:.1f} MB/s)", flush=True)
downloaded_files += 1
downloaded_bytes += remote_size
break
except Exception as e:
if attempt >= 3:
print(f" FAILED after {attempt} attempts: {e}", flush=True)
errors.append(f"get {remote}: {e}")
break
print(f" retry {attempt} ({e})...", end="", flush=True)
# Reconnect on failure
try:
sftp.close()
client.close()
except Exception:
pass
time.sleep(5)
client, sftp = connect()
elapsed_total = time.monotonic() - t_start
print(f"\n=== Summary ===", flush=True)

View File

@@ -78,6 +78,10 @@ class QAPair:
answer_text: str
topic: Optional[str] = None
topic_tags: list[str] = field(default_factory=list)
# Caller identification — populated by attach_caller_names() when a
# transcript-side speaker intro precedes the question turn.
caller_name: Optional[str] = None
caller_role: Optional[str] = None # "caller" / "guest" / "fillin"
def to_dict(self) -> dict:
return {
@@ -89,6 +93,8 @@ class QAPair:
"answer_text": self.answer_text,
"topic": self.topic,
"topic_tags": self.topic_tags,
"caller_name": self.caller_name,
"caller_role": self.caller_role,
}
def clip_start(self, padding: float = 1.5) -> float:
@@ -101,6 +107,24 @@ class QAPair:
return self.answer_end - self.question_start
def attach_caller_names(pairs: list[QAPair],
transcript_segments: list[dict]) -> list[QAPair]:
"""Attach caller names to Q&A pairs using transcript-side speaker intros.
For each pair, looks up the active intro at the question_start time and
populates caller_name / caller_role. Mutates the input pairs and returns
the list for chaining.
"""
from .speaker_oracle import extract_intros, speaker_at
intros = extract_intros(transcript_segments)
for pair in pairs:
intro = speaker_at(pair.question_start, intros)
if intro is not None:
pair.caller_name = intro.name
pair.caller_role = intro.role_hint
return pairs
def extract_qa_pairs(diarized_segments: list[dict]) -> list[QAPair]:
"""
Extract caller Q&A pairs from diarized transcript segments.

View File

@@ -41,6 +41,11 @@ _INTRO_PATTERNS: list[tuple[re.Pattern, str, int, int | None]] = [
r"\blet.s\s+(?:go\s+ahead\s+and\s+)?(?:talk|go)\s+to\s+([A-Z][a-z]+)",
re.I), "caller", 1, None),
# "let's fit <name> in" / "let's bring <name> on" — caller pickup variants
(re.compile(
r"\blet.s\s+(?:go\s+ahead\s+and\s+)?(?:fit|bring|put)\s+([A-Z][a-z]+)\s+(?:in|on)\b",
re.I), "caller", 1, None),
# "Hello, <name>. How are you?" — caller pickup
(re.compile(
r"\bhello,?\s+([A-Z][a-z]+)\.?\s+(?:how\s+are\s+you|thanks\s+for|are\s+you\s+there|welcome)",
@@ -246,6 +251,26 @@ def resolve_speakers(diarization_turns: list[dict],
return out
def speaker_at(time: float, intros: list[SpeakerIntro]) -> SpeakerIntro | None:
"""Return the active opening intro at the given time, or None.
Same lookup logic as resolve_speakers but for a single timestamp,
used to attach caller names to Q&A pairs by their question_start time.
"""
opening = sorted(
[i for i in intros if i.role_hint != "caller_close"],
key=lambda i: i.intro_time,
)
best: SpeakerIntro | None = None
cutoff = time + _INTRO_FORWARD_TOLERANCE_S
for intro in opening:
if intro.intro_time <= cutoff:
best = intro
else:
break
return best
def named_speaker_summary(named_turns: list[NamedTurn], duration: float) -> dict[str, float]:
"""Aggregate seconds-by-named-speaker for reporting."""
times: dict[str, float] = {}