WebSocket-сервер на 100,000 соединений: архитектура и реальные подводные камни
Когда мы запустили систему real-time уведомлений, всё работало отлично на тестах. 500 соединений, latency 5ms, память стабильная. Потом пришёл production — 15,000 одновременных соединений, и сервер начал есть 3GB RAM. При 30,000 соединений процесс падал с OOM. Мы потратили неделю чтобы понять почему — и всё оказалось в неочищенных обработчиках событий и неправильном выборе библиотеки.
Почему стандартный Node.js + ws не масштабируется горизонтально
Первая проблема: ws — хорошая библиотека, но медленная по меркам low-level WebSocket серверов. Каждое соединение в ws — это отдельный объект JavaScript с несколькими слоями абстракции над нативными сокетами.
Вторая проблема — горизонтальное масштабирование. Когда клиент подключается к WebSocket серверу через load balancer, он должен попасть на тот же инстанс при реконнекте. Это называется sticky sessions. Без них клиент может попасть на инстанс, который ничего не знает о его сессии.
Третья проблема — broadcast. Если у вас 5 инстансов, и вы хотите отправить сообщение всем подключённым пользователям — каждый инстанс знает только о своих соединениях. Вам нужен механизм межпроцессного общения.
[Client 1] ──┐
[Client 2] ──┤── [Load Balancer] ──┬── [Node Instance 1: clients 1,2,3]
[Client 3] ──┘ ├── [Node Instance 2: clients 4,5,6]
[Client 4] ──┐ └── [Node Instance 3: clients 7,8,9]
[Client 5] ──┤── [Load Balancer] ──┘
...
Когда Instance 1 хочет отправить broadcast — Instances 2 и 3 должны об этом узнать.
uWebSockets.js vs ws: реальные цифры
uWebSockets.js (uWS) — это Node.js binding для C++ библиотеки uWebSockets. Она в 10-20 раз быстрее чем ws в тестах на throughput.
Реальные цифры из нашего нагрузочного тестирования (сервер: 4 CPU, 8GB RAM):
| Метрика | ws | uWebSockets.js |
|---|---|---|
| Max соединений (стабильно) | ~25,000 | ~100,000+ |
| RAM на 10k соединений | ~800 MB | ~150 MB |
| Broadcast 10k клиентам | ~45 ms | ~3 ms |
| CPU при 10k echo msgs/sec | 85% | 12% |
Разница огромная. Но есть цена: uWS — это нативный модуль, его нельзя легко использовать с некоторыми инструментами, и API менее удобный.
Полный пример uWS сервера
import uWS, { WebSocket, HttpRequest, HttpResponse } from 'uWebSockets.js';
import { createClient } from 'redis';
interface ClientData {
userId: string;
subscriptions: Set<string>;
lastPing: number;
}
const PORT = 9001;
// Хранилище соединений: userId → WebSocket
const connections = new Map<string, WebSocket<ClientData>>();
// Redis для pub/sub между инстансами
const redisSubscriber = createClient({ url: process.env.REDIS_URL });
const redisPublisher = redisSubscriber.duplicate();
await redisSubscriber.connect();
await redisPublisher.connect();
const app = uWS.App();
app.ws<ClientData>('/ws', {
// Параметры соединения
compression: uWS.SHARED_COMPRESSOR,
maxPayloadLength: 16 * 1024, // 16KB
idleTimeout: 60, // Секунды без активности → закрыть
maxBackpressure: 64 * 1024, // Backpressure буфер
upgrade: (res: HttpResponse, req: HttpRequest, context) => {
// Аутентификация при upgrade — до WebSocket handshake
const token = req.getHeader('authorization').replace('Bearer ', '');
const userId = validateToken(token); // ваша логика валидации
if (!userId) {
res.writeStatus('401 Unauthorized').end('Unauthorized');
return;
}
// Upgrade с передачей user data — он будет доступен в ws.getUserData()
res.upgrade(
{ userId, subscriptions: new Set(), lastPing: Date.now() } satisfies ClientData,
req.getHeader('sec-websocket-key'),
req.getHeader('sec-websocket-protocol'),
req.getHeader('sec-websocket-extensions'),
context
);
},
open: (ws: WebSocket<ClientData>) => {
const { userId } = ws.getUserData();
// Если пользователь уже подключён с другого устройства — закрываем старое
const existing = connections.get(userId);
if (existing) {
existing.end(1008, 'Duplicate connection');
}
connections.set(userId, ws);
// Подписываем на персональный канал Redis
ws.subscribe(`user:${userId}`);
console.log(`Client connected: ${userId}, total: ${connections.size}`);
ws.send(JSON.stringify({ type: 'connected', userId }));
},
message: (ws: WebSocket<ClientData>, message: ArrayBuffer, isBinary: boolean) => {
if (isBinary) return; // Игнорируем бинарные сообщения
const data = ws.getUserData();
data.lastPing = Date.now();
try {
const msg = JSON.parse(Buffer.from(message).toString());
handleClientMessage(ws, msg);
} catch {
ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
}
},
close: (ws: WebSocket<ClientData>, code: number, message: ArrayBuffer) => {
const { userId } = ws.getUserData();
connections.delete(userId);
console.log(`Client disconnected: ${userId}, code: ${code}, total: ${connections.size}`);
},
// Важно: дренируем backpressure
drain: (ws: WebSocket<ClientData>) => {
// Когда клиент медленный — сообщения буферизуются
// drain вызывается когда буфер опустел
console.log(`Backpressure drained for ${ws.getUserData().userId}`);
}
});
function handleClientMessage(ws: WebSocket<ClientData>, msg: Record<string, unknown>) {
const data = ws.getUserData();
switch (msg.type) {
case 'subscribe':
if (typeof msg.channel === 'string') {
ws.subscribe(msg.channel);
data.subscriptions.add(msg.channel);
}
break;
case 'unsubscribe':
if (typeof msg.channel === 'string') {
ws.unsubscribe(msg.channel);
data.subscriptions.delete(msg.channel);
}
break;
case 'ping':
ws.send(JSON.stringify({ type: 'pong', timestamp: Date.now() }));
break;
}
}
app.listen(PORT, (token) => {
if (token) {
console.log(`WebSocket server listening on port ${PORT}`);
} else {
console.error(`Failed to listen on port ${PORT}`);
process.exit(1);
}
});
Redis Pub/Sub bridge между инстансами
Когда нужно отправить сообщение конкретному пользователю или broadcast — используем Redis как шину:
interface BroadcastMessage {
type: 'user' | 'channel' | 'broadcast';
target?: string; // userId или channel name
payload: Record<string, unknown>;
}
// Подписываемся на Redis канал этого инстанса
const INSTANCE_CHANNEL = `ws-instance:${process.env.INSTANCE_ID ?? 'local'}`;
const BROADCAST_CHANNEL = 'ws-broadcast';
async function setupRedisBridge() {
// Подписка на персональный канал инстанса И broadcast канал
await redisSubscriber.subscribe([INSTANCE_CHANNEL, BROADCAST_CHANNEL], (rawMessage) => {
try {
const msg: BroadcastMessage = JSON.parse(rawMessage);
switch (msg.type) {
case 'user':
// Отправить конкретному пользователю на ЭТОМ инстансе
if (msg.target) {
const ws = connections.get(msg.target);
if (ws) {
const result = ws.send(JSON.stringify(msg.payload));
if (result === 2) {
// 2 = BACKPRESSURE — клиент не успевает
console.warn(`Backpressure for user ${msg.target}`);
}
}
}
break;
case 'channel':
// Broadcast в канал — uWS сделает сам
if (msg.target) {
app.publish(msg.target, JSON.stringify(msg.payload), false, false);
}
break;
case 'broadcast':
// Всем соединениям на этом инстансе
for (const ws of connections.values()) {
ws.send(JSON.stringify(msg.payload), false, false);
}
break;
}
} catch (error) {
console.error('Redis bridge error:', error);
}
});
}
// API для отправки сообщений из других сервисов
export async function sendToUser(userId: string, payload: Record<string, unknown>) {
// Сначала пробуем локально (быстро)
const localWs = connections.get(userId);
if (localWs) {
localWs.send(JSON.stringify(payload));
return;
}
// Пользователь на другом инстансе — через Redis
// Нужен маппинг userId → instanceId (например, в Redis Hash)
const instanceId = await redisPublisher.hGet('user-instances', userId);
if (instanceId) {
await redisPublisher.publish(`ws-instance:${instanceId}`, JSON.stringify({
type: 'user',
target: userId,
payload
} satisfies BroadcastMessage));
}
}
export async function broadcastToAll(payload: Record<string, unknown>) {
await redisPublisher.publish(BROADCAST_CHANNEL, JSON.stringify({
type: 'broadcast',
payload
} satisfies BroadcastMessage));
}
Почему Redis становится узким местом: при большом количестве инстансов и высоком throughput broadcast'ов, Redis Pub/Sub начинает насыщаться. Мы столкнулись с этим при 50k broadcast/сек — Redis был на 90% CPU. Решение: батчинг сообщений (100ms window), переход на Redis Streams вместо Pub/Sub для надёжной доставки, или NATS/Centrifugo как специализированное решение.
Heartbeat и reconnect логика на клиенте
Нативный WebSocket API в браузере не имеет встроенного heartbeat. Если промежуточный proxy/firewall закрыл idle соединение — клиент узнает об этом только при следующей попытке отправить сообщение. Правильная клиентская реализация:
class ReconnectingWebSocket {
private ws: WebSocket | null = null;
private url: string;
private reconnectAttempts = 0;
private maxReconnectAttempts = 10;
private pingInterval: ReturnType<typeof setInterval> | null = null;
private pingTimeout: ReturnType<typeof setTimeout> | null = null;
private messageHandlers = new Map<string, Set<(data: unknown) => void>>();
private isIntentionallyClosed = false;
constructor(url: string) {
this.url = url;
}
connect() {
this.isIntentionallyClosed = false;
this.createConnection();
}
private createConnection() {
// Очищаем предыдущее соединение
if (this.ws) {
this.ws.onclose = null; // Важно! Иначе получим лишний reconnect
this.ws.close();
}
this.ws = new WebSocket(this.url);
this.ws.onopen = () => {
console.log('WebSocket connected');
this.reconnectAttempts = 0;
this.startHeartbeat();
this.emit('connected', null);
};
this.ws.onmessage = (event) => {
try {
const msg = JSON.parse(event.data);
// Обрабатываем pong от сервера — сбрасываем таймаут
if (msg.type === 'pong') {
this.clearPingTimeout();
return;
}
this.emit(msg.type, msg);
} catch {
console.error('Failed to parse WS message');
}
};
this.ws.onclose = (event) => {
this.stopHeartbeat();
if (this.isIntentionallyClosed) return;
console.log(`WebSocket closed: ${event.code} ${event.reason}`);
this.emit('disconnected', { code: event.code });
this.scheduleReconnect();
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
// onclose будет вызван после onerror — там и reconnect
};
}
private startHeartbeat() {
this.pingInterval = setInterval(() => {
if (this.ws?.readyState !== WebSocket.OPEN) return;
this.ws.send(JSON.stringify({ type: 'ping' }));
// Если pong не придёт за 5 секунд — считаем соединение мёртвым
this.pingTimeout = setTimeout(() => {
console.warn('Ping timeout — forcing reconnect');
this.ws?.close(4000, 'Ping timeout');
}, 5000);
}, 25000); // Ping каждые 25 секунд
}
private stopHeartbeat() {
if (this.pingInterval) {
clearInterval(this.pingInterval);
this.pingInterval = null;
}
this.clearPingTimeout();
}
private clearPingTimeout() {
if (this.pingTimeout) {
clearTimeout(this.pingTimeout);
this.pingTimeout = null;
}
}
private scheduleReconnect() {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.error('Max reconnect attempts reached');
this.emit('failed', null);
return;
}
// Экспоненциальный backoff с jitter: [1s, 2s, 4s, 8s, ..., 30s] + random
const base = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
const jitter = Math.random() * 1000; // До 1 секунды jitter
const delay = base + jitter;
this.reconnectAttempts++;
console.log(`Reconnecting in ${Math.round(delay)}ms (attempt ${this.reconnectAttempts})`);
setTimeout(() => {
if (!this.isIntentionallyClosed) {
this.createConnection();
}
}, delay);
}
send(type: string, data: Record<string, unknown>) {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify({ type, ...data }));
} else {
console.warn('WebSocket not connected, message dropped');
}
}
on(event: string, handler: (data: unknown) => void) {
if (!this.messageHandlers.has(event)) {
this.messageHandlers.set(event, new Set());
}
this.messageHandlers.get(event)!.add(handler);
// Возвращаем функцию отписки — важно для избежания memory leak!
return () => {
this.messageHandlers.get(event)?.delete(handler);
};
}
private emit(event: string, data: unknown) {
this.messageHandlers.get(event)?.forEach(handler => handler(data));
}
disconnect() {
this.isIntentionallyClosed = true;
this.stopHeartbeat();
this.ws?.close(1000, 'Normal closure');
}
}
Memory leak на неочищенных обработчиках событий
Это убило наш первый production-деплой. Каждый раз когда клиент переподключался, мы добавляли новый обработчик событий, но не удаляли старый. За ночь — тысячи подключений/переподключений, и Node.js process ел 4GB RAM.
// НЕПРАВИЛЬНО: утечка памяти
class NotificationManager {
private wsClient: ReconnectingWebSocket;
init() {
// Каждый вызов добавляет ещё один обработчик!
this.wsClient.on('notification', (data) => {
this.handleNotification(data);
});
// После reconnect снова вызываем init() → ещё один обработчик
this.wsClient.on('connected', () => {
this.init(); // Рекурсивная утечка
});
}
}
// ПРАВИЛЬНО: храним unsubscribe функции
class NotificationManager {
private wsClient: ReconnectingWebSocket;
private cleanup: Array<() => void> = [];
init() {
// Сначала очищаем старые обработчики
this.destroy();
this.cleanup.push(
this.wsClient.on('notification', (data) => {
this.handleNotification(data);
}),
this.wsClient.on('connected', () => {
this.onReconnect();
})
);
}
destroy() {
this.cleanup.forEach(unsub => unsub());
this.cleanup = [];
}
}
В React-компонентах это особенно критично — очищайте в useEffect cleanup:
useEffect(() => {
const unsubscribe = wsClient.on('message', handleMessage);
return () => unsubscribe(); // Вызовется при размонтировании компонента
}, []);
Итоги: что выбрать на старте
- До 5,000 соединений и не критична latency:
ws+ Redis Pub/Sub, просто и надёжно - До 50,000 соединений:
uWebSockets.js+ Redis Pub/Sub, сложнее в отладке но держит нагрузку - Больше 50,000 соединений: рассмотрите специализированные решения — Centrifugo (Go), NATS, или Pusher/Ably как managed service
- Sticky sessions обязательны при нескольких инстансах (настройки Nginx:
ip_hashилиhash $cookie_session)
Строите real-time систему или хотите понять как масштабировать существующую? Aunimeda проектирует высоконагруженные backend-системы. Расскажите нам о вашем проекте.