Add radio show audio processor and post-show workflow

- Audio processor CLI tool with 6-stage pipeline: transcribe (faster-whisper GPU),
  diarize (pyannote), detect segments (multi-signal classifier), remove commercials,
  split segments, analyze content (Ollama)
- Post-show workflow doc for episode posts, forum threads, deep-dive blog posts
- Training plan for using 579-episode archive for voice profiles and commercial detection
- Successful test: 45min episode transcribed in 2:37 on RTX 5070 Ti
- Sample transcript output from S7E30 (March 2015)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-03-21 11:51:59 -07:00
parent a8c8c6b7b6
commit a1e0442d8b
17 changed files with 58344 additions and 0 deletions

View File

@@ -0,0 +1,187 @@
"""Stage 6: Content analysis using Ollama for summary, topics, and post-show debrief."""
import json
from dataclasses import dataclass
from pathlib import Path
from rich.console import Console
console = Console()
@dataclass
class EpisodeAnalysis:
    """Result of the LLM content analysis for one episode.

    ``to_dict`` (and therefore ``analysis.json``) carries only the structured
    fields; ``debrief_draft`` is written by ``save`` to its own Markdown file.
    """

    summary: str
    segment_summaries: list[dict]  # [{title, summary, key_points}]
    key_quotes: list[dict]  # [{quote, speaker, context}] as requested in the prompt
    topics: list[str]
    tags: list[str]
    blog_post_candidates: list[dict]  # [{title, angle, why}]
    debrief_draft: str  # Markdown debrief template

    def to_dict(self) -> dict:
        """Return the JSON-serialisable fields (everything except the debrief)."""
        return {
            "summary": self.summary,
            "segment_summaries": self.segment_summaries,
            "key_quotes": self.key_quotes,
            "topics": self.topics,
            "tags": self.tags,
            "blog_post_candidates": self.blog_post_candidates,
        }

    def save(self, output_dir: Path):
        """Write ``analysis.json`` and ``post-show-debrief.md`` into *output_dir*.

        The directory is created if needed. Files are written as UTF-8
        explicitly: the debrief is free-form model output and regularly
        contains characters the platform default encoding cannot represent.
        """
        output_dir.mkdir(parents=True, exist_ok=True)
        with open(output_dir / "analysis.json", "w", encoding="utf-8") as f:
            json.dump(self.to_dict(), f, indent=2)
        with open(output_dir / "post-show-debrief.md", "w", encoding="utf-8") as f:
            f.write(self.debrief_draft)
        console.print(f"[green]Analysis saved to {output_dir}[/green]")
def _parse_analysis_response(response_text: str) -> dict:
    """Extract the analysis dict from the model reply, falling back to raw text."""
    # Strip markdown code fences if present
    if "```json" in response_text:
        response_text = response_text.split("```json", 1)[1]
        response_text = response_text.split("```", 1)[0]
    elif "```" in response_text:
        response_text = response_text.split("```", 1)[1]
        response_text = response_text.split("```", 1)[0]
    try:
        parsed = json.loads(response_text.strip())
    except json.JSONDecodeError:
        console.print("[yellow]LLM response was not valid JSON, using raw text[/yellow]")
        parsed = None
    else:
        if not isinstance(parsed, dict):
            # Valid JSON but a list/string/number: .get() below would crash on it.
            console.print("[yellow]LLM response was not a JSON object, using raw text[/yellow]")
            parsed = None
    if parsed is not None:
        return parsed
    return {
        "summary": response_text,
        "segment_summaries": [],
        "topics": [],
        "tags": [],
        "key_quotes": [],
        "blog_post_candidates": [],
    }


def analyze_episode(transcript_text: str, diarization_data: dict | None = None,
                    show_prep: str | None = None, segments: list | None = None,
                    model: str = "qwen3:14b",
                    ollama_host: str = "http://localhost:11434") -> EpisodeAnalysis:
    """Analyze a transcribed episode using a local LLM.

    Two chat requests are sent to the Ollama server: one asking for a JSON
    analysis (summary, segment summaries, topics, tags, quotes, blog post
    candidates) and one asking for a Markdown post-show debrief.

    Args:
        transcript_text: Full transcript; only the first 12000 characters are sent.
        diarization_data: Optional dict; the values of its ``"speaker_map"``
            entry are listed to the model as identified speakers.
        show_prep: Optional show prep text; only the first 3000 characters are sent.
        segments: Accepted but currently not used by this function.
        model: Ollama model name.
        ollama_host: Base URL of the Ollama server.

    Returns:
        An ``EpisodeAnalysis``. If the first reply is not a JSON object, the
        raw reply becomes the summary and the list fields are empty.
    """
    import ollama as ollama_client

    console.print(f"[bold]Analyzing episode with {model}[/bold]")
    client = ollama_client.Client(host=ollama_host)

    # Build context for the LLM
    context_parts = []
    if show_prep:
        context_parts.append(f"## Show Prep (planned topics)\n\n{show_prep[:3000]}")
    context_parts.append(f"## Transcript\n\n{transcript_text[:12000]}")
    if diarization_data:
        speakers = diarization_data.get("speaker_map", {})
        if speakers:
            speaker_info = "\n".join(f"- {v}" for v in speakers.values())
            context_parts.append(f"## Speakers Identified\n\n{speaker_info}")
    context = "\n\n---\n\n".join(context_parts)

    # Query 1: Episode summary and segment summaries
    summary_prompt = f"""You are analyzing a radio show episode transcript.
Provide a JSON response with:
1. "summary": A 2-3 paragraph episode summary suitable for a podcast episode page.
Write in third person. Be specific about topics discussed.
2. "segment_summaries": An array of objects, each with:
- "title": A compelling segment title
- "summary": 3-5 sentence summary
- "key_points": Array of key takeaway bullet points
3. "topics": Array of main topics discussed (short phrases)
4. "tags": Array of SEO-friendly tags (lowercase, hyphenated)
5. "key_quotes": Array of notable quotes, each with:
- "quote": The quote text
- "speaker": Who said it (if identifiable)
- "context": Brief context
6. "blog_post_candidates": Array of topics worth expanding into blog posts, each with:
- "title": Proposed blog post title
- "angle": The specific angle or thesis
- "why": Why this topic deserves expansion
Respond ONLY with valid JSON, no markdown fencing.
{context}"""
    console.print("[dim]Generating episode analysis...[/dim]")
    response = client.chat(
        model=model,
        messages=[{"role": "user", "content": summary_prompt}],
        options={"temperature": 0.3, "num_ctx": 16384},
    )
    analysis_data = _parse_analysis_response(response["message"]["content"])

    # Query 2: Generate debrief draft
    debrief_prompt = f"""Based on this radio show transcript, generate a post-show debrief
in markdown format. Compare what was discussed against the show prep (planned topics)
to identify what made it in, what was cut, and what was added.
Format:
# Post-Show Debrief
## Episode: [derive title from content]
## Air Date: [today's date if not clear]
### What Made It In
[For each planned segment, note: Used / Modified / Cut]
### What Changed Live
[Topics expanded, cut short, or reordered vs. prep]
### Caller/Audience Interaction
[Any caller topics or audience engagement noted in transcript]
### Unplanned Additions
[Topics not in prep that came up]
### Best Moments
[Most compelling segments or quotes]
### Topics That Deserve More
[Topics that were rushed or generated high interest]
### Suggested Blog Posts
[2-3 specific blog post ideas with proposed titles and angles]
{context}"""
    console.print("[dim]Generating debrief draft...[/dim]")
    debrief_response = client.chat(
        model=model,
        messages=[{"role": "user", "content": debrief_prompt}],
        options={"temperature": 0.4, "num_ctx": 16384},
    )
    debrief_text = debrief_response["message"]["content"]
    console.print("[green]Analysis complete[/green]")

    # `or` also covers keys the model emitted with an explicit null value.
    return EpisodeAnalysis(
        summary=analysis_data.get("summary") or "",
        segment_summaries=analysis_data.get("segment_summaries") or [],
        key_quotes=analysis_data.get("key_quotes") or [],
        topics=analysis_data.get("topics") or [],
        tags=analysis_data.get("tags") or [],
        blog_post_candidates=analysis_data.get("blog_post_candidates") or [],
        debrief_draft=debrief_text,
    )

View File

@@ -0,0 +1,199 @@
"""Stage 4 & 5: Commercial removal and segment splitting using ffmpeg."""
import subprocess
import json
from dataclasses import dataclass
from pathlib import Path
from rich.console import Console
from rich.progress import Progress
from .segment_detector import SegmentType, DetectedSegment
console = Console()
@dataclass
class Chapter:
    """A titled span of the episode, as written to the chapter-marker JSON."""

    title: str
    start: float  # offset where the chapter begins
    end: float  # offset where the chapter ends
