From ffca7f0cee375dc7b4d922159d466049354cc906 Mon Sep 17 00:00:00 2001 From: Mike Swanson Date: Sun, 31 May 2026 12:06:50 -0700 Subject: [PATCH] feat(server): dedup machines on machine_uid (SPEC-004 Task 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Persist the agent-reported machine_uid and dedup connect_machines on it so a single physical machine can't register duplicate rows when its config-file agent_id regenerates (the ghost-session root cause). - migration 008: nullable connect_machines.machine_uid + partial unique index (WHERE machine_uid IS NOT NULL); idempotent, startup-applied. - upsert_machine: two-path dedup (ON CONFLICT machine_uid when present, else the legacy ON CONFLICT agent_id path, unchanged). - session reattach: a machine_uid index consulted before agent_id, with all removal paths purging it. - security: keyed (cak_) agents stay authoritative — their claimed machine_uid is dropped (effective_machine_uid=None); uid is dedup-only for un-keyed / support-code agents. Startup restore skips uid-indexing keyed machines and fails closed if the keyed-set query errors. 74 server tests pass; clippy clean. Implements specs/v2-stable-identity/plan.md Task 2. Co-Authored-By: Claude Opus 4.8 (1M context) --- server/migrations/008_machine_uid.sql | 41 +++ server/src/db/agent_keys.rs | 25 ++ server/src/db/machines.rs | 234 +++++++++++++++-- server/src/main.rs | 32 ++- server/src/relay/mod.rs | 54 +++- server/src/session/mod.rs | 361 +++++++++++++++++++++++--- 6 files changed, 697 insertions(+), 50 deletions(-) create mode 100644 server/migrations/008_machine_uid.sql diff --git a/server/migrations/008_machine_uid.sql b/server/migrations/008_machine_uid.sql new file mode 100644 index 0000000..8488031 --- /dev/null +++ b/server/migrations/008_machine_uid.sql @@ -0,0 +1,41 @@ +-- Migration: 008_machine_uid.sql +-- Purpose: Give connect_machines a deterministic, recomputable hardware identity +-- (machine_uid) and dedup registrations on it (SPEC-004 / v2-stable-identity +-- Task 2). +-- +-- Today connect_machines is keyed only on `agent_id` — a random UUID the agent +-- persists in its config (generate_agent_id()). A lost/missing config produces a +-- fresh UUID, and because upsert_machine dedups `ON CONFLICT (agent_id)`, that +-- fresh id inserts a NEW row: the duplicate-registration bug (15 rows for 5 real +-- hosts in production). The agent (Task 1) now derives a stable `machine_uid` from +-- durable hardware/OS identifiers (Windows MachineGuid, hashed; recomputable) and +-- reports it on the connect handshake and on AgentStatus. This migration persists +-- it and adds the uniqueness needed to dedup the un-keyed / shared-key / config-loss +-- fleet on that stable identity. +-- +-- SECURITY (SPEC-004): a client-asserted machine_uid is spoofable and is therefore +-- NOT a trust boundary. For per-agent (`cak_`) keyed agents the key's machine +-- binding stays authoritative (the server dedups those on their authenticated +-- agent_id, never on a claimed uid). machine_uid is a *correctness* aid for the +-- un-keyed path only. The UNIQUE index below enforces "one row per machine_uid" +-- at the schema level so a concurrent/racing un-keyed insert cannot create a +-- duplicate behind the application-level ON CONFLICT. +-- +-- Idempotent: ADD COLUMN IF NOT EXISTS + CREATE UNIQUE INDEX IF NOT EXISTS. The +-- column is NULLABLE: legacy rows and agents that do not report a uid (older agents, +-- some support-code clients) carry NULL, and the partial index excludes NULLs so any +-- number of un-keyed rows may coexist without a uid. Applied on server startup by +-- sqlx::migrate!(); never pre-applied via psql. Ordered after 007. +-- See .claude/standards/gururmm/sqlx-migrations.md. + +-- 1. machine_uid: deterministic hardware identity reported by the agent. NULLABLE +-- so legacy rows and non-reporting agents are unaffected. +ALTER TABLE connect_machines ADD COLUMN IF NOT EXISTS machine_uid TEXT; + +-- 2. Enforce one row per machine_uid, but ONLY for rows that actually have one. +-- A partial UNIQUE index (WHERE machine_uid IS NOT NULL) lets unlimited legacy +-- NULL rows coexist while making a non-null machine_uid a true dedup key — this +-- is what upsert_machine's `ON CONFLICT (machine_uid)` arbiter binds to. +CREATE UNIQUE INDEX IF NOT EXISTS idx_connect_machines_machine_uid + ON connect_machines (machine_uid) + WHERE machine_uid IS NOT NULL; diff --git a/server/src/db/agent_keys.rs b/server/src/db/agent_keys.rs index 574dcef..26874fa 100644 --- a/server/src/db/agent_keys.rs +++ b/server/src/db/agent_keys.rs @@ -12,6 +12,7 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use sqlx::PgPool; +use std::collections::HashSet; use uuid::Uuid; /// Per-agent key record from the database. @@ -142,3 +143,27 @@ pub async fn touch_last_used(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error> .await?; Ok(()) } + +/// Return the set of `connect_machines.id` values that are bound to at least one +/// active (non-revoked) per-agent `cak_` key. +/// +/// Used by the startup restore loop (`main.rs`) to enforce the same keyed/un-keyed +/// distinction the relay applies at connect time (SPEC-004 Task 2). For a KEYED +/// machine the key→machine binding is the authoritative identity, so its restored +/// session must NOT be indexed by a client-asserted `machine_uid` — otherwise an +/// un-keyed agent spoofing that uid could reattach the keyed machine's offline +/// session after a restart. Membership here lets the caller pass `machine_uid = +/// None` for keyed machines while still indexing un-keyed ones for legitimate +/// uid-based reattach. +pub async fn keyed_machine_ids(pool: &PgPool) -> Result, sqlx::Error> { + let ids = sqlx::query_scalar::<_, Uuid>( + r#" + SELECT DISTINCT machine_id + FROM connect_agent_keys + WHERE revoked_at IS NULL + "#, + ) + .fetch_all(pool) + .await?; + Ok(ids.into_iter().collect()) +} diff --git a/server/src/db/machines.rs b/server/src/db/machines.rs index 15ee14a..a51680f 100644 --- a/server/src/db/machines.rs +++ b/server/src/db/machines.rs @@ -50,6 +50,14 @@ pub struct Machine { /// impl) so it can never error at decode time. #[serde(default)] pub tags: Vec, + /// Deterministic, recomputable hardware identity reported by the agent + /// (`AgentStatus.machine_uid` / connect query param). Column added in migration + /// 008. NULLABLE: legacy rows and agents that do not report a uid carry `None`. + /// For un-keyed agents this is the dedup key (`upsert_machine` keys + /// `ON CONFLICT (machine_uid)` when present); for `cak_`-keyed agents the key's + /// machine binding stays authoritative and the claimed uid is NOT used to dedup + /// (see `upsert_machine`). + pub machine_uid: Option, } impl<'r> FromRow<'r, PgRow> for Machine { @@ -65,6 +73,8 @@ impl<'r> FromRow<'r, PgRow> for Machine { tenant_id: row.try_get("tenant_id")?, organization: row.try_get("organization")?, site: row.try_get("site")?, + // Schema-nullable (migration 008); decode directly as Option. + machine_uid: row.try_get("machine_uid")?, // Nullable-with-default columns mapped to non-`Option` Rust types: read as // `Option` and fall back to the type default so a NULL cell never errors. is_elevated: row @@ -97,29 +107,88 @@ impl<'r> FromRow<'r, PgRow> for Machine { } } -/// Get or create a machine by agent_id (upsert) +/// Get or create a machine record (upsert), deduplicating on the most stable +/// identity available (SPEC-004 / v2-stable-identity Task 2). +/// +/// Two dedup paths, selected by whether the caller passes a `machine_uid`: +/// +/// - **`machine_uid = Some(uid)` (the un-keyed dedup path):** key on the stable +/// hardware identity — `ON CONFLICT (machine_uid)`. The SAME machine reconnecting +/// with a DIFFERENT `agent_id` (e.g. after a config loss minted a fresh random id) +/// updates its EXISTING row instead of inserting a duplicate. `agent_id` and +/// `hostname` are refreshed to the latest reported values. This is what collapses +/// the duplicate-registration fleet down to one row per real machine. +/// +/// - **`machine_uid = None` (legacy / authoritative path):** preserve the original +/// behavior exactly — `ON CONFLICT (agent_id)`. Used for agents that do not report +/// a uid AND, critically, for `cak_`-keyed agents: the caller (`relay`) passes +/// `None` for keyed agents so their AUTHORITATIVE key-bound `agent_id` is the dedup +/// key and a client-claimed `machine_uid` can never repoint a keyed machine's row. +/// +/// SECURITY: a client-asserted `machine_uid` is spoofable, so it is a *correctness* +/// aid, not a trust boundary. Only the un-keyed path supplies it; the keyed path's +/// authority lives in the key→machine binding upstream (see +/// `relay::agent_ws_handler`), never here. pub async fn upsert_machine( pool: &PgPool, agent_id: &str, hostname: &str, is_persistent: bool, + machine_uid: Option<&str>, ) -> Result { - sqlx::query_as::<_, Machine>( - r#" - INSERT INTO connect_machines (agent_id, hostname, is_persistent, status, last_seen) - VALUES ($1, $2, $3, 'online', NOW()) - ON CONFLICT (agent_id) DO UPDATE SET - hostname = EXCLUDED.hostname, - status = 'online', - last_seen = NOW() - RETURNING * - "#, - ) - .bind(agent_id) - .bind(hostname) - .bind(is_persistent) - .fetch_one(pool) - .await + match machine_uid { + // Un-keyed dedup path: stable hardware identity is the conflict arbiter. + // A new agent_id for the same physical machine updates the existing row. + // + // Edge case: if this INSERT's new random `agent_id` happens to collide with a + // DIFFERENT legacy row's value under the `agent_id UNIQUE` constraint, Postgres + // raises the unique violation on `agent_id` BEFORE the `ON CONFLICT + // (machine_uid)` arbiter is consulted, so the upsert errors instead of merging + // on uid. This is non-fatal: the caller logs the error and the live in-memory + // session is unaffected, and the agent simply retries with a freshly minted + // UUID. The window closes on its own as legacy rows age out (SPEC-004 Task 3). + Some(uid) => { + sqlx::query_as::<_, Machine>( + r#" + INSERT INTO connect_machines (agent_id, hostname, is_persistent, status, last_seen, machine_uid) + VALUES ($1, $2, $3, 'online', NOW(), $4) + ON CONFLICT (machine_uid) DO UPDATE SET + agent_id = EXCLUDED.agent_id, + hostname = EXCLUDED.hostname, + status = 'online', + last_seen = NOW() + RETURNING * + "#, + ) + .bind(agent_id) + .bind(hostname) + .bind(is_persistent) + .bind(uid) + .fetch_one(pool) + .await + } + // Legacy / authoritative path: dedup on agent_id exactly as before. Leaves + // machine_uid NULL (the partial unique index excludes NULLs, so any number + // of these may coexist). + None => { + sqlx::query_as::<_, Machine>( + r#" + INSERT INTO connect_machines (agent_id, hostname, is_persistent, status, last_seen) + VALUES ($1, $2, $3, 'online', NOW()) + ON CONFLICT (agent_id) DO UPDATE SET + hostname = EXCLUDED.hostname, + status = 'online', + last_seen = NOW() + RETURNING * + "#, + ) + .bind(agent_id) + .bind(hostname) + .bind(is_persistent) + .fetch_one(pool) + .await + } + } } /// Update machine status and info @@ -239,3 +308,134 @@ pub async fn update_machine_metadata( .await?; Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::postgres::PgPoolOptions; + + /// Connect to a throwaway test Postgres and apply migrations, or return `None` + /// when `TEST_DATABASE_URL` is unset so the suite is a no-op on workstations + /// without a database. CI sets `TEST_DATABASE_URL` against an ephemeral Postgres, + /// where these run for real. (The server crate is Linux-targeted and validated + /// in Gitea CI; these DB tests run there.) + async fn test_pool() -> Option { + let url = std::env::var("TEST_DATABASE_URL").ok()?; + let pool = PgPoolOptions::new() + .max_connections(2) + .connect(&url) + .await + .expect("connect to TEST_DATABASE_URL"); + sqlx::migrate!("./migrations") + .run(&pool) + .await + .expect("apply migrations to the test database"); + Some(pool) + } + + /// Remove any rows this test created so reruns are clean and tests don't collide. + async fn cleanup(pool: &PgPool, agent_ids: &[&str], machine_uids: &[&str]) { + for id in agent_ids { + let _ = sqlx::query("DELETE FROM connect_machines WHERE agent_id = $1") + .bind(id) + .execute(pool) + .await; + } + for uid in machine_uids { + let _ = sqlx::query("DELETE FROM connect_machines WHERE machine_uid = $1") + .bind(uid) + .execute(pool) + .await; + } + } + + /// (a) Same `machine_uid` with two DIFFERENT `agent_id`s collapses to ONE row. + /// The second upsert updates the existing row (and repoints its agent_id) rather + /// than inserting a duplicate — the core dedup guarantee for the un-keyed fleet. + #[tokio::test] + async fn same_machine_uid_two_agent_ids_one_row() { + let Some(pool) = test_pool().await else { + return; // no TEST_DATABASE_URL: skip (runs in CI) + }; + let uid = "test-muid-dedup-001"; + cleanup(&pool, &["agent-A", "agent-B"], &[uid]).await; + + let m1 = upsert_machine(&pool, "agent-A", "HOST-A", true, Some(uid)) + .await + .expect("first upsert"); + let m2 = upsert_machine(&pool, "agent-B", "HOST-A2", true, Some(uid)) + .await + .expect("second upsert with same uid, different agent_id"); + + // Same physical row, agent_id and hostname refreshed to the latest. + assert_eq!(m1.id, m2.id, "same machine_uid must update the same row"); + assert_eq!(m2.agent_id, "agent-B"); + assert_eq!(m2.hostname, "HOST-A2"); + assert_eq!(m2.machine_uid.as_deref(), Some(uid)); + + // Exactly one row carries this uid. + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM connect_machines WHERE machine_uid = $1") + .bind(uid) + .fetch_one(&pool) + .await + .expect("count rows for uid"); + assert_eq!(count, 1, "must be exactly one row for the machine_uid"); + + cleanup(&pool, &["agent-A", "agent-B"], &[uid]).await; + } + + /// (b) Legacy NULL-`machine_uid` path is unchanged: dedup keys on `agent_id`, + /// the row's `machine_uid` stays NULL, and re-upserting the same agent_id with + /// no uid updates the same row (no crash, no duplicate). + #[tokio::test] + async fn legacy_null_machine_uid_dedups_on_agent_id() { + let Some(pool) = test_pool().await else { + return; // no TEST_DATABASE_URL: skip (runs in CI) + }; + let agent = "test-legacy-agent-001"; + cleanup(&pool, &[agent], &[]).await; + + let m1 = upsert_machine(&pool, agent, "LEGACY-HOST", true, None) + .await + .expect("legacy upsert (no uid)"); + assert_eq!(m1.machine_uid, None, "legacy row must have NULL machine_uid"); + + let m2 = upsert_machine(&pool, agent, "LEGACY-HOST-RENAMED", true, None) + .await + .expect("legacy re-upsert (no uid)"); + assert_eq!(m1.id, m2.id, "legacy agent_id must dedup to the same row"); + assert_eq!(m2.hostname, "LEGACY-HOST-RENAMED"); + assert_eq!(m2.machine_uid, None); + + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM connect_machines WHERE agent_id = $1") + .bind(agent) + .fetch_one(&pool) + .await + .expect("count legacy rows"); + assert_eq!(count, 1, "legacy path must not duplicate the row"); + + cleanup(&pool, &[agent], &[]).await; + } + + /// Multiple legacy rows with NULL machine_uid coexist — the partial unique index + /// excludes NULLs, so distinct un-keyed agents are independent rows. + #[tokio::test] + async fn multiple_null_machine_uid_rows_coexist() { + let Some(pool) = test_pool().await else { + return; // no TEST_DATABASE_URL: skip (runs in CI) + }; + cleanup(&pool, &["null-1", "null-2"], &[]).await; + + let a = upsert_machine(&pool, "null-1", "H1", true, None) + .await + .expect("first null-uid row"); + let b = upsert_machine(&pool, "null-2", "H2", true, None) + .await + .expect("second null-uid row"); + assert_ne!(a.id, b.id, "distinct legacy agents must be distinct rows"); + + cleanup(&pool, &["null-1", "null-2"], &[]).await; + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 374181f..1a3b28a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -245,9 +245,39 @@ async fn main() -> Result<()> { "Reconciling {} managed session(s) from database", machines.len() ); + // Machines bound to an active `cak_` key. For these the key→machine + // binding is authoritative (SPEC-004 Task 2), so we must NOT index a + // restored keyed session by its stored `machine_uid`: doing so would + // let an un-keyed agent spoofing that uid reattach the keyed machine's + // offline session after a restart. The connect path never writes a uid + // for keyed agents, so a non-NULL uid on a keyed row can only come from + // a legacy pre-keying row — but close the gap regardless. On query + // failure, fail closed (treat all machines as keyed: index none by uid) + // rather than risk indexing a keyed machine. + let keyed_ids = match db::agent_keys::keyed_machine_ids(db.pool()).await { + Ok(ids) => ids, + Err(e) => { + tracing::warn!( + "Could not load keyed-machine set; suppressing uid reattach index for all restored machines: {}", + e + ); + machines.iter().map(|m| m.id).collect() + } + }; for machine in machines { + // Keyed machines get None (uid reattach disabled); un-keyed + // machines keep their stored uid for legitimate reattach. + let restore_uid = if keyed_ids.contains(&machine.id) { + None + } else { + machine.machine_uid.as_deref() + }; sessions - .restore_offline_machine(&machine.agent_id, &machine.hostname) + .restore_offline_machine( + &machine.agent_id, + &machine.hostname, + restore_uid, + ) .await; } } diff --git a/server/src/relay/mod.rs b/server/src/relay/mod.rs index 4cc69c1..51b2021 100644 --- a/server/src/relay/mod.rs +++ b/server/src/relay/mod.rs @@ -74,6 +74,13 @@ pub struct AgentParams { /// API key for persistent (managed) agents #[serde(default)] api_key: Option, + /// Deterministic, recomputable hardware identity reported by the agent + /// (v2-stable-identity Task 1; `transport/websocket.rs`). Used to dedup + /// registrations for UN-KEYED agents only — see the security note where it is + /// consumed below. CLIENT-SUPPLIED and therefore spoofable: it is never used to + /// override a `cak_` key's authoritative machine binding. + #[serde(default)] + machine_uid: Option, } #[derive(Debug, Deserialize)] @@ -110,6 +117,12 @@ pub async fn agent_ws_handler( .unwrap_or_else(|| agent_id.clone()); let support_code = params.support_code.clone(); let api_key = params.api_key.clone(); + // CLIENT-SUPPLIED machine_uid (v2-stable-identity Task 1). Spoofable; see the + // security gate below. It is used for dedup ONLY on the un-keyed path; for + // `cak_`-keyed agents it is suppressed so the key's machine binding stays + // authoritative. `is_keyed_agent` records which path authenticated us. + let claimed_machine_uid = params.machine_uid.clone(); + let mut is_keyed_agent = false; // Real client IP via the trusted-proxy-aware extractor (shared with the rate // limiter and audit log). Behind NPM on loopback, `addr.ip()` is 127.0.0.1; // this resolves the actual remote agent IP from forwarding headers when the @@ -268,6 +281,11 @@ pub async fn agent_ws_handler( ); } agent_id = trusted_agent_id; + // KEYED agent: the key→machine binding is AUTHORITATIVE. Suppress + // the client-claimed machine_uid for dedup (below) so a valid key + // for machine X can never repoint machine Y's row by claiming Y's + // uid. Dedup for this agent stays on its authenticated agent_id. + is_keyed_agent = true; info!( "Agent {} from {} authenticated via per-agent key", agent_id, client_ip @@ -324,6 +342,20 @@ pub async fn agent_ws_handler( let support_codes = state.support_codes.clone(); let db = state.db.clone(); + // SECURITY GATE for machine_uid dedup (v2-stable-identity Task 2): + // - KEYED (`cak_`) agents: dedup stays on the authenticated agent_id; the + // claimed uid is DROPPED so it cannot override the key's machine binding. + // - UN-KEYED agents (support-code, deprecated shared key, no auth-identity): + // the claimed uid is a dedup-only correctness aid and is passed through. + // A client-asserted machine_uid is spoofable, so it must never be the dedup key + // for a keyed agent. This is the only place the distinction is enforced. + let effective_machine_uid = if is_keyed_agent { + None + } else { + // Treat an empty/whitespace-only claim as absent. + claimed_machine_uid.filter(|u| !u.trim().is_empty()) + }; + // Bounded relay: cap inbound frame/message size before the socket is upgraded // so oversized agent frames are rejected by the WS layer, never `to_vec()`'d // and broadcast. (WS-OOM HIGH.) @@ -340,6 +372,7 @@ pub async fn agent_ws_handler( agent_id, agent_name, support_code, + effective_machine_uid, Some(client_ip), ) })) @@ -548,6 +581,10 @@ async fn handle_agent_connection( agent_id: String, agent_name: String, support_code: Option, + // Dedup identity for the UN-KEYED path only (already security-gated to `None` + // for `cak_`-keyed agents by `agent_ws_handler`). Drives both the in-memory + // session reattach key and the DB `ON CONFLICT (machine_uid)` upsert. + machine_uid: Option, client_ip: Option, ) { info!( @@ -581,14 +618,27 @@ async fn handle_agent_connection( // Persistent agents (no support code) keep their session when disconnected let is_persistent = support_code.is_none(); let (session_id, frame_tx, mut input_rx) = sessions - .register_agent(agent_id.clone(), agent_name.clone(), is_persistent) + .register_agent( + agent_id.clone(), + agent_name.clone(), + is_persistent, + machine_uid.as_deref(), + ) .await; info!("Session created: {} (agent in idle mode)", session_id); // Database: upsert machine and create session record let _machine_id = if let Some(ref db) = db { - match db::machines::upsert_machine(db.pool(), &agent_id, &agent_name, is_persistent).await { + match db::machines::upsert_machine( + db.pool(), + &agent_id, + &agent_name, + is_persistent, + machine_uid.as_deref(), + ) + .await + { Ok(machine) => { // Create session record let _ = db::sessions::create_session( diff --git a/server/src/session/mod.rs b/server/src/session/mod.rs index 8eac258..104e518 100644 --- a/server/src/session/mod.rs +++ b/server/src/session/mod.rs @@ -174,6 +174,17 @@ struct SessionData { pub struct SessionManager { sessions: Arc>>, agents: Arc>>, + /// Reattach index keyed by the stable hardware identity (`machine_uid`), for + /// the UN-KEYED dedup path (v2-stable-identity Task 2). Populated only when an + /// agent reports a (security-gated) machine_uid. Consulted BEFORE the + /// `agent_id` index on reconnect, so the SAME physical machine reconnecting + /// with a FRESH random agent_id (config loss) reattaches to its existing + /// session instead of stranding it and creating a duplicate. + /// + /// Keyed (`cak_`) agents are passed `machine_uid = None` (the relay suppresses + /// the claimed uid for them), so they never appear here — their reattach stays + /// on the authoritative `agent_id` index, preserving the key's machine binding. + machine_uids: Arc>>, } impl SessionManager { @@ -181,43 +192,97 @@ impl SessionManager { Self { sessions: Arc::new(RwLock::new(HashMap::new())), agents: Arc::new(RwLock::new(HashMap::new())), + machine_uids: Arc::new(RwLock::new(HashMap::new())), } } - /// Register a new agent and create a session - /// If agent was previously connected (offline session exists), reuse that session + /// Register a new agent and create a session. + /// + /// Reattach (reconnect to an existing offline session) prefers the stable + /// `machine_uid` identity when one is supplied, falling back to `agent_id` + /// (v2-stable-identity Task 2): + /// + /// - `machine_uid = Some(uid)` (the UN-KEYED dedup path): look up the prior + /// session by `uid` FIRST. The same physical machine reconnecting with a + /// FRESH random `agent_id` (config loss) reattaches to its existing session + /// rather than stranding it and minting a duplicate. On reattach the session's + /// `agent_id` is repointed to the current one and the `agents` index is fixed + /// up to match. + /// - `machine_uid = None` (legacy / keyed path): behave exactly as before — + /// reattach by `agent_id` only. The relay passes `None` for `cak_`-keyed + /// agents so their authoritative key-bound identity drives reattach and a + /// spoofable claimed uid can never seize another machine's session. pub async fn register_agent( &self, agent_id: AgentId, agent_name: String, is_persistent: bool, + machine_uid: Option<&str>, ) -> (SessionId, FrameSender, InputReceiver) { - // Check if this agent already has an offline session (reconnecting) - { - let agents = self.agents.read().await; - if let Some(&existing_session_id) = agents.get(&agent_id) { - let mut sessions = self.sessions.write().await; - if let Some(session_data) = sessions.get_mut(&existing_session_id) { - if !session_data.info.is_online { - // Reuse existing session - mark as online and create new channels - tracing::info!( - "Agent {} reconnecting to existing session {}", - agent_id, - existing_session_id - ); + // Resolve a prior offline session to reattach to. Prefer the stable + // machine_uid identity; fall back to agent_id. `reattach_via_uid` records + // whether we matched on uid so we can fix up the agent_id index below. + let (existing_session_id, reattach_via_uid) = { + let by_uid = match machine_uid { + Some(uid) => self.machine_uids.read().await.get(uid).copied(), + None => None, + }; + match by_uid { + Some(sid) => (Some(sid), true), + None => (self.agents.read().await.get(&agent_id).copied(), false), + } + }; - let (frame_tx, _) = broadcast::channel(16); - let (input_tx, input_rx) = tokio::sync::mpsc::channel(64); + if let Some(existing_session_id) = existing_session_id { + let mut sessions = self.sessions.write().await; + if let Some(session_data) = sessions.get_mut(&existing_session_id) { + if !session_data.info.is_online { + // Reuse existing session - mark as online and create new channels + tracing::info!( + "Agent {} reconnecting to existing session {} (matched on {})", + agent_id, + existing_session_id, + if reattach_via_uid { + "machine_uid" + } else { + "agent_id" + } + ); - session_data.info.is_online = true; - session_data.info.last_heartbeat = chrono::Utc::now(); - session_data.info.agent_name = agent_name; // Update name in case it changed - session_data.frame_tx = frame_tx.clone(); - session_data.input_tx = input_tx; - session_data.last_heartbeat_instant = Instant::now(); + let (frame_tx, _) = broadcast::channel(16); + let (input_tx, input_rx) = tokio::sync::mpsc::channel(64); - return (existing_session_id, frame_tx, input_rx); + // If we matched on machine_uid, the agent_id may have changed + // (config loss minted a fresh one). Repoint the session and fix + // up the agent_id index: drop the stale agent_id mapping (only + // if it still pointed at THIS session) and add the current one. + let previous_agent_id = session_data.info.agent_id.clone(); + if reattach_via_uid && previous_agent_id != agent_id { + let mut agents = self.agents.write().await; + if agents.get(&previous_agent_id) == Some(&existing_session_id) { + agents.remove(&previous_agent_id); + } + agents.insert(agent_id.clone(), existing_session_id); + session_data.info.agent_id = agent_id.clone(); } + + session_data.info.is_online = true; + session_data.info.last_heartbeat = chrono::Utc::now(); + session_data.info.agent_name = agent_name; // Update name in case it changed + session_data.frame_tx = frame_tx.clone(); + session_data.input_tx = input_tx; + session_data.last_heartbeat_instant = Instant::now(); + + // Ensure the machine_uid index points at this session (it may be + // the first time we learn this uid for an agent_id-restored row). + if let Some(uid) = machine_uid { + self.machine_uids + .write() + .await + .insert(uid.to_string(), existing_session_id); + } + + return (existing_session_id, frame_tx, input_rx); } } } @@ -274,6 +339,15 @@ impl SessionManager { let mut agents = self.agents.write().await; agents.insert(agent_id, session_id); + // Index the new session by its stable machine_uid (un-keyed path) so a later + // reconnect with a different agent_id reattaches here instead of duplicating. + if let Some(uid) = machine_uid { + self.machine_uids + .write() + .await + .insert(uid.to_string(), session_id); + } + (session_id, frame_tx, input_rx) } @@ -561,6 +635,11 @@ impl SessionManager { drop(sessions); // Release sessions lock before acquiring agents lock let mut agents = self.agents.write().await; agents.remove(&agent_id); + drop(agents); + // Purge any machine_uid index entry for this removed support session + // so a stale uid can never reattach it. + let mut machine_uids = self.machine_uids.write().await; + machine_uids.retain(|_, &mut sid| sid != session_id); tracing::info!("Support session {} removed", session_id); } } @@ -573,6 +652,12 @@ impl SessionManager { drop(sessions); let mut agents = self.agents.write().await; agents.remove(&session_data.info.agent_id); + drop(agents); + // Purge any machine_uid index entry pointing at this session so a stale + // uid can never reattach a removed session. The map holds one entry per + // real machine, so a value scan is cheap. + let mut machine_uids = self.machine_uids.write().await; + machine_uids.retain(|_, &mut sid| sid != session_id); } } @@ -661,8 +746,19 @@ impl Default for SessionManager { } impl SessionManager { - /// Restore a machine as an offline session (called on startup from database) - pub async fn restore_offline_machine(&self, agent_id: &str, hostname: &str) -> SessionId { + /// Restore a machine as an offline session (called on startup from database). + /// + /// `machine_uid` is the row's stored stable identity (migration 008), or `None` + /// for legacy rows. When present it is indexed so a reconnecting un-keyed agent + /// reattaches to this restored session by `machine_uid` even if it presents a + /// fresh `agent_id` (config loss) after a server restart — matching the live + /// reattach path in `register_agent`. + pub async fn restore_offline_machine( + &self, + agent_id: &str, + hostname: &str, + machine_uid: Option<&str>, + ) -> SessionId { let session_id = Uuid::new_v4(); let now = chrono::Utc::now(); @@ -707,10 +803,26 @@ impl SessionManager { let mut agents = self.agents.write().await; agents.insert(agent_id.to_string(), session_id); + drop(agents); + + if let Some(uid) = machine_uid { + self.machine_uids + .write() + .await + .insert(uid.to_string(), session_id); + } tracing::info!("Restored offline machine: {} ({})", hostname, agent_id); session_id } + + /// Test-only: report whether a `machine_uid` is present in the reattach index. + /// Lets the restore tests assert the keyed/un-keyed indexing contract that the + /// startup loop in `main.rs` depends on (SPEC-004 Task 2 / L1 hardening). + #[cfg(test)] + async fn is_machine_uid_indexed(&self, uid: &str) -> bool { + self.machine_uids.read().await.contains_key(uid) + } } #[cfg(test)] @@ -733,13 +845,46 @@ mod tests { assert!(!ConsentState::Denied.allows_viewer()); } + /// L1 hardening (SPEC-004 Task 2): the startup restore loop hands `machine_uid = + /// None` to `restore_offline_machine` for KEYED machines, so a restored keyed + /// session is NEVER reachable via the uid reattach index — closing the spoofing + /// path where an un-keyed agent could reattach a keyed machine's offline session + /// by claiming its uid. This asserts the index-control contract the loop relies on. + #[tokio::test] + async fn keyed_machine_restored_with_none_is_not_uid_indexed() { + let mgr = SessionManager::new(); + // Mirrors the loop's decision for a keyed machine: pass None even if the row + // carries a (legacy) machine_uid. + let legacy_uid = "muid-keyed-legacy"; + mgr.restore_offline_machine("agent-keyed", "HOST-KEYED", None) + .await; + assert!( + !mgr.is_machine_uid_indexed(legacy_uid).await, + "a keyed machine restored with None must not be reachable by machine_uid" + ); + } + + /// Counterpart: an UN-keyed machine restored WITH its uid IS indexed, so a + /// reconnecting un-keyed agent can legitimately reattach by stable identity. + #[tokio::test] + async fn unkeyed_machine_restored_with_uid_is_indexed() { + let mgr = SessionManager::new(); + let uid = "muid-unkeyed"; + mgr.restore_offline_machine("agent-unkeyed", "HOST-UNKEYED", Some(uid)) + .await; + assert!( + mgr.is_machine_uid_indexed(uid).await, + "an un-keyed machine restored with its uid must be reattachable by uid" + ); + } + #[tokio::test] async fn attended_session_starts_pending_and_blocks_viewer_until_granted() { let mgr = SessionManager::new(); // Attended (support-code) session: is_persistent = false. let (session_id, _frame_tx, _input_rx) = mgr - .register_agent("agent-att".to_string(), "Attended PC".to_string(), false) + .register_agent("agent-att".to_string(), "Attended PC".to_string(), false, None) .await; // Starts Pending. @@ -771,7 +916,7 @@ mod tests { // Managed/persistent session: is_persistent = true. let (session_id, _frame_tx, _input_rx) = mgr - .register_agent("agent-mgd".to_string(), "Managed PC".to_string(), true) + .register_agent("agent-mgd".to_string(), "Managed PC".to_string(), true, None) .await; assert_eq!( @@ -861,7 +1006,7 @@ mod tests { async fn agent_status_updates_h264_capability() { let mgr = SessionManager::new(); let (session_id, _frame_tx, _input_rx) = mgr - .register_agent("agent-cap".to_string(), "Cap PC".to_string(), true) + .register_agent("agent-cap".to_string(), "Cap PC".to_string(), true, None) .await; // Default is false until a status reports capability. @@ -895,7 +1040,7 @@ mod tests { async fn denied_attended_session_keeps_viewer_blocked() { let mgr = SessionManager::new(); let (session_id, _frame_tx, _input_rx) = mgr - .register_agent("agent-deny".to_string(), "Deny PC".to_string(), false) + .register_agent("agent-deny".to_string(), "Deny PC".to_string(), false, None) .await; mgr.set_consent_state(session_id, ConsentState::Denied) @@ -909,4 +1054,160 @@ mod tests { .await .is_none()); } + + // ---- v2-stable-identity Task 2: machine_uid reattach (un-keyed dedup path) ---- + + /// The SAME physical machine (same `machine_uid`) reconnecting with a DIFFERENT + /// `agent_id` — exactly the config-loss / fresh-UUID case that produced the + /// duplicate rows — must reattach to its existing session, NOT create a second. + #[tokio::test] + async fn same_machine_uid_different_agent_id_reattaches_one_session() { + let mgr = SessionManager::new(); + let uid = "muid-abc"; + + // First connect: agent_id #1, persistent (managed). + let (sid1, _f1, _i1) = mgr + .register_agent("agent-uuid-1".to_string(), "HOST-A".to_string(), true, Some(uid)) + .await; + + // Agent disconnects; persistent session is preserved offline. + mgr.mark_agent_disconnected(sid1).await; + assert_eq!( + mgr.get_session(sid1).await.map(|s| s.is_online), + Some(false), + "persistent session must be preserved offline after disconnect" + ); + + // Reconnect with a FRESH agent_id but the SAME machine_uid (config loss). + let (sid2, _f2, _i2) = mgr + .register_agent("agent-uuid-2".to_string(), "HOST-A".to_string(), true, Some(uid)) + .await; + + // Reattached to the SAME session, now back online. + assert_eq!(sid2, sid1, "same machine_uid must reattach the existing session"); + assert_eq!(mgr.get_session(sid1).await.map(|s| s.is_online), Some(true)); + + // Exactly ONE session total — no duplicate was created. + assert_eq!(mgr.list_sessions().await.len(), 1, "must not duplicate the session"); + + // The session's agent_id was repointed to the current one, and the agent_id + // index resolves the new id (and no longer the stale one). + assert_eq!( + mgr.get_session(sid1).await.map(|s| s.agent_id), + Some("agent-uuid-2".to_string()) + ); + assert!(mgr.get_session_by_agent("agent-uuid-2").await.is_some()); + assert!( + mgr.get_session_by_agent("agent-uuid-1").await.is_none(), + "stale agent_id must no longer resolve after machine_uid reattach" + ); + } + + /// Legacy path: agents that report NO machine_uid keep the original behavior — + /// reattach keys on `agent_id`, and a different agent_id with no uid is a + /// genuinely different machine (separate session). No crash, no regression. + #[tokio::test] + async fn legacy_no_machine_uid_dedups_on_agent_id() { + let mgr = SessionManager::new(); + + // Connect with NO machine_uid (legacy agent), persistent. + let (sid1, _f1, _i1) = mgr + .register_agent("legacy-agent".to_string(), "HOST-L".to_string(), true, None) + .await; + mgr.mark_agent_disconnected(sid1).await; + + // Same agent_id, still no uid -> reattaches by agent_id (original behavior). + let (sid2, _f2, _i2) = mgr + .register_agent("legacy-agent".to_string(), "HOST-L".to_string(), true, None) + .await; + assert_eq!(sid2, sid1, "legacy agent_id reattach must be unchanged"); + assert_eq!(mgr.list_sessions().await.len(), 1); + + // A DIFFERENT legacy agent_id (no uid) is a distinct machine -> new session. + let (sid3, _f3, _i3) = mgr + .register_agent("legacy-other".to_string(), "HOST-M".to_string(), true, None) + .await; + assert_ne!(sid3, sid1); + assert_eq!(mgr.list_sessions().await.len(), 2); + } + + /// A keyed agent is modeled here by the relay passing `machine_uid = None` + /// (the security gate suppresses the claimed uid). Its reattach therefore keys + /// on the authoritative `agent_id` only — a second machine cannot seize its + /// session by claiming the same uid, because no uid index entry exists for it. + #[tokio::test] + async fn keyed_agent_modeled_as_none_reattaches_on_agent_id_only() { + let mgr = SessionManager::new(); + + // Keyed agent: relay passes None for machine_uid. + let (sid1, _f1, _i1) = mgr + .register_agent("keyed-machine".to_string(), "HOST-K".to_string(), true, None) + .await; + mgr.mark_agent_disconnected(sid1).await; + + // Another connection CLAIMS a uid (as an un-keyed agent would). Because the + // keyed session was indexed by agent_id only (uid suppressed upstream), + // this claim cannot reattach the keyed session — it gets its own. + let (sid2, _f2, _i2) = mgr + .register_agent( + "attacker-agent".to_string(), + "EVIL".to_string(), + false, + Some("muid-claimed"), + ) + .await; + assert_ne!( + sid2, sid1, + "a claimed machine_uid must not reattach a keyed agent's session" + ); + + // The keyed agent reconnects (relay again passes None) and reattaches its OWN + // session by agent_id. + let (sid3, _f3, _i3) = mgr + .register_agent("keyed-machine".to_string(), "HOST-K".to_string(), true, None) + .await; + assert_eq!(sid3, sid1, "keyed agent reattaches on its authoritative agent_id"); + } + + /// Removing a session purges its machine_uid index entry so a later reconnect + /// with the same uid starts fresh instead of pointing at a dead session. + #[tokio::test] + async fn removing_session_purges_machine_uid_index() { + let mgr = SessionManager::new(); + let uid = "muid-purge"; + + let (sid1, _f1, _i1) = mgr + .register_agent("purge-agent".to_string(), "HOST-P".to_string(), true, Some(uid)) + .await; + mgr.remove_session(sid1).await; + assert!(mgr.get_session(sid1).await.is_none()); + + // Reconnect with the same uid -> a brand-new session (no stale reattach). + let (sid2, _f2, _i2) = mgr + .register_agent("purge-agent-2".to_string(), "HOST-P".to_string(), true, Some(uid)) + .await; + assert_ne!(sid2, sid1, "removed session's uid must not reattach"); + assert_eq!(mgr.list_sessions().await.len(), 1); + } + + /// After a server restart, a restored offline row indexed by machine_uid must + /// let an un-keyed agent reattach by uid even if it presents a fresh agent_id. + #[tokio::test] + async fn restored_offline_machine_reattaches_by_machine_uid() { + let mgr = SessionManager::new(); + let uid = "muid-restored"; + + // Simulate startup reconcile: restore an offline persistent machine with uid. + let sid = mgr + .restore_offline_machine("orig-agent", "HOST-R", Some(uid)) + .await; + assert_eq!(mgr.get_session(sid).await.map(|s| s.is_online), Some(false)); + + // Agent reconnects post-restart with a FRESH agent_id but the same uid. + let (sid2, _f2, _i2) = mgr + .register_agent("fresh-agent".to_string(), "HOST-R".to_string(), true, Some(uid)) + .await; + assert_eq!(sid2, sid, "restored machine must reattach by machine_uid"); + assert_eq!(mgr.list_sessions().await.len(), 1); + } }