feat(server): dedup machines on machine_uid (SPEC-004 Task 2)
Some checks failed
Build and Test / Build Server (Linux) (push) Failing after 3m48s
Build and Test / Build Agent (Windows) (push) Successful in 7m34s
Build and Test / Security Audit (push) Successful in 4m44s
Build and Test / Build Summary (push) Has been skipped

Persist the agent-reported machine_uid and dedup connect_machines on it so a
single physical machine can't register duplicate rows when its config-file
agent_id regenerates (the ghost-session root cause).

- migration 008: nullable connect_machines.machine_uid + partial unique index
  (WHERE machine_uid IS NOT NULL); idempotent, startup-applied.
- upsert_machine: two-path dedup (ON CONFLICT machine_uid when present, else
  the legacy ON CONFLICT agent_id path, unchanged).
- session reattach: a machine_uid index consulted before agent_id, with all
  removal paths purging it.
- security: keyed (cak_) agents stay authoritative — their claimed machine_uid
  is dropped (effective_machine_uid=None); uid is dedup-only for un-keyed /
  support-code agents. Startup restore skips uid-indexing keyed machines and
  fails closed if the keyed-set query errors.
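
A minimal sketch of the security bullet above, assuming a free-standing helper named `effective_machine_uid` (hypothetical; in this commit the same decision is made inline in `relay::agent_ws_handler`). It only illustrates the rule that a keyed agent's claimed uid is dropped and that a blank claim counts as absent:

```rust
/// Hypothetical helper (not part of the commit) mirroring the gate:
/// keyed agents never contribute a machine_uid; un-keyed agents do,
/// unless the claim is empty or whitespace-only.
fn effective_machine_uid(is_keyed_agent: bool, claimed: Option<String>) -> Option<String> {
    if is_keyed_agent {
        // Keyed (`cak_`) agent: the key's machine binding is authoritative,
        // so the client-claimed uid is dropped entirely.
        None
    } else {
        // Un-keyed agent: pass the claim through as a dedup-only aid,
        // treating a blank claim as if none was sent.
        claimed.filter(|u| !u.trim().is_empty())
    }
}
```

Keeping the decision in one place means every downstream consumer (session reattach, DB upsert, startup restore) only ever sees an already-gated `Option`.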

74 server tests pass; clippy clean. Implements specs/v2-stable-identity/plan.md Task 2.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-31 12:06:50 -07:00
parent 97780304e7
commit ffca7f0cee
6 changed files with 697 additions and 50 deletions


@@ -0,0 +1,41 @@
-- Migration: 008_machine_uid.sql
-- Purpose: Give connect_machines a deterministic, recomputable hardware identity
-- (machine_uid) and dedup registrations on it (SPEC-004 / v2-stable-identity
-- Task 2).
--
-- Today connect_machines is keyed only on `agent_id` — a random UUID the agent
-- persists in its config (generate_agent_id()). A lost/missing config produces a
-- fresh UUID, and because upsert_machine dedups `ON CONFLICT (agent_id)`, that
-- fresh id inserts a NEW row: the duplicate-registration bug (15 rows for 5 real
-- hosts in production). The agent (Task 1) now derives a stable `machine_uid` from
-- durable hardware/OS identifiers (Windows MachineGuid, hashed; recomputable) and
-- reports it on the connect handshake and on AgentStatus. This migration persists
-- it and adds the uniqueness needed to dedup the un-keyed / shared-key / config-loss
-- fleet on that stable identity.
--
-- SECURITY (SPEC-004): a client-asserted machine_uid is spoofable and is therefore
-- NOT a trust boundary. For per-agent (`cak_`) keyed agents the key's machine
-- binding stays authoritative (the server dedups those on their authenticated
-- agent_id, never on a claimed uid). machine_uid is a *correctness* aid for the
-- un-keyed path only. The UNIQUE index below enforces "one row per machine_uid"
-- at the schema level so a concurrent/racing un-keyed insert cannot create a
-- duplicate behind the application-level ON CONFLICT.
--
-- Idempotent: ADD COLUMN IF NOT EXISTS + CREATE UNIQUE INDEX IF NOT EXISTS. The
-- column is NULLABLE: legacy rows and agents that do not report a uid (older agents,
-- some support-code clients) carry NULL, and the partial index excludes NULLs so any
-- number of un-keyed rows may coexist without a uid. Applied on server startup by
-- sqlx::migrate!(); never pre-applied via psql. Ordered after 007.
-- See .claude/standards/gururmm/sqlx-migrations.md.
-- 1. machine_uid: deterministic hardware identity reported by the agent. NULLABLE
-- so legacy rows and non-reporting agents are unaffected.
ALTER TABLE connect_machines ADD COLUMN IF NOT EXISTS machine_uid TEXT;
-- 2. Enforce one row per machine_uid, but ONLY for rows that actually have one.
-- A partial UNIQUE index (WHERE machine_uid IS NOT NULL) lets unlimited legacy
-- NULL rows coexist while making a non-null machine_uid a true dedup key — this
-- is what upsert_machine's `ON CONFLICT (machine_uid)` arbiter binds to.
CREATE UNIQUE INDEX IF NOT EXISTS idx_connect_machines_machine_uid
ON connect_machines (machine_uid)
WHERE machine_uid IS NOT NULL;


@@ -12,6 +12,7 @@
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use std::collections::HashSet;
use uuid::Uuid;
/// Per-agent key record from the database.
@@ -142,3 +143,27 @@ pub async fn touch_last_used(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error>
.await?;
Ok(())
}
/// Return the set of `connect_machines.id` values that are bound to at least one
/// active (non-revoked) per-agent `cak_` key.
///
/// Used by the startup restore loop (`main.rs`) to enforce the same keyed/un-keyed
/// distinction the relay applies at connect time (SPEC-004 Task 2). For a KEYED
/// machine the key→machine binding is the authoritative identity, so its restored
/// session must NOT be indexed by a client-asserted `machine_uid` — otherwise an
/// un-keyed agent spoofing that uid could reattach the keyed machine's offline
/// session after a restart. Membership here lets the caller pass `machine_uid =
/// None` for keyed machines while still indexing un-keyed ones for legitimate
/// uid-based reattach.
pub async fn keyed_machine_ids(pool: &PgPool) -> Result<HashSet<Uuid>, sqlx::Error> {
let ids = sqlx::query_scalar::<_, Uuid>(
r#"
SELECT DISTINCT machine_id
FROM connect_agent_keys
WHERE revoked_at IS NULL
"#,
)
.fetch_all(pool)
.await?;
Ok(ids.into_iter().collect())
}


@@ -50,6 +50,14 @@ pub struct Machine {
/// impl) so it can never error at decode time.
#[serde(default)]
pub tags: Vec<String>,
/// Deterministic, recomputable hardware identity reported by the agent
/// (`AgentStatus.machine_uid` / connect query param). Column added in migration
/// 008. NULLABLE: legacy rows and agents that do not report a uid carry `None`.
/// For un-keyed agents this is the dedup key (`upsert_machine` keys
/// `ON CONFLICT (machine_uid)` when present); for `cak_`-keyed agents the key's
/// machine binding stays authoritative and the claimed uid is NOT used to dedup
/// (see `upsert_machine`).
pub machine_uid: Option<String>,
}
impl<'r> FromRow<'r, PgRow> for Machine {
@@ -65,6 +73,8 @@ impl<'r> FromRow<'r, PgRow> for Machine {
tenant_id: row.try_get("tenant_id")?,
organization: row.try_get("organization")?,
site: row.try_get("site")?,
// Schema-nullable (migration 008); decode directly as Option.
machine_uid: row.try_get("machine_uid")?,
// Nullable-with-default columns mapped to non-`Option` Rust types: read as
// `Option<T>` and fall back to the type default so a NULL cell never errors.
is_elevated: row
@@ -97,29 +107,88 @@ impl<'r> FromRow<'r, PgRow> for Machine {
}
}
/// Get or create a machine by agent_id (upsert)
/// Get or create a machine record (upsert), deduplicating on the most stable
/// identity available (SPEC-004 / v2-stable-identity Task 2).
///
/// Two dedup paths, selected by whether the caller passes a `machine_uid`:
///
/// - **`machine_uid = Some(uid)` (the un-keyed dedup path):** key on the stable
/// hardware identity — `ON CONFLICT (machine_uid)`. The SAME machine reconnecting
/// with a DIFFERENT `agent_id` (e.g. after a config loss minted a fresh random id)
/// updates its EXISTING row instead of inserting a duplicate. `agent_id` and
/// `hostname` are refreshed to the latest reported values. This is what collapses
/// the duplicate-registration fleet down to one row per real machine.
///
/// - **`machine_uid = None` (legacy / authoritative path):** preserve the original
/// behavior exactly — `ON CONFLICT (agent_id)`. Used for agents that do not report
/// a uid AND, critically, for `cak_`-keyed agents: the caller (`relay`) passes
/// `None` for keyed agents so their AUTHORITATIVE key-bound `agent_id` is the dedup
/// key and a client-claimed `machine_uid` can never repoint a keyed machine's row.
///
/// SECURITY: a client-asserted `machine_uid` is spoofable, so it is a *correctness*
/// aid, not a trust boundary. Only the un-keyed path supplies it; the keyed path's
/// authority lives in the key→machine binding upstream (see
/// `relay::agent_ws_handler`), never here.
pub async fn upsert_machine(
pool: &PgPool,
agent_id: &str,
hostname: &str,
is_persistent: bool,
machine_uid: Option<&str>,
) -> Result<Machine, sqlx::Error> {
sqlx::query_as::<_, Machine>(
r#"
INSERT INTO connect_machines (agent_id, hostname, is_persistent, status, last_seen)
VALUES ($1, $2, $3, 'online', NOW())
ON CONFLICT (agent_id) DO UPDATE SET
hostname = EXCLUDED.hostname,
status = 'online',
last_seen = NOW()
RETURNING *
"#,
)
.bind(agent_id)
.bind(hostname)
.bind(is_persistent)
.fetch_one(pool)
.await
match machine_uid {
// Un-keyed dedup path: stable hardware identity is the conflict arbiter.
// A new agent_id for the same physical machine updates the existing row.
//
// Edge case: the `ON CONFLICT (machine_uid)` arbiter handles conflicts on
// `machine_uid` only. If this INSERT's `agent_id` already belongs to a
// DIFFERENT row (for example a legacy row whose `machine_uid` is NULL), the
// `agent_id UNIQUE` constraint raises a unique violation that the arbiter
// does not absorb, so the upsert errors instead of merging on uid. This is
// non-fatal: the caller logs the error and the live in-memory session is
// unaffected, and the agent simply retries with a freshly minted UUID. The
// window closes on its own as legacy rows age out (SPEC-004 Task 3).
Some(uid) => {
sqlx::query_as::<_, Machine>(
r#"
INSERT INTO connect_machines (agent_id, hostname, is_persistent, status, last_seen, machine_uid)
VALUES ($1, $2, $3, 'online', NOW(), $4)
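-- The predicate is repeated so Postgres can infer the PARTIAL unique index as the arbiter.
ON CONFLICT (machine_uid) WHERE machine_uid IS NOT NULL DO UPDATE SET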
agent_id = EXCLUDED.agent_id,
hostname = EXCLUDED.hostname,
status = 'online',
last_seen = NOW()
RETURNING *
"#,
)
.bind(agent_id)
.bind(hostname)
.bind(is_persistent)
.bind(uid)
.fetch_one(pool)
.await
}
// Legacy / authoritative path: dedup on agent_id exactly as before. Leaves
// machine_uid NULL (the partial unique index excludes NULLs, so any number
// of these may coexist).
None => {
sqlx::query_as::<_, Machine>(
r#"
INSERT INTO connect_machines (agent_id, hostname, is_persistent, status, last_seen)
VALUES ($1, $2, $3, 'online', NOW())
ON CONFLICT (agent_id) DO UPDATE SET
hostname = EXCLUDED.hostname,
status = 'online',
last_seen = NOW()
RETURNING *
"#,
)
.bind(agent_id)
.bind(hostname)
.bind(is_persistent)
.fetch_one(pool)
.await
}
}
}
/// Update machine status and info
@@ -239,3 +308,134 @@ pub async fn update_machine_metadata(
.await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use sqlx::postgres::PgPoolOptions;
/// Connect to a throwaway test Postgres and apply migrations, or return `None`
/// when `TEST_DATABASE_URL` is unset so the suite is a no-op on workstations
/// without a database. CI sets `TEST_DATABASE_URL` against an ephemeral Postgres,
/// where these run for real. (The server crate is Linux-targeted and validated
/// in Gitea CI; these DB tests run there.)
async fn test_pool() -> Option<PgPool> {
let url = std::env::var("TEST_DATABASE_URL").ok()?;
let pool = PgPoolOptions::new()
.max_connections(2)
.connect(&url)
.await
.expect("connect to TEST_DATABASE_URL");
sqlx::migrate!("./migrations")
.run(&pool)
.await
.expect("apply migrations to the test database");
Some(pool)
}
/// Remove any rows this test created so reruns are clean and tests don't collide.
async fn cleanup(pool: &PgPool, agent_ids: &[&str], machine_uids: &[&str]) {
for id in agent_ids {
let _ = sqlx::query("DELETE FROM connect_machines WHERE agent_id = $1")
.bind(id)
.execute(pool)
.await;
}
for uid in machine_uids {
let _ = sqlx::query("DELETE FROM connect_machines WHERE machine_uid = $1")
.bind(uid)
.execute(pool)
.await;
}
}
/// (a) Same `machine_uid` with two DIFFERENT `agent_id`s collapses to ONE row.
/// The second upsert updates the existing row (and repoints its agent_id) rather
/// than inserting a duplicate — the core dedup guarantee for the un-keyed fleet.
#[tokio::test]
async fn same_machine_uid_two_agent_ids_one_row() {
let Some(pool) = test_pool().await else {
return; // no TEST_DATABASE_URL: skip (runs in CI)
};
let uid = "test-muid-dedup-001";
cleanup(&pool, &["agent-A", "agent-B"], &[uid]).await;
let m1 = upsert_machine(&pool, "agent-A", "HOST-A", true, Some(uid))
.await
.expect("first upsert");
let m2 = upsert_machine(&pool, "agent-B", "HOST-A2", true, Some(uid))
.await
.expect("second upsert with same uid, different agent_id");
// Same physical row, agent_id and hostname refreshed to the latest.
assert_eq!(m1.id, m2.id, "same machine_uid must update the same row");
assert_eq!(m2.agent_id, "agent-B");
assert_eq!(m2.hostname, "HOST-A2");
assert_eq!(m2.machine_uid.as_deref(), Some(uid));
// Exactly one row carries this uid.
let count: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM connect_machines WHERE machine_uid = $1")
.bind(uid)
.fetch_one(&pool)
.await
.expect("count rows for uid");
assert_eq!(count, 1, "must be exactly one row for the machine_uid");
cleanup(&pool, &["agent-A", "agent-B"], &[uid]).await;
}
/// (b) Legacy NULL-`machine_uid` path is unchanged: dedup keys on `agent_id`,
/// the row's `machine_uid` stays NULL, and re-upserting the same agent_id with
/// no uid updates the same row (no crash, no duplicate).
#[tokio::test]
async fn legacy_null_machine_uid_dedups_on_agent_id() {
let Some(pool) = test_pool().await else {
return; // no TEST_DATABASE_URL: skip (runs in CI)
};
let agent = "test-legacy-agent-001";
cleanup(&pool, &[agent], &[]).await;
let m1 = upsert_machine(&pool, agent, "LEGACY-HOST", true, None)
.await
.expect("legacy upsert (no uid)");
assert_eq!(m1.machine_uid, None, "legacy row must have NULL machine_uid");
let m2 = upsert_machine(&pool, agent, "LEGACY-HOST-RENAMED", true, None)
.await
.expect("legacy re-upsert (no uid)");
assert_eq!(m1.id, m2.id, "legacy agent_id must dedup to the same row");
assert_eq!(m2.hostname, "LEGACY-HOST-RENAMED");
assert_eq!(m2.machine_uid, None);
let count: i64 =
sqlx::query_scalar("SELECT COUNT(*) FROM connect_machines WHERE agent_id = $1")
.bind(agent)
.fetch_one(&pool)
.await
.expect("count legacy rows");
assert_eq!(count, 1, "legacy path must not duplicate the row");
cleanup(&pool, &[agent], &[]).await;
}
/// Multiple legacy rows with NULL machine_uid coexist — the partial unique index
/// excludes NULLs, so distinct un-keyed agents are independent rows.
#[tokio::test]
async fn multiple_null_machine_uid_rows_coexist() {
let Some(pool) = test_pool().await else {
return; // no TEST_DATABASE_URL: skip (runs in CI)
};
cleanup(&pool, &["null-1", "null-2"], &[]).await;
let a = upsert_machine(&pool, "null-1", "H1", true, None)
.await
.expect("first null-uid row");
let b = upsert_machine(&pool, "null-2", "H2", true, None)
.await
.expect("second null-uid row");
assert_ne!(a.id, b.id, "distinct legacy agents must be distinct rows");
cleanup(&pool, &["null-1", "null-2"], &[]).await;
}
}
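
To make the two dedup paths and the `agent_id` collision edge case concrete, here is an in-memory model, not the real query. `Table`, `Row`, and `UpsertError` are hypothetical names, and the model assumes `agent_id` is unique across rows, as the comments in `upsert_machine` state:

```rust
#[derive(Debug, Clone, PartialEq)]
struct Row {
    id: u32,
    agent_id: String,
    hostname: String,
    machine_uid: Option<String>,
}

#[derive(Debug, PartialEq)]
enum UpsertError {
    /// Models a unique violation on agent_id that the conflict key did not absorb.
    AgentIdTaken,
}

#[derive(Default)]
struct Table {
    rows: Vec<Row>,
    next_id: u32,
}

impl Table {
    fn upsert(
        &mut self,
        agent_id: &str,
        hostname: &str,
        machine_uid: Option<&str>,
    ) -> Result<Row, UpsertError> {
        // Pick the conflict key: machine_uid when supplied, else agent_id.
        let existing = match machine_uid {
            Some(uid) => self
                .rows
                .iter()
                .position(|r| r.machine_uid.as_deref() == Some(uid)),
            None => self.rows.iter().position(|r| r.agent_id == agent_id),
        };
        // agent_id must stay unique: a clash with any OTHER row is an error.
        let clash = self
            .rows
            .iter()
            .enumerate()
            .any(|(i, r)| r.agent_id == agent_id && Some(i) != existing);
        if clash {
            return Err(UpsertError::AgentIdTaken);
        }
        match existing {
            Some(i) => {
                // Conflict: refresh the existing row in place.
                let row = &mut self.rows[i];
                row.agent_id = agent_id.to_string();
                row.hostname = hostname.to_string();
                Ok(row.clone())
            }
            None => {
                // No conflict: insert a new row.
                self.next_id += 1;
                let row = Row {
                    id: self.next_id,
                    agent_id: agent_id.to_string(),
                    hostname: hostname.to_string(),
                    machine_uid: machine_uid.map(str::to_string),
                };
                self.rows.push(row.clone());
                Ok(row)
            }
        }
    }
}
```

In this model, two upserts with the same uid and different agent ids end up as one row, while an upsert that brings a new uid together with an `agent_id` already held by a NULL-uid row returns `AgentIdTaken` rather than merging.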


@@ -245,9 +245,39 @@ async fn main() -> Result<()> {
"Reconciling {} managed session(s) from database",
machines.len()
);
// Machines bound to an active `cak_` key. For these the key→machine
// binding is authoritative (SPEC-004 Task 2), so we must NOT index a
// restored keyed session by its stored `machine_uid`: doing so would
// let an un-keyed agent spoofing that uid reattach the keyed machine's
// offline session after a restart. The connect path never writes a uid
// for keyed agents, so a non-NULL uid on a keyed row can only come from
// a legacy pre-keying row — but close the gap regardless. On query
// failure, fail closed (treat all machines as keyed: index none by uid)
// rather than risk indexing a keyed machine.
let keyed_ids = match db::agent_keys::keyed_machine_ids(db.pool()).await {
Ok(ids) => ids,
Err(e) => {
tracing::warn!(
"Could not load keyed-machine set; suppressing uid reattach index for all restored machines: {}",
e
);
machines.iter().map(|m| m.id).collect()
}
};
for machine in machines {
// Keyed machines get None (uid reattach disabled); un-keyed
// machines keep their stored uid for legitimate reattach.
let restore_uid = if keyed_ids.contains(&machine.id) {
None
} else {
machine.machine_uid.as_deref()
};
sessions
.restore_offline_machine(&machine.agent_id, &machine.hostname)
.restore_offline_machine(
&machine.agent_id,
&machine.hostname,
restore_uid,
)
.await;
}
}
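
The fail-closed choice in the restore loop can be sketched without a database. `Machine`, `restore_uids`, and the `String` error type here are illustrative stand-ins (assumptions), not the server's types:

```rust
use std::collections::HashSet;

struct Machine {
    id: u32,
    machine_uid: Option<String>,
}

/// For each machine, return the uid that may be indexed for reattach.
/// If the keyed-set lookup failed, treat EVERY machine as keyed so that
/// no uid is indexed (fail closed).
fn restore_uids(
    machines: &[Machine],
    keyed_lookup: Result<HashSet<u32>, String>,
) -> Vec<Option<&str>> {
    let keyed: HashSet<u32> = match keyed_lookup {
        Ok(ids) => ids,
        // Lookup failed: pretend all machines are keyed.
        Err(_) => machines.iter().map(|m| m.id).collect(),
    };
    machines
        .iter()
        .map(|m| {
            if keyed.contains(&m.id) {
                None // keyed: uid reattach disabled
            } else {
                m.machine_uid.as_deref() // un-keyed: keep stored uid
            }
        })
        .collect()
}
```

The cost of failing closed is that un-keyed machines lose uid-based reattach until the next successful restart; reattach by `agent_id` is unaffected.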


@@ -74,6 +74,13 @@ pub struct AgentParams {
/// API key for persistent (managed) agents
#[serde(default)]
api_key: Option<String>,
/// Deterministic, recomputable hardware identity reported by the agent
/// (v2-stable-identity Task 1; `transport/websocket.rs`). Used to dedup
/// registrations for UN-KEYED agents only — see the security note where it is
/// consumed below. CLIENT-SUPPLIED and therefore spoofable: it is never used to
/// override a `cak_` key's authoritative machine binding.
#[serde(default)]
machine_uid: Option<String>,
}
#[derive(Debug, Deserialize)]
@@ -110,6 +117,12 @@ pub async fn agent_ws_handler(
.unwrap_or_else(|| agent_id.clone());
let support_code = params.support_code.clone();
let api_key = params.api_key.clone();
// CLIENT-SUPPLIED machine_uid (v2-stable-identity Task 1). Spoofable; see the
// security gate below. It is used for dedup ONLY on the un-keyed path; for
// `cak_`-keyed agents it is suppressed so the key's machine binding stays
// authoritative. `is_keyed_agent` records which path authenticated us.
let claimed_machine_uid = params.machine_uid.clone();
let mut is_keyed_agent = false;
// Real client IP via the trusted-proxy-aware extractor (shared with the rate
// limiter and audit log). Behind NPM on loopback, `addr.ip()` is 127.0.0.1;
// this resolves the actual remote agent IP from forwarding headers when the
@@ -268,6 +281,11 @@ pub async fn agent_ws_handler(
);
}
agent_id = trusted_agent_id;
// KEYED agent: the key→machine binding is AUTHORITATIVE. Suppress
// the client-claimed machine_uid for dedup (below) so a valid key
// for machine X can never repoint machine Y's row by claiming Y's
// uid. Dedup for this agent stays on its authenticated agent_id.
is_keyed_agent = true;
info!(
"Agent {} from {} authenticated via per-agent key",
agent_id, client_ip
@@ -324,6 +342,20 @@ pub async fn agent_ws_handler(
let support_codes = state.support_codes.clone();
let db = state.db.clone();
// SECURITY GATE for machine_uid dedup (v2-stable-identity Task 2):
// - KEYED (`cak_`) agents: dedup stays on the authenticated agent_id; the
// claimed uid is DROPPED so it cannot override the key's machine binding.
// - UN-KEYED agents (support-code, deprecated shared key, no auth-identity):
// the claimed uid is a dedup-only correctness aid and is passed through.
// A client-asserted machine_uid is spoofable, so it must never be the dedup key
// for a keyed agent. This is the only place the distinction is enforced.
let effective_machine_uid = if is_keyed_agent {
None
} else {
// Treat an empty/whitespace-only claim as absent.
claimed_machine_uid.filter(|u| !u.trim().is_empty())
};
// Bounded relay: cap inbound frame/message size before the socket is upgraded
// so oversized agent frames are rejected by the WS layer, never `to_vec()`'d
// and broadcast. (WS-OOM HIGH.)
@@ -340,6 +372,7 @@ pub async fn agent_ws_handler(
agent_id,
agent_name,
support_code,
effective_machine_uid,
Some(client_ip),
)
}))
@@ -548,6 +581,10 @@ async fn handle_agent_connection(
agent_id: String,
agent_name: String,
support_code: Option<String>,
// Dedup identity for the UN-KEYED path only (already security-gated to `None`
// for `cak_`-keyed agents by `agent_ws_handler`). Drives both the in-memory
// session reattach key and the DB `ON CONFLICT (machine_uid)` upsert.
machine_uid: Option<String>,
client_ip: Option<std::net::IpAddr>,
) {
info!(
@@ -581,14 +618,27 @@ async fn handle_agent_connection(
// Persistent agents (no support code) keep their session when disconnected
let is_persistent = support_code.is_none();
let (session_id, frame_tx, mut input_rx) = sessions
.register_agent(agent_id.clone(), agent_name.clone(), is_persistent)
.register_agent(
agent_id.clone(),
agent_name.clone(),
is_persistent,
machine_uid.as_deref(),
)
.await;
info!("Session created: {} (agent in idle mode)", session_id);
// Database: upsert machine and create session record
let _machine_id = if let Some(ref db) = db {
match db::machines::upsert_machine(db.pool(), &agent_id, &agent_name, is_persistent).await {
match db::machines::upsert_machine(
db.pool(),
&agent_id,
&agent_name,
is_persistent,
machine_uid.as_deref(),
)
.await
{
Ok(machine) => {
// Create session record
let _ = db::sessions::create_session(
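
The reattach key forwarded to `register_agent` is described as being consulted before `agent_id`. A rough stand-alone model of that lookup order, and of purging on removal, follows. `Indexes`, `resolve`, and `purge` are hypothetical names, session ids are plain integers, and purging both maps with a value scan is a simplification:

```rust
use std::collections::HashMap;

#[derive(Default)]
struct Indexes {
    by_uid: HashMap<String, u64>,
    by_agent: HashMap<String, u64>,
}

impl Indexes {
    /// Resolve a prior session. Returns the session id and whether the
    /// match came from the machine_uid index.
    fn resolve(&self, agent_id: &str, machine_uid: Option<&str>) -> Option<(u64, bool)> {
        // Prefer the stable machine_uid when one was supplied and is indexed.
        if let Some(sid) = machine_uid.and_then(|uid| self.by_uid.get(uid).copied()) {
            return Some((sid, true));
        }
        // Otherwise fall back to the agent_id index.
        self.by_agent.get(agent_id).copied().map(|sid| (sid, false))
    }

    /// Drop every index entry that points at a removed session.
    fn purge(&mut self, session_id: u64) {
        self.by_uid.retain(|_, sid| *sid != session_id);
        self.by_agent.retain(|_, sid| *sid != session_id);
    }
}
```

Because a keyed agent always arrives with `machine_uid = None`, it can only ever take the `agent_id` branch in this model.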


@@ -174,6 +174,17 @@ struct SessionData {
pub struct SessionManager {
sessions: Arc<RwLock<HashMap<SessionId, SessionData>>>,
agents: Arc<RwLock<HashMap<AgentId, SessionId>>>,
/// Reattach index keyed by the stable hardware identity (`machine_uid`), for
/// the UN-KEYED dedup path (v2-stable-identity Task 2). Populated only when an
/// agent reports a (security-gated) machine_uid. Consulted BEFORE the
/// `agent_id` index on reconnect, so the SAME physical machine reconnecting
/// with a FRESH random agent_id (config loss) reattaches to its existing
/// session instead of stranding it and creating a duplicate.
///
/// Keyed (`cak_`) agents are passed `machine_uid = None` (the relay suppresses
/// the claimed uid for them), so they never appear here — their reattach stays
/// on the authoritative `agent_id` index, preserving the key's machine binding.
machine_uids: Arc<RwLock<HashMap<String, SessionId>>>,
}
impl SessionManager {
@@ -181,43 +192,97 @@ impl SessionManager {
Self {
sessions: Arc::new(RwLock::new(HashMap::new())),
agents: Arc::new(RwLock::new(HashMap::new())),
machine_uids: Arc::new(RwLock::new(HashMap::new())),
}
}
/// Register a new agent and create a session
/// If agent was previously connected (offline session exists), reuse that session
/// Register a new agent and create a session.
///
/// Reattach (reconnect to an existing offline session) prefers the stable
/// `machine_uid` identity when one is supplied, falling back to `agent_id`
/// (v2-stable-identity Task 2):
///
/// - `machine_uid = Some(uid)` (the UN-KEYED dedup path): look up the prior
/// session by `uid` FIRST. The same physical machine reconnecting with a
/// FRESH random `agent_id` (config loss) reattaches to its existing session
/// rather than stranding it and minting a duplicate. On reattach the session's
/// `agent_id` is repointed to the current one and the `agents` index is fixed
/// up to match.
/// - `machine_uid = None` (legacy / keyed path): behave exactly as before —
/// reattach by `agent_id` only. The relay passes `None` for `cak_`-keyed
/// agents so their authoritative key-bound identity drives reattach and a
/// spoofable claimed uid can never seize another machine's session.
pub async fn register_agent(
&self,
agent_id: AgentId,
agent_name: String,
is_persistent: bool,
machine_uid: Option<&str>,
) -> (SessionId, FrameSender, InputReceiver) {
// Check if this agent already has an offline session (reconnecting)
{
let agents = self.agents.read().await;
if let Some(&existing_session_id) = agents.get(&agent_id) {
let mut sessions = self.sessions.write().await;
if let Some(session_data) = sessions.get_mut(&existing_session_id) {
if !session_data.info.is_online {
// Reuse existing session - mark as online and create new channels
tracing::info!(
"Agent {} reconnecting to existing session {}",
agent_id,
existing_session_id
);
// Resolve a prior offline session to reattach to. Prefer the stable
// machine_uid identity; fall back to agent_id. `reattach_via_uid` records
// whether we matched on uid so we can fix up the agent_id index below.
let (existing_session_id, reattach_via_uid) = {
let by_uid = match machine_uid {
Some(uid) => self.machine_uids.read().await.get(uid).copied(),
None => None,
};
match by_uid {
Some(sid) => (Some(sid), true),
None => (self.agents.read().await.get(&agent_id).copied(), false),
}
};
let (frame_tx, _) = broadcast::channel(16);
let (input_tx, input_rx) = tokio::sync::mpsc::channel(64);
if let Some(existing_session_id) = existing_session_id {
let mut sessions = self.sessions.write().await;
if let Some(session_data) = sessions.get_mut(&existing_session_id) {
if !session_data.info.is_online {
// Reuse existing session - mark as online and create new channels
tracing::info!(
"Agent {} reconnecting to existing session {} (matched on {})",
agent_id,
existing_session_id,
if reattach_via_uid {
"machine_uid"
} else {
"agent_id"
}
);
session_data.info.is_online = true;
session_data.info.last_heartbeat = chrono::Utc::now();
session_data.info.agent_name = agent_name; // Update name in case it changed
session_data.frame_tx = frame_tx.clone();
session_data.input_tx = input_tx;
session_data.last_heartbeat_instant = Instant::now();
let (frame_tx, _) = broadcast::channel(16);
let (input_tx, input_rx) = tokio::sync::mpsc::channel(64);
return (existing_session_id, frame_tx, input_rx);
// If we matched on machine_uid, the agent_id may have changed
// (config loss minted a fresh one). Repoint the session and fix
// up the agent_id index: drop the stale agent_id mapping (only
// if it still pointed at THIS session) and add the current one.
let previous_agent_id = session_data.info.agent_id.clone();
if reattach_via_uid && previous_agent_id != agent_id {
let mut agents = self.agents.write().await;
if agents.get(&previous_agent_id) == Some(&existing_session_id) {
agents.remove(&previous_agent_id);
}
agents.insert(agent_id.clone(), existing_session_id);
session_data.info.agent_id = agent_id.clone();
}
session_data.info.is_online = true;
session_data.info.last_heartbeat = chrono::Utc::now();
session_data.info.agent_name = agent_name; // Update name in case it changed
session_data.frame_tx = frame_tx.clone();
session_data.input_tx = input_tx;
session_data.last_heartbeat_instant = Instant::now();
// Ensure the machine_uid index points at this session (it may be
// the first time we learn this uid for an agent_id-restored row).
if let Some(uid) = machine_uid {
self.machine_uids
.write()
.await
.insert(uid.to_string(), existing_session_id);
}
return (existing_session_id, frame_tx, input_rx);
}
}
}
@@ -274,6 +339,15 @@ impl SessionManager {
let mut agents = self.agents.write().await;
agents.insert(agent_id, session_id);
// Index the new session by its stable machine_uid (un-keyed path) so a later
// reconnect with a different agent_id reattaches here instead of duplicating.
if let Some(uid) = machine_uid {
self.machine_uids
.write()
.await
.insert(uid.to_string(), session_id);
}
(session_id, frame_tx, input_rx)
}
@@ -561,6 +635,11 @@ impl SessionManager {
drop(sessions); // Release sessions lock before acquiring agents lock
let mut agents = self.agents.write().await;
agents.remove(&agent_id);
drop(agents);
// Purge any machine_uid index entry for this removed support session
// so a stale uid can never reattach it.
let mut machine_uids = self.machine_uids.write().await;
machine_uids.retain(|_, &mut sid| sid != session_id);
tracing::info!("Support session {} removed", session_id);
}
}
@@ -573,6 +652,12 @@ impl SessionManager {
drop(sessions);
let mut agents = self.agents.write().await;
agents.remove(&session_data.info.agent_id);
drop(agents);
// Purge any machine_uid index entry pointing at this session so a stale
// uid can never reattach a removed session. The map holds one entry per
// real machine, so a value scan is cheap.
let mut machine_uids = self.machine_uids.write().await;
machine_uids.retain(|_, &mut sid| sid != session_id);
}
}
@@ -661,8 +746,19 @@ impl Default for SessionManager {
}
impl SessionManager {
/// Restore a machine as an offline session (called on startup from database)
pub async fn restore_offline_machine(&self, agent_id: &str, hostname: &str) -> SessionId {
/// Restore a machine as an offline session (called on startup from database).
///
/// `machine_uid` is the row's stored stable identity (migration 008), or `None`
/// for legacy rows. When present it is indexed so a reconnecting un-keyed agent
/// reattaches to this restored session by `machine_uid` even if it presents a
/// fresh `agent_id` (config loss) after a server restart — matching the live
/// reattach path in `register_agent`.
pub async fn restore_offline_machine(
&self,
agent_id: &str,
hostname: &str,
machine_uid: Option<&str>,
) -> SessionId {
let session_id = Uuid::new_v4();
let now = chrono::Utc::now();
@@ -707,10 +803,26 @@ impl SessionManager {
let mut agents = self.agents.write().await;
agents.insert(agent_id.to_string(), session_id);
drop(agents);
if let Some(uid) = machine_uid {
self.machine_uids
.write()
.await
.insert(uid.to_string(), session_id);
}
tracing::info!("Restored offline machine: {} ({})", hostname, agent_id);
session_id
}
/// Test-only: report whether a `machine_uid` is present in the reattach index.
/// Lets the restore tests assert the keyed/un-keyed indexing contract that the
/// startup loop in `main.rs` depends on (SPEC-004 Task 2 / L1 hardening).
#[cfg(test)]
async fn is_machine_uid_indexed(&self, uid: &str) -> bool {
self.machine_uids.read().await.contains_key(uid)
}
}
#[cfg(test)]
@@ -733,13 +845,46 @@ mod tests {
assert!(!ConsentState::Denied.allows_viewer());
}
/// L1 hardening (SPEC-004 Task 2): the startup restore loop hands `machine_uid =
/// None` to `restore_offline_machine` for KEYED machines, so a restored keyed
/// session is NEVER reachable via the uid reattach index — closing the spoofing
/// path where an un-keyed agent could reattach a keyed machine's offline session
/// by claiming its uid. This asserts the index-control contract the loop relies on.
#[tokio::test]
async fn keyed_machine_restored_with_none_is_not_uid_indexed() {
let mgr = SessionManager::new();
// Mirrors the loop's decision for a keyed machine: pass None even if the row
// carries a (legacy) machine_uid.
let legacy_uid = "muid-keyed-legacy";
mgr.restore_offline_machine("agent-keyed", "HOST-KEYED", None)
.await;
assert!(
!mgr.is_machine_uid_indexed(legacy_uid).await,
"a keyed machine restored with None must not be reachable by machine_uid"
);
}
/// Counterpart: an UN-keyed machine restored WITH its uid IS indexed, so a
/// reconnecting un-keyed agent can legitimately reattach by stable identity.
#[tokio::test]
async fn unkeyed_machine_restored_with_uid_is_indexed() {
let mgr = SessionManager::new();
let uid = "muid-unkeyed";
mgr.restore_offline_machine("agent-unkeyed", "HOST-UNKEYED", Some(uid))
.await;
assert!(
mgr.is_machine_uid_indexed(uid).await,
"an un-keyed machine restored with its uid must be reattachable by uid"
);
}
#[tokio::test]
async fn attended_session_starts_pending_and_blocks_viewer_until_granted() {
let mgr = SessionManager::new();
// Attended (support-code) session: is_persistent = false.
let (session_id, _frame_tx, _input_rx) = mgr
.register_agent("agent-att".to_string(), "Attended PC".to_string(), false)
.register_agent("agent-att".to_string(), "Attended PC".to_string(), false, None)
.await;
// Starts Pending.
@@ -771,7 +916,7 @@ mod tests {
// Managed/persistent session: is_persistent = true.
let (session_id, _frame_tx, _input_rx) = mgr
.register_agent("agent-mgd".to_string(), "Managed PC".to_string(), true)
.register_agent("agent-mgd".to_string(), "Managed PC".to_string(), true, None)
.await;
assert_eq!(
@@ -861,7 +1006,7 @@ mod tests {
async fn agent_status_updates_h264_capability() {
let mgr = SessionManager::new();
let (session_id, _frame_tx, _input_rx) = mgr
.register_agent("agent-cap".to_string(), "Cap PC".to_string(), true)
.register_agent("agent-cap".to_string(), "Cap PC".to_string(), true, None)
.await;
// Default is false until a status reports capability.
@@ -895,7 +1040,7 @@ mod tests {
async fn denied_attended_session_keeps_viewer_blocked() {
let mgr = SessionManager::new();
let (session_id, _frame_tx, _input_rx) = mgr
.register_agent("agent-deny".to_string(), "Deny PC".to_string(), false)
.register_agent("agent-deny".to_string(), "Deny PC".to_string(), false, None)
.await;
mgr.set_consent_state(session_id, ConsentState::Denied)
@@ -909,4 +1054,160 @@ mod tests {
.await
.is_none());
}
// ---- v2-stable-identity Task 2: machine_uid reattach (un-keyed dedup path) ----
/// The SAME physical machine (same `machine_uid`) reconnecting with a DIFFERENT
/// `agent_id` — exactly the config-loss / fresh-UUID case that produced the
/// duplicate rows — must reattach to its existing session, NOT create a second.
#[tokio::test]
async fn same_machine_uid_different_agent_id_reattaches_one_session() {
let mgr = SessionManager::new();
let uid = "muid-abc";
// First connect: agent_id #1, persistent (managed).
let (sid1, _f1, _i1) = mgr
.register_agent("agent-uuid-1".to_string(), "HOST-A".to_string(), true, Some(uid))
.await;
// Agent disconnects; persistent session is preserved offline.
mgr.mark_agent_disconnected(sid1).await;
assert_eq!(
mgr.get_session(sid1).await.map(|s| s.is_online),
Some(false),
"persistent session must be preserved offline after disconnect"
);
// Reconnect with a FRESH agent_id but the SAME machine_uid (config loss).
let (sid2, _f2, _i2) = mgr
.register_agent("agent-uuid-2".to_string(), "HOST-A".to_string(), true, Some(uid))
.await;
// Reattached to the SAME session, now back online.
assert_eq!(sid2, sid1, "same machine_uid must reattach the existing session");
assert_eq!(mgr.get_session(sid1).await.map(|s| s.is_online), Some(true));
// Exactly ONE session total — no duplicate was created.
assert_eq!(mgr.list_sessions().await.len(), 1, "must not duplicate the session");
// The session's agent_id was repointed to the current one, and the agent_id
// index resolves the new id (and no longer the stale one).
assert_eq!(
mgr.get_session(sid1).await.map(|s| s.agent_id),
Some("agent-uuid-2".to_string())
);
assert!(mgr.get_session_by_agent("agent-uuid-2").await.is_some());
assert!(
mgr.get_session_by_agent("agent-uuid-1").await.is_none(),
"stale agent_id must no longer resolve after machine_uid reattach"
);
}
/// Legacy path: agents that report NO machine_uid keep the original behavior —
/// reattach keys on `agent_id`, and a different agent_id with no uid is a
/// genuinely different machine (separate session). No crash, no regression.
#[tokio::test]
async fn legacy_no_machine_uid_dedups_on_agent_id() {
let mgr = SessionManager::new();
// Connect with NO machine_uid (legacy agent), persistent.
let (sid1, _f1, _i1) = mgr
.register_agent("legacy-agent".to_string(), "HOST-L".to_string(), true, None)
.await;
mgr.mark_agent_disconnected(sid1).await;
// Same agent_id, still no uid -> reattaches by agent_id (original behavior).
let (sid2, _f2, _i2) = mgr
.register_agent("legacy-agent".to_string(), "HOST-L".to_string(), true, None)
.await;
assert_eq!(sid2, sid1, "legacy agent_id reattach must be unchanged");
assert_eq!(mgr.list_sessions().await.len(), 1);
// A DIFFERENT legacy agent_id (no uid) is a distinct machine -> new session.
let (sid3, _f3, _i3) = mgr
.register_agent("legacy-other".to_string(), "HOST-M".to_string(), true, None)
.await;
assert_ne!(sid3, sid1);
assert_eq!(mgr.list_sessions().await.len(), 2);
}
/// A keyed agent is modeled here by the relay passing `machine_uid = None`
/// (the security gate suppresses the claimed uid). Its reattach therefore keys
/// on the authoritative `agent_id` only — a second machine cannot seize its
/// session by claiming the same uid, because no uid index entry exists for it.
#[tokio::test]
async fn keyed_agent_modeled_as_none_reattaches_on_agent_id_only() {
let mgr = SessionManager::new();
// Keyed agent: relay passes None for machine_uid.
let (sid1, _f1, _i1) = mgr
.register_agent("keyed-machine".to_string(), "HOST-K".to_string(), true, None)
.await;
mgr.mark_agent_disconnected(sid1).await;
// Another connection CLAIMS a uid (as an un-keyed agent would). Because the
// keyed session was indexed by agent_id only (uid suppressed upstream),
// this claim cannot reattach the keyed session — it gets its own.
let (sid2, _f2, _i2) = mgr
.register_agent(
"attacker-agent".to_string(),
"EVIL".to_string(),
false,
Some("muid-claimed"),
)
.await;
assert_ne!(
sid2, sid1,
"a claimed machine_uid must not reattach a keyed agent's session"
);
// The keyed agent reconnects (relay again passes None) and reattaches its OWN
// session by agent_id.
let (sid3, _f3, _i3) = mgr
.register_agent("keyed-machine".to_string(), "HOST-K".to_string(), true, None)
.await;
assert_eq!(sid3, sid1, "keyed agent reattaches on its authoritative agent_id");
}
/// Removing a session purges its machine_uid index entry so a later reconnect
/// with the same uid starts fresh instead of pointing at a dead session.
#[tokio::test]
async fn removing_session_purges_machine_uid_index() {
let mgr = SessionManager::new();
let uid = "muid-purge";
let (sid1, _f1, _i1) = mgr
.register_agent("purge-agent".to_string(), "HOST-P".to_string(), true, Some(uid))
.await;
mgr.remove_session(sid1).await;
assert!(mgr.get_session(sid1).await.is_none());
// Reconnect with the same uid -> a brand-new session (no stale reattach).
let (sid2, _f2, _i2) = mgr
.register_agent("purge-agent-2".to_string(), "HOST-P".to_string(), true, Some(uid))
.await;
assert_ne!(sid2, sid1, "removed session's uid must not reattach");
assert_eq!(mgr.list_sessions().await.len(), 1);
}
/// After a server restart, a restored offline row indexed by machine_uid must
/// let an un-keyed agent reattach by uid even if it presents a fresh agent_id.
#[tokio::test]
async fn restored_offline_machine_reattaches_by_machine_uid() {
let mgr = SessionManager::new();
let uid = "muid-restored";
// Simulate startup reconcile: restore an offline persistent machine with uid.
let sid = mgr
.restore_offline_machine("orig-agent", "HOST-R", Some(uid))
.await;
assert_eq!(mgr.get_session(sid).await.map(|s| s.is_online), Some(false));
// Agent reconnects post-restart with a FRESH agent_id but the same uid.
let (sid2, _f2, _i2) = mgr
.register_agent("fresh-agent".to_string(), "HOST-R".to_string(), true, Some(uid))
.await;
assert_eq!(sid2, sid, "restored machine must reattach by machine_uid");
assert_eq!(mgr.list_sessions().await.len(), 1);
}
}
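
The tests above exercise a lookup order of "machine_uid index first, then agent_id, else new session". As a rough illustration only, that order can be sketched with two plain hash maps. Everything in this block (`ReattachIndex`, its fields, `register`, `remove`, the `u64` session ids) is hypothetical and is not the real `SessionManager` API; the real code is async and tracks far more state.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for the session manager's lookup tables.
/// Session ids are plain counters here; the real type is an assumption-free
/// unknown to this sketch.
#[derive(Default)]
struct ReattachIndex {
    next_id: u64,
    by_agent_id: HashMap<String, u64>,
    by_machine_uid: HashMap<String, u64>,
    /// session id -> agent_id currently attached to it
    agent_of: HashMap<u64, String>,
}

impl ReattachIndex {
    /// Returns the session id the connecting agent ends up attached to.
    /// `machine_uid` is `None` for legacy agents and (by assumption, mirroring
    /// the tests) for keyed agents whose claimed uid was dropped upstream.
    fn register(&mut self, agent_id: &str, machine_uid: Option<&str>) -> u64 {
        // 1. Stable identity first: a known uid reattaches even if the
        //    agent_id changed (the config-loss case).
        if let Some(uid) = machine_uid {
            if let Some(&sid) = self.by_machine_uid.get(uid) {
                // Repoint the session to the current agent_id and drop the
                // stale agent_id entry so it no longer resolves.
                if let Some(old) = self.agent_of.insert(sid, agent_id.to_string()) {
                    if old.as_str() != agent_id {
                        self.by_agent_id.remove(&old);
                    }
                }
                self.by_agent_id.insert(agent_id.to_string(), sid);
                return sid;
            }
        }
        // 2. Legacy path: reattach on agent_id.
        if let Some(&sid) = self.by_agent_id.get(agent_id) {
            return sid;
        }
        // 3. Nothing matched: create a new session and index it.
        self.next_id += 1;
        let sid = self.next_id;
        self.by_agent_id.insert(agent_id.to_string(), sid);
        self.agent_of.insert(sid, agent_id.to_string());
        if let Some(uid) = machine_uid {
            self.by_machine_uid.insert(uid.to_string(), sid);
        }
        sid
    }

    /// Removing a session purges both index entries, so a later connect with
    /// the same uid starts fresh instead of pointing at a dead session.
    fn remove(&mut self, sid: u64) {
        if let Some(agent) = self.agent_of.remove(&sid) {
            self.by_agent_id.remove(&agent);
        }
        self.by_machine_uid.retain(|_, v| *v != sid);
    }
}
```

In this sketch a session registered with `None` never gets a `by_machine_uid` entry, which is why a later connection claiming an arbitrary uid cannot land on it; that mirrors the keyed-agent test above but says nothing about how the relay decides to pass `None`.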