feat(server): operator removal of stale sessions/machines (SPEC-004 Task 5, server)
All checks were successful
All checks were successful
Admin-gated soft-delete + purge so operators can clear ghost machines/sessions (the ~15-rows-for-one-host accumulation) from the console. - migration 009: deleted_at on connect_sessions + connect_machines, with partial indexes WHERE deleted_at IS NULL. - DELETE /api/machines/:agent_id?purge=true and DELETE /api/sessions/:id?purge=true soft-delete the row and purge the in-memory session (remove_session); the non-purge path keeps the legacy hard-delete / live-only disconnect. POST /api/machines/bulk-remove handles multi-select (batch cap 500). All admin-gated (AdminUser -> 403; tightens the prior any-user delete) and audited to connect_session_events (actor + target + trusted client IP). - list/get queries filter deleted_at IS NULL so removed units leave the console; upsert revives (deleted_at = NULL) a genuinely-reconnecting machine. The keyed-reattach identity resolver (get_machine_by_id) is intentionally unfiltered. Dashboard removal UI is the A3b follow-up. 86 server tests pass; fmt/clippy/test clean. Implements specs/v2-stable-identity/plan.md Task 5 (server portion). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
43
server/migrations/009_session_machine_soft_delete.sql
Normal file
43
server/migrations/009_session_machine_soft_delete.sql
Normal file
@@ -0,0 +1,43 @@
|
||||
-- Migration: 009_session_machine_soft_delete.sql
|
||||
-- Purpose: Give connect_machines and connect_sessions a soft-delete marker
|
||||
-- (deleted_at) so an operator can PURGE a stale machine/session —
|
||||
-- removing it from the live console — without destroying its audit
|
||||
-- history (SPEC-004 / v2-stable-identity Task 5).
|
||||
--
|
||||
-- Task 5 is the operator-removal mechanism that finally purges the ~14 live
|
||||
-- ghost connect_machines rows left by the duplicate-registration bug. The
|
||||
-- live-only admin "disconnect" (DELETE /api/sessions/:id) and the legacy hard
|
||||
-- DELETE /api/machines/:agent_id stay as they were; the NEW purge path
|
||||
-- (`?purge=true`) sets deleted_at, drops the in-memory session via
|
||||
-- SessionManager::remove_session, and writes an audit row. A soft delete keeps
|
||||
-- the row (and its connect_session_events history) for the audit trail per the
|
||||
-- project convention "prefer deleted_at over hard deletes" (CLAUDE.md), while
|
||||
-- every list/get query filters `deleted_at IS NULL` so the purged unit
|
||||
-- disappears from the dashboard and the startup reconcile never restores it.
|
||||
--
|
||||
-- Idempotent: ADD COLUMN IF NOT EXISTS. The columns are NULLABLE with no default,
|
||||
-- so adding them is a metadata-only change on Postgres (no table rewrite, no row
|
||||
-- locks held for a scan) — safe to apply online. A NULL deleted_at means "live";
|
||||
-- a non-null timestamp means "removed at that instant". Applied on server startup
|
||||
-- by sqlx::migrate!(); never pre-applied via psql. Ordered after 008.
|
||||
-- See .claude/standards/gururmm/sqlx-migrations.md.
|
||||
|
||||
-- 1. connect_sessions.deleted_at: when set, the session was operator-purged and is
|
||||
-- excluded from every list/get query. NULL = live.
|
||||
ALTER TABLE connect_sessions ADD COLUMN IF NOT EXISTS deleted_at TIMESTAMPTZ;
|
||||
|
||||
-- 2. connect_machines.deleted_at: when set, the machine was operator-purged. This
|
||||
-- is the marker that hides the ghost duplicate rows from /api/machines and the
|
||||
-- startup reconcile. NULL = live.
|
||||
ALTER TABLE connect_machines ADD COLUMN IF NOT EXISTS deleted_at TIMESTAMPTZ;
|
||||
|
||||
-- 3. Partial indexes so the hot "live rows only" filter (deleted_at IS NULL) used
|
||||
-- by every list query stays an index scan as the soft-deleted set grows. The
|
||||
-- predicate matches the WHERE clause the queries use.
|
||||
CREATE INDEX IF NOT EXISTS idx_connect_machines_live
|
||||
ON connect_machines (hostname)
|
||||
WHERE deleted_at IS NULL;
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_connect_sessions_live
|
||||
ON connect_sessions (started_at DESC)
|
||||
WHERE deleted_at IS NULL;
|
||||
@@ -6,6 +6,7 @@ pub mod changelog;
|
||||
pub mod downloads;
|
||||
pub mod machine_keys;
|
||||
pub mod releases;
|
||||
pub mod removal;
|
||||
pub mod sessions;
|
||||
pub mod users;
|
||||
|
||||
@@ -172,7 +173,7 @@ impl From<db::sessions::DbSession> for SessionRecord {
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct EventRecord {
|
||||
pub id: i64,
|
||||
pub session_id: String,
|
||||
pub session_id: Option<String>,
|
||||
pub event_type: String,
|
||||
pub timestamp: String,
|
||||
pub viewer_id: Option<String>,
|
||||
@@ -185,7 +186,7 @@ impl From<db::events::SessionEvent> for EventRecord {
|
||||
fn from(e: db::events::SessionEvent) -> Self {
|
||||
Self {
|
||||
id: e.id,
|
||||
session_id: e.session_id.to_string(),
|
||||
session_id: e.session_id.map(|id| id.to_string()),
|
||||
event_type: e.event_type,
|
||||
timestamp: e.timestamp.to_rfc3339(),
|
||||
viewer_id: e.viewer_id,
|
||||
@@ -208,12 +209,17 @@ pub struct MachineHistory {
|
||||
/// Query parameters for machine deletion
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct DeleteMachineParams {
|
||||
/// If true, send uninstall command to agent (if online)
|
||||
/// If true, send uninstall command to agent (if online). Legacy (non-purge) path.
|
||||
#[serde(default)]
|
||||
pub uninstall: bool,
|
||||
/// If true, include history in response before deletion
|
||||
/// If true, include history in response before deletion. Legacy (non-purge) path.
|
||||
#[serde(default)]
|
||||
pub export: bool,
|
||||
/// If true, take the Task-5 SOFT-DELETE path: set `connect_machines.deleted_at`,
|
||||
/// drop the live in-memory session, and audit the removal — instead of the legacy
|
||||
/// hard delete. This is the operator-removal mechanism that purges ghost rows.
|
||||
#[serde(default)]
|
||||
pub purge: bool,
|
||||
}
|
||||
|
||||
/// Response for machine deletion
|
||||
|
||||
612
server/src/api/removal.rs
Normal file
612
server/src/api/removal.rs
Normal file
@@ -0,0 +1,612 @@
|
||||
//! Operator-removal endpoints (admin plane) — SPEC-004 / v2-stable-identity Task 5.
|
||||
//!
|
||||
//! Lets an administrator PURGE stale machines and sessions: soft-delete the DB row
|
||||
//! (`deleted_at = NOW()`, retaining the audit history), drop the live in-memory
|
||||
//! session via [`SessionManager::remove_session`], and write an audit row to
|
||||
//! `connect_session_events`. This is the mechanism that removes the ghost duplicate
|
||||
//! `connect_machines` rows left by the historical duplicate-registration bug.
|
||||
//!
|
||||
//! Removal semantics vs. the pre-existing live-only controls:
|
||||
//! - `DELETE /api/sessions/:id` (live-only "disconnect") sends a `Disconnect`
|
||||
//! message to the agent. It does NOT touch the DB. Left unchanged.
|
||||
//! - `DELETE /api/machines/:agent_id` WITHOUT `?purge=true` keeps the legacy
|
||||
//! behavior (optional `?uninstall=true` admin command, optional `?export=true`
|
||||
//! history dump, then a HARD delete). Left unchanged.
|
||||
//! - `?purge=true` on either route is the NEW soft-delete path defined here.
|
||||
//!
|
||||
//! Auth: dashboard JWT + admin role (the [`AdminUser`] extractor). A non-admin gets
|
||||
//! 403. Every purge is audited with the acting admin and the target id.
|
||||
//!
|
||||
//! Errors use the shared [`ApiError`] envelope; raw `sqlx` strings are never sent to
|
||||
//! the client (they are logged server-side via `tracing`).
|
||||
|
||||
use std::net::IpAddr;
|
||||
|
||||
use axum::{
|
||||
extract::{ConnectInfo, Path, Query, State},
|
||||
http::{HeaderMap, StatusCode},
|
||||
Json,
|
||||
};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::net::SocketAddr;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::auth::AdminUser;
|
||||
use crate::db;
|
||||
use crate::db::events::EventTypes;
|
||||
use crate::proto;
|
||||
use crate::AppState;
|
||||
|
||||
use super::machine_keys::ApiError;
|
||||
use super::{DeleteMachineParams, DeleteMachineResponse, MachineHistory};
|
||||
|
||||
type ApiResult<T> = Result<T, (StatusCode, Json<ApiError>)>;
|
||||
|
||||
/// Build the shared error envelope (mirrors `machine_keys`/`sessions`).
|
||||
fn err(status: StatusCode, code: &str, detail: &str) -> (StatusCode, Json<ApiError>) {
|
||||
(
|
||||
status,
|
||||
Json(ApiError {
|
||||
detail: detail.to_string(),
|
||||
error_code: code.to_string(),
|
||||
status_code: status.as_u16(),
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
/// Resolve the live `Database` handle or 503 (DB is optional in this server).
|
||||
fn require_db(state: &AppState) -> ApiResult<&db::Database> {
|
||||
state.db.as_ref().ok_or_else(|| {
|
||||
err(
|
||||
StatusCode::SERVICE_UNAVAILABLE,
|
||||
"DATABASE_UNAVAILABLE",
|
||||
"Database not available",
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/// Real client IP for the audit row (trusted-proxy aware, matching the rest of the
|
||||
/// server). Best-effort: a missing/garbled forwarded header just yields the peer.
|
||||
fn audit_ip(state: &AppState, addr: SocketAddr, headers: &HeaderMap) -> IpAddr {
|
||||
crate::utils::ip_extract::client_ip(&addr, headers, &state.trusted_proxies)
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Single-machine removal — DELETE /api/machines/:agent_id[?purge=true]
|
||||
// ============================================================================
|
||||
|
||||
/// DELETE /api/machines/:agent_id
|
||||
///
|
||||
/// Admin-only. Two modes, selected by the `purge` query flag:
|
||||
/// - `?purge=true` → SOFT-DELETE (Task 5): set `connect_machines.deleted_at`,
|
||||
/// drop the machine's live in-memory session via `remove_session`, and write a
|
||||
/// `machine_removed` audit event. The row + its history are retained; the
|
||||
/// machine disappears from `/api/machines` and is not restored on startup.
|
||||
/// - otherwise → the legacy HARD delete (optional uninstall command + history
|
||||
/// export, then a cascading `DELETE`), preserved verbatim for compatibility.
|
||||
pub async fn remove_machine(
|
||||
AdminUser(admin): AdminUser,
|
||||
State(state): State<AppState>,
|
||||
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
||||
headers: HeaderMap,
|
||||
Path(agent_id): Path<String>,
|
||||
Query(params): Query<DeleteMachineParams>,
|
||||
) -> ApiResult<Json<DeleteMachineResponse>> {
|
||||
// `agent_id` is a UUID minted by the agent (`generate_agent_id()`); validate the
|
||||
// shape before touching the DB so a malformed path is a clean 400, not a lookup.
|
||||
if Uuid::parse_str(&agent_id).is_err() {
|
||||
return Err(err(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"INVALID_AGENT_ID",
|
||||
"agent_id must be a valid UUID",
|
||||
));
|
||||
}
|
||||
|
||||
let db = require_db(&state)?;
|
||||
|
||||
// The machine must currently exist and be live (the get filters soft-deleted),
|
||||
// so a re-purge or an unknown id is a clean 404 rather than a silent success.
|
||||
let machine = db::machines::get_machine_by_agent_id(db.pool(), &agent_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("DB error loading machine for removal: {}", e);
|
||||
err(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"INTERNAL_ERROR",
|
||||
"Internal server error",
|
||||
)
|
||||
})?
|
||||
.ok_or_else(|| {
|
||||
err(
|
||||
StatusCode::NOT_FOUND,
|
||||
"MACHINE_NOT_FOUND",
|
||||
"Machine not found",
|
||||
)
|
||||
})?;
|
||||
|
||||
if params.purge {
|
||||
return purge_machine(
|
||||
&state,
|
||||
db,
|
||||
&admin,
|
||||
machine,
|
||||
audit_ip(&state, addr, &headers),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// -------- Legacy hard-delete path (unchanged behavior) --------
|
||||
let history = if params.export {
|
||||
let sessions = db::sessions::get_sessions_for_machine(db.pool(), machine.id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("DB error exporting sessions: {}", e);
|
||||
err(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"INTERNAL_ERROR",
|
||||
"Internal server error",
|
||||
)
|
||||
})?;
|
||||
let events = db::events::get_events_for_machine(db.pool(), machine.id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("DB error exporting events: {}", e);
|
||||
err(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"INTERNAL_ERROR",
|
||||
"Internal server error",
|
||||
)
|
||||
})?;
|
||||
Some(MachineHistory {
|
||||
machine: super::MachineInfo::from(machine.clone()),
|
||||
sessions: sessions
|
||||
.into_iter()
|
||||
.map(super::SessionRecord::from)
|
||||
.collect(),
|
||||
events: events.into_iter().map(super::EventRecord::from).collect(),
|
||||
exported_at: chrono::Utc::now().to_rfc3339(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let mut uninstall_sent = false;
|
||||
if params.uninstall {
|
||||
if let Some(session) = state.sessions.get_session_by_agent(&agent_id).await {
|
||||
if session.is_online {
|
||||
uninstall_sent = state
|
||||
.sessions
|
||||
.send_admin_command(
|
||||
session.id,
|
||||
proto::AdminCommandType::AdminUninstall,
|
||||
"Deleted by administrator",
|
||||
)
|
||||
.await;
|
||||
if uninstall_sent {
|
||||
tracing::info!("Sent uninstall command to agent {}", agent_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
state.sessions.remove_agent(&agent_id).await;
|
||||
|
||||
db::machines::delete_machine(db.pool(), &agent_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("DB error hard-deleting machine: {}", e);
|
||||
err(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"INTERNAL_ERROR",
|
||||
"Failed to delete machine",
|
||||
)
|
||||
})?;
|
||||
|
||||
tracing::info!(
|
||||
"Admin {} hard-deleted machine {} (uninstall_sent: {})",
|
||||
admin.username,
|
||||
agent_id,
|
||||
uninstall_sent
|
||||
);
|
||||
|
||||
Ok(Json(DeleteMachineResponse {
|
||||
success: true,
|
||||
message: format!("Machine {} deleted", machine.hostname),
|
||||
uninstall_sent,
|
||||
history,
|
||||
}))
|
||||
}
|
||||
|
||||
/// Soft-delete + in-memory removal + audit for one machine (the `?purge=true` path).
|
||||
async fn purge_machine(
|
||||
state: &AppState,
|
||||
db: &db::Database,
|
||||
admin: &crate::auth::AuthenticatedUser,
|
||||
machine: db::machines::Machine,
|
||||
ip: IpAddr,
|
||||
) -> ApiResult<Json<DeleteMachineResponse>> {
|
||||
// 1. Soft-delete the DB row. 0 rows means it was purged between the load and
|
||||
// here (a race) — treat as already-removed success, idempotent.
|
||||
let affected = db::machines::soft_delete_machine(db.pool(), &machine.agent_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("DB error soft-deleting machine: {}", e);
|
||||
err(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"INTERNAL_ERROR",
|
||||
"Failed to remove machine",
|
||||
)
|
||||
})?;
|
||||
|
||||
// 2. Drop the live in-memory session (agents + machine_uids indexes) so the
|
||||
// purged machine cannot keep streaming or be reattached. `remove_agent`
|
||||
// resolves agent_id -> session_id and routes through `remove_session`.
|
||||
let removed_session = state.sessions.remove_agent(&machine.agent_id).await;
|
||||
|
||||
// 3. Audit. Anchor to the machine's last session when known so the event links
|
||||
// to that machine's history; the detail JSON independently carries the ids.
|
||||
let details = serde_json::json!({
|
||||
"target_kind": "machine",
|
||||
"agent_id": machine.agent_id,
|
||||
"machine_id": machine.id,
|
||||
"hostname": machine.hostname,
|
||||
"machine_uid": machine.machine_uid,
|
||||
"purge": true,
|
||||
"soft_deleted": affected > 0,
|
||||
"in_memory_session_removed": removed_session.map(|s| s.to_string()),
|
||||
});
|
||||
if let Err(e) = db::events::log_admin_removal(
|
||||
db.pool(),
|
||||
machine.last_session_id,
|
||||
EventTypes::MACHINE_REMOVED,
|
||||
&admin.user_id,
|
||||
&admin.username,
|
||||
details,
|
||||
Some(ip),
|
||||
)
|
||||
.await
|
||||
{
|
||||
// Audit is best-effort: the purge already happened. Log loudly; do not fail
|
||||
// the request (mirrors how the relay treats audit-write failures).
|
||||
tracing::error!("Failed to write machine_removed audit event: {}", e);
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"Admin {} purged machine {} (agent_id {}, soft_deleted={}, session_removed={})",
|
||||
admin.username,
|
||||
machine.hostname,
|
||||
machine.agent_id,
|
||||
affected > 0,
|
||||
removed_session.is_some()
|
||||
);
|
||||
|
||||
Ok(Json(DeleteMachineResponse {
|
||||
success: true,
|
||||
message: format!("Machine {} removed", machine.hostname),
|
||||
uninstall_sent: false,
|
||||
history: None,
|
||||
}))
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Single-session removal — DELETE /api/sessions/:id[?purge=true]
|
||||
// ============================================================================
|
||||
|
||||
/// Query flag for the session removal route.
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct PurgeParams {
|
||||
/// When true, soft-delete the session row + remove it in-memory + audit. When
|
||||
/// false/absent, fall back to the live-only disconnect (unchanged behavior).
|
||||
#[serde(default)]
|
||||
pub purge: bool,
|
||||
}
|
||||
|
||||
/// Result body for a session removal.
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct RemoveSessionResponse {
|
||||
pub success: bool,
|
||||
pub message: String,
|
||||
/// Whether the row was soft-deleted in the DB (false if there was no DB row,
|
||||
/// e.g. a live support session that never persisted, or it was already purged).
|
||||
pub soft_deleted: bool,
|
||||
}
|
||||
|
||||
/// DELETE /api/sessions/:id?purge=true
|
||||
///
|
||||
/// Admin-only soft-delete of a session: set `connect_sessions.deleted_at`, drop the
|
||||
/// live in-memory session via `remove_session`, and write a `session_removed` audit
|
||||
/// event. WITHOUT `?purge=true` this delegates to the live-only disconnect so the
|
||||
/// pre-existing semantics are preserved.
|
||||
pub async fn remove_session(
|
||||
AdminUser(admin): AdminUser,
|
||||
State(state): State<AppState>,
|
||||
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
||||
headers: HeaderMap,
|
||||
Path(id): Path<String>,
|
||||
Query(params): Query<PurgeParams>,
|
||||
) -> ApiResult<Json<RemoveSessionResponse>> {
|
||||
let session_id = Uuid::parse_str(&id).map_err(|_| {
|
||||
err(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"INVALID_SESSION_ID",
|
||||
"session id must be a valid UUID",
|
||||
)
|
||||
})?;
|
||||
|
||||
if !params.purge {
|
||||
// Live-only disconnect (unchanged): send a Disconnect to the agent. 404 if
|
||||
// the session is not live in memory.
|
||||
let disconnected = state
|
||||
.sessions
|
||||
.disconnect_session(session_id, "Disconnected by administrator")
|
||||
.await;
|
||||
if disconnected {
|
||||
tracing::info!(
|
||||
"Admin {} disconnected session {} (live-only)",
|
||||
admin.username,
|
||||
session_id
|
||||
);
|
||||
return Ok(Json(RemoveSessionResponse {
|
||||
success: true,
|
||||
message: "Session disconnected".to_string(),
|
||||
soft_deleted: false,
|
||||
}));
|
||||
}
|
||||
return Err(err(
|
||||
StatusCode::NOT_FOUND,
|
||||
"SESSION_NOT_FOUND",
|
||||
"Session not found",
|
||||
));
|
||||
}
|
||||
|
||||
let db = require_db(&state)?;
|
||||
|
||||
// A session may be live in memory, persisted in the DB, or both. Soft-delete the
|
||||
// DB row (no-op if there is none) and drop any in-memory session. The purge is a
|
||||
// 404 only when NEITHER exists.
|
||||
let in_memory = state.sessions.get_session(session_id).await.is_some();
|
||||
let soft_deleted = db::sessions::soft_delete_session(db.pool(), session_id)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
tracing::error!("DB error soft-deleting session: {}", e);
|
||||
err(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"INTERNAL_ERROR",
|
||||
"Failed to remove session",
|
||||
)
|
||||
})?
|
||||
> 0;
|
||||
|
||||
if !in_memory && !soft_deleted {
|
||||
return Err(err(
|
||||
StatusCode::NOT_FOUND,
|
||||
"SESSION_NOT_FOUND",
|
||||
"Session not found",
|
||||
));
|
||||
}
|
||||
|
||||
state.sessions.remove_session(session_id).await;
|
||||
|
||||
let details = serde_json::json!({
|
||||
"target_kind": "session",
|
||||
"session_id": session_id,
|
||||
"purge": true,
|
||||
"soft_deleted": soft_deleted,
|
||||
"was_live": in_memory,
|
||||
});
|
||||
if let Err(e) = db::events::log_admin_removal(
|
||||
db.pool(),
|
||||
Some(session_id),
|
||||
EventTypes::SESSION_REMOVED,
|
||||
&admin.user_id,
|
||||
&admin.username,
|
||||
details,
|
||||
Some(audit_ip(&state, addr, &headers)),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!("Failed to write session_removed audit event: {}", e);
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"Admin {} purged session {} (soft_deleted={}, was_live={})",
|
||||
admin.username,
|
||||
session_id,
|
||||
soft_deleted,
|
||||
in_memory
|
||||
);
|
||||
|
||||
Ok(Json(RemoveSessionResponse {
|
||||
success: true,
|
||||
message: "Session removed".to_string(),
|
||||
soft_deleted,
|
||||
}))
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Bulk machine removal — POST /api/machines/bulk-remove
|
||||
// ============================================================================
|
||||
|
||||
/// Request body for the bulk machine removal.
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct BulkRemoveRequest {
|
||||
/// `agent_id`s to remove. Each is validated as a UUID; unknown/invalid ids are
|
||||
/// reported per-id rather than failing the whole batch.
|
||||
pub ids: Vec<String>,
|
||||
/// When true, soft-delete + in-memory removal + audit (Task 5). When false, the
|
||||
/// legacy hard delete is applied to each id. Defaults to true: the bulk endpoint
|
||||
/// exists for the purge workflow.
|
||||
#[serde(default = "default_true")]
|
||||
pub purge: bool,
|
||||
}
|
||||
|
||||
fn default_true() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
/// Per-id outcome in a bulk response.
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct BulkRemoveItem {
|
||||
pub agent_id: String,
|
||||
/// `removed` | `not_found` | `invalid` | `error`.
|
||||
pub status: String,
|
||||
}
|
||||
|
||||
/// Summary body for a bulk removal.
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct BulkRemoveResponse {
|
||||
pub requested: usize,
|
||||
pub removed: usize,
|
||||
pub results: Vec<BulkRemoveItem>,
|
||||
}
|
||||
|
||||
/// POST /api/machines/bulk-remove
|
||||
///
|
||||
/// Admin-only. Removes many machines in one call, auditing ONE `machine_removed`
|
||||
/// event per actually-removed id (matching the single-machine granularity). Invalid
|
||||
/// or unknown ids are reported in the per-id results and never abort the batch. With
|
||||
/// `purge=true` (default) each removal is the Task-5 soft-delete; otherwise the
|
||||
/// legacy hard delete.
|
||||
pub async fn bulk_remove_machines(
|
||||
AdminUser(admin): AdminUser,
|
||||
State(state): State<AppState>,
|
||||
ConnectInfo(addr): ConnectInfo<SocketAddr>,
|
||||
headers: HeaderMap,
|
||||
Json(body): Json<BulkRemoveRequest>,
|
||||
) -> ApiResult<Json<BulkRemoveResponse>> {
|
||||
if body.ids.is_empty() {
|
||||
return Err(err(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"EMPTY_ID_LIST",
|
||||
"ids must contain at least one agent_id",
|
||||
));
|
||||
}
|
||||
// Guard against an unbounded batch.
|
||||
const MAX_BATCH: usize = 500;
|
||||
if body.ids.len() > MAX_BATCH {
|
||||
return Err(err(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"BATCH_TOO_LARGE",
|
||||
"ids exceeds the maximum batch size of 500",
|
||||
));
|
||||
}
|
||||
|
||||
let db = require_db(&state)?;
|
||||
let ip = audit_ip(&state, addr, &headers);
|
||||
|
||||
let requested = body.ids.len();
|
||||
let mut results = Vec::with_capacity(requested);
|
||||
let mut removed = 0usize;
|
||||
|
||||
for agent_id in body.ids {
|
||||
// Validate shape first.
|
||||
if Uuid::parse_str(&agent_id).is_err() {
|
||||
results.push(BulkRemoveItem {
|
||||
agent_id,
|
||||
status: "invalid".to_string(),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
// Must exist + be live.
|
||||
let machine = match db::machines::get_machine_by_agent_id(db.pool(), &agent_id).await {
|
||||
Ok(Some(m)) => m,
|
||||
Ok(None) => {
|
||||
results.push(BulkRemoveItem {
|
||||
agent_id,
|
||||
status: "not_found".to_string(),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("DB error loading machine {} in bulk: {}", agent_id, e);
|
||||
results.push(BulkRemoveItem {
|
||||
agent_id,
|
||||
status: "error".to_string(),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if body.purge {
|
||||
match db::machines::soft_delete_machine(db.pool(), &agent_id).await {
|
||||
Ok(affected) => {
|
||||
let removed_session = state.sessions.remove_agent(&agent_id).await;
|
||||
let details = serde_json::json!({
|
||||
"target_kind": "machine",
|
||||
"agent_id": machine.agent_id,
|
||||
"machine_id": machine.id,
|
||||
"hostname": machine.hostname,
|
||||
"machine_uid": machine.machine_uid,
|
||||
"purge": true,
|
||||
"bulk": true,
|
||||
"soft_deleted": affected > 0,
|
||||
"in_memory_session_removed": removed_session.map(|s| s.to_string()),
|
||||
});
|
||||
if let Err(e) = db::events::log_admin_removal(
|
||||
db.pool(),
|
||||
machine.last_session_id,
|
||||
EventTypes::MACHINE_REMOVED,
|
||||
&admin.user_id,
|
||||
&admin.username,
|
||||
details,
|
||||
Some(ip),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
"Failed to write machine_removed audit event (bulk) for {}: {}",
|
||||
agent_id,
|
||||
e
|
||||
);
|
||||
}
|
||||
removed += 1;
|
||||
results.push(BulkRemoveItem {
|
||||
agent_id,
|
||||
status: "removed".to_string(),
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("DB error soft-deleting machine {} in bulk: {}", agent_id, e);
|
||||
results.push(BulkRemoveItem {
|
||||
agent_id,
|
||||
status: "error".to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Legacy hard-delete per id.
|
||||
state.sessions.remove_agent(&agent_id).await;
|
||||
match db::machines::delete_machine(db.pool(), &agent_id).await {
|
||||
Ok(()) => {
|
||||
removed += 1;
|
||||
results.push(BulkRemoveItem {
|
||||
agent_id,
|
||||
status: "removed".to_string(),
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("DB error hard-deleting machine {} in bulk: {}", agent_id, e);
|
||||
results.push(BulkRemoveItem {
|
||||
agent_id,
|
||||
status: "error".to_string(),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
"Admin {} bulk-removed {}/{} machine(s) (purge={})",
|
||||
admin.username,
|
||||
removed,
|
||||
requested,
|
||||
body.purge
|
||||
);
|
||||
|
||||
Ok(Json(BulkRemoveResponse {
|
||||
requested,
|
||||
removed,
|
||||
results,
|
||||
}))
|
||||
}
|
||||
@@ -11,7 +11,9 @@ use uuid::Uuid;
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, sqlx::FromRow)]
|
||||
pub struct SessionEvent {
|
||||
pub id: i64,
|
||||
pub session_id: Uuid,
|
||||
/// Nullable: admin-removal audit rows (Task 5) carry no session, so this is
|
||||
/// `None` for those. Session/viewer events always populate it.
|
||||
pub session_id: Option<Uuid>,
|
||||
pub event_type: String,
|
||||
pub timestamp: DateTime<Utc>,
|
||||
pub viewer_id: Option<String>,
|
||||
@@ -58,6 +60,15 @@ impl EventTypes {
|
||||
/// A `ConsentRequest` was sent to the agent for an attended session (the
|
||||
/// prompt is now awaiting the end user's decision).
|
||||
pub const CONSENT_REQUESTED: &'static str = "consent_requested";
|
||||
|
||||
// Operator-removal events (Task 5). Written by the admin purge endpoints. The
|
||||
// acting admin is recorded in `viewer_id`/`viewer_name` (the only actor fields
|
||||
// the audit table carries) and the structured purge detail (target id, purge
|
||||
// flag, in-memory removal result) goes in `details`.
|
||||
/// An administrator soft-deleted (purged) a machine and dropped its live session.
|
||||
pub const MACHINE_REMOVED: &'static str = "machine_removed";
|
||||
/// An administrator soft-deleted (purged) a session and dropped it in-memory.
|
||||
pub const SESSION_REMOVED: &'static str = "session_removed";
|
||||
}
|
||||
|
||||
/// Log a session event
|
||||
@@ -92,6 +103,57 @@ pub async fn log_event(
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Log an operator-removal audit event (Task 5).
|
||||
///
|
||||
/// Writes to the same `connect_session_events` audit table as [`log_event`], but
|
||||
/// shaped for an admin action rather than a viewer/session event: the acting admin
|
||||
/// is recorded in `viewer_id` (the admin's user id) and `viewer_name` (username) —
|
||||
/// the only actor-bearing columns the table has — and structured detail (target id,
|
||||
/// `purge` flag, in-memory removal result) goes in `details`.
|
||||
///
|
||||
/// `session_id` is the audit anchor. For a session purge it is the purged session.
|
||||
/// For a machine purge it is the machine's `last_session_id` when known (linking the
|
||||
/// event to that machine's history) or `None` (the `session_id` FK column is
|
||||
/// nullable). The FK is `ON DELETE CASCADE`, so anchoring to a session that later
|
||||
/// gets hard-deleted would cascade the audit row away — acceptable here because the
|
||||
/// Task-5 flow soft-deletes (never hard-deletes) and the detail JSON independently
|
||||
/// carries the target id.
|
||||
///
|
||||
/// Best-effort: a failure to write the audit row is logged by the caller and does
|
||||
/// not roll back the purge (the soft-delete + in-memory removal already happened),
|
||||
/// matching how the relay treats audit writes.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn log_admin_removal(
|
||||
pool: &PgPool,
|
||||
session_id: Option<Uuid>,
|
||||
event_type: &str,
|
||||
actor_user_id: &str,
|
||||
actor_username: &str,
|
||||
details: JsonValue,
|
||||
ip_address: Option<IpAddr>,
|
||||
) -> Result<i64, sqlx::Error> {
|
||||
let ip_str = ip_address.map(|ip| ip.to_string());
|
||||
|
||||
let result = sqlx::query_scalar::<_, i64>(
|
||||
r#"
|
||||
INSERT INTO connect_session_events
|
||||
(session_id, event_type, viewer_id, viewer_name, details, ip_address)
|
||||
VALUES ($1, $2, $3, $4, $5, $6::inet)
|
||||
RETURNING id
|
||||
"#,
|
||||
)
|
||||
.bind(session_id)
|
||||
.bind(event_type)
|
||||
.bind(actor_user_id)
|
||||
.bind(actor_username)
|
||||
.bind(details)
|
||||
.bind(ip_str)
|
||||
.fetch_one(pool)
|
||||
.await?;
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Get events for a session
|
||||
#[allow(dead_code)] // TODO(native-remote-control): consumed by the integration API; see docs/specs/native-remote-control/
|
||||
pub async fn get_session_events(
|
||||
|
||||
@@ -58,6 +58,12 @@ pub struct Machine {
|
||||
/// machine binding stays authoritative and the claimed uid is NOT used to dedup
|
||||
/// (see `upsert_machine`).
|
||||
pub machine_uid: Option<String>,
|
||||
/// Soft-delete marker (migration 009). When non-null the machine was
|
||||
/// operator-purged (Task 5): it is excluded from every list/get query and is
|
||||
/// never restored by the startup reconcile, but the row (and its audit
|
||||
/// history) is retained. NULL = live. Nullable, so it is read NULL-tolerantly
|
||||
/// in the manual `FromRow` below.
|
||||
pub deleted_at: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
impl<'r> FromRow<'r, PgRow> for Machine {
|
||||
@@ -75,6 +81,8 @@ impl<'r> FromRow<'r, PgRow> for Machine {
|
||||
site: row.try_get("site")?,
|
||||
// Schema-nullable (migration 008); decode directly as Option.
|
||||
machine_uid: row.try_get("machine_uid")?,
|
||||
// Schema-nullable (migration 009); decode directly as Option.
|
||||
deleted_at: row.try_get("deleted_at")?,
|
||||
// Nullable-with-default columns mapped to non-`Option` Rust types: read as
|
||||
// `Option<T>` and fall back to the type default so a NULL cell never errors.
|
||||
is_elevated: row
|
||||
@@ -129,6 +137,12 @@ impl<'r> FromRow<'r, PgRow> for Machine {
|
||||
/// aid, not a trust boundary. Only the un-keyed path supplies it; the keyed path's
|
||||
/// authority lives in the key→machine binding upstream (see
|
||||
/// `relay::agent_ws_handler`), never here.
|
||||
///
|
||||
/// SOFT-DELETE REVIVE (Task 5): both `ON CONFLICT DO UPDATE` arms clear
|
||||
/// `deleted_at` (set it back to NULL). A machine that was operator-purged but then
|
||||
/// genuinely reconnects is a live machine again, so it must reappear in the console
|
||||
/// rather than stay hidden behind a stale soft-delete marker. A purge of a truly
|
||||
/// gone host is permanent precisely because such a host never upserts again.
|
||||
pub async fn upsert_machine(
|
||||
pool: &PgPool,
|
||||
agent_id: &str,
|
||||
@@ -156,7 +170,8 @@ pub async fn upsert_machine(
|
||||
agent_id = EXCLUDED.agent_id,
|
||||
hostname = EXCLUDED.hostname,
|
||||
status = 'online',
|
||||
last_seen = NOW()
|
||||
last_seen = NOW(),
|
||||
deleted_at = NULL
|
||||
RETURNING *
|
||||
"#,
|
||||
)
|
||||
@@ -178,7 +193,8 @@ pub async fn upsert_machine(
|
||||
ON CONFLICT (agent_id) DO UPDATE SET
|
||||
hostname = EXCLUDED.hostname,
|
||||
status = 'online',
|
||||
last_seen = NOW()
|
||||
last_seen = NOW(),
|
||||
deleted_at = NULL
|
||||
RETURNING *
|
||||
"#,
|
||||
)
|
||||
@@ -222,24 +238,31 @@ pub async fn update_machine_status(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Get all persistent machines (for restore on startup)
|
||||
/// Get all persistent machines (for the dashboard list AND the startup restore).
|
||||
///
|
||||
/// Excludes operator-purged rows (`deleted_at IS NOT NULL`, migration 009 / Task 5):
|
||||
/// a soft-deleted machine must not reappear in `/api/machines` and must not be
|
||||
/// re-restored into the in-memory session manager on startup. This is the filter
|
||||
/// that makes the ghost-row purge stick.
|
||||
pub async fn get_all_machines(pool: &PgPool) -> Result<Vec<Machine>, sqlx::Error> {
|
||||
sqlx::query_as::<_, Machine>(
|
||||
"SELECT * FROM connect_machines WHERE is_persistent = true ORDER BY hostname",
|
||||
"SELECT * FROM connect_machines WHERE is_persistent = true AND deleted_at IS NULL ORDER BY hostname",
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get machine by agent_id
|
||||
/// Get machine by agent_id (live rows only — excludes soft-deleted, Task 5).
|
||||
pub async fn get_machine_by_agent_id(
|
||||
pool: &PgPool,
|
||||
agent_id: &str,
|
||||
) -> Result<Option<Machine>, sqlx::Error> {
|
||||
sqlx::query_as::<_, Machine>("SELECT * FROM connect_machines WHERE agent_id = $1")
|
||||
.bind(agent_id)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
sqlx::query_as::<_, Machine>(
|
||||
"SELECT * FROM connect_machines WHERE agent_id = $1 AND deleted_at IS NULL",
|
||||
)
|
||||
.bind(agent_id)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get machine by its primary-key UUID (`connect_machines.id`).
|
||||
@@ -248,6 +271,13 @@ pub async fn get_machine_by_agent_id(
|
||||
/// `verify_agent_key` back to its canonical `agent_id`, so persistent reattach
|
||||
/// binds to the authenticated identity rather than a client-supplied query
|
||||
/// param (Task 3 identity binding).
|
||||
///
|
||||
/// NOTE (Task 5): this deliberately does NOT filter `deleted_at IS NULL`. It is
|
||||
/// the authenticated-identity resolver for a `cak_`-keyed agent's reattach, not a
|
||||
/// dashboard read. If a previously operator-purged machine genuinely reconnects
|
||||
/// with a valid key, it must resolve so `upsert_machine` can revive it (the
|
||||
/// upsert clears `deleted_at`). The dashboard get-by-id path is
|
||||
/// `get_machine_by_agent_id`, which IS filtered.
|
||||
pub async fn get_machine_by_id(
|
||||
pool: &PgPool,
|
||||
machine_id: Uuid,
|
||||
@@ -269,7 +299,11 @@ pub async fn mark_machine_offline(pool: &PgPool, agent_id: &str) -> Result<(), s
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete a machine record
|
||||
/// Hard-delete a machine record (legacy path, retained for backward compatibility).
|
||||
///
|
||||
/// Cascades to `connect_sessions` / `connect_session_events` via the FKs, so it
|
||||
/// also destroys audit history. The Task-5 operator-removal flow prefers
|
||||
/// [`soft_delete_machine`] instead, which keeps the row for the audit trail.
|
||||
pub async fn delete_machine(pool: &PgPool, agent_id: &str) -> Result<(), sqlx::Error> {
|
||||
sqlx::query("DELETE FROM connect_machines WHERE agent_id = $1")
|
||||
.bind(agent_id)
|
||||
@@ -278,6 +312,24 @@ pub async fn delete_machine(pool: &PgPool, agent_id: &str) -> Result<(), sqlx::E
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Soft-delete (operator purge) a single machine by `agent_id` (Task 5).
|
||||
///
|
||||
/// Sets `deleted_at = NOW()` so the row is excluded from every list/get query and
|
||||
/// the startup reconcile, while retaining the row and its `connect_session_events`
|
||||
/// history for the audit trail. Only flips rows that are still live
|
||||
/// (`deleted_at IS NULL`), so a re-purge is a no-op rather than overwriting the
|
||||
/// original removal instant. Returns the number of rows affected (0 = unknown or
|
||||
/// already-purged `agent_id`), letting the caller distinguish a 404 from a success.
|
||||
pub async fn soft_delete_machine(pool: &PgPool, agent_id: &str) -> Result<u64, sqlx::Error> {
|
||||
let result = sqlx::query(
|
||||
"UPDATE connect_machines SET deleted_at = NOW() WHERE agent_id = $1 AND deleted_at IS NULL",
|
||||
)
|
||||
.bind(agent_id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(result.rows_affected())
|
||||
}
|
||||
|
||||
/// Update machine organization, site, and tags
|
||||
pub async fn update_machine_metadata(
|
||||
pool: &PgPool,
|
||||
@@ -441,4 +493,157 @@ mod tests {
|
||||
|
||||
cleanup(&pool, &["null-1", "null-2"], &[]).await;
|
||||
}
|
||||
|
||||
/// Helper: does `get_all_machines` (the dashboard list / startup restore query)
|
||||
/// currently return a row with this agent_id?
|
||||
async fn list_contains(pool: &PgPool, agent_id: &str) -> bool {
|
||||
get_all_machines(pool)
|
||||
.await
|
||||
.expect("list machines")
|
||||
.iter()
|
||||
.any(|m| m.agent_id == agent_id)
|
||||
}
|
||||
|
||||
/// Task 5: soft-deleting a machine sets `deleted_at` and excludes it from BOTH
|
||||
/// the list query and the by-agent_id get — the core operator-removal guarantee
|
||||
/// (a purged ghost row must not reappear in /api/machines).
|
||||
#[tokio::test]
|
||||
async fn soft_delete_machine_hides_from_list_and_get() {
|
||||
let Some(pool) = test_pool().await else {
|
||||
return; // no TEST_DATABASE_URL: skip (runs in CI)
|
||||
};
|
||||
let agent = "test-softdel-agent-001";
|
||||
cleanup(&pool, &[agent], &[]).await;
|
||||
|
||||
let m = upsert_machine(&pool, agent, "SOFTDEL-HOST", true, None)
|
||||
.await
|
||||
.expect("create machine");
|
||||
assert!(m.deleted_at.is_none(), "fresh row must be live");
|
||||
assert!(
|
||||
list_contains(&pool, agent).await,
|
||||
"live machine must be listed"
|
||||
);
|
||||
assert!(
|
||||
get_machine_by_agent_id(&pool, agent)
|
||||
.await
|
||||
.expect("get")
|
||||
.is_some(),
|
||||
"live machine must be gettable"
|
||||
);
|
||||
|
||||
// Soft-delete.
|
||||
let affected = soft_delete_machine(&pool, agent)
|
||||
.await
|
||||
.expect("soft delete");
|
||||
assert_eq!(affected, 1, "exactly one live row flips to deleted");
|
||||
|
||||
// Excluded from list and get.
|
||||
assert!(
|
||||
!list_contains(&pool, agent).await,
|
||||
"soft-deleted machine must NOT be listed"
|
||||
);
|
||||
assert!(
|
||||
get_machine_by_agent_id(&pool, agent)
|
||||
.await
|
||||
.expect("get after delete")
|
||||
.is_none(),
|
||||
"soft-deleted machine must NOT be gettable by agent_id"
|
||||
);
|
||||
|
||||
// The row still exists with a non-null deleted_at (history retained).
|
||||
let deleted_at: Option<DateTime<Utc>> =
|
||||
sqlx::query_scalar("SELECT deleted_at FROM connect_machines WHERE agent_id = $1")
|
||||
.bind(agent)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.expect("row still present");
|
||||
assert!(
|
||||
deleted_at.is_some(),
|
||||
"row must be retained with deleted_at set"
|
||||
);
|
||||
|
||||
// Re-purge is a no-op (does not overwrite the original instant).
|
||||
let again = soft_delete_machine(&pool, agent)
|
||||
.await
|
||||
.expect("re-soft-delete");
|
||||
assert_eq!(
|
||||
again, 0,
|
||||
"re-purge of an already-deleted row affects 0 rows"
|
||||
);
|
||||
|
||||
cleanup(&pool, &[agent], &[]).await;
|
||||
}
|
||||
|
||||
/// Task 5: a genuine reconnect (upsert) of a previously soft-deleted machine
|
||||
/// REVIVES it — `deleted_at` is cleared so it reappears in the console. A purge
|
||||
/// only sticks for a host that never upserts again.
|
||||
#[tokio::test]
|
||||
async fn upsert_revives_soft_deleted_machine() {
|
||||
let Some(pool) = test_pool().await else {
|
||||
return; // no TEST_DATABASE_URL: skip (runs in CI)
|
||||
};
|
||||
let agent = "test-revive-agent-001";
|
||||
cleanup(&pool, &[agent], &[]).await;
|
||||
|
||||
upsert_machine(&pool, agent, "REVIVE-HOST", true, None)
|
||||
.await
|
||||
.expect("create");
|
||||
soft_delete_machine(&pool, agent)
|
||||
.await
|
||||
.expect("soft delete");
|
||||
assert!(!list_contains(&pool, agent).await, "purged: hidden");
|
||||
|
||||
// Reconnect.
|
||||
let revived = upsert_machine(&pool, agent, "REVIVE-HOST", true, None)
|
||||
.await
|
||||
.expect("reconnect upsert");
|
||||
assert!(
|
||||
revived.deleted_at.is_none(),
|
||||
"reconnect must clear deleted_at"
|
||||
);
|
||||
assert!(
|
||||
list_contains(&pool, agent).await,
|
||||
"revived machine must be listed again"
|
||||
);
|
||||
|
||||
cleanup(&pool, &[agent], &[]).await;
|
||||
}
|
||||
|
||||
/// Task 5: bulk soft-delete (as the bulk endpoint does, one id at a time)
|
||||
/// removes every listed id from the live list.
|
||||
#[tokio::test]
|
||||
async fn bulk_soft_delete_hides_all_listed() {
|
||||
let Some(pool) = test_pool().await else {
|
||||
return; // no TEST_DATABASE_URL: skip (runs in CI)
|
||||
};
|
||||
let agents = ["test-bulk-a", "test-bulk-b", "test-bulk-c"];
|
||||
cleanup(&pool, &agents, &[]).await;
|
||||
|
||||
for (i, a) in agents.iter().enumerate() {
|
||||
upsert_machine(&pool, a, &format!("BULK-HOST-{i}"), true, None)
|
||||
.await
|
||||
.expect("create bulk machine");
|
||||
}
|
||||
for a in &agents {
|
||||
assert!(list_contains(&pool, a).await, "{a} listed before bulk");
|
||||
}
|
||||
|
||||
// Purge all three (the bulk endpoint loops soft_delete_machine per id).
|
||||
let mut removed = 0u64;
|
||||
for a in &agents {
|
||||
removed += soft_delete_machine(&pool, a)
|
||||
.await
|
||||
.expect("bulk soft delete");
|
||||
}
|
||||
assert_eq!(removed, 3, "all three live rows flipped to deleted");
|
||||
|
||||
for a in &agents {
|
||||
assert!(
|
||||
!list_contains(&pool, a).await,
|
||||
"{a} must be hidden after bulk purge"
|
||||
);
|
||||
}
|
||||
|
||||
cleanup(&pool, &agents, &[]).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -170,6 +170,7 @@ pub async fn get_machines_needing_update(
|
||||
SELECT agent_id FROM connect_machines
|
||||
WHERE status = 'online'
|
||||
AND is_persistent = true
|
||||
AND deleted_at IS NULL
|
||||
AND (agent_version IS NULL OR agent_version < $1)
|
||||
"#,
|
||||
)
|
||||
|
||||
@@ -25,6 +25,10 @@ pub struct DbSession {
|
||||
/// Attended-consent state: 'not_required' | 'pending' | 'granted' | 'denied'.
|
||||
/// Enforcement is Task 5; this column carries the state only.
|
||||
pub consent_state: String,
|
||||
/// Soft-delete marker (migration 009). When non-null the session was
|
||||
/// operator-purged (Task 5) and is excluded from every list/get query, while
|
||||
/// the row (and its audit history) is retained. NULL = live.
|
||||
pub deleted_at: Option<DateTime<Utc>>,
|
||||
}
|
||||
|
||||
/// Create a new session record
|
||||
@@ -120,10 +124,12 @@ pub async fn get_session(
|
||||
pool: &PgPool,
|
||||
session_id: Uuid,
|
||||
) -> Result<Option<DbSession>, sqlx::Error> {
|
||||
sqlx::query_as::<_, DbSession>("SELECT * FROM connect_sessions WHERE id = $1")
|
||||
.bind(session_id)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
sqlx::query_as::<_, DbSession>(
|
||||
"SELECT * FROM connect_sessions WHERE id = $1 AND deleted_at IS NULL",
|
||||
)
|
||||
.bind(session_id)
|
||||
.fetch_optional(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Get active sessions for a machine
|
||||
@@ -133,7 +139,7 @@ pub async fn get_active_sessions_for_machine(
|
||||
machine_id: Uuid,
|
||||
) -> Result<Vec<DbSession>, sqlx::Error> {
|
||||
sqlx::query_as::<_, DbSession>(
|
||||
"SELECT * FROM connect_sessions WHERE machine_id = $1 AND status = 'active' ORDER BY started_at DESC"
|
||||
"SELECT * FROM connect_sessions WHERE machine_id = $1 AND status = 'active' AND deleted_at IS NULL ORDER BY started_at DESC"
|
||||
)
|
||||
.bind(machine_id)
|
||||
.fetch_all(pool)
|
||||
@@ -144,7 +150,7 @@ pub async fn get_active_sessions_for_machine(
|
||||
#[allow(dead_code)] // TODO(native-remote-control): consumed by the integration API; see docs/specs/native-remote-control/
|
||||
pub async fn get_recent_sessions(pool: &PgPool, limit: i64) -> Result<Vec<DbSession>, sqlx::Error> {
|
||||
sqlx::query_as::<_, DbSession>(
|
||||
"SELECT * FROM connect_sessions ORDER BY started_at DESC LIMIT $1",
|
||||
"SELECT * FROM connect_sessions WHERE deleted_at IS NULL ORDER BY started_at DESC LIMIT $1",
|
||||
)
|
||||
.bind(limit)
|
||||
.fetch_all(pool)
|
||||
@@ -157,9 +163,140 @@ pub async fn get_sessions_for_machine(
|
||||
machine_id: Uuid,
|
||||
) -> Result<Vec<DbSession>, sqlx::Error> {
|
||||
sqlx::query_as::<_, DbSession>(
|
||||
"SELECT * FROM connect_sessions WHERE machine_id = $1 ORDER BY started_at DESC",
|
||||
"SELECT * FROM connect_sessions WHERE machine_id = $1 AND deleted_at IS NULL ORDER BY started_at DESC",
|
||||
)
|
||||
.bind(machine_id)
|
||||
.fetch_all(pool)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Soft-delete (operator purge) a single session by id (Task 5).
|
||||
///
|
||||
/// Sets `deleted_at = NOW()` so the row is excluded from every list/get query
|
||||
/// while the row and its `connect_session_events` history are retained for the
|
||||
/// audit trail. Only flips rows that are still live (`deleted_at IS NULL`) so a
|
||||
/// re-purge does not overwrite the original removal instant. Returns the number
|
||||
/// of rows affected (0 = unknown or already-purged id) so the caller can return a
|
||||
/// clean 404 vs. success.
|
||||
pub async fn soft_delete_session(pool: &PgPool, session_id: Uuid) -> Result<u64, sqlx::Error> {
|
||||
let result = sqlx::query(
|
||||
"UPDATE connect_sessions SET deleted_at = NOW() WHERE id = $1 AND deleted_at IS NULL",
|
||||
)
|
||||
.bind(session_id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(result.rows_affected())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::db::machines;
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
|
||||
/// Connect to a throwaway test Postgres and apply migrations, or return `None`
|
||||
/// when `TEST_DATABASE_URL` is unset (self-skip on workstations; runs in CI).
|
||||
async fn test_pool() -> Option<PgPool> {
|
||||
let url = std::env::var("TEST_DATABASE_URL").ok()?;
|
||||
let pool = PgPoolOptions::new()
|
||||
.max_connections(2)
|
||||
.connect(&url)
|
||||
.await
|
||||
.expect("connect to TEST_DATABASE_URL");
|
||||
sqlx::migrate!("./migrations")
|
||||
.run(&pool)
|
||||
.await
|
||||
.expect("apply migrations to the test database");
|
||||
Some(pool)
|
||||
}
|
||||
|
||||
/// Create a machine to satisfy the `connect_sessions.machine_id` FK; returns its id.
|
||||
async fn seed_machine(pool: &PgPool, agent_id: &str) -> Uuid {
|
||||
let _ = sqlx::query("DELETE FROM connect_machines WHERE agent_id = $1")
|
||||
.bind(agent_id)
|
||||
.execute(pool)
|
||||
.await;
|
||||
machines::upsert_machine(pool, agent_id, "SESS-TEST-HOST", true, None)
|
||||
.await
|
||||
.expect("seed machine")
|
||||
.id
|
||||
}
|
||||
|
||||
/// Task 5: soft-deleting a session sets `deleted_at` and excludes it from get and
|
||||
/// the machine/recent list queries, while the row is retained.
|
||||
#[tokio::test]
|
||||
async fn soft_delete_session_hides_from_get_and_lists() {
|
||||
let Some(pool) = test_pool().await else {
|
||||
return; // no TEST_DATABASE_URL: skip (runs in CI)
|
||||
};
|
||||
let agent = "test-sess-softdel-agent";
|
||||
let machine_id = seed_machine(&pool, agent).await;
|
||||
let session_id = Uuid::new_v4();
|
||||
|
||||
let s = create_session(&pool, session_id, machine_id, false, None)
|
||||
.await
|
||||
.expect("create session");
|
||||
assert!(s.deleted_at.is_none(), "fresh session must be live");
|
||||
|
||||
// Visible before deletion.
|
||||
assert!(
|
||||
get_session(&pool, session_id).await.expect("get").is_some(),
|
||||
"live session must be gettable"
|
||||
);
|
||||
assert!(
|
||||
get_sessions_for_machine(&pool, machine_id)
|
||||
.await
|
||||
.expect("list for machine")
|
||||
.iter()
|
||||
.any(|x| x.id == session_id),
|
||||
"live session must be in the machine list"
|
||||
);
|
||||
|
||||
// Soft-delete.
|
||||
let affected = soft_delete_session(&pool, session_id)
|
||||
.await
|
||||
.expect("soft delete session");
|
||||
assert_eq!(affected, 1, "exactly one live session flips to deleted");
|
||||
|
||||
// Excluded from get and lists.
|
||||
assert!(
|
||||
get_session(&pool, session_id)
|
||||
.await
|
||||
.expect("get after delete")
|
||||
.is_none(),
|
||||
"soft-deleted session must NOT be gettable"
|
||||
);
|
||||
assert!(
|
||||
!get_sessions_for_machine(&pool, machine_id)
|
||||
.await
|
||||
.expect("list after delete")
|
||||
.iter()
|
||||
.any(|x| x.id == session_id),
|
||||
"soft-deleted session must NOT be in the machine list"
|
||||
);
|
||||
|
||||
// Row retained with deleted_at set.
|
||||
let deleted_at: Option<DateTime<Utc>> =
|
||||
sqlx::query_scalar("SELECT deleted_at FROM connect_sessions WHERE id = $1")
|
||||
.bind(session_id)
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.expect("session row still present");
|
||||
assert!(deleted_at.is_some(), "session row retained with deleted_at");
|
||||
|
||||
// Re-purge is a no-op.
|
||||
let again = soft_delete_session(&pool, session_id)
|
||||
.await
|
||||
.expect("re-soft-delete");
|
||||
assert_eq!(
|
||||
again, 0,
|
||||
"re-purge of an already-deleted session affects 0 rows"
|
||||
);
|
||||
|
||||
// Cleanup (cascade from the machine removes the session row too).
|
||||
let _ = sqlx::query("DELETE FROM connect_machines WHERE id = $1")
|
||||
.bind(machine_id)
|
||||
.execute(&pool)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ pub mod proto {
|
||||
use anyhow::Result;
|
||||
use axum::http::{HeaderValue, Method};
|
||||
use axum::{
|
||||
extract::{ConnectInfo, Json, Path, Query, Request, State},
|
||||
extract::{ConnectInfo, Json, Path, Request, State},
|
||||
http::StatusCode,
|
||||
middleware::{self as axum_middleware, Next},
|
||||
response::IntoResponse,
|
||||
@@ -454,7 +454,9 @@ async fn main() -> Result<()> {
|
||||
// REST API - Sessions
|
||||
.route("/api/sessions", get(list_sessions))
|
||||
.route("/api/sessions/:id", get(get_session))
|
||||
.route("/api/sessions/:id", delete(disconnect_session))
|
||||
// DELETE: live-only disconnect by default; `?purge=true` soft-deletes +
|
||||
// removes in-memory + audits (admin-only). Task 5 (api::removal).
|
||||
.route("/api/sessions/:id", delete(api::removal::remove_session))
|
||||
// Session-scoped viewer-token minting (dashboard JWT; bound to one session)
|
||||
.route(
|
||||
"/api/sessions/:id/viewer-token",
|
||||
@@ -462,8 +464,20 @@ async fn main() -> Result<()> {
|
||||
)
|
||||
// REST API - Machines
|
||||
.route("/api/machines", get(list_machines))
|
||||
// Bulk operator removal (admin-only). Registered before the `:agent_id`
|
||||
// routes; matchit (axum 0.7) prefers the static `bulk-remove` segment over
|
||||
// the `:agent_id` capture, so it never shadows a real agent_id. Task 5.
|
||||
.route(
|
||||
"/api/machines/bulk-remove",
|
||||
post(api::removal::bulk_remove_machines),
|
||||
)
|
||||
.route("/api/machines/:agent_id", get(get_machine))
|
||||
.route("/api/machines/:agent_id", delete(delete_machine))
|
||||
// DELETE: legacy hard-delete by default; `?purge=true` soft-deletes +
|
||||
// removes in-memory + audits (admin-only). Task 5 (api::removal).
|
||||
.route(
|
||||
"/api/machines/:agent_id",
|
||||
delete(api::removal::remove_machine),
|
||||
)
|
||||
.route("/api/machines/:agent_id/history", get(get_machine_history))
|
||||
.route(
|
||||
"/api/machines/:agent_id/update",
|
||||
@@ -740,28 +754,6 @@ async fn get_session(
|
||||
Ok(Json(api::SessionInfo::from(session)))
|
||||
}
|
||||
|
||||
async fn disconnect_session(
|
||||
_user: AuthenticatedUser, // Require authentication
|
||||
State(state): State<AppState>,
|
||||
Path(id): Path<String>,
|
||||
) -> impl IntoResponse {
|
||||
let session_id = match uuid::Uuid::parse_str(&id) {
|
||||
Ok(id) => id,
|
||||
Err(_) => return (StatusCode::BAD_REQUEST, "Invalid session ID"),
|
||||
};
|
||||
|
||||
if state
|
||||
.sessions
|
||||
.disconnect_session(session_id, "Disconnected by administrator")
|
||||
.await
|
||||
{
|
||||
info!("Session {} disconnected by admin", session_id);
|
||||
(StatusCode::OK, "Session disconnected")
|
||||
} else {
|
||||
(StatusCode::NOT_FOUND, "Session not found")
|
||||
}
|
||||
}
|
||||
|
||||
// Machine API handlers
|
||||
|
||||
async fn list_machines(
|
||||
@@ -836,89 +828,6 @@ async fn get_machine_history(
|
||||
Ok(Json(history))
|
||||
}
|
||||
|
||||
async fn delete_machine(
|
||||
_user: AuthenticatedUser, // Require authentication
|
||||
State(state): State<AppState>,
|
||||
Path(agent_id): Path<String>,
|
||||
Query(params): Query<api::DeleteMachineParams>,
|
||||
) -> Result<Json<api::DeleteMachineResponse>, (StatusCode, &'static str)> {
|
||||
let db = state
|
||||
.db
|
||||
.as_ref()
|
||||
.ok_or((StatusCode::SERVICE_UNAVAILABLE, "Database not available"))?;
|
||||
|
||||
// Get machine first
|
||||
let machine = db::machines::get_machine_by_agent_id(db.pool(), &agent_id)
|
||||
.await
|
||||
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Database error"))?
|
||||
.ok_or((StatusCode::NOT_FOUND, "Machine not found"))?;
|
||||
|
||||
// Export history if requested
|
||||
let history = if params.export {
|
||||
let sessions = db::sessions::get_sessions_for_machine(db.pool(), machine.id)
|
||||
.await
|
||||
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Database error"))?;
|
||||
let events = db::events::get_events_for_machine(db.pool(), machine.id)
|
||||
.await
|
||||
.map_err(|_| (StatusCode::INTERNAL_SERVER_ERROR, "Database error"))?;
|
||||
|
||||
Some(api::MachineHistory {
|
||||
machine: api::MachineInfo::from(machine.clone()),
|
||||
sessions: sessions.into_iter().map(api::SessionRecord::from).collect(),
|
||||
events: events.into_iter().map(api::EventRecord::from).collect(),
|
||||
exported_at: chrono::Utc::now().to_rfc3339(),
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Send uninstall command if requested and agent is online
|
||||
let mut uninstall_sent = false;
|
||||
if params.uninstall {
|
||||
// Find session for this agent
|
||||
if let Some(session) = state.sessions.get_session_by_agent(&agent_id).await {
|
||||
if session.is_online {
|
||||
uninstall_sent = state
|
||||
.sessions
|
||||
.send_admin_command(
|
||||
session.id,
|
||||
proto::AdminCommandType::AdminUninstall,
|
||||
"Deleted by administrator",
|
||||
)
|
||||
.await;
|
||||
if uninstall_sent {
|
||||
info!("Sent uninstall command to agent {}", agent_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove from session manager
|
||||
state.sessions.remove_agent(&agent_id).await;
|
||||
|
||||
// Delete from database (cascades to sessions and events)
|
||||
db::machines::delete_machine(db.pool(), &agent_id)
|
||||
.await
|
||||
.map_err(|_| {
|
||||
(
|
||||
StatusCode::INTERNAL_SERVER_ERROR,
|
||||
"Failed to delete machine",
|
||||
)
|
||||
})?;
|
||||
|
||||
info!(
|
||||
"Deleted machine {} (uninstall_sent: {})",
|
||||
agent_id, uninstall_sent
|
||||
);
|
||||
|
||||
Ok(Json(api::DeleteMachineResponse {
|
||||
success: true,
|
||||
message: format!("Machine {} deleted", machine.hostname),
|
||||
uninstall_sent,
|
||||
history,
|
||||
}))
|
||||
}
|
||||
|
||||
// Update trigger request
|
||||
#[derive(Deserialize)]
|
||||
struct TriggerUpdateRequest {
|
||||
|
||||
Reference in New Issue
Block a user