Files
Mike Swanson 6b63c154d2 sync: auto-sync from GURU-BEAST-ROG at 2026-04-29 13:29:17
Author: Mike Swanson
Machine: GURU-BEAST-ROG
Timestamp: 2026-04-29 13:29:17
2026-04-29 13:29:19 -07:00

371 lines
12 KiB
Python

"""
Import per-episode pipeline outputs (transcript / diarization / intros / qa)
into a single SQLite archive.db.
Idempotent: skips episodes whose transcript.json sha256 matches the recorded
hash. Re-run after each batch_process pass to keep the DB current.
Usage:
py import_to_sqlite.py [--db PATH] [--root PATH] [--rebuild] [--vacuum]
"""
import argparse
import hashlib
import json
import re
import sqlite3
import time
from datetime import datetime, timezone
from pathlib import Path
# Default locations live next to this script rather than the current working
# directory, so `py import_to_sqlite.py` behaves the same wherever it is run.
BASE = Path(__file__).parent
DEFAULT_ROOT = BASE.joinpath("archive-data", "transcripts")
DEFAULT_DB = BASE.joinpath("archive-data", "archive.db")
# An episode directory is imported only when every one of these pipeline
# outputs is present in it.
REQUIRED_FILES = (
    "transcript.json",
    "diarization.json",
    "intros.json",
    "qa.json",
)
# DDL for the archive database. Every statement uses IF NOT EXISTS, so
# init_schema() can re-run it against an existing database; it never alters
# a table that already exists (the Q&A quality columns are added to older
# databases by _migrate_qa_quality_columns instead).
# Child tables (segments, turns, intros, qa_pairs) reference episodes with
# ON DELETE CASCADE, which SQLite only enforces while PRAGMA foreign_keys = ON.
# qa_pairs.topic_tags stores a JSON-encoded list (written with json.dumps).
# segments_fts / qa_fts are FTS5 external-content indexes over segments.text
# and qa_pairs.question_text/answer_text; FTS5 does not watch those tables
# itself, so they are kept in sync by the triggers in TRIGGERS.
SCHEMA = """
CREATE TABLE IF NOT EXISTS episodes (
id INTEGER PRIMARY KEY,
rel_path TEXT NOT NULL UNIQUE,
year INTEGER NOT NULL,
title TEXT,
air_date TEXT,
duration_sec REAL NOT NULL,
language TEXT,
language_probability REAL,
num_speakers INTEGER,
transcript_sha256 TEXT NOT NULL,
processed_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_episodes_year ON episodes(year);
CREATE INDEX IF NOT EXISTS idx_episodes_air_date ON episodes(air_date);
CREATE TABLE IF NOT EXISTS segments (
id INTEGER PRIMARY KEY,
episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
seg_idx INTEGER NOT NULL,
start_sec REAL,
end_sec REAL,
text TEXT NOT NULL,
UNIQUE(episode_id, seg_idx)
);
CREATE INDEX IF NOT EXISTS idx_segments_episode ON segments(episode_id, start_sec);
CREATE TABLE IF NOT EXISTS turns (
id INTEGER PRIMARY KEY,
episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
speaker TEXT NOT NULL,
start_sec REAL,
end_sec REAL,
confidence REAL
);
CREATE INDEX IF NOT EXISTS idx_turns_episode ON turns(episode_id, start_sec);
CREATE INDEX IF NOT EXISTS idx_turns_speaker ON turns(episode_id, speaker);
CREATE TABLE IF NOT EXISTS intros (
id INTEGER PRIMARY KEY,
episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
name TEXT NOT NULL,
role_hint TEXT,
intro_time_sec REAL,
affiliation TEXT,
fillin_for TEXT,
source_text TEXT
);
CREATE INDEX IF NOT EXISTS idx_intros_episode ON intros(episode_id);
CREATE INDEX IF NOT EXISTS idx_intros_name ON intros(name);
CREATE TABLE IF NOT EXISTS qa_pairs (
id INTEGER PRIMARY KEY,
episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
question_start_sec REAL,
question_end_sec REAL,
answer_start_sec REAL,
answer_end_sec REAL,
question_text TEXT NOT NULL,
answer_text TEXT NOT NULL,
caller_name TEXT,
caller_role TEXT,
topic TEXT,
topic_tags TEXT,
usefulness_score INTEGER,
topic_class TEXT,
is_banter INTEGER
);
CREATE INDEX IF NOT EXISTS idx_qa_episode ON qa_pairs(episode_id);
CREATE INDEX IF NOT EXISTS idx_qa_caller ON qa_pairs(caller_name);
-- Indexes on quality columns (usefulness_score, topic_class) are created by
-- _migrate_qa_quality_columns() so they apply to both fresh and migrated DBs.
CREATE VIRTUAL TABLE IF NOT EXISTS segments_fts USING fts5(
text,
content='segments', content_rowid='id',
tokenize='porter unicode61'
);
CREATE VIRTUAL TABLE IF NOT EXISTS qa_fts USING fts5(
question_text, answer_text,
content='qa_pairs', content_rowid='id',
tokenize='porter unicode61'
);
"""
# Sync triggers for the FTS5 external-content indexes segments_fts and qa_fts.
# FTS5 never reads changes from its content table on its own, so every INSERT,
# DELETE and UPDATE of an indexed column must be mirrored here:
#   *_ai  index the new row,
#   *_ad  remove the old row's words ('delete' command with the OLD values),
#   *_au  do both when the rowid or indexed text changes. Without these, an
#         UPDATE of segments.text or qa_pairs.question_text/answer_text would
#         leave the index matching the old words.
# The *_au triggers are limited with UPDATE OF, so UPDATEs that touch only
# other columns (e.g. the Q&A quality fields) do not re-index anything.
TRIGGERS = """
CREATE TRIGGER IF NOT EXISTS segments_ai AFTER INSERT ON segments BEGIN
    INSERT INTO segments_fts(rowid, text) VALUES (new.id, new.text);
END;
CREATE TRIGGER IF NOT EXISTS segments_ad AFTER DELETE ON segments BEGIN
    INSERT INTO segments_fts(segments_fts, rowid, text) VALUES('delete', old.id, old.text);
END;
CREATE TRIGGER IF NOT EXISTS segments_au AFTER UPDATE OF id, text ON segments BEGIN
    INSERT INTO segments_fts(segments_fts, rowid, text) VALUES('delete', old.id, old.text);
    INSERT INTO segments_fts(rowid, text) VALUES (new.id, new.text);
END;
CREATE TRIGGER IF NOT EXISTS qa_ai AFTER INSERT ON qa_pairs BEGIN
    INSERT INTO qa_fts(rowid, question_text, answer_text)
    VALUES (new.id, new.question_text, new.answer_text);
END;
CREATE TRIGGER IF NOT EXISTS qa_ad AFTER DELETE ON qa_pairs BEGIN
    INSERT INTO qa_fts(qa_fts, rowid, question_text, answer_text)
    VALUES('delete', old.id, old.question_text, old.answer_text);
END;
CREATE TRIGGER IF NOT EXISTS qa_au AFTER UPDATE OF id, question_text, answer_text ON qa_pairs BEGIN
    INSERT INTO qa_fts(qa_fts, rowid, question_text, answer_text)
    VALUES('delete', old.id, old.question_text, old.answer_text);
    INSERT INTO qa_fts(rowid, question_text, answer_text)
    VALUES (new.id, new.question_text, new.answer_text);
END;
"""
# Air-date layouts looked for in an episode's relative path, most → least
# specific. Named groups are consumed by parse_air_date():
#   1. YYYY-MM-DD / YYYY_MM_DD (dash or underscore separators), year 1900-2099.
#   2. M-D-YY with dashes, not embedded in a longer run of digits; the
#      two-digit year is pivoted by parse_air_date (<30 -> 20xx, else 19xx).
# Pattern 1 accepts 19xx as well as 20xx to agree with that pivot: a directory
# dated "1998-05-03" is recognised just like one dated "05-03-98".
DATE_PATTERNS = [
    re.compile(r"(?P<y>(?:19|20)\d{2})[-_](?P<m>\d{1,2})[-_](?P<d>\d{1,2})"),
    re.compile(r"(?:^|[^\d])(?P<m>\d{1,2})-(?P<d>\d{1,2})-(?P<yy>\d{2})(?:[^\d]|$)"),
]
def init_schema(conn: sqlite3.Connection):
    """Create or upgrade the archive schema on *conn*, then commit.

    Runs SCHEMA and TRIGGERS (both written with IF NOT EXISTS, so repeated runs
    are harmless). It then enables foreign-key enforcement for this connection,
    adds any missing Q&A quality columns and their indexes, and finally
    commits.
    """
    for ddl_script in (SCHEMA, TRIGGERS):
        conn.executescript(ddl_script)
    # foreign_keys is a per-connection setting; it must be on for the
    # ON DELETE CASCADE clauses to remove an episode's child rows.
    conn.execute("PRAGMA foreign_keys = ON")
    _migrate_qa_quality_columns(conn)
    conn.commit()
