# Files
# Mike Swanson 82940d96d7 radio: utf-8 transcript writes + sqlite archive importer + session log
# - src/transcriber.py: open transcript.{json,txt,srt} with encoding="utf-8".
#   The Windows cp1252 default encoding crashed on Whisper output containing U+2044.
# - import_to_sqlite.py: new. Walks archive-data/transcripts, builds
#   archive.db (5 tables + 2 FTS5 virtual tables, sha256-keyed idempotency).
#   20.5 MB / 208 episodes at smoke-test time, 1.9s rebuild.
# - batch_process.py: now tracked (written in a prior session) — full-archive batch with
#   resumable transcribe/diarize/intros/qa pipeline.
# - .gitignore: archive-data/ and logs/.
#
# Session log: 2026-04-27-archive-batch-and-sqlite-import.md.
#
# Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# 2026-04-27 19:38:02 -07:00
#
# 207 lines
# 7.4 KiB
# Python

"""
Batch-process the full archive: transcribe, diarize (with bumper filter +
current profiles), extract intros and Q&A pairs. Resumable — skips any
episode whose outputs already exist; each stage is re-run only when its own
output file is missing.

Output layout mirrors the archive-data/episodes/ tree:

    archive-data/episodes/<year>/.../<stem>.mp3
    archive-data/transcripts/<year>/.../<stem>/{transcript,diarization,intros,qa}.json

An aggregated roster of introduced names is written to
archive-data/transcripts/intro_roster.json at the end of the run.

Skips in-progress download files (modified within the last 60s).
"""
import os
import sys
import time
import json
from pathlib import Path

# NOTE: the interpreter reads PYTHONIOENCODING only at start-up, so setting it
# here affects child processes; this process's own stdout is switched to UTF-8
# by the reconfigure() call below.
os.environ["PYTHONIOENCODING"] = "utf-8"
# Set before any model library is imported below. Presumably keeps Hugging Face
# transformers from contacting the network (offline mode) — verify.
os.environ["TRANSFORMERS_OFFLINE"] = "1"
# Guarded because a replaced/wrapped sys.stdout may not provide reconfigure().
if hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8")

# Called before `import torch` further down. NOTE(review): presumably it must
# precede the torch import (e.g. to make CUDA libraries discoverable) —
# confirm in src/gpu.py before reordering.
from src.gpu import ensure_cuda_libs
ensure_cuda_libs()
# These imports deliberately come after ensure_cuda_libs() above.
import torch
from src.config import load_config
from src.diarizer import diarize, VoiceProfileStore
from src.qa_extractor import (
    load_diarized_transcript, extract_qa_pairs, attach_caller_names,
)
from src.speaker_oracle import extract_intros
from src.transcriber import transcribe as _transcribe
from rich.console import Console

console = Console()

# All archive paths are resolved relative to this script's directory.
BASE = Path(__file__).parent
EPISODES_ROOT = BASE / "archive-data" / "episodes"        # input MP3 tree
TRANSCRIPTS_ROOT = BASE / "archive-data" / "transcripts"  # mirrored per-episode outputs
INPROGRESS_GRACE_S = 60.0  # skip files modified within this many seconds

# Nothing to do without the input tree; exit non-zero so callers notice.
if not EPISODES_ROOT.exists():
    console.print(f"[red]No episodes at {EPISODES_ROOT}[/red]")
    sys.exit(1)

config = load_config()
# Use the GPU when torch reports one, otherwise fall back to CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
console.print(f"[bold]Batch process[/bold] device={device} ({torch.cuda.get_device_name(0) if device=='cuda' else 'CPU'})")
# Constructed once and passed to every diarize() call in the episode loop.
voice_profiles = VoiceProfileStore(
    config.resolve_path(config.diarization.voice_profiles_dir)
)
def out_dir_for(mp3: Path) -> Path:
    """Map an episode MP3 to its output directory under TRANSCRIPTS_ROOT.

    episodes/<sub dirs>/<stem>.mp3  ->  transcripts/<sub dirs>/<stem>

    Raises ValueError (from Path.relative_to) if *mp3* is not inside
    EPISODES_ROOT.
    """
    relative = mp3.relative_to(EPISODES_ROOT)
    return TRANSCRIPTS_ROOT.joinpath(relative.parent, relative.stem)
