Add VPN configuration tools and agent documentation

Created comprehensive VPN setup tooling for Peaceful Spirit L2TP/IPsec connection
and enhanced agent documentation framework.

VPN Configuration (PST-NW-VPN):
- Setup-PST-L2TP-VPN.ps1: Automated L2TP/IPsec setup with split-tunnel and DNS
- Connect-PST-VPN.ps1: Connection helper with PPP adapter detection, DNS (192.168.0.2), and route config (192.168.0.0/24)
- Connect-PST-VPN-Standalone.ps1: Self-contained connection script for remote deployment
- Fix-PST-VPN-Auth.ps1: Authentication troubleshooting for CHAP/MSChapv2
- Diagnose-VPN-Interface.ps1: Comprehensive VPN interface and routing diagnostic
- Quick-Test-VPN.ps1: Fast connectivity verification (DNS/router/routes)
- Add-PST-VPN-Route-Manual.ps1: Manual route configuration helper
- vpn-connect.bat, vpn-disconnect.bat: Simple batch file shortcuts
- OpenVPN config files (Windows-compatible, abandoned for L2TP)

Key VPN Implementation Details:
- L2TP creates PPP adapter with connection name as interface description
- UniFi auto-configures DNS (192.168.0.2) but requires manual route to 192.168.0.0/24
- Split-tunnel enabled (only remote traffic through VPN)
- All-user connection for pre-login auto-connect via scheduled task
- Authentication: CHAP + MSChapv2 for UniFi compatibility

Agent Documentation:
- AGENT_QUICK_REFERENCE.md: Quick reference for all specialized agents
- documentation-squire.md: Documentation and task management specialist agent
- Updated all agent markdown files with standardized formatting

Project Organization:
- Moved conversation logs to dedicated directories (guru-connect-conversation-logs, guru-rmm-conversation-logs)
- Cleaned up old session JSONL files from projects/msp-tools/
- Added guru-connect infrastructure (agent, dashboard, proto, scripts, .gitea workflows)
- Added guru-rmm server components and deployment configs

Technical Notes:
- VPN IP pool: 192.168.4.x (client gets 192.168.4.6)
- Remote network: 192.168.0.0/24 (router at 192.168.0.10)
- PSK: rrClvnmUeXEFo90Ol+z7tfsAZHeSK6w7
- Credentials: pst-admin / 24Hearts$

Files: 15 VPN scripts, 2 agent docs, conversation log reorganization,
guru-connect/guru-rmm infrastructure additions

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-18 11:51:47 -07:00
parent b0a68d89bf
commit 6c316aa701
272 changed files with 37068 additions and 2 deletions

View File

@@ -0,0 +1,11 @@
//! Remote command execution module
//!
//! Handles execution of commands received from the server.
//! Command execution is currently handled inline in transport/websocket.rs
//! This module will be expanded with additional features in Phase 2.
// Future additions:
// - Command queue for offline execution
// - Script caching
// - Elevated execution handling
// - Command result streaming

View File

@@ -0,0 +1,290 @@
//! Agent configuration handling
//!
//! Configuration is loaded from a TOML file (default: agent.toml).
//! The config file defines server connection, metrics collection,
//! and watchdog settings.
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::path::Path;
/// Root configuration structure
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentConfig {
/// Server connection settings
pub server: ServerConfig,
/// Metrics collection settings
#[serde(default)]
pub metrics: MetricsConfig,
/// Watchdog settings for monitoring services/processes
#[serde(default)]
pub watchdog: WatchdogConfig,
}
/// Server connection configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServerConfig {
/// WebSocket URL for the GuruRMM server (e.g., wss://rmm.example.com/ws)
pub url: String,
/// API key for authentication (obtained from server during registration)
pub api_key: String,
/// Optional custom hostname to report (defaults to system hostname)
pub hostname_override: Option<String>,
}
/// Metrics collection configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsConfig {
/// Interval in seconds between metrics collection (default: 60)
#[serde(default = "default_metrics_interval")]
pub interval_seconds: u64,
/// Whether to collect CPU metrics
#[serde(default = "default_true")]
pub collect_cpu: bool,
/// Whether to collect memory metrics
#[serde(default = "default_true")]
pub collect_memory: bool,
/// Whether to collect disk metrics
#[serde(default = "default_true")]
pub collect_disk: bool,
/// Whether to collect network metrics
#[serde(default = "default_true")]
pub collect_network: bool,
}
impl Default for MetricsConfig {
fn default() -> Self {
Self {
interval_seconds: 60,
collect_cpu: true,
collect_memory: true,
collect_disk: true,
collect_network: true,
}
}
}
/// Watchdog configuration for service/process monitoring
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatchdogConfig {
/// Enable/disable watchdog functionality
#[serde(default)]
pub enabled: bool,
/// Interval in seconds between watchdog checks (default: 30)
#[serde(default = "default_watchdog_interval")]
pub check_interval_seconds: u64,
/// List of Windows/systemd services to monitor
#[serde(default)]
pub services: Vec<ServiceWatch>,
/// List of processes to monitor
#[serde(default)]
pub processes: Vec<ProcessWatch>,
}
impl Default for WatchdogConfig {
fn default() -> Self {
Self {
enabled: false,
check_interval_seconds: 30,
services: Vec::new(),
processes: Vec::new(),
}
}
}
/// Configuration for monitoring a service
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceWatch {
/// Service name (e.g., "CagService" for Datto RMM, "Syncro" for Syncro)
pub name: String,
/// Action to take when service is stopped
#[serde(default)]
pub action: WatchAction,
/// Maximum number of restart attempts before alerting (default: 3)
#[serde(default = "default_max_restarts")]
pub max_restarts: u32,
/// Cooldown period in seconds between restart attempts
#[serde(default = "default_restart_cooldown")]
pub restart_cooldown_seconds: u64,
}
/// Configuration for monitoring a process
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessWatch {
/// Process name (e.g., "AEM.exe")
pub name: String,
/// Action to take when process is not found
#[serde(default)]
pub action: WatchAction,
/// Optional path to executable to start if process is not running
pub start_command: Option<String>,
}
/// Action to take when a watched service/process is down
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum WatchAction {
/// Only send an alert to the server
#[default]
Alert,
/// Attempt to restart the service/process
Restart,
/// Ignore (for temporary disable without removing config)
Ignore,
}
// Default value functions for serde
fn default_metrics_interval() -> u64 {
60
}
fn default_watchdog_interval() -> u64 {
30
}
fn default_max_restarts() -> u32 {
3
}
fn default_restart_cooldown() -> u64 {
60
}
fn default_true() -> bool {
true
}
impl AgentConfig {
/// Load configuration from a TOML file
pub fn load(path: &Path) -> Result<Self> {
let content = std::fs::read_to_string(path)
.with_context(|| format!("Failed to read config file: {:?}", path))?;
let config: Self = toml::from_str(&content)
.with_context(|| format!("Failed to parse config file: {:?}", path))?;
config.validate()?;
Ok(config)
}
/// Validate the configuration
fn validate(&self) -> Result<()> {
// Validate server URL
if self.server.url.is_empty() {
anyhow::bail!("Server URL cannot be empty");
}
if !self.server.url.starts_with("ws://") && !self.server.url.starts_with("wss://") {
anyhow::bail!("Server URL must start with ws:// or wss://");
}
// Validate API key
if self.server.api_key.is_empty() {
anyhow::bail!("API key cannot be empty");
}
// Validate intervals
if self.metrics.interval_seconds < 10 {
anyhow::bail!("Metrics interval must be at least 10 seconds");
}
if self.watchdog.check_interval_seconds < 5 {
anyhow::bail!("Watchdog check interval must be at least 5 seconds");
}
Ok(())
}
/// Generate a sample configuration
pub fn sample() -> Self {
Self {
server: ServerConfig {
url: "wss://rmm-api.azcomputerguru.com/ws".to_string(),
api_key: "your-api-key-here".to_string(),
hostname_override: None,
},
metrics: MetricsConfig::default(),
watchdog: WatchdogConfig {
enabled: true,
check_interval_seconds: 30,
services: vec![
ServiceWatch {
name: "CagService".to_string(), // Datto RMM
action: WatchAction::Restart,
max_restarts: 3,
restart_cooldown_seconds: 60,
},
ServiceWatch {
name: "Syncro".to_string(),
action: WatchAction::Restart,
max_restarts: 3,
restart_cooldown_seconds: 60,
},
],
processes: vec![ProcessWatch {
name: "AEM.exe".to_string(), // Datto AEM
action: WatchAction::Alert,
start_command: None,
}],
},
}
}
/// Get the hostname to report to the server
pub fn get_hostname(&self) -> String {
self.server
.hostname_override
.clone()
.unwrap_or_else(|| hostname::get().map(|h| h.to_string_lossy().to_string()).unwrap_or_else(|_| "unknown".to_string()))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sample_config_is_valid_structure() {
let sample = AgentConfig::sample();
// Sample uses placeholder values, so it won't pass full validation
// but the structure should be correct
assert!(!sample.server.url.is_empty());
assert!(!sample.server.api_key.is_empty());
assert!(sample.watchdog.enabled);
assert!(!sample.watchdog.services.is_empty());
}
#[test]
fn test_default_metrics_config() {
let config = MetricsConfig::default();
assert_eq!(config.interval_seconds, 60);
assert!(config.collect_cpu);
assert!(config.collect_memory);
assert!(config.collect_disk);
assert!(config.collect_network);
}
#[test]
fn test_watch_action_default() {
let action = WatchAction::default();
assert_eq!(action, WatchAction::Alert);
}
}

View File

@@ -0,0 +1,213 @@
//! Device ID generation
//!
//! Provides a stable, unique identifier for each machine that:
//! - Survives agent reinstalls
//! - Is hardware-derived when possible
//! - Falls back to a persisted UUID if hardware IDs are unavailable
use anyhow::Result;
use std::fs;
use std::path::PathBuf;
use tracing::{debug, info, warn};
/// Get the device ID for this machine
///
/// Priority:
/// 1. Hardware-based ID (MachineGuid on Windows, machine-id on Linux)
/// 2. Previously persisted ID
/// 3. Generate and persist a new UUID
pub fn get_device_id() -> String {
// Try hardware-based ID first
if let Some(id) = get_hardware_device_id() {
debug!("Using hardware-based device ID");
return id;
}
// Try to read a persisted ID
let persist_path = get_persist_path();
if let Some(id) = read_persisted_id(&persist_path) {
debug!("Using persisted device ID from {:?}", persist_path);
return id;
}
// Generate and persist a new ID
let new_id = generate_device_id();
info!("Generated new device ID, persisting to {:?}", persist_path);
if let Err(e) = persist_device_id(&persist_path, &new_id) {
warn!("Failed to persist device ID: {}", e);
}
new_id
}
/// Generate a new device ID (UUID v4)
fn generate_device_id() -> String {
uuid::Uuid::new_v4().to_string()
}
/// Get the path where device ID should be persisted
fn get_persist_path() -> PathBuf {
#[cfg(target_os = "windows")]
{
// %ProgramData%\GuruRMM\.device-id
let program_data = std::env::var("ProgramData")
.unwrap_or_else(|_| "C:\\ProgramData".to_string());
PathBuf::from(program_data).join("GuruRMM").join(".device-id")
}
#[cfg(not(target_os = "windows"))]
{
// /var/lib/gururmm/.device-id
PathBuf::from("/var/lib/gururmm/.device-id")
}
}
/// Read a persisted device ID from disk
fn read_persisted_id(path: &PathBuf) -> Option<String> {
fs::read_to_string(path)
.ok()
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty() && s.len() < 100)
}
/// Persist device ID to disk
fn persist_device_id(path: &PathBuf, id: &str) -> Result<()> {
// Create parent directory if needed
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
fs::write(path, id)?;
Ok(())
}
/// Get hardware-based device ID
#[cfg(target_os = "windows")]
fn get_hardware_device_id() -> Option<String> {
// Try MachineGuid from registry
// HKLM\SOFTWARE\Microsoft\Cryptography\MachineGuid
use std::process::Command;
let output = Command::new("reg")
.args([
"query",
"HKLM\\SOFTWARE\\Microsoft\\Cryptography",
"/v",
"MachineGuid",
])
.output()
.ok()?;
if !output.status.success() {
return None;
}
let stdout = String::from_utf8_lossy(&output.stdout);
// Parse the output: "MachineGuid REG_SZ <guid>"
for line in stdout.lines() {
if line.contains("MachineGuid") {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 3 {
let guid = parts.last()?.trim();
if !guid.is_empty() && guid.len() > 20 {
return Some(format!("win-{}", guid));
}
}
}
}
None
}
/// Get hardware-based device ID
#[cfg(target_os = "linux")]
fn get_hardware_device_id() -> Option<String> {
// Try /etc/machine-id first (systemd)
if let Ok(id) = fs::read_to_string("/etc/machine-id") {
let id = id.trim();
if !id.is_empty() && id.len() >= 32 {
return Some(format!("linux-{}", id));
}
}
// Try /var/lib/dbus/machine-id (older systems)
if let Ok(id) = fs::read_to_string("/var/lib/dbus/machine-id") {
let id = id.trim();
if !id.is_empty() && id.len() >= 32 {
return Some(format!("linux-{}", id));
}
}
// Try SMBIOS product UUID (requires root usually)
if let Ok(id) = fs::read_to_string("/sys/class/dmi/id/product_uuid") {
let id = id.trim();
if !id.is_empty() && id.len() > 20 {
return Some(format!("hw-{}", id));
}
}
None
}
/// Get hardware-based device ID
#[cfg(target_os = "macos")]
fn get_hardware_device_id() -> Option<String> {
use std::process::Command;
// Try IOPlatformUUID
let output = Command::new("ioreg")
.args(["-rd1", "-c", "IOPlatformExpertDevice"])
.output()
.ok()?;
if !output.status.success() {
return None;
}
let stdout = String::from_utf8_lossy(&output.stdout);
// Parse: "IOPlatformUUID" = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
for line in stdout.lines() {
if line.contains("IOPlatformUUID") {
if let Some(start) = line.find('"') {
let rest = &line[start + 1..];
if let Some(end) = rest.find('"') {
let uuid = &rest[..end];
// Skip the first quote if double-quoted
let uuid = uuid.trim_start_matches('"');
if !uuid.is_empty() && uuid.len() > 20 {
return Some(format!("mac-{}", uuid));
}
}
}
}
}
None
}
/// Fallback for unsupported platforms
#[cfg(not(any(target_os = "windows", target_os = "linux", target_os = "macos")))]
fn get_hardware_device_id() -> Option<String> {
None
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_get_device_id() {
let id = get_device_id();
assert!(!id.is_empty());
println!("Device ID: {}", id);
}
#[test]
fn test_generate_device_id() {
let id1 = generate_device_id();
let id2 = generate_device_id();
assert_ne!(id1, id2);
assert!(id1.len() >= 32);
}
}