def remove_commercials(audio_path: Path, segments: list[DetectedSegment],
                       output_path: Path, crossfade_ms: int = 500,
                       bitrate: str = "192k", normalize: bool = True):
    """Stitch show segments together, removing commercials.

    Every segment typed SHOW_CONTENT or SHOW_ELEMENT is extracted to a
    temporary file next to *output_path*, the pieces are joined with ffmpeg's
    concat demuxer, and the temporary directory is removed afterwards (also on
    failure).

    Args:
        audio_path: Source recording.
        segments: Detected segments; other segment types are dropped.
        output_path: Destination file; its parent directory is created.
        crossfade_ms: Accepted for interface compatibility. The segments are
            currently joined with hard cuts; no crossfade is applied.
        bitrate: Audio bitrate passed to ffmpeg (``-b:a``).
        normalize: When true, apply the ``loudnorm`` filter to the result.

    Returns:
        None. Prints an error and returns early if there are no show segments.

    Raises:
        subprocess.CalledProcessError: If an ffmpeg invocation fails.
        subprocess.TimeoutExpired: If an ffmpeg invocation exceeds its timeout.
    """
    import shutil

    show_segments = [s for s in segments
                     if s.segment_type in (SegmentType.SHOW_CONTENT,
                                           SegmentType.SHOW_ELEMENT)]
    if not show_segments:
        console.print("[red]No show segments found![/red]")
        return
    console.print(f"[bold]Removing commercials:[/bold] {len(segments)} segments "
                  f"-> {len(show_segments)} show segments")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    temp_dir = output_path.parent / ".temp_segments"
    temp_dir.mkdir(exist_ok=True)
    try:
        # Extract each show segment
        segment_files = []
        with Progress(console=console) as progress:
            task = progress.add_task("Extracting segments...",
                                     total=len(show_segments))
            for i, seg in enumerate(show_segments):
                temp_file = temp_dir / f"seg_{i:04d}.mp3"
                _extract_segment(audio_path, seg.start, seg.end,
                                 temp_file, bitrate)
                segment_files.append(temp_file)
                progress.update(task, advance=1)

        # Create concat file for ffmpeg. The concat demuxer resolves relative
        # entries against the directory of the list file, not the working
        # directory, so list bare file names: the list lives in temp_dir
        # alongside the segments.
        concat_file = temp_dir / "concat.txt"
        with open(concat_file, "w", encoding="utf-8") as f:
            for sf in segment_files:
                f.write(f"file '{sf.name}'\n")

        # Concatenate the pieces (re-encoding at the requested bitrate)
        cmd = [
            "ffmpeg", "-y", "-f", "concat", "-safe", "0",
            "-i", str(concat_file),
            "-b:a", bitrate,
        ]
        if normalize:
            # EBU R128 loudness normalization
            cmd.extend([
                "-af", "loudnorm=I=-16:TP=-1.5:LRA=11",
            ])
        cmd.append(str(output_path))
        subprocess.run(cmd, capture_output=True, check=True, timeout=600)

        # Get output duration
        duration = _get_duration(output_path)
        console.print(f"[green]Clean episode saved: {output_path.name} "
                      f"({duration / 60:.1f} min)[/green]")
    finally:
        # Cleanup temp files
        shutil.rmtree(temp_dir, ignore_errors=True)
def split_segments(audio_path: Path, segments: list[DetectedSegment],
                   output_dir: Path, bitrate: str = "192k"):
    """Export individual show segments as separate MP3 files.

    Segments typed SHOW_CONTENT or SHOW_ELEMENT are written to *output_dir* as
    ``NN-<slug>.mp3`` with a 200 ms fade-in and a 500 ms fade-out, and a
    ``segments.json`` manifest is written next to them.

    Args:
        audio_path: Source recording.
        segments: Detected segments; other segment types are skipped.
        output_dir: Destination directory (created if missing).
        bitrate: Audio bitrate passed to ffmpeg.

    Returns:
        The manifest: a list of dicts with ``file``, ``label``, ``start``,
        ``end`` and ``duration`` for every exported segment.
    """
    show_segments = [s for s in segments
                     if s.segment_type in (SegmentType.SHOW_CONTENT,
                                           SegmentType.SHOW_ELEMENT)]
    output_dir.mkdir(parents=True, exist_ok=True)
    console.print(f"[bold]Splitting into {len(show_segments)} segments[/bold]")
    exported = []
    for i, seg in enumerate(show_segments):
        # A label made only of punctuation slugifies to "", which would give
        # a file called "NN-.mp3"; fall back to the generic name in that case.
        slug = _slugify(seg.label) if seg.label else ""
        if not slug:
            slug = f"segment-{i:02d}"
        filename = f"{i:02d}-{slug}.mp3"
        output_file = output_dir / filename
        _extract_segment(audio_path, seg.start, seg.end, output_file, bitrate,
                         fade_in_ms=200, fade_out_ms=500)
        duration = seg.duration
        console.print(f"  [green]{filename}[/green] ({duration:.0f}s)")
        exported.append({
            "file": filename,
            "label": seg.label,
            "start": seg.start,
            "end": seg.end,
            "duration": duration,
        })
    # Save manifest
    with open(output_dir / "segments.json", "w", encoding="utf-8") as f:
        json.dump(exported, f, indent=2)
    return exported
def generate_chapters(segments: list[DetectedSegment],
                      output_path: Path) -> list[Chapter]:
    """Build chapter markers for the show segments and write them as JSON.

    Only SHOW_CONTENT and SHOW_ELEMENT segments produce chapters. Chapter
    times are running totals of the kept segments' durations, so they start
    at 0.0 and are contiguous. Unlabelled segments are titled "Segment".

    Returns:
        The list of chapters, in the order the segments were given.
    """
    kept_types = (SegmentType.SHOW_CONTENT, SegmentType.SHOW_ELEMENT)
    chapters = []
    cursor = 0.0
    for seg in segments:
        if seg.segment_type not in kept_types:
            continue
        chapter_end = cursor + seg.duration
        chapters.append(Chapter(title=seg.label or "Segment",
                                start=cursor, end=chapter_end))
        cursor = chapter_end

    output_path.parent.mkdir(parents=True, exist_ok=True)
    payload = []
    for chapter in chapters:
        payload.append({"title": chapter.title,
                        "start": chapter.start,
                        "end": chapter.end})
    with open(output_path, "w") as f:
        json.dump(payload, f, indent=2)
    console.print(f"[green]Chapter markers saved: {len(chapters)} chapters[/green]")
    return chapters
def _extract_segment(audio_path: Path, start: float, end: float,
                     output_path: Path, bitrate: str = "192k",
                     fade_in_ms: int = 0, fade_out_ms: int = 0):
    """Extract a segment from an audio file using ffmpeg.

    Args:
        audio_path: Source recording.
        start: Segment start, passed to ffmpeg ``-ss``.
        end: Segment end; ``end - start`` is passed to ffmpeg ``-t``.
        output_path: File to write (overwritten if present).
        bitrate: Audio bitrate (``-b:a``).
        fade_in_ms: Fade-in length in milliseconds; 0 disables it.
        fade_out_ms: Fade-out length in milliseconds; 0 disables it.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with an error.
        subprocess.TimeoutExpired: If ffmpeg runs longer than 120 seconds.
    """
    duration = end - start
    cmd = [
        "ffmpeg", "-y",
        "-ss", str(start),
        "-t", str(duration),
        "-i", str(audio_path),
        "-b:a", bitrate,
    ]
    # Fades cannot be longer than the clip itself; without the clamp a short
    # segment would get a negative fade-out start time.
    longest_fade = max(duration, 0.0)
    filters = []
    if fade_in_ms > 0:
        fade_in_s = min(fade_in_ms / 1000, longest_fade)
        filters.append(f"afade=t=in:d={fade_in_s}")
    if fade_out_ms > 0:
        fade_out_s = min(fade_out_ms / 1000, longest_fade)
        filters.append(f"afade=t=out:st={duration - fade_out_s}:d={fade_out_s}")
    if filters:
        cmd.extend(["-af", ",".join(filters)])
    cmd.append(str(output_path))
    subprocess.run(cmd, capture_output=True, check=True, timeout=120)
def _get_duration(audio_path: Path) -> float:
    """Get audio file duration in seconds.

    Runs ``ffprobe`` and parses the ``format=duration`` value it prints.

    Raises:
        ValueError: If ffprobe printed nothing parseable (for example because
            the file is missing or unreadable). The message names the file
            and ffprobe's exit code instead of the bare float() error.
    """
    result = subprocess.run(
        ["ffprobe", "-v", "quiet", "-show_entries", "format=duration",
         "-of", "csv=p=0", str(audio_path)],
        capture_output=True, text=True,
    )
    raw = result.stdout.strip()
    try:
        return float(raw)
    except ValueError as err:
        raise ValueError(
            f"ffprobe could not read a duration from {audio_path} "
            f"(exit code {result.returncode}, output {raw!r})"
        ) from err
def _slugify(text: str) -> str:
"""Convert text to a filename-safe slug."""
import re
text = text.lower().strip()
text = re.sub(r'[^\w\s-]', '', text)
text = re.sub(r'[\s_]+', '-', text)
text = re.sub(r'-+', '-', text)
return text[:50].strip('-')

View File