def is_inprogress(mp3: Path) -> bool:
    """Return True if *mp3* looks like a download that is still being written.

    A file counts as in progress when its mtime is less than
    INPROGRESS_GRACE_S seconds ago, or when it can no longer be stat'ed
    (FileNotFoundError — e.g. removed or renamed since the directory walk).
    """
    try:
        last_modified = mp3.stat().st_mtime
    except FileNotFoundError:
        return True
    age_s = time.time() - last_modified
    return age_s < INPROGRESS_GRACE_S
# Collect all MP3s in a single directory walk. The extension test is
# case-insensitive, so ".mp3", ".MP3" and mixed-case variants such as ".Mp3"
# are all picked up, including on case-sensitive filesystems.
all_mp3s = sorted(
    p for p in EPISODES_ROOT.rglob("*")
    if p.name.lower().endswith(".mp3") and p.is_file()
)
console.print(f"Found {len(all_mp3s)} MP3 files in {EPISODES_ROOT}")

# Run-wide bookkeeping, reported by the periodic progress line and the final summary.
t0_total = time.monotonic()
processed = 0           # episodes whose pending stages all completed this run
skipped_done = 0        # episodes with every output file already present
skipped_inprogress = 0  # MP3s skipped because they were modified very recently
errors: list[str] = []  # human-readable "<relative path>: <message>" entries
# Aggregate intros across all transcribed episodes:
# name -> {"count": int, "roles": set of role hints, "episodes": set of relative paths}
all_intros: dict[str, dict] = {}
from rich.markup import escape


def _read_json(path: Path):
    """Load a JSON file produced by this pipeline.

    The encoding is explicit because open() otherwise uses the locale
    encoding (cp1252 on Windows), which cannot decode all UTF-8 text.
    """
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def _write_json_atomic(path: Path, data) -> None:
    """Write *data* to *path* as indented JSON via a temp file + os.replace().

    Resume logic treats an existing output file as finished, so writing in
    place could leave a truncated file that is never regenerated if the run
    is interrupted mid-write. With the temp file, an interruption leaves at
    most a stray ``*.tmp`` file.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    os.replace(tmp_path, path)


def _record_intros(rel: Path, name_roles) -> None:
    """Fold one episode's (name, role_hint) pairs into the all_intros roster."""
    for name, role_hint in name_roles:
        rec = all_intros.setdefault(name, {
            "count": 0, "roles": set(), "episodes": set(),
        })
        rec["count"] += 1
        rec["roles"].add(role_hint)
        rec["episodes"].add(str(rel))


