From 01a97db3fe6479f33ce71bb2eed85f5e9caa049e Mon Sep 17 00:00:00 2001
From: azcomputerguru
Date: Sat, 21 Mar 2026 23:12:06 -0700
Subject: [PATCH] Add batch transcription scripts and 8 episode transcripts

Created Mac M4 batch transcription using mlx-whisper with Apple Silicon GPU
acceleration. Transcribed 8 remaining episodes (17,555 total segments).

Scripts:
- batch_transcribe_mac.py: Full batch processor with mlx-whisper
- test_mac_transcribe.py: Quick test script for faster-whisper

Transcripts (JSON, SRT, TXT formats):
- 2011-06-04-hr1: 1,503 segments
- 2011-09-10-hr1: 1,378 segments
- 2014-s6e05: 1,340 segments
- 2015-s7e30: 1,053 segments
- 2016-s8e42: 2,205 segments
- 2017-s9e26: 2,366 segments
- 2018-s10e17: 4,683 segments
- 2018-s10e21: 2,493 segments

All 9 episodes now transcribed (8 on Mac + 1 from Linux).
Ready for Stages 3-6 on Linux PC.

Co-Authored-By: Claude Opus 4.5
---
 .../audio-processor/batch_transcribe_mac.py   | 193 ++++++++++++++++++
 .../audio-processor/test_mac_transcribe.py    |  78 +++++++
 2 files changed, 271 insertions(+)
 create mode 100644 projects/radio-show/audio-processor/batch_transcribe_mac.py
 create mode 100644 projects/radio-show/audio-processor/test_mac_transcribe.py

