feat(server,agent): v2 secure-session-core Task 5 - attended consent
SPEC-002 Phase 1 Task 5, code-reviewed APPROVED. An attended (support-code) session is invisible and inert to the technician until the end user accepts a consent prompt on their own machine. - proto: ConsentRequest / ConsentResponse + ConsentAccessMode enum (oneof fields 80/81; no existing field renumbered). - server: ConsentState on Session; attended -> Pending, managed -> NotRequired; join_session refuses viewers unless Granted/NotRequired (single chokepoint - StartStream only fires from join_session, so no frames or input flow pre- consent); run_consent_handshake sends ConsentRequest, 60s timeout, granted -> proceed, denied/timeout/disconnect -> teardown (end_session denied, machine offline, support code released). consent_state persisted; consent_requested/ granted/denied audited. - agent: Windows MessageBox (topmost/system-modal) on spawn_blocking; anything but an explicit Yes = deny; non-Windows build is a fail-closed stub. Not cargo-check-verified locally (no toolchain). Server verified on the build host; the Windows agent half is verified by CI build-agent (Pluto). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -11,6 +11,7 @@ use axum::{
|
||||
http::StatusCode,
|
||||
response::IntoResponse,
|
||||
};
|
||||
use futures_util::stream::{SplitSink, SplitStream};
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use prost::Message as ProstMessage;
|
||||
use serde::Deserialize;
|
||||
@@ -49,6 +50,18 @@ const VIEWER_WS_MAX_MESSAGE_BYTES: usize = 64 * 1024;
|
||||
/// buffered unboundedly. (Closes the input-injection MEDIUM.)
|
||||
const VIEWER_INPUT_EVENTS_PER_SEC: u32 = 200;
|
||||
|
||||
/// How long the server waits for the end user to answer the attended-session
|
||||
/// consent prompt before treating it as a DENY and tearing the session down.
|
||||
///
|
||||
/// An attended (support-code) session is held with `consent_state = pending`
|
||||
/// for at most this long: if the agent does not return a `ConsentResponse`
|
||||
/// (user accepted/declined) within the window, the session is denied and torn
|
||||
/// down. The agent is told the same value in `ConsentRequest.timeout_secs` so
|
||||
/// its dialog can auto-deny in lock-step. 60s is long enough for a real person
|
||||
/// to read and decide, short enough that an unattended/abandoned prompt does
|
||||
/// not hold a session open indefinitely.
|
||||
const CONSENT_TIMEOUT_SECS: u64 = 60;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct AgentParams {
|
||||
agent_id: String,
|
||||
@@ -628,6 +641,76 @@ async fn handle_agent_connection(
|
||||
}
|
||||
}
|
||||
|
||||
// ATTENDED-MODE CONSENT GATE (Task 5).
|
||||
//
|
||||
// For an attended (support-code) session, the end user on the managed
|
||||
// machine must SEE and ACCEPT a consent prompt before the technician's
|
||||
// session goes live. The session was registered with `consent_state =
|
||||
// Pending` (in-memory) and the DB row created with `consent_state =
|
||||
// 'pending'`, so `join_session` already refuses any viewer until this
|
||||
// resolves. Here we drive the handshake to completion BEFORE entering the
|
||||
// main relay loop:
|
||||
// 1. Send a `ConsentRequest` to the agent (it shows a native dialog).
|
||||
// 2. Wait up to `CONSENT_TIMEOUT_SECS` for a `ConsentResponse`.
|
||||
// 3. granted -> consent_state = Granted, audit, proceed to the main loop.
|
||||
// denied/timeout/disconnect -> consent_state = Denied, audit, send a
|
||||
// Disconnect to the agent, and tear the session down (return early).
|
||||
//
|
||||
// Managed/unattended sessions (no support code) are `NotRequired`: they skip
|
||||
// this entirely and proceed straight to the relay loop.
|
||||
if support_code.is_some() {
|
||||
let consent_granted = run_consent_handshake(
|
||||
&mut ws_sender,
|
||||
&mut ws_receiver,
|
||||
&sessions,
|
||||
db.as_ref(),
|
||||
session_id,
|
||||
&agent_id,
|
||||
&support_codes,
|
||||
client_ip,
|
||||
)
|
||||
.await;
|
||||
|
||||
if !consent_granted {
|
||||
// Denied / timed out / agent vanished. The session row is already
|
||||
// marked denied + audited inside the handshake. Tear down the live
|
||||
// session and DB record, then return — the technician never sees it.
|
||||
info!(
|
||||
"Attended session {} denied/abandoned at consent; tearing down",
|
||||
session_id
|
||||
);
|
||||
|
||||
sessions.mark_agent_disconnected(session_id).await;
|
||||
|
||||
if let Some(ref db) = db {
|
||||
let _ = db::sessions::end_session(db.pool(), session_id, "denied").await;
|
||||
let _ = db::machines::mark_machine_offline(db.pool(), &agent_id).await;
|
||||
}
|
||||
|
||||
// Best-effort: release the support code so a fresh attempt can start
|
||||
// clean rather than colliding with a half-bound code.
|
||||
if let Some(ref code) = support_code {
|
||||
support_codes.mark_completed(code).await;
|
||||
if let Some(ref db) = db {
|
||||
let _ = db::support_codes::mark_code_completed(db.pool(), code).await;
|
||||
}
|
||||
}
|
||||
|
||||
// Tell the agent why it is being dropped, then close the socket.
|
||||
let disconnect_msg = proto::Message {
|
||||
payload: Some(proto::message::Payload::Disconnect(proto::Disconnect {
|
||||
reason: "Remote support was declined on the managed computer".to_string(),
|
||||
})),
|
||||
};
|
||||
let mut buf = Vec::new();
|
||||
if prost::Message::encode(&disconnect_msg, &mut buf).is_ok() {
|
||||
let _ = ws_sender.send(Message::Binary(buf)).await;
|
||||
}
|
||||
let _ = ws_sender.close().await;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Use Arc<Mutex> for sender so we can use it from multiple places
|
||||
let ws_sender = std::sync::Arc::new(tokio::sync::Mutex::new(ws_sender));
|
||||
let ws_sender_input = ws_sender.clone();
|
||||
@@ -758,6 +841,19 @@ async fn handle_agent_connection(
|
||||
// Agent acknowledged our heartbeat
|
||||
sessions_status.update_heartbeat(session_id).await;
|
||||
}
|
||||
Some(proto::message::Payload::ConsentResponse(_)) => {
|
||||
// The consent handshake (Task 5) runs to
|
||||
// completion BEFORE this loop is entered, so any
|
||||
// ConsentResponse arriving here is a late/dup —
|
||||
// the decision is already final. Acknowledge by
|
||||
// ignoring it (do not re-open the gate), but log
|
||||
// so a stray response is observable.
|
||||
tracing::debug!(
|
||||
"Late ConsentResponse from agent {} on session {}; ignoring \
|
||||
(consent already finalized)",
|
||||
agent_id, session_id
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -826,6 +922,259 @@ async fn handle_agent_connection(
|
||||
info!("Session {} ended", session_id);
|
||||
}
|
||||
|
||||
/// Drive the attended-mode consent handshake for a support-code session
|
||||
/// (Task 5).
|
||||
///
|
||||
/// Sends a [`proto::ConsentRequest`] to the agent (which shows the end user a
|
||||
/// native dialog), then waits up to [`CONSENT_TIMEOUT_SECS`] for a
|
||||
/// [`proto::ConsentResponse`]. Returns `true` iff the end user GRANTED consent;
|
||||
/// `false` for a denial, a timeout, a closed/errored socket, or any other
|
||||
/// outcome. On every terminal outcome it updates the session's `consent_state`
|
||||
/// (in-memory authoritative + best-effort DB mirror) and writes a
|
||||
/// `consent_granted` / `consent_denied` audit event.
|
||||
///
|
||||
/// While waiting, non-consent inbound messages (e.g. heartbeats the agent may
|
||||
/// emit) are tolerated and ignored — only a `ConsentResponse` (or the timeout /
|
||||
/// disconnect) resolves the gate. No secret or code value is logged.
|
||||
#[allow(clippy::too_many_arguments)] // mirrors the relay/session contract; tracked for a params-struct refactor in docs/specs/native-remote-control/
|
||||
async fn run_consent_handshake(
|
||||
ws_sender: &mut SplitSink<WebSocket, Message>,
|
||||
ws_receiver: &mut SplitStream<WebSocket>,
|
||||
sessions: &SessionManager,
|
||||
db: Option<&Database>,
|
||||
session_id: Uuid,
|
||||
agent_id: &str,
|
||||
support_codes: &crate::support_codes::SupportCodeManager,
|
||||
client_ip: Option<std::net::IpAddr>,
|
||||
) -> bool {
|
||||
// Resolve the requesting technician's display name (best-effort) so the
|
||||
// prompt is honest about who is asking. The support code carries the name
|
||||
// of the technician who generated it (`created_by`); if the mapping is not
|
||||
// yet available, fall back to a generic label rather than leaking the
|
||||
// machine's own name. Attended sessions grant CONTROL.
|
||||
let technician_name = match support_codes.get_by_session(session_id).await {
|
||||
Some(code) => code.created_by,
|
||||
None => "A support technician".to_string(),
|
||||
};
|
||||
|
||||
// 1. Send the ConsentRequest to the agent.
|
||||
let request = proto::Message {
|
||||
payload: Some(proto::message::Payload::ConsentRequest(
|
||||
proto::ConsentRequest {
|
||||
session_id: session_id.to_string(),
|
||||
technician_name,
|
||||
// Attended support sessions are full-control by default; the
|
||||
// viewer-token access split (Task 3) still governs what the relay
|
||||
// actually forwards once joined.
|
||||
access_mode: proto::ConsentAccessMode::ConsentControl as i32,
|
||||
timeout_secs: CONSENT_TIMEOUT_SECS as i32,
|
||||
},
|
||||
)),
|
||||
};
|
||||
|
||||
let mut buf = Vec::new();
|
||||
if prost::Message::encode(&request, &mut buf).is_err() {
|
||||
warn!(
|
||||
"Failed to encode ConsentRequest for session {}; denying",
|
||||
session_id
|
||||
);
|
||||
finalize_consent(sessions, db, session_id, agent_id, false, "encode_error", client_ip).await;
|
||||
return false;
|
||||
}
|
||||
if ws_sender.send(Message::Binary(buf)).await.is_err() {
|
||||
warn!(
|
||||
"Failed to send ConsentRequest to agent {} (session {}); denying",
|
||||
agent_id, session_id
|
||||
);
|
||||
finalize_consent(sessions, db, session_id, agent_id, false, "send_error", client_ip).await;
|
||||
return false;
|
||||
}
|
||||
|
||||
info!(
|
||||
"Attended session {}: ConsentRequest sent to agent {}, awaiting end-user decision",
|
||||
session_id, agent_id
|
||||
);
|
||||
|
||||
// Audit that consent was requested (prompt now pending).
|
||||
if let Some(db) = db {
|
||||
let _ = db::events::log_event(
|
||||
db.pool(),
|
||||
session_id,
|
||||
db::events::EventTypes::CONSENT_REQUESTED,
|
||||
None,
|
||||
None,
|
||||
Some(serde_json::json!({ "agent_id": agent_id })),
|
||||
client_ip,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// 2. Wait for the ConsentResponse, bounded by the timeout.
|
||||
let deadline = tokio::time::Instant::now()
|
||||
+ std::time::Duration::from_secs(CONSENT_TIMEOUT_SECS);
|
||||
|
||||
loop {
|
||||
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
|
||||
if remaining.is_zero() {
|
||||
warn!(
|
||||
"Attended session {}: consent timed out after {}s; denying",
|
||||
session_id, CONSENT_TIMEOUT_SECS
|
||||
);
|
||||
finalize_consent(sessions, db, session_id, agent_id, false, "timeout", client_ip).await;
|
||||
return false;
|
||||
}
|
||||
|
||||
match tokio::time::timeout(remaining, ws_receiver.next()).await {
|
||||
// Timed out waiting for the next frame.
|
||||
Err(_) => {
|
||||
warn!(
|
||||
"Attended session {}: consent timed out after {}s; denying",
|
||||
session_id, CONSENT_TIMEOUT_SECS
|
||||
);
|
||||
finalize_consent(sessions, db, session_id, agent_id, false, "timeout", client_ip)
|
||||
.await;
|
||||
return false;
|
||||
}
|
||||
// Socket closed before answering.
|
||||
Ok(None) => {
|
||||
warn!(
|
||||
"Attended session {}: agent {} disconnected before consent; denying",
|
||||
session_id, agent_id
|
||||
);
|
||||
finalize_consent(
|
||||
sessions, db, session_id, agent_id, false, "agent_disconnected", client_ip,
|
||||
)
|
||||
.await;
|
||||
return false;
|
||||
}
|
||||
Ok(Some(Ok(Message::Binary(data)))) => {
|
||||
match proto::Message::decode(data.as_ref()) {
|
||||
Ok(proto_msg) => match proto_msg.payload {
|
||||
Some(proto::message::Payload::ConsentResponse(resp)) => {
|
||||
// Verify the response is for THIS session (an agent
|
||||
// only ever has one pending consent, but be strict).
|
||||
if resp.session_id != session_id.to_string() {
|
||||
warn!(
|
||||
"Attended session {}: ConsentResponse for a different \
|
||||
session ({}); ignoring",
|
||||
session_id, resp.session_id
|
||||
);
|
||||
continue;
|
||||
}
|
||||
if resp.granted {
|
||||
info!("Attended session {}: end user GRANTED consent", session_id);
|
||||
finalize_consent(
|
||||
sessions, db, session_id, agent_id, true, "granted", client_ip,
|
||||
)
|
||||
.await;
|
||||
return true;
|
||||
} else {
|
||||
let reason = if resp.reason.is_empty() {
|
||||
"denied".to_string()
|
||||
} else {
|
||||
resp.reason.clone()
|
||||
};
|
||||
info!(
|
||||
"Attended session {}: end user DENIED consent ({})",
|
||||
session_id, reason
|
||||
);
|
||||
finalize_consent(
|
||||
sessions, db, session_id, agent_id, false, &reason, client_ip,
|
||||
)
|
||||
.await;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// Any other message during the consent window is
|
||||
// tolerated (e.g. heartbeats) — keep waiting.
|
||||
_ => continue,
|
||||
},
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Attended session {}: failed to decode agent message during \
|
||||
consent wait: {}",
|
||||
session_id, e
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Agent-initiated close frame.
|
||||
Ok(Some(Ok(Message::Close(_)))) => {
|
||||
warn!(
|
||||
"Attended session {}: agent {} closed during consent; denying",
|
||||
session_id, agent_id
|
||||
);
|
||||
finalize_consent(
|
||||
sessions, db, session_id, agent_id, false, "agent_closed", client_ip,
|
||||
)
|
||||
.await;
|
||||
return false;
|
||||
}
|
||||
// Ping/pong/text and transient frames: keep waiting.
|
||||
Ok(Some(Ok(_))) => continue,
|
||||
Ok(Some(Err(e))) => {
|
||||
warn!(
|
||||
"Attended session {}: WebSocket error during consent wait: {}; denying",
|
||||
session_id, e
|
||||
);
|
||||
finalize_consent(
|
||||
sessions, db, session_id, agent_id, false, "ws_error", client_ip,
|
||||
)
|
||||
.await;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply a terminal consent decision: update the in-memory consent state
|
||||
/// (authoritative), mirror it to the DB row (best-effort), and audit it.
|
||||
async fn finalize_consent(
|
||||
sessions: &SessionManager,
|
||||
db: Option<&Database>,
|
||||
session_id: Uuid,
|
||||
agent_id: &str,
|
||||
granted: bool,
|
||||
reason: &str,
|
||||
client_ip: Option<std::net::IpAddr>,
|
||||
) {
|
||||
use crate::session::ConsentState;
|
||||
|
||||
let new_state = if granted {
|
||||
ConsentState::Granted
|
||||
} else {
|
||||
ConsentState::Denied
|
||||
};
|
||||
|
||||
// Authoritative in-memory state — this is what `join_session` consults.
|
||||
sessions.set_consent_state(session_id, new_state).await;
|
||||
|
||||
if let Some(db) = db {
|
||||
// Durable mirror of the consent decision on the session row.
|
||||
let _ = db::sessions::update_consent_state(db.pool(), session_id, new_state.as_db_str())
|
||||
.await;
|
||||
|
||||
// Audit event (consent_granted | consent_denied) with the agent id and
|
||||
// the (non-secret) reason so the decision is fully traceable.
|
||||
let event_type = if granted {
|
||||
db::events::EventTypes::CONSENT_GRANTED
|
||||
} else {
|
||||
db::events::EventTypes::CONSENT_DENIED
|
||||
};
|
||||
let _ = db::events::log_event(
|
||||
db.pool(),
|
||||
session_id,
|
||||
event_type,
|
||||
None,
|
||||
None,
|
||||
Some(serde_json::json!({ "agent_id": agent_id, "reason": reason })),
|
||||
client_ip,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a viewer WebSocket connection
|
||||
///
|
||||
/// `access` is the VERIFIED access mode from the viewer token's signed claims.
|
||||
|
||||
Reference in New Issue
Block a user