//! WebSocket client transport //! //! Handles WebSocket connection to the GuruConnect server with: //! - TLS encryption //! - Automatic reconnection //! - Protobuf message serialization use crate::proto::Message; use anyhow::{Context, Result}; use bytes::Bytes; use futures_util::{SinkExt, StreamExt}; use prost::Message as ProstMessage; use std::collections::VecDeque; use std::sync::Arc; use tokio::net::TcpStream; use tokio::sync::Mutex; use tokio_tungstenite::{ connect_async, tungstenite::protocol::Message as WsMessage, MaybeTlsStream, WebSocketStream, }; type WsStream = WebSocketStream>; /// WebSocket transport for server communication pub struct WebSocketTransport { stream: Arc>, incoming: VecDeque, connected: bool, } impl WebSocketTransport { /// Connect to the server pub async fn connect( url: &str, agent_id: &str, api_key: &str, hostname: Option<&str>, support_code: Option<&str>, machine_uid: Option<&str>, ) -> Result { // Build query parameters. agent_id + api_key are kept exactly as-is; // machine_uid is appended ALONGSIDE them (v2 stable-identity Task 1) so // the server sees the deterministic identity at connect time. It does not // change registration keying (a separate server-side task). let mut params = format!("agent_id={}&api_key={}", agent_id, api_key); if let Some(hostname) = hostname { params.push_str(&format!("&hostname={}", urlencoding::encode(hostname))); } if let Some(machine_uid) = machine_uid { params.push_str(&format!( "&machine_uid={}", urlencoding::encode(machine_uid) )); } if let Some(code) = support_code { params.push_str(&format!("&support_code={}", code)); } // Append parameters to URL let url_with_params = if url.contains('?') { format!("{}&{}", url, params) } else { format!("{}?{}", url, params) }; tracing::info!("Connecting to {} as agent {}", url, agent_id); if let Some(code) = support_code { tracing::info!("Using support code: {}", code); } let (ws_stream, response) = connect_async(&url_with_params) .await .context("Failed to connect to WebSocket server")?; tracing::info!("Connected, status: {}", response.status()); Ok(Self { stream: Arc::new(Mutex::new(ws_stream)), incoming: VecDeque::new(), connected: true, }) } /// Send a protobuf message pub async fn send(&mut self, msg: Message) -> Result<()> { let mut stream = self.stream.lock().await; // Serialize to protobuf binary let mut buf = Vec::with_capacity(msg.encoded_len()); msg.encode(&mut buf)?; // Send as binary WebSocket message stream .send(WsMessage::Binary(buf)) .await .context("Failed to send message")?; Ok(()) } /// Try to receive a message (non-blocking) pub fn try_recv(&mut self) -> Result> { // Return buffered message if available if let Some(msg) = self.incoming.pop_front() { return Ok(Some(msg)); } // Try to receive more messages let stream = self.stream.clone(); let result = tokio::task::block_in_place(|| { tokio::runtime::Handle::current().block_on(async { let mut stream = stream.lock().await; // Use try_next for non-blocking receive match tokio::time::timeout(std::time::Duration::from_millis(1), stream.next()).await { Ok(Some(Ok(ws_msg))) => Ok(Some(ws_msg)), Ok(Some(Err(e))) => Err(anyhow::anyhow!("WebSocket error: {}", e)), Ok(None) => { // Connection closed Ok(None) } Err(_) => { // Timeout - no message available Ok(None) } } }) }); match result? { Some(ws_msg) => { if let Some(msg) = self.parse_message(ws_msg)? { Ok(Some(msg)) } else { Ok(None) } } None => Ok(None), } } /// Receive a message (blocking) #[allow(dead_code)] pub async fn recv(&mut self) -> Result> { // Return buffered message if available if let Some(msg) = self.incoming.pop_front() { return Ok(Some(msg)); } let result = { let mut stream = self.stream.lock().await; stream.next().await }; match result { Some(Ok(ws_msg)) => self.parse_message(ws_msg), Some(Err(e)) => { self.connected = false; Err(anyhow::anyhow!("WebSocket error: {}", e)) } None => { self.connected = false; Ok(None) } } } /// Parse a WebSocket message into a protobuf message fn parse_message(&mut self, ws_msg: WsMessage) -> Result> { match ws_msg { WsMessage::Binary(data) => { let msg = Message::decode(Bytes::from(data)) .context("Failed to decode protobuf message")?; Ok(Some(msg)) } WsMessage::Ping(_data) => { // Pong is sent automatically by tungstenite tracing::trace!("Received ping"); Ok(None) } WsMessage::Pong(_) => { tracing::trace!("Received pong"); Ok(None) } WsMessage::Close(frame) => { tracing::info!("Connection closed: {:?}", frame); self.connected = false; Ok(None) } WsMessage::Text(text) => { // We expect binary protobuf, but log text messages tracing::warn!("Received unexpected text message: {}", text); Ok(None) } _ => Ok(None), } } /// Check if connected pub fn is_connected(&self) -> bool { self.connected } /// Close the connection #[allow(dead_code)] pub async fn close(&mut self) -> Result<()> { let mut stream = self.stream.lock().await; stream.close(None).await?; self.connected = false; Ok(()) } }