@@ -0,0 +1,356 @@
"""CLI entry point for the radio show audio processor."""
import argparse
import sys
from pathlib import Path
from rich.console import Console
from rich.panel import Panel
from .config import load_config
console = Console()
def main():
    """Parse the command line, load the configuration and run the sub-command.

    Sub-commands: process, transcribe, diarize, detect, split,
    bootstrap-voice, review-elements and review-speakers. A sub-command is
    required; each one is handled by the matching ``_cmd_*`` function, which
    receives the parsed arguments and the loaded config.
    """
    parser = argparse.ArgumentParser(
        description="Radio Show Audio Processor — The Computer Guru Show",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s process episode.mp3
%(prog)s process episode.mp3 --show-prep show-prep.md
%(prog)s process hr1.mp3 hr2.mp3 --archive-mode --date 2016-03-15
%(prog)s transcribe episode.mp3
%(prog)s bootstrap-voice archive/
%(prog)s review-elements
%(prog)s review-speakers
""",
    )
    parser.add_argument("--config", type=str, default=None,
                        help="Path to config.yaml")
    commands = parser.add_subparsers(dest="command", required=True)

    # --- process: the full pipeline ---
    process_cmd = commands.add_parser("process", help="Full pipeline")
    process_cmd.add_argument("audio", nargs="+", type=str,
                             help="Audio file(s) to process")
    process_cmd.add_argument("--show-prep", type=str, default=None,
                             help="Path to show prep markdown file")
    process_cmd.add_argument("--output", type=str, default=None,
                             help="Output directory")
    process_cmd.add_argument("--archive-mode", action="store_true",
                             help="Archive mode: learn elements and voices")
    process_cmd.add_argument("--date", type=str, default=None,
                             help="Episode date (for archive mode)")
    process_cmd.add_argument("--skip-transcribe", action="store_true",
                             help="Skip transcription (use existing transcript)")
    process_cmd.add_argument("--skip-diarize", action="store_true",
                             help="Skip diarization")
    process_cmd.add_argument("--skip-analysis", action="store_true",
                             help="Skip LLM analysis")

    # --- transcribe ---
    transcribe_cmd = commands.add_parser("transcribe", help="Transcribe only")
    transcribe_cmd.add_argument("audio", type=str, help="Audio file")
    transcribe_cmd.add_argument("--output", type=str, default=None)
    transcribe_cmd.add_argument("--model", type=str, default=None,
                                help="Whisper model size")

    # --- diarize ---
    diarize_cmd = commands.add_parser("diarize", help="Diarize only")
    diarize_cmd.add_argument("audio", type=str, help="Audio file")
    diarize_cmd.add_argument("--output", type=str, default=None)

    # --- detect ---
    detect_cmd = commands.add_parser("detect", help="Detect segments only")
    detect_cmd.add_argument("audio", type=str, help="Audio file")
    detect_cmd.add_argument("--output", type=str, default=None)
    detect_cmd.add_argument("--show-prep", type=str, default=None)

    # --- split ---
    split_cmd = commands.add_parser("split", help="Split into segments")
    split_cmd.add_argument("audio", type=str, help="Audio file")
    split_cmd.add_argument("--detection-report", type=str, required=True,
                           help="Path to detection-report.json")
    split_cmd.add_argument("--output", type=str, default=None)

    # --- bootstrap-voice ---
    voice_cmd = commands.add_parser(
        "bootstrap-voice",
        help="Bootstrap host voice profile from archive",
    )
    voice_cmd.add_argument("archive_dir", type=str,
                           help="Directory containing archive MP3s")
    voice_cmd.add_argument("--speaker-name", type=str, default="Mike Swanson")
    voice_cmd.add_argument("--sample-count", type=int, default=10,
                           help="Number of episodes to sample")

    # --- review commands take no arguments of their own ---
    commands.add_parser("review-elements",
                        help="Review discovered audio elements")
    commands.add_parser("review-speakers",
                        help="Review unknown speaker clusters")

    args = parser.parse_args()
    config = load_config(args.config)

    banner = ("[bold]Radio Show Audio Processor[/bold]\n"
              "[dim]The Computer Guru Show[/dim]")
    console.print(Panel.fit(banner, border_style="blue"))

    # Map each sub-command name to its handler.
    handlers = {
        "process": _cmd_process,
        "transcribe": _cmd_transcribe,
        "diarize": _cmd_diarize,
        "detect": _cmd_detect,
        "split": _cmd_split,
        "bootstrap-voice": _cmd_bootstrap_voice,
        "review-elements": _cmd_review_elements,
        "review-speakers": _cmd_review_speakers,
    }
    handler = handlers.get(args.command)
    if handler is not None:
        handler(args, config)
def _cmd_process(args, config):
    """Full processing pipeline.

    Stages, in order: transcribe, diarize, detect segments, remove
    commercials, split segments (plus chapter markers), analyze. Transcribe,
    diarize and analyze can be skipped with the ``--skip-*`` flags; with
    ``--skip-transcribe`` an existing ``transcript.json`` in the output
    directory is loaded if there is one.

    Args:
        args: Parsed arguments of the ``process`` sub-command.
        config: Loaded configuration.
    """
    from .transcriber import transcribe
    from .diarizer import diarize, VoiceProfileStore
    from .segment_detector import SegmentDetector
    from .audio_editor import remove_commercials, split_segments, generate_chapters
    from .analyzer import analyze_episode

    audio_files = [Path(f) for f in args.audio]
    audio_path = audio_files[0]  # Primary file
    # If multiple files (HR1 + HR2), concatenate first
    if len(audio_files) > 1:
        audio_path = _concatenate_audio(audio_files, config)
    output_dir = Path(args.output) if args.output else audio_path.parent / "processed"
    output_dir.mkdir(parents=True, exist_ok=True)

    # Load show prep if provided. Decode explicitly as UTF-8 rather than the
    # locale default; undecodable bytes are replaced instead of aborting.
    show_prep = None
    if args.show_prep:
        show_prep = Path(args.show_prep).read_text(encoding="utf-8", errors="replace")

    # Stage 1: Transcribe
    transcript = None
    if not args.skip_transcribe:
        transcript = transcribe(
            audio_path,
            model_size=config.audio.whisper_model,
            language=config.audio.whisper_language,
        )
        transcript.save(output_dir)
    else:
        console.print("[dim]Skipping transcription[/dim]")
        # Try to load existing transcript
        transcript_file = output_dir / "transcript.json"
        if transcript_file.exists():
            from .transcriber import Transcript, TranscriptSegment, TranscriptWord
            import json
            with open(transcript_file) as f:
                data = json.load(f)
            transcript = Transcript(
                segments=[
                    TranscriptSegment(
                        id=s["id"], text=s["text"],
                        start=s["start"], end=s["end"],
                        words=[TranscriptWord(**w) for w in s.get("words", [])],
                    )
                    for s in data["segments"]
                ],
                language=data["language"],
                language_probability=data["language_probability"],
                duration=data["duration"],
            )
        else:
            # Make the degraded run visible: later stages get no transcript.
            console.print(f"[yellow]No existing transcript at {transcript_file}; "
                          "continuing without one[/yellow]")

    # Stage 2: Diarize
    diarization = None
    if not args.skip_diarize:
        voice_profiles = VoiceProfileStore(
            config.resolve_path(config.diarization.voice_profiles_dir)
        )
        diarization = diarize(
            audio_path,
            voice_profiles=voice_profiles,
            min_speakers=config.diarization.min_speakers,
            max_speakers=config.diarization.max_speakers,
            # Honour the configured threshold instead of diarize()'s default.
            host_match_threshold=config.diarization.host_match_threshold,
        )
        diarization.save(output_dir)
    else:
        console.print("[dim]Skipping diarization[/dim]")

    # Stage 3: Detect segments
    detector = SegmentDetector(config)
    detection = detector.detect(
        audio_path,
        transcript=transcript,
        diarization=diarization,
        show_prep=show_prep,
    )
    detection.save(output_dir)

    # Stage 4: Remove commercials
    clean_path = output_dir / f"podcast-episode.{config.audio.output_format}"
    remove_commercials(
        audio_path, detection.segments, clean_path,
        crossfade_ms=config.audio.crossfade_ms,
        bitrate=config.audio.output_bitrate,
        normalize=config.audio.normalize,
    )

    # Stage 5: Split segments
    segments_dir = output_dir / "segments"
    split_segments(
        audio_path, detection.segments, segments_dir,
        bitrate=config.audio.output_bitrate,
    )

    # Generate chapters
    generate_chapters(detection.segments, output_dir / "chapters.json")

    # Stage 6: Analyze
    if args.skip_analysis:
        console.print("[dim]Skipping analysis[/dim]")
    elif not transcript:
        console.print("[yellow]Skipping analysis: no transcript available[/yellow]")
    else:
        analysis = analyze_episode(
            transcript_text=transcript.full_text,
            diarization_data=diarization.to_dict() if diarization else None,
            show_prep=show_prep,
            segments=detection.segments,
            model=config.llm.model,
            ollama_host=config.llm.ollama_host,
        )
        # Analysis output goes to a "generated" directory beside output_dir.
        generated_dir = output_dir.parent / "generated"
        analysis.save(generated_dir)

    console.print("\n[bold green]Processing complete![/bold green]")
    console.print(f"Output: {output_dir}")
def _cmd_transcribe(args, config):
    """Transcribe only.

    Transcribes ``args.audio`` and saves the transcript to ``--output`` (or a
    ``processed`` directory next to the audio file). ``--model`` overrides the
    configured Whisper model; the configured language is used, matching what
    the ``process`` command passes.
    """
    from .transcriber import transcribe

    audio_path = Path(args.audio)
    output_dir = Path(args.output) if args.output else audio_path.parent / "processed"
    model = args.model or config.audio.whisper_model
    transcript = transcribe(
        audio_path,
        model_size=model,
        language=config.audio.whisper_language,
    )
    transcript.save(output_dir)
def _cmd_diarize(args, config):
    """Diarize only.

    Runs speaker diarization on ``args.audio`` with the voice profiles,
    speaker limits and host-match threshold from the configuration, and saves
    the result to ``--output`` (or a ``processed`` directory next to the
    audio file).
    """
    from .diarizer import diarize, VoiceProfileStore

    audio_path = Path(args.audio)
    output_dir = Path(args.output) if args.output else audio_path.parent / "processed"
    voice_profiles = VoiceProfileStore(
        config.resolve_path(config.diarization.voice_profiles_dir)
    )
    # Pass the configured limits so the stand-alone command behaves like the
    # diarization stage of `process` instead of using diarize()'s defaults.
    result = diarize(
        audio_path,
        voice_profiles=voice_profiles,
        min_speakers=config.diarization.min_speakers,
        max_speakers=config.diarization.max_speakers,
        host_match_threshold=config.diarization.host_match_threshold,
    )
    result.save(output_dir)
def _cmd_detect(args, config):
    """Segment detection only.

    Runs the segment detector on ``args.audio`` (optionally with show prep
    text; no transcript or diarization is supplied) and saves the result to
    ``--output`` or a ``processed`` directory next to the audio file.
    """
    from .segment_detector import SegmentDetector

    audio_path = Path(args.audio)
    output_dir = Path(args.output) if args.output else audio_path.parent / "processed"
    show_prep = None
    if args.show_prep:
        # Decode explicitly as UTF-8 rather than the locale default;
        # undecodable bytes are replaced instead of aborting the run.
        show_prep = Path(args.show_prep).read_text(encoding="utf-8", errors="replace")
    detector = SegmentDetector(config)
    result = detector.detect(audio_path, show_prep=show_prep)
    result.save(output_dir)
def _cmd_split(args, config):
    """Split the audio using a previously saved detection report.

    The report's ``segments`` entries are turned back into DetectedSegment
    objects, the show segments are exported to ``--output`` (default: a
    ``segments`` directory next to the audio file) and ``chapters.json`` is
    written to that directory's parent.
    """
    from .audio_editor import split_segments, generate_chapters
    from .segment_detector import DetectedSegment, SegmentType
    import json

    audio_path = Path(args.audio)
    if args.output:
        output_dir = Path(args.output)
    else:
        output_dir = audio_path.parent / "segments"

    with open(args.detection_report) as report_file:
        report = json.load(report_file)

    segments = []
    for entry in report["segments"]:
        segments.append(DetectedSegment(
            start=entry["start"],
            end=entry["end"],
            segment_type=SegmentType(entry["type"]),
            confidence=entry["confidence"],
            label=entry.get("label", ""),
        ))

    split_segments(audio_path, segments, output_dir, config.audio.output_bitrate)
    chapters_path = output_dir.parent / "chapters.json"
    generate_chapters(segments, chapters_path)
def _cmd_bootstrap_voice(args, config):
    """Placeholder for bootstrapping the host voice profile from the archive.

    Currently only echoes the requested archive directory, speaker name and
    sample count and reports that the feature is not implemented.
    """
    # TODO: Implement archive sampling + diarization + embedding extraction
    lines = (
        "[bold]Bootstrapping host voice profile[/bold]",
        f"Archive: {args.archive_dir}",
        f"Speaker: {args.speaker_name}",
        f"Sampling {args.sample_count} episodes",
        "[yellow]Not yet implemented — run individual diarizations first[/yellow]",
    )
    for line in lines:
        console.print(line)
def _cmd_review_elements(args, config):
    """Placeholder for the audio-element review command; prints a notice only."""
    # TODO: Implement element review UI
    for line in ("[bold]Reviewing discovered elements[/bold]",
                 "[yellow]Not yet implemented[/yellow]"):
        console.print(line)
def _cmd_review_speakers(args, config):
    """Placeholder for the unknown-speaker review command; prints a notice only."""
    # TODO: Implement speaker review UI
    for line in ("[bold]Reviewing unknown speakers[/bold]",
                 "[yellow]Not yet implemented[/yellow]"):
        console.print(line)
def _concatenate_audio(files: list[Path], config) -> Path:
    """Concatenate multiple audio files (e.g., HR1 + HR2).

    The files are joined with ffmpeg's concat demuxer using stream copy (no
    re-encoding) into ``combined_<first-file-stem>.mp3`` next to the first
    file.

    Args:
        files: Input files, in playback order.
        config: Loaded configuration (not used here).

    Returns:
        Path of the combined file.

    Raises:
        subprocess.CalledProcessError: If ffmpeg fails.
    """
    import subprocess

    output = files[0].parent / f"combined_{files[0].stem}.mp3"
    concat_file = files[0].parent / ".concat_list.txt"
    try:
        with open(concat_file, "w", encoding="utf-8") as f:
            for audio_file in files:
                # The concat demuxer resolves relative entries against the
                # list file's directory, not the working directory, so write
                # absolute paths. A single quote inside the quoted path must
                # be written as '\'' in ffmpeg's quoting syntax.
                entry = str(audio_file.resolve()).replace("'", "'\\''")
                f.write(f"file '{entry}'\n")
        subprocess.run(
            ["ffmpeg", "-y", "-f", "concat", "-safe", "0",
             "-i", str(concat_file), "-c", "copy", str(output)],
            capture_output=True, check=True,
        )
    finally:
        # Remove the list file even when ffmpeg fails.
        concat_file.unlink(missing_ok=True)
    console.print(f"[dim]Concatenated {len(files)} files -> {output.name}[/dim]")
    return output
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,126 @@
"""Configuration loader for the radio show audio processor."""
from pathlib import Path
from dataclasses import dataclass, field
import yaml
@dataclass
class ShowConfig:
    """Values of the ``show`` section of config.yaml, with their defaults."""

    name: str = "The Computer Guru Show"
    host: str = "Mike Swanson"
    typical_duration_minutes: int = 120
    segment_count: int = 6
    has_commercials: bool = True
@dataclass
class AudioConfig:
    """Values of the ``audio`` section of config.yaml, with their defaults."""

    whisper_model: str = "large-v3"  # model size handed to the transcriber
    whisper_language: str = "en"  # language handed to the transcriber
    output_format: str = "mp3"  # file extension of the cleaned episode
    output_bitrate: str = "192k"  # bitrate handed to the ffmpeg-based exports
    normalize: bool = True  # whether the cleaned episode is loudness-normalized
    crossfade_ms: int = 500  # forwarded to remove_commercials
@dataclass
class DetectionWeights:
    """Per-signal weights of the ``segment_detection.weights`` config section.

    The defaults add up to 1.0.
    """

    fingerprint_match: float = 0.30
    speaker_identity: float = 0.25
    audio_characteristics: float = 0.20
    break_pattern: float = 0.15
    structural_heuristic: float = 0.10
@dataclass
class SegmentDetectionConfig:
    """Values of the ``segment_detection`` section of config.yaml.

    ``weights`` holds the nested ``weights`` mapping as a DetectionWeights
    instance; every instance gets its own default copy.
    """

    # Fingerprinting of known audio elements
    fingerprint_db: str = "element-library/fingerprints.db"
    fingerprint_match_threshold: float = 0.85
    # Discovery of not-yet-known elements
    discover_unknown_elements: bool = True
    min_element_duration_s: float = 1.0
    max_element_duration_s: float = 30.0
    cluster_similarity_threshold: float = 0.90
    min_cluster_occurrences: int = 3
    # Break detection
    min_break_duration_s: int = 30
    max_break_duration_s: int = 300
    silence_threshold_db: int = -40
    # Classification
    confidence_threshold: float = 0.70
    weights: DetectionWeights = field(default_factory=DetectionWeights)
@dataclass
class DiarizationConfig:
    """Values of the ``diarization`` section of config.yaml, with defaults."""

    min_speakers: int = 1  # lower bound handed to the diarizer
    max_speakers: int = 6  # upper bound handed to the diarizer
    voice_profiles_dir: str = "voice-profiles/"  # resolved against Config.base_dir
    host_match_threshold: float = 0.75
@dataclass
class LLMConfig:
    """Values of the ``llm`` section of config.yaml, with their defaults."""

    model: str = "qwen3:14b"  # model name handed to the episode analyzer
    ollama_host: str = "http://localhost:11434"  # Ollama server URL
@dataclass
class PathsConfig:
    """Values of the ``paths`` section of config.yaml, with their defaults."""

    episodes_dir: str = "episodes/"
    voice_profiles: str = "voice-profiles/"
    element_library: str = "element-library/"
    output_dir: str = "processed/"
@dataclass
class ArchiveConfig:
    """Values of the ``archive`` section of config.yaml, with their defaults."""

    server: str = "172.16.3.10"
    path: str = "/home/gurushow/public_html/archive/"
    elements_path: str = "/home/gurushow/public_html/archive/Radio/Elements/"
@dataclass
class Config:
    """Top-level configuration: one attribute per config.yaml section.

    Every section defaults to a fresh instance of its dataclass. ``base_dir``
    defaults to the current working directory; ``load_config`` sets it to the
    directory of the config file.
    """

    show: ShowConfig = field(default_factory=ShowConfig)
    audio: AudioConfig = field(default_factory=AudioConfig)
    segment_detection: SegmentDetectionConfig = field(default_factory=SegmentDetectionConfig)
    diarization: DiarizationConfig = field(default_factory=DiarizationConfig)
    llm: LLMConfig = field(default_factory=LLMConfig)
    paths: PathsConfig = field(default_factory=PathsConfig)
    archive: ArchiveConfig = field(default_factory=ArchiveConfig)
    base_dir: Path = field(default_factory=Path.cwd)

    def resolve_path(self, relative: str) -> Path:
        """Return *relative* joined onto ``base_dir``."""
        return self.base_dir.joinpath(relative)
def load_config(config_path: str | Path | None = None) -> Config:
    """Load the configuration from a YAML file.

    Args:
        config_path: Path to the YAML file. Defaults to ``config.yaml`` in the
            parent of this module's directory.

    Returns:
        A Config whose ``base_dir`` is the directory of the config file.
        Sections missing from the file, or present but empty, keep their
        defaults. If the file does not exist, all defaults are returned.

    Raises:
        TypeError: If a section contains a key its dataclass does not define.
    """
    if config_path is None:
        config_path = Path(__file__).parent.parent / "config.yaml"
    config_path = Path(config_path)
    config = Config(base_dir=config_path.parent)
    if not config_path.exists():
        return config

    with open(config_path, encoding="utf-8") as f:
        raw = yaml.safe_load(f) or {}

    # Sections that map one-to-one onto a flat dataclass.
    flat_sections = {
        "show": ShowConfig,
        "audio": AudioConfig,
        "diarization": DiarizationConfig,
        "llm": LLMConfig,
        "paths": PathsConfig,
        "archive": ArchiveConfig,
    }
    for key, section_cls in flat_sections.items():
        # A bare "show:" line parses to None; treat it like an absent section
        # instead of crashing on `**None`.
        values = raw.get(key)
        if values:
            setattr(config, key, section_cls(**values))

    # segment_detection nests a `weights` mapping; work on a copy so the
    # parsed document is not mutated by pop().
    detection = dict(raw.get("segment_detection") or {})
    if detection:
        weights = DetectionWeights(**(detection.pop("weights", None) or {}))
        config.segment_detection = SegmentDetectionConfig(weights=weights, **detection)
    return config

View File

@@ -0,0 +1,274 @@
"""Stage 2: Speaker diarization using pyannote.audio with voice profile matching."""
import json
from dataclasses import dataclass
from pathlib import Path
import numpy as np
from rich.console import Console
console = Console()
@dataclass
class SpeakerTurn:
    """One uninterrupted stretch of speech attributed to a single speaker."""

    speaker: str  # "SPEAKER_00", "Host: Mike Swanson", "Caller 1", etc.
    start: float
    end: float
    confidence: float = 1.0

    @property
    def duration(self) -> float:
        """Length of the turn: ``end`` minus ``start``."""
        span = self.end - self.start
        return span
@dataclass
class DiarizationResult:
    """Speaker turns of one recording plus the label-to-name mapping."""

    turns: list[SpeakerTurn]
    num_speakers: int
    speaker_map: dict[str, str]  # raw label -> friendly name

    def speaker_at(self, time: float) -> str | None:
        """Return the speaker of the first turn covering *time*, else None.

        Turn boundaries are inclusive on both ends.
        """
        return next(
            (turn.speaker for turn in self.turns
             if turn.start <= time <= turn.end),
            None,
        )

    def speaker_time(self, speaker: str) -> float:
        """Return the summed duration of all turns attributed to *speaker*."""
        total = 0
        for turn in self.turns:
            if turn.speaker == speaker:
                total += turn.duration
        return total

    def speakers_ranked(self) -> list[tuple[str, float]]:
        """Return ``(speaker, total time)`` pairs, longest-speaking first."""
        totals = {}
        for turn in self.turns:
            if turn.speaker not in totals:
                totals[turn.speaker] = 0
            totals[turn.speaker] += turn.duration
        ranking = list(totals.items())
        ranking.sort(key=lambda pair: pair[1], reverse=True)
        return ranking

    def to_dict(self) -> dict:
        """Return a JSON-serialisable dict of the result."""
        serialised_turns = []
        for turn in self.turns:
            serialised_turns.append({
                "speaker": turn.speaker,
                "start": turn.start,
                "end": turn.end,
                "confidence": turn.confidence,
            })
        return {
            "num_speakers": self.num_speakers,
            "speaker_map": self.speaker_map,
            "turns": serialised_turns,
        }

    def save(self, output_dir: Path):
        """Write the result to ``diarization.json`` in *output_dir*."""
        output_dir.mkdir(parents=True, exist_ok=True)
        target = output_dir / "diarization.json"
        with open(target, "w") as f:
            f.write(json.dumps(self.to_dict(), indent=2))
        console.print(f"[green]Diarization saved to {output_dir}[/green]")
class VoiceProfileStore:
"""Manages speaker voice embeddings for identification."""
def __init__(self, profiles_dir: str | Path):
self.profiles_dir = Path(profiles_dir)
self.embeddings: dict[str, np.ndarray] = {}
self.metadata: dict[str, dict] = {}
self._load_profiles()
def _load_profiles(self):
if not self.profiles_dir.exists():
return
for npy_file in self.profiles_dir.rglob("*.npy"):
name = npy_file.stem
# Determine speaker name from directory structure
parent = npy_file.parent.name
if parent.startswith("host-"):
speaker_name = parent.replace("host-", "").replace("-", " ").title()
role = "host"
elif parent == "guests":
speaker_name = name.replace("-", " ").title()
role = "guest"
elif parent == "callers":
speaker_name = name
role = "caller"
else:
speaker_name = name
role = "unknown"
self.embeddings[name] = np.load(npy_file)
self.metadata[name] = {
"name": speaker_name,
"role": role,
"file": str(npy_file),
}
if self.embeddings:
console.print(f"[dim]Loaded {len(self.embeddings)} voice profiles[/dim]")
def match_embedding(self, embedding: np.ndarray, threshold: float = 0.75
) -> tuple[str | None, float]:
"""Match an embedding against stored profiles. Returns (name, similarity)."""
if not self.embeddings:
return None, 0.0
best_match = None
best_score = 0.0
for name, stored in self.embeddings.items():
# Cosine similarity
similarity = np.dot(embedding, stored) / (
np.linalg.norm(embedding) * np.linalg.norm(stored) + 1e-8
)
if similarity > best_score:
best_score = similarity
best_match = name
if best_score >= threshold:
meta = self.metadata.get(best_match, {})
friendly_name = meta.get("name", best_match)
role = meta.get("role", "unknown")
if role == "host":
return f"Host: {friendly_name}", best_score
return friendly_name, best_score
return None, best_score
def save_embedding(self, name: str, embedding: np.ndarray,
role: str = "unknown"):
"""Save a new voice profile."""
if role == "host":
subdir = self.profiles_dir / f"host-{name.lower().replace(' ', '-')}"
elif role == "guest":
subdir = self.profiles_dir / "guests"
elif role == "caller":
subdir = self.profiles_dir / "callers"
else:
subdir = self.profiles_dir / "unknown"
subdir.mkdir(parents=True, exist_ok=True)
filename = name.lower().replace(" ", "-")
np.save(subdir / f"{filename}.npy", embedding)
console.print(f"[green]Saved voice profile: {name} ({role})[/green]")
def diarize(audio_path: str | Path,
            voice_profiles: VoiceProfileStore | None = None,
            min_speakers: int = 1,
            max_speakers: int = 6,
            host_match_threshold: float = 0.75) -> DiarizationResult:
    """Run speaker diarization on an audio file.

    Runs the ``pyannote/speaker-diarization-3.1`` pipeline (on CUDA when
    available) and then replaces raw speaker labels with friendly names.

    Naming rules:
      * With voice profiles: profile matching is attempted per speaker (see
        the TODO below; it cannot match yet). If nothing matched, the speaker
        with the most speaking time is labelled ``"Host: <name>"`` using the
        stored host profile's name, or "Unknown" if there is none; other
        speakers keep their raw labels.
      * Without voice profiles: the most talkative speaker becomes
        ``"Host (assumed)"``, the rest ``"Speaker 1"``, ``"Speaker 2"``, ...

    Args:
        audio_path: Audio file to diarize.
        voice_profiles: Optional store of known speaker embeddings.
        min_speakers: Lower bound passed to the pipeline.
        max_speakers: Upper bound passed to the pipeline.
        host_match_threshold: Similarity threshold passed to
            ``VoiceProfileStore.match_embedding``.

    Returns:
        A DiarizationResult with the renamed turns, the number of raw
        speakers and the raw-label -> friendly-name map.

    Raises:
        RuntimeError: If the pretrained pipeline could not be loaded.
    """
    from pyannote.audio import Pipeline
    import torch

    audio_path = Path(audio_path)
    console.print(f"[bold]Diarizing:[/bold] {audio_path.name}")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    console.print(f"[dim]Device: {device}[/dim]")

    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1")
    if pipeline is None:
        # NOTE(review): pyannote 3.x is understood to return None here when
        # the gated model cannot be downloaded (e.g. missing Hugging Face
        # token); fail with a clear message instead of an AttributeError.
        raise RuntimeError(
            "Could not load pyannote/speaker-diarization-3.1; check Hugging Face "
            "access to the gated model."
        )
    pipeline = pipeline.to(device)
    diarization = pipeline(
        str(audio_path),
        min_speakers=min_speakers,
        max_speakers=max_speakers,
    )

    # Extract turns
    raw_turns = [
        SpeakerTurn(speaker=speaker, start=turn.start, end=turn.end)
        for turn, _, speaker in diarization.itertracks(yield_label=True)
    ]

    # Total speaking time per raw label, most talkative first
    speaking_time = {}
    for turn in raw_turns:
        speaking_time[turn.speaker] = speaking_time.get(turn.speaker, 0) + turn.duration
    raw_speakers = set(speaking_time)
    ranked = sorted(speaking_time.items(), key=lambda x: x[1], reverse=True)
    console.print(f"[dim]Detected {len(raw_speakers)} speakers[/dim]")

    speaker_map = {}
    if voice_profiles and voice_profiles.embeddings:
        console.print("[dim]Matching speakers against voice profiles...[/dim]")
        for raw_label, total_time in ranked:
            # TODO: extract a real embedding for this speaker (e.g. from its
            # longest turn) with pyannote's embedding model. Until then an
            # all-zero placeholder is compared, which has zero similarity to
            # every profile and therefore never matches.
            try:
                match_name, score = voice_profiles.match_embedding(
                    np.zeros(256),  # placeholder
                    threshold=host_match_threshold,
                )
            except ValueError as e:
                # Raised when a stored profile has a different dimensionality.
                console.print(f"  [yellow]Could not match {raw_label}: {e}[/yellow]")
                continue
            if match_name:
                speaker_map[raw_label] = match_name
                console.print(f"  [green]{raw_label} -> {match_name} "
                              f"(score: {score:.2f}, {total_time:.0f}s)[/green]")

        # If no voice profiles matched, use speaking time heuristic.
        # The host almost always has the most speaking time.
        if not speaker_map and ranked:
            # Profile metadata is keyed by file stem, so look the host up by
            # role rather than by a fixed key.
            host_name = next(
                (meta.get("name", "Unknown")
                 for meta in voice_profiles.metadata.values()
                 if meta.get("role") == "host"),
                "Unknown",
            )
            speaker_map[ranked[0][0]] = f"Host: {host_name}"
            console.print(f"  [yellow]Assumed {ranked[0][0]} is host "
                          f"(most speaking time: {ranked[0][1]:.0f}s)[/yellow]")

    # If no voice profiles at all, label by speaking time
    if not speaker_map:
        for i, (speaker, _) in enumerate(ranked):
            if i == 0:
                speaker_map[speaker] = "Host (assumed)"
            else:
                speaker_map[speaker] = f"Speaker {i}"

    # Apply friendly names
    for turn in raw_turns:
        if turn.speaker in speaker_map:
            turn.speaker = speaker_map[turn.speaker]

    console.print(f"[green]Diarization complete: {len(raw_turns)} turns, "
                  f"{len(raw_speakers)} speakers[/green]")
    return DiarizationResult(
        turns=raw_turns,
        num_speakers=len(raw_speakers),
        speaker_map=speaker_map,
    )

View File

@@ -0,0 +1,419 @@
"""Stage 3: Segment detection — multi-signal commercial/show content classifier."""
import json
from dataclasses import dataclass
from pathlib import Path
from enum import Enum
import numpy as np
from rich.console import Console
from rich.table import Table
console = Console()
class SegmentType(Enum):
    """Classification labels for a detected stretch of audio."""

    # Regular programme material.
    SHOW_CONTENT = "show_content"
    # Advertising break.
    COMMERCIAL = "commercial"
    # Intro, outro, bumper.
    SHOW_ELEMENT = "show_element"
    SILENCE = "silence"
    UNKNOWN = "unknown"
@dataclass
class DetectedSegment:
    """A time span of the audio together with its classification."""

    start: float  # span start
    end: float  # span end
    segment_type: SegmentType
    confidence: float
    # Human-readable name, e.g. "Segment 1: The Week That Was",
    # "Commercial Break 1".
    label: str = ""
    # Individual signal scores. ``None`` is swapped for a fresh dict in
    # ``__post_init__`` so instances never share one mutable default.
    signals: dict | None = None

    def __post_init__(self):
        self.signals = {} if self.signals is None else self.signals

    @property
    def duration(self) -> float:
        """Length of the span (``end - start``)."""
        return self.end - self.start
@dataclass
class SegmentDetectionResult:
    """Outcome of segment detection: all segments plus per-type views and totals."""

    segments: list[DetectedSegment]
    show_segments: list[DetectedSegment]
    commercial_segments: list[DetectedSegment]
    element_segments: list[DetectedSegment]
    total_show_time: float
    total_commercial_time: float

    def to_dict(self) -> dict:
        """Return the totals and every segment as plain JSON-ready values."""
        serialized = []
        for seg in self.segments:
            serialized.append({
                "start": seg.start,
                "end": seg.end,
                "type": seg.segment_type.value,
                "confidence": seg.confidence,
                "label": seg.label,
                "signals": seg.signals,
            })
        return {
            "total_show_time": self.total_show_time,
            "total_commercial_time": self.total_commercial_time,
            "segments": serialized,
        }

    def save(self, output_dir: Path):
        """Write ``detection-report.json`` into ``output_dir``, creating it if needed."""
        output_dir.mkdir(parents=True, exist_ok=True)
        report_path = output_dir / "detection-report.json"
        with open(report_path, "w") as f:
            json.dump(self.to_dict(), f, indent=2)

    def print_summary(self):
        """Print a table with one row per segment, followed by the totals in minutes."""
        # Rich markup shown in the "Type" column for each segment type.
        type_markup = {
            SegmentType.SHOW_CONTENT: "[green]SHOW[/green]",
            SegmentType.COMMERCIAL: "[red]COMMERCIAL[/red]",
            SegmentType.SHOW_ELEMENT: "[blue]ELEMENT[/blue]",
            SegmentType.SILENCE: "[dim]SILENCE[/dim]",
            SegmentType.UNKNOWN: "[yellow]UNKNOWN[/yellow]",
        }

        table = Table(title="Segment Detection Results")
        styled_columns = (
            ("Time", "cyan"),
            ("Duration", "magenta"),
            ("Type", "green"),
            ("Confidence", "yellow"),
        )
        for heading, style in styled_columns:
            table.add_column(heading, style=style)
        table.add_column("Label")

        for seg in self.segments:
            table.add_row(
                _format_time(seg.start),
                f"{seg.duration:.0f}s",
                type_markup.get(seg.segment_type, str(seg.segment_type)),
                f"{seg.confidence:.2f}",
                seg.label,
            )

        console.print(table)
        console.print(f"\nShow content: {self.total_show_time / 60:.1f} min")
        console.print(f"Commercials: {self.total_commercial_time / 60:.1f} min")
def _format_time(seconds: float) -> str:
m = int(seconds // 60)
s = int(seconds % 60)
return f"{m:02d}:{s:02d}"
class SegmentDetector:
    """Multi-signal commercial/show content detector.

    The audio is cut into candidate segments at silence gaps. Each candidate
    is scored by four signals (fingerprint, speaker identity, audio
    characteristics, transcript cues); the weighted sum is compared with
    ``config.segment_detection.confidence_threshold`` to classify the
    candidate as commercial or show content.
    """

    # Transcript phrases that suggest ongoing show content.
    _SHOW_PHRASES = (
        "welcome back", "let's move on", "next up", "our next topic",
        "let's talk about", "as i mentioned", "the question is",
        "caller", "what do you think", "here's the thing",
    )
    # Transcript phrases that suggest a break is starting or near.
    _BREAK_PHRASES = (
        "we'll be right back", "stay tuned", "don't go anywhere",
        "after the break", "when we come back",
    )

    def __init__(self, config):
        """Keep ``config`` and cache its ``segment_detection.weights``."""
        self.config = config
        self.weights = config.segment_detection.weights

    def detect(self, audio_path: Path, transcript=None, diarization=None,
               show_prep=None) -> SegmentDetectionResult:
        """Run all detection signals and combine scores.

        Args:
            audio_path: Audio file to analyse (decoded through ffmpeg).
            transcript: Optional transcript exposing ``text_at(start, end)``.
            diarization: Optional diarization result exposing ``turns``.
            show_prep: Optional show prep text; when given, segments are labelled.

        Returns:
            A ``SegmentDetectionResult``; its summary table is also printed.

        Raises:
            RuntimeError: If ffmpeg cannot decode ``audio_path``.
        """
        console.print(f"[bold]Detecting segments:[/bold] {audio_path.name}")

        # Load audio for analysis
        audio_data, sample_rate = self._load_audio(audio_path)
        duration = len(audio_data) / sample_rate

        # Step 1: Find candidate boundaries using silence detection
        boundaries = self._detect_silence_boundaries(audio_data, sample_rate)
        console.print(f"[dim]Found {len(boundaries)} silence boundaries[/dim]")

        # Step 2: Create candidate segments between boundaries
        candidates = self._create_candidate_segments(boundaries, duration)

        # Step 3: Score each candidate with all available signals.
        # A missing optional input contributes a neutral 0.5.
        threshold = self.config.segment_detection.confidence_threshold
        for candidate in candidates:
            scores = {
                "fingerprint": self._score_fingerprint(
                    audio_data, sample_rate, candidate
                ),
                "speaker": (
                    self._score_speaker_identity(diarization, candidate)
                    if diarization else 0.5
                ),
                "audio_chars": self._score_audio_characteristics(
                    audio_data, sample_rate, candidate
                ),
                "structural": (
                    self._score_structural(transcript, candidate)
                    if transcript else 0.5
                ),
            }

            # Combined weighted score (higher = more likely commercial)
            commercial_score = (
                self.weights.fingerprint_match * scores["fingerprint"] +
                self.weights.speaker_identity * scores["speaker"] +
                self.weights.audio_characteristics * scores["audio_chars"] +
                self.weights.structural_heuristic * scores["structural"]
            )
            candidate.signals = scores
            candidate.confidence = commercial_score
            if commercial_score >= threshold:
                candidate.segment_type = SegmentType.COMMERCIAL
            else:
                candidate.segment_type = SegmentType.SHOW_CONTENT

        # Step 4: Merge adjacent segments of same type
        merged = self._merge_adjacent(candidates)

        # Step 5: Apply duration constraints
        final = self._apply_constraints(merged)

        # Step 6: Label show segments using show prep if available
        if show_prep:
            self._label_from_prep(final, transcript, show_prep)

        # Build result
        show_segs = [s for s in final if s.segment_type == SegmentType.SHOW_CONTENT]
        comm_segs = [s for s in final if s.segment_type == SegmentType.COMMERCIAL]
        elem_segs = [s for s in final if s.segment_type == SegmentType.SHOW_ELEMENT]
        result = SegmentDetectionResult(
            segments=final,
            show_segments=show_segs,
            commercial_segments=comm_segs,
            element_segments=elem_segs,
            total_show_time=sum(s.duration for s in show_segs),
            total_commercial_time=sum(s.duration for s in comm_segs),
        )
        result.print_summary()
        return result

    def _load_audio(self, audio_path: Path) -> tuple[np.ndarray, int]:
        """Load audio file as a mono 16 kHz float32 array scaled by 1/32768.

        Raises:
            RuntimeError: If ffmpeg exits with a non-zero status.
        """
        import subprocess

        # Use ffmpeg to decode to raw signed 16-bit little-endian PCM on stdout.
        result = subprocess.run(
            ["ffmpeg", "-i", str(audio_path), "-f", "s16le", "-ac", "1",
             "-ar", "16000", "-"],
            capture_output=True, timeout=300,
        )
        if result.returncode != 0:
            # Without this check a failed decode would look like empty audio
            # and produce an empty detection result.
            detail = result.stderr.decode("utf-8", errors="replace").strip()[-500:]
            raise RuntimeError(
                f"ffmpeg failed to decode {audio_path} "
                f"(exit code {result.returncode}): {detail}"
            )
        audio = np.frombuffer(result.stdout, dtype=np.int16).astype(np.float32) / 32768.0
        return audio, 16000

    def _detect_silence_boundaries(self, audio: np.ndarray, sr: int,
                                   min_silence_ms: int = 500) -> list[float]:
        """Detect silence gaps in audio that likely indicate segment boundaries.

        A boundary is placed at the midpoint of every run of silent frames
        lasting at least ``min_silence_ms`` that is followed by a non-silent
        frame. A silent run reaching the end of the audio yields no boundary.
        """
        frame_size = int(sr * 0.025)  # 25ms frames
        hop_size = int(sr * 0.010)    # 10ms hop
        threshold_db = self.config.segment_detection.silence_threshold_db
        threshold_amp = 10 ** (threshold_db / 20)  # dB -> linear amplitude
        min_silence_frames = int(min_silence_ms / 10)  # one frame per 10ms hop

        boundaries = []
        silent_run = 0
        frame_starts = range(0, len(audio) - frame_size, hop_size)
        for index, offset in enumerate(frame_starts):
            frame = audio[offset:offset + frame_size]
            rms = np.sqrt(np.mean(frame ** 2))
            if rms < threshold_amp:
                silent_run += 1
                continue
            if silent_run >= min_silence_frames:
                # Mark the midpoint of the silence as a boundary
                mid_frame = index - silent_run // 2
                boundaries.append(mid_frame * 0.010)
            silent_run = 0
        return boundaries

    def _create_candidate_segments(self, boundaries: list[float],
                                   total_duration: float) -> list[DetectedSegment]:
        """Create candidate segments from silence boundaries.

        Spans of one second or less are skipped, so the candidates need not
        cover the whole timeline.
        """
        candidates = []
        prev = 0.0
        # Appending total_duration makes the trailing span go through the same rule.
        for edge in [*boundaries, total_duration]:
            if edge - prev > 1.0:  # Ignore segments < 1 second
                candidates.append(DetectedSegment(
                    start=prev,
                    end=edge,
                    segment_type=SegmentType.UNKNOWN,
                    confidence=0.0,
                ))
            prev = edge
        return candidates

    def _score_fingerprint(self, audio: np.ndarray, sr: int,
                           segment: DetectedSegment) -> float:
        """Score based on audio fingerprint matching against element library.

        Returns 0.0 (no match / definitely show) to 1.0 (definite commercial boundary).
        Currently always returns the neutral 0.5.
        """
        # TODO: Implement fingerprint matching against element-library/fingerprints.db
        # For now, return neutral score
        return 0.5

    def _score_speaker_identity(self, diarization, segment: DetectedSegment) -> float:
        """Score based on whether the host is speaking.

        A turn counts as the host when its speaker name contains "host"
        (case-insensitive).

        Returns 0.0 (host definitely speaking = show content)
        to 1.0 (host definitely absent = likely commercial).
        """
        total_time = segment.duration
        if total_time == 0:
            return 0.5

        host_time = 0.0
        for turn in diarization.turns:
            if turn.end < segment.start or turn.start > segment.end:
                continue
            if "host" not in turn.speaker.lower():
                continue
            # Only the part of the turn inside the segment counts.
            overlap_start = max(turn.start, segment.start)
            overlap_end = min(turn.end, segment.end)
            host_time += max(0, overlap_end - overlap_start)

        host_fraction = host_time / total_time
        # Invert: high host presence = low commercial score. Clamp because
        # overlapping host turns can add up to more than the segment length.
        return min(1.0, max(0.0, 1.0 - host_fraction))

    def _score_audio_characteristics(self, audio: np.ndarray, sr: int,
                                     segment: DetectedSegment) -> float:
        """Score based on audio production characteristics.

        Commercials tend to be louder, more compressed, different spectral profile.
        Returns 0.0 (matches show characteristics) to 1.0 (matches commercial characteristics).
        With the current heuristic the result is 0.5, 0.65 or 0.8.
        """
        start_sample = int(segment.start * sr)
        end_sample = min(int(segment.end * sr), len(audio))
        seg_audio = audio[start_sample:end_sample]
        if len(seg_audio) < sr:  # Less than 1 second
            return 0.5

        # RMS energy (commercials tend to be louder)
        rms = np.sqrt(np.mean(seg_audio ** 2))

        # Dynamic range (commercials tend to be more compressed)
        frame_size = int(sr * 0.050)  # 50ms frames
        frame_rms = [
            np.sqrt(np.mean(seg_audio[i:i + frame_size] ** 2))
            for i in range(0, len(seg_audio) - frame_size, frame_size)
        ]
        if not frame_rms:
            return 0.5
        # The epsilon avoids dividing by zero on an all-silent frame.
        dynamic_range = max(frame_rms) / (min(frame_rms) + 1e-8)

        # Simple heuristic scoring:
        # High RMS + low dynamic range = compressed commercial audio
        score = 0.5
        if rms > 0.15:  # Louder than typical speech
            score += 0.15
        if dynamic_range < 5.0:  # Very compressed
            score += 0.15
        return min(1.0, max(0.0, score))

    def _score_structural(self, transcript, segment: DetectedSegment) -> float:
        """Score based on transcript content structural cues.

        Returns 0.2 when only show cues are found, 0.8 when any break cue is
        found, and 0.5 when neither is found.
        """
        text = transcript.text_at(segment.start, segment.end).lower()
        has_show_cue = any(phrase in text for phrase in self._SHOW_PHRASES)
        has_break_cue = any(phrase in text for phrase in self._BREAK_PHRASES)
        if has_break_cue:
            return 0.8  # Likely near a break
        if has_show_cue:
            return 0.2  # Likely show content
        return 0.5  # Neutral

    def _merge_adjacent(self, segments: list[DetectedSegment]) -> list[DetectedSegment]:
        """Merge adjacent segments of the same type.

        Two segments merge when they share a type and the gap between them is
        under two seconds. The merged segment is the earlier object, extended
        in place; it keeps its own ``label`` and ``signals``. Its confidence
        becomes the duration-weighted mean of the two, so a chain of merges
        does not over-weight whichever piece was merged last.
        """
        if not segments:
            return []
        merged = [segments[0]]
        for seg in segments[1:]:
            prev = merged[-1]
            same_type = prev.segment_type == seg.segment_type
            if same_type and abs(seg.start - prev.end) < 2.0:  # Within 2 seconds
                prev_weight = prev.duration
                seg_weight = seg.duration
                combined = prev_weight + seg_weight
                if combined > 0:
                    prev.confidence = (
                        prev.confidence * prev_weight + seg.confidence * seg_weight
                    ) / combined
                else:
                    # Degenerate zero-length pieces: fall back to a plain mean.
                    prev.confidence = (prev.confidence + seg.confidence) / 2
                # Extend previous segment
                prev.end = seg.end
            else:
                merged.append(seg)
        return merged

    def _apply_constraints(self, segments: list[DetectedSegment]) -> list[DetectedSegment]:
        """Apply duration constraints — short 'commercial' segments are likely misclassified.

        Commercial segments shorter than
        ``config.segment_detection.min_break_duration_s`` are turned into show
        content in place. The same list is returned.
        """
        min_break = self.config.segment_detection.min_break_duration_s
        for seg in segments:
            too_short = seg.duration < min_break
            if seg.segment_type == SegmentType.COMMERCIAL and too_short:
                # NOTE(review): a reclassified segment is not re-merged with
                # neighbouring show segments; confirm whether that is wanted.
                seg.segment_type = SegmentType.SHOW_CONTENT
                seg.label = "(reclassified: too short for commercial)"
        return segments

    def _label_from_prep(self, segments: list[DetectedSegment],
                         transcript, show_prep: str):
        """Label show and commercial segments with sequential numbers.

        ``transcript`` and ``show_prep`` are accepted but not used yet.
        """
        # TODO: Use Ollama to match transcript sections against show prep segment titles
        # For now, number them sequentially
        show_count = 0
        comm_count = 0
        for seg in segments:
            if seg.segment_type == SegmentType.SHOW_CONTENT:
                show_count += 1
                seg.label = f"Show Segment {show_count}"
            elif seg.segment_type == SegmentType.COMMERCIAL:
                comm_count += 1
                seg.label = f"Commercial Break {comm_count}"

View File

@@ -0,0 +1,179 @@
"""Stage 1: Audio transcription using faster-whisper with GPU acceleration."""
import json
from dataclasses import dataclass
from pathlib import Path
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
console = Console()
@dataclass
class TranscriptWord:
    """One transcribed word with its timing and probability."""

    word: str
    start: float  # word start time
    end: float  # word end time
    probability: float
@dataclass
class TranscriptSegment:
    """One transcribed segment: its text, time span, and word-level detail."""

    id: int
    text: str
    start: float  # segment start time
    end: float  # segment end time
    words: list[TranscriptWord]
@dataclass
class Transcript:
    """A complete transcript with export helpers (text, SRT, JSON)."""

    segments: list[TranscriptSegment]
    language: str
    language_probability: float
    duration: float

    @property
    def full_text(self) -> str:
        """All segment texts, stripped and joined with single spaces."""
        return " ".join(seg.text.strip() for seg in self.segments)

    def text_at(self, start: float, end: float) -> str:
        """Get transcript text within a time range.

        Every segment that overlaps or touches ``[start, end]`` contributes
        its whole text. Scanning stops at the first segment that starts after
        ``end``, so this assumes segments are in ascending time order.
        """
        result = []
        for seg in self.segments:
            if seg.end < start:
                continue
            if seg.start > end:
                break
            result.append(seg.text.strip())
        return " ".join(result)

    def to_srt(self) -> str:
        """Export as SRT subtitle format, with cues numbered from 1."""
        lines = []
        for i, seg in enumerate(self.segments, 1):
            start = _format_srt_time(seg.start)
            end = _format_srt_time(seg.end)
            lines.append(f"{i}")
            lines.append(f"{start} --> {end}")
            lines.append(seg.text.strip())
            lines.append("")
        return "\n".join(lines)

    def to_dict(self) -> dict:
        """Return the transcript, including word-level detail, as plain values."""
        return {
            "language": self.language,
            "language_probability": self.language_probability,
            "duration": self.duration,
            "segments": [
                {
                    "id": seg.id,
                    "text": seg.text,
                    "start": seg.start,
                    "end": seg.end,
                    "words": [
                        {
                            "word": w.word,
                            "start": w.start,
                            "end": w.end,
                            "probability": w.probability,
                        }
                        for w in seg.words
                    ],
                }
                for seg in self.segments
            ],
        }

    def save(self, output_dir: Path):
        """Write ``transcript.json``, ``transcript.txt`` and ``transcript.srt``.

        Files are written as UTF-8 explicitly. The platform default encoding
        may not be able to represent every character in the transcript text,
        which would make the write fail or garble the output.
        """
        output_dir.mkdir(parents=True, exist_ok=True)
        # JSON with full detail
        with open(output_dir / "transcript.json", "w", encoding="utf-8") as f:
            json.dump(self.to_dict(), f, indent=2)
        # Plain text
        with open(output_dir / "transcript.txt", "w", encoding="utf-8") as f:
            f.write(self.full_text)
        # SRT subtitles
        with open(output_dir / "transcript.srt", "w", encoding="utf-8") as f:
            f.write(self.to_srt())
        console.print(f"[green]Transcript saved to {output_dir}[/green]")
def _format_srt_time(seconds: float) -> str:
h = int(seconds // 3600)
m = int((seconds % 3600) // 60)
s = int(seconds % 60)
ms = int((seconds % 1) * 1000)
return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
def transcribe(audio_path: str | Path, model_size: str = "large-v3",
               language: str = "en", device: str = "cuda",
               compute_type: str | None = None) -> Transcript:
    """Transcribe an audio file using faster-whisper.

    Args:
        audio_path: Audio file to transcribe.
        model_size: Whisper model name passed to ``WhisperModel``.
        language: Language code passed to the model.
        device: Device passed to ``WhisperModel`` (e.g. ``"cuda"``, ``"cpu"``).
        compute_type: Numeric precision for inference. When ``None`` it is
            chosen from ``device``: ``"float16"`` for CUDA (the previous
            fixed value), ``"int8"`` for CPU, ``"default"`` otherwise.

    Returns:
        A ``Transcript`` with word-level timestamps for every segment.
    """
    from faster_whisper import WhisperModel

    audio_path = Path(audio_path)
    if compute_type is None:
        # float16 is rejected on CPU, so pick a type the device supports.
        if device.startswith("cuda"):
            compute_type = "float16"
        elif device == "cpu":
            compute_type = "int8"
        else:
            compute_type = "default"

    console.print(f"[bold]Transcribing:[/bold] {audio_path.name}")
    console.print(f"[dim]Model: {model_size}, Device: {device}[/dim]")
    model = WhisperModel(model_size, device=device, compute_type=compute_type)
    segments_raw, info = model.transcribe(
        str(audio_path),
        language=language,
        word_timestamps=True,
        vad_filter=True,
        vad_parameters=dict(
            min_silence_duration_ms=500,
            speech_pad_ms=200,
        ),
    )
    console.print(f"[dim]Detected language: {info.language} "
                  f"(probability: {info.language_probability:.2f})[/dim]")
    console.print(f"[dim]Duration: {info.duration:.1f}s "
                  f"({info.duration / 60:.1f} min)[/dim]")

    segments = []
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("{task.completed} segments"),
        TimeElapsedColumn(),
        console=console,
    ) as progress:
        # total=None: the number of segments is not known up front.
        task = progress.add_task("Transcribing...", total=None)
        for i, seg in enumerate(segments_raw):
            words = [
                TranscriptWord(
                    word=w.word,
                    start=w.start,
                    end=w.end,
                    probability=w.probability,
                )
                for w in (seg.words or [])
            ]
            segments.append(TranscriptSegment(
                id=i,
                text=seg.text,
                start=seg.start,
                end=seg.end,
                words=words,
            ))
            progress.update(task, completed=i + 1)

    console.print(f"[green]Transcription complete: {len(segments)} segments[/green]")
    return Transcript(
        segments=segments,
        language=info.language,
        language_probability=info.language_probability,
        duration=info.duration,
    )