//! Session management for GuruConnect //! //! Manages active remote desktop sessions, tracking which agents //! are connected and which viewers are watching them. use std::collections::HashMap; use std::sync::Arc; use std::time::Instant; use tokio::sync::{broadcast, RwLock}; use uuid::Uuid; /// Unique identifier for a session pub type SessionId = Uuid; /// Unique identifier for an agent pub type AgentId = String; /// Unique identifier for a viewer pub type ViewerId = String; /// Information about a connected viewer/technician #[derive(Debug, Clone)] pub struct ViewerInfo { pub id: ViewerId, pub name: String, pub connected_at: chrono::DateTime, } /// Heartbeat timeout (90 seconds - 3x the agent's 30 second interval) const HEARTBEAT_TIMEOUT_SECS: u64 = 90; /// Session state #[derive(Debug, Clone)] pub struct Session { pub id: SessionId, pub agent_id: AgentId, pub agent_name: String, pub started_at: chrono::DateTime, pub viewer_count: usize, pub viewers: Vec, // List of connected technicians pub is_streaming: bool, pub is_online: bool, // Whether agent is currently connected pub is_persistent: bool, // Persistent agent (no support code) vs support session pub last_heartbeat: chrono::DateTime, // Agent status info pub os_version: Option, pub is_elevated: bool, pub uptime_secs: i64, pub display_count: i32, pub agent_version: Option, // Agent software version pub organization: Option, // Company/organization name pub site: Option, // Site/location name pub tags: Vec, // Tags for categorization } /// Channel for sending frames from agent to viewers pub type FrameSender = broadcast::Sender>; pub type FrameReceiver = broadcast::Receiver>; /// Channel for sending input events from viewer to agent pub type InputSender = tokio::sync::mpsc::Sender>; pub type InputReceiver = tokio::sync::mpsc::Receiver>; /// Internal session data with channels struct SessionData { info: Session, /// Channel for video frames (agent -> viewers) frame_tx: FrameSender, /// Channel for input events (viewer -> agent) input_tx: InputSender, input_rx: Option, /// Map of connected viewers (id -> info) viewers: HashMap, /// Instant for heartbeat tracking last_heartbeat_instant: Instant, } /// Manages all active sessions #[derive(Clone)] pub struct SessionManager { sessions: Arc>>, agents: Arc>>, } impl SessionManager { pub fn new() -> Self { Self { sessions: Arc::new(RwLock::new(HashMap::new())), agents: Arc::new(RwLock::new(HashMap::new())), } } /// Register a new agent and create a session /// If agent was previously connected (offline session exists), reuse that session pub async fn register_agent(&self, agent_id: AgentId, agent_name: String, is_persistent: bool) -> (SessionId, FrameSender, InputReceiver) { // Check if this agent already has an offline session (reconnecting) { let agents = self.agents.read().await; if let Some(&existing_session_id) = agents.get(&agent_id) { let mut sessions = self.sessions.write().await; if let Some(session_data) = sessions.get_mut(&existing_session_id) { if !session_data.info.is_online { // Reuse existing session - mark as online and create new channels tracing::info!("Agent {} reconnecting to existing session {}", agent_id, existing_session_id); let (frame_tx, _) = broadcast::channel(16); let (input_tx, input_rx) = tokio::sync::mpsc::channel(64); session_data.info.is_online = true; session_data.info.last_heartbeat = chrono::Utc::now(); session_data.info.agent_name = agent_name; // Update name in case it changed session_data.frame_tx = frame_tx.clone(); session_data.input_tx = input_tx; session_data.last_heartbeat_instant = Instant::now(); return (existing_session_id, frame_tx, input_rx); } } } } // Create new session let session_id = Uuid::new_v4(); // Create channels let (frame_tx, _) = broadcast::channel(16); // Buffer 16 frames let (input_tx, input_rx) = tokio::sync::mpsc::channel(64); // Buffer 64 input events let now = chrono::Utc::now(); let session = Session { id: session_id, agent_id: agent_id.clone(), agent_name, started_at: now, viewer_count: 0, viewers: Vec::new(), is_streaming: false, is_online: true, is_persistent, last_heartbeat: now, os_version: None, is_elevated: false, uptime_secs: 0, display_count: 1, agent_version: None, organization: None, site: None, tags: Vec::new(), }; let session_data = SessionData { info: session, frame_tx: frame_tx.clone(), input_tx, input_rx: None, viewers: HashMap::new(), last_heartbeat_instant: Instant::now(), }; let mut sessions = self.sessions.write().await; sessions.insert(session_id, session_data); let mut agents = self.agents.write().await; agents.insert(agent_id, session_id); (session_id, frame_tx, input_rx) } /// Update agent status from heartbeat or status message pub async fn update_agent_status( &self, session_id: SessionId, os_version: Option, is_elevated: bool, uptime_secs: i64, display_count: i32, is_streaming: bool, agent_version: Option, organization: Option, site: Option, tags: Vec, ) { let mut sessions = self.sessions.write().await; if let Some(session_data) = sessions.get_mut(&session_id) { session_data.info.last_heartbeat = chrono::Utc::now(); session_data.last_heartbeat_instant = Instant::now(); session_data.info.is_streaming = is_streaming; if let Some(os) = os_version { session_data.info.os_version = Some(os); } session_data.info.is_elevated = is_elevated; session_data.info.uptime_secs = uptime_secs; session_data.info.display_count = display_count; if let Some(version) = agent_version { session_data.info.agent_version = Some(version); } if let Some(org) = organization { session_data.info.organization = Some(org); } if let Some(s) = site { session_data.info.site = Some(s); } if !tags.is_empty() { session_data.info.tags = tags; } } } /// Update heartbeat timestamp pub async fn update_heartbeat(&self, session_id: SessionId) { let mut sessions = self.sessions.write().await; if let Some(session_data) = sessions.get_mut(&session_id) { session_data.info.last_heartbeat = chrono::Utc::now(); session_data.last_heartbeat_instant = Instant::now(); } } /// Check if a session has timed out (no heartbeat for too long) pub async fn is_session_timed_out(&self, session_id: SessionId) -> bool { let sessions = self.sessions.read().await; if let Some(session_data) = sessions.get(&session_id) { session_data.last_heartbeat_instant.elapsed().as_secs() > HEARTBEAT_TIMEOUT_SECS } else { true // Non-existent sessions are considered timed out } } /// Get sessions that have timed out pub async fn get_timed_out_sessions(&self) -> Vec { let sessions = self.sessions.read().await; sessions .iter() .filter(|(_, data)| data.last_heartbeat_instant.elapsed().as_secs() > HEARTBEAT_TIMEOUT_SECS) .map(|(id, _)| *id) .collect() } /// Get a session by agent ID pub async fn get_session_by_agent(&self, agent_id: &str) -> Option { let agents = self.agents.read().await; let session_id = agents.get(agent_id)?; let sessions = self.sessions.read().await; sessions.get(session_id).map(|s| s.info.clone()) } /// Get a session by session ID pub async fn get_session(&self, session_id: SessionId) -> Option { let sessions = self.sessions.read().await; sessions.get(&session_id).map(|s| s.info.clone()) } /// Join a session as a viewer, returns channels and sends StartStream to agent pub async fn join_session(&self, session_id: SessionId, viewer_id: ViewerId, viewer_name: String) -> Option<(FrameReceiver, InputSender)> { let mut sessions = self.sessions.write().await; let session_data = sessions.get_mut(&session_id)?; let was_empty = session_data.viewers.is_empty(); // Add viewer info let viewer_info = ViewerInfo { id: viewer_id.clone(), name: viewer_name.clone(), connected_at: chrono::Utc::now(), }; session_data.viewers.insert(viewer_id.clone(), viewer_info); // Update session info session_data.info.viewer_count = session_data.viewers.len(); session_data.info.viewers = session_data.viewers.values().cloned().collect(); let frame_rx = session_data.frame_tx.subscribe(); let input_tx = session_data.input_tx.clone(); // If this is the first viewer, send StartStream to agent if was_empty { tracing::info!("Viewer {} ({}) joined session {}, sending StartStream", viewer_name, viewer_id, session_id); Self::send_start_stream_internal(session_data, &viewer_id).await; } else { tracing::info!("Viewer {} ({}) joined session {}", viewer_name, viewer_id, session_id); } Some((frame_rx, input_tx)) } /// Internal helper to send StartStream message async fn send_start_stream_internal(session_data: &SessionData, viewer_id: &str) { use crate::proto; use prost::Message; let start_stream = proto::Message { payload: Some(proto::message::Payload::StartStream(proto::StartStream { viewer_id: viewer_id.to_string(), display_id: 0, // Primary display })), }; let mut buf = Vec::new(); if start_stream.encode(&mut buf).is_ok() { let _ = session_data.input_tx.send(buf).await; } } /// Leave a session as a viewer, sends StopStream if no viewers left pub async fn leave_session(&self, session_id: SessionId, viewer_id: &ViewerId) { let mut sessions = self.sessions.write().await; if let Some(session_data) = sessions.get_mut(&session_id) { let viewer_name = session_data.viewers.get(viewer_id).map(|v| v.name.clone()); session_data.viewers.remove(viewer_id); session_data.info.viewer_count = session_data.viewers.len(); session_data.info.viewers = session_data.viewers.values().cloned().collect(); // If no more viewers, send StopStream to agent if session_data.viewers.is_empty() { tracing::info!("Last viewer {} ({}) left session {}, sending StopStream", viewer_name.as_deref().unwrap_or("unknown"), viewer_id, session_id); Self::send_stop_stream_internal(session_data, viewer_id).await; } else { tracing::info!("Viewer {} ({}) left session {}", viewer_name.as_deref().unwrap_or("unknown"), viewer_id, session_id); } } } /// Internal helper to send StopStream message async fn send_stop_stream_internal(session_data: &SessionData, viewer_id: &str) { use crate::proto; use prost::Message; let stop_stream = proto::Message { payload: Some(proto::message::Payload::StopStream(proto::StopStream { viewer_id: viewer_id.to_string(), })), }; let mut buf = Vec::new(); if stop_stream.encode(&mut buf).is_ok() { let _ = session_data.input_tx.send(buf).await; } } /// Mark agent as disconnected /// For persistent agents: keep session but mark as offline /// For support sessions: remove session entirely pub async fn mark_agent_disconnected(&self, session_id: SessionId) { let mut sessions = self.sessions.write().await; if let Some(session_data) = sessions.get_mut(&session_id) { if session_data.info.is_persistent { // Persistent agent - keep session but mark as offline tracing::info!("Persistent agent {} marked offline (session {} preserved)", session_data.info.agent_id, session_id); session_data.info.is_online = false; session_data.info.is_streaming = false; session_data.info.viewer_count = 0; session_data.info.viewers.clear(); session_data.viewers.clear(); } else { // Support session - remove entirely let agent_id = session_data.info.agent_id.clone(); sessions.remove(&session_id); drop(sessions); // Release sessions lock before acquiring agents lock let mut agents = self.agents.write().await; agents.remove(&agent_id); tracing::info!("Support session {} removed", session_id); } } } /// Remove a session entirely (for cleanup) pub async fn remove_session(&self, session_id: SessionId) { let mut sessions = self.sessions.write().await; if let Some(session_data) = sessions.remove(&session_id) { drop(sessions); let mut agents = self.agents.write().await; agents.remove(&session_data.info.agent_id); } } /// Disconnect a session by sending a disconnect message to the agent /// Returns true if the message was sent successfully pub async fn disconnect_session(&self, session_id: SessionId, reason: &str) -> bool { let sessions = self.sessions.read().await; if let Some(session_data) = sessions.get(&session_id) { // Create disconnect message use crate::proto; use prost::Message; let disconnect_msg = proto::Message { payload: Some(proto::message::Payload::Disconnect(proto::Disconnect { reason: reason.to_string(), })), }; let mut buf = Vec::new(); if disconnect_msg.encode(&mut buf).is_ok() { // Send via input channel (will be forwarded to agent's WebSocket) if session_data.input_tx.send(buf).await.is_ok() { return true; } } } false } /// List all active sessions pub async fn list_sessions(&self) -> Vec { let sessions = self.sessions.read().await; sessions.values().map(|s| s.info.clone()).collect() } /// Send an admin command to an agent (uninstall, restart, etc.) /// Returns true if the message was sent successfully pub async fn send_admin_command(&self, session_id: SessionId, command: crate::proto::AdminCommandType, reason: &str) -> bool { let sessions = self.sessions.read().await; if let Some(session_data) = sessions.get(&session_id) { if !session_data.info.is_online { tracing::warn!("Cannot send admin command to offline agent"); return false; } use crate::proto; use prost::Message; let admin_cmd = proto::Message { payload: Some(proto::message::Payload::AdminCommand(proto::AdminCommand { command: command as i32, reason: reason.to_string(), })), }; let mut buf = Vec::new(); if admin_cmd.encode(&mut buf).is_ok() { if session_data.input_tx.send(buf).await.is_ok() { tracing::info!("Sent admin command {:?} to session {}", command, session_id); return true; } } } false } /// Remove an agent/machine from the session manager (for deletion) /// Returns the agent_id if found pub async fn remove_agent(&self, agent_id: &str) -> Option { let agents = self.agents.read().await; let session_id = agents.get(agent_id).copied()?; drop(agents); self.remove_session(session_id).await; Some(session_id) } } impl Default for SessionManager { fn default() -> Self { Self::new() } } impl SessionManager { /// Restore a machine as an offline session (called on startup from database) pub async fn restore_offline_machine(&self, agent_id: &str, hostname: &str) -> SessionId { let session_id = Uuid::new_v4(); let now = chrono::Utc::now(); let session = Session { id: session_id, agent_id: agent_id.to_string(), agent_name: hostname.to_string(), started_at: now, viewer_count: 0, viewers: Vec::new(), is_streaming: false, is_online: false, // Offline until agent reconnects is_persistent: true, last_heartbeat: now, os_version: None, is_elevated: false, uptime_secs: 0, display_count: 1, agent_version: None, organization: None, site: None, tags: Vec::new(), }; // Create placeholder channels (will be replaced on reconnect) let (frame_tx, _) = broadcast::channel(16); let (input_tx, input_rx) = tokio::sync::mpsc::channel(64); let session_data = SessionData { info: session, frame_tx, input_tx, input_rx: Some(input_rx), viewers: HashMap::new(), last_heartbeat_instant: Instant::now(), }; let mut sessions = self.sessions.write().await; sessions.insert(session_id, session_data); let mut agents = self.agents.write().await; agents.insert(agent_id.to_string(), session_id); tracing::info!("Restored offline machine: {} ({})", hostname, agent_id); session_id } }