radio: utf-8 transcript writes + sqlite archive importer + session log
- src/transcriber.py: open transcript.{json,txt,srt} with encoding="utf-8".
Windows cp1252 default crashed on Whisper output containing U+2044.
- import_to_sqlite.py: new. Walks archive-data/transcripts, builds
archive.db (5 tables + 2 FTS5 virtual tables, sha256-keyed idempotency).
20.5 MB / 208 episodes at smoke-test time, 1.9s rebuild.
- batch_process.py: tracked from prior session — full-archive batch with
resumable transcribe/diarize/intros/qa pipeline.
- .gitignore: archive-data/ and logs/.
Session log: 2026-04-27-archive-batch-and-sqlite-import.md.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
332
projects/radio-show/audio-processor/import_to_sqlite.py
Normal file
332
projects/radio-show/audio-processor/import_to_sqlite.py
Normal file
@@ -0,0 +1,332 @@
|
||||
"""
|
||||
Import per-episode pipeline outputs (transcript / diarization / intros / qa)
|
||||
into a single SQLite archive.db.
|
||||
|
||||
Idempotent: skips episodes whose transcript.json sha256 matches the recorded
|
||||
hash. Re-run after each batch_process pass to keep the DB current.
|
||||
|
||||
Usage:
|
||||
py import_to_sqlite.py [--db PATH] [--root PATH] [--rebuild] [--vacuum]
|
||||
"""
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import re
|
||||
import sqlite3
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
# Directory containing this script; the default locations below hang off it.
BASE = Path(__file__).parent
# Default tree that is scanned for per-episode output directories.
DEFAULT_ROOT = BASE.joinpath("archive-data", "transcripts")
# Default location of the SQLite database that the importer writes.
DEFAULT_DB = BASE.joinpath("archive-data", "archive.db")

