Complete bidirectional tunnel communication between server and agents, enabling persistent secure channels for future command execution and file operations. Agents transition from heartbeat mode to tunnel mode on-demand while maintaining WebSocket connection. Server Implementation: - Database layer (db/tunnel.rs): Session CRUD, ownership validation, cleanup on disconnect (prevents orphaned sessions) - API endpoints (api/tunnel.rs): POST /open, POST /close, GET /status with JWT auth, UUID validation, proper HTTP status codes - Protocol extension (ws/mod.rs): TunnelOpen/Close/Data messages, agent response handlers (TunnelReady/Data/Error) - Migration (006_tunnel_sessions.sql): tech_sessions table with partial unique constraint, foreign keys with CASCADE, audit table Agent Implementation: - State machine (tunnel/mod.rs): AgentMode (Heartbeat ↔ Tunnel), channel multiplexing, concurrent session prevention - WebSocket handlers (transport/websocket.rs): Open/close tunnel, mode switching without dropping connection, cleanup on disconnect - Protocol extension (transport/mod.rs): TunnelReady/Data/Error messages matching server definitions - Unit tests: Lifecycle and channel management coverage Key Features: - Security: JWT auth, session ownership verification, SQL injection prevention, constraint-based duplicate session blocking - Cleanup: Automatic session closure on agent disconnect (both sides), channel cleanup, graceful state transitions - Error handling: Proper HTTP status codes (400/403/404/409/500), comprehensive Result types, detailed logging - Extensibility: Channel types ready (Terminal/File/Registry/Service), TunnelDataPayload enum for Phase 2+ expansion Phase 1 Scope (Implemented): - Tunnel session lifecycle management - Mode switching (heartbeat ↔ tunnel) - Protocol message routing - Database session tracking Phase 2 Next Steps: - Terminal command execution (tokio::process::Command) - Client WebSocket connections for output streaming - Command audit logging - File transfer operations Verification: - Server compiles successfully (0 errors) - Agent unit tests pass (tunnel lifecycle, channel management) - Code review approved (protocol alignment verified) - Database constraints enforce referential integrity - Cleanup tested (session closure on disconnect) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
232 lines
7.0 KiB
Rust
232 lines
7.0 KiB
Rust
//! Tunnel session management endpoints
|
|
|
|
use axum::{
|
|
extract::{Path, State},
|
|
http::StatusCode,
|
|
Json,
|
|
};
|
|
use serde::{Deserialize, Serialize};
|
|
use tracing::{error, warn};
|
|
use uuid::Uuid;
|
|
|
|
use crate::auth::AuthUser;
|
|
use crate::db;
|
|
use crate::ws::ServerMessage;
|
|
use crate::AppState;
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
pub struct OpenTunnelRequest {
|
|
pub agent_id: String,
|
|
}
|
|
|
|
#[derive(Debug, Serialize)]
|
|
pub struct OpenTunnelResponse {
|
|
pub session_id: String,
|
|
pub status: String,
|
|
}
|
|
|
|
#[derive(Debug, Deserialize)]
|
|
pub struct CloseTunnelRequest {
|
|
pub session_id: String,
|
|
}
|
|
|
|
#[derive(Debug, Serialize)]
|
|
pub struct CloseTunnelResponse {
|
|
pub status: String,
|
|
}
|
|
|
|
#[derive(Debug, Serialize)]
|
|
pub struct TunnelStatusResponse {
|
|
pub session_id: String,
|
|
pub agent_id: String,
|
|
pub status: String,
|
|
pub opened_at: String,
|
|
pub last_activity: String,
|
|
}
|
|
|
|
/// POST /api/v1/tunnel/open
|
|
/// Open a new tunnel session to an agent
|
|
pub async fn open_tunnel(
|
|
State(state): State<AppState>,
|
|
user: AuthUser,
|
|
Json(req): Json<OpenTunnelRequest>,
|
|
) -> Result<Json<OpenTunnelResponse>, (StatusCode, String)> {
|
|
// Parse agent_id
|
|
let agent_id = Uuid::parse_str(&req.agent_id)
|
|
.map_err(|_| (StatusCode::BAD_REQUEST, "Invalid agent_id format".to_string()))?;
|
|
|
|
// Check if agent exists and is online
|
|
let agent_connected = state.agents.read().await.is_connected(&agent_id);
|
|
if !agent_connected {
|
|
return Err((
|
|
StatusCode::NOT_FOUND,
|
|
"Agent not connected".to_string(),
|
|
));
|
|
}
|
|
|
|
// Check for existing active session
|
|
let has_session = db::has_active_session(&state.db, user.user_id, agent_id)
|
|
.await
|
|
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
|
|
|
|
if has_session {
|
|
return Err((
|
|
StatusCode::CONFLICT,
|
|
"Active session already exists for this agent".to_string(),
|
|
));
|
|
}
|
|
|
|
// Generate session ID
|
|
let session_id = Uuid::new_v4().to_string();
|
|
|
|
// Create session in database
|
|
let _session = db::create_tech_session(&state.db, &session_id, user.user_id, agent_id)
|
|
.await
|
|
.map_err(|e| {
|
|
// Handle unique constraint violation (PostgreSQL error code 23505)
|
|
if let Some(db_err) = e.as_database_error() {
|
|
if db_err.code().as_deref() == Some("23505") {
|
|
return (
|
|
StatusCode::CONFLICT,
|
|
"Active session already exists for this agent".to_string(),
|
|
);
|
|
}
|
|
}
|
|
error!("Failed to create tunnel session: {}", e);
|
|
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
|
|
})?;
|
|
|
|
// Send TunnelOpen message to agent via WebSocket
|
|
let tunnel_open_msg = ServerMessage::TunnelOpen {
|
|
session_id: session_id.clone(),
|
|
tech_id: user.user_id,
|
|
};
|
|
|
|
let sent = state.agents.read().await.send_to(&agent_id, tunnel_open_msg).await;
|
|
if !sent {
|
|
// Clean up database session if send failed
|
|
if let Err(e) = db::close_tech_session(&state.db, &session_id).await {
|
|
error!("Failed to cleanup session {} after send failure: {}", session_id, e);
|
|
}
|
|
return Err((
|
|
StatusCode::INTERNAL_SERVER_ERROR,
|
|
"Failed to send tunnel open message to agent".to_string(),
|
|
));
|
|
}
|
|
|
|
Ok(Json(OpenTunnelResponse {
|
|
session_id,
|
|
status: "active".to_string(),
|
|
}))
|
|
}
|
|
|
|
/// POST /api/v1/tunnel/close
|
|
/// Close an existing tunnel session
|
|
pub async fn close_tunnel(
|
|
State(state): State<AppState>,
|
|
user: AuthUser,
|
|
Json(req): Json<CloseTunnelRequest>,
|
|
) -> Result<Json<CloseTunnelResponse>, (StatusCode, String)> {
|
|
// Validate session_id format
|
|
if Uuid::parse_str(&req.session_id).is_err() {
|
|
return Err((StatusCode::BAD_REQUEST, "Invalid session_id format".to_string()));
|
|
}
|
|
|
|
// Verify session ownership
|
|
let is_owner = db::verify_session_ownership(&state.db, &req.session_id, user.user_id)
|
|
.await
|
|
.map_err(|e| {
|
|
error!("Failed to verify session ownership: {}", e);
|
|
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
|
|
})?;
|
|
|
|
if !is_owner {
|
|
return Err((
|
|
StatusCode::FORBIDDEN,
|
|
"Session not found or not owned by user".to_string(),
|
|
));
|
|
}
|
|
|
|
// Get session to find agent_id
|
|
let session = db::get_tech_session(&state.db, &req.session_id)
|
|
.await
|
|
.map_err(|e| {
|
|
error!("Failed to get session: {}", e);
|
|
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
|
|
})?
|
|
.ok_or((StatusCode::NOT_FOUND, "Session not found".to_string()))?;
|
|
|
|
// Send TunnelClose message to agent
|
|
let tunnel_close_msg = ServerMessage::TunnelClose {
|
|
session_id: req.session_id.clone(),
|
|
};
|
|
|
|
if !state.agents.read().await.send_to(&session.agent_id, tunnel_close_msg).await {
|
|
warn!(
|
|
"Failed to send TunnelClose message to agent {} for session {}",
|
|
session.agent_id, req.session_id
|
|
);
|
|
}
|
|
|
|
// Close session in database
|
|
match db::close_tech_session(&state.db, &req.session_id).await {
|
|
Ok(rows) if rows == 0 => {
|
|
warn!("No rows updated when closing session {}", req.session_id);
|
|
}
|
|
Ok(_) => {}
|
|
Err(e) => {
|
|
error!("Failed to close session in database: {}", e);
|
|
return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string()));
|
|
}
|
|
}
|
|
|
|
Ok(Json(CloseTunnelResponse {
|
|
status: "closed".to_string(),
|
|
}))
|
|
}
|
|
|
|
/// GET /api/v1/tunnel/status/:session_id
|
|
/// Get tunnel session status
|
|
pub async fn get_tunnel_status(
|
|
State(state): State<AppState>,
|
|
user: AuthUser,
|
|
Path(session_id): Path<String>,
|
|
) -> Result<Json<TunnelStatusResponse>, (StatusCode, String)> {
|
|
// Validate session_id format
|
|
if Uuid::parse_str(&session_id).is_err() {
|
|
return Err((StatusCode::BAD_REQUEST, "Invalid session_id format".to_string()));
|
|
}
|
|
|
|
// Verify session ownership
|
|
let is_owner = db::verify_session_ownership(&state.db, &session_id, user.user_id)
|
|
.await
|
|
.map_err(|e| {
|
|
error!("Failed to verify session ownership: {}", e);
|
|
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
|
|
})?;
|
|
|
|
if !is_owner {
|
|
return Err((
|
|
StatusCode::FORBIDDEN,
|
|
"Session not found or not owned by user".to_string(),
|
|
));
|
|
}
|
|
|
|
// Get session
|
|
let session = db::get_tech_session(&state.db, &session_id)
|
|
.await
|
|
.map_err(|e| {
|
|
error!("Failed to get session: {}", e);
|
|
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string())
|
|
})?
|
|
.ok_or((StatusCode::NOT_FOUND, "Session not found".to_string()))?;
|
|
|
|
Ok(Json(TunnelStatusResponse {
|
|
session_id: session.session_id,
|
|
agent_id: session.agent_id.to_string(),
|
|
status: session.status,
|
|
opened_at: session.opened_at.to_rfc3339(),
|
|
last_activity: session.last_activity.to_rfc3339(),
|
|
}))
|
|
}
|