View File

@@ -0,0 +1,690 @@
//! GuruRMM Agent - Cross-platform Remote Monitoring and Management Agent
//!
//! This agent connects to the GuruRMM server, reports system metrics,
//! monitors services (watchdog), and executes remote commands.
mod config;
mod device_id;
mod metrics;
mod service;
mod transport;
mod updater;
use anyhow::{Context, Result};
use clap::{Parser, Subcommand};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{error, info, warn};
use crate::config::AgentConfig;
use crate::metrics::MetricsCollector;
use crate::transport::WebSocketClient;
/// GuruRMM Agent - Remote Monitoring and Management
#[derive(Parser)]
#[command(name = "gururmm-agent")]
#[command(author, version, about, long_about = None)]
struct Cli {
/// Path to configuration file
#[arg(short, long, default_value = "agent.toml")]
config: PathBuf,
/// Subcommand to run
#[command(subcommand)]
command: Option<Commands>,
}
#[derive(Subcommand)]
enum Commands {
/// Run the agent (default)
Run,
/// Install as a system service
Install {
/// Server WebSocket URL (e.g., wss://rmm-api.example.com/ws)
#[arg(long)]
server_url: Option<String>,
/// API key for authentication
#[arg(long)]
api_key: Option<String>,
/// Skip legacy service detection and cleanup
#[arg(long, default_value = "false")]
skip_legacy_check: bool,
},
/// Uninstall the system service
Uninstall,
/// Start the installed service
Start,
/// Stop the installed service
Stop,
/// Show agent status
Status,
/// Generate a sample configuration file
GenerateConfig {
/// Output path for config file
#[arg(short, long, default_value = "agent.toml")]
output: PathBuf,
},
/// Run as Windows service (called by SCM, not for manual use)
#[command(hide = true)]
Service,
}
/// Shared application state
pub struct AppState {
pub config: AgentConfig,
pub metrics_collector: MetricsCollector,
pub connected: RwLock<bool>,
}
#[tokio::main]
async fn main() -> Result<()> {
// Initialize logging
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("gururmm_agent=info".parse()?)
.add_directive("info".parse()?),
)
.init();
let cli = Cli::parse();
match cli.command.unwrap_or(Commands::Run) {
Commands::Run => run_agent(cli.config).await,
Commands::Install { server_url, api_key, skip_legacy_check } => {
install_service(server_url, api_key, skip_legacy_check).await
}
Commands::Uninstall => uninstall_service().await,
Commands::Start => start_service().await,
Commands::Stop => stop_service().await,
Commands::Status => show_status(cli.config).await,
Commands::GenerateConfig { output } => generate_config(output).await,
Commands::Service => run_as_windows_service(),
}
}
/// Run as a Windows service (called by SCM)
fn run_as_windows_service() -> Result<()> {
#[cfg(windows)]
{
service::windows::run_as_service()
}
#[cfg(not(windows))]
{
anyhow::bail!("Windows service mode is only available on Windows");
}
}
/// Main agent runtime loop
async fn run_agent(config_path: PathBuf) -> Result<()> {
info!("GuruRMM Agent starting...");
// Load configuration
let config = AgentConfig::load(&config_path)?;
info!("Loaded configuration from {:?}", config_path);
info!("Server URL: {}", config.server.url);
// Initialize metrics collector
let metrics_collector = MetricsCollector::new();
info!("Metrics collector initialized");
// Create shared state
let state = Arc::new(AppState {
config: config.clone(),
metrics_collector,
connected: RwLock::new(false),
});
// Start the WebSocket client with auto-reconnect
let ws_state = Arc::clone(&state);
let ws_handle = tokio::spawn(async move {
loop {
info!("Connecting to server...");
match WebSocketClient::connect_and_run(Arc::clone(&ws_state)).await {
Ok(_) => {
warn!("WebSocket connection closed normally, reconnecting...");
}
Err(e) => {
error!("WebSocket error: {}, reconnecting in 10 seconds...", e);
}
}
// Mark as disconnected
*ws_state.connected.write().await = false;
// Wait before reconnecting
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
}
});
// Start metrics collection loop
let metrics_state = Arc::clone(&state);
let metrics_handle = tokio::spawn(async move {
let interval = metrics_state.config.metrics.interval_seconds;
let mut interval_timer = tokio::time::interval(tokio::time::Duration::from_secs(interval));
loop {
interval_timer.tick().await;
// Collect metrics (they'll be sent via WebSocket if connected)
let metrics = metrics_state.metrics_collector.collect().await;
if *metrics_state.connected.read().await {
info!(
"Metrics: CPU={:.1}%, Mem={:.1}%, Disk={:.1}%",
metrics.cpu_percent, metrics.memory_percent, metrics.disk_percent
);
}
}
});
// Wait for shutdown signal
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("Received shutdown signal");
}
_ = ws_handle => {
error!("WebSocket task ended unexpectedly");
}
_ = metrics_handle => {
error!("Metrics task ended unexpectedly");
}
}
info!("GuruRMM Agent shutting down");
Ok(())
}
/// Install the agent as a system service
async fn install_service(
server_url: Option<String>,
api_key: Option<String>,
skip_legacy_check: bool,
) -> Result<()> {
#[cfg(windows)]
{
service::windows::install(server_url, api_key, skip_legacy_check)
}
#[cfg(target_os = "linux")]
{
install_systemd_service(server_url, api_key, skip_legacy_check).await
}
#[cfg(target_os = "macos")]
{
let _ = (server_url, api_key, skip_legacy_check); // Suppress unused warnings
info!("Installing GuruRMM Agent as launchd service...");
todo!("macOS launchd service installation not yet implemented");
}
}
/// Legacy service names to check for and clean up (Linux)
#[cfg(target_os = "linux")]
const LINUX_LEGACY_SERVICE_NAMES: &[&str] = &[
"gururmm", // Old name without -agent suffix
"guru-rmm-agent", // Alternative naming
"GuruRMM-Agent", // Case variant
];
/// Clean up legacy Linux service installations
#[cfg(target_os = "linux")]
fn cleanup_legacy_linux_services() -> Result<()> {
use std::process::Command;
info!("Checking for legacy service installations...");
for legacy_name in LINUX_LEGACY_SERVICE_NAMES {
// Check if service exists
let status = Command::new("systemctl")
.args(["status", legacy_name])
.output();
if let Ok(output) = status {
if output.status.success() || String::from_utf8_lossy(&output.stderr).contains("Loaded:") {
info!("Found legacy service '{}', removing...", legacy_name);
// Stop the service
let _ = Command::new("systemctl")
.args(["stop", legacy_name])
.status();
// Disable the service
let _ = Command::new("systemctl")
.args(["disable", legacy_name])
.status();
// Remove unit file
let unit_file = format!("/etc/systemd/system/{}.service", legacy_name);
if std::path::Path::new(&unit_file).exists() {
info!("Removing legacy unit file: {}", unit_file);
let _ = std::fs::remove_file(&unit_file);
}
}
}
}
// Check for legacy binaries in common locations
let legacy_binary_locations = [
"/usr/local/bin/gururmm",
"/usr/bin/gururmm",
"/opt/gururmm/gururmm",
"/opt/gururmm/agent",
];
for legacy_path in legacy_binary_locations {
if std::path::Path::new(legacy_path).exists() {
info!("Found legacy binary at '{}', removing...", legacy_path);
let _ = std::fs::remove_file(legacy_path);
}
}
// Reload systemd to pick up removed unit files
let _ = Command::new("systemctl")
.args(["daemon-reload"])
.status();
Ok(())
}
/// Install as a systemd service (Linux)
#[cfg(target_os = "linux")]
async fn install_systemd_service(
server_url: Option<String>,
api_key: Option<String>,
skip_legacy_check: bool,
) -> Result<()> {
use std::process::Command;
const SERVICE_NAME: &str = "gururmm-agent";
const INSTALL_DIR: &str = "/usr/local/bin";
const CONFIG_DIR: &str = "/etc/gururmm";
const SYSTEMD_DIR: &str = "/etc/systemd/system";
info!("Installing GuruRMM Agent as systemd service...");
// Check if running as root
if !nix::unistd::geteuid().is_root() {
anyhow::bail!("Installation requires root privileges. Please run with sudo.");
}
// Clean up legacy installations unless skipped
if !skip_legacy_check {
if let Err(e) = cleanup_legacy_linux_services() {
warn!("Legacy cleanup warning: {}", e);
}
}
// Get the current executable path
let current_exe = std::env::current_exe()
.context("Failed to get current executable path")?;
let binary_dest = format!("{}/{}", INSTALL_DIR, SERVICE_NAME);
let config_dest = format!("{}/agent.toml", CONFIG_DIR);
let unit_file = format!("{}/{}.service", SYSTEMD_DIR, SERVICE_NAME);
// Create config directory
info!("Creating config directory: {}", CONFIG_DIR);
std::fs::create_dir_all(CONFIG_DIR)
.context("Failed to create config directory")?;
// Copy binary
info!("Copying binary to: {}", binary_dest);
std::fs::copy(&current_exe, &binary_dest)
.context("Failed to copy binary")?;
// Make binary executable
Command::new("chmod")
.args(["+x", &binary_dest])
.status()
.context("Failed to set binary permissions")?;
// Handle configuration
let config_needs_manual_edit;
if !std::path::Path::new(&config_dest).exists() {
info!("Creating config: {}", config_dest);
// Start with sample config
let mut config = crate::config::AgentConfig::sample();
// Apply provided values
if let Some(url) = &server_url {
config.server.url = url.clone();
}
if let Some(key) = &api_key {
config.server.api_key = key.clone();
}
let toml_str = toml::to_string_pretty(&config)?;
std::fs::write(&config_dest, toml_str)
.context("Failed to write config file")?;
// Set restrictive permissions on config (contains API key)
Command::new("chmod")
.args(["600", &config_dest])
.status()
.context("Failed to set config permissions")?;
config_needs_manual_edit = server_url.is_none() || api_key.is_none();
} else {
info!("Config already exists: {}", config_dest);
config_needs_manual_edit = false;
// If server_url or api_key provided, update existing config
if server_url.is_some() || api_key.is_some() {
info!("Updating existing configuration...");
let config_content = std::fs::read_to_string(&config_dest)?;
let mut config: crate::config::AgentConfig = toml::from_str(&config_content)
.context("Failed to parse existing config")?;
if let Some(url) = &server_url {
config.server.url = url.clone();
}
if let Some(key) = &api_key {
config.server.api_key = key.clone();
}
let toml_str = toml::to_string_pretty(&config)?;
std::fs::write(&config_dest, toml_str)
.context("Failed to update config file")?;
}
}
// Create systemd unit file
let unit_content = format!(r#"[Unit]
Description=GuruRMM Agent - Remote Monitoring and Management
Documentation=https://github.com/azcomputerguru/gururmm
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
ExecStart={binary} --config {config} run
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
SyslogIdentifier={service}
# Security hardening
NoNewPrivileges=true
ProtectSystem=strict
ProtectHome=read-only
PrivateTmp=true
ReadWritePaths=/var/log
[Install]
WantedBy=multi-user.target
"#,
binary = binary_dest,
config = config_dest,
service = SERVICE_NAME
);
info!("Creating systemd unit file: {}", unit_file);
std::fs::write(&unit_file, unit_content)
.context("Failed to write systemd unit file")?;
// Reload systemd daemon
info!("Reloading systemd daemon...");
let status = Command::new("systemctl")
.args(["daemon-reload"])
.status()
.context("Failed to reload systemd")?;
if !status.success() {
anyhow::bail!("systemctl daemon-reload failed");
}
// Enable the service
info!("Enabling service...");
let status = Command::new("systemctl")
.args(["enable", SERVICE_NAME])
.status()
.context("Failed to enable service")?;
if !status.success() {
anyhow::bail!("systemctl enable failed");
}
println!("\n✓ GuruRMM Agent installed successfully!");
println!("\nInstalled files:");
println!(" Binary: {}", binary_dest);
println!(" Config: {}", config_dest);
println!(" Service: {}", unit_file);
if config_needs_manual_edit {
println!("\n⚠️ IMPORTANT: Edit {} with your server URL and API key!", config_dest);
println!("\nNext steps:");
println!(" 1. Edit {} with your server URL and API key", config_dest);
println!(" 2. Start the service: sudo systemctl start {}", SERVICE_NAME);
} else {
println!("\nStarting service...");
let status = Command::new("systemctl")
.args(["start", SERVICE_NAME])
.status();
if status.is_ok() && status.unwrap().success() {
println!("✓ Service started successfully!");
} else {
println!("⚠️ Failed to start service. Check logs: sudo journalctl -u {} -f", SERVICE_NAME);
}
}
println!("\nUseful commands:");
println!(" Status: sudo systemctl status {}", SERVICE_NAME);
println!(" Logs: sudo journalctl -u {} -f", SERVICE_NAME);
println!(" Stop: sudo systemctl stop {}", SERVICE_NAME);
println!(" Start: sudo systemctl start {}", SERVICE_NAME);
Ok(())
}
/// Uninstall the system service
async fn uninstall_service() -> Result<()> {
#[cfg(windows)]
{
service::windows::uninstall()
}
#[cfg(target_os = "linux")]
{
uninstall_systemd_service().await
}
#[cfg(target_os = "macos")]
{
todo!("macOS service uninstallation not yet implemented");
}
}
/// Uninstall systemd service (Linux)
#[cfg(target_os = "linux")]
async fn uninstall_systemd_service() -> Result<()> {
use std::process::Command;
const SERVICE_NAME: &str = "gururmm-agent";
const INSTALL_DIR: &str = "/usr/local/bin";
const CONFIG_DIR: &str = "/etc/gururmm";
const SYSTEMD_DIR: &str = "/etc/systemd/system";
info!("Uninstalling GuruRMM Agent...");
if !nix::unistd::geteuid().is_root() {
anyhow::bail!("Uninstallation requires root privileges. Please run with sudo.");
}
let binary_path = format!("{}/{}", INSTALL_DIR, SERVICE_NAME);
let unit_file = format!("{}/{}.service", SYSTEMD_DIR, SERVICE_NAME);
// Stop the service if running
info!("Stopping service...");
let _ = Command::new("systemctl")
.args(["stop", SERVICE_NAME])
.status();
// Disable the service
info!("Disabling service...");
let _ = Command::new("systemctl")
.args(["disable", SERVICE_NAME])
.status();
// Remove unit file
if std::path::Path::new(&unit_file).exists() {
info!("Removing unit file: {}", unit_file);
std::fs::remove_file(&unit_file)?;
}
// Remove binary
if std::path::Path::new(&binary_path).exists() {
info!("Removing binary: {}", binary_path);
std::fs::remove_file(&binary_path)?;
}
// Reload systemd
let _ = Command::new("systemctl")
.args(["daemon-reload"])
.status();
println!("\n✓ GuruRMM Agent uninstalled successfully!");
println!("\nNote: Config directory {} was preserved.", CONFIG_DIR);
println!("Remove it manually if no longer needed: sudo rm -rf {}", CONFIG_DIR);
Ok(())
}
/// Start the installed service
async fn start_service() -> Result<()> {
#[cfg(windows)]
{
service::windows::start()
}
#[cfg(target_os = "linux")]
{
use std::process::Command;
info!("Starting GuruRMM Agent service...");
let status = Command::new("systemctl")
.args(["start", "gururmm-agent"])
.status()
.context("Failed to start service")?;
if status.success() {
println!("** Service started successfully");
println!("Check status: sudo systemctl status gururmm-agent");
} else {
anyhow::bail!("Failed to start service. Check: sudo journalctl -u gururmm-agent -n 50");
}
Ok(())
}
#[cfg(target_os = "macos")]
{
todo!("macOS service start not yet implemented");
}
}
/// Stop the installed service
async fn stop_service() -> Result<()> {
#[cfg(windows)]
{
service::windows::stop()
}
#[cfg(target_os = "linux")]
{
use std::process::Command;
info!("Stopping GuruRMM Agent service...");
let status = Command::new("systemctl")
.args(["stop", "gururmm-agent"])
.status()
.context("Failed to stop service")?;
if status.success() {
println!("** Service stopped successfully");
} else {
anyhow::bail!("Failed to stop service");
}
Ok(())
}
#[cfg(target_os = "macos")]
{
todo!("macOS service stop not yet implemented");
}
}
/// Show agent status
async fn show_status(config_path: PathBuf) -> Result<()> {
// On Windows, show service status
#[cfg(windows)]
{
service::windows::status()?;
println!();
}
// Try to load config for additional info
match AgentConfig::load(&config_path) {
Ok(config) => {
println!("Configuration");
println!("=============");
println!("Config file: {:?}", config_path);
println!("Server URL: {}", config.server.url);
println!("Metrics interval: {} seconds", config.metrics.interval_seconds);
println!("Watchdog enabled: {}", config.watchdog.enabled);
// Collect current metrics
let collector = MetricsCollector::new();
let metrics = collector.collect().await;
println!("\nCurrent System Metrics:");
println!(" CPU Usage: {:.1}%", metrics.cpu_percent);
println!(" Memory Usage: {:.1}%", metrics.memory_percent);
println!(
" Memory Used: {:.2} GB",
metrics.memory_used_bytes as f64 / 1_073_741_824.0
);
println!(" Disk Usage: {:.1}%", metrics.disk_percent);
println!(
" Disk Used: {:.2} GB",
metrics.disk_used_bytes as f64 / 1_073_741_824.0
);
}
Err(_) => {
println!("\nConfig file {:?} not found or invalid.", config_path);
#[cfg(windows)]
println!("Service config location: {}\\agent.toml", service::windows::CONFIG_DIR);
}
}
Ok(())
}
/// Generate a sample configuration file
async fn generate_config(output: PathBuf) -> Result<()> {
let sample_config = AgentConfig::sample();
let toml_str = toml::to_string_pretty(&sample_config)?;
std::fs::write(&output, toml_str)?;
println!("Sample configuration written to {:?}", output);
println!("\nEdit this file with your server URL and API key, then run:");
println!(" gururmm-agent --config {:?} run", output);
Ok(())
}

