feat(server): reap stale persistent sessions + same-machine supersede (SPEC-004 Task 4)
A periodic reaper removes persistent, offline, viewerless sessions whose last heartbeat is older than a 10-minute TTL (60s sweep spawned at startup), and a same-machine supersede on the new-session path drops a stranded prior session when a legacy no-uid agent upgrades to a fresh agent_id + machine_uid. Both removals re-assert the predicate under the write lock (remove_session_if) to close a snapshot->remove TOCTOU. Security: keyed (cak_) agents pass machine_uid=None, so they never trigger supersede and are never reaped as a uid victim; online, viewer-attached, and support sessions are never reaped. 82 server tests pass; clippy clean. Implements specs/v2-stable-identity/plan.md Task 4. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -57,6 +57,16 @@ const SPA_DIR: &str = "static/app";
|
|||||||
/// non-WS, non-asset GET so `BrowserRouter` deep links (`/machines`,
|
/// non-WS, non-asset GET so `BrowserRouter` deep links (`/machines`,
|
||||||
/// `/sessions`, `/login`) survive a hard reload.
|
/// `/sessions`, `/login`) survive a hard reload.
|
||||||
const SPA_INDEX: &str = "static/app/index.html";
|
const SPA_INDEX: &str = "static/app/index.html";
|
||||||
|
|
||||||
|
/// Idle budget for an OFFLINE persistent session before the stale-session reaper
/// removes it (v2-stable-identity Task 4). Age is measured on the session's
/// monotonic `last_heartbeat_instant`. 600 s (ten minutes) is well past the
/// agent's 30s heartbeat / 90s timeout, so only genuinely-gone machines age out —
/// a brief reconnect blip never reaps a real session.
const PERSISTENT_SESSION_TTL: std::time::Duration = std::time::Duration::from_secs(600);

