diff --git a/agent/src/viewer/decoder.rs b/agent/src/viewer/decoder.rs index 85dffa3..254faa3 100644 --- a/agent/src/viewer/decoder.rs +++ b/agent/src/viewer/decoder.rs @@ -21,8 +21,9 @@ use windows::Win32::Media::MediaFoundation::{ MFSTARTUP_LITE, MFT_CATEGORY_VIDEO_DECODER, MFT_ENUM_FLAG_SORTANDFILTER, MFT_ENUM_FLAG_SYNCMFT, MFT_MESSAGE_NOTIFY_BEGIN_STREAMING, MFT_MESSAGE_NOTIFY_END_OF_STREAM, MFT_MESSAGE_NOTIFY_END_STREAMING, MFT_MESSAGE_NOTIFY_START_OF_STREAM, MFT_OUTPUT_DATA_BUFFER, - MFT_OUTPUT_STREAM_INFO, MFT_REGISTER_TYPE_INFO, MF_E_TRANSFORM_NEED_MORE_INPUT, - MF_E_TRANSFORM_STREAM_CHANGE, MF_MT_FRAME_SIZE, MF_MT_MAJOR_TYPE, MF_MT_SUBTYPE, + MFT_OUTPUT_STREAM_INFO, MFT_REGISTER_TYPE_INFO, MF_E_NOTACCEPTING, + MF_E_TRANSFORM_NEED_MORE_INPUT, MF_E_TRANSFORM_STREAM_CHANGE, MF_E_TRANSFORM_TYPE_NOT_SET, + MF_MT_FRAME_SIZE, MF_MT_MAJOR_TYPE, MF_MT_SUBTYPE, }; /// A decoded NV12 frame and its dimensions, ready for NV12 -> BGRA conversion. @@ -91,15 +92,32 @@ impl H264Decoder { Ok(()) } - /// Set the decoder output type to NV12 once the stream size is known. - unsafe fn configure_output_nv12(&mut self) -> Result<()> { - let out_type: IMFMediaType = MFCreateMediaType().context("MFCreateMediaType(dec out)")?; - out_type.SetGUID(&MF_MT_MAJOR_TYPE, &MFMediaType_Video)?; - out_type.SetGUID(&MF_MT_SUBTYPE, &MFVideoFormat_NV12)?; - self.transform - .SetOutputType(self.output_stream_id, &out_type, 0) - .context("SetOutputType(NV12 decode)")?; - Ok(()) + /// Negotiate the decoder's NV12 output type by ENUMERATING the available + /// output types it offers (these carry the decoder-negotiated frame size), + /// then setting the NV12 one. The Microsoft H.264 decoder MFT rejects a + /// hand-built, underspecified output type, so we must select from what it + /// exposes after it has parsed enough of the bitstream. Driven by a + /// STREAM_CHANGE / TYPE_NOT_SET round-trip — never set eagerly. + unsafe fn negotiate_output_type(&mut self) -> Result<()> { + let mut index: u32 = 0; + // GetOutputAvailableType returns Err (MF_E_NO_MORE_TYPES) past the last + // entry, which ends the enumeration. + while let Ok(mt) = self + .transform + .GetOutputAvailableType(self.output_stream_id, index) + { + let subtype = mt + .GetGUID(&MF_MT_SUBTYPE) + .context("read available output subtype")?; + if subtype == MFVideoFormat_NV12 { + self.transform + .SetOutputType(self.output_stream_id, &mt, 0) + .context("SetOutputType(NV12 decode)")?; + return Ok(()); + } + index += 1; + } + Err(anyhow!("decoder offered no NV12 output type")) } /// Read the negotiated output frame size from the decoder's current output type. @@ -129,42 +147,79 @@ impl H264Decoder { Ok(()) } - /// Feed one H.264 access unit and return a decoded BGRA frame if one is - /// produced this tick. `Ok(None)` means the decoder needs more input (normal - /// while it buffers the first GOP). - pub fn decode(&mut self, h264: &[u8], pts_100ns: i64) -> Result> { + /// Feed one H.264 access unit and return all BGRA frames the decoder emits + /// in response. A single input access unit can legitimately yield zero, one, + /// or more decoded frames, so the result is a `Vec`. + /// + /// This implements the Media Foundation MFT streaming contract: `ProcessInput` + /// may return `MF_E_NOTACCEPTING`, which is NOT an error — it means the decoder + /// has pending output that must be fully drained via `ProcessOutput` before it + /// will accept the next input. The previous implementation treated NOTACCEPTING + /// as fatal and only drained one frame per call, so once the MFT filled up it + /// rejected every subsequent frame (0xC00D36B5) and nothing rendered. We now + /// drain on back-pressure, retry the same (unconsumed) sample, then drain ALL + /// ready outputs before returning. + pub fn decode(&mut self, h264: &[u8], pts_100ns: i64) -> Result> { + let mut out = Vec::new(); if h264.is_empty() { - return Ok(None); + return Ok(out); } unsafe { self.ensure_streaming()?; let sample = make_input_sample(h264, pts_100ns)?; - match self - .transform - .ProcessInput(self.input_stream_id, &sample, 0) - { - Ok(()) => {} - Err(e) if e.code() == MF_E_TRANSFORM_NEED_MORE_INPUT => {} - Err(e) => return Err(anyhow!("decoder ProcessInput failed: {e:#}")), + + // Submit the sample, tolerating back-pressure. On NOTACCEPTING the + // sample is NOT consumed, so we drain pending output and re-submit the + // same `&sample`. + loop { + match self + .transform + .ProcessInput(self.input_stream_id, &sample, 0) + { + // Input accepted (or accepted while still wanting more). + Ok(()) => break, + Err(e) if e.code() == MF_E_TRANSFORM_NEED_MORE_INPUT => break, + // Back-pressure: drain a pending output, then retry the SAME + // sample (it was not consumed). + Err(e) if e.code() == MF_E_NOTACCEPTING => { + match self.drain_one()? { + Some(frame) => { + out.push(frame); + continue; + } + // Pathological: decoder won't accept input yet has + // nothing to drain. Don't spin — warn once and drop + // this access unit. + None => { + tracing::warn!( + "H.264 decoder reported NOTACCEPTING with no drainable output; dropping access unit" + ); + return Ok(out); + } + } + } + Err(e) => return Err(anyhow!("decoder ProcessInput failed: {e:#}")), + } } - self.drain_one() + // Drain every output the decoder has ready for this input. + while let Some(frame) = self.drain_one()? { + out.push(frame); + } + + Ok(out) } } /// Drain one decoded output sample, handling the initial NV12 output-type /// negotiation (`MF_E_TRANSFORM_STREAM_CHANGE`). unsafe fn drain_one(&mut self) -> Result> { + // Tracks whether we have already (re)negotiated the output type during + // THIS drain call. Guards against spinning forever if the decoder keeps + // surfacing TYPE_NOT_SET / STREAM_CHANGE without making progress. + let mut negotiated = false; loop { - // If we have not yet set an output type, do so now (NV12). The first - // ProcessOutput typically returns STREAM_CHANGE until this is set. - if self.width == 0 { - // Try to set NV12 output; ignore failures here (the decoder may - // require a STREAM_CHANGE round-trip first). - let _ = self.configure_output_nv12(); - } - let stream_info: MFT_OUTPUT_STREAM_INFO = self .transform .GetOutputStreamInfo(self.output_stream_id) @@ -214,11 +269,26 @@ impl H264Decoder { })); } Err(e) if e.code() == MF_E_TRANSFORM_NEED_MORE_INPUT => return Ok(None), - Err(e) if e.code() == MF_E_TRANSFORM_STREAM_CHANGE => { - // The decoder learned the frame size: (re)negotiate NV12 out, - // record the size, and retry the drain. - self.configure_output_nv12() + // Both of these mean "you must (re)negotiate the output type now." + // STREAM_CHANGE fires once the decoder has parsed the sequence + // header and learned the real frame size; depending on input + // timing the MS decoder may surface TYPE_NOT_SET instead. Handle + // them identically: enumerate the decoder's available output + // types, set the NV12 one, record the negotiated size, and retry. + Err(e) + if e.code() == MF_E_TRANSFORM_STREAM_CHANGE + || e.code() == MF_E_TRANSFORM_TYPE_NOT_SET => + { + // We already negotiated once this drain yet the decoder still + // demands a type: bail rather than spin forever. + if negotiated { + return Err(anyhow!( + "decoder still reports output type not set after renegotiation: {e:#}" + )); + } + self.negotiate_output_type() .context("decoder output renegotiation after stream change")?; + negotiated = true; if let Ok((w, h)) = self.read_output_size() { self.width = w; self.height = h; diff --git a/agent/src/viewer/input.rs b/agent/src/viewer/input.rs index 34d66b7..da43ae1 100644 --- a/agent/src/viewer/input.rs +++ b/agent/src/viewer/input.rs @@ -27,9 +27,8 @@ use tracing::trace; use windows::{ Win32::Foundation::{LPARAM, LRESULT, WPARAM}, Win32::UI::WindowsAndMessaging::{ - CallNextHookEx, DispatchMessageW, PeekMessageW, SetWindowsHookExW, TranslateMessage, - UnhookWindowsHookEx, HHOOK, KBDLLHOOKSTRUCT, LLKHF_EXTENDED, MSG, PM_REMOVE, - WH_KEYBOARD_LL, WM_KEYDOWN, WM_KEYUP, WM_SYSKEYDOWN, WM_SYSKEYUP, + CallNextHookEx, SetWindowsHookExW, UnhookWindowsHookEx, HHOOK, KBDLLHOOKSTRUCT, + LLKHF_EXTENDED, WH_KEYBOARD_LL, WM_KEYDOWN, WM_KEYUP, WM_SYSKEYDOWN, WM_SYSKEYUP, }, }; @@ -237,18 +236,6 @@ fn current_modifiers() -> proto::Modifiers { } } -/// Pump Windows message queue (required for hooks to work). -#[cfg(windows)] -pub fn pump_messages() { - unsafe { - let mut msg = MSG::default(); - while PeekMessageW(&mut msg, None, 0, 0, PM_REMOVE).as_bool() { - let _ = TranslateMessage(&msg); - DispatchMessageW(&msg); - } - } -} - // Non-Windows stubs #[cfg(not(windows))] #[allow(dead_code)] @@ -262,10 +249,6 @@ impl KeyboardHook { } } -#[cfg(not(windows))] -#[allow(dead_code)] -pub fn pump_messages() {} - #[cfg(test)] mod tests { use super::*; diff --git a/agent/src/viewer/mod.rs b/agent/src/viewer/mod.rs index 351dfc6..c7d54e6 100644 --- a/agent/src/viewer/mod.rs +++ b/agent/src/viewer/mod.rs @@ -74,20 +74,27 @@ fn spawn_h264_decode_worker( let dec = decoder.as_mut().expect("decoder present after init"); match dec.decode(&data, pts) { - Ok(Some(decoded)) => { - let frame = render::FrameData { - width: decoded.width, - height: decoded.height, - data: decoded.bgra, - compressed: false, // already BGRA - is_keyframe: false, - }; - if viewer_tx.blocking_send(ViewerEvent::Frame(frame)).is_err() { - // Viewer closed; stop the worker. + // One input access unit may yield zero, one, or more frames. + Ok(frames) => { + let mut viewer_closed = false; + for decoded in frames { + let frame = render::FrameData { + width: decoded.width, + height: decoded.height, + data: decoded.bgra, + compressed: false, // already BGRA + is_keyframe: false, + }; + if viewer_tx.blocking_send(ViewerEvent::Frame(frame)).is_err() { + // Viewer closed; stop the worker. + viewer_closed = true; + break; + } + } + if viewer_closed { break; } } - Ok(None) => { /* decoder buffering; no output this tick */ } Err(e) => { warn!("H.264 decode error: {e:#}"); } diff --git a/agent/src/viewer/render.rs b/agent/src/viewer/render.rs index 0d05882..ea63864 100644 --- a/agent/src/viewer/render.rs +++ b/agent/src/viewer/render.rs @@ -465,9 +465,11 @@ impl ApplicationHandler for ViewerApp { // Keep checking for events event_loop.set_control_flow(ControlFlow::Poll); - // Process Windows messages for keyboard hook - #[cfg(windows)] - input::pump_messages(); + // NOTE: do NOT manually pump the Win32 message queue here. winit's own + // run_app loop already pumps this thread's messages (which also services + // the low-level keyboard hook). A manual PeekMessage/DispatchMessage pump + // inside about_to_wait steals winit's messages and re-enters its window + // proc, freezing the event loop after one iteration (blank viewer). // Request redraw periodically to check for new frames if let Some(window) = &self.window {