feat(agent,server): v2 secure-session-core Task 7 - HW H.264 + negotiated raw fallback
All checks were successful
All checks were successful
SPEC-002 Phase 1 Task 7 (the last), code-reviewed APPROVED, locally verified (cargo fmt + clippy -D warnings exit 0 + cargo test --workspace 89 pass + build). - Encoder trait + factory: RawEncoder (salvaged, UNCHANGED) and H264Encoder, selected by negotiation; factory falls back to raw on H.264 init failure. - Negotiation: agent advertises supports_h264 (MFTEnumEx HW probe, cached) in AgentStatus; server picks the codec via select_video_codec(supports, prefer) and stamps StartStream.video_codec; agent re-guards on local HW. Policy constant DEFAULT_PREFER_H264 = false, so RAW is negotiated for every session today - H.264 stays dormant until live hardware validation (Task 8). - MF H.264 encoder (h264.rs, FIRST-CUT / compile-verified-only): HW encoder MFT, BGRA->NV12 (color.rs, unit-tested), sync drain, fall-back-to-raw on any failure. - Viewer H.264 decoder (decoder.rs, FIRST-CUT): MF decoder on a dedicated COM thread; drops+logs on failure, raw render path untouched. - proto additive: VideoCodec enum, StartStream.video_codec=3, SessionResponse.video_codec=5, AgentStatus.supports_h264=11. - Raw+Zstd path byte-for-byte unchanged; remains the guaranteed default/fallback. Review confirmed unsafe impl Send for H264Encoder is sound (single-owned &mut on the block_on thread; session future never spawned) and every MF failure degrades to raw. H.264 is NOT claimed functional - compile/clippy/build-verified only; live validation + force-IDR + the no-spawn-invariant doc are Task 8 go-live gates. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
452
agent/src/viewer/decoder.rs
Normal file
452
agent/src/viewer/decoder.rs
Normal file
@@ -0,0 +1,452 @@
|
||||
//! H.264 video decoder for the native viewer (Task 7).
|
||||
//!
|
||||
//! FIRST-CUT / COMPILE-VERIFIED ONLY. Decodes an H.264 elementary stream
|
||||
//! (`EncodedFrame{h264}`) via a Media Foundation H.264 decoder MFT into NV12,
|
||||
//! then converts NV12 -> BGRA so it can flow through the EXISTING raw render
|
||||
//! path (`render::FrameData { compressed: false, BGRA }`). Not yet validated on
|
||||
//! real hardware with a live stream — that is plan Task 8. On decode-init
|
||||
//! failure the decoder reports an error and the viewer logs it; the raw-frame
|
||||
//! render path is untouched for raw sessions.
|
||||
//!
|
||||
//! The decoder is created lazily on the first H.264 frame (so a raw session
|
||||
//! never spins up MF). It is `!Send` (COM), so it lives on the viewer's receive
|
||||
//! task and is wrapped accordingly by the caller.
|
||||
|
||||
#![cfg(windows)]
|
||||
|
||||
use anyhow::{anyhow, Context, Result};
|
||||
use windows::Win32::Media::MediaFoundation::{
|
||||
IMFMediaType, IMFSample, IMFTransform, MFCreateMediaType, MFCreateMemoryBuffer, MFCreateSample,
|
||||
MFMediaType_Video, MFShutdown, MFStartup, MFTEnumEx, MFVideoFormat_H264, MFVideoFormat_NV12,
|
||||
MFSTARTUP_LITE, MFT_CATEGORY_VIDEO_DECODER, MFT_ENUM_FLAG_SORTANDFILTER, MFT_ENUM_FLAG_SYNCMFT,
|
||||
MFT_MESSAGE_NOTIFY_BEGIN_STREAMING, MFT_MESSAGE_NOTIFY_END_OF_STREAM,
|
||||
MFT_MESSAGE_NOTIFY_END_STREAMING, MFT_MESSAGE_NOTIFY_START_OF_STREAM, MFT_OUTPUT_DATA_BUFFER,
|
||||
MFT_OUTPUT_STREAM_INFO, MFT_REGISTER_TYPE_INFO, MF_E_TRANSFORM_NEED_MORE_INPUT,
|
||||
MF_E_TRANSFORM_STREAM_CHANGE, MF_MT_FRAME_SIZE, MF_MT_MAJOR_TYPE, MF_MT_SUBTYPE,
|
||||
};
|
||||
|
||||
/// A decoded NV12 frame and its dimensions, ready for NV12 -> BGRA conversion.
|
||||
pub struct DecodedFrame {
|
||||
pub width: u32,
|
||||
pub height: u32,
|
||||
/// BGRA pixels (4 bytes/px), ready for `render::FrameData`.
|
||||
pub bgra: Vec<u8>,
|
||||
}
|
||||
|
||||
/// Media Foundation H.264 decoder wrapper.
|
||||
pub struct H264Decoder {
|
||||
transform: IMFTransform,
|
||||
width: u32,
|
||||
height: u32,
|
||||
streaming: bool,
|
||||
input_stream_id: u32,
|
||||
output_stream_id: u32,
|
||||
mf_started: bool,
|
||||
}
|
||||
|
||||
// NOTE: H264Decoder is intentionally NOT `Send`. It wraps COM interfaces with
|
||||
// thread affinity and is created + used entirely on the dedicated `gc-h264-decode`
|
||||
// OS thread (see viewer::spawn_h264_decode_worker), so it never crosses a thread
|
||||
// boundary and does not need a Send assertion.
|
||||
|
||||
impl H264Decoder {
|
||||
/// Construct an H.264 decoder MFT and set its input type to H.264. The
|
||||
/// output type (NV12) is negotiated after the first frames decode the
|
||||
/// sequence header (we (re)read the real frame size on a stream change).
|
||||
pub fn new() -> Result<Self> {
|
||||
unsafe {
|
||||
MFStartup(mf_version(), MFSTARTUP_LITE).context("MFStartup (decoder)")?;
|
||||
let transform = match activate_decoder() {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
let _ = MFShutdown();
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
|
||||
let mut dec = Self {
|
||||
transform,
|
||||
width: 0,
|
||||
height: 0,
|
||||
streaming: false,
|
||||
input_stream_id: 0,
|
||||
output_stream_id: 0,
|
||||
mf_started: true,
|
||||
};
|
||||
|
||||
dec.configure_input()?;
|
||||
Ok(dec)
|
||||
}
|
||||
}
|
||||
|
||||
/// Set the decoder input type to H.264 (no fixed frame size — the decoder
|
||||
/// learns it from the bitstream).
|
||||
unsafe fn configure_input(&mut self) -> Result<()> {
|
||||
let in_type: IMFMediaType = MFCreateMediaType().context("MFCreateMediaType(dec in)")?;
|
||||
in_type.SetGUID(&MF_MT_MAJOR_TYPE, &MFMediaType_Video)?;
|
||||
in_type.SetGUID(&MF_MT_SUBTYPE, &MFVideoFormat_H264)?;
|
||||
self.transform
|
||||
.SetInputType(self.input_stream_id, &in_type, 0)
|
||||
.context("SetInputType(H264 decode)")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set the decoder output type to NV12 once the stream size is known.
|
||||
unsafe fn configure_output_nv12(&mut self) -> Result<()> {
|
||||
let out_type: IMFMediaType = MFCreateMediaType().context("MFCreateMediaType(dec out)")?;
|
||||
out_type.SetGUID(&MF_MT_MAJOR_TYPE, &MFMediaType_Video)?;
|
||||
out_type.SetGUID(&MF_MT_SUBTYPE, &MFVideoFormat_NV12)?;
|
||||
self.transform
|
||||
.SetOutputType(self.output_stream_id, &out_type, 0)
|
||||
.context("SetOutputType(NV12 decode)")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read the negotiated output frame size from the decoder's current output type.
|
||||
unsafe fn read_output_size(&mut self) -> Result<(u32, u32)> {
|
||||
let out_type = self
|
||||
.transform
|
||||
.GetOutputCurrentType(self.output_stream_id)
|
||||
.context("GetOutputCurrentType")?;
|
||||
let packed = out_type
|
||||
.GetUINT64(&MF_MT_FRAME_SIZE)
|
||||
.context("read MF_MT_FRAME_SIZE")?;
|
||||
let width = (packed >> 32) as u32;
|
||||
let height = (packed & 0xFFFF_FFFF) as u32;
|
||||
Ok((width, height))
|
||||
}
|
||||
|
||||
unsafe fn ensure_streaming(&mut self) -> Result<()> {
|
||||
if !self.streaming {
|
||||
self.transform
|
||||
.ProcessMessage(MFT_MESSAGE_NOTIFY_BEGIN_STREAMING, 0)
|
||||
.context("decoder BEGIN_STREAMING")?;
|
||||
self.transform
|
||||
.ProcessMessage(MFT_MESSAGE_NOTIFY_START_OF_STREAM, 0)
|
||||
.context("decoder START_OF_STREAM")?;
|
||||
self.streaming = true;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Feed one H.264 access unit and return a decoded BGRA frame if one is
|
||||
/// produced this tick. `Ok(None)` means the decoder needs more input (normal
|
||||
/// while it buffers the first GOP).
|
||||
pub fn decode(&mut self, h264: &[u8], pts_100ns: i64) -> Result<Option<DecodedFrame>> {
|
||||
if h264.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
unsafe {
|
||||
self.ensure_streaming()?;
|
||||
|
||||
let sample = make_input_sample(h264, pts_100ns)?;
|
||||
match self
|
||||
.transform
|
||||
.ProcessInput(self.input_stream_id, &sample, 0)
|
||||
{
|
||||
Ok(()) => {}
|
||||
Err(e) if e.code() == MF_E_TRANSFORM_NEED_MORE_INPUT => {}
|
||||
Err(e) => return Err(anyhow!("decoder ProcessInput failed: {e:#}")),
|
||||
}
|
||||
|
||||
self.drain_one()
|
||||
}
|
||||
}
|
||||
|
||||
/// Drain one decoded output sample, handling the initial NV12 output-type
|
||||
/// negotiation (`MF_E_TRANSFORM_STREAM_CHANGE`).
|
||||
unsafe fn drain_one(&mut self) -> Result<Option<DecodedFrame>> {
|
||||
loop {
|
||||
// If we have not yet set an output type, do so now (NV12). The first
|
||||
// ProcessOutput typically returns STREAM_CHANGE until this is set.
|
||||
if self.width == 0 {
|
||||
// Try to set NV12 output; ignore failures here (the decoder may
|
||||
// require a STREAM_CHANGE round-trip first).
|
||||
let _ = self.configure_output_nv12();
|
||||
}
|
||||
|
||||
let stream_info: MFT_OUTPUT_STREAM_INFO = self
|
||||
.transform
|
||||
.GetOutputStreamInfo(self.output_stream_id)
|
||||
.context("decoder GetOutputStreamInfo")?;
|
||||
|
||||
const MFT_OUTPUT_STREAM_PROVIDES_SAMPLES: u32 = 0x100;
|
||||
let mft_provides = stream_info.dwFlags & MFT_OUTPUT_STREAM_PROVIDES_SAMPLES != 0;
|
||||
|
||||
let mut out_buffer = MFT_OUTPUT_DATA_BUFFER {
|
||||
dwStreamID: self.output_stream_id,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
if !mft_provides {
|
||||
let alloc = stream_info.cbSize.max(self.guess_nv12_size());
|
||||
let sample: IMFSample = MFCreateSample().context("MFCreateSample(dec out)")?;
|
||||
let buffer =
|
||||
MFCreateMemoryBuffer(alloc).context("MFCreateMemoryBuffer(dec out)")?;
|
||||
sample.AddBuffer(&buffer)?;
|
||||
out_buffer.pSample = std::mem::ManuallyDrop::new(Some(sample));
|
||||
}
|
||||
|
||||
let mut status: u32 = 0;
|
||||
let mut bufs = [out_buffer];
|
||||
let hr = self.transform.ProcessOutput(0, &mut bufs, &mut status);
|
||||
let produced = std::mem::ManuallyDrop::take(&mut bufs[0].pSample);
|
||||
|
||||
match hr {
|
||||
Ok(()) => {
|
||||
// (Re)read the negotiated size in case it just became known.
|
||||
if let Ok((w, h)) = self.read_output_size() {
|
||||
self.width = w;
|
||||
self.height = h;
|
||||
}
|
||||
let Some(sample) = produced else {
|
||||
return Ok(None);
|
||||
};
|
||||
if self.width == 0 || self.height == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
let nv12 = sample_to_vec(&sample)?;
|
||||
let bgra = nv12_to_bgra(&nv12, self.width, self.height)?;
|
||||
return Ok(Some(DecodedFrame {
|
||||
width: self.width,
|
||||
height: self.height,
|
||||
bgra,
|
||||
}));
|
||||
}
|
||||
Err(e) if e.code() == MF_E_TRANSFORM_NEED_MORE_INPUT => return Ok(None),
|
||||
Err(e) if e.code() == MF_E_TRANSFORM_STREAM_CHANGE => {
|
||||
// The decoder learned the frame size: (re)negotiate NV12 out,
|
||||
// record the size, and retry the drain.
|
||||
self.configure_output_nv12()
|
||||
.context("decoder output renegotiation after stream change")?;
|
||||
if let Ok((w, h)) = self.read_output_size() {
|
||||
self.width = w;
|
||||
self.height = h;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
Err(e) => return Err(anyhow!("decoder ProcessOutput failed: {e:#}")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Conservative NV12 buffer estimate when the decoder doesn't report cbSize.
|
||||
fn guess_nv12_size(&self) -> u32 {
|
||||
if self.width != 0 && self.height != 0 {
|
||||
self.width * self.height * 3 / 2
|
||||
} else {
|
||||
// 1080p NV12 upper bound until the real size is known.
|
||||
1920 * 1080 * 3 / 2
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for H264Decoder {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
if self.streaming {
|
||||
let _ = self
|
||||
.transform
|
||||
.ProcessMessage(MFT_MESSAGE_NOTIFY_END_OF_STREAM, 0);
|
||||
let _ = self
|
||||
.transform
|
||||
.ProcessMessage(MFT_MESSAGE_NOTIFY_END_STREAMING, 0);
|
||||
}
|
||||
if self.mf_started {
|
||||
let _ = MFShutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Enumerate and activate an H.264 decoder MFT (hardware preferred, software
|
||||
/// acceptable — decode does not require a HW encoder).
|
||||
unsafe fn activate_decoder() -> Result<IMFTransform> {
|
||||
let input_type = MFT_REGISTER_TYPE_INFO {
|
||||
guidMajorType: MFMediaType_Video,
|
||||
guidSubtype: MFVideoFormat_H264,
|
||||
};
|
||||
|
||||
let mut activate_ptr: *mut Option<windows::Win32::Media::MediaFoundation::IMFActivate> =
|
||||
std::ptr::null_mut();
|
||||
let mut count: u32 = 0;
|
||||
|
||||
// Allow both HW and SW decoders; SYNCMFT keeps the simple ProcessInput/Output
|
||||
// contract this first cut uses.
|
||||
MFTEnumEx(
|
||||
MFT_CATEGORY_VIDEO_DECODER,
|
||||
MFT_ENUM_FLAG_SYNCMFT | MFT_ENUM_FLAG_SORTANDFILTER,
|
||||
Some(&input_type as *const _),
|
||||
None,
|
||||
&mut activate_ptr,
|
||||
&mut count,
|
||||
)
|
||||
.context("MFTEnumEx (H264 decoder)")?;
|
||||
|
||||
if count == 0 || activate_ptr.is_null() {
|
||||
if !activate_ptr.is_null() {
|
||||
windows::Win32::System::Com::CoTaskMemFree(Some(activate_ptr as *const _));
|
||||
}
|
||||
return Err(anyhow!("no H.264 decoder MFT available"));
|
||||
}
|
||||
|
||||
let slice = std::slice::from_raw_parts_mut(activate_ptr, count as usize);
|
||||
let mut chosen: Option<IMFTransform> = None;
|
||||
for entry in slice.iter_mut() {
|
||||
if chosen.is_none() {
|
||||
if let Some(activate) = entry.as_ref() {
|
||||
if let Ok(t) = activate.ActivateObject::<IMFTransform>() {
|
||||
chosen = Some(t);
|
||||
}
|
||||
}
|
||||
}
|
||||
entry.take();
|
||||
}
|
||||
windows::Win32::System::Com::CoTaskMemFree(Some(activate_ptr as *const _));
|
||||
|
||||
chosen.ok_or_else(|| anyhow!("failed to activate H.264 decoder MFT"))
|
||||
}
|
||||
|
||||
/// Wrap an H.264 access unit into an IMFSample.
|
||||
unsafe fn make_input_sample(data: &[u8], pts_100ns: i64) -> Result<IMFSample> {
|
||||
let sample: IMFSample = MFCreateSample().context("MFCreateSample(dec in)")?;
|
||||
let buffer = MFCreateMemoryBuffer(data.len() as u32).context("MFCreateMemoryBuffer(dec in)")?;
|
||||
|
||||
let mut ptr: *mut u8 = std::ptr::null_mut();
|
||||
let mut max_len: u32 = 0;
|
||||
buffer
|
||||
.Lock(&mut ptr, Some(&mut max_len), None)
|
||||
.context("decoder input Lock")?;
|
||||
if (max_len as usize) < data.len() || ptr.is_null() {
|
||||
let _ = buffer.Unlock();
|
||||
return Err(anyhow!("MF buffer too small for H.264 access unit"));
|
||||
}
|
||||
std::ptr::copy_nonoverlapping(data.as_ptr(), ptr, data.len());
|
||||
buffer.SetCurrentLength(data.len() as u32)?;
|
||||
buffer.Unlock()?;
|
||||
|
||||
sample.AddBuffer(&buffer)?;
|
||||
sample.SetSampleTime(pts_100ns)?;
|
||||
Ok(sample)
|
||||
}
|
||||
|
||||
/// Copy a sample's contiguous bytes into a Vec.
|
||||
unsafe fn sample_to_vec(sample: &IMFSample) -> Result<Vec<u8>> {
|
||||
let buffer = sample
|
||||
.ConvertToContiguousBuffer()
|
||||
.context("decoder ConvertToContiguousBuffer")?;
|
||||
let mut ptr: *mut u8 = std::ptr::null_mut();
|
||||
let mut len: u32 = 0;
|
||||
buffer
|
||||
.Lock(&mut ptr, None, Some(&mut len))
|
||||
.context("decoder output Lock")?;
|
||||
let out = if ptr.is_null() || len == 0 {
|
||||
Vec::new()
|
||||
} else {
|
||||
std::slice::from_raw_parts(ptr, len as usize).to_vec()
|
||||
};
|
||||
let _ = buffer.Unlock();
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// MF version word for `MFStartup` (see encoder::h264).
|
||||
fn mf_version() -> u32 {
|
||||
0x0002_0070
|
||||
}
|
||||
|
||||
/// Convert an NV12 buffer to BGRA (BT.601 limited range). Inverse of the
|
||||
/// encoder's BGRA->NV12. Shared with the unit tests below.
|
||||
pub fn nv12_to_bgra(nv12: &[u8], width: u32, height: u32) -> Result<Vec<u8>> {
|
||||
let w = width as usize;
|
||||
let h = height as usize;
|
||||
let y_size = w * h;
|
||||
let need = y_size * 3 / 2;
|
||||
if nv12.len() < need {
|
||||
return Err(anyhow!("NV12 buffer too small: {} < {}", nv12.len(), need));
|
||||
}
|
||||
|
||||
let (y_plane, uv_plane) = nv12.split_at(y_size);
|
||||
let mut bgra = vec![0u8; w * h * 4];
|
||||
let chroma_cols = w / 2;
|
||||
|
||||
for row in 0..h {
|
||||
for col in 0..w {
|
||||
let y = y_plane[row * w + col] as i32;
|
||||
let cx = col / 2;
|
||||
let cy = row / 2;
|
||||
let uv_idx = (cy * chroma_cols + cx) * 2;
|
||||
let u = uv_plane[uv_idx] as i32;
|
||||
let v = uv_plane[uv_idx + 1] as i32;
|
||||
|
||||
// BT.601 limited-range YUV -> RGB.
|
||||
let c = y - 16;
|
||||
let d = u - 128;
|
||||
let e = v - 128;
|
||||
let r = ((298 * c + 409 * e + 128) >> 8).clamp(0, 255);
|
||||
let g = ((298 * c - 100 * d - 208 * e + 128) >> 8).clamp(0, 255);
|
||||
let b = ((298 * c + 516 * d + 128) >> 8).clamp(0, 255);
|
||||
|
||||
let px = (row * w + col) * 4;
|
||||
bgra[px] = b as u8;
|
||||
bgra[px + 1] = g as u8;
|
||||
bgra[px + 2] = r as u8;
|
||||
bgra[px + 3] = 255;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(bgra)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::encoder::color::{bgra_to_nv12, nv12_size};
|
||||
|
||||
/// Round-trip a solid color through BGRA->NV12->BGRA. Chroma subsampling and
|
||||
/// limited-range rounding introduce small error, so allow a tolerance.
|
||||
#[test]
|
||||
fn nv12_bgra_roundtrip_is_approximately_lossless_for_solid_color() {
|
||||
let w = 4u32;
|
||||
let h = 4u32;
|
||||
// Mid gray.
|
||||
let mut bgra = vec![0u8; (w * h * 4) as usize];
|
||||
for px in bgra.chunks_mut(4) {
|
||||
px[0] = 120; // B
|
||||
px[1] = 120; // G
|
||||
px[2] = 120; // R
|
||||
px[3] = 255;
|
||||
}
|
||||
let mut nv12 = vec![0u8; nv12_size(w, h)];
|
||||
bgra_to_nv12(&bgra, w, h, &mut nv12).unwrap();
|
||||
let back = nv12_to_bgra(&nv12, w, h).unwrap();
|
||||
|
||||
for (orig, got) in bgra.chunks(4).zip(back.chunks(4)) {
|
||||
for ch in 0..3 {
|
||||
let diff = (orig[ch] as i32 - got[ch] as i32).abs();
|
||||
assert!(diff <= 6, "channel {ch} drift {diff} too large");
|
||||
}
|
||||
assert_eq!(got[3], 255, "alpha must be opaque");
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn nv12_to_bgra_rejects_short_buffer() {
|
||||
let nv12 = vec![0u8; 4];
|
||||
assert!(nv12_to_bgra(&nv12, 16, 16).is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn black_nv12_decodes_to_black_bgra() {
|
||||
// Limited-range black: Y=16, UV=128.
|
||||
let w = 2u32;
|
||||
let h = 2u32;
|
||||
let mut nv12 = vec![128u8; nv12_size(w, h)];
|
||||
for y in nv12.iter_mut().take((w * h) as usize) {
|
||||
*y = 16;
|
||||
}
|
||||
let bgra = nv12_to_bgra(&nv12, w, h).unwrap();
|
||||
for px in bgra.chunks(4) {
|
||||
assert!(px[0] <= 2 && px[1] <= 2 && px[2] <= 2, "near-black");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,8 @@
|
||||
//! This module provides the viewer functionality for connecting to remote
|
||||
//! GuruConnect sessions with low-level keyboard hooks for Win key capture.
|
||||
|
||||
#[cfg(windows)]
|
||||
mod decoder;
|
||||
mod input;
|
||||
mod render;
|
||||
mod transport;
|
||||
@@ -31,6 +33,72 @@ pub enum InputEvent {
|
||||
SpecialKey(proto::SpecialKeyEvent),
|
||||
}
|
||||
|
||||
/// Spawn the dedicated H.264 decode worker thread (Task 7, Windows only).
|
||||
///
|
||||
/// Returns a sender for `(h264_access_unit, pts_100ns)`. The worker lazily
|
||||
/// creates the Media Foundation decoder on the first frame; if creation fails it
|
||||
/// logs once and then silently drops subsequent frames (the raw render path is
|
||||
/// never affected). Each decoded frame is converted to BGRA and delivered to the
|
||||
/// viewer as an uncompressed `FrameData`, reusing the existing render path.
|
||||
#[cfg(windows)]
|
||||
fn spawn_h264_decode_worker(
|
||||
viewer_tx: mpsc::Sender<ViewerEvent>,
|
||||
) -> std::sync::mpsc::Sender<(Vec<u8>, i64)> {
|
||||
let (tx, rx) = std::sync::mpsc::channel::<(Vec<u8>, i64)>();
|
||||
|
||||
std::thread::Builder::new()
|
||||
.name("gc-h264-decode".to_string())
|
||||
.spawn(move || {
|
||||
let mut decoder: Option<decoder::H264Decoder> = None;
|
||||
let mut init_failed = false;
|
||||
|
||||
while let Ok((data, pts)) = rx.recv() {
|
||||
if init_failed {
|
||||
continue;
|
||||
}
|
||||
if decoder.is_none() {
|
||||
match decoder::H264Decoder::new() {
|
||||
Ok(d) => {
|
||||
info!("H.264 decoder initialized (Media Foundation)");
|
||||
decoder = Some(d);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"H.264 decoder init failed: {e:#}; H.264 frames will be dropped"
|
||||
);
|
||||
init_failed = true;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let dec = decoder.as_mut().expect("decoder present after init");
|
||||
match dec.decode(&data, pts) {
|
||||
Ok(Some(decoded)) => {
|
||||
let frame = render::FrameData {
|
||||
width: decoded.width,
|
||||
height: decoded.height,
|
||||
data: decoded.bgra,
|
||||
compressed: false, // already BGRA
|
||||
is_keyframe: false,
|
||||
};
|
||||
if viewer_tx.blocking_send(ViewerEvent::Frame(frame)).is_err() {
|
||||
// Viewer closed; stop the worker.
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(None) => { /* decoder buffering; no output this tick */ }
|
||||
Err(e) => {
|
||||
warn!("H.264 decode error: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.expect("failed to spawn H.264 decode worker thread");
|
||||
|
||||
tx
|
||||
}
|
||||
|
||||
/// Run the viewer to connect to a remote session
|
||||
pub async fn run(server_url: &str, session_id: &str, api_key: &str) -> Result<()> {
|
||||
info!("GuruConnect Viewer starting");
|
||||
@@ -77,13 +145,23 @@ pub async fn run(server_url: &str, session_id: &str, api_key: &str) -> Result<()
|
||||
}
|
||||
});
|
||||
|
||||
// H.264 decode worker (Task 7, Windows only). The Media Foundation decoder
|
||||
// wraps COM interfaces with thread affinity, so it runs on a DEDICATED OS
|
||||
// thread (not a tokio task, which can migrate across workers at await
|
||||
// points). The receive task forwards H.264 access units to it over a std
|
||||
// channel; the worker decodes to BGRA and pushes a FrameData back through
|
||||
// the viewer channel via `blocking_send`. On decoder-init failure the worker
|
||||
// logs and drops H.264 frames (the raw path is unaffected).
|
||||
#[cfg(windows)]
|
||||
let h264_tx = spawn_h264_decode_worker(viewer_tx.clone());
|
||||
|
||||
// Spawn task to receive messages from server
|
||||
let viewer_tx_recv = viewer_tx.clone();
|
||||
let receive_task = tokio::spawn(async move {
|
||||
while let Some(msg) = ws_receiver.recv().await {
|
||||
match msg.payload {
|
||||
Some(proto::message::Payload::VideoFrame(frame)) => {
|
||||
if let Some(proto::video_frame::Encoding::Raw(raw)) = frame.encoding {
|
||||
Some(proto::message::Payload::VideoFrame(frame)) => match frame.encoding {
|
||||
Some(proto::video_frame::Encoding::Raw(raw)) => {
|
||||
let frame_data = render::FrameData {
|
||||
width: raw.width as u32,
|
||||
height: raw.height as u32,
|
||||
@@ -93,7 +171,23 @@ pub async fn run(server_url: &str, session_id: &str, api_key: &str) -> Result<()
|
||||
};
|
||||
let _ = viewer_tx_recv.send(ViewerEvent::Frame(frame_data)).await;
|
||||
}
|
||||
}
|
||||
Some(proto::video_frame::Encoding::H264(enc)) => {
|
||||
// Forward to the decode worker (Windows). On other
|
||||
// platforms H.264 is never negotiated, so this is dead.
|
||||
#[cfg(windows)]
|
||||
{
|
||||
if h264_tx.send((enc.data, enc.pts)).is_err() {
|
||||
warn!("H.264 decode worker unavailable; dropping frame");
|
||||
}
|
||||
}
|
||||
#[cfg(not(windows))]
|
||||
{
|
||||
let _ = enc;
|
||||
}
|
||||
}
|
||||
// VP9/H265 not implemented on the viewer (raw + H.264 only).
|
||||
_ => {}
|
||||
},
|
||||
Some(proto::message::Payload::CursorPosition(pos)) => {
|
||||
let _ = viewer_tx_recv
|
||||
.send(ViewerEvent::CursorPosition(pos.x, pos.y, pos.visible))
|
||||
|
||||
Reference in New Issue
Block a user