Files
claudetools/projects/radio-show/audio-processor/batch_transcribe_mac.py
azcomputerguru a3a47f2d5e Add batch transcription scripts and 8 episode transcripts
Created a Mac M4 batch transcription script that uses mlx-whisper with
Apple Silicon GPU acceleration. Transcribed the 8 remaining episodes (17,555 total segments).

Scripts:
- batch_transcribe_mac.py: Full batch processor with mlx-whisper
- test_mac_transcribe.py: Quick test script for faster-whisper

Transcripts (JSON, SRT, TXT formats):
- 2011-06-04-hr1: 1,503 segments
- 2011-09-10-hr1: 1,378 segments
- 2014-s6e05: 1,340 segments
- 2015-s7e30: 1,053 segments
- 2016-s8e42: 2,205 segments
- 2017-s9e26: 2,366 segments
- 2018-s10e17: 4,683 segments
- 2018-s10e21: 2,493 segments

All 9 episodes now transcribed (8 on Mac + 1 from Linux).
Ready for Stages 3-6 on Linux PC.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-21 23:12:06 -07:00

172 lines
4.9 KiB
Python

#!/usr/bin/env python3
"""
Batch transcription script for Mac M4 using mlx-whisper.
Transcribes all pending episodes using Apple Silicon GPU acceleration.
"""
import json
import time
from pathlib import Path
from datetime import timedelta
import mlx_whisper
from pydub import AudioSegment
# --- Configuration ---------------------------------------------------------

# Model identifier handed to mlx_whisper.transcribe() as `path_or_hf_repo`.
MODEL: str = "mlx-community/whisper-large-v3-mlx"

# Input audio and output transcript locations (relative paths, so they are
# resolved against the current working directory at run time).
TRANSCRIPTS_DIR: Path = Path("training-data/transcripts")
EPISODES_DIR: Path = Path("training-data/episodes")

# Episode stems to leave out of the batch (already transcribed on Linux).
COMPLETED: set = {"2010-10-02-hr1"}
def format_timestamp(seconds: float) -> str:
    """Format a time offset in seconds as an SRT timestamp (HH:MM:SS,mmm).

    Args:
        seconds: Offset from the start of the audio, in seconds. Negative
            values are clamped to zero.

    Returns:
        The timestamp string. Hours are zero-padded to at least two digits
        and keep counting past 24 (e.g. 90000 seconds -> "25:00:00,000")
        instead of wrapping around at day boundaries.
    """
    # Clamp first: a negative timedelta normalises to days=-1 plus a large
    # positive seconds field, which would render as a bogus timestamp.
    td = timedelta(seconds=max(seconds, 0.0))
    # Work from the total number of whole milliseconds so the `days`
    # component of the timedelta is included (td.seconds alone drops it).
    total_ms = td // timedelta(milliseconds=1)
    total_secs, ms = divmod(total_ms, 1000)
    hours, remainder = divmod(total_secs, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}"
def transcribe_episode(episode_path: Path) -> tuple[dict, float]:
    """Transcribe a single episode with mlx-whisper.

    Args:
        episode_path: Path to the episode's MP3 file.

    Returns:
        A ``(result, duration_seconds)`` tuple: the dictionary returned by
        ``mlx_whisper.transcribe`` and the audio length in seconds as
        measured by pydub.
    """
    print(f"[INFO] Transcribing {episode_path.name}...")
    start_time = time.time()

    # Only the length is needed, so do not bind the decoded audio to a name;
    # that lets it be released before the transcription starts.
    # pydub reports len() in milliseconds.
    duration_seconds = len(AudioSegment.from_mp3(str(episode_path))) / 1000.0

    # Transcribe with word timestamps
    result = mlx_whisper.transcribe(
        str(episode_path),
        path_or_hf_repo=MODEL,
        language="en",
        word_timestamps=True,
    )

    elapsed = time.time() - start_time
    # Guard against a zero elapsed time (coarse clock) to avoid dividing by 0.
    speed = duration_seconds / elapsed if elapsed > 0 else float("inf")
    print(f"[OK] Done in {elapsed:.1f}s ({speed:.1f}x realtime)")
    print(f"[INFO] Segments: {len(result.get('segments', []))}")
    return result, duration_seconds
