""" Import per-episode pipeline outputs (transcript / diarization / intros / qa) into a single SQLite archive.db. Idempotent: skips episodes whose transcript.json sha256 matches the recorded hash. Re-run after each batch_process pass to keep the DB current. Usage: py import_to_sqlite.py [--db PATH] [--root PATH] [--rebuild] [--vacuum] """ import argparse import hashlib import json import re import sqlite3 import time from datetime import datetime, timezone from pathlib import Path BASE = Path(__file__).parent DEFAULT_ROOT = BASE / "archive-data" / "transcripts" DEFAULT_DB = BASE / "archive-data" / "archive.db" REQUIRED_FILES = ("transcript.json", "diarization.json", "intros.json", "qa.json") SCHEMA = """ CREATE TABLE IF NOT EXISTS episodes ( id INTEGER PRIMARY KEY, rel_path TEXT NOT NULL UNIQUE, year INTEGER NOT NULL, title TEXT, air_date TEXT, duration_sec REAL NOT NULL, language TEXT, language_probability REAL, num_speakers INTEGER, transcript_sha256 TEXT NOT NULL, processed_at TEXT NOT NULL ); CREATE INDEX IF NOT EXISTS idx_episodes_year ON episodes(year); CREATE INDEX IF NOT EXISTS idx_episodes_air_date ON episodes(air_date); CREATE TABLE IF NOT EXISTS segments ( id INTEGER PRIMARY KEY, episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE, seg_idx INTEGER NOT NULL, start_sec REAL, end_sec REAL, text TEXT NOT NULL, UNIQUE(episode_id, seg_idx) ); CREATE INDEX IF NOT EXISTS idx_segments_episode ON segments(episode_id, start_sec); CREATE TABLE IF NOT EXISTS turns ( id INTEGER PRIMARY KEY, episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE, speaker TEXT NOT NULL, start_sec REAL, end_sec REAL, confidence REAL ); CREATE INDEX IF NOT EXISTS idx_turns_episode ON turns(episode_id, start_sec); CREATE INDEX IF NOT EXISTS idx_turns_speaker ON turns(episode_id, speaker); CREATE TABLE IF NOT EXISTS intros ( id INTEGER PRIMARY KEY, episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE, name TEXT NOT NULL, role_hint TEXT, intro_time_sec REAL, affiliation TEXT, fillin_for TEXT, source_text TEXT ); CREATE INDEX IF NOT EXISTS idx_intros_episode ON intros(episode_id); CREATE INDEX IF NOT EXISTS idx_intros_name ON intros(name); CREATE TABLE IF NOT EXISTS qa_pairs ( id INTEGER PRIMARY KEY, episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE, question_start_sec REAL, question_end_sec REAL, answer_start_sec REAL, answer_end_sec REAL, question_text TEXT NOT NULL, answer_text TEXT NOT NULL, caller_name TEXT, caller_role TEXT, topic TEXT, topic_tags TEXT, usefulness_score INTEGER, topic_class TEXT, is_banter INTEGER ); CREATE INDEX IF NOT EXISTS idx_qa_episode ON qa_pairs(episode_id); CREATE INDEX IF NOT EXISTS idx_qa_caller ON qa_pairs(caller_name); -- Indexes on quality columns (usefulness_score, topic_class) are created by -- _migrate_qa_quality_columns() so they apply to both fresh and migrated DBs. CREATE VIRTUAL TABLE IF NOT EXISTS segments_fts USING fts5( text, content='segments', content_rowid='id', tokenize='porter unicode61' ); CREATE VIRTUAL TABLE IF NOT EXISTS qa_fts USING fts5( question_text, answer_text, content='qa_pairs', content_rowid='id', tokenize='porter unicode61' ); """ TRIGGERS = """ CREATE TRIGGER IF NOT EXISTS segments_ai AFTER INSERT ON segments BEGIN INSERT INTO segments_fts(rowid, text) VALUES (new.id, new.text); END; CREATE TRIGGER IF NOT EXISTS segments_ad AFTER DELETE ON segments BEGIN INSERT INTO segments_fts(segments_fts, rowid, text) VALUES('delete', old.id, old.text); END; CREATE TRIGGER IF NOT EXISTS qa_ai AFTER INSERT ON qa_pairs BEGIN INSERT INTO qa_fts(rowid, question_text, answer_text) VALUES (new.id, new.question_text, new.answer_text); END; CREATE TRIGGER IF NOT EXISTS qa_ad AFTER DELETE ON qa_pairs BEGIN INSERT INTO qa_fts(qa_fts, rowid, question_text, answer_text) VALUES('delete', old.id, old.question_text, old.answer_text); END; """ # Most → least specific DATE_PATTERNS = [ re.compile(r"(?P20\d{2})[-_](?P\d{1,2})[-_](?P\d{1,2})"), re.compile(r"(?:^|[^\d])(?P\d{1,2})-(?P\d{1,2})-(?P\d{2})(?:[^\d]|$)"), ] def init_schema(conn: sqlite3.Connection): conn.executescript(SCHEMA) conn.executescript(TRIGGERS) conn.execute("PRAGMA foreign_keys = ON") _migrate_qa_quality_columns(conn) conn.commit() # Columns added by the Q&A quality classifier (Track 1). # Defined here so the migration is idempotent and runs on every invocation: # the SCHEMA above creates them on a fresh DB; this block ALTERs an existing # qa_pairs that pre-dates them. Names + types must match SCHEMA exactly. QA_QUALITY_COLUMNS = ( ("usefulness_score", "INTEGER"), ("topic_class", "TEXT"), ("is_banter", "INTEGER"), ) def _migrate_qa_quality_columns(conn: sqlite3.Connection) -> None: """Add Q&A quality columns to qa_pairs if they're missing. Idempotent: existing column values are untouched; new columns default NULL. Safe to call on fresh DBs (qa_pairs already has the columns from SCHEMA - PRAGMA table_info reflects that and we no-op). """ existing = {row[1] for row in conn.execute("PRAGMA table_info(qa_pairs)").fetchall()} if not existing: # qa_pairs hasn't been created yet (shouldn't happen post-SCHEMA, but be safe) return for col, col_type in QA_QUALITY_COLUMNS: if col not in existing: conn.execute(f"ALTER TABLE qa_pairs ADD COLUMN {col} {col_type}") # Indexes on the new columns (CREATE INDEX IF NOT EXISTS is already idempotent, # but on a brand-new DB they were created by SCHEMA; on a migrated DB they # weren't, so create them here too). conn.execute("CREATE INDEX IF NOT EXISTS idx_qa_usefulness ON qa_pairs(usefulness_score)") conn.execute("CREATE INDEX IF NOT EXISTS idx_qa_topic_class ON qa_pairs(topic_class)") def sha256_file(path: Path) -> str: h = hashlib.sha256() with open(path, "rb") as f: for chunk in iter(lambda: f.read(65536), b""): h.update(chunk) return h.hexdigest() def parse_air_date(rel_dir: Path) -> str | None: s = rel_dir.as_posix() for pat in DATE_PATTERNS: m = pat.search(s) if not m: continue gd = m.groupdict() try: if gd.get("y"): y, mo, d = int(gd["y"]), int(gd["m"]), int(gd["d"]) else: yy = int(gd["yy"]) y = 2000 + yy if yy < 30 else 1900 + yy mo, d = int(gd["m"]), int(gd["d"]) return datetime(y, mo, d).date().isoformat() except ValueError: continue return None def import_episode(conn: sqlite3.Connection, ep_dir: Path, root: Path) -> str: rel_dir = ep_dir.relative_to(root) rel_path = rel_dir.as_posix() + ".mp3" year_str = rel_dir.parts[0] if not (year_str.isdigit() and len(year_str) == 4): return "skipped" year = int(year_str) title = rel_dir.name air_date = parse_air_date(rel_dir) transcript_path = ep_dir / "transcript.json" sha = sha256_file(transcript_path) row = conn.execute( "SELECT id, transcript_sha256 FROM episodes WHERE rel_path = ?", (rel_path,), ).fetchone() if row and row[1] == sha: return "skipped" with open(transcript_path, encoding="utf-8") as f: transcript = json.load(f) with open(ep_dir / "diarization.json", encoding="utf-8") as f: diarization = json.load(f) with open(ep_dir / "intros.json", encoding="utf-8") as f: intros = json.load(f) with open(ep_dir / "qa.json", encoding="utf-8") as f: qa = json.load(f) with conn: if row: conn.execute("DELETE FROM episodes WHERE id = ?", (row[0],)) status = "updated" else: status = "inserted" cur = conn.execute( """INSERT INTO episodes (rel_path, year, title, air_date, duration_sec, language, language_probability, num_speakers, transcript_sha256, processed_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( rel_path, year, title, air_date, transcript.get("duration", 0.0), transcript.get("language"), transcript.get("language_probability"), diarization.get("num_speakers"), sha, datetime.now(timezone.utc).isoformat(timespec="seconds"), ), ) episode_id = cur.lastrowid conn.executemany( "INSERT INTO segments (episode_id, seg_idx, start_sec, end_sec, text) " "VALUES (?, ?, ?, ?, ?)", [ (episode_id, s["id"], s.get("start"), s.get("end"), s["text"]) for s in transcript.get("segments", []) ], ) conn.executemany( "INSERT INTO turns (episode_id, speaker, start_sec, end_sec, confidence) " "VALUES (?, ?, ?, ?, ?)", [ (episode_id, t["speaker"], t.get("start"), t.get("end"), t.get("confidence")) for t in diarization.get("turns", []) ], ) conn.executemany( "INSERT INTO intros " "(episode_id, name, role_hint, intro_time_sec, affiliation, fillin_for, source_text) " "VALUES (?, ?, ?, ?, ?, ?, ?)", [ (episode_id, i["name"], i.get("role_hint"), i.get("intro_time"), i.get("affiliation"), i.get("fillin_for"), i.get("source_text")) for i in intros ], ) conn.executemany( "INSERT INTO qa_pairs " "(episode_id, question_start_sec, question_end_sec, " " answer_start_sec, answer_end_sec, " " question_text, answer_text, caller_name, caller_role, topic, topic_tags) " "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", [ (episode_id, p.get("question_start"), p.get("question_end"), p.get("answer_start"), p.get("answer_end"), p.get("question_text"), p.get("answer_text"), p.get("caller_name"), p.get("caller_role"), p.get("topic"), json.dumps(p.get("topic_tags") or [])) for p in qa ], ) return status def find_episode_dirs(root: Path): for d in root.rglob("*"): if not d.is_dir(): continue if all((d / fn).exists() for fn in REQUIRED_FILES): yield d def main(): ap = argparse.ArgumentParser() ap.add_argument("--root", default=str(DEFAULT_ROOT)) ap.add_argument("--db", default=str(DEFAULT_DB)) ap.add_argument("--rebuild", action="store_true", help="Delete the DB before import") ap.add_argument("--vacuum", action="store_true", help="VACUUM after import (slow on large DBs)") args = ap.parse_args() root = Path(args.root) db = Path(args.db) if args.rebuild and db.exists(): db.unlink() print(f"deleted {db}") db.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(db) init_schema(conn) n_inserted = n_updated = n_skipped = n_error = 0 t0 = time.monotonic() dirs = sorted(find_episode_dirs(root)) print(f"Found {len(dirs)} complete episode directories under {root}") for i, d in enumerate(dirs, 1): try: status = import_episode(conn, d, root) if status == "inserted": n_inserted += 1 elif status == "updated": n_updated += 1 elif status == "skipped": n_skipped += 1 except Exception as e: n_error += 1 print(f"ERROR {d.relative_to(root)}: {e}") if i % 50 == 0: print(f" {i}/{len(dirs)} ins={n_inserted} upd={n_updated} skip={n_skipped} err={n_error}") conn.executescript("ANALYZE;") if args.vacuum: conn.executescript("VACUUM;") conn.close() size_mb = db.stat().st_size / 1024 / 1024 elapsed = time.monotonic() - t0 print(f"\n=== Done in {elapsed:.1f}s ===") print(f" inserted : {n_inserted}") print(f" updated : {n_updated}") print(f" skipped : {n_skipped}") print(f" errors : {n_error}") print(f" db : {db} ({size_mb:.1f} MB)") if __name__ == "__main__": main()