All checks were successful
The native viewer's H.264 path (Task 7 first-cut, compile-verified only) never rendered a frame. Three stacked bugs, all confirmed via live loopback: 1. decoder: MF_E_NOTACCEPTING (0xC00D36B5) was treated as fatal and only one output was drained per call, so once the MFT filled it rejected every subsequent frame. decode() now returns Vec<DecodedFrame>, drains on back-pressure and retries the unconsumed sample, then drains all ready outputs. 2. decoder: the NV12 output type was hand-built and rejected by the MS H.264 decoder MFT (MF_E_TRANSFORM_TYPE_NOT_SET, 0xC00D6D60). It is now negotiated by enumerating GetOutputAvailableType on STREAM_CHANGE / TYPE_NOT_SET. 3. render: a manual pump_messages() in about_to_wait stole winit's own thread messages and froze the event loop after one iteration, so frames were never drained from the channel. Removed; winit's run_app pump already services the WH_KEYBOARD_LL hook. Validated on a 5070 loopback: 0 decode errors, frames decode/paint/present (present count 0 -> 1740). Reviewed (APPROVE-WITH-NITS); diagnostics stripped. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
227 lines
8.8 KiB
Rust
227 lines
8.8 KiB
Rust
//! Viewer module - Native remote desktop viewer with full keyboard capture
|
|
//!
|
|
//! This module provides the viewer functionality for connecting to remote
|
|
//! GuruConnect sessions with low-level keyboard hooks for Win key capture.
|
|
|
|
#[cfg(windows)]
|
|
mod decoder;
|
|
mod input;
|
|
mod render;
|
|
mod transport;
|
|
|
|
use crate::proto;
|
|
use anyhow::Result;
|
|
use std::sync::Arc;
|
|
use tokio::sync::{mpsc, Mutex};
|
|
use tracing::{error, info, warn};
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub enum ViewerEvent {
|
|
Connected,
|
|
Disconnected(String),
|
|
Frame(render::FrameData),
|
|
CursorPosition(i32, i32, bool),
|
|
CursorShape(proto::CursorShape),
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub enum InputEvent {
|
|
Mouse(proto::MouseEvent),
|
|
Key(proto::KeyEvent),
|
|
// Not yet emitted by the viewer input path (special-key fidelity is pending).
|
|
#[allow(dead_code)]
|
|
SpecialKey(proto::SpecialKeyEvent),
|
|
}
|
|
|
|
/// Spawn the dedicated H.264 decode worker thread (Task 7, Windows only).
///
/// Returns a sender for `(h264_access_unit, pts_100ns)`. The worker lazily
/// creates the Media Foundation decoder on the first frame; if creation fails it
/// logs once and then silently drops subsequent frames (the raw render path is
/// never affected). Each decoded frame is converted to BGRA and delivered to the
/// viewer as an uncompressed `FrameData`, reusing the existing render path.
///
/// If the OS refuses to create the thread, the failure is logged and the sender
/// is still returned. Its receiver has already been dropped in that case, so
/// every `send` on it fails and the caller drops the H.264 frame instead of the
/// whole viewer panicking.
#[cfg(windows)]
fn spawn_h264_decode_worker(
    viewer_tx: mpsc::Sender<ViewerEvent>,
) -> std::sync::mpsc::Sender<(Vec<u8>, i64)> {
    let (tx, rx) = std::sync::mpsc::channel::<(Vec<u8>, i64)>();

    // The JoinHandle is intentionally dropped: the worker is detached and stops
    // on its own once either channel end goes away.
    let spawned = std::thread::Builder::new()
        .name("gc-h264-decode".to_string())
        .spawn(move || h264_decode_loop(rx, viewer_tx));

    if let Err(e) = spawned {
        // A failed spawn drops the closure, and with it `rx` and `viewer_tx`.
        error!("failed to spawn H.264 decode worker thread: {e}; H.264 frames will be dropped");
    }

    tx
}

/// Body of the H.264 decode worker: receive access units, decode them, and push
/// the resulting BGRA frames to the viewer.
///
/// Returns when the access-unit sender is dropped (`rx.recv()` fails) or when
/// the viewer channel is closed (`blocking_send` fails).
#[cfg(windows)]
fn h264_decode_loop(
    rx: std::sync::mpsc::Receiver<(Vec<u8>, i64)>,
    viewer_tx: mpsc::Sender<ViewerEvent>,
) {
    let mut decoder: Option<decoder::H264Decoder> = None;
    let mut init_failed = false;

    while let Ok((data, pts)) = rx.recv() {
        // After a failed init, keep draining the channel but discard every frame.
        if init_failed {
            continue;
        }

        // Lazily create the decoder on the first access unit.
        if decoder.is_none() {
            match decoder::H264Decoder::new() {
                Ok(d) => {
                    info!("H.264 decoder initialized (Media Foundation)");
                    decoder = Some(d);
                }
                Err(e) => {
                    error!("H.264 decoder init failed: {e:#}; H.264 frames will be dropped");
                    init_failed = true;
                    continue;
                }
            }
        }

        let dec = decoder.as_mut().expect("decoder present after init");
        match dec.decode(&data, pts) {
            // One input access unit may yield zero, one, or more frames.
            Ok(frames) => {
                for decoded in frames {
                    let frame = render::FrameData {
                        width: decoded.width,
                        height: decoded.height,
                        data: decoded.bgra,
                        compressed: false, // already BGRA
                        is_keyframe: false,
                    };
                    if viewer_tx.blocking_send(ViewerEvent::Frame(frame)).is_err() {
                        // Viewer closed; stop the worker.
                        return;
                    }
                }
            }
            // A decode error is logged and the worker moves on to the next unit.
            Err(e) => {
                warn!("H.264 decode error: {e:#}");
            }
        }
    }
}
|
|
|
|
/// Run the viewer to connect to a remote session
|
|
pub async fn run(server_url: &str, session_id: &str, api_key: &str) -> Result<()> {
|
|
info!("GuruConnect Viewer starting");
|
|
info!("Server: {}", server_url);
|
|
info!("Session: {}", session_id);
|
|
|
|
// Create channels for communication between components
|
|
let (viewer_tx, viewer_rx) = mpsc::channel::<ViewerEvent>(100);
|
|
let (input_tx, input_rx) = mpsc::channel::<InputEvent>(100);
|
|
|
|
// Connect to server
|
|
let ws_url = format!("{}?session_id={}", server_url, session_id);
|
|
info!("Connecting to {}", ws_url);
|
|
|
|
let (ws_sender, mut ws_receiver) = transport::connect(&ws_url, api_key).await?;
|
|
let ws_sender = Arc::new(Mutex::new(ws_sender));
|
|
|
|
info!("Connected to server");
|
|
let _ = viewer_tx.send(ViewerEvent::Connected).await;
|
|
|
|
// Clone sender for input forwarding
|
|
let ws_sender_input = ws_sender.clone();
|
|
|
|
// Spawn task to forward input events to server
|
|
let mut input_rx = input_rx;
|
|
let input_task = tokio::spawn(async move {
|
|
while let Some(event) = input_rx.recv().await {
|
|
let msg = match event {
|
|
InputEvent::Mouse(m) => proto::Message {
|
|
payload: Some(proto::message::Payload::MouseEvent(m)),
|
|
},
|
|
InputEvent::Key(k) => proto::Message {
|
|
payload: Some(proto::message::Payload::KeyEvent(k)),
|
|
},
|
|
InputEvent::SpecialKey(s) => proto::Message {
|
|
payload: Some(proto::message::Payload::SpecialKey(s)),
|
|
},
|
|
};
|
|
|
|
if let Err(e) = transport::send_message(&ws_sender_input, &msg).await {
|
|
error!("Failed to send input: {}", e);
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
|
|
// H.264 decode worker (Task 7, Windows only). The Media Foundation decoder
|
|
// wraps COM interfaces with thread affinity, so it runs on a DEDICATED OS
|
|
// thread (not a tokio task, which can migrate across workers at await
|
|
// points). The receive task forwards H.264 access units to it over a std
|
|
// channel; the worker decodes to BGRA and pushes a FrameData back through
|
|
// the viewer channel via `blocking_send`. On decoder-init failure the worker
|
|
// logs and drops H.264 frames (the raw path is unaffected).
|
|
#[cfg(windows)]
|
|
let h264_tx = spawn_h264_decode_worker(viewer_tx.clone());
|
|
|
|
// Spawn task to receive messages from server
|
|
let viewer_tx_recv = viewer_tx.clone();
|
|
let receive_task = tokio::spawn(async move {
|
|
while let Some(msg) = ws_receiver.recv().await {
|
|
match msg.payload {
|
|
Some(proto::message::Payload::VideoFrame(frame)) => match frame.encoding {
|
|
Some(proto::video_frame::Encoding::Raw(raw)) => {
|
|
let frame_data = render::FrameData {
|
|
width: raw.width as u32,
|
|
height: raw.height as u32,
|
|
data: raw.data,
|
|
compressed: raw.compressed,
|
|
is_keyframe: raw.is_keyframe,
|
|
};
|
|
let _ = viewer_tx_recv.send(ViewerEvent::Frame(frame_data)).await;
|
|
}
|
|
Some(proto::video_frame::Encoding::H264(enc)) => {
|
|
// Forward to the decode worker (Windows). On other
|
|
// platforms H.264 is never negotiated, so this is dead.
|
|
#[cfg(windows)]
|
|
{
|
|
if h264_tx.send((enc.data, enc.pts)).is_err() {
|
|
warn!("H.264 decode worker unavailable; dropping frame");
|
|
}
|
|
}
|
|
#[cfg(not(windows))]
|
|
{
|
|
let _ = enc;
|
|
}
|
|
}
|
|
// VP9/H265 not implemented on the viewer (raw + H.264 only).
|
|
_ => {}
|
|
},
|
|
Some(proto::message::Payload::CursorPosition(pos)) => {
|
|
let _ = viewer_tx_recv
|
|
.send(ViewerEvent::CursorPosition(pos.x, pos.y, pos.visible))
|
|
.await;
|
|
}
|
|
Some(proto::message::Payload::CursorShape(shape)) => {
|
|
let _ = viewer_tx_recv.send(ViewerEvent::CursorShape(shape)).await;
|
|
}
|
|
Some(proto::message::Payload::Disconnect(d)) => {
|
|
warn!("Server disconnected: {}", d.reason);
|
|
let _ = viewer_tx_recv
|
|
.send(ViewerEvent::Disconnected(d.reason))
|
|
.await;
|
|
break;
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
});
|
|
|
|
// Run the window (this blocks until window closes)
|
|
render::run_window(viewer_rx, input_tx).await?;
|
|
|
|
// Cleanup
|
|
input_task.abort();
|
|
receive_task.abort();
|
|
|
|
Ok(())
|
|
}
|