# A directory is treated as a complete episode only when all of these exist.
REQUIRED_FILES = (
    "transcript.json",
    "diarization.json",
    "intros.json",
    "qa.json",
)
|
||||
|
||||
# DDL for the archive database: one parent table (episodes), four child tables
# that reference it with ON DELETE CASCADE (segments, turns, intros, qa_pairs),
# and two external-content FTS5 indexes over segment text and Q&A text.
# Every statement uses IF NOT EXISTS so the script can be re-run on an
# existing database.
SCHEMA = """
CREATE TABLE IF NOT EXISTS episodes (
    id INTEGER PRIMARY KEY,
    rel_path TEXT NOT NULL UNIQUE,
    year INTEGER NOT NULL,
    title TEXT,
    air_date TEXT,
    duration_sec REAL NOT NULL,
    language TEXT,
    language_probability REAL,
    num_speakers INTEGER,
    transcript_sha256 TEXT NOT NULL,
    processed_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_episodes_year ON episodes(year);
CREATE INDEX IF NOT EXISTS idx_episodes_air_date ON episodes(air_date);

CREATE TABLE IF NOT EXISTS segments (
    id INTEGER PRIMARY KEY,
    episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
    seg_idx INTEGER NOT NULL,
    start_sec REAL,
    end_sec REAL,
    text TEXT NOT NULL,
    UNIQUE(episode_id, seg_idx)
);
CREATE INDEX IF NOT EXISTS idx_segments_episode ON segments(episode_id, start_sec);

CREATE TABLE IF NOT EXISTS turns (
    id INTEGER PRIMARY KEY,
    episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
    speaker TEXT NOT NULL,
    start_sec REAL,
    end_sec REAL,
    confidence REAL
);
CREATE INDEX IF NOT EXISTS idx_turns_episode ON turns(episode_id, start_sec);
CREATE INDEX IF NOT EXISTS idx_turns_speaker ON turns(episode_id, speaker);

CREATE TABLE IF NOT EXISTS intros (
    id INTEGER PRIMARY KEY,
    episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
    name TEXT NOT NULL,
    role_hint TEXT,
    intro_time_sec REAL,
    affiliation TEXT,
    fillin_for TEXT,
    source_text TEXT
);
CREATE INDEX IF NOT EXISTS idx_intros_episode ON intros(episode_id);
CREATE INDEX IF NOT EXISTS idx_intros_name ON intros(name);

CREATE TABLE IF NOT EXISTS qa_pairs (
    id INTEGER PRIMARY KEY,
    episode_id INTEGER NOT NULL REFERENCES episodes(id) ON DELETE CASCADE,
    question_start_sec REAL,
    question_end_sec REAL,
    answer_start_sec REAL,
    answer_end_sec REAL,
    question_text TEXT NOT NULL,
    answer_text TEXT NOT NULL,
    caller_name TEXT,
    caller_role TEXT,
    topic TEXT,
    topic_tags TEXT
);
CREATE INDEX IF NOT EXISTS idx_qa_episode ON qa_pairs(episode_id);
CREATE INDEX IF NOT EXISTS idx_qa_caller ON qa_pairs(caller_name);

CREATE VIRTUAL TABLE IF NOT EXISTS segments_fts USING fts5(
    text,
    content='segments', content_rowid='id',
    tokenize='porter unicode61'
);

CREATE VIRTUAL TABLE IF NOT EXISTS qa_fts USING fts5(
    question_text, answer_text,
    content='qa_pairs', content_rowid='id',
    tokenize='porter unicode61'
);
"""
|
||||
|
||||
# Triggers that keep the external-content FTS5 indexes in step with their
# content tables. segments_fts and qa_fts store no text of their own, so every
# change to segments / qa_pairs has to be mirrored into the index explicitly:
#   *_ai  after INSERT  -> add the new row to the index
#   *_ad  after DELETE  -> issue the FTS5 'delete' command with the old values
#   *_au  after UPDATE  -> 'delete' the old values, then add the new ones
# Without the *_au pair an UPDATE of the indexed text would leave the index
# pointing at the stale content.
TRIGGERS = """
CREATE TRIGGER IF NOT EXISTS segments_ai AFTER INSERT ON segments BEGIN
    INSERT INTO segments_fts(rowid, text) VALUES (new.id, new.text);
END;
CREATE TRIGGER IF NOT EXISTS segments_ad AFTER DELETE ON segments BEGIN
    INSERT INTO segments_fts(segments_fts, rowid, text) VALUES('delete', old.id, old.text);
END;
CREATE TRIGGER IF NOT EXISTS segments_au AFTER UPDATE ON segments BEGIN
    INSERT INTO segments_fts(segments_fts, rowid, text) VALUES('delete', old.id, old.text);
    INSERT INTO segments_fts(rowid, text) VALUES (new.id, new.text);
END;
CREATE TRIGGER IF NOT EXISTS qa_ai AFTER INSERT ON qa_pairs BEGIN
    INSERT INTO qa_fts(rowid, question_text, answer_text)
    VALUES (new.id, new.question_text, new.answer_text);
END;
CREATE TRIGGER IF NOT EXISTS qa_ad AFTER DELETE ON qa_pairs BEGIN
    INSERT INTO qa_fts(qa_fts, rowid, question_text, answer_text)
    VALUES('delete', old.id, old.question_text, old.answer_text);
END;
CREATE TRIGGER IF NOT EXISTS qa_au AFTER UPDATE ON qa_pairs BEGIN
    INSERT INTO qa_fts(qa_fts, rowid, question_text, answer_text)
    VALUES('delete', old.id, old.question_text, old.answer_text);
    INSERT INTO qa_fts(rowid, question_text, answer_text)
    VALUES (new.id, new.question_text, new.answer_text);
END;
"""
|
||||
|
||||
# Date shapes recognised in an episode path, ordered from most to least
# specific; callers try them in this order.
#   1. four-digit year first:  20YY-M-D  or  20YY_M_D
#   2. US-style short date:    M-D-YY, not embedded in a longer digit run
DATE_PATTERNS = [
    re.compile(source)
    for source in (
        r"(?P<y>20\d{2})[-_](?P<m>\d{1,2})[-_](?P<d>\d{1,2})",
        r"(?:^|[^\d])(?P<m>\d{1,2})-(?P<d>\d{1,2})-(?P<yy>\d{2})(?:[^\d]|$)",
    )
]
|
||||
|
||||
|
||||
def init_schema(conn: sqlite3.Connection):
    """Prepare *conn* for importing.

    Runs the SCHEMA and TRIGGERS scripts (both written with IF NOT EXISTS, so
    this is safe on an already-initialised database), then switches on
    foreign-key enforcement for this connection and commits.
    """
    for ddl_script in (SCHEMA, TRIGGERS):
        conn.executescript(ddl_script)
    # The child tables declare ON DELETE CASCADE; SQLite only honours that
    # when foreign keys are enabled on the connection.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.commit()
|
||||
|
||||
|
||||
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is read in 64 KiB blocks so it never has to fit in memory.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while block := stream.read(65536):
            digest.update(block)
    return digest.hexdigest()
