diff --git a/server/src/main.rs b/server/src/main.rs
index 1a3b28a..672f3f3 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -57,6 +57,16 @@ const SPA_DIR: &str = "static/app";
 /// non-WS, non-asset GET so `BrowserRouter` deep links (`/machines`,
 /// `/sessions`, `/login`) survive a hard reload.
 const SPA_INDEX: &str = "static/app/index.html";
+
+/// How long an OFFLINE persistent session may sit idle before the reaper removes
+/// it (v2-stable-identity Task 4). Measured on the session's monotonic
+/// `last_heartbeat_instant`. Ten minutes is well past the agent's 30s heartbeat /
+/// 90s timeout, so only genuinely-gone machines age out — a brief reconnect blip
+/// never reaps a real session.
+const PERSISTENT_SESSION_TTL: std::time::Duration = std::time::Duration::from_secs(10 * 60);
+
+/// Cadence of the stale-session reaper sweep (v2-stable-identity Task 4).
+const PERSISTENT_SESSION_REAP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
 use metrics::SharedMetrics;
 use prometheus_client::registry::Registry;
 use support_codes::{CodeValidation, CreateCodeRequest, SupportCode, SupportCodeManager};
@@ -287,6 +297,37 @@ async fn main() -> Result<()> {
         }
     }
 
