Files
claudetools/projects/radio-show/audio-processor/src/segment_detector.py
Mike Swanson 87f5a9306a Audio processor: fix segment detection with transcript-driven breaks
- Add transcript break phrase detection (going_to_break/coming_back cues)
- Create segments from transcript breaks with silence boundary snapping
- Fix segment dedup in merge_adjacent (handle overlapping segments)
- Add CUDA 12 library path fix (gpu.py + venv activate hook)
- Auto-load existing transcript in detect command
- Tested on 2011-03-05 HR1: correctly identifies commercial break at 34:38

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 11:59:54 -07:00

570 lines
21 KiB
Python

"""Stage 3: Segment detection — multi-signal commercial/show content classifier."""
import json
from dataclasses import dataclass
from pathlib import Path
from enum import Enum
import numpy as np
from rich.console import Console
from rich.table import Table
console = Console()
class SegmentType(Enum):
SHOW_CONTENT = "show_content"
COMMERCIAL = "commercial"
SHOW_ELEMENT = "show_element" # intro, outro, bumper
SILENCE = "silence"
UNKNOWN = "unknown"
@dataclass
class DetectedSegment:
start: float
end: float
segment_type: SegmentType
confidence: float
label: str = "" # "Segment 1: The Week That Was", "Commercial Break 1", etc.
signals: dict = None # Individual signal scores
def __post_init__(self):
if self.signals is None:
self.signals = {}
@property
def duration(self) -> float:
return self.end - self.start
@dataclass
class SegmentDetectionResult:
segments: list[DetectedSegment]
show_segments: list[DetectedSegment]
commercial_segments: list[DetectedSegment]
element_segments: list[DetectedSegment]
total_show_time: float
total_commercial_time: float
def to_dict(self) -> dict:
return {
"total_show_time": self.total_show_time,
"total_commercial_time": self.total_commercial_time,
"segments": [
{
"start": s.start,
"end": s.end,
"type": s.segment_type.value,
"confidence": s.confidence,
"label": s.label,
"signals": s.signals,
}
for s in self.segments
],
}
def save(self, output_dir: Path):
output_dir.mkdir(parents=True, exist_ok=True)
with open(output_dir / "detection-report.json", "w") as f:
json.dump(self.to_dict(), f, indent=2)
def print_summary(self):
table = Table(title="Segment Detection Results")
table.add_column("Time", style="cyan")
table.add_column("Duration", style="magenta")
table.add_column("Type", style="green")
table.add_column("Confidence", style="yellow")
table.add_column("Label")
for seg in self.segments:
start = _format_time(seg.start)
dur = f"{seg.duration:.0f}s"
type_style = {
SegmentType.SHOW_CONTENT: "[green]SHOW[/green]",
SegmentType.COMMERCIAL: "[red]COMMERCIAL[/red]",
SegmentType.SHOW_ELEMENT: "[blue]ELEMENT[/blue]",
SegmentType.SILENCE: "[dim]SILENCE[/dim]",
SegmentType.UNKNOWN: "[yellow]UNKNOWN[/yellow]",
}.get(seg.segment_type, str(seg.segment_type))
table.add_row(start, dur, type_style, f"{seg.confidence:.2f}", seg.label)
console.print(table)
console.print(f"\nShow content: {self.total_show_time / 60:.1f} min")
console.print(f"Commercials: {self.total_commercial_time / 60:.1f} min")
def _format_time(seconds: float) -> str:
m = int(seconds // 60)
s = int(seconds % 60)
return f"{m:02d}:{s:02d}"
class SegmentDetector:
"""Multi-signal commercial/show content detector."""
def __init__(self, config):
self.config = config
self.weights = config.segment_detection.weights
def detect(self, audio_path: Path, transcript=None, diarization=None,
show_prep=None) -> SegmentDetectionResult:
"""Run all detection signals and combine scores."""
console.print(f"[bold]Detecting segments:[/bold] {audio_path.name}")
# Load audio for analysis
audio_data, sample_rate = self._load_audio(audio_path)
duration = len(audio_data) / sample_rate
# Step 1: Find candidate boundaries using silence detection
boundaries = self._detect_silence_boundaries(audio_data, sample_rate)
console.print(f"[dim]Found {len(boundaries)} silence boundaries[/dim]")
# Step 2: Find hard break points from transcript (most reliable signal)
transcript_breaks = []
if transcript:
transcript_breaks = self._find_transcript_breaks(transcript)
console.print(f"[dim]Found {len(transcript_breaks)} break cues in transcript[/dim]")
# Step 3: Create segments using transcript breaks as primary boundaries,
# with silence boundaries refining the exact cut points
if transcript_breaks:
candidates = self._create_segments_from_breaks(
transcript_breaks, boundaries, audio_data, sample_rate, duration
)
else:
candidates = self._create_candidate_segments(boundaries, duration)
# Step 4: Score each candidate with all available signals
for candidate in candidates:
scores = {}
scores["fingerprint"] = self._score_fingerprint(
audio_data, sample_rate, candidate
)
if diarization:
scores["speaker"] = self._score_speaker_identity(
diarization, candidate
)
else:
scores["speaker"] = 0.5
scores["audio_chars"] = self._score_audio_characteristics(
audio_data, sample_rate, candidate
)
if transcript:
scores["structural"] = self._score_structural(
transcript, candidate
)
else:
scores["structural"] = 0.5
commercial_score = (
self.weights.fingerprint_match * scores.get("fingerprint", 0.5) +
self.weights.speaker_identity * scores.get("speaker", 0.5) +
self.weights.audio_characteristics * scores.get("audio_chars", 0.5) +
self.weights.structural_heuristic * scores.get("structural", 0.5)
)
candidate.signals = scores
# If segment was already typed by transcript breaks, keep it
if candidate.segment_type == SegmentType.UNKNOWN:
candidate.confidence = commercial_score
if commercial_score >= self.config.segment_detection.confidence_threshold:
candidate.segment_type = SegmentType.COMMERCIAL
else:
candidate.segment_type = SegmentType.SHOW_CONTENT
else:
candidate.confidence = max(commercial_score, 0.80)
# Step 5: Merge adjacent segments of same type
merged = self._merge_adjacent(candidates)
# Step 6: Apply duration constraints
final = self._apply_constraints(merged)
# Step 7: Label show segments using show prep if available
if show_prep:
self._label_from_prep(final, transcript, show_prep)
# Build result
show_segs = [s for s in final if s.segment_type == SegmentType.SHOW_CONTENT]
comm_segs = [s for s in final if s.segment_type == SegmentType.COMMERCIAL]
elem_segs = [s for s in final if s.segment_type == SegmentType.SHOW_ELEMENT]
result = SegmentDetectionResult(
segments=final,
show_segments=show_segs,
commercial_segments=comm_segs,
element_segments=elem_segs,
total_show_time=sum(s.duration for s in show_segs),
total_commercial_time=sum(s.duration for s in comm_segs),
)
result.print_summary()
return result
def _load_audio(self, audio_path: Path) -> tuple[np.ndarray, int]:
"""Load audio file as mono numpy array."""
import subprocess
import io
import struct
# Use ffmpeg to decode to raw PCM
result = subprocess.run(
["ffmpeg", "-i", str(audio_path), "-f", "s16le", "-ac", "1",
"-ar", "16000", "-"],
capture_output=True, timeout=300,
)
audio = np.frombuffer(result.stdout, dtype=np.int16).astype(np.float32) / 32768.0
return audio, 16000
def _detect_silence_boundaries(self, audio: np.ndarray, sr: int,
min_silence_ms: int = 500) -> list[float]:
"""Detect silence gaps in audio that likely indicate segment boundaries."""
frame_size = int(sr * 0.025) # 25ms frames
hop_size = int(sr * 0.010) # 10ms hop
threshold_db = self.config.segment_detection.silence_threshold_db
threshold_amp = 10 ** (threshold_db / 20)
min_silence_frames = int(min_silence_ms / 10)
# Calculate frame energy
energies = []
for i in range(0, len(audio) - frame_size, hop_size):
frame = audio[i:i + frame_size]
rms = np.sqrt(np.mean(frame ** 2))
energies.append(rms)
# Find silence regions
is_silent = [e < threshold_amp for e in energies]
boundaries = []
silent_count = 0
for i, silent in enumerate(is_silent):
if silent:
silent_count += 1
else:
if silent_count >= min_silence_frames:
# Mark the midpoint of the silence as a boundary
mid_frame = i - silent_count // 2
boundary_time = mid_frame * 0.010
boundaries.append(boundary_time)
silent_count = 0
return boundaries
def _find_transcript_breaks(self, transcript) -> list[dict]:
"""Find commercial break points from transcript content."""
break_cues = []
going_to_break = [
"take a quick break", "take a break", "go to commercial",
"going to break", "let's go to break", "we'll be right back",
"right back after", "news break coming up", "after the news",
"be right back", "stay tuned", "don't go anywhere",
]
coming_back = [
"welcome back", "we're back", "we are back", "back from the break",
"back from break", "back on the", "back with you",
]
for seg in transcript.segments:
text = seg.text.lower().strip()
for cue in going_to_break:
if cue in text:
break_cues.append({
"type": "break_start",
"time": seg.end,
"text": seg.text.strip(),
"cue": cue,
})
break
for cue in coming_back:
if cue in text:
break_cues.append({
"type": "break_end",
"time": seg.start,
"text": seg.text.strip(),
"cue": cue,
})
break
return break_cues
def _create_segments_from_breaks(self, transcript_breaks: list[dict],
silence_boundaries: list[float],
audio: np.ndarray, sr: int,
total_duration: float) -> list[DetectedSegment]:
"""Create segments using transcript break cues as primary boundaries.
For each break_start, find the nearest silence boundary after it (exact cut point).
For each break_end, find the nearest silence boundary before it.
The gap between break_start and break_end = commercial break.
"""
segments = []
# Pair up break_start with the next break_end
break_regions = []
i = 0
while i < len(transcript_breaks):
cue = transcript_breaks[i]
if cue["type"] == "break_start":
# Find the matching break_end
end_time = None
for j in range(i + 1, len(transcript_breaks)):
if transcript_breaks[j]["type"] == "break_end":
end_time = transcript_breaks[j]["time"]
i = j + 1
break
if end_time is None:
# No matching end — assume break lasts until a reasonable point
# (5 minutes max, or until end of audio)
end_time = min(cue["time"] + 300, total_duration)
i += 1
# Snap to nearest silence boundaries for clean cuts
start = self._nearest_silence(cue["time"], silence_boundaries, after=True)
end = self._nearest_silence(end_time, silence_boundaries, after=False)
if start and end and end > start:
break_regions.append((start, end))
elif start:
break_regions.append((start, end_time))
else:
i += 1
if not break_regions:
return self._create_candidate_segments(silence_boundaries, total_duration)
# Build segments: show → commercial → show → commercial → ...
prev_end = 0.0
for break_start, break_end in break_regions:
# Show content before this break
if break_start - prev_end > 1.0:
segments.append(DetectedSegment(
start=prev_end,
end=break_start,
segment_type=SegmentType.SHOW_CONTENT,
confidence=0.85,
label="",
))
# Commercial break
segments.append(DetectedSegment(
start=break_start,
end=break_end,
segment_type=SegmentType.COMMERCIAL,
confidence=0.85,
label="",
))
prev_end = break_end
# Final show segment after last break
if total_duration - prev_end > 1.0:
segments.append(DetectedSegment(
start=prev_end,
end=total_duration,
segment_type=SegmentType.SHOW_CONTENT,
confidence=0.85,
label="",
))
return segments
def _nearest_silence(self, time: float, boundaries: list[float],
after: bool = True, max_distance: float = 10.0) -> float | None:
"""Find the nearest silence boundary to a given time."""
best = None
best_dist = max_distance
for b in boundaries:
dist = abs(b - time)
if dist > max_distance:
continue
if after and b >= time and dist < best_dist:
best = b
best_dist = dist
elif not after and b <= time and dist < best_dist:
best = b
best_dist = dist
return best
def _create_candidate_segments(self, boundaries: list[float],
total_duration: float) -> list[DetectedSegment]:
"""Create candidate segments from silence boundaries."""
candidates = []
prev = 0.0
for boundary in boundaries:
if boundary - prev > 1.0: # Ignore segments < 1 second
candidates.append(DetectedSegment(
start=prev,
end=boundary,
segment_type=SegmentType.UNKNOWN,
confidence=0.0,
))
prev = boundary
# Final segment
if total_duration - prev > 1.0:
candidates.append(DetectedSegment(
start=prev,
end=total_duration,
segment_type=SegmentType.UNKNOWN,
confidence=0.0,
))
return candidates
def _score_fingerprint(self, audio: np.ndarray, sr: int,
segment: DetectedSegment) -> float:
"""Score based on audio fingerprint matching against element library.
Returns 0.0 (no match / definitely show) to 1.0 (definite commercial boundary).
"""
# TODO: Implement fingerprint matching against element-library/fingerprints.db
# For now, return neutral score
return 0.5
def _score_speaker_identity(self, diarization, segment: DetectedSegment) -> float:
"""Score based on whether the host is speaking.
Returns 0.0 (host definitely speaking = show content)
to 1.0 (host definitely absent = likely commercial).
"""
host_time = 0.0
total_time = segment.duration
for turn in diarization.turns:
if turn.end < segment.start or turn.start > segment.end:
continue
# Calculate overlap
overlap_start = max(turn.start, segment.start)
overlap_end = min(turn.end, segment.end)
overlap = max(0, overlap_end - overlap_start)
if "host" in turn.speaker.lower():
host_time += overlap
if total_time == 0:
return 0.5
host_fraction = host_time / total_time
# Invert: high host presence = low commercial score
return 1.0 - host_fraction
def _score_audio_characteristics(self, audio: np.ndarray, sr: int,
segment: DetectedSegment) -> float:
"""Score based on audio production characteristics.
Commercials tend to be louder, more compressed, different spectral profile.
Returns 0.0 (matches show characteristics) to 1.0 (matches commercial characteristics).
"""
start_sample = int(segment.start * sr)
end_sample = min(int(segment.end * sr), len(audio))
seg_audio = audio[start_sample:end_sample]
if len(seg_audio) < sr: # Less than 1 second
return 0.5
# RMS energy (commercials tend to be louder)
rms = np.sqrt(np.mean(seg_audio ** 2))
# Dynamic range (commercials tend to be more compressed)
frame_size = int(sr * 0.050) # 50ms frames
frame_rms = []
for i in range(0, len(seg_audio) - frame_size, frame_size):
frame = seg_audio[i:i + frame_size]
frame_rms.append(np.sqrt(np.mean(frame ** 2)))
if not frame_rms:
return 0.5
dynamic_range = max(frame_rms) / (min(frame_rms) + 1e-8)
# Simple heuristic scoring:
# High RMS + low dynamic range = compressed commercial audio
score = 0.5
if rms > 0.15: # Louder than typical speech
score += 0.15
if dynamic_range < 5.0: # Very compressed
score += 0.15
return min(1.0, max(0.0, score))
def _score_structural(self, transcript, segment: DetectedSegment) -> float:
"""Score based on transcript content structural cues.
Returns 0.0 (show content cues found) to 1.0 (commercial cues found).
"""
text = transcript.text_at(segment.start, segment.end).lower()
# Show content indicators
show_phrases = [
"welcome back", "let's move on", "next up", "our next topic",
"let's talk about", "as i mentioned", "the question is",
"caller", "what do you think", "here's the thing",
]
# Commercial/break indicators
break_phrases = [
"we'll be right back", "stay tuned", "don't go anywhere",
"after the break", "when we come back",
]
show_hits = sum(1 for p in show_phrases if p in text)
break_hits = sum(1 for p in break_phrases if p in text)
if show_hits > 0 and break_hits == 0:
return 0.2 # Likely show content
if break_hits > 0:
return 0.8 # Likely near a break
return 0.5 # Neutral
def _merge_adjacent(self, segments: list[DetectedSegment]) -> list[DetectedSegment]:
"""Merge adjacent and overlapping segments of the same type."""
if not segments:
return []
# Sort by start time first
segments.sort(key=lambda s: s.start)
merged = [segments[0]]
for seg in segments[1:]:
prev = merged[-1]
# Merge if same type AND (overlapping or within 2 seconds)
if (prev.segment_type == seg.segment_type and
seg.start <= prev.end + 2.0):
prev.end = max(prev.end, seg.end)
prev.confidence = (prev.confidence + seg.confidence) / 2
else:
merged.append(seg)
return merged
def _apply_constraints(self, segments: list[DetectedSegment]) -> list[DetectedSegment]:
"""Apply duration constraints — short 'commercial' segments are likely misclassified."""
min_break = self.config.segment_detection.min_break_duration_s
for seg in segments:
if (seg.segment_type == SegmentType.COMMERCIAL and
seg.duration < min_break):
seg.segment_type = SegmentType.SHOW_CONTENT
seg.label = "(reclassified: too short for commercial)"
return segments
def _label_from_prep(self, segments: list[DetectedSegment],
transcript, show_prep: str):
"""Label show segments by matching transcript content to show prep topics."""
# TODO: Use Ollama to match transcript sections against show prep segment titles
# For now, number them sequentially
show_count = 0
comm_count = 0
for seg in segments:
if seg.segment_type == SegmentType.SHOW_CONTENT:
show_count += 1
seg.label = f"Show Segment {show_count}"
elif seg.segment_type == SegmentType.COMMERCIAL:
comm_count += 1
seg.label = f"Commercial Break {comm_count}"