# Columns added by the Q&A quality classifier (Track 1), as (name, SQL type).
# SCHEMA already declares them for a fresh database; _migrate_qa_quality_columns()
# ALTERs them onto a qa_pairs table that was created before they existed.
# Names and types must stay identical to the SCHEMA definition.
QA_QUALITY_COLUMNS = (
    ("usefulness_score", "INTEGER"),
    ("topic_class", "TEXT"),
    ("is_banter", "INTEGER"),
)


def _migrate_qa_quality_columns(conn: sqlite3.Connection) -> None:
    """Bring qa_pairs up to date with QA_QUALITY_COLUMNS and their indexes.

    Behavior:
      - Each missing quality column is added with ALTER TABLE. Existing rows
        read NULL in it, and existing values are never modified.
      - The usefulness_score and topic_class indexes are then created with
        IF NOT EXISTS. SCHEMA does not create these indexes, so this is where
        both fresh and upgraded databases get them.
      - If qa_pairs does not exist at all, nothing happens.
      - Safe to call any number of times.
    """
    present = {info[1] for info in conn.execute("PRAGMA table_info(qa_pairs)")}
    if not present:
        # PRAGMA table_info yields no rows for a table that does not exist.
        return
    for name, sql_type in QA_QUALITY_COLUMNS:
        if name in present:
            continue
        # String interpolation is safe: both parts come from the constant above.
        conn.execute(f"ALTER TABLE qa_pairs ADD COLUMN {name} {sql_type}")
    for index_sql in (
        "CREATE INDEX IF NOT EXISTS idx_qa_usefulness ON qa_pairs(usefulness_score)",
        "CREATE INDEX IF NOT EXISTS idx_qa_topic_class ON qa_pairs(topic_class)",
    ):
        conn.execute(index_sql)
