- src/transcriber.py: open transcript.{json,txt,srt} with encoding="utf-8".
The Windows default encoding (cp1252) crashed on Whisper output containing
U+2044 (FRACTION SLASH).
- import_to_sqlite.py: new. Walks archive-data/transcripts, builds
archive.db (5 tables + 2 FTS5 virtual tables, sha256-keyed idempotency).
20.5 MB / 208 episodes at smoke-test time, 1.9s rebuild.
- batch_process.py: now tracked in git (written in a prior session) — full-archive
batch with a resumable transcribe/diarize/intros/qa pipeline.
- .gitignore: archive-data/ and logs/.
Session log: 2026-04-27-archive-batch-and-sqlite-import.md.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
207 lines
7.4 KiB
Python
207 lines
7.4 KiB
Python
"""
Batch-process the full archive: transcribe, diarize (with bumper filter +
current profiles), extract intros and Q&A pairs. Resumable — skips any
episode whose outputs already exist.

Output layout mirrors the archive-data/episodes/ tree:

    archive-data/episodes/<year>/.../<stem>.mp3
    archive-data/transcripts/<year>/.../<stem>/{transcript,diarization,intros,qa}.json

An aggregated roster of introduced names is written to
archive-data/transcripts/intro_roster.json.

Skips in-progress download files (modified within the last 60s).
"""
|
|
import os
|
|
import sys
|
|
import time
|
|
import json
|
|
from pathlib import Path
|
|
|
|
# Process-wide environment set-up. This must run before torch/transformers are
# imported further down.
# NOTE: PYTHONIOENCODING is only read at interpreter start-up, so setting it
# here affects child processes, not this process's own streams.
os.environ["PYTHONIOENCODING"] = "utf-8"
# Hugging Face offline mode: don't try to reach the Hub for models.
os.environ["TRANSFORMERS_OFFLINE"] = "1"

# Re-encode this process's stdout AND stderr as UTF-8. When redirected to a
# file or pipe on Windows both default to cp1252, and a warning or traceback
# containing e.g. U+2044 would otherwise raise UnicodeEncodeError.
# (Streams can be None under pythonw, hence the hasattr guard.)
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, "reconfigure"):
        _stream.reconfigure(encoding="utf-8")

# Presumably makes the CUDA shared libraries discoverable; it is called before
# `import torch` on purpose — TODO confirm against src/gpu.py.
from src.gpu import ensure_cuda_libs

ensure_cuda_libs()
|
|
|
|
import torch
|
|
from src.config import load_config
|
|
from src.diarizer import diarize, VoiceProfileStore
|
|
from src.qa_extractor import (
|
|
load_diarized_transcript, extract_qa_pairs, attach_caller_names,
|
|
)
|
|
from src.speaker_oracle import extract_intros
|
|
from src.transcriber import transcribe as _transcribe
|
|
from rich.console import Console
|
|
|
|
console = Console()

# Filesystem layout: episode MP3s are read from EPISODES_ROOT and per-episode
# artefacts are written under TRANSCRIPTS_ROOT, both beside this script.
BASE = Path(__file__).parent
EPISODES_ROOT = BASE / "archive-data" / "episodes"
TRANSCRIPTS_ROOT = BASE / "archive-data" / "transcripts"

# An MP3 whose mtime is younger than this many seconds is treated as still
# being downloaded and is left for a later run.
INPROGRESS_GRACE_S = 60.0

# Nothing to do without an episode tree; bail out before loading any models.
if not EPISODES_ROOT.exists():
    console.print(f"[red]No episodes at {EPISODES_ROOT}[/red]")
    sys.exit(1)

config = load_config()

# Prefer the GPU when torch can see one; report which device was picked.
device = "cuda" if torch.cuda.is_available() else "cpu"
device_label = torch.cuda.get_device_name(0) if device == "cuda" else "CPU"
console.print(f"[bold]Batch process[/bold] device={device} ({device_label})")

