//! WebSocket relay handlers //! //! Handles WebSocket connections from agents and viewers, //! relaying video frames and input events between them. use axum::{ extract::{ ws::{Message, WebSocket, WebSocketUpgrade}, ConnectInfo, Query, State, }, http::StatusCode, response::IntoResponse, }; use futures_util::{SinkExt, StreamExt}; use prost::Message as ProstMessage; use serde::Deserialize; use std::net::SocketAddr; use tracing::{error, info, warn}; use uuid::Uuid; use crate::auth::ViewerAccess; use crate::db::{self, Database}; use crate::proto; use crate::session::SessionManager; use crate::AppState; /// Maximum size of a single inbound WebSocket message on the AGENT plane. /// /// Agents stream encoded video frames; a single frame (full-screen keyframe at /// high resolution, raw/Zstd or H.264) must fit, but anything beyond a few MB is /// hostile. 4 MiB is comfortably above a realistic worst-case frame while /// bounding the per-frame `to_vec()` + broadcast allocation. Frames larger than /// this are rejected by the WebSocket layer before reaching `to_vec()`/broadcast. /// (Closes the WS-OOM HIGH on the agent plane.) const AGENT_WS_MAX_MESSAGE_BYTES: usize = 4 * 1024 * 1024; /// Maximum size of a single inbound WebSocket message on the VIEWER plane. /// /// Viewers send only small control messages (mouse/key events, chat). 64 KiB is /// already generous for these; a viewer has no legitimate reason to push large /// payloads up the relay. (Closes the WS-OOM HIGH on the viewer plane and bounds /// the input path together with the rate limiter below.) const VIEWER_WS_MAX_MESSAGE_BYTES: usize = 64 * 1024; /// Maximum viewer→agent input events forwarded per second, per viewer /// connection. A human technician generates well under this; the cap exists to /// stop a compromised/hostile viewer from flooding the target with injected /// input (`SendInput`). Excess events are DROPPED (coalesced away), never /// buffered unboundedly. (Closes the input-injection MEDIUM.) const VIEWER_INPUT_EVENTS_PER_SEC: u32 = 200; #[derive(Debug, Deserialize)] pub struct AgentParams { agent_id: String, #[serde(default)] agent_name: Option, #[serde(default)] support_code: Option, #[serde(default)] hostname: Option, /// API key for persistent (managed) agents #[serde(default)] api_key: Option, } #[derive(Debug, Deserialize)] pub struct ViewerParams { session_id: String, #[serde(default = "default_viewer_name")] viewer_name: String, /// JWT token for authentication (required) #[serde(default)] token: Option, } fn default_viewer_name() -> String { "Technician".to_string() } /// WebSocket handler for agent connections pub async fn agent_ws_handler( ws: WebSocketUpgrade, State(state): State, ConnectInfo(addr): ConnectInfo, Query(params): Query, ) -> Result { // The CLIENT-SUPPLIED agent_id. For a per-agent-key (managed) agent this is // untrusted until it is reconciled against the key's machine identity below; // for a support-code/attended agent it is used as-is. `agent_id` is rebound // to the AUTHENTICATED identity for `cak_`-keyed agents (Task 3 binding). let mut agent_id = params.agent_id.clone(); let agent_name = params .hostname .clone() .or(params.agent_name.clone()) .unwrap_or_else(|| agent_id.clone()); let support_code = params.support_code.clone(); let api_key = params.api_key.clone(); let client_ip = addr.ip(); // SECURITY: Agent must provide either a support code OR an API key // Support code = ad-hoc support session (technician generated code) // API key = persistent managed agent if support_code.is_none() && api_key.is_none() { warn!( "Agent connection rejected: {} from {} - no support code or API key", agent_id, client_ip ); // Log failed connection attempt to database if let Some(ref db) = state.db { let _ = db::events::log_event( db.pool(), Uuid::new_v4(), // Temporary UUID for failed attempt db::events::EventTypes::CONNECTION_REJECTED_NO_AUTH, None, Some(&agent_id), Some(serde_json::json!({ "reason": "no_auth_method", "agent_id": agent_id })), Some(client_ip), ) .await; } return Err(StatusCode::UNAUTHORIZED); } // Validate support code if provided if let Some(ref code) = support_code { // Check if it's a valid, pending support code let code_info = state.support_codes.get_status(code).await; if code_info.is_none() { warn!( "Agent connection rejected: {} from {} - invalid support code {}", agent_id, client_ip, code ); // Log failed connection attempt if let Some(ref db) = state.db { let _ = db::events::log_event( db.pool(), Uuid::new_v4(), db::events::EventTypes::CONNECTION_REJECTED_INVALID_CODE, None, Some(&agent_id), Some(serde_json::json!({ "reason": "invalid_code", "support_code": code, "agent_id": agent_id })), Some(client_ip), ) .await; } return Err(StatusCode::UNAUTHORIZED); } let status = code_info.unwrap(); if status != "pending" && status != "connected" { warn!( "Agent connection rejected: {} from {} - support code {} has status {}", agent_id, client_ip, code, status ); // Log failed connection attempt (expired/cancelled code) if let Some(ref db) = state.db { let event_type = if status == "cancelled" { db::events::EventTypes::CONNECTION_REJECTED_CANCELLED_CODE } else { db::events::EventTypes::CONNECTION_REJECTED_EXPIRED_CODE }; let _ = db::events::log_event( db.pool(), Uuid::new_v4(), event_type, None, Some(&agent_id), Some(serde_json::json!({ "reason": status, "support_code": code, "agent_id": agent_id })), Some(client_ip), ) .await; } return Err(StatusCode::UNAUTHORIZED); } info!( "Agent {} from {} authenticated via support code {}", agent_id, client_ip, code ); } // Validate API key if provided (for persistent agents) if let Some(ref key) = api_key { // Agent-plane auth ONLY: a per-agent `cak_` key (hash-compared against // connect_agent_keys, rejecting revoked) or the deprecated shared // AGENT_API_KEY fallback. A dashboard/user JWT is NEVER accepted here. match validate_agent_api_key(&state, key).await { AgentKeyAuth::PerAgentKey(Some(trusted_agent_id)) => { // IDENTITY BINDING (Task 3): persistent reattach must bind to the // authenticated machine identity, NOT a client-supplied agent_id. // If the client claimed a different agent_id, ignore the claim and // use the key's machine identity — a valid key for machine X can // never seize machine Y's persistent session slot. if trusted_agent_id != agent_id { warn!( "Agent from {} presented agent_id '{}' but its key authenticates \ machine '{}'; binding to the authenticated identity", client_ip, agent_id, trusted_agent_id ); } agent_id = trusted_agent_id; info!( "Agent {} from {} authenticated via per-agent key", agent_id, client_ip ); } AgentKeyAuth::PerAgentKey(None) => { // Key verified but the owning machine could not be resolved // (transient DB error). Fail closed rather than reattach to an // unverified slot. warn!( "Agent connection rejected from {}: key authenticated but machine \ identity could not be resolved", client_ip ); return Err(StatusCode::SERVICE_UNAVAILABLE); } AgentKeyAuth::SharedKey => { // Deprecated shared key: no per-agent identity; legacy behavior // uses the client-supplied agent_id as-is. info!( "Agent {} from {} authenticated via DEPRECATED shared API key", agent_id, client_ip ); } AgentKeyAuth::Invalid => { warn!( "Agent connection rejected: {} from {} - invalid API key", agent_id, client_ip ); // Log failed connection attempt if let Some(ref db) = state.db { let _ = db::events::log_event( db.pool(), Uuid::new_v4(), db::events::EventTypes::CONNECTION_REJECTED_INVALID_API_KEY, None, Some(&agent_id), Some(serde_json::json!({ "reason": "invalid_api_key", "agent_id": agent_id })), Some(client_ip), ) .await; } return Err(StatusCode::UNAUTHORIZED); } } } let sessions = state.sessions.clone(); let support_codes = state.support_codes.clone(); let db = state.db.clone(); // Bounded relay: cap inbound frame/message size before the socket is upgraded // so oversized agent frames are rejected by the WS layer, never `to_vec()`'d // and broadcast. (WS-OOM HIGH.) let ws = ws .max_message_size(AGENT_WS_MAX_MESSAGE_BYTES) .max_frame_size(AGENT_WS_MAX_MESSAGE_BYTES); Ok(ws.on_upgrade(move |socket| { handle_agent_connection( socket, sessions, support_codes, db, agent_id, agent_name, support_code, Some(client_ip), ) })) } /// Outcome of validating an agent key presented on the agent plane. enum AgentKeyAuth { /// Authenticated by a per-agent `cak_` key. Carries the TRUSTED machine /// identity (the canonical `agent_id` of the machine the key belongs to), /// resolved from `connect_machines` via the key's `machine_id`. The WS /// handler binds persistent reattach to THIS identity, not the client's /// query-string `agent_id`. /// /// `None` inside means the key verified but the owning machine row could not /// be resolved (e.g. transient DB error) — treated as authenticated but with /// no trusted identity to bind, so the connection is rejected rather than /// allowed to reattach to an unverified slot. PerAgentKey(Option), /// Authenticated by the DEPRECATED shared `AGENT_API_KEY`. No per-agent /// identity is established; the client-supplied `agent_id` is used as-is /// (legacy behavior, sunset path). SharedKey, /// Not a valid agent key. Invalid, } /// Validate an agent key presented on the agent plane. /// /// SECURITY (v2): a dashboard/user JWT is NEVER a valid agent credential — the /// old `jwt_config.validate_token` branch was the relay CRITICAL and is gone. /// Accepts, in order: /// 1. A per-agent `cak_` key: SHA-256 hash compared against /// `connect_agent_keys`; revoked keys are rejected (the DB query filters /// `revoked_at IS NULL`). This is the supported path. On success the /// owning machine's canonical `agent_id` is resolved and returned so the /// handler can bind the session to the AUTHENTICATED identity. /// 2. The shared `AGENT_API_KEY` env value — DEPRECATED fallback, retained /// only for not-yet-migrated agents. Its use is logged at WARNING and it /// should be removed once all managed agents carry per-agent keys. /// /// Never logs the presented key or any hash. async fn validate_agent_api_key(state: &AppState, api_key: &str) -> AgentKeyAuth { // 1. Per-agent key (the supported path). Requires a database; without one, // only the deprecated shared-key fallback below can apply. if let Some(ref db) = state.db { if let Some(machine_id) = crate::auth::agent_keys::verify_agent_key(db.pool(), api_key).await { // Resolve the trusted identity from the authenticated key's machine. let trusted_agent_id = match db::machines::get_machine_by_id(db.pool(), machine_id) .await { Ok(Some(machine)) => Some(machine.agent_id), Ok(None) => None, Err(e) => { tracing::error!("Failed to resolve machine for authenticated agent key: {}", e); None } }; return AgentKeyAuth::PerAgentKey(trusted_agent_id); } } // 2. DEPRECATED shared-key fallback. Constant-time-ish equality is not // critical here (the key is high-entropy and this path is sunset), but // we still avoid logging the value. if let Some(ref configured_key) = state.agent_api_key { if api_key == configured_key { warn!( "[WARNING] Agent authenticated via the DEPRECATED shared AGENT_API_KEY \ fallback. Migrate this agent to a per-agent cak_ key; the shared key \ will be removed." ); return AgentKeyAuth::SharedKey; } } AgentKeyAuth::Invalid } /// WebSocket handler for viewer connections pub async fn viewer_ws_handler( ws: WebSocketUpgrade, State(state): State, ConnectInfo(addr): ConnectInfo, Query(params): Query, ) -> Result { let client_ip = addr.ip(); // The session the viewer is asking to join. Parse early — a malformed UUID // can never match a viewer token's `session_id` claim, so reject up front. let requested_session_id = Uuid::parse_str(¶ms.session_id).map_err(|_| { warn!( "Viewer connection rejected from {}: invalid session_id", client_ip ); StatusCode::BAD_REQUEST })?; // Require a session-scoped VIEWER token (minted by `mint_viewer_token`), // NOT a raw login JWT. This is the v2 mechanism that, together with the // authorization gate at mint time, closes the any-JWT-joins-any-session and // blacklist-bypass CRITICALs. let token = params.token.ok_or_else(|| { warn!( "Viewer connection rejected from {}: missing viewer token", client_ip ); StatusCode::UNAUTHORIZED })?; // 1. Signature + expiry + `purpose == "viewer"`. A login JWT fails this // (wrong claim shape / no `purpose`), so login tokens are no longer // accepted on the viewer plane. let claims = state.jwt_config.validate_viewer_token(&token).map_err(|e| { warn!( "Viewer connection rejected from {}: invalid viewer token: {}", client_ip, e ); StatusCode::UNAUTHORIZED })?; // 2. Revocation check on the WS plane (CRITICAL #2): a logged-out / revoked // token must not grant live remote control even before natural expiry. if state.token_blacklist.is_revoked(&token).await { warn!( "Viewer connection rejected from {}: viewer token revoked", client_ip ); return Err(StatusCode::UNAUTHORIZED); } // 3. Bind the token to THIS session (CRITICAL #1): the token's `session_id` // claim must equal the session being joined. A token minted for session A // cannot be replayed against session B. let claim_session_id = claims.session_uuid().map_err(|e| { warn!( "Viewer connection rejected from {}: malformed session_id claim: {}", client_ip, e ); StatusCode::UNAUTHORIZED })?; if claim_session_id != requested_session_id { warn!( "Viewer connection rejected from {}: token session mismatch \ (token scoped to a different session than requested {})", client_ip, requested_session_id ); return Err(StatusCode::FORBIDDEN); } // The access mode comes from the VERIFIED token claims (signed; cannot be // forged or upgraded by the client). The relay enforces it: a view-only token // has its input silently dropped, a control token forwards input as today. let access = claims.access; info!( "Viewer (user {}) authenticated via session-scoped {} token for session {} from {}", claims.sub, access.as_str(), requested_session_id, client_ip ); let session_id = params.session_id; let viewer_name = params.viewer_name; let sessions = state.sessions.clone(); let db = state.db.clone(); // Bounded relay: cap inbound message size on the viewer plane. Viewers send // only small control messages, so a tight cap both prevents OOM and bounds // the input path. (WS-OOM HIGH.) let ws = ws .max_message_size(VIEWER_WS_MAX_MESSAGE_BYTES) .max_frame_size(VIEWER_WS_MAX_MESSAGE_BYTES); Ok(ws.on_upgrade(move |socket| { handle_viewer_connection( socket, sessions, db, session_id, viewer_name, access, Some(client_ip), ) })) } /// Handle an agent WebSocket connection #[allow(clippy::too_many_arguments)] // signature mirrors the relay/session protocol contract; refactor into a params struct tracked in docs/specs/native-remote-control/ async fn handle_agent_connection( socket: WebSocket, sessions: SessionManager, support_codes: crate::support_codes::SupportCodeManager, db: Option, agent_id: String, agent_name: String, support_code: Option, client_ip: Option, ) { info!( "Agent connected: {} ({}) from {:?}", agent_name, agent_id, client_ip ); let (mut ws_sender, mut ws_receiver) = socket.split(); // If a support code was provided, check if it's valid if let Some(ref code) = support_code { // Check if the code is cancelled or invalid if support_codes.is_cancelled(code).await { warn!("Agent tried to connect with cancelled code: {}", code); // Send disconnect message to agent let disconnect_msg = proto::Message { payload: Some(proto::message::Payload::Disconnect(proto::Disconnect { reason: "Support session was cancelled by technician".to_string(), })), }; let mut buf = Vec::new(); if prost::Message::encode(&disconnect_msg, &mut buf).is_ok() { let _ = ws_sender.send(Message::Binary(buf)).await; } let _ = ws_sender.close().await; return; } } // Register the agent and get channels // Persistent agents (no support code) keep their session when disconnected let is_persistent = support_code.is_none(); let (session_id, frame_tx, mut input_rx) = sessions .register_agent(agent_id.clone(), agent_name.clone(), is_persistent) .await; info!("Session created: {} (agent in idle mode)", session_id); // Database: upsert machine and create session record let _machine_id = if let Some(ref db) = db { match db::machines::upsert_machine(db.pool(), &agent_id, &agent_name, is_persistent).await { Ok(machine) => { // Create session record let _ = db::sessions::create_session( db.pool(), session_id, machine.id, support_code.is_some(), support_code.as_deref(), ) .await; // Log session started event let _ = db::events::log_event( db.pool(), session_id, db::events::EventTypes::SESSION_STARTED, None, None, None, client_ip, ) .await; Some(machine.id) } Err(e) => { warn!("Failed to upsert machine in database: {}", e); None } } } else { None }; // If a support code was provided, mark it as connected if let Some(ref code) = support_code { info!("Linking support code {} to session {}", code, session_id); support_codes .mark_connected(code, Some(agent_name.clone()), Some(agent_id.clone())) .await; support_codes.link_session(code, session_id).await; // Database: update support code if let Some(ref db) = db { let _ = db::support_codes::mark_code_connected( db.pool(), code, Some(session_id), Some(&agent_name), Some(&agent_id), ) .await; } } // Use Arc for sender so we can use it from multiple places let ws_sender = std::sync::Arc::new(tokio::sync::Mutex::new(ws_sender)); let ws_sender_input = ws_sender.clone(); let ws_sender_cancel = ws_sender.clone(); // Task to forward input events from viewers to agent let input_forward = tokio::spawn(async move { while let Some(input_data) = input_rx.recv().await { let mut sender = ws_sender_input.lock().await; if sender.send(Message::Binary(input_data)).await.is_err() { break; } } }); let sessions_cleanup = sessions.clone(); let sessions_status = sessions.clone(); let support_codes_cleanup = support_codes.clone(); let support_code_cleanup = support_code.clone(); let support_code_check = support_code.clone(); let support_codes_check = support_codes.clone(); // Task to check for cancellation every 2 seconds let cancel_check = tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(2)); loop { interval.tick().await; if let Some(ref code) = support_code_check { if support_codes_check.is_cancelled(code).await { info!("Support code {} was cancelled, disconnecting agent", code); // Send disconnect message let disconnect_msg = proto::Message { payload: Some(proto::message::Payload::Disconnect(proto::Disconnect { reason: "Support session was cancelled by technician".to_string(), })), }; let mut buf = Vec::new(); if prost::Message::encode(&disconnect_msg, &mut buf).is_ok() { let mut sender = ws_sender_cancel.lock().await; let _ = sender.send(Message::Binary(buf)).await; let _ = sender.close().await; } break; } } } }); // Main loop: receive messages from agent while let Some(msg) = ws_receiver.next().await { match msg { Ok(Message::Binary(data)) => { // Try to decode as protobuf message match proto::Message::decode(data.as_ref()) { Ok(proto_msg) => { match &proto_msg.payload { Some(proto::message::Payload::VideoFrame(_)) => { // Broadcast frame to all viewers (only sent when streaming) let _ = frame_tx.send(data.to_vec()); } Some(proto::message::Payload::ChatMessage(chat)) => { // Broadcast chat message to all viewers info!("Chat from client: {}", chat.content); let _ = frame_tx.send(data.to_vec()); } Some(proto::message::Payload::AgentStatus(status)) => { // Update session with agent status let agent_version = if status.agent_version.is_empty() { None } else { Some(status.agent_version.clone()) }; let organization = if status.organization.is_empty() { None } else { Some(status.organization.clone()) }; let site = if status.site.is_empty() { None } else { Some(status.site.clone()) }; sessions_status .update_agent_status( session_id, Some(status.os_version.clone()), status.is_elevated, status.uptime_secs, status.display_count, status.is_streaming, agent_version.clone(), organization.clone(), site.clone(), status.tags.clone(), ) .await; // Update version in database if present if let (Some(ref db), Some(ref version)) = (&db, &agent_version) { let _ = crate::db::releases::update_machine_version( db.pool(), &agent_id, version, ) .await; } // Update organization/site/tags in database if present if let Some(ref db) = db { let _ = crate::db::machines::update_machine_metadata( db.pool(), &agent_id, organization.as_deref(), site.as_deref(), &status.tags, ) .await; } info!("Agent status update: {} - streaming={}, uptime={}s, version={:?}, org={:?}, site={:?}", status.hostname, status.is_streaming, status.uptime_secs, agent_version, organization, site); } Some(proto::message::Payload::Heartbeat(_)) => { // Update heartbeat timestamp sessions_status.update_heartbeat(session_id).await; } Some(proto::message::Payload::HeartbeatAck(_)) => { // Agent acknowledged our heartbeat sessions_status.update_heartbeat(session_id).await; } _ => {} } } Err(e) => { warn!("Failed to decode agent message: {}", e); } } } Ok(Message::Close(_)) => { info!("Agent disconnected: {}", agent_id); break; } Ok(Message::Ping(data)) => { // Pong is handled automatically by axum let _ = data; } Ok(_) => {} Err(e) => { error!("WebSocket error from agent {}: {}", agent_id, e); break; } } } // Cleanup input_forward.abort(); cancel_check.abort(); // Mark agent as disconnected (persistent agents stay in list as offline) sessions_cleanup.mark_agent_disconnected(session_id).await; // Database: end session and mark machine offline if let Some(ref db) = db { // End the session record let _ = db::sessions::end_session(db.pool(), session_id, "ended").await; // Mark machine as offline let _ = db::machines::mark_machine_offline(db.pool(), &agent_id).await; // Log session ended event let _ = db::events::log_event( db.pool(), session_id, db::events::EventTypes::SESSION_ENDED, None, None, None, client_ip, ) .await; } // Mark support code as completed if one was used (unless cancelled) if let Some(ref code) = support_code_cleanup { if !support_codes_cleanup.is_cancelled(code).await { support_codes_cleanup.mark_completed(code).await; // Database: mark code as completed if let Some(ref db) = db { let _ = db::support_codes::mark_code_completed(db.pool(), code).await; } info!("Support code {} marked as completed", code); } } info!("Session {} ended", session_id); } /// Handle a viewer WebSocket connection /// /// `access` is the VERIFIED access mode from the viewer token's signed claims. /// For [`ViewerAccess::ViewOnly`] the relay refuses to forward ANY input event /// to the agent (video still streams out); for [`ViewerAccess::Control`] input /// is forwarded subject to the per-viewer rate throttle below. #[allow(clippy::too_many_arguments)] // signature mirrors the relay/session protocol contract; refactor into a params struct tracked in docs/specs/native-remote-control/ async fn handle_viewer_connection( socket: WebSocket, sessions: SessionManager, db: Option, session_id_str: String, viewer_name: String, access: ViewerAccess, client_ip: Option, ) { // Parse session ID let session_id = match uuid::Uuid::parse_str(&session_id_str) { Ok(id) => id, Err(_) => { warn!("Invalid session ID: {}", session_id_str); return; } }; // Generate unique viewer ID let viewer_id = Uuid::new_v4().to_string(); // Join the session (this sends StartStream to agent if first viewer) let (mut frame_rx, input_tx) = match sessions .join_session(session_id, viewer_id.clone(), viewer_name.clone()) .await { Some(channels) => channels, None => { warn!("Session not found: {}", session_id); return; } }; info!( "Viewer {} ({}) joined session: {} from {:?}", viewer_name, viewer_id, session_id, client_ip ); // Database: log viewer joined event if let Some(ref db) = db { let _ = db::events::log_event( db.pool(), session_id, db::events::EventTypes::VIEWER_JOINED, Some(&viewer_id), Some(&viewer_name), None, client_ip, ) .await; } let (mut ws_sender, mut ws_receiver) = socket.split(); // Task to forward frames from agent to this viewer let frame_forward = tokio::spawn(async move { while let Ok(frame_data) = frame_rx.recv().await { if ws_sender.send(Message::Binary(frame_data)).await.is_err() { break; } } }); let sessions_cleanup = sessions.clone(); let viewer_id_cleanup = viewer_id.clone(); let viewer_name_cleanup = viewer_name.clone(); // Per-viewer input rate limiter (input-injection MEDIUM). A simple // refilling token bucket: at most `VIEWER_INPUT_EVENTS_PER_SEC` input events // are forwarded per second. Excess events are DROPPED (coalesced away) rather // than buffered — the session `input_tx` channel is already bounded // (capacity 64), and `try_send` never blocks, so a flood can neither stall // the relay nor grow memory. Chat is not throttled here (it is not an // injected-input vector). Only MouseEvent/KeyEvent/SpecialKey count against // the bucket. let mut input_tokens: f64 = VIEWER_INPUT_EVENTS_PER_SEC as f64; let mut last_refill = std::time::Instant::now(); let mut dropped_input: u64 = 0; // Count of input events refused solely because this is a view-only token, so // the refusal can be observed (logged once-per-power-of-two) without leaking // event contents or spamming the log on a noisy viewer. let mut refused_viewonly_input: u64 = 0; // Main loop: receive input from viewer and forward to agent while let Some(msg) = ws_receiver.next().await { match msg { Ok(Message::Binary(data)) => { // Try to decode as protobuf message match proto::Message::decode(data.as_ref()) { Ok(proto_msg) => { match &proto_msg.payload { Some(proto::message::Payload::MouseEvent(_)) | Some(proto::message::Payload::KeyEvent(_)) | Some(proto::message::Payload::SpecialKey(_)) if !access.can_control() => { // VIEW-ONLY ENFORCEMENT (authz-strength split, // audit CRITICAL #1). This viewer holds a // view-only token: the relay refuses to forward // ANY injected-input event to the agent. We drop // it silently (the viewer keeps receiving video). // The access mode came from the SIGNED token, so a // view-only viewer cannot escalate to control by // crafting messages — the relay simply never // relays their input. refused_viewonly_input += 1; if refused_viewonly_input.is_power_of_two() { warn!( "View-only viewer {} sent input on session {}; \ refusing to forward ({} input events refused so far)", viewer_id, session_id, refused_viewonly_input ); } } Some(proto::message::Payload::MouseEvent(_)) | Some(proto::message::Payload::KeyEvent(_)) | Some(proto::message::Payload::SpecialKey(_)) => { // Control token: forward input (subject to the // per-viewer rate throttle below). // Refill the token bucket based on elapsed time, // capped at one second's worth of capacity. let elapsed = last_refill.elapsed().as_secs_f64(); if elapsed > 0.0 { input_tokens = (input_tokens + elapsed * VIEWER_INPUT_EVENTS_PER_SEC as f64) .min(VIEWER_INPUT_EVENTS_PER_SEC as f64); last_refill = std::time::Instant::now(); } if input_tokens >= 1.0 { input_tokens -= 1.0; // Non-blocking, bounded send: drop on a full // queue rather than awaiting/buffering. if input_tx.try_send(data.to_vec()).is_err() { dropped_input += 1; } } else { // Over the per-second cap: drop (coalesce). dropped_input += 1; if dropped_input.is_power_of_two() { warn!( "Viewer {} exceeding input rate cap on session {} \ ({} events dropped so far)", viewer_id, session_id, dropped_input ); } } } Some(proto::message::Payload::ChatMessage(chat)) => { // Forward chat message to agent (not throttled — // not an injected-input vector). Bounded send. info!("Chat from technician: {}", chat.content); let _ = input_tx.try_send(data.to_vec()); } _ => {} } } Err(e) => { warn!("Failed to decode viewer message: {}", e); } } } Ok(Message::Close(_)) => { info!( "Viewer {} disconnected from session: {}", viewer_id, session_id ); break; } Ok(_) => {} Err(e) => { error!("WebSocket error from viewer {}: {}", viewer_id, e); break; } } } // Cleanup (this sends StopStream to agent if last viewer) frame_forward.abort(); sessions_cleanup .leave_session(session_id, &viewer_id_cleanup) .await; // Database: log viewer left event if let Some(ref db) = db { let _ = db::events::log_event( db.pool(), session_id, db::events::EventTypes::VIEWER_LEFT, Some(&viewer_id_cleanup), Some(&viewer_name_cleanup), None, client_ip, ) .await; } info!("Viewer {} left session: {}", viewer_id_cleanup, session_id); }