Created Mac M4 batch transcription using mlx-whisper with Apple Silicon GPU acceleration. Transcribed 8 remaining episodes (17,555 total segments). Scripts: - batch_transcribe_mac.py: Full batch processor with mlx-whisper - test_mac_transcribe.py: Quick test script for faster-whisper Transcripts (JSON, SRT, TXT formats): - 2011-06-04-hr1: 1,503 segments - 2011-09-10-hr1: 1,378 segments - 2014-s6e05: 1,340 segments - 2015-s7e30: 1,053 segments - 2016-s8e42: 2,205 segments - 2017-s9e26: 2,366 segments - 2018-s10e17: 4,683 segments - 2018-s10e21: 2,493 segments All 9 episodes now transcribed (8 on Mac + 1 from Linux). Ready for Stages 3-6 on Linux PC. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
172 lines
4.9 KiB
Python
172 lines
4.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Batch transcription script for Mac M4 using mlx-whisper.
|
|
Transcribes all pending episodes using Apple Silicon GPU acceleration.
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
from datetime import timedelta
|
|
|
|
import mlx_whisper
|
|
from pydub import AudioSegment
|
|
|
|
|
|
# Configuration
# Directory globbed for source episodes (*.mp3); relative to the working directory.
EPISODES_DIR = Path("training-data/episodes")
# Root under which each episode gets its own output folder, named after the file stem.
TRANSCRIPTS_DIR = Path("training-data/transcripts")
# Model identifier handed to mlx_whisper.transcribe as path_or_hf_repo.
MODEL = "mlx-community/whisper-large-v3-mlx"

# Episodes to transcribe (skip already completed ones)
# Entries are file stems (name without ".mp3"); matching episodes are left out of the batch.
COMPLETED = {"2010-10-02-hr1"}  # Already transcribed on Linux
|
|
|
|
|
|
def format_timestamp(seconds: float) -> str:
    """Format an offset in seconds as an SRT timestamp (HH:MM:SS,mmm).

    Args:
        seconds: Offset from the start of the audio, in seconds. Negative
            values are clamped to zero, since SRT has no negative timestamps.

    Returns:
        The timestamp string, e.g. ``"01:01:01,500"``. Milliseconds are
        truncated, not rounded. The hour field grows past 23 instead of
        wrapping back to zero.
    """
    td = timedelta(seconds=max(seconds, 0))
    # td.seconds only holds the sub-day remainder; fold whole days back into
    # the hour count so offsets of 24h or more do not wrap around.
    hours, remainder = divmod(td.seconds, 3600)
    hours += td.days * 24
    # Use distinct local names so the ``seconds`` argument is not shadowed.
    minutes, secs = divmod(remainder, 60)
    millis = td.microseconds // 1000
    return f"{hours:02d}:{minutes:02d}:{secs:02d},{millis:03d}"
|
|
|
|
|
|
def transcribe_episode(episode_path: Path) -> tuple[dict, float]:
    """Transcribe a single episode with mlx-whisper.

    Args:
        episode_path: Path to the episode's MP3 file.

    Returns:
        A ``(result, duration_seconds)`` tuple: the dict returned by
        ``mlx_whisper.transcribe`` and the audio length in seconds as
        measured with pydub.
    """
    print(f"[INFO] Transcribing {episode_path.name}...")
    # perf_counter is monotonic, so the elapsed time cannot be skewed by
    # system clock adjustments during a long transcription.
    start_time = time.perf_counter()

    # Decode the file only to measure its length (len() is divided by 1000
    # to get seconds), then drop the decoded audio so it is not kept in
    # memory for the whole transcription.
    audio = AudioSegment.from_mp3(str(episode_path))
    duration_seconds = len(audio) / 1000.0
    del audio

    # Transcribe with word timestamps
    result = mlx_whisper.transcribe(
        str(episode_path),
        path_or_hf_repo=MODEL,
        language="en",
        word_timestamps=True,
    )

    elapsed = time.perf_counter() - start_time
    # Guard the division so a zero elapsed reading cannot raise.
    speed = duration_seconds / elapsed if elapsed > 0 else float("inf")

    print(f"[OK] Done in {elapsed:.1f}s ({speed:.1f}x realtime)")
    print(f"[INFO] Segments: {len(result.get('segments', []))}")

    return result, duration_seconds