View File

@@ -0,0 +1,605 @@
//! System metrics collection module
//!
//! Uses the `sysinfo` crate for cross-platform system metrics collection.
//! Collects CPU, memory, disk, and network statistics.
//! Uses `local-ip-address` for network interface enumeration.
use chrono::{DateTime, Utc};
use local_ip_address::list_afinet_netifas;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::Mutex;
use sysinfo::{CpuRefreshKind, Disks, MemoryRefreshKind, Networks, RefreshKind, System, Users};
/// System metrics data structure
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemMetrics {
/// Timestamp when metrics were collected
pub timestamp: DateTime<Utc>,
/// CPU usage percentage (0-100)
pub cpu_percent: f32,
/// Memory usage percentage (0-100)
pub memory_percent: f32,
/// Memory used in bytes
pub memory_used_bytes: u64,
/// Total memory in bytes
pub memory_total_bytes: u64,
/// Disk usage percentage (0-100) - primary disk
pub disk_percent: f32,
/// Disk used in bytes - primary disk
pub disk_used_bytes: u64,
/// Total disk space in bytes - primary disk
pub disk_total_bytes: u64,
/// Network bytes received since last collection
pub network_rx_bytes: u64,
/// Network bytes transmitted since last collection
pub network_tx_bytes: u64,
/// Operating system type
pub os_type: String,
/// Operating system version
pub os_version: String,
/// System hostname
pub hostname: String,
/// System uptime in seconds
#[serde(default)]
pub uptime_seconds: u64,
/// Boot time as Unix timestamp
#[serde(default)]
pub boot_time: i64,
/// Logged in username (if available)
#[serde(default)]
pub logged_in_user: Option<String>,
/// User idle time in seconds (time since last input)
#[serde(default)]
pub user_idle_seconds: Option<u64>,
/// Public/WAN IP address (fetched periodically)
#[serde(default)]
pub public_ip: Option<String>,
}
/// Metrics collector using sysinfo
pub struct MetricsCollector {
/// System info instance (needs to be refreshed for each collection)
system: Mutex<System>,
/// Previous network stats for delta calculation
prev_network_rx: Mutex<u64>,
prev_network_tx: Mutex<u64>,
/// Cached public IP (refreshed less frequently)
cached_public_ip: Mutex<Option<String>>,
/// Last time public IP was fetched
last_public_ip_fetch: Mutex<Option<std::time::Instant>>,
}
impl MetricsCollector {
/// Create a new metrics collector
pub fn new() -> Self {
// Create system with minimal initial refresh
let system = System::new_with_specifics(
RefreshKind::new()
.with_cpu(CpuRefreshKind::everything())
.with_memory(MemoryRefreshKind::everything()),
);
Self {
system: Mutex::new(system),
prev_network_rx: Mutex::new(0),
prev_network_tx: Mutex::new(0),
cached_public_ip: Mutex::new(None),
last_public_ip_fetch: Mutex::new(None),
}
}
/// Collect current system metrics
pub async fn collect(&self) -> SystemMetrics {
// Collect CPU - need to do two refreshes with delay for accurate reading
// We release the lock between operations to avoid holding MutexGuard across await
{
let mut system = self.system.lock().unwrap();
system.refresh_cpu_all();
}
// Small delay for CPU measurement accuracy
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
// Collect all synchronous metrics first, in a block that releases all locks
let (
cpu_percent,
memory_percent,
memory_used,
memory_total,
disk_percent,
disk_used,
disk_total,
delta_rx,
delta_tx,
os_type,
os_version,
hostname,
uptime_seconds,
boot_time,
logged_in_user,
user_idle_seconds,
) = {
// Acquire system lock
let mut system = self.system.lock().unwrap();
system.refresh_cpu_all();
system.refresh_memory();
// Calculate CPU usage (average across all cores)
let cpu_percent = system.global_cpu_usage();
// Memory metrics
let memory_used = system.used_memory();
let memory_total = system.total_memory();
let memory_percent = if memory_total > 0 {
(memory_used as f32 / memory_total as f32) * 100.0
} else {
0.0
};
// Disk metrics (use first/primary disk)
let disks = Disks::new_with_refreshed_list();
let (disk_used, disk_total, disk_percent) = disks
.iter()
.next()
.map(|d| {
let total = d.total_space();
let available = d.available_space();
let used = total.saturating_sub(available);
let percent = if total > 0 {
(used as f32 / total as f32) * 100.0
} else {
0.0
};
(used, total, percent)
})
.unwrap_or((0, 0, 0.0));
// Network metrics (sum all interfaces)
let networks = Networks::new_with_refreshed_list();
let (total_rx, total_tx): (u64, u64) = networks
.iter()
.map(|(_, data)| (data.total_received(), data.total_transmitted()))
.fold((0, 0), |(acc_rx, acc_tx), (rx, tx)| {
(acc_rx + rx, acc_tx + tx)
});
// Calculate delta from previous collection
let (delta_rx, delta_tx) = {
let mut prev_rx = self.prev_network_rx.lock().unwrap();
let mut prev_tx = self.prev_network_tx.lock().unwrap();
let delta_rx = total_rx.saturating_sub(*prev_rx);
let delta_tx = total_tx.saturating_sub(*prev_tx);
*prev_rx = total_rx;
*prev_tx = total_tx;
(delta_rx, delta_tx)
};
// Get OS info
let os_type = std::env::consts::OS.to_string();
let os_version = System::os_version().unwrap_or_else(|| "unknown".to_string());
let hostname = System::host_name().unwrap_or_else(|| "unknown".to_string());
// Get uptime and boot time
let uptime_seconds = System::uptime();
let boot_time = System::boot_time() as i64;
// Get logged in user
let logged_in_user = self.get_logged_in_user();
// Get user idle time (platform-specific)
let user_idle_seconds = self.get_user_idle_time();
// Return all values - locks are dropped at end of this block
(
cpu_percent,
memory_percent,
memory_used,
memory_total,
disk_percent,
disk_used,
disk_total,
delta_rx,
delta_tx,
os_type,
os_version,
hostname,
uptime_seconds,
boot_time,
logged_in_user,
user_idle_seconds,
)
};
// All locks are now released - safe to do async work
// Get public IP (cached, refreshed every 5 minutes)
let public_ip = self.get_public_ip().await;
SystemMetrics {
timestamp: Utc::now(),
cpu_percent,
memory_percent,
memory_used_bytes: memory_used,
memory_total_bytes: memory_total,
disk_percent,
disk_used_bytes: disk_used,
disk_total_bytes: disk_total,
network_rx_bytes: delta_rx,
network_tx_bytes: delta_tx,
os_type,
os_version,
hostname,
uptime_seconds,
boot_time,
logged_in_user,
user_idle_seconds,
public_ip,
}
}
/// Get the currently logged in user
fn get_logged_in_user(&self) -> Option<String> {
let users = Users::new_with_refreshed_list();
// Return the first user found (typically the console user)
users.iter().next().map(|u| u.name().to_string())
}
/// Get user idle time in seconds (time since last keyboard/mouse input)
#[cfg(target_os = "windows")]
fn get_user_idle_time(&self) -> Option<u64> {
// Windows: Use GetLastInputInfo API
use std::mem;
#[repr(C)]
struct LASTINPUTINFO {
cb_size: u32,
dw_time: u32,
}
extern "system" {
fn GetLastInputInfo(plii: *mut LASTINPUTINFO) -> i32;
fn GetTickCount() -> u32;
}
unsafe {
let mut lii = LASTINPUTINFO {
cb_size: mem::size_of::<LASTINPUTINFO>() as u32,
dw_time: 0,
};
if GetLastInputInfo(&mut lii) != 0 {
let idle_ms = GetTickCount().wrapping_sub(lii.dw_time);
Some((idle_ms / 1000) as u64)
} else {
None
}
}
}
/// Get user idle time in seconds (Unix/macOS)
#[cfg(not(target_os = "windows"))]
fn get_user_idle_time(&self) -> Option<u64> {
// Unix: Check /dev/tty* or use platform-specific APIs
// For now, return None - can be enhanced with X11/Wayland idle detection
None
}
/// Get public IP address (cached for 5 minutes)
async fn get_public_ip(&self) -> Option<String> {
use std::time::{Duration, Instant};
const REFRESH_INTERVAL: Duration = Duration::from_secs(300); // 5 minutes
// Check if we have a cached value that's still fresh
{
let last_fetch = self.last_public_ip_fetch.lock().unwrap();
let cached_ip = self.cached_public_ip.lock().unwrap();
if let Some(last) = *last_fetch {
if last.elapsed() < REFRESH_INTERVAL {
return cached_ip.clone();
}
}
}
// Fetch new public IP
let new_ip = self.fetch_public_ip().await;
// Update cache
{
let mut last_fetch = self.last_public_ip_fetch.lock().unwrap();
let mut cached_ip = self.cached_public_ip.lock().unwrap();
*last_fetch = Some(Instant::now());
*cached_ip = new_ip.clone();
}
new_ip
}
/// Fetch public IP from external service
async fn fetch_public_ip(&self) -> Option<String> {
// Try multiple services for reliability
let services = [
"https://api.ipify.org",
"https://ifconfig.me/ip",
"https://icanhazip.com",
];
for service in &services {
match reqwest::get(*service).await {
Ok(resp) if resp.status().is_success() => {
if let Ok(ip) = resp.text().await {
let ip = ip.trim().to_string();
// Basic validation: should look like an IP
if ip.parse::<std::net::IpAddr>().is_ok() {
return Some(ip);
}
}
}
_ => continue,
}
}
None
}
/// Get basic system info (for registration)
pub fn get_system_info(&self) -> SystemInfo {
let system = self.system.lock().unwrap();
SystemInfo {
os_type: std::env::consts::OS.to_string(),
os_version: System::os_version().unwrap_or_else(|| "unknown".to_string()),
hostname: System::host_name().unwrap_or_else(|| "unknown".to_string()),
cpu_count: system.cpus().len() as u32,
total_memory_bytes: system.total_memory(),
}
}
}
impl Default for MetricsCollector {
fn default() -> Self {
Self::new()
}
}
/// Basic system information (for agent registration)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemInfo {
/// Operating system type (windows, linux, macos)
pub os_type: String,
/// Operating system version
pub os_version: String,
/// System hostname
pub hostname: String,
/// Number of CPU cores
pub cpu_count: u32,
/// Total memory in bytes
pub total_memory_bytes: u64,
}
/// Network interface information
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NetworkInterface {
/// Interface name (e.g., "eth0", "Wi-Fi", "Ethernet")
pub name: String,
/// MAC address (if available from sysinfo)
pub mac_address: Option<String>,
/// IPv4 addresses assigned to this interface
pub ipv4_addresses: Vec<String>,
/// IPv6 addresses assigned to this interface
pub ipv6_addresses: Vec<String>,
}
/// Complete network state (sent on connect and on change)
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NetworkState {
/// Timestamp when network state was collected
pub timestamp: DateTime<Utc>,
/// All network interfaces with their addresses
pub interfaces: Vec<NetworkInterface>,
/// Hash of the network state for quick change detection
pub state_hash: String,
}
impl NetworkState {
/// Collect current network state from the system
pub fn collect() -> Self {
let mut interface_map: HashMap<String, NetworkInterface> = HashMap::new();
// Get IP addresses from local-ip-address crate
if let Ok(netifas) = list_afinet_netifas() {
for (name, ip) in netifas {
let entry = interface_map.entry(name.clone()).or_insert_with(|| {
NetworkInterface {
name: name.clone(),
mac_address: None,
ipv4_addresses: Vec::new(),
ipv6_addresses: Vec::new(),
}
});
match ip {
IpAddr::V4(addr) => {
let addr_str = addr.to_string();
if !entry.ipv4_addresses.contains(&addr_str) {
entry.ipv4_addresses.push(addr_str);
}
}
IpAddr::V6(addr) => {
let addr_str = addr.to_string();
if !entry.ipv6_addresses.contains(&addr_str) {
entry.ipv6_addresses.push(addr_str);
}
}
}
}
}
// Get MAC addresses from sysinfo
let networks = Networks::new_with_refreshed_list();
for (name, data) in &networks {
if let Some(entry) = interface_map.get_mut(name) {
let mac = data.mac_address();
let mac_str = format!(
"{:02X}:{:02X}:{:02X}:{:02X}:{:02X}:{:02X}",
mac.0[0], mac.0[1], mac.0[2], mac.0[3], mac.0[4], mac.0[5]
);
// Don't store empty/null MACs
if mac_str != "00:00:00:00:00:00" {
entry.mac_address = Some(mac_str);
}
}
}
// Convert to sorted vec for consistent ordering
let mut interfaces: Vec<NetworkInterface> = interface_map.into_values().collect();
interfaces.sort_by(|a, b| a.name.cmp(&b.name));
// Filter out loopback and link-local only interfaces
interfaces.retain(|iface| {
// Keep if has any non-loopback IPv4
let has_real_ipv4 = iface.ipv4_addresses.iter().any(|ip| {
!ip.starts_with("127.") && !ip.starts_with("169.254.")
});
// Keep if has any non-link-local IPv6
let has_real_ipv6 = iface.ipv6_addresses.iter().any(|ip| {
!ip.starts_with("fe80:") && !ip.starts_with("::1")
});
has_real_ipv4 || has_real_ipv6
});
// Generate hash for change detection
let state_hash = Self::compute_hash(&interfaces);
NetworkState {
timestamp: Utc::now(),
interfaces,
state_hash,
}
}
/// Compute a simple hash of the network state for change detection
fn compute_hash(interfaces: &[NetworkInterface]) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
for iface in interfaces {
iface.name.hash(&mut hasher);
iface.mac_address.hash(&mut hasher);
for ip in &iface.ipv4_addresses {
ip.hash(&mut hasher);
}
for ip in &iface.ipv6_addresses {
ip.hash(&mut hasher);
}
}
format!("{:016x}", hasher.finish())
}
/// Check if network state has changed compared to another state
pub fn has_changed(&self, other: &NetworkState) -> bool {
self.state_hash != other.state_hash
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_metrics_collection() {
let collector = MetricsCollector::new();
let metrics = collector.collect().await;
// Basic sanity checks
assert!(metrics.cpu_percent >= 0.0 && metrics.cpu_percent <= 100.0);
assert!(metrics.memory_percent >= 0.0 && metrics.memory_percent <= 100.0);
assert!(metrics.memory_total_bytes > 0);
assert!(!metrics.os_type.is_empty());
assert!(!metrics.hostname.is_empty());
}
#[test]
fn test_system_info() {
let collector = MetricsCollector::new();
let info = collector.get_system_info();
assert!(!info.os_type.is_empty());
assert!(!info.hostname.is_empty());
assert!(info.cpu_count > 0);
assert!(info.total_memory_bytes > 0);
}
#[test]
fn test_network_state_collection() {
let state = NetworkState::collect();
// Should have a valid timestamp
assert!(state.timestamp <= Utc::now());
// Should have a hash
assert!(!state.state_hash.is_empty());
assert_eq!(state.state_hash.len(), 16); // 64-bit hash as hex
// Print for debugging
println!("Network state collected:");
for iface in &state.interfaces {
println!(" {}: IPv4={:?}, IPv6={:?}, MAC={:?}",
iface.name, iface.ipv4_addresses, iface.ipv6_addresses, iface.mac_address);
}
}
#[test]
fn test_network_state_change_detection() {
let state1 = NetworkState::collect();
let state2 = NetworkState::collect();
// Same state should have same hash
assert!(!state1.has_changed(&state2));
// Create a modified state
let mut modified = state1.clone();
if let Some(iface) = modified.interfaces.first_mut() {
iface.ipv4_addresses.push("10.99.99.99".to_string());
}
modified.state_hash = NetworkState::compute_hash(&modified.interfaces);
// Modified state should be detected as changed
assert!(state1.has_changed(&modified));
}
}

