Реальное время на Node.js: WebSockets и MongoDB вместо бесконечного polling
В 2015 году мы делали дашборд для отслеживания заказов в реальном времени. Первая версия обновлялась через AJAX-опрос каждые 5 секунд: клиент спрашивал «есть новые данные?», сервер отвечал - и так по кругу.
При 100 одновременных пользователях это давало 1200 запросов в минуту. Большинство возвращали один и тот же ответ - «ничего нового».
Правильное решение: не клиент спрашивает сервер, а сервер сообщает клиенту, когда что-то изменилось.
Почему Node.js для real-time
PHP/Apache создаёт один процесс или поток на каждое соединение. WebSocket - это постоянное долгоживущее соединение: при 200 пользователях = 200 открытых соединений одновременно. Поток на соединение не масштабируется.
Node.js работает на одном потоке с событийным циклом. Он обрабатывает тысячи соединений без создания потоков - через неблокирующий I/O и коллбэки:
// Псевдокод событийного цикла Node.js
while (true) {
событие = очередь.взять();
if (событие) {
событие.коллбэк(); // выполнить и сразу вернуться
// Не ждём - если коллбэк делает I/O,
// регистрируем следующий коллбэк и идём дальше
}
}
Ограничение: коллбэки не должны блокировать поток. Тяжёлые CPU-операции (обработка изображений, шифрование больших данных) заморозят все соединения. Node.js отлично подходит для I/O-нагрузки, плохо - для CPU.
Socket.io: комнаты и пространства имён
// server.js (Node.js 0.12, Socket.io 1.3)
var app = require('express')();
var http = require('http').Server(app);
var io = require('socket.io')(http);
io.on('connection', function(socket) {
console.log('Подключился клиент:', socket.id);
// Клиент подписывается на нужный раздел дашборда
socket.on('subscribe', function(section) {
socket.leaveAll();
socket.join('dashboard:' + section);
// Сразу отправляем текущее состояние
getDashboardData(section, function(err, data) {
socket.emit('update', data);
});
});
socket.on('disconnect', function() {
console.log('Клиент отключился:', socket.id);
});
});
// Когда данные изменились - рассылаем всем в нужной комнате
function broadcastUpdate(section, data) {
io.to('dashboard:' + section).emit('update', data);
}
http.listen(3000);
На клиенте:
var socket = io();
socket.on('connect', function() {
socket.emit('subscribe', 'orders'); // Подписываемся на заказы
});
socket.on('update', function(data) {
refreshTable(data); // Обновляем таблицу без перезагрузки
});
socket.on('disconnect', function() {
showReconnectingSpinner();
// Socket.io переподключится автоматически
});
MongoDB: слушаем oplog
MongoDB записывает все операции записи в oplog - специальную коллекцию в базе local. Можно подписаться на этот поток и реагировать на изменения в реальном времени.
В 2015 году Change Streams ещё не было (появились в MongoDB 3.6). Использовали tailable cursor на oplog:
var MongoClient = require('mongodb').MongoClient;
MongoClient.connect('mongodb://localhost:27017/local', function(err, db) {
var oplog = db.collection('oplog.rs');
// Берём текущую позицию в логе
oplog.find({}).sort({ $natural: -1 }).limit(1).toArray(function(err, docs) {
var lastTs = docs[0].ts;
// Tailable cursor - остаётся открытым и возвращает новые записи по мере поступления
var cursor = oplog.find({
ts: { $gt: lastTs },
ns: 'myapp.orders' // Следим только за коллекцией orders
}, {
tailable: true,
awaitdata: true,
numberOfRetries: -1,
tailableRetryInterval: 200
});
cursor.each(function(err, doc) {
if (!doc) return;
if (doc.op === 'i') {
// Новый заказ создан
handleNewOrder(doc.o);
} else if (doc.op === 'u') {
// Заказ обновлён
handleOrderUpdate(doc.o2._id, doc.o.$set);
}
});
});
});
function handleNewOrder(order) {
broadcastUpdate('orders', { type: 'new', order: order });
}
Каждый новый заказ в MongoDB немедленно триггерил broadcast всем клиентам дашборда. Задержка - меньше 50 миллисекунд вместо 0–5 секунд при polling.
Несколько экземпляров Node.js и Redis Pub/Sub
Node.js однопоточный - не использует все ядра CPU. Запускаем несколько процессов через PM2:
# ecosystem.config.js
apps: [{
name: 'dashboard',
script: 'server.js',
instances: 4, # По одному на каждое ядро
exec_mode: 'cluster'
}]
Проблема: WebSocket-соединение от клиента А попадает в процесс 1. Broadcast из процесса 2 до клиента А не дойдёт.
Решение - Redis Pub/Sub как шина сообщений между процессами:
var redis = require('redis');
var sub = redis.createClient();
var pub = redis.createClient();
// Каждый процесс подписывается на канал Redis
sub.subscribe('broadcast');
sub.on('message', function(channel, message) {
var payload = JSON.parse(message);
// Отправляем локальным клиентам этого процесса
io.to(payload.room).emit('update', payload.data);
});
// При изменении данных - публикуем в Redis
// Redis доставляет всем процессам, каждый отправляет своим клиентам
function broadcastUpdate(section, data) {
pub.publish('broadcast', JSON.stringify({
room: 'dashboard:' + section,
data: data
}));
}
У Socket.io был готовый адаптер socket.io-redis, который делал это под капотом - но понимание механизма pub/sub было ценным само по себе.
Результат
| Метрика | Polling (AJAX каждые 5 сек) | WebSockets |
|---|---|---|
| Запросов к базе в минуту (100 пользователей) | 1200 | ~5 (только при реальных изменениях) |
| Задержка обновления | 0–5 секунд | < 100ms |
| RAM сервера при 200 соединениях | 480MB (PHP-FPM воркеры) | 90MB |
| CPU в простое (нет новых данных) | 40% | 2% |
Ошибки, которые мы допустили
Утечки памяти. PHP перезапускается после каждого запроса - утечки не накапливаются. Node.js работает сутками. Через 3 дня один из процессов занял 6GB и был убит OOM-killer'ом. Нашли утечку через node --inspect и Chrome DevTools: замыкание внутри функции слежения за oplog держало ссылку на растущий массив.
Unhandled Promise rejections. В Node.js 0.12 необработанные ошибки в промисах давали только предупреждение в консоли - процесс продолжал работать. Мы несколько раз ловили ситуацию «broadcast перестал работать» - причиной была непойманная ошибка в цепочке промисов. Добавили .catch() везде.
Событийная модель Node.js сложнее для отладки, чем синхронный PHP. Но возможности масштабирования real-time функциональности несопоставимы.