""" Q&A pair extraction from diarized transcripts. Identifies exchanges where a CALLER asks a question and the HOST answers. Outputs structured Q&A pairs with timestamps for clip extraction and indexing. """ import json import re from dataclasses import dataclass, field from pathlib import Path from typing import Optional from rich.console import Console console = Console() # Phrases that signal a caller is asking a question QUESTION_SIGNALS = [ r"\?", r"\bhow (do|can|should|would|does)\b", r"\bwhat (is|are|should|can|do|does|about)\b", r"\bwhy (is|are|does|do|would|should)\b", r"\bis (it|there|this|that) (true|safe|possible|good|bad|worth)\b", r"\bshould i\b", r"\bcan you\b", r"\bi (was wondering|wanted to ask|have a question)\b", ] QUESTION_PATTERN = re.compile("|".join(QUESTION_SIGNALS), re.IGNORECASE) # Minimum durations for a meaningful exchange MIN_QUESTION_DURATION = 5.0 # seconds MIN_ANSWER_DURATION = 15.0 # seconds MAX_GAP_BETWEEN_QA = 30.0 # seconds between question end and answer start # ── Promo / bumper filter ────────────────────────────────────────────────── # Promos evolve across years but preserve signature phrases. # Weight 2 = highly distinctive (one match sufficient to filter). # Weight 1 = semi-generic (need 2+ to filter). # A question turn with total score >= PROMO_SCORE_THRESHOLD is suppressed. PROMO_SCORE_THRESHOLD = 2 _PROMO_SIGS: list[tuple[re.Pattern, int]] = [ # Highly distinctive — score 2 each (re.compile(r"acquired a life of its own", re.I), 2), (re.compile(r"simply desire a deeper", re.I), 2), (re.compile(r"tame that beast", re.I), 2), (re.compile(r"mike swanson will be back after", re.I), 2), (re.compile(r"heaven forbid.{0,20}virus", re.I | re.DOTALL), 2), (re.compile(r"mike swanson is answering all", re.I), 2), # Semi-distinctive — score 1 each, need two to filter (re.compile(r"\bcomputer running slow\b", re.I), 1), (re.compile(r"\bafter these messages\b", re.I), 1), (re.compile(r"\b790.?2040\b", re.I), 1), (re.compile(r"\b751.?1041\b", re.I), 1), (re.compile(r"\bgurushow\.com\b", re.I), 1), (re.compile(r"\bcall in now\b", re.I), 1), (re.compile(r"\bcomputer troubles\?", re.I), 1), (re.compile(r"\bhardware installation\b", re.I), 1), (re.compile(r"we.?ll get your problem solved", re.I), 1), ] def _is_promo_or_bumper(text: str) -> bool: """Return True if text scores above threshold on show promo/bumper signatures.""" score = sum(w for pat, w in _PROMO_SIGS if pat.search(text)) return score >= PROMO_SCORE_THRESHOLD @dataclass class QAPair: question_start: float question_end: float answer_start: float answer_end: float question_text: str answer_text: str topic: Optional[str] = None topic_tags: list[str] = field(default_factory=list) # Caller identification — populated by attach_caller_names() when a # transcript-side speaker intro precedes the question turn. caller_name: Optional[str] = None caller_role: Optional[str] = None # "caller" / "guest" / "fillin" def to_dict(self) -> dict: return { "question_start": self.question_start, "question_end": self.question_end, "answer_start": self.answer_start, "answer_end": self.answer_end, "question_text": self.question_text, "answer_text": self.answer_text, "topic": self.topic, "topic_tags": self.topic_tags, "caller_name": self.caller_name, "caller_role": self.caller_role, } def clip_start(self, padding: float = 1.5) -> float: return max(0.0, self.question_start - padding) def clip_end(self, padding: float = 1.5) -> float: return self.answer_end + padding def duration(self) -> float: return self.answer_end - self.question_start def attach_caller_names(pairs: list[QAPair], transcript_segments: list[dict]) -> list[QAPair]: """Attach caller names to Q&A pairs using transcript-side speaker intros. For each pair, looks up the active intro at the question_start time and populates caller_name / caller_role. Mutates the input pairs and returns the list for chaining. """ from .speaker_oracle import extract_intros, speaker_at intros = extract_intros(transcript_segments) for pair in pairs: intro = speaker_at(pair.question_start, intros) if intro is not None: pair.caller_name = intro.name pair.caller_role = intro.role_hint return pairs def extract_qa_pairs(diarized_segments: list[dict]) -> list[QAPair]: """ Extract caller Q&A pairs from diarized transcript segments. Each segment dict: {start, end, text, speaker} Speaker values: "HOST", "CALLER", "UNKNOWN" """ pairs = [] # Group consecutive segments by speaker into speaker turns turns = _merge_consecutive_speaker_turns(diarized_segments) # Check if diarization produced any non-HOST speakers has_caller_labels = any(t["speaker"] in ("CALLER", "UNKNOWN") for t in turns) if not has_caller_labels: # Diarization labels are absent or unreliable — fall back to text-pattern detection return _extract_qa_text_only(turns) i = 0 while i < len(turns): turn = turns[i] # Look for a CALLER turn that looks like a question if turn["speaker"] in ("CALLER", "UNKNOWN") and _looks_like_question(turn["text"]): if _is_promo_or_bumper(turn["text"]): i += 1 continue # Skip the opening 90s — real callers never call before the show starts if turn["start"] < 90: i += 1 continue q_duration = turn["end"] - turn["start"] if q_duration < MIN_QUESTION_DURATION: i += 1 continue # Require caller-intro context: host must have introduced the call, OR # the caller opens with a phone greeting ("hello", "hi", "hey") if not _preceded_by_caller_intro(turns, i) and not _PHONE_GREETING.match(turn["text"].strip()): i += 1 continue # Look ahead for HOST answer turn(s) j = i + 1 answer_turns = [] while j < len(turns): next_turn = turns[j] gap = next_turn["start"] - turns[j - 1]["end"] if gap > MAX_GAP_BETWEEN_QA and not answer_turns: break # too big a gap before any answer if next_turn["speaker"] == "HOST": answer_turns.append(next_turn) # Keep collecting consecutive HOST turns j += 1 while j < len(turns) and turns[j]["speaker"] == "HOST": answer_turns.append(turns[j]) j += 1 break elif next_turn["speaker"] in ("CALLER", "UNKNOWN"): # Another caller turn before host answered — skip this question break else: j += 1 if answer_turns: answer_text = " ".join(t["text"] for t in answer_turns) answer_duration = answer_turns[-1]["end"] - answer_turns[0]["start"] if answer_duration >= MIN_ANSWER_DURATION: pairs.append(QAPair( question_start=turn["start"], question_end=turn["end"], answer_start=answer_turns[0]["start"], answer_end=answer_turns[-1]["end"], question_text=turn["text"].strip(), answer_text=answer_text.strip(), )) i = j continue i += 1 return pairs # Maximum duration for a question turn in text-only mode — avoids capturing monologues _MAX_QUESTION_S_TEXT_MODE = 90.0 # Caller introduction phrases Mike uses before taking a call _CALLER_INTRO = re.compile( r"\b(let'?s go to|going to the phones?|you'?re on the air|on the air|" r"first caller|next caller|caller from|go ahead|what'?s (your question|going on)|" r"welcome to the show|thanks for calling|thank you for calling|" r"our (first|next|last) (caller|call)|taking (a |your )?call)\b", re.IGNORECASE, ) def _extract_qa_text_only(turns: list[dict]) -> list[QAPair]: """ Q&A extraction when speaker labels are unavailable or all HOST. Uses text patterns to identify question anchors. Works well for call-in radio format where callers describe problems and the host answers at length. Captures both genuine caller questions and Mike's own rhetorical Q&A segments. """ pairs = [] i = 0 while i < len(turns): turn = turns[i] q_duration = turn["end"] - turn["start"] is_q_candidate = ( _looks_like_question(turn["text"]) and MIN_QUESTION_DURATION <= q_duration <= _MAX_QUESTION_S_TEXT_MODE ) # Also treat segments immediately after a caller-intro phrase as candidates if not is_q_candidate and i > 0: prev_text = turns[i - 1]["text"] if _CALLER_INTRO.search(prev_text) and q_duration >= MIN_QUESTION_DURATION: is_q_candidate = True if is_q_candidate and _is_promo_or_bumper(turn["text"]): i += 1 continue if is_q_candidate: # Collect following segments as the answer until we hit another question j = i + 1 answer_turns = [] while j < len(turns): next_turn = turns[j] gap = next_turn["start"] - turns[j - 1]["end"] if gap > MAX_GAP_BETWEEN_QA and not answer_turns: break # Stop collecting if we hit another short question-pattern turn if ( _looks_like_question(next_turn["text"]) and (next_turn["end"] - next_turn["start"]) <= _MAX_QUESTION_S_TEXT_MODE and answer_turns ): break answer_turns.append(next_turn) j += 1 # Stop once we have a substantial answer block if answer_turns: ans_dur = answer_turns[-1]["end"] - answer_turns[0]["start"] if ans_dur >= MIN_ANSWER_DURATION * 3: break if answer_turns: answer_text = " ".join(t["text"] for t in answer_turns) answer_duration = answer_turns[-1]["end"] - answer_turns[0]["start"] if answer_duration >= MIN_ANSWER_DURATION: pairs.append(QAPair( question_start=turn["start"], question_end=turn["end"], answer_start=answer_turns[0]["start"], answer_end=answer_turns[-1]["end"], question_text=turn["text"].strip(), answer_text=answer_text.strip(), )) i = j continue i += 1 return pairs def tag_qa_pairs_with_ollama(pairs: list[QAPair], ollama_host: str = "http://localhost:11434", model: str = "qwen3:14b") -> list[QAPair]: """Use Ollama to tag each Q&A pair with a topic and tags.""" try: import ollama client = ollama.Client(host=ollama_host) except ImportError: console.print("[yellow]ollama not installed — skipping topic tagging[/yellow]") return pairs for i, pair in enumerate(pairs): console.print(f"[dim]Tagging Q&A {i+1}/{len(pairs)}...[/dim]") try: prompt = ( f"A radio show caller asked:\n\"{pair.question_text[:300]}\"\n\n" f"The host answered:\n\"{pair.answer_text[:500]}\"\n\n" "Respond with JSON only, no explanation:\n" '{"topic": "short topic name (3-5 words)", "tags": ["tag1", "tag2", "tag3"]}' ) resp = client.chat( model=model, messages=[{"role": "user", "content": prompt}], options={"temperature": 0}, ) raw = resp["message"]["content"].strip() # Extract JSON from response start = raw.find("{") end = raw.rfind("}") + 1 if start >= 0 and end > start: data = json.loads(raw[start:end]) pair.topic = data.get("topic", "") pair.topic_tags = data.get("tags", []) except Exception as e: console.print(f"[yellow]Tagging failed for pair {i+1}: {e}[/yellow]") return pairs def load_diarized_transcript(transcript_path: Path, diarization_path: Optional[Path]) -> list[dict]: """ Merge transcript and diarization into speaker-labeled segments. Falls back to HOST-only if no diarization available. """ with open(transcript_path) as f: transcript = json.load(f) segments = transcript["segments"] if diarization_path is None or not diarization_path.exists(): return [ {"start": s["start"], "end": s["end"], "text": s["text"], "speaker": "HOST"} for s in segments ] with open(diarization_path) as f: diarization = json.load(f) raw_turns = diarization.get("turns", []) # Resolve overlapping boundaries left by the sliding-window diarizer: # place each transition at the midpoint of the overlap region. resolved: list[dict] = [] for turn in sorted(raw_turns, key=lambda t: t["start"]): if not resolved: resolved.append(dict(turn)) continue prev = resolved[-1] if turn["start"] < prev["end"]: mid = (turn["start"] + prev["end"]) / 2 prev["end"] = mid resolved.append({**turn, "start": mid}) else: resolved.append(dict(turn)) turns = resolved # Minimum CALLER coverage to label a transcript segment as CALLER. # Batch transcription produces ~25s segments; caller windows are 10s. # Require 4s of CALLER overlap so brief HOST-edge segments aren't over-claimed. _CALLER_MIN_S = 4.0 def speaker_for_segment(seg_start: float, seg_end: float) -> str: caller_cov = 0.0 coverage: dict[str, float] = {} for turn in turns: overlap = min(seg_end, turn["end"]) - max(seg_start, turn["start"]) if overlap <= 0: continue coverage[turn["speaker"]] = coverage.get(turn["speaker"], 0) + overlap if turn["speaker"] == "CALLER": caller_cov += overlap if not coverage: return "UNKNOWN" if caller_cov >= _CALLER_MIN_S: return "CALLER" return max(coverage, key=coverage.__getitem__) return [ {"start": s["start"], "end": s["end"], "text": s["text"], "speaker": speaker_for_segment(s["start"], s["end"])} for s in segments ] # ── Helpers ──────────────────────────────────────────────────────────────── _PHONE_GREETING = re.compile(r"^(hello|hi|hey|good (morning|afternoon|evening))\b", re.IGNORECASE) def _preceded_by_caller_intro(turns: list[dict], idx: int, max_host_turns: int = 2) -> bool: """Return True if a preceding HOST turn (within max_host_turns HOST turns) contains a caller-intro phrase.""" host_count = 0 for j in range(idx - 1, -1, -1): if turns[j]["speaker"] == "HOST": if _CALLER_INTRO.search(turns[j]["text"]): return True host_count += 1 if host_count >= max_host_turns: break return False def _looks_like_question(text: str) -> bool: return bool(QUESTION_PATTERN.search(text)) def _merge_consecutive_speaker_turns(segments: list[dict]) -> list[dict]: """Merge adjacent segments from the same speaker into continuous turns.""" if not segments: return [] turns = [] current = dict(segments[0]) for seg in segments[1:]: if seg["speaker"] == current["speaker"]: current["end"] = seg["end"] current["text"] = current["text"].rstrip() + " " + seg["text"].lstrip() else: turns.append(current) current = dict(seg) turns.append(current) return turns