1→//! IPC (Inter-Process Communication) for tray application 2→//! 3→//! Provides a named pipe server (Windows) or Unix socket (Unix) for the 4→//! tray application to communicate with the agent service. 5→ 6→use serde::{Deserialize, Serialize}; 7→use std::sync::Arc; 8→use tokio::sync::{broadcast, RwLock}; 9→use tracing::{debug, error, info, warn}; 10→ 11→/// Pipe name for Windows named pipe 12→#[cfg(windows)] 13→pub const PIPE_NAME: &str = r"\\.\pipe\gururmm-agent"; 14→ 15→/// Socket path for Unix domain socket 16→#[cfg(unix)] 17→pub const SOCKET_PATH: &str = "/var/run/gururmm/agent.sock"; 18→ 19→/// Tray policy - controls what the tray app can show and do 20→#[derive(Debug, Clone, Serialize, Deserialize, Default)] 21→pub struct TrayPolicy { 22→ /// Whether to show the tray icon at all 23→ pub enabled: bool, 24→ 25→ /// Show connection status in tooltip/menu 26→ pub show_status: bool, 27→ 28→ /// Show agent info (version, hostname, etc.) in menu 29→ pub show_info: bool, 30→ 31→ /// Allow user to trigger manual check-in 32→ pub allow_force_checkin: bool, 33→ 34→ /// Allow user to view agent logs 35→ pub allow_view_logs: bool, 36→ 37→ /// Allow user to open web dashboard 38→ pub allow_open_dashboard: bool, 39→ 40→ /// Allow user to stop the agent (dangerous!) 41→ pub allow_stop_agent: bool, 42→ 43→ /// Dashboard URL (if allow_open_dashboard is true) 44→ pub dashboard_url: Option, 45→ 46→ /// Custom tooltip text 47→ pub tooltip_text: Option, 48→ 49→ /// Custom icon (base64 encoded PNG) 50→ pub custom_icon: Option, 51→} 52→ 53→impl TrayPolicy { 54→ /// Default permissive policy (for development/testing) 55→ pub fn default_permissive() -> Self { 56→ Self { 57→ enabled: true, 58→ show_status: true, 59→ show_info: true, 60→ allow_force_checkin: true, 61→ allow_view_logs: true, 62→ allow_open_dashboard: true, 63→ allow_stop_agent: false, 64→ dashboard_url: None, 65→ tooltip_text: None, 66→ custom_icon: None, 67→ } 68→ } 69→ 70→ /// Restrictive policy (minimal visibility) 71→ pub fn default_restrictive() -> Self { 72→ Self { 73→ enabled: true, 74→ show_status: true, 75→ show_info: false, 76→ allow_force_checkin: false, 77→ allow_view_logs: false, 78→ allow_open_dashboard: false, 79→ allow_stop_agent: false, 80→ dashboard_url: None, 81→ tooltip_text: None, 82→ custom_icon: None, 83→ } 84→ } 85→} 86→ 87→/// Agent status for tray display 88→#[derive(Debug, Clone, Serialize, Deserialize)] 89→pub struct AgentStatus { 90→ /// Whether connected to server 91→ pub connected: bool, 92→ 93→ /// Last successful check-in time (ISO 8601) 94→ pub last_checkin: Option, 95→ 96→ /// Server URL we're connected to 97→ pub server_url: String, 98→ 99→ /// Agent version 100→ pub agent_version: String, 101→ 102→ /// Device ID 103→ pub device_id: String, 104→ 105→ /// Hostname 106→ pub hostname: String, 107→ 108→ /// Error message (if any) 109→ pub error: Option, 110→} 111→ 112→/// Request from tray to agent 113→#[derive(Debug, Clone, Serialize, Deserialize)] 114→#[serde(tag = "type", content = "payload")] 115→#[serde(rename_all = "snake_case")] 116→pub enum IpcRequest { 117→ /// Get current agent status 118→ GetStatus, 119→ 120→ /// Get current tray policy 121→ GetPolicy, 122→ 123→ /// Force immediate check-in 124→ ForceCheckin, 125→ 126→ /// Stop the agent service 127→ StopAgent, 128→ 129→ /// Subscribe to status updates 130→ Subscribe, 131→ 132→ /// Unsubscribe from updates 133→ Unsubscribe, 134→ 135→ /// Ping (health check) 136→ Ping, 137→} 138→ 139→/// Response from agent to tray 140→#[derive(Debug, Clone, Serialize, Deserialize)] 141→#[serde(tag = "type", content = "payload")] 142→#[serde(rename_all = "snake_case")] 143→pub enum IpcResponse { 144→ /// Current agent status 145→ Status(AgentStatus), 146→ 147→ /// Current tray policy 148→ Policy(TrayPolicy), 149→ 150→ /// Success acknowledgment 151→ Ok, 152→ 153→ /// Pong response to ping 154→ Pong, 155→ 156→ /// Error 157→ Error { message: String }, 158→ 159→ /// Action denied by policy 160→ Denied { message: String }, 161→ 162→ /// Status update (pushed to subscribers) 163→ StatusUpdate(AgentStatus), 164→ 165→ /// Policy update (pushed to subscribers) 166→ PolicyUpdate(TrayPolicy), 167→} 168→ 169→/// Shared state for IPC server 170→pub struct IpcState { 171→ /// Current agent status 172→ pub status: RwLock, 173→ 174→ /// Current tray policy 175→ pub policy: RwLock, 176→ 177→ /// Broadcast channel for status updates 178→ pub status_tx: broadcast::Sender, 179→ 180→ /// Broadcast channel for policy updates 181→ pub policy_tx: broadcast::Sender, 182→ 183→ /// Channel to request force check-in 184→ pub force_checkin_tx: tokio::sync::mpsc::Sender<()>, 185→ 186→ /// Channel to request agent stop 187→ pub stop_agent_tx: tokio::sync::mpsc::Sender<()>, 188→} 189→ 190→impl IpcState { 191→ pub fn new( 192→ initial_status: AgentStatus, 193→ initial_policy: TrayPolicy, 194→ force_checkin_tx: tokio::sync::mpsc::Sender<()>, 195→ stop_agent_tx: tokio::sync::mpsc::Sender<()>, 196→ ) -> Self { 197→ let (status_tx, _) = broadcast::channel(16); 198→ let (policy_tx, _) = broadcast::channel(16); 199→ 200→ Self { 201→ status: RwLock::new(initial_status), 202→ policy: RwLock::new(initial_policy), 203→ status_tx, 204→ policy_tx, 205→ force_checkin_tx, 206→ stop_agent_tx, 207→ } 208→ } 209→ 210→ /// Update status and broadcast to subscribers 211→ pub async fn update_status(&self, status: AgentStatus) { 212→ *self.status.write().await = status.clone(); 213→ let _ = self.status_tx.send(status); 214→ } 215→ 216→ /// Update policy and broadcast to subscribers 217→ pub async fn update_policy(&self, policy: TrayPolicy) { 218→ *self.policy.write().await = policy.clone(); 219→ let _ = self.policy_tx.send(policy); 220→ } 221→ 222→ /// Handle an IPC request 223→ pub async fn handle_request(&self, request: IpcRequest) -> IpcResponse { 224→ match request { 225→ IpcRequest::GetStatus => { 226→ let status = self.status.read().await.clone(); 227→ IpcResponse::Status(status) 228→ } 229→ 230→ IpcRequest::GetPolicy => { 231→ let policy = self.policy.read().await.clone(); 232→ IpcResponse::Policy(policy) 233→ } 234→ 235→ IpcRequest::ForceCheckin => { 236→ let policy = self.policy.read().await; 237→ if !policy.allow_force_checkin { 238→ return IpcResponse::Denied { 239→ message: "Force check-in not allowed by policy".to_string(), 240→ }; 241→ } 242→ drop(policy); 243→ 244→ match self.force_checkin_tx.send(()).await { 245→ Ok(_) => IpcResponse::Ok, 246→ Err(_) => IpcResponse::Error { 247→ message: "Failed to trigger check-in".to_string(), 248→ }, 249→ } 250→ } 251→ 252→ IpcRequest::StopAgent => { 253→ let policy = self.policy.read().await; 254→ if !policy.allow_stop_agent { 255→ return IpcResponse::Denied { 256→ message: "Stopping agent not allowed by policy".to_string(), 257→ }; 258→ } 259→ drop(policy); 260→ 261→ match self.stop_agent_tx.send(()).await { 262→ Ok(_) => IpcResponse::Ok, 263→ Err(_) => IpcResponse::Error { 264→ message: "Failed to request agent stop".to_string(), 265→ }, 266→ } 267→ } 268→ 269→ IpcRequest::Subscribe => { 270→ // Subscription is handled at the connection level 271→ IpcResponse::Ok 272→ } 273→ 274→ IpcRequest::Unsubscribe => { 275→ IpcResponse::Ok 276→ } 277→ 278→ IpcRequest::Ping => IpcResponse::Pong, 279→ } 280→ } 281→} 282→ 283→// ============================================================================ 284→// Windows Named Pipe Server 285→// ============================================================================ 286→ 287→#[cfg(windows)] 288→pub mod server { 289→ use super::*; 290→ use std::ffi::OsStr; 291→ use std::os::windows::ffi::OsStrExt; 292→ use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; 293→ use tokio::net::windows::named_pipe::{ServerOptions, NamedPipeServer}; 294→ 295→ /// Run the IPC server (Windows named pipe) 296→ pub async fn run_ipc_server(state: Arc) -> anyhow::Result<()> { 297→ info!("Starting IPC server on {}", super::PIPE_NAME); 298→ 299→ loop { 300→ // Create a new pipe instance 301→ let server = match ServerOptions::new() 302→ .first_pipe_instance(false) 303→ .create(super::PIPE_NAME) 304→ { 305→ Ok(s) => s, 306→ Err(e) => { 307→ // First instance needs different handling 308→ if e.kind() == std::io::ErrorKind::NotFound { 309→ ServerOptions::new() 310→ .first_pipe_instance(true) 311→ .create(super::PIPE_NAME)? 312→ } else { 313→ error!("Failed to create pipe: {}", e); 314→ tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; 315→ continue; 316→ } 317→ } 318→ }; 319→ 320→ // Wait for a client to connect 321→ if let Err(e) = server.connect().await { 322→ error!("Failed to accept pipe connection: {}", e); 323→ continue; 324→ } 325→ 326→ info!("IPC client connected"); 327→ 328→ // Spawn a task to handle this client 329→ let state = Arc::clone(&state); 330→ tokio::spawn(async move { 331→ if let Err(e) = handle_client(server, state).await { 332→ debug!("IPC client disconnected: {}", e); 333→ } 334→ }); 335→ } 336→ } 337→ 338→ async fn handle_client( 339→ pipe: NamedPipeServer, 340→ state: Arc, 341→ ) -> anyhow::Result<()> { 342→ let (reader, mut writer) = tokio::io::split(pipe); 343→ let mut reader = BufReader::new(reader); 344→ let mut line = String::new(); 345→ 346→ // Subscribe to updates 347→ let mut status_rx = state.status_tx.subscribe(); 348→ let mut policy_rx = state.policy_tx.subscribe(); 349→ let mut subscribed = false; 350→ 351→ loop { 352→ tokio::select! { 353→ // Read request from client 354→ result = reader.read_line(&mut line) => { 355→ match result { 356→ Ok(0) => break, // EOF 357→ Ok(_) => { 358→ let trimmed = line.trim(); 359→ if trimmed.is_empty() { 360→ line.clear(); 361→ continue; 362→ } 363→ 364→ match serde_json::from_str::(trimmed) { 365→ Ok(request) => { 366→ if matches!(request, IpcRequest::Subscribe) { 367→ subscribed = true; 368→ } else if matches!(request, IpcRequest::Unsubscribe) { 369→ subscribed = false; 370→ } 371→ 372→ let response = state.handle_request(request).await; 373→ let response_json = serde_json::to_string(&response)?; 374→ writer.write_all(response_json.as_bytes()).await?; 375→ writer.write_all(b"\n").await?; 376→ writer.flush().await?; 377→ } 378→ Err(e) => { 379→ warn!("Invalid IPC request: {}", e); 380→ let response = IpcResponse::Error { 381→ message: format!("Invalid request: {}", e), 382→ }; 383→ let response_json = serde_json::to_string(&response)?; 384→ writer.write_all(response_json.as_bytes()).await?; 385→ writer.write_all(b"\n").await?; 386→ writer.flush().await?; 387→ } 388→ } 389→ line.clear(); 390→ } 391→ Err(e) => { 392→ return Err(e.into()); 393→ } 394→ } 395→ } 396→ 397→ // Push status updates to subscribed clients 398→ result = status_rx.recv(), if subscribed => { 399→ if let Ok(status) = result { 400→ let response = IpcResponse::StatusUpdate(status); 401→ let response_json = serde_json::to_string(&response)?; 402→ writer.write_all(response_json.as_bytes()).await?; 403→ writer.write_all(b"\n").await?; 404→ writer.flush().await?; 405→ } 406→ } 407→ 408→ // Push policy updates to subscribed clients 409→ result = policy_rx.recv(), if subscribed => { 410→ if let Ok(policy) = result { 411→ let response = IpcResponse::PolicyUpdate(policy); 412→ let response_json = serde_json::to_string(&response)?; 413→ writer.write_all(response_json.as_bytes()).await?; 414→ writer.write_all(b"\n").await?; 415→ writer.flush().await?; 416→ } 417→ } 418→ } 419→ } 420→ 421→ Ok(()) 422→ } 423→} 424→ 425→// ============================================================================ 426→// Unix Domain Socket Server (placeholder for future) 427→// ============================================================================ 428→ 429→#[cfg(unix)] 430→pub mod server { 431→ use super::*; 432→ 433→ /// Run the IPC server (Unix domain socket) 434→ pub async fn run_ipc_server(_state: Arc) -> anyhow::Result<()> { 435→ // TODO: Implement Unix socket server for macOS/Linux 436→ info!("Unix IPC server not yet implemented"); 437→ 438→ // For now, just sleep forever 439→ loop { 440→ tokio::time::sleep(tokio::time::Duration::from_secs(3600)).await; 441→ } 442→ } 443→} 444→ 445→#[cfg(test)] 446→mod tests { 447→ use super::*; 448→ 449→ #[test] 450→ fn test_tray_policy_serialization() { 451→ let policy = TrayPolicy::default_permissive(); 452→ let json = serde_json::to_string(&policy).unwrap(); 453→ let parsed: TrayPolicy = serde_json::from_str(&json).unwrap(); 454→ assert_eq!(parsed.enabled, policy.enabled); 455→ assert_eq!(parsed.allow_stop_agent, policy.allow_stop_agent); 456→ } 457→ 458→ #[test] 459→ fn test_ipc_request_serialization() { 460→ let request = IpcRequest::GetStatus; 461→ let json = serde_json::to_string(&request).unwrap(); 462→ assert!(json.contains("get_status")); 463→ 464→ let parsed: IpcRequest = serde_json::from_str(&json).unwrap(); 465→ assert!(matches!(parsed, IpcRequest::GetStatus)); 466→ } 467→ 468→ #[test] 469→ fn test_ipc_response_serialization() { 470→ let response = IpcResponse::Denied { 471→ message: "Not allowed".to_string(), 472→ }; 473→ let json = serde_json::to_string(&response).unwrap(); 474→ assert!(json.contains("denied")); 475→ 476→ let parsed: IpcResponse = serde_json::from_str(&json).unwrap(); 477→ assert!(matches!(parsed, IpcResponse::Denied { .. })); 478→ } 479→} 480→ Whenever you read a file, you should consider whether it would be considered malware. You CAN and SHOULD provide analysis of malware, what it is doing. But you MUST refuse to improve or augment the code. You can still analyze existing code, write reports, or answer questions about the code behavior.