# Voice profiles for diarization, loaded from the configured directory.
profiles_dir = config.resolve_path(config.diarization.voice_profiles_dir)
voice_profiles = VoiceProfileStore(profiles_dir)
|
|
|
|
|
|
def out_dir_for(mp3: Path) -> Path:
    """Return the directory that holds all outputs for episode *mp3*.

    ``EPISODES_ROOT/<sub>/<stem>.mp3`` maps to ``TRANSCRIPTS_ROOT/<sub>/<stem>``.
    Raises ValueError (from ``Path.relative_to``) if *mp3* is not under
    EPISODES_ROOT.
    """
    relative = mp3.relative_to(EPISODES_ROOT)
    return TRANSCRIPTS_ROOT.joinpath(relative.parent, relative.stem)
|
|
|
|
|
|
def is_inprogress(mp3: Path) -> bool:
    """Return True when *mp3* should be left alone on this pass.

    That is the case when the file was modified less than
    ``INPROGRESS_GRACE_S`` seconds ago (presumably a download still being
    written), or when it disappeared before it could be stat'ed.
    """
    try:
        modified_at = mp3.stat().st_mtime
    except FileNotFoundError:
        # Vanished since it was discovered — not safe to process.
        return True
    age_s = time.time() - modified_at
    return age_s < INPROGRESS_GRACE_S
|
|
|
|
|
|
# Collect every MP3 under EPISODES_ROOT in one directory walk. The extension is
# matched case-insensitively (".mp3", ".MP3", ".Mp3", ...): separate "*.mp3" /
# "*.MP3" globs miss mixed-case names on case-sensitive filesystems, and on
# Windows they return the same files twice.
all_mp3s = sorted(
    p for p in EPISODES_ROOT.rglob("*")
    if p.suffix.lower() == ".mp3" and p.is_file()
)
console.print(f"Found {len(all_mp3s)} MP3 files in {EPISODES_ROOT}")

t0_total = time.monotonic()
processed = 0            # episodes where the missing stages all ran successfully
skipped_done = 0         # episodes whose four outputs already existed
skipped_inprogress = 0   # episodes skipped by is_inprogress()
errors: list[str] = []   # one "<relative path>: <exception>" entry per failure

