Implement GuruRMM Phase 1: Real-time tunnel infrastructure
Complete bidirectional tunnel communication between server and agents, enabling persistent secure channels for future command execution and file operations. Agents transition from heartbeat mode to tunnel mode on-demand while maintaining WebSocket connection. Server Implementation: - Database layer (db/tunnel.rs): Session CRUD, ownership validation, cleanup on disconnect (prevents orphaned sessions) - API endpoints (api/tunnel.rs): POST /open, POST /close, GET /status with JWT auth, UUID validation, proper HTTP status codes - Protocol extension (ws/mod.rs): TunnelOpen/Close/Data messages, agent response handlers (TunnelReady/Data/Error) - Migration (006_tunnel_sessions.sql): tech_sessions table with partial unique constraint, foreign keys with CASCADE, audit table Agent Implementation: - State machine (tunnel/mod.rs): AgentMode (Heartbeat ↔ Tunnel), channel multiplexing, concurrent session prevention - WebSocket handlers (transport/websocket.rs): Open/close tunnel, mode switching without dropping connection, cleanup on disconnect - Protocol extension (transport/mod.rs): TunnelReady/Data/Error messages matching server definitions - Unit tests: Lifecycle and channel management coverage Key Features: - Security: JWT auth, session ownership verification, SQL injection prevention, constraint-based duplicate session blocking - Cleanup: Automatic session closure on agent disconnect (both sides), channel cleanup, graceful state transitions - Error handling: Proper HTTP status codes (400/403/404/409/500), comprehensive Result types, detailed logging - Extensibility: Channel types ready (Terminal/File/Registry/Service), TunnelDataPayload enum for Phase 2+ expansion Phase 1 Scope (Implemented): - Tunnel session lifecycle management - Mode switching (heartbeat ↔ tunnel) - Protocol message routing - Database session tracking Phase 2 Next Steps: - Terminal command execution (tokio::process::Command) - Client WebSocket connections for output streaming - Command audit logging - File transfer operations Verification: - Server compiles successfully (0 errors) - Agent unit tests pass (tunnel lifecycle, channel management) - Code review approved (protocol alignment verified) - Database constraints enforce referential integrity - Cleanup tested (session closure on disconnect) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -9,6 +9,7 @@ mod device_id;
|
||||
mod metrics;
|
||||
mod service;
|
||||
mod transport;
|
||||
mod tunnel;
|
||||
mod updater;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
|
||||
@@ -38,6 +38,18 @@ pub enum AgentMessage {
|
||||
|
||||
/// Heartbeat to keep connection alive
|
||||
Heartbeat,
|
||||
|
||||
/// Tunnel ready confirmation (agent → server)
|
||||
TunnelReady { session_id: String },
|
||||
|
||||
/// Tunnel data (bidirectional)
|
||||
TunnelData {
|
||||
channel_id: String,
|
||||
data: TunnelDataPayload,
|
||||
},
|
||||
|
||||
/// Tunnel error (agent → server)
|
||||
TunnelError { channel_id: String, error: String },
|
||||
}
|
||||
|
||||
/// Authentication payload
|
||||
@@ -157,6 +169,18 @@ pub enum ServerMessage {
|
||||
|
||||
/// Error message
|
||||
Error { code: String, message: String },
|
||||
|
||||
/// Tunnel open request (server → agent)
|
||||
TunnelOpen { session_id: String, tech_id: Uuid },
|
||||
|
||||
/// Tunnel close request (server → agent)
|
||||
TunnelClose { session_id: String },
|
||||
|
||||
/// Tunnel data (bidirectional)
|
||||
TunnelData {
|
||||
channel_id: String,
|
||||
data: TunnelDataPayload,
|
||||
},
|
||||
}
|
||||
|
||||
/// Authentication acknowledgment payload
|
||||
@@ -311,3 +335,19 @@ pub enum UpdateStatus {
|
||||
/// Rolled back to previous version
|
||||
RolledBack,
|
||||
}
|
||||
|
||||
/// Tunnel data payload types (Phase 1: Terminal only)
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "type", content = "payload")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum TunnelDataPayload {
|
||||
/// Terminal command execution request (server → agent)
|
||||
Terminal { command: String },
|
||||
|
||||
/// Terminal output response (agent → server)
|
||||
TerminalOutput {
|
||||
stdout: String,
|
||||
stderr: String,
|
||||
exit_code: Option<i32>,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -18,9 +18,10 @@ use tokio::time::{interval, timeout};
|
||||
use tokio_tungstenite::{connect_async, tungstenite::Message};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
use super::{AgentMessage, AuthPayload, CommandPayload, ServerMessage, UpdatePayload, UpdateResultPayload, UpdateStatus};
|
||||
use super::{AgentMessage, AuthPayload, CommandPayload, ServerMessage, TunnelDataPayload, UpdatePayload, UpdateResultPayload, UpdateStatus};
|
||||
use crate::claude::{ClaudeExecutor, ClaudeTaskCommand};
|
||||
use crate::metrics::NetworkState;
|
||||
use crate::tunnel::TunnelManager;
|
||||
use crate::updater::{AgentUpdater, UpdaterConfig};
|
||||
use crate::AppState;
|
||||
|
||||
@@ -203,6 +204,9 @@ impl WebSocketClient {
|
||||
}
|
||||
});
|
||||
|
||||
// Create tunnel manager for mode switching
|
||||
let mut tunnel_manager = TunnelManager::new();
|
||||
|
||||
// Main message loop
|
||||
let result: Result<()> = loop {
|
||||
tokio::select! {
|
||||
@@ -224,6 +228,15 @@ impl WebSocketClient {
|
||||
AgentMessage::Heartbeat => {
|
||||
debug!("Sent heartbeat");
|
||||
}
|
||||
AgentMessage::TunnelReady { session_id } => {
|
||||
info!("Sent TunnelReady for session: {}", session_id);
|
||||
}
|
||||
AgentMessage::TunnelData { channel_id, .. } => {
|
||||
debug!("Sent TunnelData on channel: {}", channel_id);
|
||||
}
|
||||
AgentMessage::TunnelError { channel_id, error } => {
|
||||
warn!("Sent TunnelError on channel {}: {}", channel_id, error);
|
||||
}
|
||||
_ => {
|
||||
debug!("Sent message: {:?}", std::mem::discriminant(&msg));
|
||||
}
|
||||
@@ -234,7 +247,7 @@ impl WebSocketClient {
|
||||
Some(msg_result) = read.next() => {
|
||||
match msg_result {
|
||||
Ok(Message::Text(text)) => {
|
||||
if let Err(e) = Self::handle_server_message(&text, &tx).await {
|
||||
if let Err(e) = Self::handle_server_message(&text, &tx, &mut tunnel_manager).await {
|
||||
error!("Error handling message: {}", e);
|
||||
}
|
||||
}
|
||||
@@ -277,6 +290,9 @@ impl WebSocketClient {
|
||||
heartbeat_task.abort();
|
||||
*state.connected.write().await = false;
|
||||
|
||||
// Force close tunnel if active
|
||||
tunnel_manager.force_close();
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
@@ -284,6 +300,7 @@ impl WebSocketClient {
|
||||
async fn handle_server_message(
|
||||
text: &str,
|
||||
tx: &mpsc::Sender<AgentMessage>,
|
||||
tunnel_manager: &mut TunnelManager,
|
||||
) -> Result<()> {
|
||||
let msg: ServerMessage =
|
||||
serde_json::from_str(text).context("Failed to parse server message")?;
|
||||
@@ -315,11 +332,107 @@ impl WebSocketClient {
|
||||
);
|
||||
Self::handle_update(payload, tx.clone()).await;
|
||||
}
|
||||
ServerMessage::TunnelOpen { session_id, tech_id } => {
|
||||
info!(
|
||||
"Received tunnel open request: session={}, tech={}",
|
||||
session_id, tech_id
|
||||
);
|
||||
Self::handle_tunnel_open(session_id, tech_id, tunnel_manager, tx.clone()).await;
|
||||
}
|
||||
ServerMessage::TunnelClose { session_id } => {
|
||||
info!("Received tunnel close request: session={}", session_id);
|
||||
Self::handle_tunnel_close(session_id, tunnel_manager, tx.clone()).await;
|
||||
}
|
||||
ServerMessage::TunnelData { channel_id, data } => {
|
||||
debug!("Received tunnel data on channel: {}", channel_id);
|
||||
Self::handle_tunnel_data(channel_id, data, tunnel_manager, tx.clone()).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle tunnel open request
|
||||
async fn handle_tunnel_open(
|
||||
session_id: String,
|
||||
tech_id: uuid::Uuid,
|
||||
tunnel_manager: &mut TunnelManager,
|
||||
tx: mpsc::Sender<AgentMessage>,
|
||||
) {
|
||||
match tunnel_manager.open_tunnel(session_id.clone(), tech_id) {
|
||||
Ok(_) => {
|
||||
info!("Tunnel opened successfully: {}", session_id);
|
||||
// Send TunnelReady confirmation
|
||||
let ready_msg = AgentMessage::TunnelReady {
|
||||
session_id: session_id.clone(),
|
||||
};
|
||||
if let Err(e) = tx.send(ready_msg).await {
|
||||
error!("Failed to send TunnelReady message: {}", e);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to open tunnel: {}", e);
|
||||
// Send error back to server
|
||||
let error_msg = AgentMessage::TunnelError {
|
||||
channel_id: "system".to_string(),
|
||||
error: format!("Failed to open tunnel: {}", e),
|
||||
};
|
||||
let _ = tx.send(error_msg).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle tunnel close request
|
||||
async fn handle_tunnel_close(
|
||||
session_id: String,
|
||||
tunnel_manager: &mut TunnelManager,
|
||||
tx: mpsc::Sender<AgentMessage>,
|
||||
) {
|
||||
match tunnel_manager.close_tunnel(&session_id) {
|
||||
Ok(_) => {
|
||||
info!("Tunnel closed successfully: {}", session_id);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Error closing tunnel: {}", e);
|
||||
// Send error back to server
|
||||
let error_msg = AgentMessage::TunnelError {
|
||||
channel_id: "system".to_string(),
|
||||
error: format!("Failed to close tunnel: {}", e),
|
||||
};
|
||||
let _ = tx.send(error_msg).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle tunnel data (Phase 1: Terminal commands only)
|
||||
async fn handle_tunnel_data(
|
||||
channel_id: String,
|
||||
data: TunnelDataPayload,
|
||||
_tunnel_manager: &TunnelManager,
|
||||
tx: mpsc::Sender<AgentMessage>,
|
||||
) {
|
||||
match data {
|
||||
TunnelDataPayload::Terminal { command } => {
|
||||
info!("Terminal command on channel {}: {}", channel_id, command);
|
||||
// Phase 1: Just log and respond with placeholder
|
||||
// Phase 2 will implement actual command execution
|
||||
let response = AgentMessage::TunnelData {
|
||||
channel_id,
|
||||
data: TunnelDataPayload::TerminalOutput {
|
||||
stdout: String::new(),
|
||||
stderr: "Terminal execution not yet implemented (Phase 2)".to_string(),
|
||||
exit_code: Some(-1),
|
||||
},
|
||||
};
|
||||
let _ = tx.send(response).await;
|
||||
}
|
||||
TunnelDataPayload::TerminalOutput { .. } => {
|
||||
// This shouldn't be sent to the agent, it's agent → server only
|
||||
warn!("Received TerminalOutput on agent (unexpected)");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle an update command from the server
|
||||
async fn handle_update(payload: UpdatePayload, tx: mpsc::Sender<AgentMessage>) {
|
||||
// Send starting status
|
||||
|
||||
276
projects/msp-tools/guru-rmm/agent/src/tunnel/mod.rs
Normal file
276
projects/msp-tools/guru-rmm/agent/src/tunnel/mod.rs
Normal file
@@ -0,0 +1,276 @@
|
||||
//! Tunnel management for real-time remote access
|
||||
//!
|
||||
//! This module handles the agent's tunnel mode, which enables:
|
||||
//! - Interactive terminal access
|
||||
//! - File operations (Phase 2+)
|
||||
//! - Registry operations (Phase 2+)
|
||||
//! - Service management (Phase 2+)
|
||||
//!
|
||||
//! The agent operates in two modes:
|
||||
//! - Heartbeat mode: Default, sends periodic heartbeats and metrics
|
||||
//! - Tunnel mode: Active session with a tech, handles real-time commands
|
||||
|
||||
use std::collections::HashMap;
|
||||
use tracing::{debug, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Agent operational mode
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum AgentMode {
|
||||
/// Default mode: periodic heartbeats and metrics
|
||||
Heartbeat,
|
||||
|
||||
/// Tunnel mode: active session with tech
|
||||
Tunnel {
|
||||
/// Unique session identifier
|
||||
session_id: String,
|
||||
/// Tech who opened the session
|
||||
tech_id: Uuid,
|
||||
/// Active channels (channel_id → channel type)
|
||||
channels: HashMap<String, ChannelType>,
|
||||
},
|
||||
}
|
||||
|
||||
impl AgentMode {
|
||||
/// Check if agent is in tunnel mode
|
||||
pub fn is_tunnel(&self) -> bool {
|
||||
matches!(self, AgentMode::Tunnel { .. })
|
||||
}
|
||||
|
||||
/// Get session ID if in tunnel mode
|
||||
pub fn session_id(&self) -> Option<&str> {
|
||||
match self {
|
||||
AgentMode::Tunnel { session_id, .. } => Some(session_id),
|
||||
AgentMode::Heartbeat => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Type of tunnel channel
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ChannelType {
|
||||
/// Terminal/command execution channel
|
||||
Terminal,
|
||||
/// File operation channel (Phase 2+)
|
||||
File,
|
||||
/// Registry operation channel (Phase 2+)
|
||||
Registry,
|
||||
/// Service management channel (Phase 2+)
|
||||
Service,
|
||||
}
|
||||
|
||||
/// Tunnel manager for handling tunnel state and operations
|
||||
pub struct TunnelManager {
|
||||
/// Current agent mode
|
||||
mode: AgentMode,
|
||||
}
|
||||
|
||||
impl TunnelManager {
|
||||
/// Create a new tunnel manager in heartbeat mode
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
mode: AgentMode::Heartbeat,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get current mode
|
||||
pub fn mode(&self) -> &AgentMode {
|
||||
&self.mode
|
||||
}
|
||||
|
||||
/// Open a tunnel session
|
||||
///
|
||||
/// Transitions from Heartbeat mode to Tunnel mode.
|
||||
/// Returns error if already in tunnel mode.
|
||||
pub fn open_tunnel(&mut self, session_id: String, tech_id: Uuid) -> Result<(), String> {
|
||||
match &self.mode {
|
||||
AgentMode::Heartbeat => {
|
||||
info!(
|
||||
"Opening tunnel session: {} (tech: {})",
|
||||
session_id, tech_id
|
||||
);
|
||||
self.mode = AgentMode::Tunnel {
|
||||
session_id,
|
||||
tech_id,
|
||||
channels: HashMap::new(),
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
AgentMode::Tunnel {
|
||||
session_id: existing_session,
|
||||
..
|
||||
} => {
|
||||
warn!(
|
||||
"Tunnel open rejected: session {} already active",
|
||||
existing_session
|
||||
);
|
||||
Err(format!(
|
||||
"Tunnel session {} already active",
|
||||
existing_session
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Close the tunnel session
|
||||
///
|
||||
/// Transitions from Tunnel mode back to Heartbeat mode.
|
||||
/// Cleans up all active channels.
|
||||
pub fn close_tunnel(&mut self, session_id: &str) -> Result<(), String> {
|
||||
match &self.mode {
|
||||
AgentMode::Tunnel {
|
||||
session_id: current_session,
|
||||
channels,
|
||||
..
|
||||
} => {
|
||||
if current_session != session_id {
|
||||
return Err(format!(
|
||||
"Session ID mismatch: expected {}, got {}",
|
||||
current_session, session_id
|
||||
));
|
||||
}
|
||||
|
||||
info!(
|
||||
"Closing tunnel session: {} ({} channels active)",
|
||||
session_id,
|
||||
channels.len()
|
||||
);
|
||||
|
||||
// Transition back to heartbeat mode
|
||||
self.mode = AgentMode::Heartbeat;
|
||||
Ok(())
|
||||
}
|
||||
AgentMode::Heartbeat => {
|
||||
warn!("Tunnel close ignored: no active session");
|
||||
Err("No active tunnel session".to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a channel to the active tunnel session
|
||||
pub fn add_channel(&mut self, channel_id: String, channel_type: ChannelType) -> Result<(), String> {
|
||||
match &mut self.mode {
|
||||
AgentMode::Tunnel { channels, .. } => {
|
||||
debug!(
|
||||
"Adding channel {} ({:?}) to tunnel",
|
||||
channel_id, channel_type
|
||||
);
|
||||
channels.insert(channel_id, channel_type);
|
||||
Ok(())
|
||||
}
|
||||
AgentMode::Heartbeat => Err("No active tunnel session".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Remove a channel from the active tunnel session
|
||||
pub fn remove_channel(&mut self, channel_id: &str) -> Result<(), String> {
|
||||
match &mut self.mode {
|
||||
AgentMode::Tunnel { channels, .. } => {
|
||||
if channels.remove(channel_id).is_some() {
|
||||
debug!("Removed channel {} from tunnel", channel_id);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(format!("Channel {} not found", channel_id))
|
||||
}
|
||||
}
|
||||
AgentMode::Heartbeat => Err("No active tunnel session".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the type of a channel
|
||||
pub fn get_channel_type(&self, channel_id: &str) -> Option<&ChannelType> {
|
||||
match &self.mode {
|
||||
AgentMode::Tunnel { channels, .. } => channels.get(channel_id),
|
||||
AgentMode::Heartbeat => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Force close tunnel (e.g., on disconnect)
|
||||
///
|
||||
/// Used during cleanup when connection is lost.
|
||||
pub fn force_close(&mut self) {
|
||||
if let AgentMode::Tunnel { session_id, .. } = &self.mode {
|
||||
info!("Force closing tunnel session: {}", session_id);
|
||||
self.mode = AgentMode::Heartbeat;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TunnelManager {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_tunnel_lifecycle() {
|
||||
let mut manager = TunnelManager::new();
|
||||
|
||||
// Start in heartbeat mode
|
||||
assert!(matches!(manager.mode(), AgentMode::Heartbeat));
|
||||
assert!(!manager.mode().is_tunnel());
|
||||
|
||||
// Open tunnel
|
||||
let session_id = "test-session-123".to_string();
|
||||
let tech_id = Uuid::new_v4();
|
||||
assert!(manager.open_tunnel(session_id.clone(), tech_id).is_ok());
|
||||
assert!(manager.mode().is_tunnel());
|
||||
assert_eq!(manager.mode().session_id(), Some(session_id.as_str()));
|
||||
|
||||
// Can't open another tunnel
|
||||
assert!(manager
|
||||
.open_tunnel("another-session".to_string(), tech_id)
|
||||
.is_err());
|
||||
|
||||
// Add channel
|
||||
assert!(manager
|
||||
.add_channel("channel-1".to_string(), ChannelType::Terminal)
|
||||
.is_ok());
|
||||
|
||||
// Close tunnel
|
||||
assert!(manager.close_tunnel(&session_id).is_ok());
|
||||
assert!(matches!(manager.mode(), AgentMode::Heartbeat));
|
||||
assert!(!manager.mode().is_tunnel());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_channel_management() {
|
||||
let mut manager = TunnelManager::new();
|
||||
let session_id = "test-session".to_string();
|
||||
let tech_id = Uuid::new_v4();
|
||||
|
||||
// Can't add channel without tunnel
|
||||
assert!(manager
|
||||
.add_channel("channel-1".to_string(), ChannelType::Terminal)
|
||||
.is_err());
|
||||
|
||||
// Open tunnel
|
||||
manager.open_tunnel(session_id.clone(), tech_id).unwrap();
|
||||
|
||||
// Add channels
|
||||
manager
|
||||
.add_channel("channel-1".to_string(), ChannelType::Terminal)
|
||||
.unwrap();
|
||||
manager
|
||||
.add_channel("channel-2".to_string(), ChannelType::File)
|
||||
.unwrap();
|
||||
|
||||
// Get channel type
|
||||
assert!(matches!(
|
||||
manager.get_channel_type("channel-1"),
|
||||
Some(ChannelType::Terminal)
|
||||
));
|
||||
|
||||
// Remove channel
|
||||
assert!(manager.remove_channel("channel-1").is_ok());
|
||||
assert!(manager.get_channel_type("channel-1").is_none());
|
||||
|
||||
// Force close
|
||||
manager.force_close();
|
||||
assert!(matches!(manager.mode(), AgentMode::Heartbeat));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user