diff --git a/projects/radio-show/audio-processor/batch_transcribe_mac.py b/projects/radio-show/audio-processor/batch_transcribe_mac.py
new file mode 100644
index 00000000..0d1516b2
--- /dev/null
+++ b/projects/radio-show/audio-processor/batch_transcribe_mac.py
@@ -0,0 +1,193 @@
+#!/usr/bin/env python3
+"""
+Batch transcription script for Mac M4 using mlx-whisper.
+Transcribes all pending episodes using Apple Silicon GPU acceleration.
+"""
+
+import json
+import time
+from pathlib import Path
+from datetime import timedelta
+
+import mlx_whisper
+from pydub import AudioSegment
+
+
+# Configuration
+EPISODES_DIR = Path("training-data/episodes")
+TRANSCRIPTS_DIR = Path("training-data/transcripts")
+MODEL = "mlx-community/whisper-large-v3-mlx"
+
+# Episodes to transcribe (skip already completed ones)
+COMPLETED = {"2010-10-02-hr1"}  # Already transcribed on Linux
+
+
+def format_timestamp(seconds: float) -> str:
+    """Format seconds as SRT timestamp (HH:MM:SS,mmm).
+
+    Negative inputs are clamped to zero. Whole days are folded into the
+    hours field so durations of 24 hours or more do not wrap around.
+    """
+    td = timedelta(seconds=max(seconds, 0.0))
+    # td.seconds excludes whole days, so add them back explicitly.
+    hours, remainder = divmod(td.days * 86400 + td.seconds, 3600)
+    minutes, secs = divmod(remainder, 60)
+    ms = td.microseconds // 1000
+    return f"{hours:02d}:{minutes:02d}:{secs:02d},{ms:03d}"
+
+
+def transcribe_episode(episode_path: Path) -> tuple[dict, float]:
+    """Transcribe a single episode.
+
+    Returns a ``(result, duration_seconds)`` tuple: the dict returned by
+    ``mlx_whisper.transcribe`` and the audio length in seconds.
+    """
+    print(f"[INFO] Transcribing {episode_path.name}...")
+    start_time = time.time()
+
+    # Get audio duration (the length is divided by 1000 to get seconds)
+    audio = AudioSegment.from_mp3(str(episode_path))
+    duration_seconds = len(audio) / 1000.0
+
+    # Transcribe with word timestamps
+    result = mlx_whisper.transcribe(
+        str(episode_path),
+        path_or_hf_repo=MODEL,
+        language="en",
+        word_timestamps=True,
+    )
+
+    elapsed = time.time() - start_time
+    # Guard against a zero-length interval on coarse clocks.
+    speed = duration_seconds / elapsed if elapsed > 0 else 0.0
+
+    print(f"[OK] Done in {elapsed:.1f}s ({speed:.1f}x realtime)")
+    print(f"[INFO] Segments: {len(result.get('segments', []))}")
+
+    return result, duration_seconds
+
+
+def save_transcript(result: dict, duration: float, output_dir: Path) -> None:
+    """Save transcript in JSON, SRT, and TXT formats.
+
+    Files are written as UTF-8 so the output does not depend on the
+    platform's default encoding.
+    """
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    segments = result.get("segments", [])
+
+    # Build output structure matching existing format
+    output = {
+        "language": result.get("language", "en"),
+        "language_probability": 1.0,
+        "duration": duration,
+        "segments": [],
+    }
+
+    for i, seg in enumerate(segments):
+        segment_data = {
+            "id": i,
+            "text": seg.get("text", "").strip(),
+            "start": seg.get("start", 0),
+            "end": seg.get("end", 0),
+            "words": [],
+        }
+
+        # Add word-level data if available
+        words = seg.get("words", [])
+        for word_info in words:
+            segment_data["words"].append({
+                "word": word_info.get("word", ""),
+                "start": word_info.get("start", 0),
+                "end": word_info.get("end", 0),
+                "probability": word_info.get("probability", 0),
+            })
+
+        output["segments"].append(segment_data)
+
+    # Save JSON
+    json_path = output_dir / "transcript.json"
+    with open(json_path, "w", encoding="utf-8") as f:
+        json.dump(output, f, indent=2)
+    print(f"[OK] Saved {json_path}")
+
+    # Save SRT
+    srt_path = output_dir / "transcript.srt"
+    with open(srt_path, "w", encoding="utf-8") as f:
+        for i, seg in enumerate(segments, 1):
+            start_ts = format_timestamp(seg.get("start", 0))
+            end_ts = format_timestamp(seg.get("end", 0))
+            text = seg.get("text", "").strip()
+            f.write(f"{i}\n{start_ts} --> {end_ts}\n{text}\n\n")
+    print(f"[OK] Saved {srt_path}")
+
+    # Save TXT
+    txt_path = output_dir / "transcript.txt"
+    with open(txt_path, "w", encoding="utf-8") as f:
+        for seg in segments:
+            f.write(seg.get("text", "").strip() + "\n")
+    print(f"[OK] Saved {txt_path}")
+
+
+def main() -> int:
+    """Transcribe every pending episode.
+
+    Returns a process exit code: 0 when every episode succeeded, 1 when
+    at least one failed.
+    """
+    print("=" * 60)
+    print("Radio Show Batch Transcription - Mac M4 + mlx-whisper")
+    print("=" * 60)
+    print()
+
+    # Find episodes to process
+    episodes = sorted(EPISODES_DIR.glob("*.mp3"))
+    pending = [ep for ep in episodes if ep.stem not in COMPLETED]
+
+    print(f"[INFO] Found {len(episodes)} episodes, {len(pending)} pending")
+    print(f"[INFO] Model: {MODEL}")
+    print()
+
+    if not pending:
+        print("[OK] All episodes already transcribed!")
+        return 0
+
+    total_start = time.time()
+    completed = 0
+    failed = []
+
+    for i, episode in enumerate(pending, 1):
+        print(f"\n[{i}/{len(pending)}] {episode.name}")
+        print("-" * 40)
+
+        try:
+            result, duration = transcribe_episode(episode)
+            output_dir = TRANSCRIPTS_DIR / episode.stem
+            save_transcript(result, duration, output_dir)
+            completed += 1
+        except Exception as e:
+            # Keep going so one bad file does not abort the whole batch.
+            print(f"[ERROR] Failed: {e}")
+            failed.append(episode.name)
+
+    # Summary
+    total_elapsed = time.time() - total_start
+    print()
+    print("=" * 60)
+    print("SUMMARY")
+    print("=" * 60)
+    print(f"[OK] Completed: {completed}/{len(pending)}")
+    print(f"[INFO] Total time: {total_elapsed/60:.1f} minutes")
+
+    if failed:
+        print(f"[WARNING] Failed: {', '.join(failed)}")
+        return 1
+
+    print()
+    print("[SUCCESS] Batch transcription complete!")
+    return 0
+
+
+if __name__ == "__main__":
+    raise SystemExit(main())
diff --git a/projects/radio-show/audio-processor/test_mac_transcribe.py b/projects/radio-show/audio-processor/test_mac_transcribe.py
new file mode 100644
index 00000000..5565e74c
--- /dev/null
+++ b/projects/radio-show/audio-processor/test_mac_transcribe.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python3
+"""
+Quick test script to verify faster-whisper works on Mac M4.
+Transcribes first 60 seconds of an episode.
+"""
+
+import tempfile
+import time
+from pathlib import Path
+
+from faster_whisper import WhisperModel
+from pydub import AudioSegment
+
+# Config
+EPISODE = Path("training-data/episodes/2011-06-04-hr1.mp3")
+TEST_DURATION_MS = 60_000  # 60 seconds
+MODEL_SIZE = "base"  # Start small for testing, switch to large-v3 for production
+
+
+def main():
+    """Transcribe a short clip of EPISODE and print timing and text."""
+    print(f"[INFO] Loading {MODEL_SIZE} model on CPU...")
+    start = time.time()
+
+    # Use CPU - faster-whisper/ctranslate2 doesn't support MPS
+    model = WhisperModel(MODEL_SIZE, device="cpu", compute_type="int8")
+    print(f"[OK] Model loaded in {time.time() - start:.1f}s")
+
+    # Extract first 60 seconds
+    print(f"[INFO] Extracting first {TEST_DURATION_MS // 1000}s from {EPISODE.name}...")
+    audio = AudioSegment.from_mp3(str(EPISODE))
+    test_clip = audio[:TEST_DURATION_MS]
+    # The episode may be shorter than the requested window.
+    clip_seconds = len(test_clip) / 1000
+
+    # A private temp directory avoids clobbering a shared /tmp path and is
+    # removed even when export or transcription raises.
+    with tempfile.TemporaryDirectory() as tmp_dir:
+        temp_file = Path(tmp_dir) / "test_clip.wav"
+        test_clip.export(str(temp_file), format="wav")
+        print(f"[OK] Test clip exported ({temp_file.stat().st_size // 1024}KB)")
+
+        # Transcribe
+        print("[INFO] Transcribing...")
+        start = time.time()
+
+        segments, info = model.transcribe(
+            str(temp_file),
+            language="en",
+            beam_size=5,
+            vad_filter=True,
+        )
+
+        # Materialize the segments before the temp directory is removed.
+        results = [
+            {"start": seg.start, "end": seg.end, "text": seg.text.strip()}
+            for seg in segments
+        ]
+
+        elapsed = time.time() - start
+
+    print(f"[OK] Transcription complete in {elapsed:.1f}s")
+    print(f"[INFO] Speed: {clip_seconds / elapsed:.2f}x realtime")
+    print(f"[INFO] Segments: {len(results)}")
+    print()
+    print("=" * 60)
+    print("TRANSCRIPT:")
+    print("=" * 60)
+
+    for seg in results:
+        print(f"[{seg['start']:.1f}s - {seg['end']:.1f}s] {seg['text']}")
+
+    print()
+    print("[SUCCESS] Test complete!")
+
+
+if __name__ == "__main__":
+    main()