|
||||
|
||||
|
||||
def parse_air_date(rel_dir: Path) -> str | None:
    """Extract an air date from an episode's relative directory path.

    Each pattern in DATE_PATTERNS is searched, in order, against the POSIX
    form of *rel_dir*. The first match that forms a real calendar date is
    returned as an ISO ``YYYY-MM-DD`` string. Two-digit years below 30 are
    read as 20YY, all others as 19YY. Returns None when nothing usable is
    found.
    """
    text = rel_dir.as_posix()
    for pattern in DATE_PATTERNS:
        found = pattern.search(text)
        if found is None:
            continue
        fields = found.groupdict()
        full_year = fields.get("y")
        try:
            if full_year:
                year = int(full_year)
            else:
                short_year = int(fields["yy"])
                century = 2000 if short_year < 30 else 1900
                year = century + short_year
            month = int(fields["m"])
            day = int(fields["d"])
            # datetime() rejects impossible dates such as month 13.
            return datetime(year, month, day).date().isoformat()
        except ValueError:
            # Digits matched but are not a valid date; try the next pattern.
            continue
    return None
|
||||
|
||||
|
||||
def _load_json(path: Path):
    """Decode the UTF-8 JSON document stored at *path*."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def import_episode(conn: sqlite3.Connection, ep_dir: Path, root: Path) -> str:
    """Import one episode directory into the archive database.

    The episode is keyed by its path relative to *root* with ".mp3" appended.
    The first path component must be a four-digit year directory. If an
    episode row already exists with the same transcript.json SHA-256, nothing
    is written. Otherwise any existing row is deleted and the episode, its
    segments, speaker turns, intros and Q&A pairs are inserted in a single
    transaction.

    Replacing an episode relies on the ON DELETE CASCADE foreign keys declared
    in SCHEMA to remove the old child rows, so foreign-key enforcement must be
    enabled on *conn* (init_schema does this).

    Args:
        conn: Open connection to a database initialised with the schema.
        ep_dir: Directory holding transcript.json, diarization.json,
            intros.json and qa.json for one episode.
        root: Archive root that *ep_dir* lives under.

    Returns:
        "inserted" for a new episode, "updated" when an existing episode was
        replaced, "skipped" when the path has no four-digit year component or
        the stored hash already matches.

    Raises:
        ValueError: If *ep_dir* is not located under *root*.
        KeyError: If a segment lacks "id"/"text", a turn lacks "speaker", or
            an intro lacks "name".
        sqlite3.Error: If an insert violates the schema; the transaction is
            rolled back and nothing from this episode is stored.
    """
    rel_dir = ep_dir.relative_to(root)
    # relative_to() yields "." (no parts) when ep_dir is the root itself;
    # there is no year component to read, so treat it like any other
    # directory that is not filed under a year.
    if not rel_dir.parts:
        return "skipped"
    year_str = rel_dir.parts[0]
    if not (year_str.isdigit() and len(year_str) == 4):
        return "skipped"
    year = int(year_str)
    rel_path = rel_dir.as_posix() + ".mp3"
    title = rel_dir.name
    air_date = parse_air_date(rel_dir)

    transcript_path = ep_dir / "transcript.json"
    sha = sha256_file(transcript_path)

    row = conn.execute(
        "SELECT id, transcript_sha256 FROM episodes WHERE rel_path = ?",
        (rel_path,),
    ).fetchone()
    # Unchanged transcript: skip before paying for any JSON parsing.
    if row and row[1] == sha:
        return "skipped"

    transcript = _load_json(transcript_path)
    diarization = _load_json(ep_dir / "diarization.json")
    intros = _load_json(ep_dir / "intros.json")
    qa = _load_json(ep_dir / "qa.json")

    # One transaction per episode: committed on success, rolled back on error.
    with conn:
        if row:
            conn.execute("DELETE FROM episodes WHERE id = ?", (row[0],))
            status = "updated"
        else:
            status = "inserted"

        cur = conn.execute(
            """INSERT INTO episodes
            (rel_path, year, title, air_date, duration_sec, language,
            language_probability, num_speakers, transcript_sha256, processed_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                rel_path, year, title, air_date,
                # duration_sec is NOT NULL: cover both a missing key and an
                # explicit JSON null.
                transcript.get("duration") or 0.0,
                transcript.get("language"),
                transcript.get("language_probability"),
                diarization.get("num_speakers"),
                sha,
                datetime.now(timezone.utc).isoformat(timespec="seconds"),
            ),
        )
        episode_id = cur.lastrowid

        conn.executemany(
            "INSERT INTO segments (episode_id, seg_idx, start_sec, end_sec, text) "
            "VALUES (?, ?, ?, ?, ?)",
            [
                (episode_id, s["id"], s.get("start"), s.get("end"), s["text"])
                for s in transcript.get("segments", [])
            ],
        )

        conn.executemany(
            "INSERT INTO turns (episode_id, speaker, start_sec, end_sec, confidence) "
            "VALUES (?, ?, ?, ?, ?)",
            [
                (episode_id, t["speaker"], t.get("start"), t.get("end"), t.get("confidence"))
                for t in diarization.get("turns", [])
            ],
        )

        conn.executemany(
            "INSERT INTO intros "
            "(episode_id, name, role_hint, intro_time_sec, affiliation, fillin_for, source_text) "
            "VALUES (?, ?, ?, ?, ?, ?, ?)",
            [
                (episode_id, i["name"], i.get("role_hint"), i.get("intro_time"),
                 i.get("affiliation"), i.get("fillin_for"), i.get("source_text"))
                for i in intros
            ],
        )

        conn.executemany(
            "INSERT INTO qa_pairs "
            "(episode_id, question_start_sec, question_end_sec, "
            " answer_start_sec, answer_end_sec, "
            " question_text, answer_text, caller_name, caller_role, topic, topic_tags) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
            [
                (episode_id,
                 p.get("question_start"), p.get("question_end"),
                 p.get("answer_start"), p.get("answer_end"),
                 p.get("question_text"), p.get("answer_text"),
                 p.get("caller_name"), p.get("caller_role"),
                 # topic_tags is stored as a JSON array in a TEXT column.
                 p.get("topic"), json.dumps(p.get("topic_tags") or []))
                for p in qa
            ],
        )

    return status