def sha256_file(path: Path) -> str:
    """Return the lowercase hex SHA-256 digest of the file at *path*.

    The file is streamed in 64 KiB chunks, so memory use does not grow with
    file size.
    """
    digest = hashlib.sha256()
    block_size = 64 * 1024
    with open(path, "rb") as stream:
        while chunk := stream.read(block_size):
            digest.update(chunk)
    return digest.hexdigest()
def parse_air_date(rel_dir: Path) -> str | None:
s = rel_dir.as_posix()
for pat in DATE_PATTERNS:
m = pat.search(s)
if not m:
continue
gd = m.groupdict()
try:
if gd.get("y"):
y, mo, d = int(gd["y"]), int(gd["m"]), int(gd["d"])
else:
yy = int(gd["yy"])
y = 2000 + yy if yy < 30 else 1900 + yy
mo, d = int(gd["m"]), int(gd["d"])
return datetime(y, mo, d).date().isoformat()
except ValueError:
continue
return None
def _load_json(path: Path):
    """Parse the UTF-8 encoded JSON file at *path*."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def import_episode(conn: sqlite3.Connection, ep_dir: Path, root: Path) -> str:
    """Import one episode directory's pipeline outputs into *conn*.

    Keying and change detection:
      - The episode is keyed by ``rel_path``: its directory relative to
        *root*, with ``.mp3`` appended.
      - If a row with that key exists and its ``transcript_sha256`` equals the
        current transcript.json hash, nothing is written.
      - Only transcript.json takes part in this check. Changes to
        diarization.json, intros.json or qa.json alone are not picked up.

    Writing:
      - Otherwise any old row is deleted, and the episode plus its segments,
        turns, intros and Q&A pairs are inserted.
      - All writes happen inside one ``with conn:`` transaction, which is
        rolled back if any statement fails.

    Args:
        conn: Connection whose schema was set up by ``init_schema``.
        ep_dir: Directory containing transcript/diarization/intros/qa JSON.
        root: Transcripts root. The first path component below it must be a
            four-digit year, otherwise the directory is skipped.

    Returns:
        ``"inserted"``, ``"updated"`` or ``"skipped"``.

    Raises:
        OSError, ValueError (including json.JSONDecodeError and
        UnicodeDecodeError), KeyError, sqlite3.Error: unreadable or malformed
        input is propagated to the caller.
    """
    rel_dir = ep_dir.relative_to(root)
    rel_path = rel_dir.as_posix() + ".mp3"
    # Episodes live under <root>/<YYYY>/...; anything else (including root
    # itself, whose relative path has no parts) is not an episode directory.
    year_str = rel_dir.parts[0] if rel_dir.parts else ""
    if not (year_str.isdigit() and len(year_str) == 4):
        return "skipped"
    year = int(year_str)
    title = rel_dir.name
    air_date = parse_air_date(rel_dir)

    # Read transcript.json once. The recorded hash is then computed from
    # exactly the bytes that get imported, and the file is not read twice.
    transcript_bytes = (ep_dir / "transcript.json").read_bytes()
    sha = hashlib.sha256(transcript_bytes).hexdigest()

    row = conn.execute(
        "SELECT id, transcript_sha256 FROM episodes WHERE rel_path = ?",
        (rel_path,),
    ).fetchone()
    if row and row[1] == sha:
        return "skipped"

    transcript = json.loads(transcript_bytes.decode("utf-8"))
    diarization = _load_json(ep_dir / "diarization.json")
    intros = _load_json(ep_dir / "intros.json")
    qa = _load_json(ep_dir / "qa.json")

    with conn:  # one transaction per episode: commit on success, roll back on error
        if row:
            # Child rows are removed by ON DELETE CASCADE, which requires
            # PRAGMA foreign_keys = ON (set by init_schema).
            # NOTE: this also discards any usefulness_score / topic_class /
            # is_banter values set on this episode's qa_pairs; the INSERT below
            # does not populate those columns.
            conn.execute("DELETE FROM episodes WHERE id = ?", (row[0],))
            status = "updated"
        else:
            status = "inserted"
        cur = conn.execute(
            """INSERT INTO episodes
            (rel_path, year, title, air_date, duration_sec, language,
            language_probability, num_speakers, transcript_sha256, processed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                rel_path, year, title, air_date,
                # duration_sec is NOT NULL: treat a missing or null duration as 0.
                transcript.get("duration") or 0.0,
                transcript.get("language"),
                transcript.get("language_probability"),
                diarization.get("num_speakers"),
                sha,
                datetime.now(timezone.utc).isoformat(timespec="seconds"),
            ),
        )
        episode_id = cur.lastrowid
        conn.executemany(
            "INSERT INTO segments (episode_id, seg_idx, start_sec, end_sec, text) "
            "VALUES (?, ?, ?, ?, ?)",
            [
                (episode_id, s["id"], s.get("start"), s.get("end"), s["text"])
                for s in transcript.get("segments", [])
            ],
        )
        conn.executemany(
            "INSERT INTO turns (episode_id, speaker, start_sec, end_sec, confidence) "
            "VALUES (?, ?, ?, ?, ?)",
            [
                (episode_id, t["speaker"], t.get("start"), t.get("end"), t.get("confidence"))
                for t in diarization.get("turns", [])
            ],
        )
        conn.executemany(
            "INSERT INTO intros "
            "(episode_id, name, role_hint, intro_time_sec, affiliation, fillin_for, source_text) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            [
                (episode_id, i["name"], i.get("role_hint"), i.get("intro_time"),
                 i.get("affiliation"), i.get("fillin_for"), i.get("source_text"))
                for i in intros
            ],
        )
        conn.executemany(
            "INSERT INTO qa_pairs "
            "(episode_id, question_start_sec, question_end_sec, "
            " answer_start_sec, answer_end_sec, "
            " question_text, answer_text, caller_name, caller_role, topic, topic_tags) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            [
                (episode_id,
                 p.get("question_start"), p.get("question_end"),
                 p.get("answer_start"), p.get("answer_end"),
                 p.get("question_text"), p.get("answer_text"),
                 p.get("caller_name"), p.get("caller_role"),
                 # topic_tags is stored as a JSON-encoded list.
                 p.get("topic"), json.dumps(p.get("topic_tags") or []))
                for p in qa
            ],
        )
    return status