for idx, mp3 in enumerate(all_mp3s, 1):
    rel = mp3.relative_to(EPISODES_ROOT)
    if is_inprogress(mp3):
        skipped_inprogress += 1
        continue
    out_dir = out_dir_for(mp3)
    transcript_path = out_dir / "transcript.json"
    diarization_path = out_dir / "diarization.json"
    intros_path = out_dir / "intros.json"
    qa_path = out_dir / "qa.json"
    # Each stage runs only if its output file is missing (resume support).
    needs_transcribe = not transcript_path.exists()
    needs_diarize = not diarization_path.exists()
    needs_intros = not intros_path.exists()
    needs_qa = not qa_path.exists()

    if not needs_intros:
        # Intros extracted by an earlier run still belong in the roster;
        # otherwise a resumed run would overwrite intro_roster.json with only
        # the episodes extracted during this run.
        try:
            cached_intros = [
                (entry["name"], entry["role_hint"])
                for entry in _read_json(intros_path)
            ]
        except (OSError, ValueError, KeyError, TypeError) as e:
            errors.append(f"{rel}: unreadable intros.json: {type(e).__name__}: {e}")
            console.print(f" [yellow]cached intros unreadable for {escape(str(rel))}: {escape(str(e))}[/yellow]")
        else:
            _record_intros(rel, cached_intros)

    if not (needs_transcribe or needs_diarize or needs_intros or needs_qa):
        skipped_done += 1
        continue

    out_dir.mkdir(parents=True, exist_ok=True)
    # Paths are escaped so "[...]" in a filename is not parsed as Rich markup.
    console.print(f"\n[{idx}/{len(all_mp3s)}] {escape(str(rel))}")
    try:
        if needs_transcribe:
            t0 = time.monotonic()
            console.print(" transcribing...")
            transcript = _transcribe(mp3, model_size="large-v3", device=device, batch_size=16)
            transcript.save(out_dir)
            wall = time.monotonic() - t0
            rtf = transcript.duration / wall if wall > 0 else 0
            console.print(f" [green]transcribed: {transcript.duration:.0f}s in {wall:.1f}s ({rtf:.1f}x)[/green]")
        if needs_diarize:
            t0 = time.monotonic()
            console.print(" diarizing...")
            result = diarize(mp3, voice_profiles=voice_profiles,
                             host_match_threshold=0.85,
                             transcript_path=transcript_path)
            result.save(out_dir)
            wall = time.monotonic() - t0
            # Audio length approximated by the end of the last diarized turn.
            audio_dur = result.turns[-1].end if result.turns else 0
            rtf = audio_dur / wall if wall > 0 else 0
            console.print(f" [green]diarized: {len(result.turns)} turns in {wall:.1f}s ({rtf:.1f}x)[/green]")
        if needs_intros:
            tdata = _read_json(transcript_path)
            intros = extract_intros(tdata.get("segments", []))
            _write_json_atomic(intros_path, [
                {
                    "name": i.name,
                    "role_hint": i.role_hint,
                    "intro_time": i.intro_time,
                    "affiliation": i.affiliation,
                    "fillin_for": i.fillin_for,
                    "source_text": i.source_text[:200],
                } for i in intros
            ])
            _record_intros(rel, [(i.name, i.role_hint) for i in intros])
            console.print(f" [green]intros: {len(intros)} extracted[/green]")
        if needs_qa:
            # Fresh load: the raw segments are handed to attach_caller_names().
            tdata = _read_json(transcript_path)
            segments = load_diarized_transcript(transcript_path, diarization_path)
            pairs = extract_qa_pairs(segments)
            attach_caller_names(pairs, tdata.get("segments", []))
            _write_json_atomic(qa_path, [p.to_dict() for p in pairs])
            named = sum(1 for p in pairs if p.caller_name)
            console.print(f" [green]Q&A: {len(pairs)} pairs ({named} named)[/green]")
        processed += 1
    except Exception as e:
        # Record and carry on so one bad episode doesn't abort the whole batch;
        # its missing outputs are retried on the next run.
        detail = f"{type(e).__name__}: {e}"
        errors.append(f"{rel}: {detail}")
        console.print(f" [red]ERROR: {escape(detail)}[/red]")
    if idx % 20 == 0:
        elapsed = time.monotonic() - t0_total
        console.print(f"\n[bold]progress[/bold] {idx}/{len(all_mp3s)} "
                      f"({processed} processed, {skipped_done} cached, {skipped_inprogress} in-progress, "
                      f"{len(errors)} errors) — {elapsed/60:.1f} min elapsed\n")
# Persist the aggregated intro roster, most frequently introduced names first.
roster_path = TRANSCRIPTS_ROOT / "intro_roster.json"
roster_entries = {
    name: {
        "count": rec["count"],
        "roles": sorted(rec["roles"]),
        "episode_count": len(rec["episodes"]),
        "episodes": sorted(rec["episodes"])[:20],  # cap to first 20 for readability
    }
    for name, rec in all_intros.items()
}
# reverse=True keeps the sort stable, so equal counts retain insertion order.
roster = dict(sorted(
    roster_entries.items(),
    key=lambda item: item[1]["count"],
    reverse=True,
))
roster_path.parent.mkdir(parents=True, exist_ok=True)
with open(roster_path, "w") as f:
    json.dump(roster, f, indent=2)
from rich.markup import escape

# Final run summary.
elapsed = time.monotonic() - t0_total
console.print("\n[bold green]=== Done ===[/bold green]")
console.print(f" processed : {processed}")
console.print(f" cached (skipped): {skipped_done}")
console.print(f" in-progress : {skipped_inprogress}")
console.print(f" errors : {len(errors)}")
console.print(f" wall time : {elapsed/60:.1f} min")
console.print(f" roster written : {escape(str(roster_path))} ({len(roster)} unique names)")
if errors:
    MAX_ERRORS_SHOWN = 20
    console.print("\n[yellow]Errors:[/yellow]")
    # Error text contains paths and exception messages that may include
    # "[...]"; escape it so Rich prints it literally instead of parsing markup.
    for e in errors[:MAX_ERRORS_SHOWN]:
        console.print(f" {escape(e)}")
    # Say how many were hidden rather than truncating silently.
    if len(errors) > MAX_ERRORS_SHOWN:
        console.print(f" ... and {len(errors) - MAX_ERRORS_SHOWN} more")