View File

@@ -0,0 +1,777 @@
//! Windows Service implementation for GuruRMM Agent
//!
//! This module implements the Windows Service Control Manager (SCM) protocol,
//! allowing the agent to run as a native Windows service without third-party wrappers.
#[cfg(all(windows, feature = "native-service"))]
pub mod windows {
use std::ffi::OsString;
use std::path::PathBuf;
use std::sync::mpsc;
use std::time::Duration;
use anyhow::{Context, Result};
use tracing::{error, info, warn};
use windows_service::{
define_windows_service,
service::{
ServiceAccess, ServiceControl, ServiceControlAccept, ServiceErrorControl,
ServiceExitCode, ServiceInfo, ServiceStartType, ServiceState, ServiceStatus,
ServiceType,
},
service_control_handler::{self, ServiceControlHandlerResult},
service_dispatcher, service_manager::{ServiceManager, ServiceManagerAccess},
};
pub const SERVICE_NAME: &str = "GuruRMMAgent";
pub const SERVICE_DISPLAY_NAME: &str = "GuruRMM Agent";
pub const SERVICE_DESCRIPTION: &str =
"GuruRMM Agent - Remote Monitoring and Management service";
pub const INSTALL_DIR: &str = r"C:\Program Files\GuruRMM";
pub const CONFIG_DIR: &str = r"C:\ProgramData\GuruRMM";
// Generate the Windows service boilerplate
define_windows_service!(ffi_service_main, service_main);
/// Entry point called by the Windows Service Control Manager
pub fn run_as_service() -> Result<()> {
// This function is called when Windows starts the service.
// It blocks until the service is stopped.
service_dispatcher::start(SERVICE_NAME, ffi_service_main)
.context("Failed to start service dispatcher")?;
Ok(())
}
/// Main service function called by the SCM
fn service_main(arguments: Vec<OsString>) {
if let Err(e) = run_service(arguments) {
error!("Service error: {}", e);
}
}
/// The actual service implementation
fn run_service(_arguments: Vec<OsString>) -> Result<()> {
// Create a channel to receive stop events
let (shutdown_tx, shutdown_rx) = mpsc::channel();
// Create the service control handler
let event_handler = move |control_event| -> ServiceControlHandlerResult {
match control_event {
ServiceControl::Stop => {
info!("Received stop command from SCM");
let _ = shutdown_tx.send(());
ServiceControlHandlerResult::NoError
}
ServiceControl::Interrogate => ServiceControlHandlerResult::NoError,
ServiceControl::Shutdown => {
info!("Received shutdown command from SCM");
let _ = shutdown_tx.send(());
ServiceControlHandlerResult::NoError
}
_ => ServiceControlHandlerResult::NotImplemented,
}
};
// Register the service control handler
let status_handle = service_control_handler::register(SERVICE_NAME, event_handler)
.context("Failed to register service control handler")?;
// Report that we're starting
status_handle
.set_service_status(ServiceStatus {
service_type: ServiceType::OWN_PROCESS,
current_state: ServiceState::StartPending,
controls_accepted: ServiceControlAccept::empty(),
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
wait_hint: Duration::from_secs(10),
process_id: None,
})
.context("Failed to set StartPending status")?;
// Determine config path
let config_path = PathBuf::from(format!(r"{}\\agent.toml", CONFIG_DIR));
// Create the tokio runtime for the agent
let runtime = tokio::runtime::Runtime::new().context("Failed to create tokio runtime")?;
// Start the agent in the runtime
let agent_result = runtime.block_on(async {
// Load configuration
let config = match crate::config::AgentConfig::load(&config_path) {
Ok(c) => c,
Err(e) => {
error!("Failed to load config from {:?}: {}", config_path, e);
return Err(anyhow::anyhow!("Config load failed: {}", e));
}
};
info!("GuruRMM Agent service starting...");
info!("Config loaded from {:?}", config_path);
info!("Server URL: {}", config.server.url);
// Initialize metrics collector
let metrics_collector = crate::metrics::MetricsCollector::new();
info!("Metrics collector initialized");
// Create shared state
let state = std::sync::Arc::new(crate::AppState {
config: config.clone(),
metrics_collector,
connected: tokio::sync::RwLock::new(false),
});
// Report that we're running
status_handle
.set_service_status(ServiceStatus {
service_type: ServiceType::OWN_PROCESS,
current_state: ServiceState::Running,
controls_accepted: ServiceControlAccept::STOP | ServiceControlAccept::SHUTDOWN,
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
wait_hint: Duration::default(),
process_id: None,
})
.context("Failed to set Running status")?;
// Start WebSocket client task
let ws_state = std::sync::Arc::clone(&state);
let ws_handle = tokio::spawn(async move {
loop {
info!("Connecting to server...");
match crate::transport::WebSocketClient::connect_and_run(std::sync::Arc::clone(
&ws_state,
))
.await
{
Ok(_) => {
warn!("WebSocket connection closed normally, reconnecting...");
}
Err(e) => {
error!("WebSocket error: {}, reconnecting in 10 seconds...", e);
}
}
*ws_state.connected.write().await = false;
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
}
});
// Start metrics collection task
let metrics_state = std::sync::Arc::clone(&state);
let metrics_handle = tokio::spawn(async move {
let interval = metrics_state.config.metrics.interval_seconds;
let mut interval_timer =
tokio::time::interval(tokio::time::Duration::from_secs(interval));
loop {
interval_timer.tick().await;
let metrics = metrics_state.metrics_collector.collect().await;
if *metrics_state.connected.read().await {
info!(
"Metrics: CPU={:.1}%, Mem={:.1}%, Disk={:.1}%",
metrics.cpu_percent, metrics.memory_percent, metrics.disk_percent
);
}
}
});
// Wait for shutdown signal from SCM
// We use a separate task to poll the channel since it's not async
let shutdown_handle = tokio::spawn(async move {
loop {
match shutdown_rx.try_recv() {
Ok(_) => {
info!("Shutdown signal received");
break;
}
Err(mpsc::TryRecvError::Empty) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Err(mpsc::TryRecvError::Disconnected) => {
warn!("Shutdown channel disconnected");
break;
}
}
}
});
// Wait for shutdown
tokio::select! {
_ = shutdown_handle => {
info!("Service shutting down gracefully");
}
_ = ws_handle => {
error!("WebSocket task ended unexpectedly");
}
_ = metrics_handle => {
error!("Metrics task ended unexpectedly");
}
}
Ok::<(), anyhow::Error>(())
});
// Report that we're stopping
status_handle
.set_service_status(ServiceStatus {
service_type: ServiceType::OWN_PROCESS,
current_state: ServiceState::StopPending,
controls_accepted: ServiceControlAccept::empty(),
exit_code: ServiceExitCode::Win32(0),
checkpoint: 0,
wait_hint: Duration::from_secs(5),
process_id: None,
})
.ok();
// Report that we've stopped
status_handle
.set_service_status(ServiceStatus {
service_type: ServiceType::OWN_PROCESS,
current_state: ServiceState::Stopped,
controls_accepted: ServiceControlAccept::empty(),
exit_code: match &agent_result {
Ok(_) => ServiceExitCode::Win32(0),
Err(_) => ServiceExitCode::Win32(1),
},
checkpoint: 0,
wait_hint: Duration::default(),
process_id: None,
})
.ok();
agent_result
}
/// Known legacy service names to check and remove
const LEGACY_SERVICE_NAMES: &[&str] = &[
"GuruRMM-Agent", // NSSM-based service name
"gururmm-agent", // Alternative casing
];
/// Detect and remove legacy service installations (e.g., NSSM-based)
fn cleanup_legacy_services() -> Result<()> {
let manager = match ServiceManager::local_computer(
None::<&str>,
ServiceManagerAccess::CONNECT,
) {
Ok(m) => m,
Err(_) => return Ok(()), // Can't connect, skip legacy cleanup
};
for legacy_name in LEGACY_SERVICE_NAMES {
if let Ok(service) = manager.open_service(
*legacy_name,
ServiceAccess::QUERY_STATUS | ServiceAccess::STOP | ServiceAccess::DELETE,
) {
info!("Found legacy service '{}', removing...", legacy_name);
// Stop if running
if let Ok(status) = service.query_status() {
if status.current_state != ServiceState::Stopped {
info!("Stopping legacy service...");
let _ = service.stop();
std::thread::sleep(Duration::from_secs(3));
}
}
// Delete the service
match service.delete() {
Ok(_) => {
println!("** Removed legacy service: {}", legacy_name);
}
Err(e) => {
warn!("Failed to delete legacy service '{}': {}", legacy_name, e);
}
}
}
}
// Also check for NSSM in registry/service config
// NSSM services have specific registry keys under HKLM\SYSTEM\CurrentControlSet\Services\{name}\Parameters
for legacy_name in LEGACY_SERVICE_NAMES {
let params_key = format!(
r"SYSTEM\CurrentControlSet\Services\{}\Parameters",
legacy_name
);
// If this key exists, it was likely an NSSM service
if let Ok(output) = std::process::Command::new("reg")
.args(["query", &format!(r"HKLM\{}", params_key)])
.output()
{
if output.status.success() {
info!("Found NSSM registry keys for '{}', cleaning up...", legacy_name);
let _ = std::process::Command::new("reg")
.args(["delete", &format!(r"HKLM\{}", params_key), "/f"])
.output();
}
}
}
Ok(())
}
/// Install the agent as a Windows service using native APIs
pub fn install(
server_url: Option<String>,
api_key: Option<String>,
skip_legacy_check: bool,
) -> Result<()> {
info!("Installing GuruRMM Agent as Windows service...");
// Clean up legacy installations unless skipped
if !skip_legacy_check {
info!("Checking for legacy service installations...");
if let Err(e) = cleanup_legacy_services() {
warn!("Legacy cleanup warning: {}", e);
}
}
// Get the current executable path
let current_exe =
std::env::current_exe().context("Failed to get current executable path")?;
let binary_dest = PathBuf::from(format!(r"{}\\gururmm-agent.exe", INSTALL_DIR));
let config_dest = PathBuf::from(format!(r"{}\\agent.toml", CONFIG_DIR));
// Create directories
info!("Creating directories...");
std::fs::create_dir_all(INSTALL_DIR).context("Failed to create install directory")?;
std::fs::create_dir_all(CONFIG_DIR).context("Failed to create config directory")?;
// Copy binary
info!("Copying binary to: {:?}", binary_dest);
std::fs::copy(&current_exe, &binary_dest).context("Failed to copy binary")?;
// Handle configuration
let config_needs_manual_edit;
if !config_dest.exists() {
info!("Creating config: {:?}", config_dest);
// Start with sample config
let mut config = crate::config::AgentConfig::sample();
// Apply provided values
if let Some(url) = &server_url {
config.server.url = url.clone();
}
if let Some(key) = &api_key {
config.server.api_key = key.clone();
}
let toml_str = toml::to_string_pretty(&config)?;
std::fs::write(&config_dest, toml_str).context("Failed to write config file")?;
config_needs_manual_edit = server_url.is_none() || api_key.is_none();
} else {
info!("Config already exists: {:?}", config_dest);
config_needs_manual_edit = false;
// If server_url or api_key provided, update existing config
if server_url.is_some() || api_key.is_some() {
info!("Updating existing configuration...");
let config_content = std::fs::read_to_string(&config_dest)?;
let mut config: crate::config::AgentConfig = toml::from_str(&config_content)
.context("Failed to parse existing config")?;
if let Some(url) = &server_url {
config.server.url = url.clone();
}
if let Some(key) = &api_key {
config.server.api_key = key.clone();
}
let toml_str = toml::to_string_pretty(&config)?;
std::fs::write(&config_dest, toml_str)
.context("Failed to update config file")?;
}
}
// Open the service manager
let manager = ServiceManager::local_computer(
None::<&str>,
ServiceManagerAccess::CONNECT | ServiceManagerAccess::CREATE_SERVICE,
)
.context("Failed to connect to Service Control Manager. Run as Administrator.")?;
// Check if service already exists
if let Ok(service) = manager.open_service(
SERVICE_NAME,
ServiceAccess::QUERY_STATUS | ServiceAccess::DELETE | ServiceAccess::STOP,
) {
info!("Removing existing service...");
// Stop the service if running
if let Ok(status) = service.query_status() {
if status.current_state != ServiceState::Stopped {
let _ = service.stop();
std::thread::sleep(Duration::from_secs(2));
}
}
// Delete the service
service.delete().context("Failed to delete existing service")?;
drop(service);
// Wait for deletion to complete
std::thread::sleep(Duration::from_secs(2));
}
// Create the service
// The service binary is called with "service" subcommand when started by SCM
let service_binary_path = format!(r#""{}" service"#, binary_dest.display());
info!("Creating service with path: {}", service_binary_path);
let service_info = ServiceInfo {
name: OsString::from(SERVICE_NAME),
display_name: OsString::from(SERVICE_DISPLAY_NAME),
service_type: ServiceType::OWN_PROCESS,
start_type: ServiceStartType::AutoStart,
error_control: ServiceErrorControl::Normal,
executable_path: binary_dest.clone(),
launch_arguments: vec![OsString::from("service")],
dependencies: vec![],
account_name: None, // LocalSystem
account_password: None,
};
let service = manager
.create_service(&service_info, ServiceAccess::CHANGE_CONFIG | ServiceAccess::START)
.context("Failed to create service")?;
// Set description
service
.set_description(SERVICE_DESCRIPTION)
.context("Failed to set service description")?;
// Configure recovery options using sc.exe (windows-service crate doesn't support this directly)
info!("Configuring recovery options...");
let _ = std::process::Command::new("sc")
.args([
"failure",
SERVICE_NAME,
"reset=86400",
"actions=restart/60000/restart/60000/restart/60000",
])
.output();
println!("\n** GuruRMM Agent installed successfully!");
println!("\nInstalled files:");
println!(" Binary: {:?}", binary_dest);
println!(" Config: {:?}", config_dest);
if config_needs_manual_edit {
println!("\n** IMPORTANT: Edit {:?} with your server URL and API key!", config_dest);
println!("\nNext steps:");
println!(" 1. Edit {:?} with your server URL and API key", config_dest);
println!(" 2. Start the service:");
println!(" gururmm-agent start");
println!(" Or: sc start {}", SERVICE_NAME);
} else {
println!("\nStarting service...");
if let Err(e) = start() {
println!("** Failed to start service: {}. Start manually with:", e);
println!(" gururmm-agent start");
} else {
println!("** Service started successfully!");
}
}
println!("\nUseful commands:");
println!(" Status: gururmm-agent status");
println!(" Stop: gururmm-agent stop");
println!(" Start: gururmm-agent start");
Ok(())
}
/// Uninstall the Windows service
pub fn uninstall() -> Result<()> {
info!("Uninstalling GuruRMM Agent...");
let binary_path = PathBuf::from(format!(r"{}\\gururmm-agent.exe", INSTALL_DIR));
// Open the service manager
let manager = ServiceManager::local_computer(
None::<&str>,
ServiceManagerAccess::CONNECT,
)
.context("Failed to connect to Service Control Manager. Run as Administrator.")?;
// Open the service
match manager.open_service(
SERVICE_NAME,
ServiceAccess::QUERY_STATUS | ServiceAccess::STOP | ServiceAccess::DELETE,
) {
Ok(service) => {
// Stop if running
if let Ok(status) = service.query_status() {
if status.current_state != ServiceState::Stopped {
info!("Stopping service...");
let _ = service.stop();
std::thread::sleep(Duration::from_secs(3));
}
}
// Delete the service
info!("Deleting service...");
service.delete().context("Failed to delete service")?;
}
Err(_) => {
warn!("Service was not installed");
}
}
// Remove binary
if binary_path.exists() {
info!("Removing binary: {:?}", binary_path);
// Wait a bit for service to fully stop
std::thread::sleep(Duration::from_secs(1));
if let Err(e) = std::fs::remove_file(&binary_path) {
warn!("Failed to remove binary (may be in use): {}", e);
}
}
// Remove install directory if empty
let _ = std::fs::remove_dir(INSTALL_DIR);
println!("\n** GuruRMM Agent uninstalled successfully!");
println!(
"\nNote: Config directory {:?} was preserved.",
CONFIG_DIR
);
println!("Remove it manually if no longer needed.");
Ok(())
}
/// Start the installed service
pub fn start() -> Result<()> {
info!("Starting GuruRMM Agent service...");
let manager = ServiceManager::local_computer(
None::<&str>,
ServiceManagerAccess::CONNECT,
)
.context("Failed to connect to Service Control Manager")?;
let service = manager
.open_service(SERVICE_NAME, ServiceAccess::START | ServiceAccess::QUERY_STATUS)
.context("Failed to open service. Is it installed?")?;
service
.start::<String>(&[])
.context("Failed to start service")?;
// Wait briefly and check status
std::thread::sleep(Duration::from_secs(2));
let status = service.query_status()?;
match status.current_state {
ServiceState::Running => {
println!("** Service started successfully");
println!("Check status: gururmm-agent status");
}
ServiceState::StartPending => {
println!("** Service is starting...");
println!("Check status: gururmm-agent status");
}
other => {
println!("Service state: {:?}", other);
}
}
Ok(())
}
/// Stop the installed service
pub fn stop() -> Result<()> {
info!("Stopping GuruRMM Agent service...");
let manager = ServiceManager::local_computer(
None::<&str>,
ServiceManagerAccess::CONNECT,
)
.context("Failed to connect to Service Control Manager")?;
let service = manager
.open_service(SERVICE_NAME, ServiceAccess::STOP | ServiceAccess::QUERY_STATUS)
.context("Failed to open service. Is it installed?")?;
service.stop().context("Failed to stop service")?;
// Wait and verify
std::thread::sleep(Duration::from_secs(2));
let status = service.query_status()?;
match status.current_state {
ServiceState::Stopped => {
println!("** Service stopped successfully");
}
ServiceState::StopPending => {
println!("** Service is stopping...");
}
other => {
println!("Service state: {:?}", other);
}
}
Ok(())
}
/// Query service status
pub fn status() -> Result<()> {
let manager = ServiceManager::local_computer(
None::<&str>,
ServiceManagerAccess::CONNECT,
)
.context("Failed to connect to Service Control Manager")?;
match manager.open_service(SERVICE_NAME, ServiceAccess::QUERY_STATUS) {
Ok(service) => {
let status = service.query_status()?;
println!("GuruRMM Agent Service Status");
println!("============================");
println!("Service Name: {}", SERVICE_NAME);
println!("Display Name: {}", SERVICE_DISPLAY_NAME);
println!("State: {:?}", status.current_state);
println!(
"Binary: {}\\gururmm-agent.exe",
INSTALL_DIR
);
println!("Config: {}\\agent.toml", CONFIG_DIR);
}
Err(_) => {
println!("GuruRMM Agent Service Status");
println!("============================");
println!("Status: NOT INSTALLED");
println!("\nTo install: gururmm-agent install");
}
}
Ok(())
}
}
/// Legacy Windows stub module (when native-service is not enabled)
/// For legacy Windows (7, Server 2008 R2), use NSSM for service wrapper
#[cfg(all(windows, not(feature = "native-service")))]
pub mod windows {
use anyhow::{Result, bail};
pub const SERVICE_NAME: &str = "GuruRMMAgent";
pub const SERVICE_DISPLAY_NAME: &str = "GuruRMM Agent";
pub const SERVICE_DESCRIPTION: &str =
"GuruRMM Agent - Remote Monitoring and Management service";
pub const INSTALL_DIR: &str = r"C:\Program Files\GuruRMM";
pub const CONFIG_DIR: &str = r"C:\ProgramData\GuruRMM";
/// Legacy build doesn't support native service mode
pub fn run_as_service() -> Result<()> {
bail!("Native Windows service mode not available in legacy build. Use 'run' command with NSSM wrapper instead.")
}
/// Legacy install just copies binary and config, prints NSSM instructions
pub fn install(
server_url: Option<String>,
api_key: Option<String>,
_skip_legacy_check: bool,
) -> Result<()> {
use std::path::PathBuf;
use tracing::info;
info!("Installing GuruRMM Agent (legacy mode)...");
// Get the current executable path
let current_exe = std::env::current_exe()?;
let binary_dest = PathBuf::from(format!(r"{}\\gururmm-agent.exe", INSTALL_DIR));
let config_dest = PathBuf::from(format!(r"{}\\agent.toml", CONFIG_DIR));
// Create directories
std::fs::create_dir_all(INSTALL_DIR)?;
std::fs::create_dir_all(CONFIG_DIR)?;
// Copy binary
info!("Copying binary to: {:?}", binary_dest);
std::fs::copy(&current_exe, &binary_dest)?;
// Create config if needed
if !config_dest.exists() {
let mut config = crate::config::AgentConfig::sample();
if let Some(url) = &server_url {
config.server.url = url.clone();
}
if let Some(key) = &api_key {
config.server.api_key = key.clone();
}
let toml_str = toml::to_string_pretty(&config)?;
std::fs::write(&config_dest, toml_str)?;
}
println!("\n** GuruRMM Agent installed (legacy mode)!");
println!("\nInstalled files:");
println!(" Binary: {:?}", binary_dest);
println!(" Config: {:?}", config_dest);
println!("\n** IMPORTANT: This is a legacy build for Windows 7/Server 2008 R2");
println!(" Use NSSM to install as a service:");
println!();
println!(" nssm install {} {:?} run --config {:?}", SERVICE_NAME, binary_dest, config_dest);
println!(" nssm start {}", SERVICE_NAME);
println!();
println!(" Download NSSM from: https://nssm.cc/download");
Ok(())
}
pub fn uninstall() -> Result<()> {
use std::path::PathBuf;
let binary_path = PathBuf::from(format!(r"{}\\gururmm-agent.exe", INSTALL_DIR));
println!("** To uninstall legacy service, use NSSM:");
println!(" nssm stop {}", SERVICE_NAME);
println!(" nssm remove {} confirm", SERVICE_NAME);
println!();
if binary_path.exists() {
std::fs::remove_file(&binary_path)?;
println!("** Binary removed: {:?}", binary_path);
}
let _ = std::fs::remove_dir(INSTALL_DIR);
println!("\n** GuruRMM Agent uninstalled (legacy mode)!");
println!("Note: Config directory {} was preserved.", CONFIG_DIR);
Ok(())
}
pub fn start() -> Result<()> {
println!("** Legacy build: Use NSSM or sc.exe to start the service:");
println!(" nssm start {}", SERVICE_NAME);
println!(" -- OR --");
println!(" sc start {}", SERVICE_NAME);
Ok(())
}
pub fn stop() -> Result<()> {
println!("** Legacy build: Use NSSM or sc.exe to stop the service:");
println!(" nssm stop {}", SERVICE_NAME);
println!(" -- OR --");
println!(" sc stop {}", SERVICE_NAME);
Ok(())
}
pub fn status() -> Result<()> {
println!("GuruRMM Agent Service Status (Legacy Build)");
println!("==========================================");
println!("Service Name: {}", SERVICE_NAME);
println!();
println!("** Legacy build: Use sc.exe to query status:");
println!(" sc query {}", SERVICE_NAME);
println!();
println!("Binary: {}\\gururmm-agent.exe", INSTALL_DIR);
println!("Config: {}\\agent.toml", CONFIG_DIR);
Ok(())
}
}

View File

@@ -0,0 +1,299 @@
//! Transport layer for agent-server communication
//!
//! Handles WebSocket connection to the GuruRMM server with:
//! - Auto-reconnection on disconnect
//! - Authentication via API key
//! - Sending metrics and receiving commands
//! - Heartbeat to maintain connection
mod websocket;
pub use websocket::WebSocketClient;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Messages sent from agent to server
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
#[serde(rename_all = "snake_case")]
pub enum AgentMessage {
/// Authentication message (sent on connect)
Auth(AuthPayload),
/// Metrics report
Metrics(crate::metrics::SystemMetrics),
/// Network state update (sent on connect and when interfaces change)
NetworkState(crate::metrics::NetworkState),
/// Command execution result
CommandResult(CommandResultPayload),
/// Watchdog event (service stopped, restarted, etc.)
WatchdogEvent(WatchdogEventPayload),
/// Update result (success, failure, rollback)
UpdateResult(UpdateResultPayload),
/// Heartbeat to keep connection alive
Heartbeat,
}
/// Authentication payload
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthPayload {
/// API key for this agent (or site)
pub api_key: String,
/// Unique device identifier (hardware-derived)
pub device_id: String,
/// Hostname of this machine
pub hostname: String,
/// Operating system type
pub os_type: String,
/// Operating system version
pub os_version: String,
/// Agent version
pub agent_version: String,
/// Architecture (amd64, arm64, etc.)
#[serde(default = "default_arch")]
pub architecture: String,
/// Previous version if reconnecting after update
#[serde(skip_serializing_if = "Option::is_none")]
pub previous_version: Option<String>,
/// Update ID if reconnecting after update
#[serde(skip_serializing_if = "Option::is_none")]
pub pending_update_id: Option<Uuid>,
}
fn default_arch() -> String {
#[cfg(target_arch = "x86_64")]
{ "amd64".to_string() }
#[cfg(target_arch = "aarch64")]
{ "arm64".to_string() }
#[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))]
{ "unknown".to_string() }
}
/// Command execution result payload
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandResultPayload {
/// Command ID (from the server)
pub command_id: Uuid,
/// Exit code (0 = success)
pub exit_code: i32,
/// Standard output
pub stdout: String,
/// Standard error
pub stderr: String,
/// Execution duration in milliseconds
pub duration_ms: u64,
}
/// Watchdog event payload
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatchdogEventPayload {
/// Service or process name
pub name: String,
/// Event type
pub event: WatchdogEvent,
/// Additional details
pub details: Option<String>,
}
/// Types of watchdog events
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum WatchdogEvent {
/// Service/process was found stopped
Stopped,
/// Service/process was restarted by the agent
Restarted,
/// Restart attempt failed
RestartFailed,
/// Max restart attempts reached
MaxRestartsReached,
/// Service/process recovered on its own
Recovered,
}
/// Messages sent from server to agent
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "payload")]
#[serde(rename_all = "snake_case")]
pub enum ServerMessage {
/// Authentication acknowledgment
AuthAck(AuthAckPayload),
/// Command to execute
Command(CommandPayload),
/// Configuration update
ConfigUpdate(ConfigUpdatePayload),
/// Agent update command
Update(UpdatePayload),
/// Acknowledgment of received message
Ack { message_id: Option<String> },
/// Error message
Error { code: String, message: String },
}
/// Authentication acknowledgment payload
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthAckPayload {
/// Whether authentication was successful
pub success: bool,
/// Agent ID assigned by server
pub agent_id: Option<Uuid>,
/// Error message if authentication failed
pub error: Option<String>,
}
/// Command payload from server
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandPayload {
/// Unique command ID
pub id: Uuid,
/// Type of command
pub command_type: CommandType,
/// Command text to execute
pub command: String,
/// Optional timeout in seconds
pub timeout_seconds: Option<u64>,
/// Whether to run as elevated/admin
pub elevated: bool,
}
/// Types of commands
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CommandType {
/// Shell command (cmd on Windows, bash on Unix)
Shell,
/// PowerShell command (Windows)
PowerShell,
/// Python script
Python,
/// Raw script (requires interpreter path)
Script { interpreter: String },
}
/// Configuration update payload
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigUpdatePayload {
/// New metrics interval (if changed)
pub metrics_interval_seconds: Option<u64>,
/// Updated watchdog config
pub watchdog: Option<WatchdogConfigUpdate>,
}
/// Watchdog configuration update
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatchdogConfigUpdate {
/// Enable/disable watchdog
pub enabled: Option<bool>,
/// Check interval
pub check_interval_seconds: Option<u64>,
// Services and processes would be included here for remote config updates
}
/// Update command payload from server
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdatePayload {
/// Unique update ID for tracking
pub update_id: Uuid,
/// Target version to update to
pub target_version: String,
/// Download URL for the new binary
pub download_url: String,
/// SHA256 checksum of the binary
pub checksum_sha256: String,
/// Whether to force update (skip version check)
#[serde(default)]
pub force: bool,
}
/// Update result payload sent back to server
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateResultPayload {
/// Update ID (from the server)
pub update_id: Uuid,
/// Update status
pub status: UpdateStatus,
/// Old version before update
pub old_version: String,
/// New version after update (if successful)
pub new_version: Option<String>,
/// Error message if failed
pub error: Option<String>,
}
/// Update status codes
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum UpdateStatus {
/// Update starting
Starting,
/// Downloading new binary
Downloading,
/// Download complete, verifying
Verifying,
/// Installing (replacing binary)
Installing,
/// Restarting service
Restarting,
/// Update completed successfully
Completed,
/// Update failed
Failed,
/// Rolled back to previous version
RolledBack,
}

