# Real-Time Analytics
## Overview
This skill enables AI agents to build self-hosted, real-time analytics systems. It covers the full pipeline from event ingestion through storage to query and visualization, using ClickHouse as the analytical database for sub-second query performance at scale.
## Instructions
### Event Schema Design
Every event must have these base fields:

- `event_name` — LowCardinality(String) for efficient storage
- `timestamp` — DateTime64(3) for millisecond precision
- `session_id` — String, client-generated UUID
- `user_id` — Nullable(String) to allow anonymous tracking
- `device_type` — LowCardinality(String): desktop, mobile, tablet
- `country_code` — LowCardinality(FixedString(2))
- `properties` — String containing JSON for event-specific data
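For reference, a single event conforming to this schema might look like this (all values are illustrative):

```json
{
  "event_name": "feature_used",
  "timestamp": "2024-06-01T12:34:56.789",
  "session_id": "3f1c9a2e-8b4d-4f6a-9c1e-2d7b5a8e0f13",
  "user_id": null,
  "device_type": "mobile",
  "country_code": "DE",
  "properties": "{\"feature\": \"search\"}"
}
```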
-
ClickHouse table optimization rules:

- Use the `MergeTree()` engine, partitioned by `toYYYYMM(date)`
- ORDER BY should start with the most filtered column (usually `event_name`)
- Add a TTL for automatic data expiration (default 90 days)
- Use `LowCardinality()` for any string column with fewer than 10,000 distinct values
### Ingestion Service
- Build as a stateless HTTP service accepting `POST /events` with a JSON array body.
- Validate incoming events: reject any event missing `event_name` or `timestamp`.
- Buffer events in memory. Flush when either condition is met:
  - Buffer reaches 1,000 events
  - 2 seconds have elapsed since the last flush
- Use ClickHouse's `INSERT ... FORMAT JSONEachRow` for batch inserts.
- On flush failure, retry 3 times with exponential backoff, then write the batch to a dead-letter file.
- Expose `GET /health` returning `{ "buffer_size": N, "last_flush": "ISO timestamp", "status": "ok" }`.
### Aggregation Queries
Write queries as named `.sql` files. Common dashboard panels:

**Active users (last 5 minutes):**

```sql
SELECT count(DISTINCT session_id) AS active_users
FROM events
WHERE timestamp > now() - INTERVAL 5 MINUTE;
```
**Funnel analysis (two-step):**

```sql
SELECT
    toStartOfHour(timestamp) AS hour,
    countIf(event_name = 'signup_started') AS started,
    countIf(event_name = 'signup_completed') AS completed,
    -- greatest() avoids division by zero in hours with no signups
    round(completed / greatest(started, 1) * 100, 1) AS conversion_pct
FROM events
WHERE date = today()
GROUP BY hour
ORDER BY hour;
```
**Top features by unique users:**

```sql
SELECT
    JSONExtractString(properties, 'feature') AS feature,
    count(DISTINCT user_id) AS unique_users
FROM events
WHERE event_name = 'feature_used' AND date = today()
GROUP BY feature
ORDER BY unique_users DESC
LIMIT 10;
```
**Error rate per minute:**

```sql
SELECT
    toStartOfMinute(timestamp) AS minute,
    count() AS error_count
FROM events
WHERE event_name = 'error_occurred'
  AND timestamp > now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;
```
### Dashboard API Layer
- Create one REST endpoint per dashboard panel (e.g., `GET /api/dashboard/active-users`).
- Each endpoint runs its SQL query and returns JSON.
- Add a `time_range` query param (`1h`, `6h`, `24h`, `7d`) and adjust the WHERE clause accordingly.
- Cache results for 3-5 seconds to avoid hammering ClickHouse on concurrent dashboard loads.
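One way to sketch the `time_range` mapping and the short-lived cache is a framework-agnostic handler factory. The query runner is injected (e.g. a ClickHouse client call) and the route wiring shown in the trailing comment is illustrative, not prescribed by the skill:

```js
// Map the time_range query param to a ClickHouse interval expression.
const RANGES = {
  '1h': 'INTERVAL 1 HOUR',
  '6h': 'INTERVAL 6 HOUR',
  '24h': 'INTERVAL 24 HOUR',
  '7d': 'INTERVAL 7 DAY',
};
const CACHE_TTL_MS = 4000; // within the 3-5 second window from the guidelines
const cache = new Map(); // cache key -> { expires, data }

// Returns an (req, res) handler; `runQuery` executes SQL, `buildSql`
// produces the panel's query for a given interval expression.
function makeHandler(runQuery, buildSql) {
  return async (req, res) => {
    const range = RANGES[req.query.time_range] || RANGES['1h'];
    const key = `${req.path}:${range}`;
    const hit = cache.get(key);
    if (hit && hit.expires > Date.now()) return res.json(hit.data);
    const data = await runQuery(buildSql(range));
    cache.set(key, { expires: Date.now() + CACHE_TTL_MS, data });
    return res.json(data);
  };
}

// Example wiring (Express-style, illustrative):
// app.get('/api/dashboard/active-users', makeHandler(queryClickHouse, (range) =>
//   `SELECT count(DISTINCT session_id) AS active_users
//    FROM events WHERE timestamp > now() - ${range}`));
```

Keying the cache on path plus resolved range means each panel/range combination is recomputed at most once per TTL window, no matter how many dashboards are open.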
### Docker Compose Setup
Provide a docker-compose.yml with:
- ClickHouse service (pinned version, volume for data)
- Ingestion service (builds from Dockerfile)
- Dashboard frontend (builds from Dockerfile or served via nginx)
- An init container or entrypoint script that creates the events table
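A minimal sketch of such a compose file. Image tag, service names, ports, and paths are assumptions to adapt; note that the official ClickHouse image executes `*.sql` files mounted into `/docker-entrypoint-initdb.d` on first start, which covers the table-creation step without a separate init container:

```yaml
services:
  clickhouse:
    image: clickhouse/clickhouse-server:24.8   # pin an exact version in practice
    volumes:
      - clickhouse-data:/var/lib/clickhouse
      - ./schema:/docker-entrypoint-initdb.d   # runs *.sql here on first start
    ports:
      - "8123:8123"

  ingest:
    build: ./ingest
    ports:
      - "8080:8080"
    depends_on:
      - clickhouse

  dashboard:
    build: ./dashboard
    ports:
      - "3000:3000"
    depends_on:
      - ingest

volumes:
  clickhouse-data:
```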
## Examples
### Example 1 — Schema for an e-commerce app
Input: "Design analytics events for an e-commerce checkout flow."
Output:
```sql
-- Core events: product_viewed, add_to_cart, checkout_started,
-- payment_submitted, order_completed, order_failed
CREATE TABLE events (
    event_name    LowCardinality(String),
    timestamp     DateTime64(3),
    session_id    String,
    user_id       Nullable(String),
    device_type   LowCardinality(String),
    country_code  LowCardinality(FixedString(2)),
    properties    String,
    date          Date DEFAULT toDate(timestamp)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (event_name, timestamp)
TTL date + INTERVAL 180 DAY;
```
Properties JSON per event:

- `product_viewed`: `{"product_id": "p-42", "category": "electronics", "price_cents": 4999}`
- `add_to_cart`: `{"product_id": "p-42", "quantity": 1}`
- `payment_submitted`: `{"method": "card", "amount_cents": 9998}`
### Example 2 — Ingestion buffer logic
Input: "Show the buffer flush logic in Node.js."
Output:
```js
const FLUSH_INTERVAL_MS = 2000;
const FLUSH_SIZE = 1000;

let buffer = [];

async function addEvents(events) {
  buffer.push(...events);
  if (buffer.length >= FLUSH_SIZE) await flush();
}

// Time-based flush: drain whatever has accumulated every 2 seconds.
setInterval(async () => {
  if (buffer.length > 0) await flush();
}, FLUSH_INTERVAL_MS);

async function flush() {
  // splice() empties the buffer in one step, so events arriving mid-flush
  // land in the next batch instead of being lost or double-inserted.
  const batch = buffer.splice(0, buffer.length);
  await clickhouse.insert({
    table: 'events',
    values: batch,
    format: 'JSONEachRow',
  });
}
```
## Guidelines
- **Partition wisely.** Monthly partitions work for most apps. Go daily only if you ingest more than 100M events/day.
- **Do not use Kafka unless asked.** Below 50K events/second, direct HTTP ingestion with in-memory buffering is simpler and sufficient.
- **Always add a TTL.** Unbounded analytical tables grow fast. Default to 90 days; let the user override.
- **Test with realistic volume.** Generate synthetic events to validate that the pipeline handles expected throughput before going live.
- **Avoid `SELECT *`.** Always specify columns in aggregation queries to minimize I/O.