|
||||
|
||||
|
||||
def find_episode_dirs(root: Path):
    """Yield each directory below *root* that contains every REQUIRED_FILES entry.

    The search is recursive; *root* itself is not considered, and
    non-directories are ignored.
    """
    def is_complete(folder: Path) -> bool:
        return all((folder / filename).exists() for filename in REQUIRED_FILES)

    yield from (
        entry
        for entry in root.rglob("*")
        if entry.is_dir() and is_complete(entry)
    )
|
||||
|
||||
|
||||
def main():
    """Command-line entry point: import every complete episode into the DB.

    Parses --root / --db / --rebuild / --vacuum, initialises the schema, then
    imports each directory reported by find_episode_dirs in sorted order. A
    failure in one episode is reported and counted but does not stop the run.
    Finishes with ANALYZE (and VACUUM when requested) and prints a summary.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--root", default=str(DEFAULT_ROOT))
    ap.add_argument("--db", default=str(DEFAULT_DB))
    ap.add_argument("--rebuild", action="store_true",
                    help="Delete the DB before import")
    ap.add_argument("--vacuum", action="store_true",
                    help="VACUUM after import (slow on large DBs)")
    args = ap.parse_args()

    root = Path(args.root)
    db = Path(args.db)

    if args.rebuild and db.exists():
        db.unlink()
        # A rollback journal or WAL left behind by the old database would be
        # applied to the freshly created file of the same name, so remove the
        # sidecar files together with the database.
        for suffix in ("-journal", "-wal", "-shm"):
            db.with_name(db.name + suffix).unlink(missing_ok=True)
        print(f"deleted {db}")

    db.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(db)
    # Close the connection even if the run is interrupted or a step outside
    # the per-episode handler fails.
    try:
        init_schema(conn)

        n_inserted = n_updated = n_skipped = n_error = 0
        t0 = time.monotonic()

        dirs = sorted(find_episode_dirs(root))
        print(f"Found {len(dirs)} complete episode directories under {root}")

        for i, d in enumerate(dirs, 1):
            try:
                status = import_episode(conn, d, root)
            except Exception as e:
                # Top-level boundary: one bad episode must not abort the
                # batch. The exception type is printed because str() of a
                # KeyError is only the missing key.
                n_error += 1
                print(f"ERROR {d.relative_to(root)}: {type(e).__name__}: {e}")
            else:
                if status == "inserted":
                    n_inserted += 1
                elif status == "updated":
                    n_updated += 1
                elif status == "skipped":
                    n_skipped += 1
            if i % 50 == 0:
                print(f" {i}/{len(dirs)} ins={n_inserted} upd={n_updated} skip={n_skipped} err={n_error}")

        conn.executescript("ANALYZE;")
        if args.vacuum:
            conn.executescript("VACUUM;")
    finally:
        conn.close()

    size_mb = db.stat().st_size / 1024 / 1024
    elapsed = time.monotonic() - t0
    print(f"\n=== Done in {elapsed:.1f}s ===")
    print(f" inserted : {n_inserted}")
    print(f" updated : {n_updated}")
    print(f" skipped : {n_skipped}")
    print(f" errors : {n_error}")
    print(f" db : {db} ({size_mb:.1f} MB)")


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user