View File

@@ -0,0 +1,439 @@
//! WebSocket client for server communication
//!
//! Handles the WebSocket connection lifecycle including:
//! - Connection establishment
//! - Authentication handshake
//! - Message sending/receiving
//! - Heartbeat maintenance
//! - Command handling
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use futures_util::{SinkExt, StreamExt};
use tokio::sync::mpsc;
use tokio::time::{interval, timeout};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tracing::{debug, error, info, warn};
use super::{AgentMessage, AuthPayload, CommandPayload, ServerMessage, UpdatePayload, UpdateResultPayload, UpdateStatus};
use crate::metrics::NetworkState;
use crate::updater::{AgentUpdater, UpdaterConfig};
use crate::AppState;
/// WebSocket client for communicating with the GuruRMM server
pub struct WebSocketClient;
impl WebSocketClient {
/// Connect to the server and run the message loop
///
/// This function will return when the connection is closed or an error occurs.
/// The caller should handle reconnection logic.
pub async fn connect_and_run(state: Arc<AppState>) -> Result<()> {
let url = &state.config.server.url;
// Connect to WebSocket server
info!("Connecting to {}", url);
let (ws_stream, response) = connect_async(url)
.await
.context("Failed to connect to WebSocket server")?;
info!(
"WebSocket connected (HTTP status: {})",
response.status()
);
let (mut write, mut read) = ws_stream.split();
// Check for pending update (from previous update attempt)
let updater_config = UpdaterConfig::default();
let pending_update = AgentUpdater::load_pending_update(&updater_config).await;
// If we have pending update info, we just restarted after an update
let (previous_version, pending_update_id) = if let Some(ref info) = pending_update {
info!(
"Found pending update info: {} -> {} (id: {})",
info.old_version, info.target_version, info.update_id
);
(Some(info.old_version.clone()), Some(info.update_id))
} else {
(None, None)
};
// Send authentication message
let auth_msg = AgentMessage::Auth(AuthPayload {
api_key: state.config.server.api_key.clone(),
device_id: crate::device_id::get_device_id(),
hostname: state.config.get_hostname(),
os_type: std::env::consts::OS.to_string(),
os_version: sysinfo::System::os_version().unwrap_or_else(|| "unknown".to_string()),
agent_version: env!("CARGO_PKG_VERSION").to_string(),
architecture: Self::get_architecture().to_string(),
previous_version,
pending_update_id,
});
let auth_json = serde_json::to_string(&auth_msg)?;
write.send(Message::Text(auth_json)).await?;
debug!("Sent authentication message");
// Wait for auth response with timeout
let auth_response = timeout(Duration::from_secs(10), read.next())
.await
.context("Authentication timeout")?
.ok_or_else(|| anyhow::anyhow!("Connection closed before auth response"))?
.context("Failed to receive auth response")?;
// Parse auth response
if let Message::Text(text) = auth_response {
let server_msg: ServerMessage =
serde_json::from_str(&text).context("Failed to parse auth response")?;
match server_msg {
ServerMessage::AuthAck(ack) => {
if ack.success {
info!("Authentication successful, agent_id: {:?}", ack.agent_id);
*state.connected.write().await = true;
// Send initial network state immediately after auth
let network_state = NetworkState::collect();
info!(
"Sending initial network state ({} interfaces)",
network_state.interfaces.len()
);
let network_msg = AgentMessage::NetworkState(network_state);
let network_json = serde_json::to_string(&network_msg)?;
write.send(Message::Text(network_json)).await?;
} else {
error!("Authentication failed: {:?}", ack.error);
return Err(anyhow::anyhow!(
"Authentication failed: {}",
ack.error.unwrap_or_else(|| "Unknown error".to_string())
));
}
}
ServerMessage::Error { code, message } => {
error!("Server error during auth: {} - {}", code, message);
return Err(anyhow::anyhow!("Server error: {} - {}", code, message));
}
_ => {
warn!("Unexpected message during auth: {:?}", server_msg);
}
}
}
// Create channel for outgoing messages
let (tx, mut rx) = mpsc::channel::<AgentMessage>(100);
// Spawn metrics sender task
let metrics_tx = tx.clone();
let metrics_state = Arc::clone(&state);
let metrics_interval = state.config.metrics.interval_seconds;
let metrics_task = tokio::spawn(async move {
let mut timer = interval(Duration::from_secs(metrics_interval));
loop {
timer.tick().await;
let metrics = metrics_state.metrics_collector.collect().await;
if metrics_tx.send(AgentMessage::Metrics(metrics)).await.is_err() {
debug!("Metrics channel closed");
break;
}
}
});
// Spawn network state monitor task (checks for changes every 30 seconds)
let network_tx = tx.clone();
let network_task = tokio::spawn(async move {
// Check for network changes every 30 seconds
let mut timer = interval(Duration::from_secs(30));
let mut last_state = NetworkState::collect();
loop {
timer.tick().await;
let current_state = NetworkState::collect();
if current_state.has_changed(&last_state) {
info!(
"Network state changed (hash: {} -> {}), sending update",
last_state.state_hash, current_state.state_hash
);
// Log the changes for debugging
for iface in &current_state.interfaces {
debug!(
" Interface {}: IPv4={:?}",
iface.name, iface.ipv4_addresses
);
}
if network_tx
.send(AgentMessage::NetworkState(current_state.clone()))
.await
.is_err()
{
debug!("Network channel closed");
break;
}
last_state = current_state;
}
}
});
// Spawn heartbeat task
let heartbeat_tx = tx.clone();
let heartbeat_task = tokio::spawn(async move {
let mut timer = interval(Duration::from_secs(30));
loop {
timer.tick().await;
if heartbeat_tx.send(AgentMessage::Heartbeat).await.is_err() {
debug!("Heartbeat channel closed");
break;
}
}
});
// Main message loop
let result: Result<()> = loop {
tokio::select! {
// Handle outgoing messages
Some(msg) = rx.recv() => {
let json = serde_json::to_string(&msg)?;
if let Err(e) = write.send(Message::Text(json)).await {
break Err(e.into());
}
match &msg {
AgentMessage::Metrics(m) => {
debug!("Sent metrics: CPU={:.1}%", m.cpu_percent);
}
AgentMessage::NetworkState(n) => {
debug!("Sent network state: {} interfaces, hash={}",
n.interfaces.len(), n.state_hash);
}
AgentMessage::Heartbeat => {
debug!("Sent heartbeat");
}
_ => {
debug!("Sent message: {:?}", std::mem::discriminant(&msg));
}
}
}
// Handle incoming messages
Some(msg_result) = read.next() => {
match msg_result {
Ok(Message::Text(text)) => {
if let Err(e) = Self::handle_server_message(&text, &tx).await {
error!("Error handling message: {}", e);
}
}
Ok(Message::Ping(data)) => {
if let Err(e) = write.send(Message::Pong(data)).await {
break Err(e.into());
}
}
Ok(Message::Pong(_)) => {
debug!("Received pong");
}
Ok(Message::Close(frame)) => {
info!("Server closed connection: {:?}", frame);
break Ok(());
}
Ok(Message::Binary(_)) => {
warn!("Received unexpected binary message");
}
Ok(Message::Frame(_)) => {
// Raw frame, usually not seen
}
Err(e) => {
error!("WebSocket error: {}", e);
break Err(e.into());
}
}
}
// Connection timeout (no activity)
_ = tokio::time::sleep(Duration::from_secs(90)) => {
warn!("Connection timeout, no activity for 90 seconds");
break Err(anyhow::anyhow!("Connection timeout"));
}
}
};
// Cleanup
metrics_task.abort();
network_task.abort();
heartbeat_task.abort();
*state.connected.write().await = false;
result
}
/// Handle a message received from the server
async fn handle_server_message(
text: &str,
tx: &mpsc::Sender<AgentMessage>,
) -> Result<()> {
let msg: ServerMessage =
serde_json::from_str(text).context("Failed to parse server message")?;
match msg {
ServerMessage::Command(cmd) => {
info!("Received command: {:?} (id: {})", cmd.command_type, cmd.id);
Self::execute_command(cmd, tx.clone()).await;
}
ServerMessage::ConfigUpdate(update) => {
info!("Received config update: {:?}", update);
// Config updates will be handled in a future phase
}
ServerMessage::Ack { message_id } => {
debug!("Received ack for message: {:?}", message_id);
}
ServerMessage::AuthAck(_) => {
// Already handled during initial auth
}
ServerMessage::Error { code, message } => {
error!("Server error: {} - {}", code, message);
}
ServerMessage::Update(payload) => {
info!(
"Received update command: {} -> {} (id: {})",
env!("CARGO_PKG_VERSION"),
payload.target_version,
payload.update_id
);
Self::handle_update(payload, tx.clone()).await;
}
}
Ok(())
}
/// Handle an update command from the server
async fn handle_update(payload: UpdatePayload, tx: mpsc::Sender<AgentMessage>) {
// Send starting status
let starting_result = UpdateResultPayload {
update_id: payload.update_id,
status: UpdateStatus::Starting,
old_version: env!("CARGO_PKG_VERSION").to_string(),
new_version: None,
error: None,
};
let _ = tx.send(AgentMessage::UpdateResult(starting_result)).await;
// Spawn update in background (it will restart the service)
tokio::spawn(async move {
let config = UpdaterConfig::default();
let updater = AgentUpdater::new(config);
let result = updater.perform_update(payload).await;
// If we reach here, the update failed (successful update restarts the process)
let _ = tx.send(AgentMessage::UpdateResult(result)).await;
});
}
/// Get the current architecture
fn get_architecture() -> &'static str {
#[cfg(target_arch = "x86_64")]
{ "amd64" }
#[cfg(target_arch = "aarch64")]
{ "arm64" }
#[cfg(target_arch = "x86")]
{ "386" }
#[cfg(target_arch = "arm")]
{ "arm" }
#[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64", target_arch = "x86", target_arch = "arm")))]
{ "unknown" }
}
/// Execute a command received from the server
async fn execute_command(cmd: CommandPayload, tx: mpsc::Sender<AgentMessage>) {
let command_id = cmd.id;
// Spawn command execution in background
tokio::spawn(async move {
let start = std::time::Instant::now();
let result = Self::run_command(&cmd).await;
let duration_ms = start.elapsed().as_millis() as u64;
let (exit_code, stdout, stderr) = match result {
Ok((code, out, err)) => (code, out, err),
Err(e) => (-1, String::new(), format!("Execution error: {}", e)),
};
let result_msg = AgentMessage::CommandResult(super::CommandResultPayload {
command_id,
exit_code,
stdout,
stderr,
duration_ms,
});
if tx.send(result_msg).await.is_err() {
error!("Failed to send command result");
}
});
}
/// Run a command and capture output
async fn run_command(cmd: &CommandPayload) -> Result<(i32, String, String)> {
use tokio::process::Command;
let timeout_secs = cmd.timeout_seconds.unwrap_or(300); // 5 minute default
let mut command = match &cmd.command_type {
super::CommandType::Shell => {
#[cfg(windows)]
{
let mut c = Command::new("cmd");
c.args(["/C", &cmd.command]);
c
}
#[cfg(unix)]
{
let mut c = Command::new("sh");
c.args(["-c", &cmd.command]);
c
}
}
super::CommandType::PowerShell => {
let mut c = Command::new("powershell");
c.args(["-NoProfile", "-NonInteractive", "-Command", &cmd.command]);
c
}
super::CommandType::Python => {
let mut c = Command::new("python");
c.args(["-c", &cmd.command]);
c
}
super::CommandType::Script { interpreter } => {
let mut c = Command::new(interpreter);
c.args(["-c", &cmd.command]);
c
}
};
// Capture output
command.stdout(std::process::Stdio::piped());
command.stderr(std::process::Stdio::piped());
// Execute with timeout
let output = timeout(Duration::from_secs(timeout_secs), command.output())
.await
.context("Command timeout")?
.context("Failed to execute command")?;
let exit_code = output.status.code().unwrap_or(-1);
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
Ok((exit_code, stdout, stderr))
}
}

