"""Stage 4 & 5: Commercial removal and segment splitting using ffmpeg.""" import subprocess import json from dataclasses import dataclass from pathlib import Path from rich.console import Console from rich.progress import Progress from .segment_detector import SegmentType, DetectedSegment console = Console() @dataclass class Chapter: title: str start: float end: float def remove_commercials(audio_path: Path, segments: list[DetectedSegment], output_path: Path, crossfade_ms: int = 500, bitrate: str = "192k", normalize: bool = True): """Stitch show segments together, removing commercials.""" show_segments = [s for s in segments if s.segment_type in (SegmentType.SHOW_CONTENT, SegmentType.SHOW_ELEMENT)] if not show_segments: console.print("[red]No show segments found![/red]") return console.print(f"[bold]Removing commercials:[/bold] {len(segments)} segments " f"-> {len(show_segments)} show segments") output_path.parent.mkdir(parents=True, exist_ok=True) temp_dir = output_path.parent / ".temp_segments" temp_dir.mkdir(exist_ok=True) try: # Extract each show segment segment_files = [] with Progress(console=console) as progress: task = progress.add_task("Extracting segments...", total=len(show_segments)) for i, seg in enumerate(show_segments): temp_file = temp_dir / f"seg_{i:04d}.mp3" _extract_segment(audio_path, seg.start, seg.end, temp_file, bitrate) segment_files.append(temp_file) progress.update(task, advance=1) # Create concat file for ffmpeg concat_file = temp_dir / "concat.txt" with open(concat_file, "w") as f: for sf in segment_files: f.write(f"file '{sf}'\n") # Concatenate with crossfade cmd = [ "ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", str(concat_file), "-b:a", bitrate, ] if normalize: # EBU R128 loudness normalization cmd.extend([ "-af", "loudnorm=I=-16:TP=-1.5:LRA=11", ]) cmd.append(str(output_path)) subprocess.run(cmd, capture_output=True, check=True, timeout=600) # Get output duration duration = _get_duration(output_path) console.print(f"[green]Clean episode saved: {output_path.name} " f"({duration / 60:.1f} min)[/green]") finally: # Cleanup temp files import shutil shutil.rmtree(temp_dir, ignore_errors=True) def split_segments(audio_path: Path, segments: list[DetectedSegment], output_dir: Path, bitrate: str = "192k"): """Export individual show segments as separate MP3 files.""" show_segments = [s for s in segments if s.segment_type in (SegmentType.SHOW_CONTENT, SegmentType.SHOW_ELEMENT)] output_dir.mkdir(parents=True, exist_ok=True) console.print(f"[bold]Splitting into {len(show_segments)} segments[/bold]") exported = [] for i, seg in enumerate(show_segments): slug = _slugify(seg.label) if seg.label else f"segment-{i:02d}" filename = f"{i:02d}-{slug}.mp3" output_file = output_dir / filename _extract_segment(audio_path, seg.start, seg.end, output_file, bitrate, fade_in_ms=200, fade_out_ms=500) duration = seg.duration console.print(f" [green]{filename}[/green] ({duration:.0f}s)") exported.append({ "file": filename, "label": seg.label, "start": seg.start, "end": seg.end, "duration": duration, }) # Save manifest with open(output_dir / "segments.json", "w") as f: json.dump(exported, f, indent=2) return exported def generate_chapters(segments: list[DetectedSegment], output_path: Path) -> list[Chapter]: """Generate chapter markers from show segments.""" show_segments = [s for s in segments if s.segment_type in (SegmentType.SHOW_CONTENT, SegmentType.SHOW_ELEMENT)] chapters = [] cumulative_time = 0.0 for seg in show_segments: chapters.append(Chapter( title=seg.label or f"Segment", start=cumulative_time, end=cumulative_time + seg.duration, )) cumulative_time += seg.duration output_path.parent.mkdir(parents=True, exist_ok=True) with open(output_path, "w") as f: json.dump( [{"title": c.title, "start": c.start, "end": c.end} for c in chapters], f, indent=2, ) console.print(f"[green]Chapter markers saved: {len(chapters)} chapters[/green]") return chapters def _extract_segment(audio_path: Path, start: float, end: float, output_path: Path, bitrate: str = "192k", fade_in_ms: int = 0, fade_out_ms: int = 0): """Extract a segment from an audio file using ffmpeg.""" duration = end - start cmd = [ "ffmpeg", "-y", "-ss", str(start), "-t", str(duration), "-i", str(audio_path), "-b:a", bitrate, ] filters = [] if fade_in_ms > 0: filters.append(f"afade=t=in:d={fade_in_ms / 1000}") if fade_out_ms > 0: filters.append(f"afade=t=out:st={duration - fade_out_ms / 1000}:d={fade_out_ms / 1000}") if filters: cmd.extend(["-af", ",".join(filters)]) cmd.append(str(output_path)) subprocess.run(cmd, capture_output=True, check=True, timeout=120) def _get_duration(audio_path: Path) -> float: """Get audio file duration in seconds.""" result = subprocess.run( ["ffprobe", "-v", "quiet", "-show_entries", "format=duration", "-of", "csv=p=0", str(audio_path)], capture_output=True, text=True, ) return float(result.stdout.strip()) def _slugify(text: str) -> str: """Convert text to a filename-safe slug.""" import re text = text.lower().strip() text = re.sub(r'[^\w\s-]', '', text) text = re.sub(r'[\s_]+', '-', text) text = re.sub(r'-+', '-', text) return text[:50].strip('-')