def save_transcript(result: dict, duration: float, output_dir: Path) -> None:
    """Save a transcript in JSON, SRT, and TXT formats.

    Writes ``transcript.json``, ``transcript.srt`` and ``transcript.txt``
    into ``output_dir`` (created if missing). All files are written as UTF-8
    so the output does not depend on the platform's locale encoding.

    Args:
        result: Transcription result; its ``"segments"`` list (and optional
            ``"language"``) are read. Missing keys fall back to defaults.
        duration: Audio duration in seconds, stored in the JSON output.
        output_dir: Directory that receives the three transcript files.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    segments = result.get("segments", [])

    # Build output structure matching existing format
    output = {
        "language": result.get("language", "en"),
        # Fixed value: it is not read from `result`; the key is kept so the
        # structure matches the existing transcript format.
        "language_probability": 1.0,
        "duration": duration,
        "segments": [
            {
                "id": i,
                "text": seg.get("text", "").strip(),
                "start": seg.get("start", 0),
                "end": seg.get("end", 0),
                # Word-level data, if available (empty list otherwise).
                "words": [
                    {
                        "word": word_info.get("word", ""),
                        "start": word_info.get("start", 0),
                        "end": word_info.get("end", 0),
                        "probability": word_info.get("probability", 0),
                    }
                    for word_info in seg.get("words", [])
                ],
            }
            for i, seg in enumerate(segments)
        ],
    }

    # Save JSON
    json_path = output_dir / "transcript.json"
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2)
    print(f"[OK] Saved {json_path}")

    # Save SRT (cue numbers are 1-based)
    srt_path = output_dir / "transcript.srt"
    with open(srt_path, "w", encoding="utf-8") as f:
        for i, seg in enumerate(segments, 1):
            start_ts = format_timestamp(seg.get("start", 0))
            end_ts = format_timestamp(seg.get("end", 0))
            text = seg.get("text", "").strip()
            f.write(f"{i}\n{start_ts} --> {end_ts}\n{text}\n\n")
    print(f"[OK] Saved {srt_path}")

    # Save TXT (one segment per line)
    txt_path = output_dir / "transcript.txt"
    with open(txt_path, "w", encoding="utf-8") as f:
        f.writelines(seg.get("text", "").strip() + "\n" for seg in segments)
    print(f"[OK] Saved {txt_path}")
def main() -> int:
    """Transcribe every pending episode and print a summary.

    Pending episodes are the ``*.mp3`` files in ``EPISODES_DIR`` whose stem
    is not listed in ``COMPLETED``. A failure on one episode is reported and
    the batch continues with the next one.

    Returns:
        Process exit code: 0 when every pending episode was transcribed (or
        none were pending), 1 when no episode files were found or at least
        one episode failed.
    """
    banner = "=" * 60
    print(banner)
    print("Radio Show Batch Transcription - Mac M4 + mlx-whisper")
    print(banner)
    print()

    # Find episodes to process
    episodes = sorted(EPISODES_DIR.glob("*.mp3"))
    pending = [ep for ep in episodes if ep.stem not in COMPLETED]
    print(f"[INFO] Found {len(episodes)} episodes, {len(pending)} pending")
    print(f"[INFO] Model: {MODEL}")
    print()

    if not episodes:
        # An empty glob usually means a wrong working directory or a missing
        # folder; do not report that as "all transcribed".
        print(f"[ERROR] No .mp3 files found in {EPISODES_DIR.resolve()}")
        return 1

    if not pending:
        print("[OK] All episodes already transcribed!")
        return 0

    total_start = time.time()
    completed = 0
    failed = []

    for i, episode in enumerate(pending, 1):
        print(f"\n[{i}/{len(pending)}] {episode.name}")
        print("-" * 40)
        # Top-level boundary: catch broadly so that one bad episode does not
        # abort the rest of the batch.
        try:
            result, duration = transcribe_episode(episode)
            output_dir = TRANSCRIPTS_DIR / episode.stem
            save_transcript(result, duration, output_dir)
            completed += 1
        except Exception as e:
            print(f"[ERROR] Failed: {e}")
            failed.append(episode.name)

    # Summary
    total_elapsed = time.time() - total_start
    print()
    print(banner)
    print("SUMMARY")
    print(banner)
    print(f"[OK] Completed: {completed}/{len(pending)}")
    print(f"[INFO] Total time: {total_elapsed/60:.1f} minutes")

    if failed:
        print(f"[WARNING] Failed: {', '.join(failed)}")
        print()
        print(f"[ERROR] Batch transcription finished with {len(failed)} failed episode(s)")
        return 1

    print()
    print("[SUCCESS] Batch transcription complete!")
    return 0


if __name__ == "__main__":
    # Propagate main()'s result as the process exit status.
    raise SystemExit(main())