From 97780304e7896e6a694633d4db0e4512b59946d8 Mon Sep 17 00:00:00 2001 From: Mike Swanson Date: Sun, 31 May 2026 11:25:05 -0700 Subject: [PATCH] fix(agent): make native H.264 viewer render live frames The native viewer's H.264 path (Task 7 first-cut, compile-verified only) never rendered a frame. Three stacked bugs, all confirmed via live loopback: 1. decoder: MF_E_NOTACCEPTING (0xC00D36B5) was treated as fatal and only one output was drained per call, so once the MFT filled it rejected every subsequent frame. decode() now returns Vec, drains on back-pressure and retries the unconsumed sample, then drains all ready outputs. 2. decoder: the NV12 output type was hand-built and rejected by the MS H.264 decoder MFT (MF_E_TRANSFORM_TYPE_NOT_SET, 0xC00D6D60). It is now negotiated by enumerating GetOutputAvailableType on STREAM_CHANGE / TYPE_NOT_SET. 3. render: a manual pump_messages() in about_to_wait stole winit's own thread messages and froze the event loop after one iteration, so frames were never drained from the channel. Removed; winit's run_app pump already services the WH_KEYBOARD_LL hook. Validated on a 5070 loopback: 0 decode errors, frames decode/paint/present (present count 0 -> 1740). Reviewed (APPROVE-WITH-NITS); diagnostics stripped. Co-Authored-By: Claude Opus 4.8 (1M context) --- agent/src/viewer/decoder.rs | 142 +++++++++++++++++++++++++++--------- agent/src/viewer/input.rs | 21 +----- agent/src/viewer/mod.rs | 29 +++++--- agent/src/viewer/render.rs | 8 +- 4 files changed, 131 insertions(+), 69 deletions(-) diff --git a/agent/src/viewer/decoder.rs b/agent/src/viewer/decoder.rs index 85dffa3..254faa3 100644 --- a/agent/src/viewer/decoder.rs +++ b/agent/src/viewer/decoder.rs @@ -21,8 +21,9 @@ use windows::Win32::Media::MediaFoundation::{ MFSTARTUP_LITE, MFT_CATEGORY_VIDEO_DECODER, MFT_ENUM_FLAG_SORTANDFILTER, MFT_ENUM_FLAG_SYNCMFT, MFT_MESSAGE_NOTIFY_BEGIN_STREAMING, MFT_MESSAGE_NOTIFY_END_OF_STREAM, MFT_MESSAGE_NOTIFY_END_STREAMING, MFT_MESSAGE_NOTIFY_START_OF_STREAM, MFT_OUTPUT_DATA_BUFFER, - MFT_OUTPUT_STREAM_INFO, MFT_REGISTER_TYPE_INFO, MF_E_TRANSFORM_NEED_MORE_INPUT, - MF_E_TRANSFORM_STREAM_CHANGE, MF_MT_FRAME_SIZE, MF_MT_MAJOR_TYPE, MF_MT_SUBTYPE, + MFT_OUTPUT_STREAM_INFO, MFT_REGISTER_TYPE_INFO, MF_E_NOTACCEPTING, + MF_E_TRANSFORM_NEED_MORE_INPUT, MF_E_TRANSFORM_STREAM_CHANGE, MF_E_TRANSFORM_TYPE_NOT_SET, + MF_MT_FRAME_SIZE, MF_MT_MAJOR_TYPE, MF_MT_SUBTYPE, }; /// A decoded NV12 frame and its dimensions, ready for NV12 -> BGRA conversion. @@ -91,15 +92,32 @@ impl H264Decoder { Ok(()) } - /// Set the decoder output type to NV12 once the stream size is known. - unsafe fn configure_output_nv12(&mut self) -> Result<()> { - let out_type: IMFMediaType = MFCreateMediaType().context("MFCreateMediaType(dec out)")?; - out_type.SetGUID(&MF_MT_MAJOR_TYPE, &MFMediaType_Video)?; - out_type.SetGUID(&MF_MT_SUBTYPE, &MFVideoFormat_NV12)?; - self.transform - .SetOutputType(self.output_stream_id, &out_type, 0) - .context("SetOutputType(NV12 decode)")?; - Ok(()) + /// Negotiate the decoder's NV12 output type by ENUMERATING the available + /// output types it offers (these carry the decoder-negotiated frame size), + /// then setting the NV12 one. The Microsoft H.264 decoder MFT rejects a + /// hand-built, underspecified output type, so we must select from what it + /// exposes after it has parsed enough of the bitstream. Driven by a + /// STREAM_CHANGE / TYPE_NOT_SET round-trip — never set eagerly. + unsafe fn negotiate_output_type(&mut self) -> Result<()> { + let mut index: u32 = 0; + // GetOutputAvailableType returns Err (MF_E_NO_MORE_TYPES) past the last + // entry, which ends the enumeration. + while let Ok(mt) = self + .transform + .GetOutputAvailableType(self.output_stream_id, index) + { + let subtype = mt + .GetGUID(&MF_MT_SUBTYPE) + .context("read available output subtype")?; + if subtype == MFVideoFormat_NV12 { + self.transform + .SetOutputType(self.output_stream_id, &mt, 0) + .context("SetOutputType(NV12 decode)")?; + return Ok(()); + } + index += 1; + } + Err(anyhow!("decoder offered no NV12 output type")) } /// Read the negotiated output frame size from the decoder's current output type. @@ -129,42 +147,79 @@ impl H264Decoder { Ok(()) } - /// Feed one H.264 access unit and return a decoded BGRA frame if one is - /// produced this tick. `Ok(None)` means the decoder needs more input (normal - /// while it buffers the first GOP). - pub fn decode(&mut self, h264: &[u8], pts_100ns: i64) -> Result> { + /// Feed one H.264 access unit and return all BGRA frames the decoder emits + /// in response. A single input access unit can legitimately yield zero, one, + /// or more decoded frames, so the result is a `Vec`. + /// + /// This implements the Media Foundation MFT streaming contract: `ProcessInput` + /// may return `MF_E_NOTACCEPTING`, which is NOT an error — it means the decoder + /// has pending output that must be fully drained via `ProcessOutput` before it + /// will accept the next input. The previous implementation treated NOTACCEPTING + /// as fatal and only drained one frame per call, so once the MFT filled up it + /// rejected every subsequent frame (0xC00D36B5) and nothing rendered. We now + /// drain on back-pressure, retry the same (unconsumed) sample, then drain ALL + /// ready outputs before returning. + pub fn decode(&mut self, h264: &[u8], pts_100ns: i64) -> Result> { + let mut out = Vec::new(); if h264.is_empty() { - return Ok(None); + return Ok(out); } unsafe { self.ensure_streaming()?; let sample = make_input_sample(h264, pts_100ns)?; - match self - .transform - .ProcessInput(self.input_stream_id, &sample, 0) - { - Ok(()) => {} - Err(e) if e.code() == MF_E_TRANSFORM_NEED_MORE_INPUT => {} - Err(e) => return Err(anyhow!("decoder ProcessInput failed: {e:#}")), + + // Submit the sample, tolerating back-pressure. On NOTACCEPTING the + // sample is NOT consumed, so we drain pending output and re-submit the + // same `&sample`. + loop { + match self + .transform + .ProcessInput(self.input_stream_id, &sample, 0) + { + // Input accepted (or accepted while still wanting more). + Ok(()) => break, + Err(e) if e.code() == MF_E_TRANSFORM_NEED_MORE_INPUT => break, + // Back-pressure: drain a pending output, then retry the SAME + // sample (it was not consumed). + Err(e) if e.code() == MF_E_NOTACCEPTING => { + match self.drain_one()? { + Some(frame) => { + out.push(frame); + continue; + } + // Pathological: decoder won't accept input yet has + // nothing to drain. Don't spin — warn once and drop + // this access unit. + None => { + tracing::warn!( + "H.264 decoder reported NOTACCEPTING with no drainable output; dropping access unit" + ); + return Ok(out); + } + } + } + Err(e) => return Err(anyhow!("decoder ProcessInput failed: {e:#}")), + } } - self.drain_one() + // Drain every output the decoder has ready for this input. + while let Some(frame) = self.drain_one()? { + out.push(frame); + } + + Ok(out) } } /// Drain one decoded output sample, handling the initial NV12 output-type /// negotiation (`MF_E_TRANSFORM_STREAM_CHANGE`). unsafe fn drain_one(&mut self) -> Result> { + // Tracks whether we have already (re)negotiated the output type during + // THIS drain call. Guards against spinning forever if the decoder keeps + // surfacing TYPE_NOT_SET / STREAM_CHANGE without making progress. + let mut negotiated = false; loop { - // If we have not yet set an output type, do so now (NV12). The first - // ProcessOutput typically returns STREAM_CHANGE until this is set. - if self.width == 0 { - // Try to set NV12 output; ignore failures here (the decoder may - // require a STREAM_CHANGE round-trip first). - let _ = self.configure_output_nv12(); - } - let stream_info: MFT_OUTPUT_STREAM_INFO = self .transform .GetOutputStreamInfo(self.output_stream_id) @@ -214,11 +269,26 @@ impl H264Decoder { })); } Err(e) if e.code() == MF_E_TRANSFORM_NEED_MORE_INPUT => return Ok(None), - Err(e) if e.code() == MF_E_TRANSFORM_STREAM_CHANGE => { - // The decoder learned the frame size: (re)negotiate NV12 out, - // record the size, and retry the drain. - self.configure_output_nv12() + // Both of these mean "you must (re)negotiate the output type now." + // STREAM_CHANGE fires once the decoder has parsed the sequence + // header and learned the real frame size; depending on input + // timing the MS decoder may surface TYPE_NOT_SET instead. Handle + // them identically: enumerate the decoder's available output + // types, set the NV12 one, record the negotiated size, and retry. + Err(e) + if e.code() == MF_E_TRANSFORM_STREAM_CHANGE + || e.code() == MF_E_TRANSFORM_TYPE_NOT_SET => + { + // We already negotiated once this drain yet the decoder still + // demands a type: bail rather than spin forever. + if negotiated { + return Err(anyhow!( + "decoder still reports output type not set after renegotiation: {e:#}" + )); + } + self.negotiate_output_type() .context("decoder output renegotiation after stream change")?; + negotiated = true; if let Ok((w, h)) = self.read_output_size() { self.width = w; self.height = h; diff --git a/agent/src/viewer/input.rs b/agent/src/viewer/input.rs index 34d66b7..da43ae1 100644 --- a/agent/src/viewer/input.rs +++ b/agent/src/viewer/input.rs @@ -27,9 +27,8 @@ use tracing::trace; use windows::{ Win32::Foundation::{LPARAM, LRESULT, WPARAM}, Win32::UI::WindowsAndMessaging::{ - CallNextHookEx, DispatchMessageW, PeekMessageW, SetWindowsHookExW, TranslateMessage, - UnhookWindowsHookEx, HHOOK, KBDLLHOOKSTRUCT, LLKHF_EXTENDED, MSG, PM_REMOVE, - WH_KEYBOARD_LL, WM_KEYDOWN, WM_KEYUP, WM_SYSKEYDOWN, WM_SYSKEYUP, + CallNextHookEx, SetWindowsHookExW, UnhookWindowsHookEx, HHOOK, KBDLLHOOKSTRUCT, + LLKHF_EXTENDED, WH_KEYBOARD_LL, WM_KEYDOWN, WM_KEYUP, WM_SYSKEYDOWN, WM_SYSKEYUP, }, }; @@ -237,18 +236,6 @@ fn current_modifiers() -> proto::Modifiers { } } -/// Pump Windows message queue (required for hooks to work). -#[cfg(windows)] -pub fn pump_messages() { - unsafe { - let mut msg = MSG::default(); - while PeekMessageW(&mut msg, None, 0, 0, PM_REMOVE).as_bool() { - let _ = TranslateMessage(&msg); - DispatchMessageW(&msg); - } - } -} - // Non-Windows stubs #[cfg(not(windows))] #[allow(dead_code)] @@ -262,10 +249,6 @@ impl KeyboardHook { } } -#[cfg(not(windows))] -#[allow(dead_code)] -pub fn pump_messages() {} - #[cfg(test)] mod tests { use super::*; diff --git a/agent/src/viewer/mod.rs b/agent/src/viewer/mod.rs index 351dfc6..c7d54e6 100644 --- a/agent/src/viewer/mod.rs +++ b/agent/src/viewer/mod.rs @@ -74,20 +74,27 @@ fn spawn_h264_decode_worker( let dec = decoder.as_mut().expect("decoder present after init"); match dec.decode(&data, pts) { - Ok(Some(decoded)) => { - let frame = render::FrameData { - width: decoded.width, - height: decoded.height, - data: decoded.bgra, - compressed: false, // already BGRA - is_keyframe: false, - }; - if viewer_tx.blocking_send(ViewerEvent::Frame(frame)).is_err() { - // Viewer closed; stop the worker. + // One input access unit may yield zero, one, or more frames. + Ok(frames) => { + let mut viewer_closed = false; + for decoded in frames { + let frame = render::FrameData { + width: decoded.width, + height: decoded.height, + data: decoded.bgra, + compressed: false, // already BGRA + is_keyframe: false, + }; + if viewer_tx.blocking_send(ViewerEvent::Frame(frame)).is_err() { + // Viewer closed; stop the worker. + viewer_closed = true; + break; + } + } + if viewer_closed { break; } } - Ok(None) => { /* decoder buffering; no output this tick */ } Err(e) => { warn!("H.264 decode error: {e:#}"); } diff --git a/agent/src/viewer/render.rs b/agent/src/viewer/render.rs index 0d05882..ea63864 100644 --- a/agent/src/viewer/render.rs +++ b/agent/src/viewer/render.rs @@ -465,9 +465,11 @@ impl ApplicationHandler for ViewerApp { // Keep checking for events event_loop.set_control_flow(ControlFlow::Poll); - // Process Windows messages for keyboard hook - #[cfg(windows)] - input::pump_messages(); + // NOTE: do NOT manually pump the Win32 message queue here. winit's own + // run_app loop already pumps this thread's messages (which also services + // the low-level keyboard hook). A manual PeekMessage/DispatchMessage pump + // inside about_to_wait steals winit's messages and re-enters its window + // proc, freezing the event loop after one iteration (blank viewer). // Request redraw periodically to check for new frames if let Some(window) = &self.window {