def find_episode_dirs(root: Path):
    """Yield each directory below *root* that contains every REQUIRED_FILES entry.

    Only descendants of *root* are examined. Directories are yielded lazily
    in no particular order, so callers that need a stable order should sort
    the results.
    """
    subdirs = (entry for entry in root.rglob("*") if entry.is_dir())
    for folder in subdirs:
        if all(folder.joinpath(name).exists() for name in REQUIRED_FILES):
            yield folder
def main():
    """Command-line entry point: import every complete episode under --root into --db.

    Steps:
      1. Parse arguments and validate --root. If --root is not an existing
         directory, exit with a usage error before touching the database.
         This stops ``--rebuild`` with a mistyped root from deleting the
         archive and leaving an empty DB behind.
      2. Optionally delete the DB (--rebuild), then create or upgrade the
         schema.
      3. Import each episode directory in sorted order. Per-episode failures
         are reported and counted; the remaining episodes are still imported.
      4. Run ANALYZE, optionally VACUUM (--vacuum), then print a summary.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--root", default=str(DEFAULT_ROOT))
    ap.add_argument("--db", default=str(DEFAULT_DB))
    ap.add_argument("--rebuild", action="store_true",
                    help="Delete the DB before import")
    ap.add_argument("--vacuum", action="store_true",
                    help="VACUUM after import (slow on large DBs)")
    args = ap.parse_args()
    root = Path(args.root)
    db = Path(args.db)
    # Validate the source tree BEFORE anything destructive happens.
    if not root.is_dir():
        ap.error(f"--root is not a directory: {root}")
    if args.rebuild and db.exists():
        db.unlink()
        print(f"deleted {db}")
    db.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(db)
    try:
        init_schema(conn)
        n_inserted = n_updated = n_skipped = n_error = 0
        t0 = time.monotonic()
        dirs = sorted(find_episode_dirs(root))
        print(f"Found {len(dirs)} complete episode directories under {root}")
        for i, d in enumerate(dirs, 1):
            try:
                status = import_episode(conn, d, root)
            except Exception as e:
                # Best-effort batch: report and continue. import_episode does
                # its writes inside a `with conn:` transaction, so a failed
                # episode leaves no partial rows behind.
                n_error += 1
                print(f"ERROR {d.relative_to(root)}: {e}")
            else:
                if status == "inserted":
                    n_inserted += 1
                elif status == "updated":
                    n_updated += 1
                elif status == "skipped":
                    n_skipped += 1
            if i % 50 == 0:
                print(f" {i}/{len(dirs)} ins={n_inserted} upd={n_updated} skip={n_skipped} err={n_error}")
        # Refresh query-planner statistics after the bulk changes.
        conn.executescript("ANALYZE;")
        if args.vacuum:
            conn.executescript("VACUUM;")
    finally:
        conn.close()
    size_mb = db.stat().st_size / 1024 / 1024
    elapsed = time.monotonic() - t0
    print(f"\n=== Done in {elapsed:.1f}s ===")
    print(f" inserted : {n_inserted}")
    print(f" updated : {n_updated}")
    print(f" skipped : {n_skipped}")
    print(f" errors : {n_error}")
    print(f" db : {db} ({size_mb:.1f} MB)")


if __name__ == "__main__":
    main()