Files
guru-connect/agent/src/viewer/mod.rs
Mike Swanson 97780304e7
All checks were successful
Build and Test / Build Agent (Windows) (push) Successful in 7m2s
Build and Test / Build Server (Linux) (push) Successful in 10m24s
Build and Test / Security Audit (push) Successful in 4m15s
Build and Test / Build Summary (push) Successful in 9s
fix(agent): make native H.264 viewer render live frames
The native viewer's H.264 path (Task 7 first-cut, compile-verified only)
never rendered a frame. Three stacked bugs, all confirmed via live loopback:

1. decoder: MF_E_NOTACCEPTING (0xC00D36B5) was treated as fatal and only
   one output was drained per call, so once the MFT filled it rejected
   every subsequent frame. decode() now returns Vec<DecodedFrame>, drains
   on back-pressure and retries the unconsumed sample, then drains all
   ready outputs.
2. decoder: the NV12 output type was hand-built and rejected by the MS
   H.264 decoder MFT (MF_E_TRANSFORM_TYPE_NOT_SET, 0xC00D6D60). It is now
   negotiated by enumerating GetOutputAvailableType on STREAM_CHANGE /
   TYPE_NOT_SET.
3. render: a manual pump_messages() in about_to_wait stole winit's own
   thread messages and froze the event loop after one iteration, so frames
   were never drained from the channel. Removed; winit's run_app pump
   already services the WH_KEYBOARD_LL hook.

Validated on a 5070 loopback: 0 decode errors, frames decode/paint/present
(present count 0 -> 1740). Reviewed (APPROVE-WITH-NITS); diagnostics stripped.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 11:25:05 -07:00

227 lines
8.8 KiB
Rust