+    // Spawn the stale-session reaper (v2-stable-identity Task 4). Periodically
+    // removes OFFLINE persistent sessions that have aged out past
+    // PERSISTENT_SESSION_TTL with no viewer attached, purging both the agent_id and
+    // machine_uid indexes as each session is removed. Spawned AFTER the
+    // startup restore so restored-offline rows are present (they age out once they
+    // pass the TTL, per plan). The task holds only a clone of the SessionManager
+    // (Arc-backed internally), so it shares the live session map.
+    {
+        let reaper_sessions = sessions.clone();
+        tokio::spawn(async move {
+            let mut interval = tokio::time::interval(PERSISTENT_SESSION_REAP_INTERVAL);
+            // Skip the immediate first tick so we do not sweep before the server is
+            // even serving; the first real sweep happens one interval in.
+            interval.tick().await;
+            loop {
+                interval.tick().await;
+                let reaped = reaper_sessions
+                    .reap_stale_persistent(PERSISTENT_SESSION_TTL)
+                    .await;
+                if reaped > 0 {
+                    info!("Stale-session reaper removed {} offline session(s)", reaped);
+                }
+            }
+        });
+    }
+    info!(
+        "Stale-session reaper started (ttl {}s, sweep every {}s)",
+        PERSISTENT_SESSION_TTL.as_secs(),
+        PERSISTENT_SESSION_REAP_INTERVAL.as_secs()
+    );
+
     // Agent API key for persistent agents (optional)
     let agent_api_key = std::env::var("AGENT_API_KEY").ok();
     if let Some(ref key) = agent_api_key {
diff --git a/server/src/session/mod.rs b/server/src/session/mod.rs
index 104e518..d5dacfb 100644
--- a/server/src/session/mod.rs
+++ b/server/src/session/mod.rs
@@ -81,6 +81,14 @@ pub struct Session {
     pub is_streaming: bool,
     pub is_online: bool, // Whether agent is currently connected
     pub is_persistent: bool, // Persistent agent (no support code) vs support session
+    /// Stable hardware identity (`machine_uid`) this session belongs to, when the
+    /// agent reported one over the UN-KEYED dedup path (v2-stable-identity Task 2).
+    /// `None` for keyed agents (uid suppressed by the relay) and for legacy agents
+    /// that report no uid. Recorded on the session so the reaper can log which
+    /// physical machine it reaped and so same-machine supersede (Task 4) can find an
+    /// older offline session for the SAME uid that was never placed in the reattach
+    /// index (e.g. a legacy no-uid session that predates the agent's uid upgrade).
+    pub machine_uid: Option<String>,
     /// Attended-consent state (Task 5). Gates viewer join for attended sessions.
     pub consent_state: ConsentState,
     pub last_heartbeat: chrono::DateTime<chrono::Utc>,
@@ -269,6 +277,12 @@ impl SessionManager {
                 session_data.info.is_online = true;
                 session_data.info.last_heartbeat = chrono::Utc::now();
                 session_data.info.agent_name = agent_name; // Update name in case it changed
+                // Record the reported uid on the session so the reaper can log it
+                // and supersede can match it later. Only set it from a reported
+                // uid; never clear an existing one on a None (legacy) reconnect.
+                if let Some(uid) = machine_uid {
+                    session_data.info.machine_uid = Some(uid.to_string());
+                }
                 session_data.frame_tx = frame_tx.clone();
                 session_data.input_tx = input_tx;
                 session_data.last_heartbeat_instant = Instant::now();
@@ -287,6 +301,64 @@ impl SessionManager {
             }
         }
 
+        // Same-machine supersede (v2-stable-identity Task 4).
+        //
+        // We only reach here (a NEW session) when BOTH reattach lookups missed: no
+        // session was indexed under this `machine_uid` AND none under this `agent_id`.
+        // Task 2's uid reattach already prevents stranding whenever the prior session
+        // was indexed by uid; the residual gap it does NOT cover is a prior session
+        // that carries this machine's uid on its record but is NOT in the reattach
+        // index — e.g. a legacy no-uid session created before the agent learned its
+        // uid, then later upgraded and reconnected with a FRESH agent_id + uid. That
+        // older session would otherwise be stranded (it ages out via the reaper, but
+        // we can collapse it deterministically here).
+        //
+        // Scope is deliberately narrow and conservative: only when THIS connection
+        // reports a uid, and only superseding OFFLINE, PERSISTENT, VIEWERLESS sessions
+        // whose recorded `machine_uid` equals it. We never supersede an online session,
+        // a viewer-attached session, a support session, or one with a different/absent
+        // uid. Keyed agents pass `machine_uid = None` (relay-suppressed) so they never
+        // trigger or fall victim to this — the Task-2 security gate is untouched.
+        if let Some(uid) = machine_uid {
+            let stranded: Vec<SessionId> = {
+                let sessions = self.sessions.read().await;
+                sessions
+                    .iter()
+                    .filter(|(_, data)| {
+                        !data.info.is_online
+                            && data.info.is_persistent
+                            && data.viewers.is_empty()
+                            && data.info.machine_uid.as_deref() == Some(uid)
+                    })
+                    .map(|(id, _)| *id)
+                    .collect()
+            };
+            for sid in stranded {
+                // Re-assert the supersede predicate (offline + persistent +
+                // viewerless + same-uid) UNDER the write lock at removal time. In the
+                // window since we snapshotted the candidate the victim's agent could
+                // have reconnected (gone online / attached a viewer); the guard spares
+                // a now-live session. remove_session_if purges BOTH the agent_id and
+                // machine_uid indexes consistently, like remove_session.
+                let removed = self
+                    .remove_session_if(sid, |data| {
+                        !data.info.is_online
+                            && data.info.is_persistent
+                            && data.viewers.is_empty()
+                            && data.info.machine_uid.as_deref() == Some(uid)
+                    })
+                    .await;
+                if removed {
+                    tracing::info!(
+                        "Superseded stranded same-machine session {} (machine_uid {}) for reconnecting agent {}",
+                        sid,
+                        uid,
+                        agent_id
+                    );
+                }
+            }
+        }
+
         // Create new session
         let session_id = Uuid::new_v4();
 
@@ -305,6 +377,7 @@ impl SessionManager {
             is_streaming: false,
             is_online: true,
             is_persistent,
+            machine_uid: machine_uid.map(|s| s.to_string()),
             // Attended (support-code) sessions require consent before a viewer
             // may join; managed/persistent sessions do not (Phase-1 policy).
             consent_state: if is_persistent {
@@ -661,6 +734,47 @@ impl SessionManager {
         }
     }
 
+    /// Remove a session only if it still satisfies `pred` when re-checked under
+    /// the write lock — closes the snapshot->remove TOCTOU in reaping/supersede
+    /// (v2-stable-identity Task 4 hardening).
+    ///
+    /// The reap/supersede paths select victims under a READ lock, then drop it
+    /// before removal. In that window a victim's agent could reconnect (go online
+    /// / attach a viewer); an unconditional [`remove_session`](Self::remove_session)
+    /// would then drop a now-live session. This helper re-asserts the removal
+    /// predicate atomically under the SAME write lock it removes with, so a session
+    /// that became ineligible since selection is spared. Returns `true` iff removed.
+    ///
+    /// On removal it purges the `sessions` map AND both indexes (`agents` by
+    /// `agent_id`, `machine_uids` by value) exactly as `remove_session` does. The
+    /// `agents`/`machine_uids` locks are taken nested INSIDE the held `sessions`
+    /// write lock, never the reverse — matching `register_agent`'s acquisition order
+    /// — so there is no lock-order inversion. `pred` is synchronous and cannot yield;
+    /// the `sessions` guard stays held across the index-lock `.await`s by design.
+    /// Does NOT call `remove_session` (it would re-take `sessions` and deadlock).
+    async fn remove_session_if<F>(&self, session_id: SessionId, pred: F) -> bool
+    where
+        F: FnOnce(&SessionData) -> bool,
+    {
+        let mut sessions = self.sessions.write().await;
+        let Some(session_data) = sessions.get(&session_id) else {
+            return false;
+        };
+        if !pred(session_data) {
+            return false;
+        }
+        // Predicate still holds under the write lock — remove and purge indexes,
+        // mirroring remove_session exactly so no index entry leaks.
+        let agent_id = session_data.info.agent_id.clone();
+        sessions.remove(&session_id);
+        let mut agents = self.agents.write().await;
+        agents.remove(&agent_id);
+        drop(agents);
+        let mut machine_uids = self.machine_uids.write().await;
+        machine_uids.retain(|_, &mut sid| sid != session_id);
+        true
+    }
+
     /// Disconnect a session by sending a disconnect message to the agent
     /// Returns true if the message was sent successfully
     pub async fn disconnect_session(&self, session_id: SessionId, reason: &str) -> bool {
@@ -737,6 +851,82 @@ impl SessionManager {
         self.remove_session(session_id).await;
         Some(session_id)
     }
+
+    /// Reap stale OFFLINE persistent sessions (v2-stable-identity Task 4).
+    ///
+    /// Sweeps all sessions and removes those that are persistent, currently
+    /// OFFLINE, have NO viewer attached, and whose last heartbeat is older than
+    /// `ttl`. Age is measured on the monotonic `last_heartbeat_instant` (immune to
+    /// wall-clock changes), which is refreshed on every heartbeat/status update,
+    /// on (re)attach, and on session creation/restore — so a restored-offline row
+    /// becomes eligible only after `ttl` elapses past the restore, as the plan
+    /// permits.
+    ///
+    /// Conservative by construction — a session is reaped ONLY if every condition
+    /// holds; when in doubt it is spared:
+    /// - ONLINE sessions are never reaped (a live agent owns its session).
+    /// - Viewer-attached sessions are never reaped (a tech is mid-session).
+    /// - Non-persistent (support-code) sessions are never reaped here; they are
+    ///   torn down on disconnect by `mark_agent_disconnected`, not aged out.
+    ///
+    /// Removal is routed through `remove_session_if`, which re-checks eligibility
+    /// under the write lock and purges the `agents` and `machine_uids` indexes
+    /// together with the session. Returns the number of sessions reaped.
+    pub async fn reap_stale_persistent(&self, ttl: std::time::Duration) -> usize {
+        // Snapshot the victims under a read lock, then remove each one through the
+        // guarded write path. We collect the identifying fields for logging up front
+        // so the log line still has them once the session itself has been removed.
+        let victims: Vec<(SessionId, String, Option<String>, std::time::Duration)> = {
+            let sessions = self.sessions.read().await;
+            sessions
+                .iter()
+                .filter(|(_, data)| {
+                    !data.info.is_online
+                        && data.info.is_persistent
+                        && data.viewers.is_empty()
+                        && data.last_heartbeat_instant.elapsed() > ttl
+                })
+                .map(|(id, data)| {
+                    (
+                        *id,
+                        data.info.agent_id.clone(),
+                        data.info.machine_uid.clone(),
+                        data.last_heartbeat_instant.elapsed(),
+                    )
+                })
+                .collect()
+        };
+
+        let mut count = 0usize;
+        for (session_id, agent_id, machine_uid, age) in victims {
+            // Re-assert the reap predicate UNDER the write lock at removal time. The
+            // candidate was selected under a read lock that we have since dropped; in
+            // that window the agent could have reconnected (gone online / attached a
+            // viewer) or been heartbeated within the TTL. The guard spares a session
+            // that is no longer eligible. Only log/count when a removal actually
+            // happened. remove_session_if purges BOTH indexes like remove_session.
+            let removed = self
+                .remove_session_if(session_id, |data| {
+                    !data.info.is_online
+                        && data.info.is_persistent
+                        && data.viewers.is_empty()
+                        && data.last_heartbeat_instant.elapsed() > ttl
+                })
+                .await;
+            if removed {
+                count += 1;
+                tracing::info!(
+                    "Reaped stale offline persistent session {} (agent_id {}, machine_uid {:?}, idle {}s > ttl {}s)",
+                    session_id,
+                    agent_id,
+                    machine_uid.as_deref().unwrap_or("none"),
+                    age.as_secs(),
+                    ttl.as_secs()
+                );
+            }
+        }
+        count
+    }
 }
 
 impl Default for SessionManager {
@@ -772,6 +962,7 @@ impl SessionManager {
             is_streaming: false,
             is_online: false, // Offline until agent reconnects
             is_persistent: true,
+            machine_uid: machine_uid.map(|s| s.to_string()),
             consent_state: ConsentState::NotRequired, // managed/persistent
             last_heartbeat: now,
             os_version: None,
@@ -823,6 +1014,42 @@ impl SessionManager {
     async fn is_machine_uid_indexed(&self, uid: &str) -> bool {
         self.machine_uids.read().await.contains_key(uid)
     }
+
+    /// Test-only: backdate a session's monotonic heartbeat instant so the reaper
+    /// (Task 4) sees it as stale without a real sleep. Subtracts `age` from "now";
+    /// no-op if the session does not exist or `age` overflows the instant.
+    #[cfg(test)]
+    async fn set_last_heartbeat_age(&self, session_id: SessionId, age: std::time::Duration) {
+        let mut sessions = self.sessions.write().await;
+        if let Some(data) = sessions.get_mut(&session_id) {
+            if let Some(backdated) = Instant::now().checked_sub(age) {
+                data.last_heartbeat_instant = backdated;
+            }
+        }
+    }
+
+    /// Test-only: mark a session OFFLINE without touching its viewers. The live
+    /// `mark_agent_disconnected` path clears viewers for persistent sessions, so
+    /// this lets a reaper test exercise the viewer/support guards in isolation
+    /// (an offline+stale session that still carries a viewer, or a support session).
+    #[cfg(test)]
+    async fn force_offline_for_test(&self, session_id: SessionId) {
+        let mut sessions = self.sessions.write().await;
+        if let Some(data) = sessions.get_mut(&session_id) {
+            data.info.is_online = false;
+        }
+    }
+
+    /// Test-only: record a `machine_uid` on an existing session's info without
+    /// going through the index. Lets the supersede test set up a legacy session
+    /// that later learned its uid, so the new-session supersede sweep can match it.
+    #[cfg(test)]
+    async fn set_machine_uid_for_test(&self, session_id: SessionId, uid: &str) {
+        let mut sessions = self.sessions.write().await;
+        if let Some(data) = sessions.get_mut(&session_id) {
+            data.info.machine_uid = Some(uid.to_string());
+        }
+    }
 }
 
 #[cfg(test)]
@@ -1210,4 +1437,241 @@ mod tests {
         assert_eq!(sid2, sid, "restored machine must reattach by machine_uid");
         assert_eq!(mgr.list_sessions().await.len(), 1);
     }
+
+    // ---- v2-stable-identity Task 4: session reaping + same-machine supersede ----
+
+    /// An OFFLINE persistent session whose last heartbeat is older than the TTL is
+    /// reaped. We backdate its monotonic heartbeat instant rather than sleep.
+    #[tokio::test]
+    async fn offline_persistent_past_ttl_is_reaped() {
+        let mgr = SessionManager::new();
+        let ttl = std::time::Duration::from_secs(600);
+
+        let (sid, _f, _i) = mgr
+            .register_agent("reap-agent".to_string(), "HOST-REAP".to_string(), true, None)
+            .await;
+        mgr.mark_agent_disconnected(sid).await; // now offline
+        // Age it well past the TTL.
+        mgr.set_last_heartbeat_age(sid, std::time::Duration::from_secs(1200))
+            .await;
+
+        let reaped = mgr.reap_stale_persistent(ttl).await;
+        assert_eq!(reaped, 1, "a stale offline persistent session must be reaped");
+        assert!(mgr.get_session(sid).await.is_none());
+        assert_eq!(mgr.list_sessions().await.len(), 0);
+        // Index purged too: the agent_id no longer resolves.
+        assert!(mgr.get_session_by_agent("reap-agent").await.is_none());
+    }
+
+    /// An OFFLINE persistent session still WITHIN the TTL is spared.
+    #[tokio::test]
+    async fn offline_persistent_within_ttl_is_spared() {
+        let mgr = SessionManager::new();
+        let ttl = std::time::Duration::from_secs(600);
+
+        let (sid, _f, _i) = mgr
+            .register_agent("fresh-agent".to_string(), "HOST-FRESH".to_string(), true, None)
+            .await;
+        mgr.mark_agent_disconnected(sid).await;
+        // Aged, but not past the TTL.
+        mgr.set_last_heartbeat_age(sid, std::time::Duration::from_secs(60))
+            .await;
+
+        let reaped = mgr.reap_stale_persistent(ttl).await;
+        assert_eq!(reaped, 0, "a within-TTL session must not be reaped");
+        assert!(mgr.get_session(sid).await.is_some());
+    }
+
+    /// An ONLINE session is NEVER reaped, even if its heartbeat instant is ancient.
+    /// (Online is the dominant guard: a live agent owns its session.)
+    #[tokio::test]
+    async fn online_session_is_never_reaped() {
+        let mgr = SessionManager::new();
+        let ttl = std::time::Duration::from_secs(600);
+
+        let (sid, _f, _i) = mgr
+            .register_agent("online-agent".to_string(), "HOST-ON".to_string(), true, None)
+            .await;
+        // Still online (no disconnect). Backdate anyway to prove online wins.
+        mgr.set_last_heartbeat_age(sid, std::time::Duration::from_secs(99999))
+            .await;
+
+        let reaped = mgr.reap_stale_persistent(ttl).await;
+        assert_eq!(reaped, 0, "an online session must never be reaped");
+        assert!(mgr.get_session(sid).await.is_some());
+    }
+
+    /// A viewer-attached session is spared even when offline AND stale — a tech is
+    /// mid-session. We mark it offline (preserving viewers) and age it out, then
+    /// assert the reaper leaves it alone.
+    #[tokio::test]
+    async fn viewer_attached_session_is_spared_even_if_stale() {
+        let mgr = SessionManager::new();
+        let ttl = std::time::Duration::from_secs(600);
+
+        let (sid, _f, _i) = mgr
+            .register_agent("viewed-agent".to_string(), "HOST-V".to_string(), true, None)
+            .await;
+        // Attach a viewer (managed session => NotRequired => join succeeds).
+        assert!(mgr
+            .join_session(sid, "viewer-1".to_string(), "Tech".to_string())
+            .await
+            .is_some());
+
+        // Force the session offline WITHOUT clearing its viewers, so we can prove
+        // the viewer guard (not the online guard) is what spares it. (The normal
+        // mark_agent_disconnected path clears viewers; here we test the guard in
+        // isolation by aging a viewer-bearing session directly.)
+        mgr.force_offline_for_test(sid).await;
+        mgr.set_last_heartbeat_age(sid, std::time::Duration::from_secs(1200))
+            .await;
+
+        let reaped = mgr.reap_stale_persistent(ttl).await;
+        assert_eq!(reaped, 0, "a viewer-attached session must be spared");
+        assert!(mgr.get_session(sid).await.is_some());
+    }
+
+    /// A support (non-persistent) session is never reaped by the persistent reaper.
+    /// Support sessions are torn down on disconnect, not aged out here.
+    #[tokio::test]
+    async fn support_session_is_never_reaped() {
+        let mgr = SessionManager::new();
+        let ttl = std::time::Duration::from_secs(600);
+
+        // Support (attended) session: is_persistent = false.
+        let (sid, _f, _i) = mgr
+            .register_agent("support-agent".to_string(), "HOST-S".to_string(), false, None)
+            .await;
+        // Force offline + stale (a support session that lingered, hypothetically).
+        mgr.force_offline_for_test(sid).await;
+        mgr.set_last_heartbeat_age(sid, std::time::Duration::from_secs(1200))
+            .await;
+
+        let reaped = mgr.reap_stale_persistent(ttl).await;
+        assert_eq!(reaped, 0, "the persistent reaper must never reap a support session");
+        assert!(mgr.get_session(sid).await.is_some());
+    }
+
+    /// Same-machine supersede gap (the one Task 2 does NOT cover): a machine first
+    /// connects as a LEGACY no-uid agent (its session is therefore NOT in the uid
+    /// reattach index), goes offline, then upgrades and reconnects with a FRESH
+    /// agent_id AND its uid. Task 2's reattach misses (no uid index entry, different
+    /// agent_id) and would strand the old session. Supersede collapses it so exactly
+    /// ONE live session remains for the physical machine.
+    #[tokio::test]
+    async fn same_machine_reconnect_supersedes_stranded_legacy_session() {
+        let mgr = SessionManager::new();
+        let uid = "muid-supersede";
+
+        // Legacy connect: NO uid reported. Session is indexed by agent_id only.
+        let (sid_old, _f1, _i1) = mgr
+            .register_agent("legacy-id".to_string(), "HOST-SUP".to_string(), true, None)
+            .await;
+        // The legacy session has no recorded uid, so set it as if a later status
+        // taught us the machine's uid while it was still the active session. This
+        // models the realistic upgrade path where the uid becomes known.
+        mgr.set_machine_uid_for_test(sid_old, uid).await;
+        mgr.mark_agent_disconnected(sid_old).await; // offline, viewers cleared
+
+        // Upgrade + config loss: reconnect with a FRESH agent_id AND the uid. Both
+        // reattach lookups miss (uid was never indexed; agent_id differs), so a NEW
+        // session is created — and supersede must remove the stranded old one.
+        let (sid_new, _f2, _i2) = mgr
+            .register_agent("fresh-id".to_string(), "HOST-SUP".to_string(), true, Some(uid))
+            .await;
+
+        assert_ne!(sid_new, sid_old, "a fresh agent_id with an unindexed uid creates a new session");
+        // The stranded old session is gone; exactly one live session remains.
+        assert!(
+            mgr.get_session(sid_old).await.is_none(),
+            "the stranded same-machine session must be superseded"
+        );
+        assert_eq!(
+            mgr.list_sessions().await.len(),
+            1,
+            "exactly one session must remain for the physical machine"
+        );
+        // The remaining session is the new one and resolves by its agent_id.
+        assert!(mgr.get_session_by_agent("fresh-id").await.is_some());
+        assert!(mgr.get_session_by_agent("legacy-id").await.is_none());
+    }
+
+    /// TOCTOU guard (Task 4 hardening): the reaper selects victims under a read
+    /// lock, drops it, then removes. If a selected session goes back ONLINE in that
+    /// window, the guarded removal must spare it. We can't interleave the real
+    /// read-then-remove here, but we exercise the removal predicate directly:
+    /// `remove_session_if` with the reap predicate must NOT remove a session that
+    /// is online (the state a reconnect would have produced) and must return false.
+    #[tokio::test]
+    async fn guarded_remove_spares_session_that_went_online_after_selection() {
+        let mgr = SessionManager::new();
+        let ttl = std::time::Duration::from_secs(600);
+
+        let (sid, _f, _i) = mgr
+            .register_agent("race-agent".to_string(), "HOST-RACE".to_string(), true, None)
+            .await;
+        // Make it a valid reap candidate first: offline + stale.
+        mgr.mark_agent_disconnected(sid).await;
+        mgr.set_last_heartbeat_age(sid, std::time::Duration::from_secs(1200))
+            .await;
+
+        // Simulate the agent reconnecting in the snapshot->remove window: it is now
+        // ONLINE again (register_agent's reattach marks the session online).
+        let (sid_re, _f2, _i2) = mgr
+            .register_agent("race-agent".to_string(), "HOST-RACE".to_string(), true, None)
+            .await;
+        assert_eq!(sid_re, sid, "reconnect reattaches the same offline session");
+        assert_eq!(mgr.get_session(sid).await.map(|s| s.is_online), Some(true));
+
+        // The guarded removal re-checks the reap predicate under the write lock and
+        // must spare the now-online session.
+        let removed = mgr
+            .remove_session_if(sid, |data| {
+                !data.info.is_online
+                    && data.info.is_persistent
+                    && data.viewers.is_empty()
+                    && data.last_heartbeat_instant.elapsed() > ttl
+            })
+            .await;
+        assert!(!removed, "a session that went online must not be removed by the guard");
+        assert!(
+            mgr.get_session(sid).await.is_some(),
+            "the now-online session must survive the guarded removal"
+        );
+    }
+
+    /// Supersede must NEVER remove an ONLINE session for the same uid (defensive:
+    /// it only ever collapses OFFLINE strays). Task 2's reattach reuse only fires
+    /// for an OFFLINE prior session (`if !is_online`), so when the prior same-uid
+    /// session is still ONLINE a second concurrent connection falls through to a NEW
+    /// session — and supersede leaves the live one untouched. The invariant under
+    /// test is the one that matters: the online session SURVIVES.
+    #[tokio::test]
+    async fn supersede_does_not_remove_online_same_machine_session() {
+        let mgr = SessionManager::new();
+        let uid = "muid-online-keep";
+
+        // First session reports the uid and stays ONLINE.
+        let (sid_online, _f1, _i1) = mgr
+            .register_agent("agent-a".to_string(), "HOST-O".to_string(), true, Some(uid))
+            .await;
+
+        // A second connection arrives claiming the same uid while the first is still
+        // online. Reattach reuse is gated on the prior being offline, so this creates
+        // a NEW session; supersede must not have removed the live one.
+        let (sid_second, _f2, _i2) = mgr
+            .register_agent("agent-b".to_string(), "HOST-O".to_string(), true, Some(uid))
+            .await;
+
+        // The online session must still be present and resolvable — supersede never
+        // touches an online session.
+        assert!(
+            mgr.get_session(sid_online).await.is_some(),
+            "an online same-machine session must never be superseded"
+        );
+        assert_ne!(
+            sid_second, sid_online,
+            "a concurrent same-uid connection to a still-online session gets its own session"
+        );
+    }
 }