|
|
|
|
|
|
def save_transcript(result: dict, duration: float, output_dir: Path) -> None:
    """Save a transcript in JSON, SRT, and TXT formats.

    Writes ``transcript.json``, ``transcript.srt`` and ``transcript.txt``
    into ``output_dir``, creating the directory (and parents) if needed.

    Args:
        result: Transcription result; its ``"segments"`` list and
            ``"language"`` entry are read, with missing keys defaulted.
        duration: Audio duration in seconds, stored in the JSON output.
        output_dir: Directory that receives the three transcript files.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    segments = result.get("segments", [])

    # Build output structure matching existing format
    output = {
        "language": result.get("language", "en"),
        # The result is not consulted for a language probability; the field
        # is filled with a fixed 1.0.
        "language_probability": 1.0,
        "duration": duration,
        "segments": [
            {
                "id": index,
                "text": seg.get("text", "").strip(),
                "start": seg.get("start", 0),
                "end": seg.get("end", 0),
                # Word-level data is included only when the segment has it.
                "words": [
                    {
                        "word": word_info.get("word", ""),
                        "start": word_info.get("start", 0),
                        "end": word_info.get("end", 0),
                        "probability": word_info.get("probability", 0),
                    }
                    for word_info in seg.get("words", [])
                ],
            }
            for index, seg in enumerate(segments)
        ],
    }

    # All files are written as explicit UTF-8 so non-ASCII transcript text
    # does not depend on (or fail under) the platform's locale encoding.

    # Save JSON
    json_path = output_dir / "transcript.json"
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(output, f, indent=2)
    print(f"[OK] Saved {json_path}")

    # Save SRT (cue numbers start at 1)
    srt_path = output_dir / "transcript.srt"
    with open(srt_path, "w", encoding="utf-8") as f:
        for cue_number, seg in enumerate(segments, 1):
            start_ts = format_timestamp(seg.get("start", 0))
            end_ts = format_timestamp(seg.get("end", 0))
            text = seg.get("text", "").strip()
            f.write(f"{cue_number}\n{start_ts} --> {end_ts}\n{text}\n\n")
    print(f"[OK] Saved {srt_path}")

    # Save TXT (one segment per line)
    txt_path = output_dir / "transcript.txt"
    with open(txt_path, "w", encoding="utf-8") as f:
        f.writelines(seg.get("text", "").strip() + "\n" for seg in segments)
    print(f"[OK] Saved {txt_path}")
|
|
|
|
|
|
def main():
    """Transcribe every pending episode and print a summary.

    Globs ``EPISODES_DIR`` for ``*.mp3`` files, leaves out those whose stem
    is in ``COMPLETED``, and for each remaining episode runs
    ``transcribe_episode`` and writes the output via ``save_transcript`` to
    ``TRANSCRIPTS_DIR/<stem>``. A failure on one episode is reported and
    recorded, and the batch continues with the next one.
    """
    print("=" * 60)
    print("Radio Show Batch Transcription - Mac M4 + mlx-whisper")
    print("=" * 60)
    print()

    # Find episodes to process
    episodes = sorted(EPISODES_DIR.glob("*.mp3"))
    pending = [ep for ep in episodes if ep.stem not in COMPLETED]

    print(f"[INFO] Found {len(episodes)} episodes, {len(pending)} pending")
    print(f"[INFO] Model: {MODEL}")
    print()

    if not pending:
        print("[OK] All episodes already transcribed!")
        return

    total_start = time.time()
    completed = 0
    failed = []

    for i, episode in enumerate(pending, 1):
        print(f"\n[{i}/{len(pending)}] {episode.name}")
        print("-" * 40)

        # Deliberately broad: this is the batch's top-level boundary, and one
        # bad episode must not abort the remaining ones.
        try:
            result, duration = transcribe_episode(episode)
            output_dir = TRANSCRIPTS_DIR / episode.stem
            save_transcript(result, duration, output_dir)
            completed += 1
        except Exception as e:
            print(f"[ERROR] Failed: {e}")
            failed.append(episode.name)

    # Summary
    total_elapsed = time.time() - total_start
    print()
    print("=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"[OK] Completed: {completed}/{len(pending)}")
    print(f"[INFO] Total time: {total_elapsed/60:.1f} minutes")

    if failed:
        print(f"[WARNING] Failed: {', '.join(failed)}")

    print()
    # Only claim success when every pending episode was transcribed.
    if failed:
        print(
            f"[WARNING] Batch transcription finished with "
            f"{len(failed)} failed episode(s)"
        )
    else:
        print("[SUCCESS] Batch transcription complete!")
|
|
|
|
|
# Run the batch only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|