"""
Import per-episode pipeline outputs (transcript / diarization / intros / qa)
into a single SQLite archive.db.

Idempotent: skips episodes whose transcript.json sha256 matches the recorded
hash. Re-run after each batch_process pass to keep the DB current. Note that
only transcript.json is hashed, so changes to diarization.json, intros.json
or qa.json alone are not picked up.

Usage:
    py import_to_sqlite.py [--db PATH] [--root PATH] [--rebuild] [--vacuum]
"""

import argparse
import hashlib
import json
import re
import sqlite3
import time
from collections.abc import Iterator
from datetime import datetime, timezone
from pathlib import Path


# Default locations, both under an "archive-data" folder beside this script;
# main() uses them as the --root / --db defaults.
BASE = Path(__file__).parent
DEFAULT_ROOT = BASE.joinpath("archive-data", "transcripts")
DEFAULT_DB = BASE.joinpath("archive-data", "archive.db")

# A directory counts as a complete episode only when all of these exist in it.
REQUIRED_FILES = (
    "transcript.json",
    "diarization.json",
    "intros.json",
    "qa.json",
)


# Full DDL for archive.db, run via executescript() in init_schema().
# Every statement is CREATE ... IF NOT EXISTS, so re-running it on an existing
# database is harmless, but it also never alters an existing table. Columns
# added later therefore need an explicit ALTER TABLE migration.
# The child tables (segments, turns, intros, qa_pairs) reference episodes with
# ON DELETE CASCADE, which SQLite only honours when the connection has
# PRAGMA foreign_keys = ON.
# segments_fts / qa_fts are external-content FTS5 indexes (content=...).
# SQLite does not maintain them automatically; the TRIGGERS script keeps them
# in sync.
# qa_pairs.topic_tags holds a JSON-encoded list written by import_episode().
SCHEMA = """
CREATE TABLE IF NOT EXISTS episodes (
    id INTEGER PRIMARY KEY,
    rel_path TEXT NOT NULL UNIQUE,
    year INTEGER NOT NULL,
    title TEXT,
    air_date TEXT,
    duration_sec REAL NOT NULL,
    language TEXT,
    language_probability REAL,
    num_speakers INTEGER,
    transcript_sha256 TEXT NOT NULL,
    processed_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_episodes_year ON episodes(year);
CREATE INDEX IF NOT EXISTS idx_episodes_air_date ON episodes(air_date);

CREATE TABLE IF NOT EXISTS segments (
    id INTEGER PRIMARY KEY,
    episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
    seg_idx INTEGER NOT NULL,
    start_sec REAL,
    end_sec REAL,
    text TEXT NOT NULL,
    UNIQUE(episode_id, seg_idx)
);
CREATE INDEX IF NOT EXISTS idx_segments_episode ON segments(episode_id, start_sec);

CREATE TABLE IF NOT EXISTS turns (
    id INTEGER PRIMARY KEY,
    episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
    speaker TEXT NOT NULL,
    start_sec REAL,
    end_sec REAL,
    confidence REAL
);
CREATE INDEX IF NOT EXISTS idx_turns_episode ON turns(episode_id, start_sec);
CREATE INDEX IF NOT EXISTS idx_turns_speaker ON turns(episode_id, speaker);

CREATE TABLE IF NOT EXISTS intros (
    id INTEGER PRIMARY KEY,
    episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
    name TEXT NOT NULL,
    role_hint TEXT,
    intro_time_sec REAL,
    affiliation TEXT,
    fillin_for TEXT,
    source_text TEXT
);
CREATE INDEX IF NOT EXISTS idx_intros_episode ON intros(episode_id);
CREATE INDEX IF NOT EXISTS idx_intros_name ON intros(name);

CREATE TABLE IF NOT EXISTS qa_pairs (
    id INTEGER PRIMARY KEY,
    episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
    question_start_sec REAL,
    question_end_sec REAL,
    answer_start_sec REAL,
    answer_end_sec REAL,
    question_text TEXT NOT NULL,
    answer_text TEXT NOT NULL,
    caller_name TEXT,
    caller_role TEXT,
    topic TEXT,
    topic_tags TEXT,
    usefulness_score INTEGER,
    topic_class TEXT,
    is_banter INTEGER
);
CREATE INDEX IF NOT EXISTS idx_qa_episode ON qa_pairs(episode_id);
CREATE INDEX IF NOT EXISTS idx_qa_caller ON qa_pairs(caller_name);
-- Indexes on quality columns (usefulness_score, topic_class) are created by
-- _migrate_qa_quality_columns() so they apply to both fresh and migrated DBs.

CREATE VIRTUAL TABLE IF NOT EXISTS segments_fts USING fts5(
    text,
    content='segments', content_rowid='id',
    tokenize='porter unicode61'
);

CREATE VIRTUAL TABLE IF NOT EXISTS qa_fts USING fts5(
    question_text, answer_text,
    content='qa_pairs', content_rowid='id',
    tokenize='porter unicode61'
);
"""


