salvo-realtime
Salvo Real-time Communication
Overview / decision guide for Salvo's real-time transports. For full APIs see salvo-websocket and salvo-sse.
Pick a transport
| | WebSocket | SSE |
|---|---|---|
| Direction | Full-duplex | Server → client only |
| Transport | Custom over TCP | Plain HTTP (text/event-stream) |
| Reconnection | Manual (heartbeat) | Automatic (browser) |
| Binary payloads | Yes | No (text only) |
| Proxies / firewalls | Occasionally blocked | Same as any HTTP request |
| Salvo feature | websocket | sse |
Use WebSocket for chat, multiplayer games, collaborative editing, or any other bidirectional stream. Use SSE for notifications, tickers, progress bars, and dashboards: anything where only the server pushes data.
WebSocket quick start
use salvo::prelude::*;
use salvo::websocket::WebSocketUpgrade;

#[handler]
async fn ws(req: &mut Request, res: &mut Response) -> Result<(), StatusError> {
    WebSocketUpgrade::new()
        .upgrade(req, res, |mut ws| async move {
            // Echo each message back until the client disconnects or a send fails.
            while let Some(Ok(msg)) = ws.recv().await {
                if ws.send(msg).await.is_err() {
                    return;
                }
            }
        })
        .await
}

#[tokio::main]
async fn main() {
    let router = Router::new().push(Router::with_path("ws").goal(ws));
    Server::new(TcpListener::new("0.0.0.0:8080").bind().await)
        .serve(router)
        .await;
}
SSE quick start
use std::{convert::Infallible, time::Duration};

use futures_util::StreamExt;
use salvo::prelude::*;
use salvo::sse::{self, SseEvent};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

#[handler]
async fn events(res: &mut Response) {
    let mut n: u64 = 0;
    // Emit an incrementing counter once per second.
    let stream = IntervalStream::new(interval(Duration::from_secs(1))).map(move |_| {
        n += 1;
        Ok::<_, Infallible>(SseEvent::default().text(n.to_string()))
    });
    sse::stream(res, stream);
}
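For orientation, each event the handler above emits reaches the client as a small block of text in the `text/event-stream` format. The sketch below is not Salvo's implementation; `format_sse_frame` is a hypothetical helper, written with only the standard library, that illustrates what one frame looks like on the wire.

```rust
/// Hypothetical helper (not part of Salvo): renders one event in the
/// `text/event-stream` wire format.
fn format_sse_frame(event: Option<&str>, id: Option<&str>, data: &str) -> String {
    let mut frame = String::new();
    if let Some(name) = event {
        frame.push_str("event: ");
        frame.push_str(name);
        frame.push('\n');
    }
    if let Some(id) = id {
        frame.push_str("id: ");
        frame.push_str(id);
        frame.push('\n');
    }
    // A multi-line payload becomes one `data:` field per line.
    for line in data.split('\n') {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }
    // A blank line terminates the event.
    frame.push('\n');
    frame
}
```

Because every frame is plain text, binary payloads have to be encoded (for example as base64) before they can travel over SSE.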
Fan-out patterns
Per-client channel map (WebSocket broadcast)
use std::collections::HashMap;
use std::sync::Arc;

use salvo::websocket::Message;
use tokio::sync::{RwLock, mpsc};

type Tx = mpsc::UnboundedSender<Result<Message, salvo::Error>>;
type Users = Arc<RwLock<HashMap<usize, Tx>>>;

async fn broadcast(users: &Users, sender_id: usize, text: &str) {
    let out = format!("<#{sender_id}>: {text}");
    for (&uid, tx) in users.read().await.iter() {
        if uid != sender_id {
            let _ = tx.send(Ok(Message::text(out.clone())));
        }
    }
}
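The `Tx` alias above uses an unbounded sender, so a client that stops reading lets its queue grow without limit. As a rough sketch of the bounded alternative, the std-only example below uses `std::sync::mpsc::sync_channel` as a stand-in for a bounded async channel (tokio's bounded `mpsc::Sender` offers a comparable `try_send`). The function name `broadcast_bounded` and the drop-on-full policy are assumptions for illustration, not part of Salvo.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{SyncSender, TrySendError};

/// Hypothetical bounded fan-out: sends to every client except the sender
/// and returns the ids of clients whose queue was full or closed.
fn broadcast_bounded(
    clients: &HashMap<usize, SyncSender<String>>,
    sender_id: usize,
    text: &str,
) -> Vec<usize> {
    let out = format!("<#{sender_id}>: {text}");
    let mut to_drop = Vec::new();
    for (&uid, tx) in clients {
        if uid == sender_id {
            continue;
        }
        match tx.try_send(out.clone()) {
            Ok(()) => {}
            // Queue full (slow client) or receiver gone: mark for removal
            // instead of blocking the whole broadcast.
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => {
                to_drop.push(uid);
            }
        }
    }
    to_drop.sort_unstable();
    to_drop
}
```

The caller would then remove the returned ids from the map while holding the write lock, which keeps one stalled client from affecting the others.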
Pub/sub via broadcast (fits both transports)
use tokio::sync::broadcast;

#[derive(Clone)]
struct Hub {
    tx: broadcast::Sender<String>,
}

impl Hub {
    fn new(cap: usize) -> Self {
        let (tx, _) = broadcast::channel(cap);
        Self { tx }
    }

    fn publish(&self, msg: String) {
        let _ = self.tx.send(msg);
    }

    fn subscribe(&self) -> broadcast::Receiver<String> {
        self.tx.subscribe()
    }
}
Share a Hub through the Depot (typically injected by an affix_state hoop). Each handler then calls subscribe() and either wraps the receiver in a BroadcastStream to feed an SSE stream or forwards received messages into a WebSocket sink.
Rooms
type Rooms = Arc<RwLock<HashMap<String, HashMap<usize, Tx>>>>;
Insert on join, remove on disconnect (both while holding the write guard, no .await in between).
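The join and leave steps can be sketched as follows. This is a std-only approximation: it swaps tokio's async `RwLock` and channel for `std::sync::RwLock` and `std::sync::mpsc::Sender`, and the function names `join` and `leave` are hypothetical. With the async lock the shape is the same, with `.write().await` in place of `.write().unwrap()`.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Sender;
use std::sync::{Arc, RwLock};

// Std-only stand-ins for the `Tx` and `Rooms` aliases (assumption: the
// real ones use tokio's async lock and channel types).
type Tx = Sender<String>;
type Rooms = Arc<RwLock<HashMap<String, HashMap<usize, Tx>>>>;

/// Adds a client to a room, creating the room on first join.
fn join(rooms: &Rooms, room: &str, uid: usize, tx: Tx) {
    let mut guard = rooms.write().unwrap();
    guard.entry(room.to_owned()).or_default().insert(uid, tx);
}

/// Removes a client and deletes the room once it is empty.
fn leave(rooms: &Rooms, room: &str, uid: usize) {
    let mut guard = rooms.write().unwrap();
    let now_empty = match guard.get_mut(room) {
        Some(members) => {
            members.remove(&uid);
            members.is_empty()
        }
        None => false,
    };
    if now_empty {
        guard.remove(room);
    }
}
```

Deleting empty rooms inside the same critical section keeps the outer map from accumulating stale keys as rooms come and go.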
Connection lifecycle
- Auth before upgrade: validate the JWT / session on the route before WebSocketUpgrade::upgrade. Query params and headers are only accessible before the closure runs.
- Track counts: update an AtomicUsize on connect/disconnect if you want metrics without locking.
- WebSocket heartbeat: send a server-side Message::ping every 30 s inside a tokio::select! alongside recv().
- SSE keep-alive: use SseKeepAlive::new(stream).max_interval(Duration::from_secs(15)) (note: max_interval, not interval).
- Back-pressure: a slow client makes an unbounded channel grow without limit; prefer bounded mpsc for per-client queues, or drop lagged subscribers on broadcast::error::RecvError::Lagged.
- Cap fanout: pair with salvo-concurrency-limiter to bound total subscribers.
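One way to implement the connection-count item is an RAII guard, so the counter is decremented on every exit path of the connection task, including early returns. The sketch below uses only the standard library; `ConnectionGuard` is a hypothetical name, not a Salvo type.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical RAII guard: counts one connection for as long as it lives.
struct ConnectionGuard {
    active: Arc<AtomicUsize>,
}

impl ConnectionGuard {
    /// Increments the shared counter when a connection is accepted.
    fn new(active: Arc<AtomicUsize>) -> Self {
        active.fetch_add(1, Ordering::Relaxed);
        Self { active }
    }
}

impl Drop for ConnectionGuard {
    /// Decrements the counter when the connection task finishes.
    fn drop(&mut self) {
        self.active.fetch_sub(1, Ordering::Relaxed);
    }
}
```

Creating the guard at the top of the upgrade closure (or the SSE handler) and letting it fall out of scope would be enough; `Ordering::Relaxed` is adequate here because the value is only read for metrics.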
Mixing both transports
let router = Router::new()
    .push(Router::with_path("chat").goal(ws_chat_handler)) // bidirectional
    .push(Router::with_path("notifications").get(sse_notifications)) // push-only
    .push(Router::with_path("feed").get(sse_feed));
Related Skills
- salvo-websocket: Full WebSocket reference
- salvo-sse: Full SSE reference
- salvo-concurrency-limiter: Cap concurrent real-time connections
- salvo-auth: Authenticate upgrade handshakes