Files
guru-connect/server/src/db/machines.rs
Mike Swanson f950511e3e
Some checks failed
Build and Test / Build Agent (Windows) (push) Successful in 8m16s
Build and Test / Build Server (Linux) (push) Successful in 11m58s
Build and Test / Security Audit (push) Has started running
Build and Test / Build Summary (push) Has been cancelled
fix(server): bind machine_uid upsert ON CONFLICT to the partial index (WHERE machine_uid IS NOT NULL)
Bare ON CONFLICT (machine_uid) could not bind to migration 008's partial unique index, so no connect_machines row was persisted for any agent reporting a machine_uid. Confirmed live on 172.16.3.30 with a signed 0.3.0 test agent.
2026-06-01 09:50:34 -07:00

650 lines
26 KiB
Rust

//! Machine/Agent database operations
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::{postgres::PgRow, FromRow, PgPool, Row};
use uuid::Uuid;
/// Machine record from database (one row of `connect_machines`).
///
/// `FromRow` is implemented manually (not derived) so that every column whose
/// schema definition is *nullable* decodes NULL-tolerantly. The `connect_machines`
/// table was created in migration 001 with only `DEFAULT` clauses (no `NOT NULL`)
/// on `is_elevated`, `is_persistent`, `first_seen`, `last_seen`, `status`,
/// `created_at`, and `updated_at`; `tags` (migration 005) likewise ended up
/// nullable with no default on the production instance. A derived `FromRow` maps
/// those to non-`Option` Rust types and errors at decode time the moment any cell
/// is NULL (`unexpected null; try decoding as an Option`). In production a row with
/// `tags IS NULL` broke the startup reconcile task and would 500 the authenticated
/// Machines list. The manual impl below reads every nullable column as
/// `Option<T>` and falls back to `Default::default()`, so a NULL can never panic or
/// error regardless of how the column was created. Truly non-null columns (`id`,
/// `agent_id`, `hostname`) are decoded directly. Migration 007 additionally pins
/// `tags` to `DEFAULT '{}'` and backfills existing NULLs (defense in depth).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Machine {
/// Primary-key UUID (`connect_machines.id`); looked up by `get_machine_by_id`.
pub id: Uuid,
/// Agent identifier. Conflict key of the legacy (`machine_uid = None`) upsert
/// path and the lookup key for every per-agent query in this module.
pub agent_id: String,
/// Hostname reported by the agent; refreshed on every upsert.
pub hostname: String,
/// OS version string. Written by `update_machine_status`, which keeps the stored
/// value when the caller passes `None` (`COALESCE`).
pub os_version: Option<String>,
/// Overwritten by `update_machine_status`. A NULL cell decodes to `false`.
pub is_elevated: bool,
/// Written by `upsert_machine` when the row is first inserted. Only rows with
/// `is_persistent = true` are returned by `get_all_machines`. A NULL cell decodes
/// to `false`.
pub is_persistent: bool,
/// Not written by any query visible here (the struct-level note describes a column
/// `DEFAULT`). A NULL cell decodes to `DateTime::<Utc>::default()`.
pub first_seen: DateTime<Utc>,
/// Set to `NOW()` by `upsert_machine`, `update_machine_status`, and
/// `mark_machine_offline`. A NULL cell decodes to `DateTime::<Utc>::default()`.
pub last_seen: DateTime<Utc>,
/// Written by `update_machine_status`; a `None` argument keeps the stored value.
pub last_session_id: Option<Uuid>,
/// `'online'` after an upsert, `'offline'` after `mark_machine_offline`, or
/// whatever string the caller hands to `update_machine_status`. A NULL cell decodes
/// to the empty string.
pub status: String,
/// Not written by any query visible here. A NULL cell decodes to
/// `DateTime::<Utc>::default()`.
pub created_at: DateTime<Utc>,
/// Not written by any query visible here. A NULL cell decodes to
/// `DateTime::<Utc>::default()`.
pub updated_at: DateTime<Utc>,
/// Tenancy-ready (Phase 4). Backfilled to the default tenant by migration 004.
pub tenant_id: Option<Uuid>,
/// Company/organization name reported by the agent (`AgentStatus.organization`).
/// Column added in migration 005; previously written by `update_machine_metadata`
/// against a non-existent column (the write silently failed). Now mapped here so
/// `SELECT *` returns it.
pub organization: Option<String>,
/// Site/location name reported by the agent (`AgentStatus.site`). Migration 005.
pub site: Option<String>,
/// Free-form tags reported by the agent (`AgentStatus.tags`). Stored as a
/// Postgres `TEXT[]`; matches the `&[String]` bound by `update_machine_metadata`.
/// Migration 005. A `NULL` cell decodes to an empty vec (see the manual `FromRow`
/// impl) so it can never error at decode time. `#[serde(default)]` likewise lets a
/// serialized form without a `tags` key deserialize to an empty vec.
#[serde(default)]
pub tags: Vec<String>,
/// Deterministic, recomputable hardware identity reported by the agent
/// (`AgentStatus.machine_uid` / connect query param). Column added in migration
/// 008. NULLABLE: legacy rows and agents that do not report a uid carry `None`.
/// For un-keyed agents this is the dedup key (`upsert_machine` keys
/// `ON CONFLICT (machine_uid) WHERE machine_uid IS NOT NULL` when present, matching
/// migration 008's partial unique index); for `cak_`-keyed agents the key's
/// machine binding stays authoritative and the claimed uid is NOT used to dedup
/// (see `upsert_machine`).
pub machine_uid: Option<String>,
/// Soft-delete marker (migration 009). When non-null the machine was
/// operator-purged (Task 5): it is excluded from every list/get query and is
/// never restored by the startup reconcile, but the row (and its audit
/// history) is retained. NULL = live. Nullable, so it is read NULL-tolerantly
/// in the manual `FromRow` below.
pub deleted_at: Option<DateTime<Utc>>,
}
impl<'r> FromRow<'r, PgRow> for Machine {
fn from_row(row: &'r PgRow) -> Result<Self, sqlx::Error> {
Ok(Self {
// NOT NULL columns: decode directly.
id: row.try_get("id")?,
agent_id: row.try_get("agent_id")?,
hostname: row.try_get("hostname")?,
// Schema-nullable in `Option<T>` form already: decode directly.
os_version: row.try_get("os_version")?,
last_session_id: row.try_get("last_session_id")?,
tenant_id: row.try_get("tenant_id")?,
organization: row.try_get("organization")?,
site: row.try_get("site")?,
// Schema-nullable (migration 008); decode directly as Option.
machine_uid: row.try_get("machine_uid")?,
// Schema-nullable (migration 009); decode directly as Option.
deleted_at: row.try_get("deleted_at")?,
// Nullable-with-default columns mapped to non-`Option` Rust types: read as
// `Option<T>` and fall back to the type default so a NULL cell never errors.
is_elevated: row
.try_get::<Option<bool>, _>("is_elevated")?
.unwrap_or_default(),
is_persistent: row
.try_get::<Option<bool>, _>("is_persistent")?
.unwrap_or_default(),
first_seen: row
.try_get::<Option<DateTime<Utc>>, _>("first_seen")?
.unwrap_or_default(),
last_seen: row
.try_get::<Option<DateTime<Utc>>, _>("last_seen")?
.unwrap_or_default(),
status: row
.try_get::<Option<String>, _>("status")?
.unwrap_or_default(),
created_at: row
.try_get::<Option<DateTime<Utc>>, _>("created_at")?
.unwrap_or_default(),
updated_at: row
.try_get::<Option<DateTime<Utc>>, _>("updated_at")?
.unwrap_or_default(),
// The production bug: `tags` was nullable with no default. A NULL cell
// decodes to an empty vec here instead of erroring.
tags: row
.try_get::<Option<Vec<String>>, _>("tags")?
.unwrap_or_default(),
})
}
}
/// Get or create a machine record (upsert), deduplicating on the most stable
/// identity available (SPEC-004 / v2-stable-identity Task 2).
///
/// Two dedup paths, selected by whether the caller passes a usable `machine_uid`:
///
/// - **`machine_uid = Some(uid)` with a non-blank `uid` (the un-keyed dedup path):**
///   key on the stable hardware identity —
///   `ON CONFLICT (machine_uid) WHERE machine_uid IS NOT NULL`, which is the form
///   that binds to migration 008's partial unique index. The SAME machine
///   reconnecting with a DIFFERENT `agent_id` (e.g. after a config loss minted a
///   fresh random id) updates its EXISTING row instead of inserting a duplicate.
///   `agent_id` and `hostname` are refreshed to the latest reported values. This is
///   what collapses the duplicate-registration fleet down to one row per real machine.
///
/// - **`machine_uid = None` (legacy / authoritative path):** preserve the original
///   behavior exactly — `ON CONFLICT (agent_id)`. Used for agents that do not report
///   a uid AND, critically, for `cak_`-keyed agents: the caller (`relay`) passes
///   `None` for keyed agents so their AUTHORITATIVE key-bound `agent_id` is the dedup
///   key and a client-claimed `machine_uid` can never repoint a keyed machine's row.
///
/// BLANK UID: an empty or whitespace-only `machine_uid` is treated as `None`. The
/// partial index only excludes NULL, so an empty string would otherwise be a valid
/// conflict key shared by every agent that failed to compute a uid, merging
/// unrelated machines onto a single row.
///
/// SECURITY: a client-asserted `machine_uid` is spoofable, so it is a *correctness*
/// aid, not a trust boundary. Only the un-keyed path supplies it; the keyed path's
/// authority lives in the key→machine binding upstream (see
/// `relay::agent_ws_handler`), never here.
///
/// SOFT-DELETE REVIVE (Task 5): both `ON CONFLICT DO UPDATE` arms clear
/// `deleted_at` (set it back to NULL). A machine that was operator-purged but then
/// genuinely reconnects is a live machine again, so it must reappear in the console
/// rather than stay hidden behind a stale soft-delete marker. A purge of a truly
/// gone host is permanent precisely because such a host never upserts again.
///
/// NOTE: `is_persistent` is written only when the row is first inserted; neither
/// `DO UPDATE` arm touches it, so a reconnect never changes the stored value.
///
/// # Errors
/// Returns the underlying `sqlx::Error` if the statement fails (including the
/// `agent_id` unique-violation edge case described inside the function) or if the
/// returned row cannot be decoded into a [`Machine`].
pub async fn upsert_machine(
    pool: &PgPool,
    agent_id: &str,
    hostname: &str,
    is_persistent: bool,
    machine_uid: Option<&str>,
) -> Result<Machine, sqlx::Error> {
    // A blank uid identifies nothing. Demote it to the agent_id path so it can never
    // act as a shared conflict key. A non-blank uid is stored exactly as supplied.
    let machine_uid = machine_uid.filter(|uid| !uid.trim().is_empty());

    match machine_uid {
        // Un-keyed dedup path: stable hardware identity is the conflict arbiter.
        // A new agent_id for the same physical machine updates the existing row.
        //
        // Edge case: if this INSERT's new random `agent_id` happens to collide with a
        // DIFFERENT legacy row's value under the `agent_id UNIQUE` constraint, Postgres
        // raises the unique violation on `agent_id` BEFORE the `ON CONFLICT
        // (machine_uid)` arbiter is consulted, so the upsert errors instead of merging
        // on uid. This is non-fatal: the caller logs the error and the live in-memory
        // session is unaffected, and the agent simply retries with a freshly minted
        // UUID. The window closes on its own as legacy rows age out (SPEC-004 Task 3).
        Some(uid) => {
            sqlx::query_as::<_, Machine>(
                r#"
INSERT INTO connect_machines (agent_id, hostname, is_persistent, status, last_seen, machine_uid)
VALUES ($1, $2, $3, 'online', NOW(), $4)
ON CONFLICT (machine_uid) WHERE machine_uid IS NOT NULL DO UPDATE SET
agent_id = EXCLUDED.agent_id,
hostname = EXCLUDED.hostname,
status = 'online',
last_seen = NOW(),
deleted_at = NULL
RETURNING *
"#,
            )
            .bind(agent_id)
            .bind(hostname)
            .bind(is_persistent)
            .bind(uid)
            .fetch_one(pool)
            .await
        }
        // Legacy / authoritative path: dedup on agent_id exactly as before. Leaves
        // machine_uid NULL (the partial unique index excludes NULLs, so any number
        // of these may coexist).
        None => {
            sqlx::query_as::<_, Machine>(
                r#"
INSERT INTO connect_machines (agent_id, hostname, is_persistent, status, last_seen)
VALUES ($1, $2, $3, 'online', NOW())
ON CONFLICT (agent_id) DO UPDATE SET
hostname = EXCLUDED.hostname,
status = 'online',
last_seen = NOW(),
deleted_at = NULL
RETURNING *
"#,
            )
            .bind(agent_id)
            .bind(hostname)
            .bind(is_persistent)
            .fetch_one(pool)
            .await
        }
    }
}
/// Record an agent's latest reported status on its `connect_machines` row.
///
/// `status` and `is_elevated` are always overwritten and `last_seen` is bumped to
/// `NOW()`. `os_version` and `session_id` go through `COALESCE`, so passing `None`
/// keeps the stored `os_version` / `last_session_id`. The row is matched on
/// `agent_id`; an unknown `agent_id` updates nothing and still returns `Ok(())`.
///
/// # Errors
/// Returns the `sqlx::Error` raised by executing the `UPDATE`.
#[allow(dead_code)] // TODO(native-remote-control): consumed by the integration API; see docs/specs/native-remote-control/
pub async fn update_machine_status(
    pool: &PgPool,
    agent_id: &str,
    status: &str,
    os_version: Option<&str>,
    is_elevated: bool,
    session_id: Option<Uuid>,
) -> Result<(), sqlx::Error> {
    // Placeholders: $1 status, $2 os_version, $3 is_elevated, $4 session id,
    // $5 agent_id — the binds below must stay in that order.
    const UPDATE_STATUS_SQL: &str = r#"
UPDATE connect_machines SET
status = $1,
os_version = COALESCE($2, os_version),
is_elevated = $3,
last_seen = NOW(),
last_session_id = COALESCE($4, last_session_id)
WHERE agent_id = $5
"#;

    sqlx::query(UPDATE_STATUS_SQL)
        .bind(status)
        .bind(os_version)
        .bind(is_elevated)
        .bind(session_id)
        .bind(agent_id)
        .execute(pool)
        .await
        .map(|_| ())
}
/// List every live, persistent machine ordered by hostname (feeds both the
/// dashboard list and the startup restore).
///
/// Rows with `deleted_at IS NOT NULL` (operator-purged, migration 009 / Task 5) are
/// filtered out, so a soft-deleted machine neither reappears in `/api/machines` nor
/// is re-restored into the in-memory session manager on startup. That filter is
/// what makes the ghost-row purge stick.
///
/// # Errors
/// Returns the `sqlx::Error` from running the query or decoding a row.
pub async fn get_all_machines(pool: &PgPool) -> Result<Vec<Machine>, sqlx::Error> {
    let live_persistent = sqlx::query_as::<_, Machine>(
        "SELECT * FROM connect_machines WHERE is_persistent = true AND deleted_at IS NULL ORDER BY hostname",
    );
    let machines = live_persistent.fetch_all(pool).await?;
    Ok(machines)
}
/// Look up a single live machine by `agent_id`.
///
/// Soft-deleted rows (`deleted_at IS NOT NULL`, Task 5) are excluded, so a purged
/// machine yields `Ok(None)` exactly like an unknown `agent_id`.
///
/// # Errors
/// Returns the `sqlx::Error` from running the query or decoding the row.
pub async fn get_machine_by_agent_id(
    pool: &PgPool,
    agent_id: &str,
) -> Result<Option<Machine>, sqlx::Error> {
    let live_lookup = sqlx::query_as::<_, Machine>(
        "SELECT * FROM connect_machines WHERE agent_id = $1 AND deleted_at IS NULL",
    )
    .bind(agent_id);

    live_lookup.fetch_optional(pool).await
}
/// Fetch a machine by its primary-key UUID (`connect_machines.id`).
///
/// The agent WS plane uses this to turn the trusted `machine_id` returned by
/// `verify_agent_key` back into its canonical `agent_id`, so a persistent reattach
/// binds to the authenticated identity instead of a client-supplied query param
/// (Task 3 identity binding).
///
/// NOTE (Task 5): there is intentionally NO `deleted_at IS NULL` filter here. This
/// is the authenticated-identity resolver for a `cak_`-keyed agent's reattach, not
/// a dashboard read. A previously operator-purged machine that genuinely reconnects
/// with a valid key has to resolve so that `upsert_machine` can revive it (the
/// upsert clears `deleted_at`). The filtered dashboard lookup is
/// `get_machine_by_agent_id`.
///
/// # Errors
/// Returns the `sqlx::Error` from running the query or decoding the row.
pub async fn get_machine_by_id(
    pool: &PgPool,
    machine_id: Uuid,
) -> Result<Option<Machine>, sqlx::Error> {
    let by_primary_key =
        sqlx::query_as::<_, Machine>("SELECT * FROM connect_machines WHERE id = $1")
            .bind(machine_id);

    by_primary_key.fetch_optional(pool).await
}
/// Flip a machine's `status` to `'offline'` and stamp `last_seen` with `NOW()`.
///
/// The row is matched on `agent_id`; an unknown `agent_id` changes nothing and
/// still returns `Ok(())`.
///
/// # Errors
/// Returns the `sqlx::Error` raised by executing the `UPDATE`.
pub async fn mark_machine_offline(pool: &PgPool, agent_id: &str) -> Result<(), sqlx::Error> {
    let go_offline = sqlx::query(
        "UPDATE connect_machines SET status = 'offline', last_seen = NOW() WHERE agent_id = $1",
    )
    .bind(agent_id);

    // The affected-row count is not reported to the caller.
    go_offline.execute(pool).await.map(|_| ())
}
/// Permanently remove a machine row (legacy path, kept for backward compatibility).
///
/// The delete cascades to `connect_sessions` / `connect_session_events` through
/// their FKs, which means the audit history goes with it. The Task-5
/// operator-removal flow uses [`soft_delete_machine`] instead so the row survives
/// for the audit trail.
///
/// # Errors
/// Returns the `sqlx::Error` raised by executing the `DELETE`.
pub async fn delete_machine(pool: &PgPool, agent_id: &str) -> Result<(), sqlx::Error> {
    let hard_delete =
        sqlx::query("DELETE FROM connect_machines WHERE agent_id = $1").bind(agent_id);

    // The affected-row count is not reported to the caller.
    hard_delete.execute(pool).await.map(|_| ())
}
/// Operator purge of a single machine by `agent_id`, implemented as a soft delete
/// (Task 5).
///
/// Stamps `deleted_at = NOW()`, which hides the row from every list/get query and
/// from the startup reconcile while keeping the row and its
/// `connect_session_events` history for the audit trail. Only rows that are still
/// live (`deleted_at IS NULL`) are touched, so purging twice is a no-op and the
/// original removal instant is preserved.
///
/// Returns the number of rows affected: `0` means the `agent_id` is unknown or was
/// already purged, which lets the caller tell a 404 apart from a success.
///
/// # Errors
/// Returns the `sqlx::Error` raised by executing the `UPDATE`.
pub async fn soft_delete_machine(pool: &PgPool, agent_id: &str) -> Result<u64, sqlx::Error> {
    sqlx::query(
        "UPDATE connect_machines SET deleted_at = NOW() WHERE agent_id = $1 AND deleted_at IS NULL",
    )
    .bind(agent_id)
    .execute(pool)
    .await
    .map(|outcome| outcome.rows_affected())
}
/// Store the organization, site, and tags an agent reported for itself.
///
/// Each value is applied only when supplied: a `None` organization/site keeps the
/// stored value (`COALESCE`), and an empty `tags` slice keeps the stored tags (the
/// `CASE` arm). When nothing at all is supplied the function returns without
/// touching the database. The row is matched on `agent_id`.
///
/// # Errors
/// Returns the `sqlx::Error` raised by executing the `UPDATE`.
pub async fn update_machine_metadata(
    pool: &PgPool,
    agent_id: &str,
    organization: Option<&str>,
    site: Option<&str>,
    tags: &[String],
) -> Result<(), sqlx::Error> {
    // Placeholders: $1 organization, $2 site, $3 tags, $4 agent_id.
    const UPDATE_METADATA_SQL: &str = r#"
UPDATE connect_machines SET
organization = COALESCE($1, organization),
site = COALESCE($2, site),
tags = CASE WHEN $3::text[] = '{}' THEN tags ELSE $3 END
WHERE agent_id = $4
"#;

    // Skip the round trip entirely when the agent reported no metadata.
    let anything_reported = organization.is_some() || site.is_some() || !tags.is_empty();
    if !anything_reported {
        return Ok(());
    }

    sqlx::query(UPDATE_METADATA_SQL)
        .bind(organization)
        .bind(site)
        .bind(tags)
        .bind(agent_id)
        .execute(pool)
        .await
        .map(|_| ())
}
#[cfg(test)]
mod tests {
use super::*;
use sqlx::postgres::PgPoolOptions;
/// Open a pool against the throwaway test Postgres and bring it up to date with
/// the migrations.
///
/// Returns `None` when `TEST_DATABASE_URL` is unset, which turns the whole suite
/// into a no-op on workstations without a database. CI sets `TEST_DATABASE_URL`
/// against an ephemeral Postgres, where these run for real. (The server crate is
/// Linux-targeted and validated in Gitea CI; these DB tests run there.)
///
/// Panics if the variable is set but the connection or the migrations fail.
async fn test_pool() -> Option<PgPool> {
    let Ok(url) = std::env::var("TEST_DATABASE_URL") else {
        return None;
    };

    let connecting = PgPoolOptions::new().max_connections(2).connect(&url);
    let pool = connecting.await.expect("connect to TEST_DATABASE_URL");

    let migrator = sqlx::migrate!("./migrations");
    migrator
        .run(&pool)
        .await
        .expect("apply migrations to the test database");

    Some(pool)
}
/// Best-effort removal of the rows a test created, so reruns start clean and tests
/// don't collide. Rows are deleted first by `agent_id`, then by `machine_uid`;
/// individual delete failures are deliberately ignored.
async fn cleanup(pool: &PgPool, agent_ids: &[&str], machine_uids: &[&str]) {
    // One (statement, keys) pair per pass, run in this order.
    let passes = [
        (
            "DELETE FROM connect_machines WHERE agent_id = $1",
            agent_ids,
        ),
        (
            "DELETE FROM connect_machines WHERE machine_uid = $1",
            machine_uids,
        ),
    ];

    for (delete_sql, keys) in passes {
        for key in keys {
            let _ = sqlx::query(delete_sql).bind(key).execute(pool).await;
        }
    }
}
/// (a) Two upserts that share a `machine_uid` but carry DIFFERENT `agent_id`s end
/// up as ONE row. The second upsert rewrites the existing row (repointing its
/// agent_id) instead of inserting a duplicate — the core dedup guarantee for the
/// un-keyed fleet.
#[tokio::test]
async fn same_machine_uid_two_agent_ids_one_row() {
    let pool = match test_pool().await {
        Some(pool) => pool,
        None => return, // no TEST_DATABASE_URL: skip (runs in CI)
    };
    let uid = "test-muid-dedup-001";
    let (agent_a, agent_b) = ("agent-A", "agent-B");
    cleanup(&pool, &[agent_a, agent_b], &[uid]).await;

    let first = upsert_machine(&pool, agent_a, "HOST-A", true, Some(uid))
        .await
        .expect("first upsert");
    let second = upsert_machine(&pool, agent_b, "HOST-A2", true, Some(uid))
        .await
        .expect("second upsert with same uid, different agent_id");

    // Same physical row, with agent_id and hostname refreshed to the latest.
    assert_eq!(first.id, second.id, "same machine_uid must update the same row");
    assert_eq!(second.agent_id, "agent-B");
    assert_eq!(second.hostname, "HOST-A2");
    assert_eq!(second.machine_uid.as_deref(), Some(uid));

    // Exactly one row carries this uid.
    let rows_for_uid =
        sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM connect_machines WHERE machine_uid = $1")
            .bind(uid)
            .fetch_one(&pool)
            .await
            .expect("count rows for uid");
    assert_eq!(rows_for_uid, 1, "must be exactly one row for the machine_uid");

    cleanup(&pool, &[agent_a, agent_b], &[uid]).await;
}
/// (b) The legacy NULL-`machine_uid` path behaves as before: the dedup key is
/// `agent_id`, the row's `machine_uid` stays NULL, and upserting the same agent_id
/// again without a uid rewrites the same row (no crash, no duplicate).
#[tokio::test]
async fn legacy_null_machine_uid_dedups_on_agent_id() {
    let pool = match test_pool().await {
        Some(pool) => pool,
        None => return, // no TEST_DATABASE_URL: skip (runs in CI)
    };
    let agent = "test-legacy-agent-001";
    cleanup(&pool, &[agent], &[]).await;

    let created = upsert_machine(&pool, agent, "LEGACY-HOST", true, None)
        .await
        .expect("legacy upsert (no uid)");
    assert_eq!(
        created.machine_uid, None,
        "legacy row must have NULL machine_uid"
    );

    let renamed = upsert_machine(&pool, agent, "LEGACY-HOST-RENAMED", true, None)
        .await
        .expect("legacy re-upsert (no uid)");
    assert_eq!(
        created.id, renamed.id,
        "legacy agent_id must dedup to the same row"
    );
    assert_eq!(renamed.hostname, "LEGACY-HOST-RENAMED");
    assert_eq!(renamed.machine_uid, None);

    let rows_for_agent =
        sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM connect_machines WHERE agent_id = $1")
            .bind(agent)
            .fetch_one(&pool)
            .await
            .expect("count legacy rows");
    assert_eq!(rows_for_agent, 1, "legacy path must not duplicate the row");

    cleanup(&pool, &[agent], &[]).await;
}
/// Several legacy rows with a NULL machine_uid can live side by side: the partial
/// unique index excludes NULLs, so distinct un-keyed agents stay independent rows.
#[tokio::test]
async fn multiple_null_machine_uid_rows_coexist() {
    let pool = match test_pool().await {
        Some(pool) => pool,
        None => return, // no TEST_DATABASE_URL: skip (runs in CI)
    };
    let (agent_one, agent_two) = ("null-1", "null-2");
    cleanup(&pool, &[agent_one, agent_two], &[]).await;

    let row_one = upsert_machine(&pool, agent_one, "H1", true, None)
        .await
        .expect("first null-uid row");
    let row_two = upsert_machine(&pool, agent_two, "H2", true, None)
        .await
        .expect("second null-uid row");
    assert_ne!(
        row_one.id, row_two.id,
        "distinct legacy agents must be distinct rows"
    );

    cleanup(&pool, &[agent_one, agent_two], &[]).await;
}
/// Helper: reports whether `get_all_machines` (the dashboard list / startup
/// restore query) currently returns a row with this agent_id. Panics if the list
/// query itself fails.
async fn list_contains(pool: &PgPool, agent_id: &str) -> bool {
    let listed = get_all_machines(pool).await.expect("list machines");
    listed.iter().any(|machine| machine.agent_id == agent_id)
}
/// Task 5: a soft delete stamps `deleted_at` and removes the machine from BOTH the
/// list query and the by-agent_id get — the core operator-removal guarantee (a
/// purged ghost row must not reappear in /api/machines). The row itself is kept,
/// and purging a second time changes nothing.
#[tokio::test]
async fn soft_delete_machine_hides_from_list_and_get() {
    let pool = match test_pool().await {
        Some(pool) => pool,
        None => return, // no TEST_DATABASE_URL: skip (runs in CI)
    };
    let agent = "test-softdel-agent-001";
    cleanup(&pool, &[agent], &[]).await;

    // Freshly created: live, listed, gettable.
    let created = upsert_machine(&pool, agent, "SOFTDEL-HOST", true, None)
        .await
        .expect("create machine");
    assert!(created.deleted_at.is_none(), "fresh row must be live");

    let listed_before = list_contains(&pool, agent).await;
    assert!(listed_before, "live machine must be listed");

    let fetched_before = get_machine_by_agent_id(&pool, agent).await.expect("get");
    assert!(fetched_before.is_some(), "live machine must be gettable");

    // Soft-delete.
    let affected = soft_delete_machine(&pool, agent)
        .await
        .expect("soft delete");
    assert_eq!(affected, 1, "exactly one live row flips to deleted");

    // Excluded from list and get.
    let listed_after = list_contains(&pool, agent).await;
    assert!(!listed_after, "soft-deleted machine must NOT be listed");

    let fetched_after = get_machine_by_agent_id(&pool, agent)
        .await
        .expect("get after delete");
    assert!(
        fetched_after.is_none(),
        "soft-deleted machine must NOT be gettable by agent_id"
    );

    // The row still exists with a non-null deleted_at (history retained).
    let deleted_at: Option<DateTime<Utc>> =
        sqlx::query_scalar("SELECT deleted_at FROM connect_machines WHERE agent_id = $1")
            .bind(agent)
            .fetch_one(&pool)
            .await
            .expect("row still present");
    assert!(
        deleted_at.is_some(),
        "row must be retained with deleted_at set"
    );

    // Re-purge is a no-op (does not overwrite the original instant).
    let affected_again = soft_delete_machine(&pool, agent)
        .await
        .expect("re-soft-delete");
    assert_eq!(
        affected_again, 0,
        "re-purge of an already-deleted row affects 0 rows"
    );

    cleanup(&pool, &[agent], &[]).await;
}
/// Task 5: when a previously soft-deleted machine genuinely reconnects (upserts),
/// it is REVIVED — `deleted_at` is cleared and it shows up in the console again. A
/// purge only sticks for a host that never upserts again.
#[tokio::test]
async fn upsert_revives_soft_deleted_machine() {
    let pool = match test_pool().await {
        Some(pool) => pool,
        None => return, // no TEST_DATABASE_URL: skip (runs in CI)
    };
    let agent = "test-revive-agent-001";
    cleanup(&pool, &[agent], &[]).await;

    // Create, then purge.
    upsert_machine(&pool, agent, "REVIVE-HOST", true, None)
        .await
        .expect("create");
    soft_delete_machine(&pool, agent)
        .await
        .expect("soft delete");
    let listed_while_purged = list_contains(&pool, agent).await;
    assert!(!listed_while_purged, "purged: hidden");

    // Reconnect.
    let revived = upsert_machine(&pool, agent, "REVIVE-HOST", true, None)
        .await
        .expect("reconnect upsert");
    assert!(
        revived.deleted_at.is_none(),
        "reconnect must clear deleted_at"
    );
    let listed_after_revive = list_contains(&pool, agent).await;
    assert!(listed_after_revive, "revived machine must be listed again");

    cleanup(&pool, &[agent], &[]).await;
}
/// Task 5: purging several machines the way the bulk endpoint does (one
/// `soft_delete_machine` call per id) drops every one of them from the live list.
#[tokio::test]
async fn bulk_soft_delete_hides_all_listed() {
    let pool = match test_pool().await {
        Some(pool) => pool,
        None => return, // no TEST_DATABASE_URL: skip (runs in CI)
    };
    let agents = ["test-bulk-a", "test-bulk-b", "test-bulk-c"];
    cleanup(&pool, &agents, &[]).await;

    // Create all three and confirm each is visible before the purge.
    for (index, agent) in agents.iter().enumerate() {
        let hostname = format!("BULK-HOST-{index}");
        upsert_machine(&pool, agent, &hostname, true, None)
            .await
            .expect("create bulk machine");
    }
    for agent in &agents {
        let listed = list_contains(&pool, agent).await;
        assert!(listed, "{agent} listed before bulk");
    }

    // Purge all three (the bulk endpoint loops soft_delete_machine per id).
    let mut total_removed = 0u64;
    for agent in &agents {
        let flipped = soft_delete_machine(&pool, agent)
            .await
            .expect("bulk soft delete");
        total_removed += flipped;
    }
    assert_eq!(total_removed, 3, "all three live rows flipped to deleted");

    for agent in &agents {
        let still_listed = list_contains(&pool, agent).await;
        assert!(!still_listed, "{agent} must be hidden after bulk purge");
    }

    cleanup(&pool, &agents, &[]).await;
}
}