# Triggers that keep the external-content FTS5 indexes (segments_fts, qa_fts)
# consistent with their content tables. FTS5 does not watch the content
# table itself, so every INSERT, DELETE and text-changing UPDATE has to be
# mirrored. An UPDATE is mirrored as a 'delete' of the old row values
# followed by a re-insert of the new ones.
# The UPDATE triggers fire only when the indexed columns (or the rowid alias
# `id`) change. Writes to other columns, such as the Q&A quality columns,
# therefore leave the full-text index alone.
TRIGGERS = """
CREATE TRIGGER IF NOT EXISTS segments_ai AFTER INSERT ON segments BEGIN
    INSERT INTO segments_fts(rowid, text) VALUES (new.id, new.text);
END;
CREATE TRIGGER IF NOT EXISTS segments_ad AFTER DELETE ON segments BEGIN
    INSERT INTO segments_fts(segments_fts, rowid, text) VALUES('delete', old.id, old.text);
END;
CREATE TRIGGER IF NOT EXISTS segments_au AFTER UPDATE OF id, text ON segments BEGIN
    INSERT INTO segments_fts(segments_fts, rowid, text) VALUES('delete', old.id, old.text);
    INSERT INTO segments_fts(rowid, text) VALUES (new.id, new.text);
END;
CREATE TRIGGER IF NOT EXISTS qa_ai AFTER INSERT ON qa_pairs BEGIN
    INSERT INTO qa_fts(rowid, question_text, answer_text)
        VALUES (new.id, new.question_text, new.answer_text);
END;
CREATE TRIGGER IF NOT EXISTS qa_ad AFTER DELETE ON qa_pairs BEGIN
    INSERT INTO qa_fts(qa_fts, rowid, question_text, answer_text)
        VALUES('delete', old.id, old.question_text, old.answer_text);
END;
CREATE TRIGGER IF NOT EXISTS qa_au AFTER UPDATE OF id, question_text, answer_text ON qa_pairs BEGIN
    INSERT INTO qa_fts(qa_fts, rowid, question_text, answer_text)
        VALUES('delete', old.id, old.question_text, old.answer_text);
    INSERT INTO qa_fts(rowid, question_text, answer_text)
        VALUES (new.id, new.question_text, new.answer_text);
END;
"""


# Air-date patterns, tried most → least specific:
#   1. YYYY-MM-DD or YYYY_MM_DD with a 20xx year (month/day may be 1-2 digits)
#   2. M-D-YY with a two-digit year, e.g. "3-15-21"
# Both refuse to match inside a longer run of digits. Pattern 1 therefore does
# not turn "2021-03-150" into 2021-03-15, and does not match "2021" inside
# "12021-...".
DATE_PATTERNS = [
    re.compile(r"(?<!\d)(?P<y>20\d{2})[-_](?P<m>\d{1,2})[-_](?P<d>\d{1,2})(?!\d)"),
    re.compile(r"(?:^|[^\d])(?P<m>\d{1,2})-(?P<d>\d{1,2})-(?P<yy>\d{2})(?:[^\d]|$)"),
]