View File

@@ -0,0 +1,554 @@
//! Agent self-update module
//!
//! Handles downloading, verifying, and installing agent updates.
//! Features:
//! - Download new binary via HTTPS
//! - SHA256 checksum verification
//! - Atomic binary replacement
//! - Auto-rollback if agent fails to restart
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use sha2::{Sha256, Digest};
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::transport::{UpdatePayload, UpdateResultPayload, UpdateStatus};
/// Configuration for the updater
#[derive(Debug, Clone)]
pub struct UpdaterConfig {
/// Path to the current agent binary
pub binary_path: PathBuf,
/// Directory for config and backup files
pub config_dir: PathBuf,
/// Rollback timeout in seconds
pub rollback_timeout_secs: u64,
}
impl Default for UpdaterConfig {
fn default() -> Self {
Self {
binary_path: Self::detect_binary_path(),
config_dir: Self::detect_config_dir(),
rollback_timeout_secs: 180,
}
}
}
impl UpdaterConfig {
/// Detect the path to the currently running binary
fn detect_binary_path() -> PathBuf {
std::env::current_exe().unwrap_or_else(|_| {
#[cfg(windows)]
{ PathBuf::from(r"C:\Program Files\GuruRMM\gururmm-agent.exe") }
#[cfg(not(windows))]
{ PathBuf::from("/usr/local/bin/gururmm-agent") }
})
}
/// Detect the config directory
fn detect_config_dir() -> PathBuf {
#[cfg(windows)]
{ PathBuf::from(r"C:\ProgramData\GuruRMM") }
#[cfg(not(windows))]
{ PathBuf::from("/etc/gururmm") }
}
/// Get the backup binary path
pub fn backup_path(&self) -> PathBuf {
self.config_dir.join("gururmm-agent.backup")
}
/// Get the pending update info path (stores update_id for reconnection)
pub fn pending_update_path(&self) -> PathBuf {
self.config_dir.join("pending-update.json")
}
}
/// Pending update information (persisted to disk before restart)
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct PendingUpdateInfo {
pub update_id: Uuid,
pub old_version: String,
pub target_version: String,
}
/// Agent updater
pub struct AgentUpdater {
config: UpdaterConfig,
http_client: reqwest::Client,
}
impl AgentUpdater {
/// Create a new updater
pub fn new(config: UpdaterConfig) -> Self {
let http_client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(300))
.build()
.expect("Failed to create HTTP client");
Self { config, http_client }
}
/// Perform an update
///
/// Returns UpdateResultPayload to send back to server
pub async fn perform_update(&self, payload: UpdatePayload) -> UpdateResultPayload {
let old_version = env!("CARGO_PKG_VERSION").to_string();
info!(
"Starting update: {} -> {} (update_id: {})",
old_version, payload.target_version, payload.update_id
);
match self.do_update(&payload, &old_version).await {
Ok(()) => {
// If we get here, something went wrong - we should have restarted
// This means the update completed but restart failed
UpdateResultPayload {
update_id: payload.update_id,
status: UpdateStatus::Failed,
old_version,
new_version: None,
error: Some("Update installed but restart failed".into()),
}
}
Err(e) => {
error!("Update failed: {}", e);
UpdateResultPayload {
update_id: payload.update_id,
status: UpdateStatus::Failed,
old_version,
new_version: None,
error: Some(e.to_string()),
}
}
}
}
/// Internal update implementation
async fn do_update(&self, payload: &UpdatePayload, old_version: &str) -> Result<()> {
// Step 1: Download to temp file
info!("Downloading new binary from {}", payload.download_url);
let temp_path = self.download_binary(&payload.download_url).await
.context("Failed to download binary")?;
// Step 2: Verify checksum
info!("Verifying checksum...");
self.verify_checksum(&temp_path, &payload.checksum_sha256).await
.context("Checksum verification failed")?;
info!("Checksum verified");
// Step 3: Backup current binary
info!("Backing up current binary...");
self.backup_current_binary().await
.context("Failed to backup current binary")?;
// Step 4: Save pending update info (for reconnection after restart)
info!("Saving pending update info...");
self.save_pending_update(PendingUpdateInfo {
update_id: payload.update_id,
old_version: old_version.to_string(),
target_version: payload.target_version.clone(),
}).await
.context("Failed to save pending update info")?;
// Step 5: Create rollback watchdog
info!("Creating rollback watchdog...");
self.create_rollback_watchdog().await
.context("Failed to create rollback watchdog")?;
// Step 6: Replace binary
info!("Replacing binary...");
self.replace_binary(&temp_path).await
.context("Failed to replace binary")?;
// Step 7: Restart service
info!("Restarting service...");
self.restart_service().await
.context("Failed to restart service")?;
// We should never reach here - the restart should terminate this process
Ok(())
}
/// Download the new binary to a temp file
async fn download_binary(&self, url: &str) -> Result<PathBuf> {
let response = self.http_client.get(url)
.send()
.await
.context("HTTP request failed")?;
if !response.status().is_success() {
anyhow::bail!("Download failed with status: {}", response.status());
}
let temp_path = std::env::temp_dir().join(format!("gururmm-update-{}", Uuid::new_v4()));
let mut file = fs::File::create(&temp_path).await
.context("Failed to create temp file")?;
let bytes = response.bytes().await
.context("Failed to read response body")?;
file.write_all(&bytes).await
.context("Failed to write to temp file")?;
file.flush().await?;
debug!("Downloaded {} bytes to {:?}", bytes.len(), temp_path);
Ok(temp_path)
}
/// Verify SHA256 checksum of downloaded file
async fn verify_checksum(&self, path: &Path, expected: &str) -> Result<()> {
let bytes = fs::read(path).await
.context("Failed to read file for checksum")?;
let mut hasher = Sha256::new();
hasher.update(&bytes);
let actual = format!("{:x}", hasher.finalize());
if actual.to_lowercase() != expected.to_lowercase() {
anyhow::bail!(
"Checksum mismatch: expected {}, got {}",
expected.to_lowercase(),
actual.to_lowercase()
);
}
Ok(())
}
/// Backup the current binary
async fn backup_current_binary(&self) -> Result<()> {
let backup_path = self.config.backup_path();
// Ensure config directory exists
if let Some(parent) = backup_path.parent() {
fs::create_dir_all(parent).await.ok();
}
// Copy current binary to backup location
fs::copy(&self.config.binary_path, &backup_path).await
.context("Failed to copy binary to backup")?;
debug!("Backed up to {:?}", backup_path);
Ok(())
}
/// Save pending update info to disk
async fn save_pending_update(&self, info: PendingUpdateInfo) -> Result<()> {
let path = self.config.pending_update_path();
let json = serde_json::to_string(&info)?;
fs::write(&path, json).await?;
Ok(())
}
/// Load pending update info from disk (called on startup)
pub async fn load_pending_update(config: &UpdaterConfig) -> Option<PendingUpdateInfo> {
let path = config.pending_update_path();
if let Ok(json) = fs::read_to_string(&path).await {
if let Ok(info) = serde_json::from_str(&json) {
// Clear the file after loading
let _ = fs::remove_file(&path).await;
return Some(info);
}
}
None
}
/// Create a rollback watchdog that will restore the backup if agent fails to start
async fn create_rollback_watchdog(&self) -> Result<()> {
#[cfg(unix)]
self.create_unix_rollback_watchdog().await?;
#[cfg(windows)]
self.create_windows_rollback_watchdog().await?;
Ok(())
}
#[cfg(unix)]
async fn create_unix_rollback_watchdog(&self) -> Result<()> {
let backup_path = self.config.backup_path();
let binary_path = &self.config.binary_path;
let timeout = self.config.rollback_timeout_secs;
let script = format!(r#"#!/bin/bash
# GuruRMM Rollback Watchdog
# Auto-generated - will be deleted after successful update
BACKUP="{backup}"
BINARY="{binary}"
TIMEOUT={timeout}
sleep $TIMEOUT
# Check if agent service is running
if ! systemctl is-active --quiet gururmm-agent 2>/dev/null; then
echo "Agent not running after update, rolling back..."
if [ -f "$BACKUP" ]; then
cp "$BACKUP" "$BINARY"
chmod +x "$BINARY"
systemctl start gururmm-agent
echo "Rollback completed"
else
echo "No backup file found!"
fi
fi
# Clean up this script
rm -f /tmp/gururmm-rollback.sh
"#,
backup = backup_path.display(),
binary = binary_path.display(),
timeout = timeout
);
let script_path = PathBuf::from("/tmp/gururmm-rollback.sh");
fs::write(&script_path, script).await?;
// Make executable and run in background
tokio::process::Command::new("chmod")
.arg("+x")
.arg(&script_path)
.status()
.await?;
// Spawn as detached background process
tokio::process::Command::new("nohup")
.arg("bash")
.arg(&script_path)
.arg("&")
.spawn()
.context("Failed to spawn rollback watchdog")?;
info!("Rollback watchdog started (timeout: {}s)", timeout);
Ok(())
}
#[cfg(windows)]
async fn create_windows_rollback_watchdog(&self) -> Result<()> {
let backup_path = self.config.backup_path();
let binary_path = &self.config.binary_path;
let timeout = self.config.rollback_timeout_secs;
// Create a PowerShell script for rollback
let script = format!(r#"
# GuruRMM Rollback Watchdog
# Auto-generated - will be deleted after successful update
$Backup = "{backup}"
$Binary = "{binary}"
$Timeout = {timeout}
Start-Sleep -Seconds $Timeout
# Check if agent service is running
$service = Get-Service -Name "gururmm-agent" -ErrorAction SilentlyContinue
if ($service -and $service.Status -ne 'Running') {{
Write-Host "Agent not running after update, rolling back..."
if (Test-Path $Backup) {{
Stop-Service -Name "gururmm-agent" -Force -ErrorAction SilentlyContinue
Copy-Item -Path $Backup -Destination $Binary -Force
Start-Service -Name "gururmm-agent"
Write-Host "Rollback completed"
}} else {{
Write-Host "No backup file found!"
}}
}}
# Clean up
Remove-Item -Path $MyInvocation.MyCommand.Path -Force
"#,
backup = backup_path.display().to_string().replace('\\', "\\\\"),
binary = binary_path.display().to_string().replace('\\', "\\\\"),
timeout = timeout
);
let script_path = std::env::temp_dir().join("gururmm-rollback.ps1");
fs::write(&script_path, script).await?;
// Schedule a task to run the rollback script
tokio::process::Command::new("schtasks")
.args([
"/Create",
"/TN", "GuruRMM-Rollback",
"/TR", &format!("powershell.exe -ExecutionPolicy Bypass -File \"{}\"", script_path.display()),
"/SC", "ONCE",
"/ST", &Self::get_scheduled_time(timeout),
"/F",
])
.status()
.await?;
info!("Rollback watchdog scheduled (timeout: {}s)", timeout);
Ok(())
}
#[cfg(windows)]
fn get_scheduled_time(seconds_from_now: u64) -> String {
use chrono::Local;
let now = Local::now();
let scheduled = now + chrono::Duration::seconds(seconds_from_now as i64);
scheduled.format("%H:%M").to_string()
}
/// Replace the binary with the new one
async fn replace_binary(&self, new_binary: &Path) -> Result<()> {
#[cfg(unix)]
{
info!(
"Replacing binary: source={:?}, dest={:?}",
new_binary, self.config.binary_path
);
// Verify source exists
if !new_binary.exists() {
anyhow::bail!("Source binary does not exist: {:?}", new_binary);
}
let source_meta = fs::metadata(new_binary).await
.context("Failed to read source binary metadata")?;
info!("Source binary size: {} bytes", source_meta.len());
// Check destination directory
if let Some(parent) = self.config.binary_path.parent() {
if !parent.exists() {
anyhow::bail!("Destination directory does not exist: {:?}", parent);
}
}
// On Unix, we cannot overwrite a running binary directly.
// We need to remove/rename the old file first, then copy the new one.
let old_path = self.config.binary_path.with_extension("old");
// Rename current binary (works even while running)
if self.config.binary_path.exists() {
info!("Renaming current binary to {:?}", old_path);
fs::rename(&self.config.binary_path, &old_path).await
.with_context(|| format!(
"Failed to rename {:?} to {:?}",
self.config.binary_path, old_path
))?;
}
// Copy new binary to destination
fs::copy(new_binary, &self.config.binary_path).await
.with_context(|| format!(
"Failed to copy {:?} to {:?}",
new_binary, self.config.binary_path
))?;
info!("Binary copied successfully, setting executable permissions");
// Make executable
let chmod_status = tokio::process::Command::new("chmod")
.arg("+x")
.arg(&self.config.binary_path)
.status()
.await
.context("Failed to run chmod")?;
if !chmod_status.success() {
warn!("chmod returned non-zero exit code: {:?}", chmod_status.code());
}
// Clean up old binary
fs::remove_file(&old_path).await.ok();
info!("Old binary cleaned up");
}
#[cfg(windows)]
{
// On Windows, rename the current binary first
let old_path = self.config.binary_path.with_extension("old");
fs::rename(&self.config.binary_path, &old_path).await.ok();
fs::copy(new_binary, &self.config.binary_path).await
.context("Failed to copy new binary")?;
fs::remove_file(&old_path).await.ok();
}
// Clean up temp file
fs::remove_file(new_binary).await.ok();
Ok(())
}
/// Restart the agent service
async fn restart_service(&self) -> Result<()> {
#[cfg(unix)]
{
// Try systemctl first
let status = tokio::process::Command::new("systemctl")
.args(["restart", "gururmm-agent"])
.status()
.await;
if status.is_err() || !status.unwrap().success() {
// Fallback: exec the new binary directly
warn!("systemctl restart failed, attempting direct restart");
std::process::Command::new(&self.config.binary_path)
.spawn()
.context("Failed to spawn new agent")?;
}
}
#[cfg(windows)]
{
// Restart Windows service
tokio::process::Command::new("sc.exe")
.args(["stop", "gururmm-agent"])
.status()
.await?;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
tokio::process::Command::new("sc.exe")
.args(["start", "gururmm-agent"])
.status()
.await?;
}
// Give the new process a moment to start
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
// Exit this process - the new version should be running now
std::process::exit(0);
}
/// Cancel the rollback watchdog (called when update is confirmed successful)
pub async fn cancel_rollback_watchdog(&self) {
#[cfg(unix)]
{
// Kill the watchdog script
let _ = tokio::process::Command::new("pkill")
.args(["-f", "gururmm-rollback.sh"])
.status()
.await;
let _ = fs::remove_file("/tmp/gururmm-rollback.sh").await;
}
#[cfg(windows)]
{
// Delete the scheduled task
let _ = tokio::process::Command::new("schtasks")
.args(["/Delete", "/TN", "GuruRMM-Rollback", "/F"])
.status()
.await;
let script_path = std::env::temp_dir().join("gururmm-rollback.ps1");
let _ = fs::remove_file(script_path).await;
}
info!("Rollback watchdog cancelled");
}
/// Clean up backup files after successful update confirmation
pub async fn cleanup_backup(&self) {
let _ = fs::remove_file(self.config.backup_path()).await;
info!("Backup file cleaned up");
}
}

View File

@@ -0,0 +1,40 @@
//! Watchdog module for service/process monitoring
//!
//! Monitors configured services and processes, alerting and optionally
//! restarting them when they stop.
//!
//! This module will be implemented in Phase 3.
// Platform-specific implementations will go here:
// - windows.rs: Windows service monitoring via SCM
// - linux.rs: Systemd service monitoring
// - macos.rs: Launchd service monitoring
use serde::{Deserialize, Serialize};
/// Watchdog status for a single service/process
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatchdogStatus {
pub name: String,
pub running: bool,
pub restart_count: u32,
pub last_checked: chrono::DateTime<chrono::Utc>,
}
/// Placeholder for the watchdog manager
/// Will be implemented in Phase 3
pub struct WatchdogManager {
// Will contain the watchdog configuration and state
}
impl WatchdogManager {
pub fn new(_config: &crate::config::WatchdogConfig) -> Self {
Self {}
}
/// Check all watched services/processes
pub async fn check_all(&self) -> Vec<WatchdogStatus> {
// Placeholder - will be implemented in Phase 3
Vec::new()
}
}