data-fair-ws
WebSocket Integration in data-fair Services
The data-fair stack provides a turnkey pub/sub websocket system built on three coordinated packages. Messages flow through a MongoDB capped collection that acts as a durable bus, so the system scales across multiple server processes without an external broker.
Architecture Overview
Vue UI (browser)                Express API server            Worker / background process
─────────────────               ──────────────────            ───────────────────────────
useWS('/my-svc/api/')           wsServer.start(server, db,    wsEmitter.init(db)
  .subscribe(channel, cb) ◄──── canSubscribe)                 wsEmitter.emit(channel, data)
                                 ▲         │                          │
Node.js client (tests)           │         │                          │
──────────────────────           │ ws      │ tailable cursor          │ insert
WsClient / DataFairWsClient ─────┘         │                          │
  .subscribe(channel)                      ▼                          ▼
  .waitFor(channel, filter)     ┌──────────────────────────────────────────┐
                                │ MongoDB capped collection "ws-messages"  │
                                └──────────────────────────────────────────┘
- @data-fair/lib-node/ws-emitter: writes messages to the capped collection.
- @data-fair/lib-express/ws-server: runs a WebSocketServer attached to the HTTP server. Uses a MongoDB tailable cursor to watch for new messages and forwards them to subscribed browser clients. Handles the subscribe/unsubscribe protocol and authorization.
- @data-fair/lib-vue/ws (useWS): Vue composable that manages a ReconnectingWebSocket connection, tracks subscriptions reactively, and auto-cleans up via onScopeDispose.
- @data-fair/lib-node/ws-client (WsClient / DataFairWsClient): Node.js WebSocket client with auto-reconnect, JSON parsing, and channel subscriptions. Designed for programmatic use in workers and integration tests.
Step-by-step Integration Guide
1. Server-side: start the WS server
In your Express API entry point (typically server.ts):
import * as wsServer from '@data-fair/lib-express/ws-server.js'
import * as wsEmitter from '@data-fair/lib-node/ws-emitter.js'
// Inside your start() function, after creating the HTTP server:
await wsServer.start(server, mongo.db, async (channel, sessionState) => {
  // Authorization callback: return true if the user may subscribe to `channel`.
  // Parse the channel string to extract the resource and check permissions.
  // sessionState is the user's session from @data-fair/lib-express session middleware.
  // Admin mode users bypass this check automatically in ws-server.
  return myAuthCheck(channel, sessionState)
})
await wsEmitter.init(mongo.db)
On shutdown:
await wsServer.stop()
The canSubscribe callback receives (channel: string, sessionState: SessionState, message: any).
It is the place to enforce per-channel authorization. Common patterns:
- User-scoped channels (user:{userId}:notifications): check sessionState.user.id === ownerId
- Resource-scoped channels (things/{thingId}/updates): load the resource, check the user's permission profile
Read references/server-examples.md for full examples from events and processings.
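The two patterns above can be sketched as a single callback. This is only an illustration under stated assumptions: `parseChannel`, `makeCanSubscribe`, `SketchSessionState` and the `canReadThing` lookup are hypothetical names, not part of the data-fair libraries, and the real session state carries more fields than the `user.id` used here.

```typescript
// Hypothetical, reduced session shape: only `user.id` comes from the text above.
interface SketchSessionState {
  user?: { id: string }
}

type ParsedChannel =
  | { kind: 'user', userId: string, topic: string }
  | { kind: 'thing', thingId: string, topic: string }
  | { kind: 'unknown' }

// Split a channel name into its authorization scope and its topic.
function parseChannel (channel: string): ParsedChannel {
  const userMatch = /^user:([^:]+):(.+)$/.exec(channel)
  if (userMatch !== null) return { kind: 'user', userId: userMatch[1], topic: userMatch[2] }
  const thingMatch = /^things\/([^/]+)\/(.+)$/.exec(channel)
  if (thingMatch !== null) return { kind: 'thing', thingId: thingMatch[1], topic: thingMatch[2] }
  return { kind: 'unknown' }
}

// Hypothetical permission lookup, e.g. backed by a database query.
type CanReadThing = (thingId: string, userId: string) => Promise<boolean>

// Build a callback with the (channel, sessionState) shape described above.
function makeCanSubscribe (canReadThing: CanReadThing) {
  return async (channel: string, sessionState: SketchSessionState): Promise<boolean> => {
    const userId = sessionState.user?.id
    if (userId === undefined) return false // anonymous users never subscribe
    const parsed = parseChannel(channel)
    if (parsed.kind === 'user') return parsed.userId === userId
    if (parsed.kind === 'thing') return await canReadThing(parsed.thingId, userId)
    return false // unknown channel patterns are rejected by default
  }
}
```

Rejecting unknown patterns by default keeps a newly introduced channel family private until an explicit rule is written for it.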
2. Server-side: emit events
Anywhere you need to push data (API routes, workers, background tasks):
import * as wsEmitter from '@data-fair/lib-node/ws-emitter.js'
// wsEmitter.init(db) must have been called first in the same process.
await wsEmitter.emit('things/abc123/updates', { status: 'done', progress: 100 })
The emitter writes a document { type: 'message', channel, data, date } into the
ws-messages capped collection. The ws-server's tailable cursor picks it up and
forwards it to all clients subscribed to that channel.
Because the bus is MongoDB, any process connected to the same database can emit. This is how workers/background tasks push updates to the API server's WS clients.
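To make the flow concrete, here is a minimal in-memory sketch of the same idea. It is not the library's implementation: `InMemoryBus` stands in for the capped collection, `readFrom` for the tailable cursor, and `Forwarder` for ws-server; all three names are hypothetical. Only the document shape `{ type, channel, data, date }` comes from the description above.

```typescript
interface BusMessage {
  type: 'message'
  channel: string
  data: unknown
  date: string
}

// Stand-in for the capped collection: a bounded, append-only buffer.
class InMemoryBus {
  private readonly docs: BusMessage[] = []
  private readonly capacity: number
  private dropped = 0 // number of old documents evicted so far

  constructor (capacity: number) { this.capacity = capacity }

  insert (channel: string, data: unknown): void {
    this.docs.push({ type: 'message', channel, data, date: new Date().toISOString() })
    if (this.docs.length > this.capacity) {
      this.docs.shift()
      this.dropped++
    }
  }

  // Stand-in for a tailable cursor: documents appended since `position`.
  readFrom (position: number): { messages: BusMessage[], next: number } {
    const start = Math.max(0, position - this.dropped)
    return { messages: this.docs.slice(start), next: this.dropped + this.docs.length }
  }
}

// Stand-in for ws-server: forwards new documents to each channel's subscribers.
class Forwarder {
  private position = 0
  private readonly subscribers = new Map<string, Array<(data: unknown) => void>>()
  private readonly bus: InMemoryBus

  constructor (bus: InMemoryBus) { this.bus = bus }

  subscribe (channel: string, listener: (data: unknown) => void): void {
    const listeners = this.subscribers.get(channel) ?? []
    listeners.push(listener)
    this.subscribers.set(channel, listeners)
  }

  poll (): void {
    const { messages, next } = this.bus.readFrom(this.position)
    this.position = next
    for (const message of messages) {
      for (const listener of this.subscribers.get(message.channel) ?? []) listener(message.data)
    }
  }
}

// Any "process" holding the bus can emit; only subscribed channels are delivered.
const bus = new InMemoryBus(100)
const server = new Forwarder(bus)
const received: unknown[] = []
server.subscribe('things/abc123/updates', (data) => { received.push(data) })
bus.insert('things/abc123/updates', { status: 'done' }) // e.g. written by a worker
bus.insert('things/other/updates', { status: 'running' }) // nobody is subscribed
server.poll()
```

After `poll()`, `received` holds only the payload for `things/abc123/updates`; the other document stays in the buffer until it is evicted.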
3. Channel naming conventions
Use colon- or slash-separated hierarchical names that encode the authorization scope:
| Pattern | Use case |
|---|---|
| user:{userId}:notifications | Per-user channels (events service) |
| things/{thingId}/updates | Per-resource channels (general) |
| things/{thingId}/run-log | Sub-resource event streams (processings) |
The first segment(s) should let canSubscribe extract the resource identifier
and look up permissions efficiently.
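One way to keep channel names consistent between emitters and subscribers is to build them through small helpers. The sketch below is an assumption, not something the libraries provide: `userChannel`, `thingChannel` and `assertSegment` are hypothetical names, and the patterns simply mirror the table above.

```typescript
// Template literal types document the expected shape of each channel family.
type UserChannel = `user:${string}:${string}`
type ThingChannel = `things/${string}/${string}`

// Reject identifiers that would break parsing in the authorization callback.
function assertSegment (value: string, separator: string): void {
  if (value === '' || value.includes(separator)) {
    throw new Error(`invalid channel segment: "${value}"`)
  }
}

function userChannel (userId: string, topic: string): UserChannel {
  assertSegment(userId, ':')
  return `user:${userId}:${topic}`
}

function thingChannel (thingId: string, topic: string): ThingChannel {
  assertSegment(thingId, '/')
  return `things/${thingId}/${topic}`
}
```

Validating the identifier segment means an id containing the separator cannot silently produce a channel that parses to a different resource.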
4. Client-side: subscribe in Vue components
import { useWS } from '@data-fair/lib-vue/ws.js'
// or rely on auto-import if configured
const ws = useWS('/my-service/api/')
// Subscribe to a channel. The callback fires for each incoming message.
// Subscription is auto-cleaned when the Vue scope is disposed.
ws?.subscribe<MyDataType>('things/abc123/updates', (data) => {
  // data is the payload passed to wsEmitter.emit()
  applyUpdate(data)
})
Key behaviors of useWS:
- Converts the path to ws:// or wss:// based on window.location.origin
- Uses ReconnectingWebSocket: auto-reconnects and re-subscribes on reconnect
- One singleton connection per path (multiple useWS('/same/') calls share it)
- onScopeDispose auto-unsubscribes, so no manual cleanup is needed in most cases
- Manual unsubscribe: ws?.unsubscribe(channel, listener) when needed outside of scope disposal
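The first and third behaviors can be illustrated with a few lines of plain TypeScript. This is a sketch, not the composable's source: `toWsUrl`, `getConnection` and the `connections` map are hypothetical, and the origin is passed in explicitly instead of being read from `window.location.origin` so the code also runs outside a browser.

```typescript
// Resolve the path against the origin, then swap http(s) for ws(s).
function toWsUrl (origin: string, path: string): string {
  return new URL(path, origin).href.replace(/^http/, 'ws')
}

// One shared entry per path: a stand-in for the singleton connection.
interface SharedConnection { url: string, users: number }
const connections = new Map<string, SharedConnection>()

function getConnection (origin: string, path: string): SharedConnection {
  let connection = connections.get(path)
  if (connection === undefined) {
    connection = { url: toWsUrl(origin, path), users: 0 }
    connections.set(path, connection)
  }
  connection.users++ // several callers share the same entry
  return connection
}
```

With this sketch, `toWsUrl('https://example.com', '/my-service/api/')` yields `wss://example.com/my-service/api/`, and two `getConnection` calls with the same path return the same object.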
5. Wire protocol
The WS connection uses a simple JSON protocol:
Client to server:
{"type": "subscribe", "channel": "my_channel"}
{"type": "unsubscribe", "channel": "my_channel"}
Server to client:
{"type": "subscribe-confirm", "channel": "my_channel"}
{"type": "unsubscribe-confirm", "channel": "my_channel"}
{"type": "message", "channel": "my_channel", "data": {...}, "date": "2025-01-01T00:00:00.000Z"}
{"type": "error", "status": 400, "data": "error message", "channel": "my_channel"}
The Message type is defined in @data-fair/lib-common-types/ws:
interface Message { type: string; channel: string; data?: any; status?: number }
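For code that handles raw frames directly, the protocol above can be modelled as a discriminated union. The following is a sketch under assumptions: `ServerMessage`, `parseServerMessage` and `subscribeFrame` are hypothetical names, and the union is inferred from the example frames rather than copied from @data-fair/lib-common-types.

```typescript
type ServerMessage =
  | { type: 'subscribe-confirm', channel: string }
  | { type: 'unsubscribe-confirm', channel: string }
  | { type: 'message', channel: string, data?: unknown, date?: string }
  | { type: 'error', channel?: string, status?: number, data?: unknown }

const serverTypes = ['subscribe-confirm', 'unsubscribe-confirm', 'message', 'error']

// Parse a raw frame; return undefined for anything that is not a known server message.
function parseServerMessage (raw: string): ServerMessage | undefined {
  let value: unknown
  try {
    value = JSON.parse(raw)
  } catch {
    return undefined // not JSON at all
  }
  if (typeof value !== 'object' || value === null) return undefined
  const candidate = value as Record<string, unknown>
  if (typeof candidate.type !== 'string' || !serverTypes.includes(candidate.type)) return undefined
  // Every frame except an error is expected to name its channel.
  if (candidate.type !== 'error' && typeof candidate.channel !== 'string') return undefined
  return value as ServerMessage
}

// Build the client-to-server subscribe frame.
function subscribeFrame (channel: string): string {
  return JSON.stringify({ type: 'subscribe', channel })
}
```

Switching on `message.type` after parsing lets the compiler narrow each branch to the fields that frame can actually carry.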
6. Nginx / proxy configuration
WebSocket upgrade headers must be set in your reverse proxy. With nginx, the upstream connection must also use HTTP/1.1:
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
7. Node.js client: @data-fair/lib-node/ws-client
The ws-client module provides two classes for consuming websockets from Node.js
(not from the browser). This is essential for integration tests and for
programmatic use in workers or CLI tools.
WsClient — generic WS client
import { WsClient } from '@data-fair/lib-node/ws-client.js'
const client = new WsClient({
  url: 'http://localhost:8080', // HTTP URL, auto-converted to ws://
  headers: { Cookie: '...' }, // optional: forward auth cookies
  apiKey: 'my-api-key', // optional: sent in subscribe messages
  adminMode: true, // optional: enables admin mode
  account: { type: 'organization', id: 'org1', name: 'Org 1' }, // optional
  log: console // optional: defaults to console
})
Key methods:
- subscribe(channel, force?, timeout?): Subscribes to a channel. Opens the WS connection on first call. Sends a subscribe message including apiKey and account if configured. Waits for subscribe-confirm or throws on error.

      await client.subscribe('things/abc123/updates')

- waitFor(channel, filter?, timeout?, skipSubscribe?, fullMessage?): Subscribes (unless skipSubscribe) then returns a promise that resolves when a matching message arrives on the channel, or rejects on timeout (default 5 minutes).

      // Wait for a specific event type
      const event = await client.waitFor(
        'things/abc123/updates',
        (data) => data.status === 'done',
        10000 // 10s timeout
      )

- close(): Terminates the underlying WebSocket connection.
Auto-reconnect: if the connection drops, WsClient automatically reconnects
and re-subscribes to all previously subscribed channels.
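The re-subscribe step amounts to remembering which channels are active and replaying their subscribe frames once the socket is open again. The sketch below shows that bookkeeping only; `SubscriptionRegistry` is a hypothetical name, not the class used inside WsClient, and it omits the `apiKey` and `account` fields the real client adds to subscribe messages.

```typescript
class SubscriptionRegistry {
  private readonly channels = new Set<string>()

  add (channel: string): void { this.channels.add(channel) }

  remove (channel: string): void { this.channels.delete(channel) }

  // Frames to send again once a dropped connection is re-established.
  framesToReplay (): string[] {
    return Array.from(this.channels).map((channel) => JSON.stringify({ type: 'subscribe', channel }))
  }
}

const registry = new SubscriptionRegistry()
registry.add('things/abc123/updates')
registry.add('things/abc123/run-log')
registry.remove('things/abc123/run-log') // unsubscribed channels are not replayed
const replay = registry.framesToReplay()
```

Using a set keeps the replay idempotent: subscribing twice to the same channel still produces a single frame after a reconnect.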
DataFairWsClient — specialized for Data Fair datasets
Extends WsClient with a convenience method for waiting on dataset journal events:
import { DataFairWsClient } from '@data-fair/lib-node/ws-client.js'
const ws = new DataFairWsClient({
  url: 'http://localhost:8080',
  apiKey: 'my-api-key',
  log: console
})
// Wait for a dataset to finish indexing
const event = await ws.waitForJournal(datasetId, 'finalize-end', 60000)
// Throws if an 'error' event arrives before the expected event
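The "throws on error before the expected event" rule can be sketched as a pure function over a list of already received events. This is an illustration only: `JournalEvent` with a `type` field is an assumed shape, and `firstMatchOrThrow` is a hypothetical helper, not how DataFairWsClient is written.

```typescript
// Assumed, simplified journal event shape.
interface JournalEvent { type: string, data?: unknown }

// Return the first event of the expected type, or throw if an 'error'
// event shows up before it. Returns undefined while still waiting.
function firstMatchOrThrow (events: JournalEvent[], expectedType: string): JournalEvent | undefined {
  for (const event of events) {
    if (event.type === expectedType) return event
    if (event.type === 'error') {
      throw new Error(`journal error before "${expectedType}"`)
    }
  }
  return undefined // a real client would keep listening until its timeout
}
```

Failing as soon as an error event is seen avoids waiting out the whole timeout when the dataset can no longer reach the expected state.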
Integration testing with WsClient
The WsClient is the primary tool for writing integration tests that verify
websocket behavior end-to-end. Typical pattern:
import { WsClient } from '@data-fair/lib-node/ws-client.js'
import * as wsEmitter from '@data-fair/lib-node/ws-emitter.js'

// testPort and testSessionCookie are expected to come from your own test setup.
describe('websocket integration', () => {
  let client: WsClient

  beforeAll(async () => {
    // Point at the test server
    client = new WsClient({
      url: 'http://localhost:' + testPort,
      headers: { Cookie: testSessionCookie } // or use apiKey
    })
  })

  afterAll(() => {
    client.close()
  })

  it('should receive emitted events', async () => {
    // Subscribe and set up a waiter BEFORE emitting
    const eventPromise = client.waitFor(
      'things/abc123/updates',
      (data) => data.status === 'done'
    )
    // Trigger the action that emits (wsEmitter.init(db) must have run in this process)
    await wsEmitter.emit('things/abc123/updates', { status: 'done', result: 42 })
    // Assert the received event
    const event = await eventPromise
    expect(event.status).toBe('done')
    expect(event.result).toBe(42)
  })

  it('should reject unauthorized subscriptions', async () => {
    const unauthorizedClient = new WsClient({
      url: 'http://localhost:' + testPort
      // no auth headers
    })
    try {
      await unauthorizedClient.subscribe('private/channel')
      throw new Error('should have thrown')
    } catch (err) {
      // err is `unknown` in strict TypeScript, so narrow it before reading .message
      expect((err as Error).message).toMatch(/Permission/)
    } finally {
      unauthorizedClient.close()
    }
  })
})
Testing tips:
- Always call waitFor() or subscribe() before the action that emits; otherwise the event may fire before the subscription is active.
- Use short timeouts in tests (e.g. 5000 ms) to fail fast instead of waiting for the default 5 minutes.
- Call client.close() in afterAll/afterEach to avoid hanging connections.
- The WsClient sends apiKey and account in the subscribe message payload, which the server's canSubscribe callback receives as message.apiKey and message.account. This allows tests to authenticate without cookies.
Common Pitfalls
- Forgetting wsEmitter.init(db) in workers: each process that emits must call init() with its own MongoDB db handle. The API server AND the worker both need it.
- canSubscribe not parsing the channel: the channel string is the only thing that identifies the resource being requested. Design channels so the auth callback can extract the resource ID without extra lookups when possible.
- SSR guard: useWS checks import.meta.env?.SSR and bails out during server-side rendering. No action needed, but be aware it returns undefined in SSR, so always use optional chaining (ws?.subscribe).
Reference Files
- references/server-examples.md: Full canSubscribe and emit examples from the events and processings services, plus integration test patterns using WsClient. Read this when implementing a new service's WS layer or writing tests.
- references/source-api.md: Abridged source of the four library modules (ws-server, ws-emitter, useWS, ws-client). Read this only if you need to understand internal behavior or debug an issue.