//! WebSocket transport for viewer-server communication use crate::proto; use anyhow::{anyhow, Result}; use bytes::Bytes; use futures_util::{SinkExt, StreamExt}; use prost::Message as ProstMessage; use std::sync::Arc; use tokio::sync::Mutex; use tokio_tungstenite::{ connect_async, tungstenite::protocol::Message as WsMessage, MaybeTlsStream, WebSocketStream, }; use tokio::net::TcpStream; use tracing::{debug, error, trace}; pub type WsSender = futures_util::stream::SplitSink< WebSocketStream>, WsMessage, >; pub type WsReceiver = futures_util::stream::SplitStream< WebSocketStream>, >; /// Receiver wrapper that parses protobuf messages pub struct MessageReceiver { inner: WsReceiver, } impl MessageReceiver { pub async fn recv(&mut self) -> Option { loop { match self.inner.next().await { Some(Ok(WsMessage::Binary(data))) => { match proto::Message::decode(Bytes::from(data)) { Ok(msg) => return Some(msg), Err(e) => { error!("Failed to decode message: {}", e); continue; } } } Some(Ok(WsMessage::Close(_))) => { debug!("WebSocket closed"); return None; } Some(Ok(WsMessage::Ping(_))) => { trace!("Received ping"); continue; } Some(Ok(WsMessage::Pong(_))) => { trace!("Received pong"); continue; } Some(Ok(_)) => continue, Some(Err(e)) => { error!("WebSocket error: {}", e); return None; } None => return None, } } } } /// Connect to the GuruConnect server pub async fn connect(url: &str, api_key: &str) -> Result<(WsSender, MessageReceiver)> { // Add API key to URL let full_url = if url.contains('?') { format!("{}&api_key={}", url, api_key) } else { format!("{}?api_key={}", url, api_key) }; debug!("Connecting to {}", full_url); let (ws_stream, _) = connect_async(&full_url) .await .map_err(|e| anyhow!("Failed to connect: {}", e))?; let (sender, receiver) = ws_stream.split(); Ok((sender, MessageReceiver { inner: receiver })) } /// Send a protobuf message over the WebSocket pub async fn send_message( sender: &Arc>, msg: &proto::Message, ) -> Result<()> { let mut buf = Vec::with_capacity(msg.encoded_len()); msg.encode(&mut buf)?; let mut sender = sender.lock().await; sender.send(WsMessage::Binary(buf)).await?; Ok(()) }