def init_schema(conn: sqlite3.Connection):
    """Create every table, index, FTS table and trigger on *conn*, then commit.

    Also turns on foreign-key enforcement for this connection, which the
    ON DELETE CASCADE clauses depend on. It then adds any Q&A quality columns
    an older qa_pairs table is missing.
    """
    for ddl_script in (SCHEMA, TRIGGERS):
        conn.executescript(ddl_script)
    # foreign_keys is a per-connection setting; without it cascades are ignored.
    conn.execute("PRAGMA foreign_keys = ON")
    _migrate_qa_quality_columns(conn)
    conn.commit()


# Columns added by the Q&A quality classifier (Track 1).
|
|
# Defined here so the migration is idempotent and runs on every invocation:
|
|
# the SCHEMA above creates them on a fresh DB; this block ALTERs an existing
|
|
# qa_pairs that pre-dates them. Names + types must match SCHEMA exactly.
|
|
QA_QUALITY_COLUMNS = (
|
|
("usefulness_score", "INTEGER"),
|
|
("topic_class", "TEXT"),
|
|
("is_banter", "INTEGER"),
|
|
)
|
|
|
|
|
|
def _migrate_qa_quality_columns(conn: sqlite3.Connection) -> None:
|
|
"""Add Q&A quality columns to qa_pairs if they're missing.
|
|
|
|
Idempotent: existing column values are untouched; new columns default NULL.
|
|
Safe to call on fresh DBs (qa_pairs already has the columns from SCHEMA -
|
|
PRAGMA table_info reflects that and we no-op).
|
|
"""
|
|
existing = {row[1] for row in conn.execute("PRAGMA table_info(qa_pairs)").fetchall()}
|
|
if not existing:
|
|
# qa_pairs hasn't been created yet (shouldn't happen post-SCHEMA, but be safe)
|
|
return
|
|
for col, col_type in QA_QUALITY_COLUMNS:
|
|
if col not in existing:
|
|
conn.execute(f"ALTER TABLE qa_pairs ADD COLUMN {col} {col_type}")
|
|
# Indexes on the new columns (CREATE INDEX IF NOT EXISTS is already idempotent,
|
|
# but on a brand-new DB they were created by SCHEMA; on a migrated DB they
|
|
# weren't, so create them here too).
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_qa_usefulness ON qa_pairs(usefulness_score)")
|
|
conn.execute("CREATE INDEX IF NOT EXISTS idx_qa_topic_class ON qa_pairs(topic_class)")
|
|
|
|
|
|
def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is streamed in 64 KiB reads instead of being loaded whole.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while block := stream.read(65536):
            digest.update(block)
    return digest.hexdigest()


