Some checks failed
Build and Test / Build Server (Linux) (push) Failing after 2m59s
Build and Test / Build Agent (Windows) (push) Has started running
Build and Test / Security Audit (push) Has been cancelled
Build and Test / Build Summary (push) Has been cancelled
Run Tests / Test Server (push) Has been cancelled
Run Tests / Test Agent (push) Has been cancelled
Run Tests / Code Coverage (push) Has been cancelled
Run Tests / Lint and Format Check (push) Has been cancelled
First run of the build-and-test CI gate (cargo fmt --all -- --check) surfaced pre-existing formatting drift across the agent and server crates. Apply rustfmt across the workspace so the codebase meets its own CI gate. Pure formatting; no logic changes. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
94 lines
2.9 KiB
Rust
94 lines
2.9 KiB
Rust
//! WebSocket transport for viewer-server communication
|
|
|
|
use crate::proto;
|
|
use anyhow::{anyhow, Result};
|
|
use bytes::Bytes;
|
|
use futures_util::{SinkExt, StreamExt};
|
|
use prost::Message as ProstMessage;
|
|
use std::sync::Arc;
|
|
use tokio::net::TcpStream;
|
|
use tokio::sync::Mutex;
|
|
use tokio_tungstenite::{
|
|
connect_async, tungstenite::protocol::Message as WsMessage, MaybeTlsStream, WebSocketStream,
|
|
};
|
|
use tracing::{debug, error, trace};
|
|
|
|
pub type WsSender =
|
|
futures_util::stream::SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, WsMessage>;
|
|
|
|
pub type WsReceiver = futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
|
|
|
|
/// Receiver wrapper that parses protobuf messages
|
|
pub struct MessageReceiver {
|
|
inner: WsReceiver,
|
|
}
|
|
|
|
impl MessageReceiver {
|
|
pub async fn recv(&mut self) -> Option<proto::Message> {
|
|
loop {
|
|
match self.inner.next().await {
|
|
Some(Ok(WsMessage::Binary(data))) => {
|
|
match proto::Message::decode(Bytes::from(data)) {
|
|
Ok(msg) => return Some(msg),
|
|
Err(e) => {
|
|
error!("Failed to decode message: {}", e);
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
Some(Ok(WsMessage::Close(_))) => {
|
|
debug!("WebSocket closed");
|
|
return None;
|
|
}
|
|
Some(Ok(WsMessage::Ping(_))) => {
|
|
trace!("Received ping");
|
|
continue;
|
|
}
|
|
Some(Ok(WsMessage::Pong(_))) => {
|
|
trace!("Received pong");
|
|
continue;
|
|
}
|
|
Some(Ok(_)) => continue,
|
|
Some(Err(e)) => {
|
|
error!("WebSocket error: {}", e);
|
|
return None;
|
|
}
|
|
None => return None,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Connect to the GuruConnect server
|
|
pub async fn connect(url: &str, token: &str) -> Result<(WsSender, MessageReceiver)> {
|
|
// Add auth token to URL
|
|
let full_url = if token.is_empty() {
|
|
url.to_string()
|
|
} else if url.contains('?') {
|
|
format!("{}&token={}", url, urlencoding::encode(token))
|
|
} else {
|
|
format!("{}?token={}", url, urlencoding::encode(token))
|
|
};
|
|
|
|
debug!("Connecting to {}", full_url);
|
|
|
|
let (ws_stream, _) = connect_async(&full_url)
|
|
.await
|
|
.map_err(|e| anyhow!("Failed to connect: {}", e))?;
|
|
|
|
let (sender, receiver) = ws_stream.split();
|
|
|
|
Ok((sender, MessageReceiver { inner: receiver }))
|
|
}
|
|
|
|
/// Send a protobuf message over the WebSocket
|
|
pub async fn send_message(sender: &Arc<Mutex<WsSender>>, msg: &proto::Message) -> Result<()> {
|
|
let mut buf = Vec::with_capacity(msg.encoded_len());
|
|
msg.encode(&mut buf)?;
|
|
|
|
let mut sender = sender.lock().await;
|
|
sender.send(WsMessage::Binary(buf)).await?;
|
|
|
|
Ok(())
|
|
}
|