diff --git a/projects/radio-show/audio-processor/benchmark.py b/projects/radio-show/audio-processor/benchmark.py index 2d8b06e..7b0f094 100644 --- a/projects/radio-show/audio-processor/benchmark.py +++ b/projects/radio-show/audio-processor/benchmark.py @@ -17,7 +17,7 @@ ensure_cuda_libs() import torch from src.config import load_config from src.diarizer import diarize, VoiceProfileStore -from src.qa_extractor import load_diarized_transcript, extract_qa_pairs +from src.qa_extractor import load_diarized_transcript, extract_qa_pairs, attach_caller_names from rich.console import Console from rich.table import Table @@ -180,8 +180,16 @@ for ep, transcript_path, audio_dur, _ in trans_results: diarization_path = trans_ep_dir / "diarization.json" segments = load_diarized_transcript(transcript_path, diarization_path) pairs = extract_qa_pairs(segments) + + # Attach caller names from transcript intros + with open(transcript_path) as f: + td = _json.load(f) + attach_caller_names(pairs, td.get("segments", [])) + + named = sum(1 for p in pairs if p.caller_name) + name_str = ", ".join(p.caller_name for p in pairs if p.caller_name) or "—" qa_rows.append((ep.stem, len(pairs))) - console.print(f" {ep.stem}: {len(pairs)} Q&A pairs") + console.print(f" {ep.stem}: {len(pairs)} pairs ({named} named: {name_str})") # ── Summary ──────────────────────────────────────────────────────────────── diff --git a/projects/radio-show/audio-processor/download_full_archive.py b/projects/radio-show/audio-processor/download_full_archive.py index 42fa53e..f3a7a01 100644 --- a/projects/radio-show/audio-processor/download_full_archive.py +++ b/projects/radio-show/audio-processor/download_full_archive.py @@ -22,12 +22,22 @@ LOCAL_ROOT.mkdir(parents=True, exist_ok=True) REMOTE_ROOT = "/home/gurushow/public_html/archive" YEARS = ["2010", "2011", "2012", "2014", "2015", "2016", "2017", "2018"] +def connect(): + c = paramiko.SSHClient() + c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + c.connect("172.16.3.10", username="root", password=password, + look_for_keys=False, allow_agent=False, + timeout=30, banner_timeout=30, auth_timeout=30) + transport = c.get_transport() + if transport is not None: + transport.set_keepalive(30) # send keepalive every 30s + s = c.open_sftp() + s.get_channel().settimeout(120) # per-operation timeout + return c, s + + print(f"Connecting to 172.16.3.10...", flush=True) -client = paramiko.SSHClient() -client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) -client.connect("172.16.3.10", username="root", password=password, - look_for_keys=False, allow_agent=False, timeout=30) -sftp = client.open_sftp() +client, sftp = connect() print("Connected.", flush=True) @@ -75,16 +85,32 @@ for year in YEARS: size_mb = remote_size / 1024 / 1024 print(f" [{downloaded_files + 1:3d}] {rel} ({size_mb:.1f} MB)...", end="", flush=True) t0 = time.monotonic() - try: - sftp.get(remote, str(local)) - elapsed = time.monotonic() - t0 - mbps = size_mb / elapsed if elapsed > 0 else 0 - print(f" done ({elapsed:.1f}s, {mbps:.1f} MB/s)", flush=True) - downloaded_files += 1 - downloaded_bytes += remote_size - except Exception as e: - print(f" FAILED: {e}", flush=True) - errors.append(f"get {remote}: {e}") + + attempt = 0 + while True: + attempt += 1 + try: + sftp.get(remote, str(local)) + elapsed = time.monotonic() - t0 + mbps = size_mb / elapsed if elapsed > 0 else 0 + print(f" done ({elapsed:.1f}s, {mbps:.1f} MB/s)", flush=True) + downloaded_files += 1 + downloaded_bytes += remote_size + break + except Exception as e: + if attempt >= 3: + print(f" FAILED after {attempt} attempts: {e}", flush=True) + errors.append(f"get {remote}: {e}") + break + print(f" retry {attempt} ({e})...", end="", flush=True) + # Reconnect on failure + try: + sftp.close() + client.close() + except Exception: + pass + time.sleep(5) + client, sftp = connect() elapsed_total = time.monotonic() - t_start print(f"\n=== Summary ===", flush=True) diff --git a/projects/radio-show/audio-processor/src/qa_extractor.py b/projects/radio-show/audio-processor/src/qa_extractor.py index 06eb3d9..513776b 100644 --- a/projects/radio-show/audio-processor/src/qa_extractor.py +++ b/projects/radio-show/audio-processor/src/qa_extractor.py @@ -78,6 +78,10 @@ class QAPair: answer_text: str topic: Optional[str] = None topic_tags: list[str] = field(default_factory=list) + # Caller identification — populated by attach_caller_names() when a + # transcript-side speaker intro precedes the question turn. + caller_name: Optional[str] = None + caller_role: Optional[str] = None # "caller" / "guest" / "fillin" def to_dict(self) -> dict: return { @@ -89,6 +93,8 @@ class QAPair: "answer_text": self.answer_text, "topic": self.topic, "topic_tags": self.topic_tags, + "caller_name": self.caller_name, + "caller_role": self.caller_role, } def clip_start(self, padding: float = 1.5) -> float: @@ -101,6 +107,24 @@ class QAPair: return self.answer_end - self.question_start +def attach_caller_names(pairs: list[QAPair], + transcript_segments: list[dict]) -> list[QAPair]: + """Attach caller names to Q&A pairs using transcript-side speaker intros. + + For each pair, looks up the active intro at the question_start time and + populates caller_name / caller_role. Mutates the input pairs and returns + the list for chaining. + """ + from .speaker_oracle import extract_intros, speaker_at + intros = extract_intros(transcript_segments) + for pair in pairs: + intro = speaker_at(pair.question_start, intros) + if intro is not None: + pair.caller_name = intro.name + pair.caller_role = intro.role_hint + return pairs + + def extract_qa_pairs(diarized_segments: list[dict]) -> list[QAPair]: """ Extract caller Q&A pairs from diarized transcript segments. diff --git a/projects/radio-show/audio-processor/src/speaker_oracle.py b/projects/radio-show/audio-processor/src/speaker_oracle.py index e88ab69..5cef747 100644 --- a/projects/radio-show/audio-processor/src/speaker_oracle.py +++ b/projects/radio-show/audio-processor/src/speaker_oracle.py @@ -41,6 +41,11 @@ _INTRO_PATTERNS: list[tuple[re.Pattern, str, int, int | None]] = [ r"\blet.s\s+(?:go\s+ahead\s+and\s+)?(?:talk|go)\s+to\s+([A-Z][a-z]+)", re.I), "caller", 1, None), + # "let's fit in" / "let's bring on" — caller pickup variants + (re.compile( + r"\blet.s\s+(?:go\s+ahead\s+and\s+)?(?:fit|bring|put)\s+([A-Z][a-z]+)\s+(?:in|on)\b", + re.I), "caller", 1, None), + # "Hello, . How are you?" — caller pickup (re.compile( r"\bhello,?\s+([A-Z][a-z]+)\.?\s+(?:how\s+are\s+you|thanks\s+for|are\s+you\s+there|welcome)", @@ -246,6 +251,26 @@ def resolve_speakers(diarization_turns: list[dict], return out +def speaker_at(time: float, intros: list[SpeakerIntro]) -> SpeakerIntro | None: + """Return the active opening intro at the given time, or None. + + Same lookup logic as resolve_speakers but for a single timestamp, + used to attach caller names to Q&A pairs by their question_start time. + """ + opening = sorted( + [i for i in intros if i.role_hint != "caller_close"], + key=lambda i: i.intro_time, + ) + best: SpeakerIntro | None = None + cutoff = time + _INTRO_FORWARD_TOLERANCE_S + for intro in opening: + if intro.intro_time <= cutoff: + best = intro + else: + break + return best + + def named_speaker_summary(named_turns: list[NamedTurn], duration: float) -> dict[str, float]: """Aggregate seconds-by-named-speaker for reporting.""" times: dict[str, float] = {}