/// Period between two sweeps of the stale-session reaper (v2-stable-identity
/// Task 4).
const PERSISTENT_SESSION_REAP_INTERVAL: std::time::Duration = std::time::Duration::from_secs(60);
|
||||||
use metrics::SharedMetrics;
|
use metrics::SharedMetrics;
|
||||||
use prometheus_client::registry::Registry;
|
use prometheus_client::registry::Registry;
|
||||||
use support_codes::{CodeValidation, CreateCodeRequest, SupportCode, SupportCodeManager};
|
use support_codes::{CodeValidation, CreateCodeRequest, SupportCode, SupportCodeManager};
|
||||||
@@ -287,6 +297,37 @@ async fn main() -> Result<()> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Spawn the stale-session reaper (v2-stable-identity Task 4). Periodically
|
||||||
|
// removes OFFLINE persistent sessions that have aged out past
|
||||||
|
// PERSISTENT_SESSION_TTL with no viewer attached, purging both the agent_id and
|
||||||
|
// machine_uid indexes via SessionManager::remove_session_if. Spawned AFTER the
|
||||||
|
// startup restore so restored-offline rows are present (they age out once they
|
||||||
|
// pass the TTL, per plan). The task holds only a clone of the SessionManager
|
||||||
|
// (Arc-backed internally), so it shares the live session map.
|
||||||
|
{
|
||||||
|
let reaper_sessions = sessions.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut interval = tokio::time::interval(PERSISTENT_SESSION_REAP_INTERVAL);
|
||||||
|
// Skip the immediate first tick so we do not sweep before the server is
|
||||||
|
// even serving; the first real sweep happens one interval in.
|
||||||
|
interval.tick().await;
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
let reaped = reaper_sessions
|
||||||
|
.reap_stale_persistent(PERSISTENT_SESSION_TTL)
|
||||||
|
.await;
|
||||||
|
if reaped > 0 {
|
||||||
|
info!("Stale-session reaper removed {} offline session(s)", reaped);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
info!(
|
||||||
|
"Stale-session reaper started (ttl {}s, sweep every {}s)",
|
||||||
|
PERSISTENT_SESSION_TTL.as_secs(),
|
||||||
|
PERSISTENT_SESSION_REAP_INTERVAL.as_secs()
|
||||||
|
);
|
||||||
|
|
||||||
// Agent API key for persistent agents (optional)
|
// Agent API key for persistent agents (optional)
|
||||||
let agent_api_key = std::env::var("AGENT_API_KEY").ok();
|
let agent_api_key = std::env::var("AGENT_API_KEY").ok();
|
||||||
if let Some(ref key) = agent_api_key {
|
if let Some(ref key) = agent_api_key {
|
||||||
|
|||||||
@@ -81,6 +81,14 @@ pub struct Session {
|
|||||||
pub is_streaming: bool,
|
pub is_streaming: bool,
|
||||||
pub is_online: bool, // Whether agent is currently connected
|
pub is_online: bool, // Whether agent is currently connected
|
||||||
pub is_persistent: bool, // Persistent agent (no support code) vs support session
|
pub is_persistent: bool, // Persistent agent (no support code) vs support session
|
||||||
|
/// Stable hardware identity (`machine_uid`) this session belongs to, when the
|
||||||
|
/// agent reported one over the UN-KEYED dedup path (v2-stable-identity Task 2).
|
||||||
|
/// `None` for keyed agents (uid suppressed by the relay) and for legacy agents
|
||||||
|
/// that report no uid. Recorded on the session so the reaper can log which
|
||||||
|
/// physical machine it reaped and so same-machine supersede (Task 4) can find an
|
||||||
|
/// older offline session for the SAME uid that was never placed in the reattach
|
||||||
|
/// index (e.g. a legacy no-uid session that predates the agent's uid upgrade).
|
||||||
|
pub machine_uid: Option<String>,
|
||||||
/// Attended-consent state (Task 5). Gates viewer join for attended sessions.
|
/// Attended-consent state (Task 5). Gates viewer join for attended sessions.
|
||||||
pub consent_state: ConsentState,
|
pub consent_state: ConsentState,
|
||||||
pub last_heartbeat: chrono::DateTime<chrono::Utc>,
|
pub last_heartbeat: chrono::DateTime<chrono::Utc>,
|
||||||
@@ -269,6 +277,12 @@ impl SessionManager {
|
|||||||
session_data.info.is_online = true;
|
session_data.info.is_online = true;
|
||||||
session_data.info.last_heartbeat = chrono::Utc::now();
|
session_data.info.last_heartbeat = chrono::Utc::now();
|
||||||
session_data.info.agent_name = agent_name; // Update name in case it changed
|
session_data.info.agent_name = agent_name; // Update name in case it changed
|
||||||
|
// Record the reported uid on the session so the reaper can log it
|
||||||
|
// and supersede can match it later. Only set it from a reported
|
||||||
|
// uid; never clear an existing one on a None (legacy) reconnect.
|
||||||
|
if let Some(uid) = machine_uid {
|
||||||
|
session_data.info.machine_uid = Some(uid.to_string());
|
||||||
|
}
|
||||||
session_data.frame_tx = frame_tx.clone();
|
session_data.frame_tx = frame_tx.clone();
|
||||||
session_data.input_tx = input_tx;
|
session_data.input_tx = input_tx;
|
||||||
session_data.last_heartbeat_instant = Instant::now();
|
session_data.last_heartbeat_instant = Instant::now();
|
||||||
@@ -287,6 +301,64 @@ impl SessionManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Same-machine supersede (v2-stable-identity Task 4).
|
||||||
|
//
|
||||||
|
// We only reach here (a NEW session) when BOTH reattach lookups missed: no
|
||||||
|
// session was indexed under this `machine_uid` AND none under this `agent_id`.
|
||||||
|
// Task 2's uid reattach already prevents stranding whenever the prior session
|
||||||
|
// was indexed by uid; the residual gap it does NOT cover is a prior session
|
||||||
|
// that carries this machine's uid on its record but is NOT in the reattach
|
||||||
|
// index — e.g. a legacy no-uid session created before the agent learned its
|
||||||
|
// uid, then later upgraded and reconnected with a FRESH agent_id + uid. That
|
||||||
|
// older session would otherwise be stranded (it ages out via the reaper, but
|
||||||
|
// we can collapse it deterministically here).
|
||||||
|
//
|
||||||
|
// Scope is deliberately narrow and conservative: only when THIS connection
|
||||||
|
// reports a uid, and only superseding OFFLINE, PERSISTENT, VIEWERLESS sessions
|
||||||
|
// whose recorded `machine_uid` equals it. We never supersede an online session,
|
||||||
|
// a viewer-attached session, a support session, or one with a different/absent
|
||||||
|
// uid. Keyed agents pass `machine_uid = None` (relay-suppressed) so they never
|
||||||
|
// trigger or fall victim to this — the Task-2 security gate is untouched.
|
||||||
|
if let Some(uid) = machine_uid {
|
||||||
|
let stranded: Vec<SessionId> = {
|
||||||
|
let sessions = self.sessions.read().await;
|
||||||
|
sessions
|
||||||
|
.iter()
|
||||||
|
.filter(|(_, data)| {
|
||||||
|
!data.info.is_online
|
||||||
|
&& data.info.is_persistent
|
||||||
|
&& data.viewers.is_empty()
|
||||||
|
&& data.info.machine_uid.as_deref() == Some(uid)
|
||||||
|
})
|
||||||
|
.map(|(id, _)| *id)
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
for sid in stranded {
|
||||||
|
// Re-assert the supersede predicate (offline + persistent +
|
||||||
|
// viewerless + same-uid) UNDER the write lock at removal time. In the
|
||||||
|
// window since we snapshotted the candidate the victim's agent could
|
||||||
|
// have reconnected (gone online / attached a viewer); the guard spares
|
||||||
|
// a now-live session. remove_session_if purges BOTH the agent_id and
|
||||||
|
// machine_uid indexes consistently, like remove_session.
|
||||||
|
let removed = self
|
||||||
|
.remove_session_if(sid, |data| {
|
||||||
|
!data.info.is_online
|
||||||
|
&& data.info.is_persistent
|
||||||
|
&& data.viewers.is_empty()
|
||||||
|
&& data.info.machine_uid.as_deref() == Some(uid)
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
if removed {
|
||||||
|
tracing::info!(
|
||||||
|
"Superseded stranded same-machine session {} (machine_uid {}) for reconnecting agent {}",
|
||||||
|
sid,
|
||||||
|
uid,
|
||||||
|
agent_id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Create new session
|
// Create new session
|
||||||
let session_id = Uuid::new_v4();
|
let session_id = Uuid::new_v4();
|
||||||
|
|
||||||
@@ -305,6 +377,7 @@ impl SessionManager {
|
|||||||
is_streaming: false,
|
is_streaming: false,
|
||||||
is_online: true,
|
is_online: true,
|
||||||
is_persistent,
|
is_persistent,
|
||||||
|
machine_uid: machine_uid.map(|s| s.to_string()),
|
||||||
// Attended (support-code) sessions require consent before a viewer
|
// Attended (support-code) sessions require consent before a viewer
|
||||||
// may join; managed/persistent sessions do not (Phase-1 policy).
|
// may join; managed/persistent sessions do not (Phase-1 policy).
|
||||||
consent_state: if is_persistent {
|
consent_state: if is_persistent {
|
||||||
@@ -661,6 +734,47 @@ impl SessionManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Remove a session only if it still satisfies `pred` when re-checked under
|
||||||
|
/// the write lock — closes the snapshot->remove TOCTOU in reaping/supersede
|
||||||
|
/// (v2-stable-identity Task 4 hardening).
|
||||||
|
///
|
||||||
|
/// The reap/supersede paths select victims under a READ lock, then drop it
|
||||||
|
/// before removal. In that window a victim's agent could reconnect (go online
|
||||||
|
/// / attach a viewer); an unconditional [`remove_session`](Self::remove_session)
|
||||||
|
/// would then drop a now-live session. This helper re-asserts the removal
|
||||||
|
/// predicate atomically under the SAME write lock it removes with, so a session
|
||||||
|
/// that became ineligible since selection is spared. Returns `true` iff removed.
|
||||||
|
///
|
||||||
|
/// On removal it purges the `sessions` map AND both indexes (`agents` by
|
||||||
|
/// `agent_id`, `machine_uids` by value) exactly as `remove_session` does. The
|
||||||
|
/// `agents`/`machine_uids` locks are taken nested INSIDE the held `sessions`
|
||||||
|
/// write lock, never the reverse — matching `register_agent`'s acquisition order
|
||||||
|
/// — so there is no lock-order inversion. `pred` is synchronous, so no lock is
|
||||||
|
/// held across an `.await`. Does NOT call `remove_session` (which would re-take
|
||||||
|
/// the `sessions` lock and deadlock); it inlines the same purge.
|
||||||
|
async fn remove_session_if<F>(&self, session_id: SessionId, pred: F) -> bool
|
||||||
|
where
|
||||||
|
F: FnOnce(&SessionData) -> bool,
|
||||||
|
{
|
||||||
|
let mut sessions = self.sessions.write().await;
|
||||||
|
let Some(session_data) = sessions.get(&session_id) else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
if !pred(session_data) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// Predicate still holds under the write lock — remove and purge indexes,
|
||||||
|
// mirroring remove_session exactly so no index entry leaks.
|
||||||
|
let agent_id = session_data.info.agent_id.clone();
|
||||||
|
sessions.remove(&session_id);
|
||||||
|
let mut agents = self.agents.write().await;
|
||||||
|
agents.remove(&agent_id);
|
||||||
|
drop(agents);
|
||||||
|
let mut machine_uids = self.machine_uids.write().await;
|
||||||
|
machine_uids.retain(|_, &mut sid| sid != session_id);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
|
||||||
/// Disconnect a session by sending a disconnect message to the agent
|
/// Disconnect a session by sending a disconnect message to the agent
|
||||||
/// Returns true if the message was sent successfully
|
/// Returns true if the message was sent successfully
|
||||||
pub async fn disconnect_session(&self, session_id: SessionId, reason: &str) -> bool {
|
pub async fn disconnect_session(&self, session_id: SessionId, reason: &str) -> bool {
|
||||||
@@ -737,6 +851,82 @@ impl SessionManager {
|
|||||||
self.remove_session(session_id).await;
|
self.remove_session(session_id).await;
|
||||||
Some(session_id)
|
Some(session_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Reap stale OFFLINE persistent sessions (v2-stable-identity Task 4).
|
||||||
|
///
|
||||||
|
/// Sweeps all sessions and removes those that are persistent, currently
|
||||||
|
/// OFFLINE, have NO viewer attached, and whose last heartbeat is older than
|
||||||
|
/// `ttl`. Age is measured on the monotonic `last_heartbeat_instant` (immune to
|
||||||
|
/// wall-clock changes), which is refreshed on every heartbeat/status update,
|
||||||
|
/// on (re)attach, and on session creation/restore — so a restored-offline row
|
||||||
|
/// becomes eligible only after `ttl` elapses past the restore, as the plan
|
||||||
|
/// permits.
|
||||||
|
///
|
||||||
|
/// Conservative by construction — a session is reaped ONLY if every condition
|
||||||
|
/// holds; when in doubt it is spared:
|
||||||
|
/// - ONLINE sessions are never reaped (a live agent owns its session).
|
||||||
|
/// - Viewer-attached sessions are never reaped (a tech is mid-session).
|
||||||
|
/// - Non-persistent (support-code) sessions are never reaped here; they are
|
||||||
|
/// torn down on disconnect by `mark_agent_disconnected`, not aged out.
|
||||||
|
///
|
||||||
|
/// Removal is routed through [`remove_session`](Self::remove_session) so BOTH
|
||||||
|
/// the `agents` (by `agent_id`) and `machine_uids` indexes are purged together,
|
||||||
|
/// never leaving a dangling index entry. Returns the number of sessions reaped.
|
||||||
|
pub async fn reap_stale_persistent(&self, ttl: std::time::Duration) -> usize {
|
||||||
|
// Snapshot the victims under a read lock, then remove them with the normal
|
||||||
|
// write path. We collect the identifying fields for logging up front so the
|
||||||
|
// log line is accurate even though the session is gone by the time we log.
|
||||||
|
let victims: Vec<(SessionId, String, Option<String>, std::time::Duration)> = {
|
||||||
|
let sessions = self.sessions.read().await;
|
||||||
|
sessions
|
||||||
|
.iter()
|
||||||
|
.filter(|(_, data)| {
|
||||||
|
!data.info.is_online
|
||||||
|
&& data.info.is_persistent
|
||||||
|
&& data.viewers.is_empty()
|
||||||
|
&& data.last_heartbeat_instant.elapsed() > ttl
|
||||||
|
})
|
||||||
|
.map(|(id, data)| {
|
||||||
|
(
|
||||||
|
*id,
|
||||||
|
data.info.agent_id.clone(),
|
||||||
|
data.info.machine_uid.clone(),
|
||||||
|
data.last_heartbeat_instant.elapsed(),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut count = 0usize;
|
||||||
|
for (session_id, agent_id, machine_uid, age) in victims {
|
||||||
|
// Re-assert the reap predicate UNDER the write lock at removal time. The
|
||||||
|
// candidate was selected under a read lock that we have since dropped; in
|
||||||
|
// that window the agent could have reconnected (gone online / attached a
|
||||||
|
// viewer) or been heartbeated within the TTL. The guard spares a session
|
||||||
|
// that is no longer eligible. Only log/count when a removal actually
|
||||||
|
// happened. remove_session_if purges BOTH indexes like remove_session.
|
||||||
|
let removed = self
|
||||||
|
.remove_session_if(session_id, |data| {
|
||||||
|
!data.info.is_online
|
||||||
|
&& data.info.is_persistent
|
||||||
|
&& data.viewers.is_empty()
|
||||||
|
&& data.last_heartbeat_instant.elapsed() > ttl
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
if removed {
|
||||||
|
count += 1;
|
||||||
|
tracing::info!(
|
||||||
|
"Reaped stale offline persistent session {} (agent_id {}, machine_uid {:?}, idle {}s > ttl {}s)",
|
||||||
|
session_id,
|
||||||
|
agent_id,
|
||||||
|
machine_uid.as_deref().unwrap_or("none"),
|
||||||
|
age.as_secs(),
|
||||||
|
ttl.as_secs()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
count
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for SessionManager {
|
impl Default for SessionManager {
|
||||||
@@ -772,6 +962,7 @@ impl SessionManager {
|
|||||||
is_streaming: false,
|
is_streaming: false,
|
||||||
is_online: false, // Offline until agent reconnects
|
is_online: false, // Offline until agent reconnects
|
||||||
is_persistent: true,
|
is_persistent: true,
|
||||||
|
machine_uid: machine_uid.map(|s| s.to_string()),
|
||||||
consent_state: ConsentState::NotRequired, // managed/persistent
|
consent_state: ConsentState::NotRequired, // managed/persistent
|
||||||
last_heartbeat: now,
|
last_heartbeat: now,
|
||||||
os_version: None,
|
os_version: None,
|
||||||
@@ -823,6 +1014,42 @@ impl SessionManager {
|
|||||||
async fn is_machine_uid_indexed(&self, uid: &str) -> bool {
|
async fn is_machine_uid_indexed(&self, uid: &str) -> bool {
|
||||||
self.machine_uids.read().await.contains_key(uid)
|
self.machine_uids.read().await.contains_key(uid)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Test-only: make a session look `age` old to the reaper (Task 4) without a
/// real sleep, by setting its monotonic `last_heartbeat_instant` to "now minus
/// `age`". Does nothing when the session is unknown or when subtracting `age`
/// from the current instant is not representable.
#[cfg(test)]
async fn set_last_heartbeat_age(&self, session_id: SessionId, age: std::time::Duration) {
    let mut guard = self.sessions.write().await;
    let Some(entry) = guard.get_mut(&session_id) else {
        return;
    };
    // `checked_sub` yields `None` instead of panicking on underflow.
    match Instant::now().checked_sub(age) {
        Some(backdated) => entry.last_heartbeat_instant = backdated,
        None => {}
    }
}
|
||||||
|
|
||||||
|
/// Test-only: flip a session to OFFLINE and touch nothing else — in particular
/// its viewers stay attached. The regular `mark_agent_disconnected` path is
/// documented to clear viewers for persistent sessions, so reaper tests use this
/// to exercise the viewer and support-session guards in isolation (an
/// offline + stale session that still carries a viewer, or a support session).
/// Unknown session ids are ignored.
#[cfg(test)]
async fn force_offline_for_test(&self, session_id: SessionId) {
    let mut guard = self.sessions.write().await;
    let Some(entry) = guard.get_mut(&session_id) else {
        return;
    };
    entry.info.is_online = false;
}
|
||||||
|
|
||||||
|
/// Test-only: write `uid` into an existing session's `info.machine_uid` without
/// touching the uid index. The supersede test uses it to model a legacy session
/// that later learned its uid, so the new-session supersede sweep can match it.
/// Unknown session ids are ignored.
#[cfg(test)]
async fn set_machine_uid_for_test(&self, session_id: SessionId, uid: &str) {
    let mut guard = self.sessions.write().await;
    let Some(entry) = guard.get_mut(&session_id) else {
        return;
    };
    entry.info.machine_uid = Some(uid.to_string());
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -1210,4 +1437,241 @@ mod tests {
|
|||||||
assert_eq!(sid2, sid, "restored machine must reattach by machine_uid");
|
assert_eq!(sid2, sid, "restored machine must reattach by machine_uid");
|
||||||
assert_eq!(mgr.list_sessions().await.len(), 1);
|
assert_eq!(mgr.list_sessions().await.len(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- v2-stable-identity Task 4: session reaping + same-machine supersede ----
|
||||||
|
|
||||||
|
/// An OFFLINE persistent session whose last heartbeat is older than the TTL is
|
||||||
|
/// reaped. We backdate its monotonic heartbeat instant rather than sleep.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn offline_persistent_past_ttl_is_reaped() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
let ttl = std::time::Duration::from_secs(600);
|
||||||
|
|
||||||
|
let (sid, _f, _i) = mgr
|
||||||
|
.register_agent("reap-agent".to_string(), "HOST-REAP".to_string(), true, None)
|
||||||
|
.await;
|
||||||
|
mgr.mark_agent_disconnected(sid).await; // now offline
|
||||||
|
// Age it well past the TTL.
|
||||||
|
mgr.set_last_heartbeat_age(sid, std::time::Duration::from_secs(1200))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let reaped = mgr.reap_stale_persistent(ttl).await;
|
||||||
|
assert_eq!(reaped, 1, "a stale offline persistent session must be reaped");
|
||||||
|
assert!(mgr.get_session(sid).await.is_none());
|
||||||
|
assert_eq!(mgr.list_sessions().await.len(), 0);
|
||||||
|
// Index purged too: the agent_id no longer resolves.
|
||||||
|
assert!(mgr.get_session_by_agent("reap-agent").await.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An OFFLINE persistent session still WITHIN the TTL is spared.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn offline_persistent_within_ttl_is_spared() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
let ttl = std::time::Duration::from_secs(600);
|
||||||
|
|
||||||
|
let (sid, _f, _i) = mgr
|
||||||
|
.register_agent("fresh-agent".to_string(), "HOST-FRESH".to_string(), true, None)
|
||||||
|
.await;
|
||||||
|
mgr.mark_agent_disconnected(sid).await;
|
||||||
|
// Aged, but not past the TTL.
|
||||||
|
mgr.set_last_heartbeat_age(sid, std::time::Duration::from_secs(60))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let reaped = mgr.reap_stale_persistent(ttl).await;
|
||||||
|
assert_eq!(reaped, 0, "a within-TTL session must not be reaped");
|
||||||
|
assert!(mgr.get_session(sid).await.is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An ONLINE session is NEVER reaped, even if its heartbeat instant is ancient.
|
||||||
|
/// (Online is the dominant guard: a live agent owns its session.)
|
||||||
|
#[tokio::test]
|
||||||
|
async fn online_session_is_never_reaped() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
let ttl = std::time::Duration::from_secs(600);
|
||||||
|
|
||||||
|
let (sid, _f, _i) = mgr
|
||||||
|
.register_agent("online-agent".to_string(), "HOST-ON".to_string(), true, None)
|
||||||
|
.await;
|
||||||
|
// Still online (no disconnect). Backdate anyway to prove online wins.
|
||||||
|
mgr.set_last_heartbeat_age(sid, std::time::Duration::from_secs(99999))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let reaped = mgr.reap_stale_persistent(ttl).await;
|
||||||
|
assert_eq!(reaped, 0, "an online session must never be reaped");
|
||||||
|
assert!(mgr.get_session(sid).await.is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A viewer-attached session is spared even when offline AND stale — a tech is
|
||||||
|
/// mid-session. We mark it offline (preserving viewers) and age it out, then
|
||||||
|
/// assert the reaper leaves it alone.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn viewer_attached_session_is_spared_even_if_stale() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
let ttl = std::time::Duration::from_secs(600);
|
||||||
|
|
||||||
|
let (sid, _f, _i) = mgr
|
||||||
|
.register_agent("viewed-agent".to_string(), "HOST-V".to_string(), true, None)
|
||||||
|
.await;
|
||||||
|
// Attach a viewer (managed session => NotRequired => join succeeds).
|
||||||
|
assert!(mgr
|
||||||
|
.join_session(sid, "viewer-1".to_string(), "Tech".to_string())
|
||||||
|
.await
|
||||||
|
.is_some());
|
||||||
|
|
||||||
|
// Force the session offline WITHOUT clearing its viewers, so we can prove
|
||||||
|
// the viewer guard (not the online guard) is what spares it. (The normal
|
||||||
|
// mark_agent_disconnected path clears viewers; here we test the guard in
|
||||||
|
// isolation by aging a viewer-bearing session directly.)
|
||||||
|
mgr.force_offline_for_test(sid).await;
|
||||||
|
mgr.set_last_heartbeat_age(sid, std::time::Duration::from_secs(1200))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let reaped = mgr.reap_stale_persistent(ttl).await;
|
||||||
|
assert_eq!(reaped, 0, "a viewer-attached session must be spared");
|
||||||
|
assert!(mgr.get_session(sid).await.is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A support (non-persistent) session is never reaped by the persistent reaper.
|
||||||
|
/// Support sessions are torn down on disconnect, not aged out here.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn support_session_is_never_reaped() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
let ttl = std::time::Duration::from_secs(600);
|
||||||
|
|
||||||
|
// Support (attended) session: is_persistent = false.
|
||||||
|
let (sid, _f, _i) = mgr
|
||||||
|
.register_agent("support-agent".to_string(), "HOST-S".to_string(), false, None)
|
||||||
|
.await;
|
||||||
|
// Force offline + stale (a support session that lingered, hypothetically).
|
||||||
|
mgr.force_offline_for_test(sid).await;
|
||||||
|
mgr.set_last_heartbeat_age(sid, std::time::Duration::from_secs(1200))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let reaped = mgr.reap_stale_persistent(ttl).await;
|
||||||
|
assert_eq!(reaped, 0, "the persistent reaper must never reap a support session");
|
||||||
|
assert!(mgr.get_session(sid).await.is_some());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Same-machine supersede gap (the one Task 2 does NOT cover): a machine first
|
||||||
|
/// connects as a LEGACY no-uid agent (its session is therefore NOT in the uid
|
||||||
|
/// reattach index), goes offline, then upgrades and reconnects with a FRESH
|
||||||
|
/// agent_id AND its uid. Task 2's reattach misses (no uid index entry, different
|
||||||
|
/// agent_id) and would strand the old session. Supersede collapses it so exactly
|
||||||
|
/// ONE live session remains for the physical machine.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn same_machine_reconnect_supersedes_stranded_legacy_session() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
let uid = "muid-supersede";
|
||||||
|
|
||||||
|
// Legacy connect: NO uid reported. Session is indexed by agent_id only.
|
||||||
|
let (sid_old, _f1, _i1) = mgr
|
||||||
|
.register_agent("legacy-id".to_string(), "HOST-SUP".to_string(), true, None)
|
||||||
|
.await;
|
||||||
|
// The legacy session has no recorded uid, so set it as if a later status
|
||||||
|
// taught us the machine's uid while it was still the active session. This
|
||||||
|
// models the realistic upgrade path where the uid becomes known.
|
||||||
|
mgr.set_machine_uid_for_test(sid_old, uid).await;
|
||||||
|
mgr.mark_agent_disconnected(sid_old).await; // offline, viewers cleared
|
||||||
|
|
||||||
|
// Upgrade + config loss: reconnect with a FRESH agent_id AND the uid. Both
|
||||||
|
// reattach lookups miss (uid was never indexed; agent_id differs), so a NEW
|
||||||
|
// session is created — and supersede must remove the stranded old one.
|
||||||
|
let (sid_new, _f2, _i2) = mgr
|
||||||
|
.register_agent("fresh-id".to_string(), "HOST-SUP".to_string(), true, Some(uid))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert_ne!(sid_new, sid_old, "a fresh agent_id with an unindexed uid creates a new session");
|
||||||
|
// The stranded old session is gone; exactly one live session remains.
|
||||||
|
assert!(
|
||||||
|
mgr.get_session(sid_old).await.is_none(),
|
||||||
|
"the stranded same-machine session must be superseded"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
mgr.list_sessions().await.len(),
|
||||||
|
1,
|
||||||
|
"exactly one session must remain for the physical machine"
|
||||||
|
);
|
||||||
|
// The remaining session is the new one and resolves by its agent_id.
|
||||||
|
assert!(mgr.get_session_by_agent("fresh-id").await.is_some());
|
||||||
|
assert!(mgr.get_session_by_agent("legacy-id").await.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// TOCTOU guard (Task 4 hardening): the reaper selects victims under a read
|
||||||
|
/// lock, drops it, then removes. If a selected session goes back ONLINE in that
|
||||||
|
/// window, the guarded removal must spare it. We can't interleave the real
|
||||||
|
/// read-then-remove here, but we exercise the removal predicate directly:
|
||||||
|
/// `remove_session_if` with the reap predicate must NOT remove a session that
|
||||||
|
/// is online (the state a reconnect would have produced) and must return false.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn guarded_remove_spares_session_that_went_online_after_selection() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
let ttl = std::time::Duration::from_secs(600);
|
||||||
|
|
||||||
|
let (sid, _f, _i) = mgr
|
||||||
|
.register_agent("race-agent".to_string(), "HOST-RACE".to_string(), true, None)
|
||||||
|
.await;
|
||||||
|
// Make it a valid reap candidate first: offline + stale.
|
||||||
|
mgr.mark_agent_disconnected(sid).await;
|
||||||
|
mgr.set_last_heartbeat_age(sid, std::time::Duration::from_secs(1200))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Simulate the agent reconnecting in the snapshot->remove window: it is now
|
||||||
|
// ONLINE again (register_agent's reattach marks the session online).
|
||||||
|
let (sid_re, _f2, _i2) = mgr
|
||||||
|
.register_agent("race-agent".to_string(), "HOST-RACE".to_string(), true, None)
|
||||||
|
.await;
|
||||||
|
assert_eq!(sid_re, sid, "reconnect reattaches the same offline session");
|
||||||
|
assert_eq!(mgr.get_session(sid).await.map(|s| s.is_online), Some(true));
|
||||||
|
|
||||||
|
// The guarded removal re-checks the reap predicate under the write lock and
|
||||||
|
// must spare the now-online session.
|
||||||
|
let removed = mgr
|
||||||
|
.remove_session_if(sid, |data| {
|
||||||
|
!data.info.is_online
|
||||||
|
&& data.info.is_persistent
|
||||||
|
&& data.viewers.is_empty()
|
||||||
|
&& data.last_heartbeat_instant.elapsed() > ttl
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
assert!(!removed, "a session that went online must not be removed by the guard");
|
||||||
|
assert!(
|
||||||
|
mgr.get_session(sid).await.is_some(),
|
||||||
|
"the now-online session must survive the guarded removal"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Supersede must NEVER remove an ONLINE session for the same uid (defensive:
|
||||||
|
/// it only ever collapses OFFLINE strays). Task 2's reattach reuse only fires
|
||||||
|
/// for an OFFLINE prior session (`if !is_online`), so when the prior same-uid
|
||||||
|
/// session is still ONLINE a second concurrent connection falls through to a NEW
|
||||||
|
/// session — and supersede leaves the live one untouched. The invariant under
|
||||||
|
/// test is the one that matters: the online session SURVIVES.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn supersede_does_not_remove_online_same_machine_session() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
let uid = "muid-online-keep";
|
||||||
|
|
||||||
|
// First session reports the uid and stays ONLINE.
|
||||||
|
let (sid_online, _f1, _i1) = mgr
|
||||||
|
.register_agent("agent-a".to_string(), "HOST-O".to_string(), true, Some(uid))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// A second connection arrives claiming the same uid while the first is still
|
||||||
|
// online. Reattach reuse is gated on the prior being offline, so this creates
|
||||||
|
// a NEW session; supersede must not have removed the live one.
|
||||||
|
let (sid_second, _f2, _i2) = mgr
|
||||||
|
.register_agent("agent-b".to_string(), "HOST-O".to_string(), true, Some(uid))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// The online session must still be present and resolvable — supersede never
|
||||||
|
// touches an online session.
|
||||||
|
assert!(
|
||||||
|
mgr.get_session(sid_online).await.is_some(),
|
||||||
|
"an online same-machine session must never be superseded"
|
||||||
|
);
|
||||||
|
assert_ne!(
|
||||||
|
sid_second, sid_online,
|
||||||
|
"a concurrent same-uid connection to a still-online session gets its own session"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user