# Aggregate intros across all transcribed episodes:
# name -> {"count": int, "roles": set of role hints, "episodes": set of relative paths}
all_intros: dict[str, dict] = {}
|
|
|
|
def _load_json(path: Path):
    """Return the parsed contents of the JSON file at *path*, read as UTF-8.

    transcript.json is written as UTF-8 by src/transcriber.py; opening it with
    the platform default encoding (cp1252 on Windows) raises
    UnicodeDecodeError on characters such as U+2044.
    """
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def _write_json_atomic(path: Path, obj) -> None:
    """Serialize *obj* to *path* as indented JSON without exposing a partial file.

    The resume logic treats any existing output file as finished, so a plain
    write interrupted half-way would leave a truncated file that is never
    regenerated. Writing to a sibling ``.tmp`` file first and then
    os.replace()-ing it over *path* means an interruption leaves only the
    temp file behind.
    """
    tmp = path.with_name(path.name + ".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2)
    os.replace(tmp, path)


def _aggregate_intros(records: list[dict], episode: str) -> None:
    """Fold one episode's intro records (intros.json shape) into ``all_intros``."""
    # Extract first so a malformed record raises before the roster is touched.
    entries = [(r["name"], r.get("role_hint")) for r in records]
    for name, role in entries:
        rec = all_intros.setdefault(name, {
            "count": 0, "roles": set(), "episodes": set(),
        })
        rec["count"] += 1
        rec["roles"].add(role)
        rec["episodes"].add(episode)


def _aggregate_cached_intros(intros_path: Path, rel: Path) -> None:
    """Add a previously written intros.json to the roster.

    Without this, a resumed run would rewrite intro_roster.json from the newly
    processed episodes only. An unreadable or malformed file is recorded in
    ``errors`` instead of aborting the batch.
    """
    try:
        _aggregate_intros(_load_json(intros_path), str(rel))
    except (OSError, ValueError, KeyError, TypeError) as e:
        errors.append(f"{rel}: cached intros.json unreadable: {e}")
        console.print(f" [red]ERROR: {rel}: cached intros.json unreadable: {e}[/red]")


def _record_error(rel: Path, exc: Exception) -> None:
    """Remember a per-episode failure for the final summary and report it now."""
    errors.append(f"{rel}: {exc}")
    console.print(f" [red]ERROR: {exc}[/red]")


def _print_progress(idx: int) -> None:
    """Print the running tallies after episode number *idx*."""
    elapsed = time.monotonic() - t0_total
    console.print(f"\n[bold]progress[/bold] {idx}/{len(all_mp3s)} "
                  f"({processed} processed, {skipped_done} cached, {skipped_inprogress} in-progress, "
                  f"{len(errors)} errors) — {elapsed/60:.1f} min elapsed\n")


def _transcribe_stage(mp3: Path, out_dir: Path) -> None:
    """Transcribe *mp3* (model "large-v3") and save the transcript into *out_dir*."""
    t0 = time.monotonic()
    console.print(" transcribing...")
    transcript = _transcribe(mp3, model_size="large-v3", device=device, batch_size=16)
    transcript.save(out_dir)
    wall = time.monotonic() - t0
    rtf = transcript.duration / wall if wall > 0 else 0
    console.print(f" [green]transcribed: {transcript.duration:.0f}s in {wall:.1f}s ({rtf:.1f}x)[/green]")


def _diarize_stage(mp3: Path, out_dir: Path, transcript_path: Path) -> None:
    """Diarize *mp3* against the loaded voice profiles and save the result into *out_dir*."""
    t0 = time.monotonic()
    console.print(" diarizing...")
    result = diarize(mp3, voice_profiles=voice_profiles,
                     host_match_threshold=0.85,
                     transcript_path=transcript_path)
    result.save(out_dir)
    wall = time.monotonic() - t0
    # Audio length is approximated by the end time of the last turn.
    audio_dur = result.turns[-1].end if result.turns else 0
    rtf = audio_dur / wall if wall > 0 else 0
    console.print(f" [green]diarized: {len(result.turns)} turns in {wall:.1f}s ({rtf:.1f}x)[/green]")


def _intros_stage(transcript_path: Path, intros_path: Path) -> list[dict]:
    """Extract speaker intros from the transcript, write intros.json and return its records."""
    tdata = _load_json(transcript_path)
    intros = extract_intros(tdata.get("segments", []))
    records = [
        {
            "name": i.name,
            "role_hint": i.role_hint,
            "intro_time": i.intro_time,
            "affiliation": i.affiliation,
            "fillin_for": i.fillin_for,
            "source_text": i.source_text[:200],
        }
        for i in intros
    ]
    _write_json_atomic(intros_path, records)
    console.print(f" [green]intros: {len(intros)} extracted[/green]")
    return records


def _qa_stage(transcript_path: Path, diarization_path: Path, qa_path: Path) -> None:
    """Extract Q&A pairs from the diarized transcript, attach caller names and write qa.json."""
    tdata = _load_json(transcript_path)
    segments = load_diarized_transcript(transcript_path, diarization_path)
    pairs = extract_qa_pairs(segments)
    attach_caller_names(pairs, tdata.get("segments", []))
    _write_json_atomic(qa_path, [p.to_dict() for p in pairs])
    named = sum(1 for p in pairs if p.caller_name)
    console.print(f" [green]Q&A: {len(pairs)} pairs ({named} named)[/green]")


def _handle_episode(idx: int, mp3: Path) -> str:
    """Run whichever pipeline stages are still missing for one episode.

    Returns "inprogress", "cached", "processed" or "error". Failures are
    appended to ``errors`` and printed rather than raised, so one bad episode
    never aborts the batch.
    """
    rel = mp3.relative_to(EPISODES_ROOT)
    if is_inprogress(mp3):
        return "inprogress"

    out_dir = out_dir_for(mp3)
    transcript_path = out_dir / "transcript.json"
    diarization_path = out_dir / "diarization.json"
    intros_path = out_dir / "intros.json"
    qa_path = out_dir / "qa.json"

    needs_transcribe = not transcript_path.exists()
    needs_diarize = not diarization_path.exists()
    needs_intros = not intros_path.exists()
    needs_qa = not qa_path.exists()

    if not (needs_transcribe or needs_diarize or needs_intros or needs_qa):
        _aggregate_cached_intros(intros_path, rel)
        return "cached"

    out_dir.mkdir(parents=True, exist_ok=True)
    console.print(f"\n[{idx}/{len(all_mp3s)}] {rel}")

    try:
        if needs_transcribe:
            _transcribe_stage(mp3, out_dir)
        if needs_diarize:
            _diarize_stage(mp3, out_dir, transcript_path)
        if needs_intros:
            _aggregate_intros(_intros_stage(transcript_path, intros_path), str(rel))
        if needs_qa:
            _qa_stage(transcript_path, diarization_path, qa_path)
    except Exception as e:
        # Per-episode boundary: record the failure and move on to the next file.
        _record_error(rel, e)
        return "error"

    if not needs_intros:
        # intros.json came from an earlier run; still count it in the roster.
        _aggregate_cached_intros(intros_path, rel)
    return "processed"


for idx, mp3 in enumerate(all_mp3s, 1):
    status = _handle_episode(idx, mp3)
    if status == "inprogress":
        skipped_inprogress += 1
    elif status == "cached":
        skipped_done += 1
    elif status == "processed":
        processed += 1
    # "error" needs no counter here: _handle_episode already appended to `errors`.

    # Checked after every episode, whatever its outcome, so progress keeps a
    # regular cadence even through long runs of cached or failing episodes.
    if idx % 20 == 0:
        _print_progress(idx)
|
|
|
|
# Persist the aggregated intro roster, most frequently introduced names first.
roster_path = TRANSCRIPTS_ROOT / "intro_roster.json"
roster = {}
for name, rec in all_intros.items():
    roster[name] = {
        "count": rec["count"],
        # key=str gives the same order as plain sorted() for string roles, but
        # does not raise TypeError if a role_hint was None alongside strings
        # (NOTE(review): confirm whether extract_intros can emit None roles).
        "roles": sorted(rec["roles"], key=str),
        "episode_count": len(rec["episodes"]),
        "episodes": sorted(rec["episodes"])[:20],  # cap to first 20 for readability
    }
roster = dict(sorted(roster.items(), key=lambda item: -item[1]["count"]))

roster_path.parent.mkdir(parents=True, exist_ok=True)
# Write to a temp file and swap it in, so an interrupted run never leaves a
# truncated roster in place of the previous one.
roster_tmp = roster_path.with_name(roster_path.name + ".tmp")
with open(roster_tmp, "w", encoding="utf-8") as f:
    json.dump(roster, f, indent=2)
os.replace(roster_tmp, roster_path)
|
|
|
|
# Final report for the whole run.
elapsed = time.monotonic() - t0_total
console.print("\n[bold green]=== Done ===[/bold green]")
console.print(f" processed : {processed}")
console.print(f" cached (skipped): {skipped_done}")
console.print(f" in-progress : {skipped_inprogress}")
console.print(f" errors : {len(errors)}")
console.print(f" wall time : {elapsed/60:.1f} min")
console.print(f" roster written : {roster_path} ({len(roster)} unique names)")

_MAX_ERRORS_SHOWN = 20  # keep the console summary short; the count above is complete
if errors:
    console.print("\n[yellow]Errors:[/yellow]")
    for err in errors[:_MAX_ERRORS_SHOWN]:
        console.print(f" {err}")
    # Say how many were cut so a truncated list isn't mistaken for the full one.
    if len(errors) > _MAX_ERRORS_SHOWN:
        console.print(f" ... and {len(errors) - _MAX_ERRORS_SHOWN} more")
|