def parse_air_date(rel_dir: Path) -> str | None:
|
|
s = rel_dir.as_posix()
|
|
for pat in DATE_PATTERNS:
|
|
m = pat.search(s)
|
|
if not m:
|
|
continue
|
|
gd = m.groupdict()
|
|
try:
|
|
if gd.get("y"):
|
|
y, mo, d = int(gd["y"]), int(gd["m"]), int(gd["d"])
|
|
else:
|
|
yy = int(gd["yy"])
|
|
y = 2000 + yy if yy < 30 else 1900 + yy
|
|
mo, d = int(gd["m"]), int(gd["d"])
|
|
return datetime(y, mo, d).date().isoformat()
|
|
except ValueError:
|
|
continue
|
|
return None
|
|
|
|
|
|
def import_episode(conn: sqlite3.Connection, ep_dir: Path, root: Path) -> str:
    """Import one episode directory into the DB, replacing any older copy.

    The episode is keyed by ``rel_path``: *ep_dir* relative to *root* with
    ".mp3" appended. The first component of that relative path must be a
    four-digit year folder.

    NOTE: change detection hashes transcript.json only. A regenerated
    diarization.json / intros.json / qa.json with an unchanged transcript
    is not re-imported.

    Returns:
        "skipped"  if not under a four-digit year folder, or the stored
                   transcript_sha256 equals the current transcript.json hash;
        "updated"  if an existing episode was deleted and re-imported;
        "inserted" if the episode was new.

    Raises:
        Whatever reading/parsing the four JSON files or the INSERTs raise
        (e.g. OSError, json.JSONDecodeError, KeyError, sqlite3.Error). All
        writes happen in a single transaction, so on failure the previously
        imported version of the episode is left in place.
    """
    rel_dir = ep_dir.relative_to(root)
    year_str = rel_dir.parts[0] if rel_dir.parts else ""
    if not (year_str.isdigit() and len(year_str) == 4):
        return "skipped"

    rel_path = rel_dir.as_posix() + ".mp3"
    transcript_path = ep_dir / "transcript.json"
    sha = sha256_file(transcript_path)

    row = conn.execute(
        "SELECT id, transcript_sha256 FROM episodes WHERE rel_path = ?",
        (rel_path,),
    ).fetchone()
    if row and row[1] == sha:
        return "skipped"

    # Parse everything before opening the transaction so a bad file fails fast.
    transcript = _load_json(transcript_path)
    diarization = _load_json(ep_dir / "diarization.json")
    intros = _load_json(ep_dir / "intros.json")
    qa = _load_json(ep_dir / "qa.json")

    with conn:  # commits on success, rolls back on any exception
        if row:
            # Child rows go with the parent via ON DELETE CASCADE (needs
            # PRAGMA foreign_keys = ON); the AFTER DELETE triggers then drop
            # their FTS entries.
            conn.execute("DELETE FROM episodes WHERE id = ?", (row[0],))
            status = "updated"
        else:
            status = "inserted"

        episode_id = _insert_episode_row(
            conn, rel_dir, rel_path, int(year_str), sha, transcript, diarization
        )
        _insert_episode_children(conn, episode_id, transcript, diarization, intros, qa)

    return status


def _load_json(path: Path):
    """Parse and return the UTF-8 JSON document at *path*."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def _insert_episode_row(
    conn: sqlite3.Connection,
    rel_dir: Path,
    rel_path: str,
    year: int,
    sha: str,
    transcript: dict,
    diarization: dict,
) -> int:
    """Insert the parent ``episodes`` row and return its new id."""
    cur = conn.execute(
        """INSERT INTO episodes
               (rel_path, year, title, air_date, duration_sec, language,
                language_probability, num_speakers, transcript_sha256, processed_at)
           VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
        (
            rel_path,
            year,
            rel_dir.name,
            parse_air_date(rel_dir),
            # duration_sec is NOT NULL: treat a missing *or null* duration as 0.
            transcript.get("duration") or 0.0,
            transcript.get("language"),
            transcript.get("language_probability"),
            diarization.get("num_speakers"),
            sha,
            datetime.now(timezone.utc).isoformat(timespec="seconds"),
        ),
    )
    return cur.lastrowid


def _insert_episode_children(
    conn: sqlite3.Connection,
    episode_id: int,
    transcript: dict,
    diarization: dict,
    intros: list,
    qa: list,
) -> None:
    """Insert the segment, speaker-turn, intro and Q&A rows for *episode_id*."""
    conn.executemany(
        "INSERT INTO segments (episode_id, seg_idx, start_sec, end_sec, text) "
        "VALUES (?, ?, ?, ?, ?)",
        [
            (episode_id, s["id"], s.get("start"), s.get("end"), s["text"])
            for s in transcript.get("segments", [])
        ],
    )

    conn.executemany(
        "INSERT INTO turns (episode_id, speaker, start_sec, end_sec, confidence) "
        "VALUES (?, ?, ?, ?, ?)",
        [
            (episode_id, t["speaker"], t.get("start"), t.get("end"), t.get("confidence"))
            for t in diarization.get("turns", [])
        ],
    )

    conn.executemany(
        "INSERT INTO intros "
        "(episode_id, name, role_hint, intro_time_sec, affiliation, fillin_for, source_text) "
        "VALUES (?, ?, ?, ?, ?, ?, ?)",
        [
            (episode_id, i["name"], i.get("role_hint"), i.get("intro_time"),
             i.get("affiliation"), i.get("fillin_for"), i.get("source_text"))
            for i in intros
        ],
    )

    # The quality columns (usefulness_score, topic_class, is_banter) are not
    # written here and start out NULL.
    conn.executemany(
        "INSERT INTO qa_pairs "
        "(episode_id, question_start_sec, question_end_sec, "
        " answer_start_sec, answer_end_sec, "
        " question_text, answer_text, caller_name, caller_role, topic, topic_tags) "
        "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        [
            (
                episode_id,
                p.get("question_start"), p.get("question_end"),
                p.get("answer_start"), p.get("answer_end"),
                p.get("question_text"), p.get("answer_text"),
                p.get("caller_name"), p.get("caller_role"),
                # topic_tags is stored as a JSON array string; missing/null -> "[]".
                p.get("topic"), json.dumps(p.get("topic_tags") or []),
            )
            for p in qa
        ],
    )


