style: cargo fmt --all — make codebase rustfmt-clean
Some checks failed
Build and Test / Build Server (Linux) (push) Failing after 2m59s
Build and Test / Build Agent (Windows) (push) Has started running
Build and Test / Security Audit (push) Has been cancelled
Build and Test / Build Summary (push) Has been cancelled
Run Tests / Test Server (push) Has been cancelled
Run Tests / Test Agent (push) Has been cancelled
Run Tests / Code Coverage (push) Has been cancelled
Run Tests / Lint and Format Check (push) Has been cancelled
Some checks failed
Build and Test / Build Server (Linux) (push) Failing after 2m59s
Build and Test / Build Agent (Windows) (push) Has started running
Build and Test / Security Audit (push) Has been cancelled
Build and Test / Build Summary (push) Has been cancelled
Run Tests / Test Server (push) Has been cancelled
Run Tests / Test Agent (push) Has been cancelled
Run Tests / Code Coverage (push) Has been cancelled
Run Tests / Lint and Format Check (push) Has been cancelled
First run of the build-and-test CI gate (cargo fmt --all -- --check) surfaced pre-existing formatting drift across the agent and server crates. Apply rustfmt across the workspace so the codebase meets its own CI gate. Pure formatting; no logic changes. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -6,21 +6,21 @@
|
||||
use axum::{
|
||||
extract::{
|
||||
ws::{Message, WebSocket, WebSocketUpgrade},
|
||||
Query, State, ConnectInfo,
|
||||
ConnectInfo, Query, State,
|
||||
},
|
||||
response::IntoResponse,
|
||||
http::StatusCode,
|
||||
response::IntoResponse,
|
||||
};
|
||||
use std::net::SocketAddr;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
use prost::Message as ProstMessage;
|
||||
use serde::Deserialize;
|
||||
use std::net::SocketAddr;
|
||||
use tracing::{error, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::db::{self, Database};
|
||||
use crate::proto;
|
||||
use crate::session::SessionManager;
|
||||
use crate::db::{self, Database};
|
||||
use crate::AppState;
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
@@ -59,7 +59,11 @@ pub async fn agent_ws_handler(
|
||||
Query(params): Query<AgentParams>,
|
||||
) -> Result<impl IntoResponse, StatusCode> {
|
||||
let agent_id = params.agent_id.clone();
|
||||
let agent_name = params.hostname.clone().or(params.agent_name.clone()).unwrap_or_else(|| agent_id.clone());
|
||||
let agent_name = params
|
||||
.hostname
|
||||
.clone()
|
||||
.or(params.agent_name.clone())
|
||||
.unwrap_or_else(|| agent_id.clone());
|
||||
let support_code = params.support_code.clone();
|
||||
let api_key = params.api_key.clone();
|
||||
let client_ip = addr.ip();
|
||||
@@ -69,7 +73,10 @@ pub async fn agent_ws_handler(
|
||||
// API key = persistent managed agent
|
||||
|
||||
if support_code.is_none() && api_key.is_none() {
|
||||
warn!("Agent connection rejected: {} from {} - no support code or API key", agent_id, client_ip);
|
||||
warn!(
|
||||
"Agent connection rejected: {} from {} - no support code or API key",
|
||||
agent_id, client_ip
|
||||
);
|
||||
|
||||
// Log failed connection attempt to database
|
||||
if let Some(ref db) = state.db {
|
||||
@@ -84,7 +91,8 @@ pub async fn agent_ws_handler(
|
||||
"agent_id": agent_id
|
||||
})),
|
||||
Some(client_ip),
|
||||
).await;
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
return Err(StatusCode::UNAUTHORIZED);
|
||||
@@ -95,7 +103,10 @@ pub async fn agent_ws_handler(
|
||||
// Check if it's a valid, pending support code
|
||||
let code_info = state.support_codes.get_status(code).await;
|
||||
if code_info.is_none() {
|
||||
warn!("Agent connection rejected: {} from {} - invalid support code {}", agent_id, client_ip, code);
|
||||
warn!(
|
||||
"Agent connection rejected: {} from {} - invalid support code {}",
|
||||
agent_id, client_ip, code
|
||||
);
|
||||
|
||||
// Log failed connection attempt
|
||||
if let Some(ref db) = state.db {
|
||||
@@ -111,14 +122,18 @@ pub async fn agent_ws_handler(
|
||||
"agent_id": agent_id
|
||||
})),
|
||||
Some(client_ip),
|
||||
).await;
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
return Err(StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
let status = code_info.unwrap();
|
||||
if status != "pending" && status != "connected" {
|
||||
warn!("Agent connection rejected: {} from {} - support code {} has status {}", agent_id, client_ip, code, status);
|
||||
warn!(
|
||||
"Agent connection rejected: {} from {} - support code {} has status {}",
|
||||
agent_id, client_ip, code, status
|
||||
);
|
||||
|
||||
// Log failed connection attempt (expired/cancelled code)
|
||||
if let Some(ref db) = state.db {
|
||||
@@ -140,12 +155,16 @@ pub async fn agent_ws_handler(
|
||||
"agent_id": agent_id
|
||||
})),
|
||||
Some(client_ip),
|
||||
).await;
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
return Err(StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
info!("Agent {} from {} authenticated via support code {}", agent_id, client_ip, code);
|
||||
info!(
|
||||
"Agent {} from {} authenticated via support code {}",
|
||||
agent_id, client_ip, code
|
||||
);
|
||||
}
|
||||
|
||||
// Validate API key if provided (for persistent agents)
|
||||
@@ -153,7 +172,10 @@ pub async fn agent_ws_handler(
|
||||
// For now, we'll accept API keys that match the JWT secret or a configured agent key
|
||||
// In production, this should validate against a database of registered agents
|
||||
if !validate_agent_api_key(&state, key).await {
|
||||
warn!("Agent connection rejected: {} from {} - invalid API key", agent_id, client_ip);
|
||||
warn!(
|
||||
"Agent connection rejected: {} from {} - invalid API key",
|
||||
agent_id, client_ip
|
||||
);
|
||||
|
||||
// Log failed connection attempt
|
||||
if let Some(ref db) = state.db {
|
||||
@@ -168,19 +190,34 @@ pub async fn agent_ws_handler(
|
||||
"agent_id": agent_id
|
||||
})),
|
||||
Some(client_ip),
|
||||
).await;
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
return Err(StatusCode::UNAUTHORIZED);
|
||||
}
|
||||
info!("Agent {} from {} authenticated via API key", agent_id, client_ip);
|
||||
info!(
|
||||
"Agent {} from {} authenticated via API key",
|
||||
agent_id, client_ip
|
||||
);
|
||||
}
|
||||
|
||||
let sessions = state.sessions.clone();
|
||||
let support_codes = state.support_codes.clone();
|
||||
let db = state.db.clone();
|
||||
|
||||
Ok(ws.on_upgrade(move |socket| handle_agent_connection(socket, sessions, support_codes, db, agent_id, agent_name, support_code, Some(client_ip))))
|
||||
Ok(ws.on_upgrade(move |socket| {
|
||||
handle_agent_connection(
|
||||
socket,
|
||||
sessions,
|
||||
support_codes,
|
||||
db,
|
||||
agent_id,
|
||||
agent_name,
|
||||
support_code,
|
||||
Some(client_ip),
|
||||
)
|
||||
}))
|
||||
}
|
||||
|
||||
/// Validate an agent API key
|
||||
@@ -212,24 +249,42 @@ pub async fn viewer_ws_handler(
|
||||
|
||||
// Require JWT token for viewers
|
||||
let token = params.token.ok_or_else(|| {
|
||||
warn!("Viewer connection rejected from {}: missing token", client_ip);
|
||||
warn!(
|
||||
"Viewer connection rejected from {}: missing token",
|
||||
client_ip
|
||||
);
|
||||
StatusCode::UNAUTHORIZED
|
||||
})?;
|
||||
|
||||
// Validate the token
|
||||
let claims = state.jwt_config.validate_token(&token).map_err(|e| {
|
||||
warn!("Viewer connection rejected from {}: invalid token: {}", client_ip, e);
|
||||
warn!(
|
||||
"Viewer connection rejected from {}: invalid token: {}",
|
||||
client_ip, e
|
||||
);
|
||||
StatusCode::UNAUTHORIZED
|
||||
})?;
|
||||
|
||||
info!("Viewer {} authenticated via JWT from {}", claims.username, client_ip);
|
||||
info!(
|
||||
"Viewer {} authenticated via JWT from {}",
|
||||
claims.username, client_ip
|
||||
);
|
||||
|
||||
let session_id = params.session_id;
|
||||
let viewer_name = params.viewer_name;
|
||||
let sessions = state.sessions.clone();
|
||||
let db = state.db.clone();
|
||||
|
||||
Ok(ws.on_upgrade(move |socket| handle_viewer_connection(socket, sessions, db, session_id, viewer_name, Some(client_ip))))
|
||||
Ok(ws.on_upgrade(move |socket| {
|
||||
handle_viewer_connection(
|
||||
socket,
|
||||
sessions,
|
||||
db,
|
||||
session_id,
|
||||
viewer_name,
|
||||
Some(client_ip),
|
||||
)
|
||||
}))
|
||||
}
|
||||
|
||||
/// Handle an agent WebSocket connection
|
||||
@@ -243,7 +298,10 @@ async fn handle_agent_connection(
|
||||
support_code: Option<String>,
|
||||
client_ip: Option<std::net::IpAddr>,
|
||||
) {
|
||||
info!("Agent connected: {} ({}) from {:?}", agent_name, agent_id, client_ip);
|
||||
info!(
|
||||
"Agent connected: {} ({}) from {:?}",
|
||||
agent_name, agent_id, client_ip
|
||||
);
|
||||
|
||||
let (mut ws_sender, mut ws_receiver) = socket.split();
|
||||
|
||||
@@ -270,7 +328,9 @@ async fn handle_agent_connection(
|
||||
// Register the agent and get channels
|
||||
// Persistent agents (no support code) keep their session when disconnected
|
||||
let is_persistent = support_code.is_none();
|
||||
let (session_id, frame_tx, mut input_rx) = sessions.register_agent(agent_id.clone(), agent_name.clone(), is_persistent).await;
|
||||
let (session_id, frame_tx, mut input_rx) = sessions
|
||||
.register_agent(agent_id.clone(), agent_name.clone(), is_persistent)
|
||||
.await;
|
||||
|
||||
info!("Session created: {} (agent in idle mode)", session_id);
|
||||
|
||||
@@ -285,15 +345,20 @@ async fn handle_agent_connection(
|
||||
machine.id,
|
||||
support_code.is_some(),
|
||||
support_code.as_deref(),
|
||||
).await;
|
||||
)
|
||||
.await;
|
||||
|
||||
// Log session started event
|
||||
let _ = db::events::log_event(
|
||||
db.pool(),
|
||||
session_id,
|
||||
db::events::EventTypes::SESSION_STARTED,
|
||||
None, None, None, client_ip,
|
||||
).await;
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
client_ip,
|
||||
)
|
||||
.await;
|
||||
|
||||
Some(machine.id)
|
||||
}
|
||||
@@ -309,7 +374,9 @@ async fn handle_agent_connection(
|
||||
// If a support code was provided, mark it as connected
|
||||
if let Some(ref code) = support_code {
|
||||
info!("Linking support code {} to session {}", code, session_id);
|
||||
support_codes.mark_connected(code, Some(agent_name.clone()), Some(agent_id.clone())).await;
|
||||
support_codes
|
||||
.mark_connected(code, Some(agent_name.clone()), Some(agent_id.clone()))
|
||||
.await;
|
||||
support_codes.link_session(code, session_id).await;
|
||||
|
||||
// Database: update support code
|
||||
@@ -320,7 +387,8 @@ async fn handle_agent_connection(
|
||||
Some(session_id),
|
||||
Some(&agent_name),
|
||||
Some(&agent_id),
|
||||
).await;
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -333,7 +401,11 @@ async fn handle_agent_connection(
|
||||
let input_forward = tokio::spawn(async move {
|
||||
while let Some(input_data) = input_rx.recv().await {
|
||||
let mut sender = ws_sender_input.lock().await;
|
||||
if sender.send(Message::Binary(input_data.into())).await.is_err() {
|
||||
if sender
|
||||
.send(Message::Binary(input_data.into()))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -406,22 +478,29 @@ async fn handle_agent_connection(
|
||||
} else {
|
||||
Some(status.site.clone())
|
||||
};
|
||||
sessions_status.update_agent_status(
|
||||
session_id,
|
||||
Some(status.os_version.clone()),
|
||||
status.is_elevated,
|
||||
status.uptime_secs,
|
||||
status.display_count,
|
||||
status.is_streaming,
|
||||
agent_version.clone(),
|
||||
organization.clone(),
|
||||
site.clone(),
|
||||
status.tags.clone(),
|
||||
).await;
|
||||
sessions_status
|
||||
.update_agent_status(
|
||||
session_id,
|
||||
Some(status.os_version.clone()),
|
||||
status.is_elevated,
|
||||
status.uptime_secs,
|
||||
status.display_count,
|
||||
status.is_streaming,
|
||||
agent_version.clone(),
|
||||
organization.clone(),
|
||||
site.clone(),
|
||||
status.tags.clone(),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Update version in database if present
|
||||
if let (Some(ref db), Some(ref version)) = (&db, &agent_version) {
|
||||
let _ = crate::db::releases::update_machine_version(db.pool(), &agent_id, version).await;
|
||||
let _ = crate::db::releases::update_machine_version(
|
||||
db.pool(),
|
||||
&agent_id,
|
||||
version,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Update organization/site/tags in database if present
|
||||
@@ -432,7 +511,8 @@ async fn handle_agent_connection(
|
||||
organization.as_deref(),
|
||||
site.as_deref(),
|
||||
&status.tags,
|
||||
).await;
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
info!("Agent status update: {} - streaming={}, uptime={}s, version={:?}, org={:?}, site={:?}",
|
||||
@@ -489,8 +569,12 @@ async fn handle_agent_connection(
|
||||
db.pool(),
|
||||
session_id,
|
||||
db::events::EventTypes::SESSION_ENDED,
|
||||
None, None, None, client_ip,
|
||||
).await;
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
client_ip,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
// Mark support code as completed if one was used (unless cancelled)
|
||||
@@ -532,7 +616,10 @@ async fn handle_viewer_connection(
|
||||
let viewer_id = Uuid::new_v4().to_string();
|
||||
|
||||
// Join the session (this sends StartStream to agent if first viewer)
|
||||
let (mut frame_rx, input_tx) = match sessions.join_session(session_id, viewer_id.clone(), viewer_name.clone()).await {
|
||||
let (mut frame_rx, input_tx) = match sessions
|
||||
.join_session(session_id, viewer_id.clone(), viewer_name.clone())
|
||||
.await
|
||||
{
|
||||
Some(channels) => channels,
|
||||
None => {
|
||||
warn!("Session not found: {}", session_id);
|
||||
@@ -540,7 +627,10 @@ async fn handle_viewer_connection(
|
||||
}
|
||||
};
|
||||
|
||||
info!("Viewer {} ({}) joined session: {} from {:?}", viewer_name, viewer_id, session_id, client_ip);
|
||||
info!(
|
||||
"Viewer {} ({}) joined session: {} from {:?}",
|
||||
viewer_name, viewer_id, session_id, client_ip
|
||||
);
|
||||
|
||||
// Database: log viewer joined event
|
||||
if let Some(ref db) = db {
|
||||
@@ -550,8 +640,10 @@ async fn handle_viewer_connection(
|
||||
db::events::EventTypes::VIEWER_JOINED,
|
||||
Some(&viewer_id),
|
||||
Some(&viewer_name),
|
||||
None, client_ip,
|
||||
).await;
|
||||
None,
|
||||
client_ip,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
let (mut ws_sender, mut ws_receiver) = socket.split();
|
||||
@@ -559,7 +651,11 @@ async fn handle_viewer_connection(
|
||||
// Task to forward frames from agent to this viewer
|
||||
let frame_forward = tokio::spawn(async move {
|
||||
while let Ok(frame_data) = frame_rx.recv().await {
|
||||
if ws_sender.send(Message::Binary(frame_data.into())).await.is_err() {
|
||||
if ws_sender
|
||||
.send(Message::Binary(frame_data.into()))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -577,9 +673,9 @@ async fn handle_viewer_connection(
|
||||
match proto::Message::decode(data.as_ref()) {
|
||||
Ok(proto_msg) => {
|
||||
match &proto_msg.payload {
|
||||
Some(proto::message::Payload::MouseEvent(_)) |
|
||||
Some(proto::message::Payload::KeyEvent(_)) |
|
||||
Some(proto::message::Payload::SpecialKey(_)) => {
|
||||
Some(proto::message::Payload::MouseEvent(_))
|
||||
| Some(proto::message::Payload::KeyEvent(_))
|
||||
| Some(proto::message::Payload::SpecialKey(_)) => {
|
||||
// Forward input to agent
|
||||
let _ = input_tx.send(data.to_vec()).await;
|
||||
}
|
||||
@@ -597,7 +693,10 @@ async fn handle_viewer_connection(
|
||||
}
|
||||
}
|
||||
Ok(Message::Close(_)) => {
|
||||
info!("Viewer {} disconnected from session: {}", viewer_id, session_id);
|
||||
info!(
|
||||
"Viewer {} disconnected from session: {}",
|
||||
viewer_id, session_id
|
||||
);
|
||||
break;
|
||||
}
|
||||
Ok(_) => {}
|
||||
@@ -610,7 +709,9 @@ async fn handle_viewer_connection(
|
||||
|
||||
// Cleanup (this sends StopStream to agent if last viewer)
|
||||
frame_forward.abort();
|
||||
sessions_cleanup.leave_session(session_id, &viewer_id_cleanup).await;
|
||||
sessions_cleanup
|
||||
.leave_session(session_id, &viewer_id_cleanup)
|
||||
.await;
|
||||
|
||||
// Database: log viewer left event
|
||||
if let Some(ref db) = db {
|
||||
@@ -620,8 +721,10 @@ async fn handle_viewer_connection(
|
||||
db::events::EventTypes::VIEWER_LEFT,
|
||||
Some(&viewer_id_cleanup),
|
||||
Some(&viewer_name_cleanup),
|
||||
None, client_ip,
|
||||
).await;
|
||||
None,
|
||||
client_ip,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
info!("Viewer {} left session: {}", viewer_id_cleanup, session_id);
|
||||
|
||||
Reference in New Issue
Block a user