radio: attach caller names to Q&A pairs from transcript intros
QAPair gets caller_name and caller_role fields populated by a new attach_caller_names(pairs, transcript_segments) helper. For each pair, finds the active opening intro at the question_start time (8s forward tolerance, no backward limit — a caller's call can run for 10+ minutes and the intro happens once at the start) and attaches the speaker name. Validation on 9-episode test set: 19/19 Q&A pairs (100%) now have caller names attached. Examples of corrections from oracle attribution: 2018-s10e18 @ 73:36 Christopher (was misattributed to "Tara") 2015-s7e19 @ 35:45 William (was misattributed to "Tara") 2010-05-08-hr1 Jackie x3, Bruce 2012-03-10-hr1 Adam x2 2016-s8e43 John, Doug 2017-s9e30 Tom, Denise x3, Charlie speaker_oracle.py: adds speaker_at(time, intros) helper used both by the existing resolve_speakers() and the new caller-name attachment. Also adds the "let's fit/bring/put X in/on" intro pattern variant (caught Charlie at 70:21 in 2017-s9e30 that "talk to X" missed). download_full_archive.py: SSH keepalive every 30s + per-file retry-on- failure (up to 3 attempts with reconnect). Earlier run hung on a dead connection at file 109 of 589 with no recovery; restarted run is now running at ~10 MB/s vs ~2-3 MB/s before. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -17,7 +17,7 @@ ensure_cuda_libs()
|
|||||||
import torch
|
import torch
|
||||||
from src.config import load_config
|
from src.config import load_config
|
||||||
from src.diarizer import diarize, VoiceProfileStore
|
from src.diarizer import diarize, VoiceProfileStore
|
||||||
from src.qa_extractor import load_diarized_transcript, extract_qa_pairs
|
from src.qa_extractor import load_diarized_transcript, extract_qa_pairs, attach_caller_names
|
||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
from rich.table import Table
|
from rich.table import Table
|
||||||
|
|
||||||
@@ -180,8 +180,16 @@ for ep, transcript_path, audio_dur, _ in trans_results:
|
|||||||
diarization_path = trans_ep_dir / "diarization.json"
|
diarization_path = trans_ep_dir / "diarization.json"
|
||||||
segments = load_diarized_transcript(transcript_path, diarization_path)
|
segments = load_diarized_transcript(transcript_path, diarization_path)
|
||||||
pairs = extract_qa_pairs(segments)
|
pairs = extract_qa_pairs(segments)
|
||||||
|
|
||||||
|
# Attach caller names from transcript intros
|
||||||
|
with open(transcript_path) as f:
|
||||||
|
td = _json.load(f)
|
||||||
|
attach_caller_names(pairs, td.get("segments", []))
|
||||||
|
|
||||||
|
named = sum(1 for p in pairs if p.caller_name)
|
||||||
|
name_str = ", ".join(p.caller_name for p in pairs if p.caller_name) or "—"
|
||||||
qa_rows.append((ep.stem, len(pairs)))
|
qa_rows.append((ep.stem, len(pairs)))
|
||||||
console.print(f" {ep.stem}: {len(pairs)} Q&A pairs")
|
console.print(f" {ep.stem}: {len(pairs)} pairs ({named} named: {name_str})")
|
||||||
|
|
||||||
# ── Summary ────────────────────────────────────────────────────────────────
|
# ── Summary ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|||||||
@@ -22,12 +22,22 @@ LOCAL_ROOT.mkdir(parents=True, exist_ok=True)
|
|||||||
REMOTE_ROOT = "/home/gurushow/public_html/archive"
|
REMOTE_ROOT = "/home/gurushow/public_html/archive"
|
||||||
YEARS = ["2010", "2011", "2012", "2014", "2015", "2016", "2017", "2018"]
|
YEARS = ["2010", "2011", "2012", "2014", "2015", "2016", "2017", "2018"]
|
||||||
|
|
||||||
|
def connect():
|
||||||
|
c = paramiko.SSHClient()
|
||||||
|
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||||
|
c.connect("172.16.3.10", username="root", password=password,
|
||||||
|
look_for_keys=False, allow_agent=False,
|
||||||
|
timeout=30, banner_timeout=30, auth_timeout=30)
|
||||||
|
transport = c.get_transport()
|
||||||
|
if transport is not None:
|
||||||
|
transport.set_keepalive(30) # send keepalive every 30s
|
||||||
|
s = c.open_sftp()
|
||||||
|
s.get_channel().settimeout(120) # per-operation timeout
|
||||||
|
return c, s
|
||||||
|
|
||||||
|
|
||||||
print(f"Connecting to 172.16.3.10...", flush=True)
|
print(f"Connecting to 172.16.3.10...", flush=True)
|
||||||
client = paramiko.SSHClient()
|
client, sftp = connect()
|
||||||
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
|
||||||
client.connect("172.16.3.10", username="root", password=password,
|
|
||||||
look_for_keys=False, allow_agent=False, timeout=30)
|
|
||||||
sftp = client.open_sftp()
|
|
||||||
print("Connected.", flush=True)
|
print("Connected.", flush=True)
|
||||||
|
|
||||||
|
|
||||||
@@ -75,6 +85,10 @@ for year in YEARS:
|
|||||||
size_mb = remote_size / 1024 / 1024
|
size_mb = remote_size / 1024 / 1024
|
||||||
print(f" [{downloaded_files + 1:3d}] {rel} ({size_mb:.1f} MB)...", end="", flush=True)
|
print(f" [{downloaded_files + 1:3d}] {rel} ({size_mb:.1f} MB)...", end="", flush=True)
|
||||||
t0 = time.monotonic()
|
t0 = time.monotonic()
|
||||||
|
|
||||||
|
attempt = 0
|
||||||
|
while True:
|
||||||
|
attempt += 1
|
||||||
try:
|
try:
|
||||||
sftp.get(remote, str(local))
|
sftp.get(remote, str(local))
|
||||||
elapsed = time.monotonic() - t0
|
elapsed = time.monotonic() - t0
|
||||||
@@ -82,9 +96,21 @@ for year in YEARS:
|
|||||||
print(f" done ({elapsed:.1f}s, {mbps:.1f} MB/s)", flush=True)
|
print(f" done ({elapsed:.1f}s, {mbps:.1f} MB/s)", flush=True)
|
||||||
downloaded_files += 1
|
downloaded_files += 1
|
||||||
downloaded_bytes += remote_size
|
downloaded_bytes += remote_size
|
||||||
|
break
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f" FAILED: {e}", flush=True)
|
if attempt >= 3:
|
||||||
|
print(f" FAILED after {attempt} attempts: {e}", flush=True)
|
||||||
errors.append(f"get {remote}: {e}")
|
errors.append(f"get {remote}: {e}")
|
||||||
|
break
|
||||||
|
print(f" retry {attempt} ({e})...", end="", flush=True)
|
||||||
|
# Reconnect on failure
|
||||||
|
try:
|
||||||
|
sftp.close()
|
||||||
|
client.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
time.sleep(5)
|
||||||
|
client, sftp = connect()
|
||||||
|
|
||||||
elapsed_total = time.monotonic() - t_start
|
elapsed_total = time.monotonic() - t_start
|
||||||
print(f"\n=== Summary ===", flush=True)
|
print(f"\n=== Summary ===", flush=True)
|
||||||
|
|||||||
@@ -78,6 +78,10 @@ class QAPair:
|
|||||||
answer_text: str
|
answer_text: str
|
||||||
topic: Optional[str] = None
|
topic: Optional[str] = None
|
||||||
topic_tags: list[str] = field(default_factory=list)
|
topic_tags: list[str] = field(default_factory=list)
|
||||||
|
# Caller identification — populated by attach_caller_names() when a
|
||||||
|
# transcript-side speaker intro precedes the question turn.
|
||||||
|
caller_name: Optional[str] = None
|
||||||
|
caller_role: Optional[str] = None # "caller" / "guest" / "fillin"
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
def to_dict(self) -> dict:
|
||||||
return {
|
return {
|
||||||
@@ -89,6 +93,8 @@ class QAPair:
|
|||||||
"answer_text": self.answer_text,
|
"answer_text": self.answer_text,
|
||||||
"topic": self.topic,
|
"topic": self.topic,
|
||||||
"topic_tags": self.topic_tags,
|
"topic_tags": self.topic_tags,
|
||||||
|
"caller_name": self.caller_name,
|
||||||
|
"caller_role": self.caller_role,
|
||||||
}
|
}
|
||||||
|
|
||||||
def clip_start(self, padding: float = 1.5) -> float:
|
def clip_start(self, padding: float = 1.5) -> float:
|
||||||
@@ -101,6 +107,24 @@ class QAPair:
|
|||||||
return self.answer_end - self.question_start
|
return self.answer_end - self.question_start
|
||||||
|
|
||||||
|
|
||||||
|
def attach_caller_names(pairs: list[QAPair],
|
||||||
|
transcript_segments: list[dict]) -> list[QAPair]:
|
||||||
|
"""Attach caller names to Q&A pairs using transcript-side speaker intros.
|
||||||
|
|
||||||
|
For each pair, looks up the active intro at the question_start time and
|
||||||
|
populates caller_name / caller_role. Mutates the input pairs and returns
|
||||||
|
the list for chaining.
|
||||||
|
"""
|
||||||
|
from .speaker_oracle import extract_intros, speaker_at
|
||||||
|
intros = extract_intros(transcript_segments)
|
||||||
|
for pair in pairs:
|
||||||
|
intro = speaker_at(pair.question_start, intros)
|
||||||
|
if intro is not None:
|
||||||
|
pair.caller_name = intro.name
|
||||||
|
pair.caller_role = intro.role_hint
|
||||||
|
return pairs
|
||||||
|
|
||||||
|
|
||||||
def extract_qa_pairs(diarized_segments: list[dict]) -> list[QAPair]:
|
def extract_qa_pairs(diarized_segments: list[dict]) -> list[QAPair]:
|
||||||
"""
|
"""
|
||||||
Extract caller Q&A pairs from diarized transcript segments.
|
Extract caller Q&A pairs from diarized transcript segments.
|
||||||
|
|||||||
@@ -41,6 +41,11 @@ _INTRO_PATTERNS: list[tuple[re.Pattern, str, int, int | None]] = [
|
|||||||
r"\blet.s\s+(?:go\s+ahead\s+and\s+)?(?:talk|go)\s+to\s+([A-Z][a-z]+)",
|
r"\blet.s\s+(?:go\s+ahead\s+and\s+)?(?:talk|go)\s+to\s+([A-Z][a-z]+)",
|
||||||
re.I), "caller", 1, None),
|
re.I), "caller", 1, None),
|
||||||
|
|
||||||
|
# "let's fit <name> in" / "let's bring <name> on" — caller pickup variants
|
||||||
|
(re.compile(
|
||||||
|
r"\blet.s\s+(?:go\s+ahead\s+and\s+)?(?:fit|bring|put)\s+([A-Z][a-z]+)\s+(?:in|on)\b",
|
||||||
|
re.I), "caller", 1, None),
|
||||||
|
|
||||||
# "Hello, <name>. How are you?" — caller pickup
|
# "Hello, <name>. How are you?" — caller pickup
|
||||||
(re.compile(
|
(re.compile(
|
||||||
r"\bhello,?\s+([A-Z][a-z]+)\.?\s+(?:how\s+are\s+you|thanks\s+for|are\s+you\s+there|welcome)",
|
r"\bhello,?\s+([A-Z][a-z]+)\.?\s+(?:how\s+are\s+you|thanks\s+for|are\s+you\s+there|welcome)",
|
||||||
@@ -246,6 +251,26 @@ def resolve_speakers(diarization_turns: list[dict],
|
|||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def speaker_at(time: float, intros: list[SpeakerIntro]) -> SpeakerIntro | None:
|
||||||
|
"""Return the active opening intro at the given time, or None.
|
||||||
|
|
||||||
|
Same lookup logic as resolve_speakers but for a single timestamp,
|
||||||
|
used to attach caller names to Q&A pairs by their question_start time.
|
||||||
|
"""
|
||||||
|
opening = sorted(
|
||||||
|
[i for i in intros if i.role_hint != "caller_close"],
|
||||||
|
key=lambda i: i.intro_time,
|
||||||
|
)
|
||||||
|
best: SpeakerIntro | None = None
|
||||||
|
cutoff = time + _INTRO_FORWARD_TOLERANCE_S
|
||||||
|
for intro in opening:
|
||||||
|
if intro.intro_time <= cutoff:
|
||||||
|
best = intro
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
return best
|
||||||
|
|
||||||
|
|
||||||
def named_speaker_summary(named_turns: list[NamedTurn], duration: float) -> dict[str, float]:
|
def named_speaker_summary(named_turns: list[NamedTurn], duration: float) -> dict[str, float]:
|
||||||
"""Aggregate seconds-by-named-speaker for reporting."""
|
"""Aggregate seconds-by-named-speaker for reporting."""
|
||||||
times: dict[str, float] = {}
|
times: dict[str, float] = {}
|
||||||
|
|||||||
Reference in New Issue
Block a user