feat(server): dedup machines on machine_uid (SPEC-004 Task 2)
Persist the agent-reported machine_uid and dedup connect_machines on it so a single physical machine can't register duplicate rows when its config-file agent_id regenerates (the ghost-session root cause). - migration 008: nullable connect_machines.machine_uid + partial unique index (WHERE machine_uid IS NOT NULL); idempotent, startup-applied. - upsert_machine: two-path dedup (ON CONFLICT machine_uid when present, else the legacy ON CONFLICT agent_id path, unchanged). - session reattach: a machine_uid index consulted before agent_id, with all removal paths purging it. - security: keyed (cak_) agents stay authoritative — their claimed machine_uid is dropped (effective_machine_uid=None); uid is dedup-only for un-keyed / support-code agents. Startup restore skips uid-indexing keyed machines and fails closed if the keyed-set query errors. 74 server tests pass; clippy clean. Implements specs/v2-stable-identity/plan.md Task 2. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
41
server/migrations/008_machine_uid.sql
Normal file
41
server/migrations/008_machine_uid.sql
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
-- Migration: 008_machine_uid.sql
|
||||||
|
-- Purpose: Give connect_machines a deterministic, recomputable hardware identity
|
||||||
|
-- (machine_uid) and dedup registrations on it (SPEC-004 / v2-stable-identity
|
||||||
|
-- Task 2).
|
||||||
|
--
|
||||||
|
-- Today connect_machines is keyed only on `agent_id` — a random UUID the agent
|
||||||
|
-- persists in its config (generate_agent_id()). A lost/missing config produces a
|
||||||
|
-- fresh UUID, and because upsert_machine dedups `ON CONFLICT (agent_id)`, that
|
||||||
|
-- fresh id inserts a NEW row: the duplicate-registration bug (15 rows for 5 real
|
||||||
|
-- hosts in production). The agent (Task 1) now derives a stable `machine_uid` from
|
||||||
|
-- durable hardware/OS identifiers (Windows MachineGuid, hashed; recomputable) and
|
||||||
|
-- reports it on the connect handshake and on AgentStatus. This migration persists
|
||||||
|
-- it and adds the uniqueness needed to dedup the un-keyed / shared-key / config-loss
|
||||||
|
-- fleet on that stable identity.
|
||||||
|
--
|
||||||
|
-- SECURITY (SPEC-004): a client-asserted machine_uid is spoofable and is therefore
|
||||||
|
-- NOT a trust boundary. For per-agent (`cak_`) keyed agents the key's machine
|
||||||
|
-- binding stays authoritative (the server dedups those on their authenticated
|
||||||
|
-- agent_id, never on a claimed uid). machine_uid is a *correctness* aid for the
|
||||||
|
-- un-keyed path only. The UNIQUE index below enforces "one row per machine_uid"
|
||||||
|
-- at the schema level so a concurrent/racing un-keyed insert cannot create a
|
||||||
|
-- duplicate behind the application-level ON CONFLICT.
|
||||||
|
--
|
||||||
|
-- Idempotent: ADD COLUMN IF NOT EXISTS + CREATE UNIQUE INDEX IF NOT EXISTS. The
|
||||||
|
-- column is NULLABLE: legacy rows and agents that do not report a uid (older agents,
|
||||||
|
-- some support-code clients) carry NULL, and the partial index excludes NULLs so any
|
||||||
|
-- number of un-keyed rows may coexist without a uid. Applied on server startup by
|
||||||
|
-- sqlx::migrate!(); never pre-applied via psql. Ordered after 007.
|
||||||
|
-- See .claude/standards/gururmm/sqlx-migrations.md.
|
||||||
|
|
||||||
|
-- 1. machine_uid: deterministic hardware identity reported by the agent. NULLABLE
|
||||||
|
-- so legacy rows and non-reporting agents are unaffected.
|
||||||
|
ALTER TABLE connect_machines ADD COLUMN IF NOT EXISTS machine_uid TEXT;
|
||||||
|
|
||||||
|
-- 2. Enforce one row per machine_uid, but ONLY for rows that actually have one.
|
||||||
|
-- A partial UNIQUE index (WHERE machine_uid IS NOT NULL) lets unlimited legacy
|
||||||
|
-- NULL rows coexist while making a non-null machine_uid a true dedup key — this
|
||||||
|
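--    is the index upsert_machine's `ON CONFLICT (machine_uid)` arbiter must bind to.
--    NOTE: because the index is PARTIAL, Postgres infers it as an arbiter only when
--    the conflict target repeats the predicate, i.e.
--    `ON CONFLICT (machine_uid) WHERE machine_uid IS NOT NULL`; a bare
--    `ON CONFLICT (machine_uid)` does not match this index.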
|
||||||
|
CREATE UNIQUE INDEX IF NOT EXISTS idx_connect_machines_machine_uid
|
||||||
|
ON connect_machines (machine_uid)
|
||||||
|
WHERE machine_uid IS NOT NULL;
|
||||||
@@ -12,6 +12,7 @@
|
|||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
|
use std::collections::HashSet;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
/// Per-agent key record from the database.
|
/// Per-agent key record from the database.
|
||||||
@@ -142,3 +143,27 @@ pub async fn touch_last_used(pool: &PgPool, id: Uuid) -> Result<(), sqlx::Error>
|
|||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the set of `connect_machines.id` values that are bound to at least one
|
||||||
|
/// active (non-revoked) per-agent `cak_` key.
|
||||||
|
///
|
||||||
|
/// Used by the startup restore loop (`main.rs`) to enforce the same keyed/un-keyed
|
||||||
|
/// distinction the relay applies at connect time (SPEC-004 Task 2). For a KEYED
|
||||||
|
/// machine the key→machine binding is the authoritative identity, so its restored
|
||||||
|
/// session must NOT be indexed by a client-asserted `machine_uid` — otherwise an
|
||||||
|
/// un-keyed agent spoofing that uid could reattach the keyed machine's offline
|
||||||
|
/// session after a restart. Membership here lets the caller pass `machine_uid =
|
||||||
|
/// None` for keyed machines while still indexing un-keyed ones for legitimate
|
||||||
|
/// uid-based reattach.
|
||||||
|
pub async fn keyed_machine_ids(pool: &PgPool) -> Result<HashSet<Uuid>, sqlx::Error> {
|
||||||
|
let ids = sqlx::query_scalar::<_, Uuid>(
|
||||||
|
r#"
|
||||||
|
SELECT DISTINCT machine_id
|
||||||
|
FROM connect_agent_keys
|
||||||
|
WHERE revoked_at IS NULL
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.fetch_all(pool)
|
||||||
|
.await?;
|
||||||
|
Ok(ids.into_iter().collect())
|
||||||
|
}
|
||||||
|
|||||||
@@ -50,6 +50,14 @@ pub struct Machine {
|
|||||||
/// impl) so it can never error at decode time.
|
/// impl) so it can never error at decode time.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub tags: Vec<String>,
|
pub tags: Vec<String>,
|
||||||
|
/// Deterministic, recomputable hardware identity reported by the agent
|
||||||
|
/// (`AgentStatus.machine_uid` / connect query param). Column added in migration
|
||||||
|
/// 008. NULLABLE: legacy rows and agents that do not report a uid carry `None`.
|
||||||
|
/// For un-keyed agents this is the dedup key (`upsert_machine` keys
|
||||||
|
/// `ON CONFLICT (machine_uid)` when present); for `cak_`-keyed agents the key's
|
||||||
|
/// machine binding stays authoritative and the claimed uid is NOT used to dedup
|
||||||
|
/// (see `upsert_machine`).
|
||||||
|
pub machine_uid: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'r> FromRow<'r, PgRow> for Machine {
|
impl<'r> FromRow<'r, PgRow> for Machine {
|
||||||
@@ -65,6 +73,8 @@ impl<'r> FromRow<'r, PgRow> for Machine {
|
|||||||
tenant_id: row.try_get("tenant_id")?,
|
tenant_id: row.try_get("tenant_id")?,
|
||||||
organization: row.try_get("organization")?,
|
organization: row.try_get("organization")?,
|
||||||
site: row.try_get("site")?,
|
site: row.try_get("site")?,
|
||||||
|
// Schema-nullable (migration 008); decode directly as Option.
|
||||||
|
machine_uid: row.try_get("machine_uid")?,
|
||||||
// Nullable-with-default columns mapped to non-`Option` Rust types: read as
|
// Nullable-with-default columns mapped to non-`Option` Rust types: read as
|
||||||
// `Option<T>` and fall back to the type default so a NULL cell never errors.
|
// `Option<T>` and fall back to the type default so a NULL cell never errors.
|
||||||
is_elevated: row
|
is_elevated: row
|
||||||
@@ -97,13 +107,70 @@ impl<'r> FromRow<'r, PgRow> for Machine {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get or create a machine by agent_id (upsert)
|
/// Get or create a machine record (upsert), deduplicating on the most stable
|
||||||
|
/// identity available (SPEC-004 / v2-stable-identity Task 2).
|
||||||
|
///
|
||||||
|
/// Two dedup paths, selected by whether the caller passes a `machine_uid`:
|
||||||
|
///
|
||||||
|
/// - **`machine_uid = Some(uid)` (the un-keyed dedup path):** key on the stable
|
||||||
|
/// hardware identity — `ON CONFLICT (machine_uid)`. The SAME machine reconnecting
|
||||||
|
/// with a DIFFERENT `agent_id` (e.g. after a config loss minted a fresh random id)
|
||||||
|
/// updates its EXISTING row instead of inserting a duplicate. `agent_id` and
|
||||||
|
/// `hostname` are refreshed to the latest reported values. This is what collapses
|
||||||
|
/// the duplicate-registration fleet down to one row per real machine.
|
||||||
|
///
|
||||||
|
/// - **`machine_uid = None` (legacy / authoritative path):** preserve the original
|
||||||
|
/// behavior exactly — `ON CONFLICT (agent_id)`. Used for agents that do not report
|
||||||
|
/// a uid AND, critically, for `cak_`-keyed agents: the caller (`relay`) passes
|
||||||
|
/// `None` for keyed agents so their AUTHORITATIVE key-bound `agent_id` is the dedup
|
||||||
|
/// key and a client-claimed `machine_uid` can never repoint a keyed machine's row.
|
||||||
|
///
|
||||||
|
/// SECURITY: a client-asserted `machine_uid` is spoofable, so it is a *correctness*
|
||||||
|
/// aid, not a trust boundary. Only the un-keyed path supplies it; the keyed path's
|
||||||
|
/// authority lives in the key→machine binding upstream (see
|
||||||
|
/// `relay::agent_ws_handler`), never here.
|
||||||
pub async fn upsert_machine(
|
pub async fn upsert_machine(
|
||||||
pool: &PgPool,
|
pool: &PgPool,
|
||||||
agent_id: &str,
|
agent_id: &str,
|
||||||
hostname: &str,
|
hostname: &str,
|
||||||
is_persistent: bool,
|
is_persistent: bool,
|
||||||
|
machine_uid: Option<&str>,
|
||||||
) -> Result<Machine, sqlx::Error> {
|
) -> Result<Machine, sqlx::Error> {
|
||||||
|
match machine_uid {
|
||||||
|
// Un-keyed dedup path: stable hardware identity is the conflict arbiter.
|
||||||
|
// A new agent_id for the same physical machine updates the existing row.
|
||||||
|
//
|
||||||
|
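// Edge case: `ON CONFLICT (machine_uid)` arbitrates ONLY conflicts on the
// machine_uid index. If this INSERT's new random `agent_id` happens to collide
// with a DIFFERENT legacy row's value under the `agent_id UNIQUE` constraint,
// Postgres raises the unique violation on `agent_id` instead of taking the
// `ON CONFLICT (machine_uid)` path, so the upsert errors rather than merging
// on uid. This is non-fatal: the caller logs the error and the live in-memory
// session is unaffected, and the agent simply retries with a freshly minted
// UUID. The window closes on its own as legacy rows age out (SPEC-004 Task 3).
//
// NOTE(review): the same violation looks reachable WITHOUT a random collision.
// A legacy row written by the `None` path (machine_uid NULL) for this very
// `agent_id` has no uid to conflict on, so the first connect that supplies a
// uid with that unchanged `agent_id` would hit the `agent_id` constraint on
// every attempt until the row is backfilled or removed. Confirm that Task 3
// covers this, or adopt the legacy row (set its machine_uid) before inserting.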
|
||||||
|
Some(uid) => {
|
||||||
|
sqlx::query_as::<_, Machine>(
|
||||||
|
r#"
|
||||||
|
INSERT INTO connect_machines (agent_id, hostname, is_persistent, status, last_seen, machine_uid)
VALUES ($1, $2, $3, 'online', NOW(), $4)
-- The only unique index on machine_uid is PARTIAL (WHERE machine_uid IS NOT NULL,
-- migration 008). Postgres infers a partial unique index as the arbiter only when
-- the conflict target repeats a predicate implying the index predicate; a bare
-- ON CONFLICT (machine_uid) is rejected with SQLSTATE 42P10 (no unique or
-- exclusion constraint matching the ON CONFLICT specification).
ON CONFLICT (machine_uid) WHERE machine_uid IS NOT NULL DO UPDATE SET
    -- Same physical machine: repoint the row at the latest reported identity.
    agent_id = EXCLUDED.agent_id,
    hostname = EXCLUDED.hostname,
    status = 'online',
    last_seen = NOW()
RETURNING *
|
||||||
|
"#,
|
||||||
|
)
|
||||||
|
.bind(agent_id)
|
||||||
|
.bind(hostname)
|
||||||
|
.bind(is_persistent)
|
||||||
|
.bind(uid)
|
||||||
|
.fetch_one(pool)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
// Legacy / authoritative path: dedup on agent_id exactly as before. Leaves
|
||||||
|
// machine_uid NULL (the partial unique index excludes NULLs, so any number
|
||||||
|
// of these may coexist).
|
||||||
|
None => {
|
||||||
sqlx::query_as::<_, Machine>(
|
sqlx::query_as::<_, Machine>(
|
||||||
r#"
|
r#"
|
||||||
INSERT INTO connect_machines (agent_id, hostname, is_persistent, status, last_seen)
|
INSERT INTO connect_machines (agent_id, hostname, is_persistent, status, last_seen)
|
||||||
@@ -121,6 +188,8 @@ pub async fn upsert_machine(
|
|||||||
.fetch_one(pool)
|
.fetch_one(pool)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Update machine status and info
|
/// Update machine status and info
|
||||||
#[allow(dead_code)] // TODO(native-remote-control): consumed by the integration API; see docs/specs/native-remote-control/
|
#[allow(dead_code)] // TODO(native-remote-control): consumed by the integration API; see docs/specs/native-remote-control/
|
||||||
@@ -239,3 +308,134 @@ pub async fn update_machine_metadata(
|
|||||||
.await?;
|
.await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
use sqlx::postgres::PgPoolOptions;
|
||||||
|
|
||||||
|
/// Connect to a throwaway test Postgres and apply migrations, or return `None`
|
||||||
|
/// when `TEST_DATABASE_URL` is unset so the suite is a no-op on workstations
|
||||||
|
/// without a database. CI sets `TEST_DATABASE_URL` against an ephemeral Postgres,
|
||||||
|
/// where these run for real. (The server crate is Linux-targeted and validated
|
||||||
|
/// in Gitea CI; these DB tests run there.)
|
||||||
|
async fn test_pool() -> Option<PgPool> {
|
||||||
|
let url = std::env::var("TEST_DATABASE_URL").ok()?;
|
||||||
|
let pool = PgPoolOptions::new()
|
||||||
|
.max_connections(2)
|
||||||
|
.connect(&url)
|
||||||
|
.await
|
||||||
|
.expect("connect to TEST_DATABASE_URL");
|
||||||
|
sqlx::migrate!("./migrations")
|
||||||
|
.run(&pool)
|
||||||
|
.await
|
||||||
|
.expect("apply migrations to the test database");
|
||||||
|
Some(pool)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove any rows this test created so reruns are clean and tests don't collide.
|
||||||
|
async fn cleanup(pool: &PgPool, agent_ids: &[&str], machine_uids: &[&str]) {
|
||||||
|
for id in agent_ids {
|
||||||
|
let _ = sqlx::query("DELETE FROM connect_machines WHERE agent_id = $1")
|
||||||
|
.bind(id)
|
||||||
|
.execute(pool)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
for uid in machine_uids {
|
||||||
|
let _ = sqlx::query("DELETE FROM connect_machines WHERE machine_uid = $1")
|
||||||
|
.bind(uid)
|
||||||
|
.execute(pool)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// (a) Same `machine_uid` with two DIFFERENT `agent_id`s collapses to ONE row.
|
||||||
|
/// The second upsert updates the existing row (and repoints its agent_id) rather
|
||||||
|
/// than inserting a duplicate — the core dedup guarantee for the un-keyed fleet.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn same_machine_uid_two_agent_ids_one_row() {
|
||||||
|
let Some(pool) = test_pool().await else {
|
||||||
|
return; // no TEST_DATABASE_URL: skip (runs in CI)
|
||||||
|
};
|
||||||
|
let uid = "test-muid-dedup-001";
|
||||||
|
cleanup(&pool, &["agent-A", "agent-B"], &[uid]).await;
|
||||||
|
|
||||||
|
let m1 = upsert_machine(&pool, "agent-A", "HOST-A", true, Some(uid))
|
||||||
|
.await
|
||||||
|
.expect("first upsert");
|
||||||
|
let m2 = upsert_machine(&pool, "agent-B", "HOST-A2", true, Some(uid))
|
||||||
|
.await
|
||||||
|
.expect("second upsert with same uid, different agent_id");
|
||||||
|
|
||||||
|
// Same physical row, agent_id and hostname refreshed to the latest.
|
||||||
|
assert_eq!(m1.id, m2.id, "same machine_uid must update the same row");
|
||||||
|
assert_eq!(m2.agent_id, "agent-B");
|
||||||
|
assert_eq!(m2.hostname, "HOST-A2");
|
||||||
|
assert_eq!(m2.machine_uid.as_deref(), Some(uid));
|
||||||
|
|
||||||
|
// Exactly one row carries this uid.
|
||||||
|
let count: i64 =
|
||||||
|
sqlx::query_scalar("SELECT COUNT(*) FROM connect_machines WHERE machine_uid = $1")
|
||||||
|
.bind(uid)
|
||||||
|
.fetch_one(&pool)
|
||||||
|
.await
|
||||||
|
.expect("count rows for uid");
|
||||||
|
assert_eq!(count, 1, "must be exactly one row for the machine_uid");
|
||||||
|
|
||||||
|
cleanup(&pool, &["agent-A", "agent-B"], &[uid]).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// (b) Legacy NULL-`machine_uid` path is unchanged: dedup keys on `agent_id`,
|
||||||
|
/// the row's `machine_uid` stays NULL, and re-upserting the same agent_id with
|
||||||
|
/// no uid updates the same row (no crash, no duplicate).
|
||||||
|
#[tokio::test]
|
||||||
|
async fn legacy_null_machine_uid_dedups_on_agent_id() {
|
||||||
|
let Some(pool) = test_pool().await else {
|
||||||
|
return; // no TEST_DATABASE_URL: skip (runs in CI)
|
||||||
|
};
|
||||||
|
let agent = "test-legacy-agent-001";
|
||||||
|
cleanup(&pool, &[agent], &[]).await;
|
||||||
|
|
||||||
|
let m1 = upsert_machine(&pool, agent, "LEGACY-HOST", true, None)
|
||||||
|
.await
|
||||||
|
.expect("legacy upsert (no uid)");
|
||||||
|
assert_eq!(m1.machine_uid, None, "legacy row must have NULL machine_uid");
|
||||||
|
|
||||||
|
let m2 = upsert_machine(&pool, agent, "LEGACY-HOST-RENAMED", true, None)
|
||||||
|
.await
|
||||||
|
.expect("legacy re-upsert (no uid)");
|
||||||
|
assert_eq!(m1.id, m2.id, "legacy agent_id must dedup to the same row");
|
||||||
|
assert_eq!(m2.hostname, "LEGACY-HOST-RENAMED");
|
||||||
|
assert_eq!(m2.machine_uid, None);
|
||||||
|
|
||||||
|
let count: i64 =
|
||||||
|
sqlx::query_scalar("SELECT COUNT(*) FROM connect_machines WHERE agent_id = $1")
|
||||||
|
.bind(agent)
|
||||||
|
.fetch_one(&pool)
|
||||||
|
.await
|
||||||
|
.expect("count legacy rows");
|
||||||
|
assert_eq!(count, 1, "legacy path must not duplicate the row");
|
||||||
|
|
||||||
|
cleanup(&pool, &[agent], &[]).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Multiple legacy rows with NULL machine_uid coexist — the partial unique index
|
||||||
|
/// excludes NULLs, so distinct un-keyed agents are independent rows.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn multiple_null_machine_uid_rows_coexist() {
|
||||||
|
let Some(pool) = test_pool().await else {
|
||||||
|
return; // no TEST_DATABASE_URL: skip (runs in CI)
|
||||||
|
};
|
||||||
|
cleanup(&pool, &["null-1", "null-2"], &[]).await;
|
||||||
|
|
||||||
|
let a = upsert_machine(&pool, "null-1", "H1", true, None)
|
||||||
|
.await
|
||||||
|
.expect("first null-uid row");
|
||||||
|
let b = upsert_machine(&pool, "null-2", "H2", true, None)
|
||||||
|
.await
|
||||||
|
.expect("second null-uid row");
|
||||||
|
assert_ne!(a.id, b.id, "distinct legacy agents must be distinct rows");
|
||||||
|
|
||||||
|
cleanup(&pool, &["null-1", "null-2"], &[]).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -245,9 +245,39 @@ async fn main() -> Result<()> {
|
|||||||
"Reconciling {} managed session(s) from database",
|
"Reconciling {} managed session(s) from database",
|
||||||
machines.len()
|
machines.len()
|
||||||
);
|
);
|
||||||
|
// Machines bound to an active `cak_` key. For these the key→machine
|
||||||
|
// binding is authoritative (SPEC-004 Task 2), so we must NOT index a
|
||||||
|
// restored keyed session by its stored `machine_uid`: doing so would
|
||||||
|
// let an un-keyed agent spoofing that uid reattach the keyed machine's
|
||||||
|
// offline session after a restart. The connect path never writes a uid
|
||||||
|
// for keyed agents, so a non-NULL uid on a keyed row can only come from
|
||||||
|
// a legacy pre-keying row — but close the gap regardless. On query
|
||||||
|
// failure, fail closed (treat all machines as keyed: index none by uid)
|
||||||
|
// rather than risk indexing a keyed machine.
|
||||||
|
let keyed_ids = match db::agent_keys::keyed_machine_ids(db.pool()).await {
|
||||||
|
Ok(ids) => ids,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(
|
||||||
|
"Could not load keyed-machine set; suppressing uid reattach index for all restored machines: {}",
|
||||||
|
e
|
||||||
|
);
|
||||||
|
machines.iter().map(|m| m.id).collect()
|
||||||
|
}
|
||||||
|
};
|
||||||
for machine in machines {
|
for machine in machines {
|
||||||
|
// Keyed machines get None (uid reattach disabled); un-keyed
|
||||||
|
// machines keep their stored uid for legitimate reattach.
|
||||||
|
let restore_uid = if keyed_ids.contains(&machine.id) {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
machine.machine_uid.as_deref()
|
||||||
|
};
|
||||||
sessions
|
sessions
|
||||||
.restore_offline_machine(&machine.agent_id, &machine.hostname)
|
.restore_offline_machine(
|
||||||
|
&machine.agent_id,
|
||||||
|
&machine.hostname,
|
||||||
|
restore_uid,
|
||||||
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,6 +74,13 @@ pub struct AgentParams {
|
|||||||
/// API key for persistent (managed) agents
|
/// API key for persistent (managed) agents
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
api_key: Option<String>,
|
api_key: Option<String>,
|
||||||
|
/// Deterministic, recomputable hardware identity reported by the agent
|
||||||
|
/// (v2-stable-identity Task 1; `transport/websocket.rs`). Used to dedup
|
||||||
|
/// registrations for UN-KEYED agents only — see the security note where it is
|
||||||
|
/// consumed below. CLIENT-SUPPLIED and therefore spoofable: it is never used to
|
||||||
|
/// override a `cak_` key's authoritative machine binding.
|
||||||
|
#[serde(default)]
|
||||||
|
machine_uid: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
@@ -110,6 +117,12 @@ pub async fn agent_ws_handler(
|
|||||||
.unwrap_or_else(|| agent_id.clone());
|
.unwrap_or_else(|| agent_id.clone());
|
||||||
let support_code = params.support_code.clone();
|
let support_code = params.support_code.clone();
|
||||||
let api_key = params.api_key.clone();
|
let api_key = params.api_key.clone();
|
||||||
|
// CLIENT-SUPPLIED machine_uid (v2-stable-identity Task 1). Spoofable; see the
|
||||||
|
// security gate below. It is used for dedup ONLY on the un-keyed path; for
|
||||||
|
// `cak_`-keyed agents it is suppressed so the key's machine binding stays
|
||||||
|
// authoritative. `is_keyed_agent` records which path authenticated us.
|
||||||
|
let claimed_machine_uid = params.machine_uid.clone();
|
||||||
|
let mut is_keyed_agent = false;
|
||||||
// Real client IP via the trusted-proxy-aware extractor (shared with the rate
|
// Real client IP via the trusted-proxy-aware extractor (shared with the rate
|
||||||
// limiter and audit log). Behind NPM on loopback, `addr.ip()` is 127.0.0.1;
|
// limiter and audit log). Behind NPM on loopback, `addr.ip()` is 127.0.0.1;
|
||||||
// this resolves the actual remote agent IP from forwarding headers when the
|
// this resolves the actual remote agent IP from forwarding headers when the
|
||||||
@@ -268,6 +281,11 @@ pub async fn agent_ws_handler(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
agent_id = trusted_agent_id;
|
agent_id = trusted_agent_id;
|
||||||
|
// KEYED agent: the key→machine binding is AUTHORITATIVE. Suppress
|
||||||
|
// the client-claimed machine_uid for dedup (below) so a valid key
|
||||||
|
// for machine X can never repoint machine Y's row by claiming Y's
|
||||||
|
// uid. Dedup for this agent stays on its authenticated agent_id.
|
||||||
|
is_keyed_agent = true;
|
||||||
info!(
|
info!(
|
||||||
"Agent {} from {} authenticated via per-agent key",
|
"Agent {} from {} authenticated via per-agent key",
|
||||||
agent_id, client_ip
|
agent_id, client_ip
|
||||||
@@ -324,6 +342,20 @@ pub async fn agent_ws_handler(
|
|||||||
let support_codes = state.support_codes.clone();
|
let support_codes = state.support_codes.clone();
|
||||||
let db = state.db.clone();
|
let db = state.db.clone();
|
||||||
|
|
||||||
|
// SECURITY GATE for machine_uid dedup (v2-stable-identity Task 2):
|
||||||
|
// - KEYED (`cak_`) agents: dedup stays on the authenticated agent_id; the
|
||||||
|
// claimed uid is DROPPED so it cannot override the key's machine binding.
|
||||||
|
// - UN-KEYED agents (support-code, deprecated shared key, no auth-identity):
|
||||||
|
// the claimed uid is a dedup-only correctness aid and is passed through.
|
||||||
|
// A client-asserted machine_uid is spoofable, so it must never be the dedup key
|
||||||
|
// for a keyed agent. This is the only place the distinction is enforced.
|
||||||
|
let effective_machine_uid = if is_keyed_agent {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
// Treat an empty/whitespace-only claim as absent.
|
||||||
|
claimed_machine_uid.filter(|u| !u.trim().is_empty())
|
||||||
|
};
|
||||||
|
|
||||||
// Bounded relay: cap inbound frame/message size before the socket is upgraded
|
// Bounded relay: cap inbound frame/message size before the socket is upgraded
|
||||||
// so oversized agent frames are rejected by the WS layer, never `to_vec()`'d
|
// so oversized agent frames are rejected by the WS layer, never `to_vec()`'d
|
||||||
// and broadcast. (WS-OOM HIGH.)
|
// and broadcast. (WS-OOM HIGH.)
|
||||||
@@ -340,6 +372,7 @@ pub async fn agent_ws_handler(
|
|||||||
agent_id,
|
agent_id,
|
||||||
agent_name,
|
agent_name,
|
||||||
support_code,
|
support_code,
|
||||||
|
effective_machine_uid,
|
||||||
Some(client_ip),
|
Some(client_ip),
|
||||||
)
|
)
|
||||||
}))
|
}))
|
||||||
@@ -548,6 +581,10 @@ async fn handle_agent_connection(
|
|||||||
agent_id: String,
|
agent_id: String,
|
||||||
agent_name: String,
|
agent_name: String,
|
||||||
support_code: Option<String>,
|
support_code: Option<String>,
|
||||||
|
// Dedup identity for the UN-KEYED path only (already security-gated to `None`
|
||||||
|
// for `cak_`-keyed agents by `agent_ws_handler`). Drives both the in-memory
|
||||||
|
// session reattach key and the DB `ON CONFLICT (machine_uid)` upsert.
|
||||||
|
machine_uid: Option<String>,
|
||||||
client_ip: Option<std::net::IpAddr>,
|
client_ip: Option<std::net::IpAddr>,
|
||||||
) {
|
) {
|
||||||
info!(
|
info!(
|
||||||
@@ -581,14 +618,27 @@ async fn handle_agent_connection(
|
|||||||
// Persistent agents (no support code) keep their session when disconnected
|
// Persistent agents (no support code) keep their session when disconnected
|
||||||
let is_persistent = support_code.is_none();
|
let is_persistent = support_code.is_none();
|
||||||
let (session_id, frame_tx, mut input_rx) = sessions
|
let (session_id, frame_tx, mut input_rx) = sessions
|
||||||
.register_agent(agent_id.clone(), agent_name.clone(), is_persistent)
|
.register_agent(
|
||||||
|
agent_id.clone(),
|
||||||
|
agent_name.clone(),
|
||||||
|
is_persistent,
|
||||||
|
machine_uid.as_deref(),
|
||||||
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
info!("Session created: {} (agent in idle mode)", session_id);
|
info!("Session created: {} (agent in idle mode)", session_id);
|
||||||
|
|
||||||
// Database: upsert machine and create session record
|
// Database: upsert machine and create session record
|
||||||
let _machine_id = if let Some(ref db) = db {
|
let _machine_id = if let Some(ref db) = db {
|
||||||
match db::machines::upsert_machine(db.pool(), &agent_id, &agent_name, is_persistent).await {
|
match db::machines::upsert_machine(
|
||||||
|
db.pool(),
|
||||||
|
&agent_id,
|
||||||
|
&agent_name,
|
||||||
|
is_persistent,
|
||||||
|
machine_uid.as_deref(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(machine) => {
|
Ok(machine) => {
|
||||||
// Create session record
|
// Create session record
|
||||||
let _ = db::sessions::create_session(
|
let _ = db::sessions::create_session(
|
||||||
|
|||||||
@@ -174,6 +174,17 @@ struct SessionData {
|
|||||||
pub struct SessionManager {
|
pub struct SessionManager {
|
||||||
sessions: Arc<RwLock<HashMap<SessionId, SessionData>>>,
|
sessions: Arc<RwLock<HashMap<SessionId, SessionData>>>,
|
||||||
agents: Arc<RwLock<HashMap<AgentId, SessionId>>>,
|
agents: Arc<RwLock<HashMap<AgentId, SessionId>>>,
|
||||||
|
/// Reattach index keyed by the stable hardware identity (`machine_uid`), for
|
||||||
|
/// the UN-KEYED dedup path (v2-stable-identity Task 2). Populated only when an
|
||||||
|
/// agent reports a (security-gated) machine_uid. Consulted BEFORE the
|
||||||
|
/// `agent_id` index on reconnect, so the SAME physical machine reconnecting
|
||||||
|
/// with a FRESH random agent_id (config loss) reattaches to its existing
|
||||||
|
/// session instead of stranding it and creating a duplicate.
|
||||||
|
///
|
||||||
|
/// Keyed (`cak_`) agents are passed `machine_uid = None` (the relay suppresses
|
||||||
|
/// the claimed uid for them), so they never appear here — their reattach stays
|
||||||
|
/// on the authoritative `agent_id` index, preserving the key's machine binding.
|
||||||
|
machine_uids: Arc<RwLock<HashMap<String, SessionId>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SessionManager {
|
impl SessionManager {
|
||||||
@@ -181,34 +192,80 @@ impl SessionManager {
|
|||||||
Self {
|
Self {
|
||||||
sessions: Arc::new(RwLock::new(HashMap::new())),
|
sessions: Arc::new(RwLock::new(HashMap::new())),
|
||||||
agents: Arc::new(RwLock::new(HashMap::new())),
|
agents: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
machine_uids: Arc::new(RwLock::new(HashMap::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a new agent and create a session
|
/// Register a new agent and create a session.
|
||||||
/// If agent was previously connected (offline session exists), reuse that session
|
///
|
||||||
|
/// Reattach (reconnect to an existing offline session) prefers the stable
|
||||||
|
/// `machine_uid` identity when one is supplied, falling back to `agent_id`
|
||||||
|
/// (v2-stable-identity Task 2):
|
||||||
|
///
|
||||||
|
/// - `machine_uid = Some(uid)` (the UN-KEYED dedup path): look up the prior
|
||||||
|
/// session by `uid` FIRST. The same physical machine reconnecting with a
|
||||||
|
/// FRESH random `agent_id` (config loss) reattaches to its existing session
|
||||||
|
/// rather than stranding it and minting a duplicate. On reattach the session's
|
||||||
|
/// `agent_id` is repointed to the current one and the `agents` index is fixed
|
||||||
|
/// up to match.
|
||||||
|
/// - `machine_uid = None` (legacy / keyed path): behave exactly as before —
|
||||||
|
/// reattach by `agent_id` only. The relay passes `None` for `cak_`-keyed
|
||||||
|
/// agents so their authoritative key-bound identity drives reattach and a
|
||||||
|
/// spoofable claimed uid can never seize another machine's session.
|
||||||
pub async fn register_agent(
|
pub async fn register_agent(
|
||||||
&self,
|
&self,
|
||||||
agent_id: AgentId,
|
agent_id: AgentId,
|
||||||
agent_name: String,
|
agent_name: String,
|
||||||
is_persistent: bool,
|
is_persistent: bool,
|
||||||
|
machine_uid: Option<&str>,
|
||||||
) -> (SessionId, FrameSender, InputReceiver) {
|
) -> (SessionId, FrameSender, InputReceiver) {
|
||||||
// Check if this agent already has an offline session (reconnecting)
|
// Resolve a prior offline session to reattach to. Prefer the stable
|
||||||
{
|
// machine_uid identity; fall back to agent_id. `reattach_via_uid` records
|
||||||
let agents = self.agents.read().await;
|
// whether we matched on uid so we can fix up the agent_id index below.
|
||||||
if let Some(&existing_session_id) = agents.get(&agent_id) {
|
let (existing_session_id, reattach_via_uid) = {
|
||||||
|
let by_uid = match machine_uid {
|
||||||
|
Some(uid) => self.machine_uids.read().await.get(uid).copied(),
|
||||||
|
None => None,
|
||||||
|
};
|
||||||
|
match by_uid {
|
||||||
|
Some(sid) => (Some(sid), true),
|
||||||
|
None => (self.agents.read().await.get(&agent_id).copied(), false),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(existing_session_id) = existing_session_id {
|
||||||
let mut sessions = self.sessions.write().await;
|
let mut sessions = self.sessions.write().await;
|
||||||
if let Some(session_data) = sessions.get_mut(&existing_session_id) {
|
if let Some(session_data) = sessions.get_mut(&existing_session_id) {
|
||||||
if !session_data.info.is_online {
|
if !session_data.info.is_online {
|
||||||
// Reuse existing session - mark as online and create new channels
|
// Reuse existing session - mark as online and create new channels
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
"Agent {} reconnecting to existing session {}",
|
"Agent {} reconnecting to existing session {} (matched on {})",
|
||||||
agent_id,
|
agent_id,
|
||||||
existing_session_id
|
existing_session_id,
|
||||||
|
if reattach_via_uid {
|
||||||
|
"machine_uid"
|
||||||
|
} else {
|
||||||
|
"agent_id"
|
||||||
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
let (frame_tx, _) = broadcast::channel(16);
|
let (frame_tx, _) = broadcast::channel(16);
|
||||||
let (input_tx, input_rx) = tokio::sync::mpsc::channel(64);
|
let (input_tx, input_rx) = tokio::sync::mpsc::channel(64);
|
||||||
|
|
||||||
|
// If we matched on machine_uid, the agent_id may have changed
|
||||||
|
// (config loss minted a fresh one). Repoint the session and fix
|
||||||
|
// up the agent_id index: drop the stale agent_id mapping (only
|
||||||
|
// if it still pointed at THIS session) and add the current one.
|
||||||
|
let previous_agent_id = session_data.info.agent_id.clone();
|
||||||
|
if reattach_via_uid && previous_agent_id != agent_id {
|
||||||
|
let mut agents = self.agents.write().await;
|
||||||
|
if agents.get(&previous_agent_id) == Some(&existing_session_id) {
|
||||||
|
agents.remove(&previous_agent_id);
|
||||||
|
}
|
||||||
|
agents.insert(agent_id.clone(), existing_session_id);
|
||||||
|
session_data.info.agent_id = agent_id.clone();
|
||||||
|
}
|
||||||
|
|
||||||
session_data.info.is_online = true;
|
session_data.info.is_online = true;
|
||||||
session_data.info.last_heartbeat = chrono::Utc::now();
|
session_data.info.last_heartbeat = chrono::Utc::now();
|
||||||
session_data.info.agent_name = agent_name; // Update name in case it changed
|
session_data.info.agent_name = agent_name; // Update name in case it changed
|
||||||
@@ -216,8 +273,16 @@ impl SessionManager {
|
|||||||
session_data.input_tx = input_tx;
|
session_data.input_tx = input_tx;
|
||||||
session_data.last_heartbeat_instant = Instant::now();
|
session_data.last_heartbeat_instant = Instant::now();
|
||||||
|
|
||||||
return (existing_session_id, frame_tx, input_rx);
|
// Ensure the machine_uid index points at this session (it may be
|
||||||
|
// the first time we learn this uid for an agent_id-restored row).
|
||||||
|
if let Some(uid) = machine_uid {
|
||||||
|
self.machine_uids
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.insert(uid.to_string(), existing_session_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return (existing_session_id, frame_tx, input_rx);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -274,6 +339,15 @@ impl SessionManager {
|
|||||||
let mut agents = self.agents.write().await;
|
let mut agents = self.agents.write().await;
|
||||||
agents.insert(agent_id, session_id);
|
agents.insert(agent_id, session_id);
|
||||||
|
|
||||||
|
// Index the new session by its stable machine_uid (un-keyed path) so a later
|
||||||
|
// reconnect with a different agent_id reattaches here instead of duplicating.
|
||||||
|
if let Some(uid) = machine_uid {
|
||||||
|
self.machine_uids
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.insert(uid.to_string(), session_id);
|
||||||
|
}
|
||||||
|
|
||||||
(session_id, frame_tx, input_rx)
|
(session_id, frame_tx, input_rx)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -561,6 +635,11 @@ impl SessionManager {
|
|||||||
drop(sessions); // Release sessions lock before acquiring agents lock
|
drop(sessions); // Release sessions lock before acquiring agents lock
|
||||||
let mut agents = self.agents.write().await;
|
let mut agents = self.agents.write().await;
|
||||||
agents.remove(&agent_id);
|
agents.remove(&agent_id);
|
||||||
|
drop(agents);
|
||||||
|
// Purge any machine_uid index entry for this removed support session
|
||||||
|
// so a stale uid can never reattach it.
|
||||||
|
let mut machine_uids = self.machine_uids.write().await;
|
||||||
|
machine_uids.retain(|_, &mut sid| sid != session_id);
|
||||||
tracing::info!("Support session {} removed", session_id);
|
tracing::info!("Support session {} removed", session_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -573,6 +652,12 @@ impl SessionManager {
|
|||||||
drop(sessions);
|
drop(sessions);
|
||||||
let mut agents = self.agents.write().await;
|
let mut agents = self.agents.write().await;
|
||||||
agents.remove(&session_data.info.agent_id);
|
agents.remove(&session_data.info.agent_id);
|
||||||
|
drop(agents);
|
||||||
|
// Purge any machine_uid index entry pointing at this session so a stale
|
||||||
|
// uid can never reattach a removed session. The map holds one entry per
|
||||||
|
// real machine, so a value scan is cheap.
|
||||||
|
let mut machine_uids = self.machine_uids.write().await;
|
||||||
|
machine_uids.retain(|_, &mut sid| sid != session_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -661,8 +746,19 @@ impl Default for SessionManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl SessionManager {
|
impl SessionManager {
|
||||||
/// Restore a machine as an offline session (called on startup from database)
|
/// Restore a machine as an offline session (called on startup from database).
|
||||||
pub async fn restore_offline_machine(&self, agent_id: &str, hostname: &str) -> SessionId {
|
///
|
||||||
|
/// `machine_uid` is the row's stored stable identity (migration 008), or `None`
|
||||||
|
/// for legacy rows. When present it is indexed so a reconnecting un-keyed agent
|
||||||
|
/// reattaches to this restored session by `machine_uid` even if it presents a
|
||||||
|
/// fresh `agent_id` (config loss) after a server restart — matching the live
|
||||||
|
/// reattach path in `register_agent`.
|
||||||
|
pub async fn restore_offline_machine(
|
||||||
|
&self,
|
||||||
|
agent_id: &str,
|
||||||
|
hostname: &str,
|
||||||
|
machine_uid: Option<&str>,
|
||||||
|
) -> SessionId {
|
||||||
let session_id = Uuid::new_v4();
|
let session_id = Uuid::new_v4();
|
||||||
let now = chrono::Utc::now();
|
let now = chrono::Utc::now();
|
||||||
|
|
||||||
@@ -707,10 +803,26 @@ impl SessionManager {
|
|||||||
|
|
||||||
let mut agents = self.agents.write().await;
|
let mut agents = self.agents.write().await;
|
||||||
agents.insert(agent_id.to_string(), session_id);
|
agents.insert(agent_id.to_string(), session_id);
|
||||||
|
drop(agents);
|
||||||
|
|
||||||
|
if let Some(uid) = machine_uid {
|
||||||
|
self.machine_uids
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.insert(uid.to_string(), session_id);
|
||||||
|
}
|
||||||
|
|
||||||
tracing::info!("Restored offline machine: {} ({})", hostname, agent_id);
|
tracing::info!("Restored offline machine: {} ({})", hostname, agent_id);
|
||||||
session_id
|
session_id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Test-only probe of the `machine_uid` reattach index.
///
/// Returns `true` when `uid` currently has an entry in `machine_uids`. The
/// restore tests use it to assert the keyed/un-keyed indexing contract that the
/// startup loop in `main.rs` depends on (SPEC-004 Task 2 / L1 hardening).
#[cfg(test)]
async fn is_machine_uid_indexed(&self, uid: &str) -> bool {
    // Shared (read) access is enough: this only inspects the index.
    let uid_index = self.machine_uids.read().await;
    uid_index.contains_key(uid)
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -733,13 +845,46 @@ mod tests {
|
|||||||
assert!(!ConsentState::Denied.allows_viewer());
|
assert!(!ConsentState::Denied.allows_viewer());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// L1 hardening (SPEC-004 Task 2): the startup restore loop hands `machine_uid =
|
||||||
|
/// None` to `restore_offline_machine` for KEYED machines, so a restored keyed
|
||||||
|
/// session is NEVER reachable via the uid reattach index — closing the spoofing
|
||||||
|
/// path where an un-keyed agent could reattach a keyed machine's offline session
|
||||||
|
/// by claiming its uid. This asserts the index-control contract the loop relies on.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn keyed_machine_restored_with_none_is_not_uid_indexed() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
// Mirrors the loop's decision for a keyed machine: pass None even if the row
|
||||||
|
// carries a (legacy) machine_uid.
|
||||||
|
let legacy_uid = "muid-keyed-legacy";
|
||||||
|
mgr.restore_offline_machine("agent-keyed", "HOST-KEYED", None)
|
||||||
|
.await;
|
||||||
|
assert!(
|
||||||
|
!mgr.is_machine_uid_indexed(legacy_uid).await,
|
||||||
|
"a keyed machine restored with None must not be reachable by machine_uid"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Counterpart: an UN-keyed machine restored WITH its uid IS indexed, so a
|
||||||
|
/// reconnecting un-keyed agent can legitimately reattach by stable identity.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn unkeyed_machine_restored_with_uid_is_indexed() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
let uid = "muid-unkeyed";
|
||||||
|
mgr.restore_offline_machine("agent-unkeyed", "HOST-UNKEYED", Some(uid))
|
||||||
|
.await;
|
||||||
|
assert!(
|
||||||
|
mgr.is_machine_uid_indexed(uid).await,
|
||||||
|
"an un-keyed machine restored with its uid must be reattachable by uid"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn attended_session_starts_pending_and_blocks_viewer_until_granted() {
|
async fn attended_session_starts_pending_and_blocks_viewer_until_granted() {
|
||||||
let mgr = SessionManager::new();
|
let mgr = SessionManager::new();
|
||||||
|
|
||||||
// Attended (support-code) session: is_persistent = false.
|
// Attended (support-code) session: is_persistent = false.
|
||||||
let (session_id, _frame_tx, _input_rx) = mgr
|
let (session_id, _frame_tx, _input_rx) = mgr
|
||||||
.register_agent("agent-att".to_string(), "Attended PC".to_string(), false)
|
.register_agent("agent-att".to_string(), "Attended PC".to_string(), false, None)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// Starts Pending.
|
// Starts Pending.
|
||||||
@@ -771,7 +916,7 @@ mod tests {
|
|||||||
|
|
||||||
// Managed/persistent session: is_persistent = true.
|
// Managed/persistent session: is_persistent = true.
|
||||||
let (session_id, _frame_tx, _input_rx) = mgr
|
let (session_id, _frame_tx, _input_rx) = mgr
|
||||||
.register_agent("agent-mgd".to_string(), "Managed PC".to_string(), true)
|
.register_agent("agent-mgd".to_string(), "Managed PC".to_string(), true, None)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
@@ -861,7 +1006,7 @@ mod tests {
|
|||||||
async fn agent_status_updates_h264_capability() {
|
async fn agent_status_updates_h264_capability() {
|
||||||
let mgr = SessionManager::new();
|
let mgr = SessionManager::new();
|
||||||
let (session_id, _frame_tx, _input_rx) = mgr
|
let (session_id, _frame_tx, _input_rx) = mgr
|
||||||
.register_agent("agent-cap".to_string(), "Cap PC".to_string(), true)
|
.register_agent("agent-cap".to_string(), "Cap PC".to_string(), true, None)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// Default is false until a status reports capability.
|
// Default is false until a status reports capability.
|
||||||
@@ -895,7 +1040,7 @@ mod tests {
|
|||||||
async fn denied_attended_session_keeps_viewer_blocked() {
|
async fn denied_attended_session_keeps_viewer_blocked() {
|
||||||
let mgr = SessionManager::new();
|
let mgr = SessionManager::new();
|
||||||
let (session_id, _frame_tx, _input_rx) = mgr
|
let (session_id, _frame_tx, _input_rx) = mgr
|
||||||
.register_agent("agent-deny".to_string(), "Deny PC".to_string(), false)
|
.register_agent("agent-deny".to_string(), "Deny PC".to_string(), false, None)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
mgr.set_consent_state(session_id, ConsentState::Denied)
|
mgr.set_consent_state(session_id, ConsentState::Denied)
|
||||||
@@ -909,4 +1054,160 @@ mod tests {
|
|||||||
.await
|
.await
|
||||||
.is_none());
|
.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- v2-stable-identity Task 2: machine_uid reattach (un-keyed dedup path) ----
|
||||||
|
|
||||||
|
/// The SAME physical machine (same `machine_uid`) reconnecting with a DIFFERENT
|
||||||
|
/// `agent_id` — exactly the config-loss / fresh-UUID case that produced the
|
||||||
|
/// duplicate rows — must reattach to its existing session, NOT create a second.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn same_machine_uid_different_agent_id_reattaches_one_session() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
let uid = "muid-abc";
|
||||||
|
|
||||||
|
// First connect: agent_id #1, persistent (managed).
|
||||||
|
let (sid1, _f1, _i1) = mgr
|
||||||
|
.register_agent("agent-uuid-1".to_string(), "HOST-A".to_string(), true, Some(uid))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Agent disconnects; persistent session is preserved offline.
|
||||||
|
mgr.mark_agent_disconnected(sid1).await;
|
||||||
|
assert_eq!(
|
||||||
|
mgr.get_session(sid1).await.map(|s| s.is_online),
|
||||||
|
Some(false),
|
||||||
|
"persistent session must be preserved offline after disconnect"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Reconnect with a FRESH agent_id but the SAME machine_uid (config loss).
|
||||||
|
let (sid2, _f2, _i2) = mgr
|
||||||
|
.register_agent("agent-uuid-2".to_string(), "HOST-A".to_string(), true, Some(uid))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// Reattached to the SAME session, now back online.
|
||||||
|
assert_eq!(sid2, sid1, "same machine_uid must reattach the existing session");
|
||||||
|
assert_eq!(mgr.get_session(sid1).await.map(|s| s.is_online), Some(true));
|
||||||
|
|
||||||
|
// Exactly ONE session total — no duplicate was created.
|
||||||
|
assert_eq!(mgr.list_sessions().await.len(), 1, "must not duplicate the session");
|
||||||
|
|
||||||
|
// The session's agent_id was repointed to the current one, and the agent_id
|
||||||
|
// index resolves the new id (and no longer the stale one).
|
||||||
|
assert_eq!(
|
||||||
|
mgr.get_session(sid1).await.map(|s| s.agent_id),
|
||||||
|
Some("agent-uuid-2".to_string())
|
||||||
|
);
|
||||||
|
assert!(mgr.get_session_by_agent("agent-uuid-2").await.is_some());
|
||||||
|
assert!(
|
||||||
|
mgr.get_session_by_agent("agent-uuid-1").await.is_none(),
|
||||||
|
"stale agent_id must no longer resolve after machine_uid reattach"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Legacy path: agents that report NO machine_uid keep the original behavior —
|
||||||
|
/// reattach keys on `agent_id`, and a different agent_id with no uid is a
|
||||||
|
/// genuinely different machine (separate session). No crash, no regression.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn legacy_no_machine_uid_dedups_on_agent_id() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
|
||||||
|
// Connect with NO machine_uid (legacy agent), persistent.
|
||||||
|
let (sid1, _f1, _i1) = mgr
|
||||||
|
.register_agent("legacy-agent".to_string(), "HOST-L".to_string(), true, None)
|
||||||
|
.await;
|
||||||
|
mgr.mark_agent_disconnected(sid1).await;
|
||||||
|
|
||||||
|
// Same agent_id, still no uid -> reattaches by agent_id (original behavior).
|
||||||
|
let (sid2, _f2, _i2) = mgr
|
||||||
|
.register_agent("legacy-agent".to_string(), "HOST-L".to_string(), true, None)
|
||||||
|
.await;
|
||||||
|
assert_eq!(sid2, sid1, "legacy agent_id reattach must be unchanged");
|
||||||
|
assert_eq!(mgr.list_sessions().await.len(), 1);
|
||||||
|
|
||||||
|
// A DIFFERENT legacy agent_id (no uid) is a distinct machine -> new session.
|
||||||
|
let (sid3, _f3, _i3) = mgr
|
||||||
|
.register_agent("legacy-other".to_string(), "HOST-M".to_string(), true, None)
|
||||||
|
.await;
|
||||||
|
assert_ne!(sid3, sid1);
|
||||||
|
assert_eq!(mgr.list_sessions().await.len(), 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A keyed agent is modeled here by the relay passing `machine_uid = None`
|
||||||
|
/// (the security gate suppresses the claimed uid). Its reattach therefore keys
|
||||||
|
/// on the authoritative `agent_id` only — a second machine cannot seize its
|
||||||
|
/// session by claiming the same uid, because no uid index entry exists for it.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn keyed_agent_modeled_as_none_reattaches_on_agent_id_only() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
|
||||||
|
// Keyed agent: relay passes None for machine_uid.
|
||||||
|
let (sid1, _f1, _i1) = mgr
|
||||||
|
.register_agent("keyed-machine".to_string(), "HOST-K".to_string(), true, None)
|
||||||
|
.await;
|
||||||
|
mgr.mark_agent_disconnected(sid1).await;
|
||||||
|
|
||||||
|
// Another connection CLAIMS a uid (as an un-keyed agent would). Because the
|
||||||
|
// keyed session was indexed by agent_id only (uid suppressed upstream),
|
||||||
|
// this claim cannot reattach the keyed session — it gets its own.
|
||||||
|
let (sid2, _f2, _i2) = mgr
|
||||||
|
.register_agent(
|
||||||
|
"attacker-agent".to_string(),
|
||||||
|
"EVIL".to_string(),
|
||||||
|
false,
|
||||||
|
Some("muid-claimed"),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_ne!(
|
||||||
|
sid2, sid1,
|
||||||
|
"a claimed machine_uid must not reattach a keyed agent's session"
|
||||||
|
);
|
||||||
|
|
||||||
|
// The keyed agent reconnects (relay again passes None) and reattaches its OWN
|
||||||
|
// session by agent_id.
|
||||||
|
let (sid3, _f3, _i3) = mgr
|
||||||
|
.register_agent("keyed-machine".to_string(), "HOST-K".to_string(), true, None)
|
||||||
|
.await;
|
||||||
|
assert_eq!(sid3, sid1, "keyed agent reattaches on its authoritative agent_id");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removing a session purges its machine_uid index entry so a later reconnect
|
||||||
|
/// with the same uid starts fresh instead of pointing at a dead session.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn removing_session_purges_machine_uid_index() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
let uid = "muid-purge";
|
||||||
|
|
||||||
|
let (sid1, _f1, _i1) = mgr
|
||||||
|
.register_agent("purge-agent".to_string(), "HOST-P".to_string(), true, Some(uid))
|
||||||
|
.await;
|
||||||
|
mgr.remove_session(sid1).await;
|
||||||
|
assert!(mgr.get_session(sid1).await.is_none());
|
||||||
|
|
||||||
|
// Reconnect with the same uid -> a brand-new session (no stale reattach).
|
||||||
|
let (sid2, _f2, _i2) = mgr
|
||||||
|
.register_agent("purge-agent-2".to_string(), "HOST-P".to_string(), true, Some(uid))
|
||||||
|
.await;
|
||||||
|
assert_ne!(sid2, sid1, "removed session's uid must not reattach");
|
||||||
|
assert_eq!(mgr.list_sessions().await.len(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// After a server restart, a restored offline row indexed by machine_uid must
|
||||||
|
/// let an un-keyed agent reattach by uid even if it presents a fresh agent_id.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn restored_offline_machine_reattaches_by_machine_uid() {
|
||||||
|
let mgr = SessionManager::new();
|
||||||
|
let uid = "muid-restored";
|
||||||
|
|
||||||
|
// Simulate startup reconcile: restore an offline persistent machine with uid.
|
||||||
|
let sid = mgr
|
||||||
|
.restore_offline_machine("orig-agent", "HOST-R", Some(uid))
|
||||||
|
.await;
|
||||||
|
assert_eq!(mgr.get_session(sid).await.map(|s| s.is_online), Some(false));
|
||||||
|
|
||||||
|
// Agent reconnects post-restart with a FRESH agent_id but the same uid.
|
||||||
|
let (sid2, _f2, _i2) = mgr
|
||||||
|
.register_agent("fresh-agent".to_string(), "HOST-R".to_string(), true, Some(uid))
|
||||||
|
.await;
|
||||||
|
assert_eq!(sid2, sid, "restored machine must reattach by machine_uid");
|
||||||
|
assert_eq!(mgr.list_sessions().await.len(), 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user