def find_episode_dirs(root: Path, required=None) -> Iterator[Path]:
    """Yield each directory below *root* that holds every required file.

    Args:
        root: directory searched recursively. *root* itself is never yielded,
            since ``rglob("*")`` only returns its descendants.
        required: file names that must all exist as regular files in the
            directory. Defaults to the module's REQUIRED_FILES.

    Yields:
        Matching directories, in no particular order.
    """
    if required is None:
        required = REQUIRED_FILES
    for d in root.rglob("*"):
        # is_file() rather than exists(): a *directory* named like a required
        # file (e.g. "qa.json/") must not qualify, since it can't be opened.
        if d.is_dir() and all((d / name).is_file() for name in required):
            yield d


def main():
    """Command-line entry point.

    Steps:
      1. Validate --root.
      2. Delete the DB first when --rebuild is given.
      3. Create/upgrade the schema.
      4. Import every complete episode directory under --root, in sorted
         order. A failing episode is reported and counted; it does not
         abort the run.
      5. Run ANALYZE (and VACUUM with --vacuum) and print a summary.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--root", default=str(DEFAULT_ROOT))
    ap.add_argument("--db", default=str(DEFAULT_DB))
    ap.add_argument("--rebuild", action="store_true",
                    help="Delete the DB before import")
    ap.add_argument("--vacuum", action="store_true",
                    help="VACUUM after import (slow on large DBs)")
    args = ap.parse_args()

    root = Path(args.root)
    db = Path(args.db)

    # Validate before touching the DB. A mistyped --root combined with
    # --rebuild would otherwise delete the archive and then import nothing.
    if not root.is_dir():
        ap.error(f"--root {root} is not a directory")

    if args.rebuild and db.exists():
        db.unlink()
        print(f"deleted {db}")

    db.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(db)
    try:
        init_schema(conn)

        n_inserted = n_updated = n_skipped = n_error = 0
        t0 = time.monotonic()

        dirs = sorted(find_episode_dirs(root))
        print(f"Found {len(dirs)} complete episode directories under {root}")

        for i, d in enumerate(dirs, 1):
            try:
                status = import_episode(conn, d, root)
            except Exception as e:
                # Top-level boundary: report the failure and move on.
                n_error += 1
                print(f"ERROR {d.relative_to(root)}: {e}")
            else:
                if status == "inserted":
                    n_inserted += 1
                elif status == "updated":
                    n_updated += 1
                elif status == "skipped":
                    n_skipped += 1
            if i % 50 == 0:
                print(f" {i}/{len(dirs)} ins={n_inserted} upd={n_updated} skip={n_skipped} err={n_error}")

        conn.executescript("ANALYZE;")
        if args.vacuum:
            conn.executescript("VACUUM;")
    finally:
        # Release the connection even on Ctrl-C or a schema/maintenance error.
        conn.close()

    size_mb = db.stat().st_size / 1024 / 1024
    elapsed = time.monotonic() - t0
    print(f"\n=== Done in {elapsed:.1f}s ===")
    print(f" inserted : {n_inserted}")
    print(f" updated : {n_updated}")
    print(f" skipped : {n_skipped}")
    print(f" errors : {n_error}")
    print(f" db : {db} ({size_mb:.1f} MB)")


if __name__ == "__main__":
    main()