"""
Index the 6 test episodes into archive.db.

Reads pre-computed transcripts + diarization from test-data/transcripts/.
"""
import json
import os
import re
import sys

# Environment is configured before the project imports below.
# NOTE(review): presumably so that anything importing `transformers` sees
# offline mode at import time -- confirm before reordering these lines.
os.environ["PYTHONIOENCODING"] = "utf-8"
os.environ["TRANSFORMERS_OFFLINE"] = "1"
# Switch stdout to UTF-8 where the stream supports reconfiguration.
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8")

from pathlib import Path

from src.indexer import ArchiveIndex
from src.qa_extractor import load_diarized_transcript, extract_qa_pairs
from rich.console import Console
from rich.table import Table

console = Console()

# All paths are resolved relative to the directory containing this script.
BASE = Path(__file__).parent
TRANS_DIR = BASE / "test-data" / "transcripts"
EP_DIR = BASE / "test-data" / "episodes"
DB_PATH = BASE / "archive.db"

# Matches a leading ISO-style date (YYYY-MM-DD) at the start of an episode id.
_DATE_RE = re.compile(r"^(\d{4}-\d{2}-\d{2})")


def parse_episode_meta(ep_id: str) -> tuple[str, int | None]:
    """Return (date_str_or_year, hr) from episode directory name.

    Args:
        ep_id: Episode directory name, e.g. ``2016-s8e43`` or a name that
            starts with ``YYYY-MM-DD`` and may end in ``-hr1`` / ``-hr2``.

    Returns:
        A ``(date, hr)`` tuple. When ``ep_id`` starts with a full
        ``YYYY-MM-DD`` date, ``date`` is that date and ``hr`` is 1 or 2 if
        the name ends in ``-hr1`` / ``-hr2`` (otherwise ``None``). For any
        other name, ``date`` is the first four characters of ``ep_id`` and
        ``hr`` is ``None``.
    """
    m = _DATE_RE.match(ep_id)
    if m:
        date = m.group(1)
        # Only the "-hr1" / "-hr2" suffixes are recognised; the hour number
        # is the final character of the name.
        hr = int(ep_id[-1]) if ep_id.endswith(("-hr1", "-hr2")) else None
        return date, hr
    # season/episode format e.g. 2016-s8e43 — use year only
    year = ep_id[:4]
    return year, None


console.print(f"\n[bold]Indexing test episodes into {DB_PATH.name}[/bold]")

with ArchiveIndex(DB_PATH) as idx:
    # One summary row per indexed episode, rendered as a table at the end.
    rows = []
    for ep_dir in sorted(TRANS_DIR.iterdir()):
        t_path = ep_dir / "transcript.json"
        d_path = ep_dir / "diarization.json"
        # Entries without a transcript are skipped entirely.
        if not t_path.exists():
            continue

        ep_id = ep_dir.name
        date, hr = parse_episode_meta(ep_id)
        audio_path = EP_DIR / f"{ep_id}.mp3"

        # Episode duration from transcript.
        # Read explicitly as UTF-8 so decoding does not depend on the
        # platform's locale encoding.
        with open(t_path, encoding="utf-8") as f:
            td = json.load(f)
        duration = td.get("duration", 0)

        # Register episode
        idx.add_episode(
            episode_id=ep_id,
            audio_path=audio_path,
            date=date,
            duration=duration,
            hr=hr,
        )

        # Load diarized segments and index; the diarization file is
        # optional, so None is passed when it is missing.
        segs = load_diarized_transcript(t_path, d_path if d_path.exists() else None)
        idx.add_segments(ep_id, segs)

        # Extract and index Q&A pairs
        pairs = extract_qa_pairs(segs)
        for p in pairs:
            idx.add_qa_pair(
                episode_id=ep_id,
                q_start=p.question_start,
                q_end=p.question_end,
                a_start=p.answer_start,
                a_end=p.answer_end,
                question=p.question_text,
                answer=p.answer_text,
                topic=p.topic,
                tags=p.topic_tags,
            )

        rows.append((ep_id, date, f"{duration:.0f}s", len(segs), len(pairs)))
        console.print(f"  [green]{ep_id}[/green]: {len(segs)} segs, {len(pairs)} Q&A pairs")

    stats = idx.stats()

table = Table(title="Index Summary")
table.add_column("Episode")
table.add_column("Date")
table.add_column("Duration")
table.add_column("Segments")
table.add_column("Q&A")
for ep_id, date, dur, segs, qa in rows:
    table.add_row(ep_id, date, dur, str(segs), str(qa))

console.print()
console.print(table)
console.print(f"\n[bold]DB totals:[/bold] {stats['episodes']} episodes, "
              f"{stats['segments']} segments, {stats['qa_pairs']} Q&A pairs")
console.print(f"[dim]DB path: {DB_PATH}[/dim]")