//! Viewer module - Native remote desktop viewer with full keyboard capture
//!
//! This module provides the viewer functionality for connecting to remote
//! GuruConnect sessions with low-level keyboard hooks for Win key capture.
#[cfg(windows)]
mod decoder;
mod input;
mod render;
mod transport;
use crate::proto;
use anyhow::Result;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tracing::{error, info, warn};
#[derive(Debug, Clone)]
pub enum ViewerEvent {
Connected,
Disconnected(String),
Frame(render::FrameData),
CursorPosition(i32, i32, bool),
CursorShape(proto::CursorShape),
}
#[derive(Debug, Clone)]
pub enum InputEvent {
Mouse(proto::MouseEvent),
Key(proto::KeyEvent),
// Not yet emitted by the viewer input path (special-key fidelity is pending).
#[allow(dead_code)]
SpecialKey(proto::SpecialKeyEvent),
}
/// Spawn the dedicated H.264 decode worker thread (Task 7, Windows only).
///
/// Returns a sender for `(h264_access_unit, pts_100ns)`. The worker lazily
/// creates the Media Foundation decoder on the first frame; if creation fails it
/// logs once and then silently drops subsequent frames (the raw render path is
/// never affected). Each decoded frame is converted to BGRA and delivered to the
/// viewer as an uncompressed `FrameData`, reusing the existing render path.
#[cfg(windows)]
fn spawn_h264_decode_worker(
viewer_tx: mpsc::Sender<ViewerEvent>,
) -> std::sync::mpsc::Sender<(Vec<u8>, i64)> {
let (tx, rx) = std::sync::mpsc::channel::<(Vec<u8>, i64)>();
std::thread::Builder::new()
.name("gc-h264-decode".to_string())
.spawn(move || {
let mut decoder: Option<decoder::H264Decoder> = None;
let mut init_failed = false;
while let Ok((data, pts)) = rx.recv() {
if init_failed {
continue;
}
if decoder.is_none() {
match decoder::H264Decoder::new() {
Ok(d) => {
info!("H.264 decoder initialized (Media Foundation)");
decoder = Some(d);
}
Err(e) => {
error!(
"H.264 decoder init failed: {e:#}; H.264 frames will be dropped"
);
init_failed = true;
continue;
}
}
}
let dec = decoder.as_mut().expect("decoder present after init");
match dec.decode(&data, pts) {
// One input access unit may yield zero, one, or more frames.
Ok(frames) => {
let mut viewer_closed = false;
for decoded in frames {
let frame = render::FrameData {
width: decoded.width,
height: decoded.height,
data: decoded.bgra,
compressed: false, // already BGRA
is_keyframe: false,
};
if viewer_tx.blocking_send(ViewerEvent::Frame(frame)).is_err() {
// Viewer closed; stop the worker.
viewer_closed = true;
break;
}
}
if viewer_closed {
break;
}
}
Err(e) => {
warn!("H.264 decode error: {e:#}");
}
}
}
})
.expect("failed to spawn H.264 decode worker thread");
tx
}
/// Run the viewer to connect to a remote session
pub async fn run(server_url: &str, session_id: &str, api_key: &str) -> Result<()> {
info!("GuruConnect Viewer starting");
info!("Server: {}", server_url);
info!("Session: {}", session_id);
// Create channels for communication between components
let (viewer_tx, viewer_rx) = mpsc::channel::<ViewerEvent>(100);
let (input_tx, input_rx) = mpsc::channel::<InputEvent>(100);
// Connect to server
let ws_url = format!("{}?session_id={}", server_url, session_id);
info!("Connecting to {}", ws_url);
let (ws_sender, mut ws_receiver) = transport::connect(&ws_url, api_key).await?;
let ws_sender = Arc::new(Mutex::new(ws_sender));
info!("Connected to server");
let _ = viewer_tx.send(ViewerEvent::Connected).await;
// Clone sender for input forwarding
let ws_sender_input = ws_sender.clone();
// Spawn task to forward input events to server
let mut input_rx = input_rx;
let input_task = tokio::spawn(async move {
while let Some(event) = input_rx.recv().await {
let msg = match event {
InputEvent::Mouse(m) => proto::Message {
payload: Some(proto::message::Payload::MouseEvent(m)),
},
InputEvent::Key(k) => proto::Message {
payload: Some(proto::message::Payload::KeyEvent(k)),
},
InputEvent::SpecialKey(s) => proto::Message {
payload: Some(proto::message::Payload::SpecialKey(s)),
},
};
if let Err(e) = transport::send_message(&ws_sender_input, &msg).await {
error!("Failed to send input: {}", e);
break;
}
}
});
// H.264 decode worker (Task 7, Windows only). The Media Foundation decoder
// wraps COM interfaces with thread affinity, so it runs on a DEDICATED OS
// thread (not a tokio task, which can migrate across workers at await
// points). The receive task forwards H.264 access units to it over a std
// channel; the worker decodes to BGRA and pushes a FrameData back through
// the viewer channel via `blocking_send`. On decoder-init failure the worker
// logs and drops H.264 frames (the raw path is unaffected).
#[cfg(windows)]
let h264_tx = spawn_h264_decode_worker(viewer_tx.clone());
// Spawn task to receive messages from server
let viewer_tx_recv = viewer_tx.clone();
let receive_task = tokio::spawn(async move {
while let Some(msg) = ws_receiver.recv().await {
match msg.payload {
Some(proto::message::Payload::VideoFrame(frame)) => match frame.encoding {
Some(proto::video_frame::Encoding::Raw(raw)) => {
let frame_data = render::FrameData {
width: raw.width as u32,
height: raw.height as u32,
data: raw.data,
compressed: raw.compressed,
is_keyframe: raw.is_keyframe,
};
let _ = viewer_tx_recv.send(ViewerEvent::Frame(frame_data)).await;
}
Some(proto::video_frame::Encoding::H264(enc)) => {
// Forward to the decode worker (Windows). On other
// platforms H.264 is never negotiated, so this is dead.
#[cfg(windows)]
{
if h264_tx.send((enc.data, enc.pts)).is_err() {
warn!("H.264 decode worker unavailable; dropping frame");
}
}
#[cfg(not(windows))]
{
let _ = enc;
}
}
// VP9/H265 not implemented on the viewer (raw + H.264 only).
_ => {}
},
Some(proto::message::Payload::CursorPosition(pos)) => {
let _ = viewer_tx_recv
.send(ViewerEvent::CursorPosition(pos.x, pos.y, pos.visible))
.await;
}
Some(proto::message::Payload::CursorShape(shape)) => {
let _ = viewer_tx_recv.send(ViewerEvent::CursorShape(shape)).await;
}
Some(proto::message::Payload::Disconnect(d)) => {
warn!("Server disconnected: {}", d.reason);
let _ = viewer_tx_recv
.send(ViewerEvent::Disconnected(d.reason))
.await;
break;
}
_ => {}
}
}
});
// Run the window (this blocks until window closes)
render::run_window(viewer_rx, input_tx).await?;
// Cleanup
input_task.abort();
receive_task.abort();
Ok(())
}