О насБлогКонтакты
Backend20 июля 2015 г. 4 мин 14

Реальное время на Node.js: WebSockets и MongoDB вместо бесконечного polling

AunimedaAunimeda
📋 Содержание

Реальное время на Node.js: WebSockets и MongoDB вместо бесконечного polling

В 2015 году мы делали дашборд для отслеживания заказов в реальном времени. Первая версия обновлялась через AJAX-опрос каждые 5 секунд: клиент спрашивал «есть новые данные?», сервер отвечал - и так по кругу.

При 100 одновременных пользователях это давало 1200 запросов в минуту. Большинство возвращали один и тот же ответ - «ничего нового».

Правильное решение: не клиент спрашивает сервер, а сервер сообщает клиенту, когда что-то изменилось.


Почему Node.js для real-time

PHP/Apache создаёт один процесс или поток на каждое соединение. WebSocket - это постоянное долгоживущее соединение: при 200 пользователях = 200 открытых соединений одновременно. Поток на соединение не масштабируется.

Node.js работает на одном потоке с событийным циклом. Он обрабатывает тысячи соединений без создания потоков - через неблокирующий I/O и коллбэки:

// Псевдокод событийного цикла Node.js
while (true) {
    событие = очередь.взять();
    if (событие) {
        событие.коллбэк(); // выполнить и сразу вернуться
        // Не ждём - если коллбэк делает I/O,
        // регистрируем следующий коллбэк и идём дальше
    }
}

Ограничение: коллбэки не должны блокировать поток. Тяжёлые CPU-операции (обработка изображений, шифрование больших данных) заморозят все соединения. Node.js отлично подходит для I/O-нагрузки, плохо - для CPU.


Socket.io: комнаты и пространства имён

// server.js (Node.js 0.12, Socket.io 1.3)
var app = require('express')();
var http = require('http').Server(app);
var io = require('socket.io')(http);

io.on('connection', function(socket) {
    console.log('Подключился клиент:', socket.id);

    // Клиент подписывается на нужный раздел дашборда
    socket.on('subscribe', function(section) {
        socket.leaveAll();
        socket.join('dashboard:' + section);

        // Сразу отправляем текущее состояние
        getDashboardData(section, function(err, data) {
            socket.emit('update', data);
        });
    });

    socket.on('disconnect', function() {
        console.log('Клиент отключился:', socket.id);
    });
});

// Когда данные изменились - рассылаем всем в нужной комнате
function broadcastUpdate(section, data) {
    io.to('dashboard:' + section).emit('update', data);
}

http.listen(3000);

На клиенте:

var socket = io();

socket.on('connect', function() {
    socket.emit('subscribe', 'orders'); // Подписываемся на заказы
});

socket.on('update', function(data) {
    refreshTable(data); // Обновляем таблицу без перезагрузки
});

socket.on('disconnect', function() {
    showReconnectingSpinner();
    // Socket.io переподключится автоматически
});

MongoDB: слушаем oplog

MongoDB записывает все операции записи в oplog - специальную коллекцию в базе local. Можно подписаться на этот поток и реагировать на изменения в реальном времени.

В 2015 году Change Streams ещё не было (появились в MongoDB 3.6). Использовали tailable cursor на oplog:

var MongoClient = require('mongodb').MongoClient;

MongoClient.connect('mongodb://localhost:27017/local', function(err, db) {
    var oplog = db.collection('oplog.rs');

    // Берём текущую позицию в логе
    oplog.find({}).sort({ $natural: -1 }).limit(1).toArray(function(err, docs) {
        var lastTs = docs[0].ts;

        // Tailable cursor - остаётся открытым и возвращает новые записи по мере поступления
        var cursor = oplog.find({
            ts: { $gt: lastTs },
            ns: 'myapp.orders'   // Следим только за коллекцией orders
        }, {
            tailable: true,
            awaitdata: true,
            numberOfRetries: -1,
            tailableRetryInterval: 200
        });

        cursor.each(function(err, doc) {
            if (!doc) return;

            if (doc.op === 'i') {
                // Новый заказ создан
                handleNewOrder(doc.o);
            } else if (doc.op === 'u') {
                // Заказ обновлён
                handleOrderUpdate(doc.o2._id, doc.o.$set);
            }
        });
    });
});

function handleNewOrder(order) {
    broadcastUpdate('orders', { type: 'new', order: order });
}

Каждый новый заказ в MongoDB немедленно триггерил broadcast всем клиентам дашборда. Задержка - меньше 50 миллисекунд вместо 0–5 секунд при polling.


Несколько экземпляров Node.js и Redis Pub/Sub

Node.js однопоточный - не использует все ядра CPU. Запускаем несколько процессов через PM2:

# ecosystem.config.js
apps: [{
    name: 'dashboard',
    script: 'server.js',
    instances: 4,         # По одному на каждое ядро
    exec_mode: 'cluster'
}]

Проблема: WebSocket-соединение от клиента А попадает в процесс 1. Broadcast из процесса 2 до клиента А не дойдёт.

Решение - Redis Pub/Sub как шина сообщений между процессами:

var redis = require('redis');
var sub = redis.createClient();
var pub = redis.createClient();

// Каждый процесс подписывается на канал Redis
sub.subscribe('broadcast');

sub.on('message', function(channel, message) {
    var payload = JSON.parse(message);
    // Отправляем локальным клиентам этого процесса
    io.to(payload.room).emit('update', payload.data);
});

// При изменении данных - публикуем в Redis
// Redis доставляет всем процессам, каждый отправляет своим клиентам
function broadcastUpdate(section, data) {
    pub.publish('broadcast', JSON.stringify({
        room: 'dashboard:' + section,
        data: data
    }));
}

У Socket.io был готовый адаптер socket.io-redis, который делал это под капотом - но понимание механизма pub/sub было ценным само по себе.


Результат

Метрика Polling (AJAX каждые 5 сек) WebSockets
Запросов к базе в минуту (100 пользователей) 1200 ~5 (только при реальных изменениях)
Задержка обновления 0–5 секунд < 100ms
RAM сервера при 200 соединениях 480MB (PHP-FPM воркеры) 90MB
CPU в простое (нет новых данных) 40% 2%

Ошибки, которые мы допустили

Утечки памяти. PHP перезапускается после каждого запроса - утечки не накапливаются. Node.js работает сутками. Через 3 дня один из процессов занял 6GB и был убит OOM-killer'ом. Нашли утечку через node --inspect и Chrome DevTools: замыкание внутри функции слежения за oplog держало ссылку на растущий массив.

Unhandled Promise rejections. В Node.js 0.12 необработанные ошибки в промисах давали только предупреждение в консоли - процесс продолжал работать. Мы несколько раз ловили ситуацию «broadcast перестал работать» - причиной была непойманная ошибка в цепочке промисов. Добавили .catch() везде.

Событийная модель Node.js сложнее для отладки, чем синхронный PHP. Но возможности масштабирования real-time функциональности несопоставимы.

Читайте также

Почему мы выбрали Node.js в 2011 годуaunimeda
Backend

Почему мы выбрали Node.js в 2011 году

В 2011 году Node.js было 2 года, у него не было LTS, и большинство PHP-разработчиков считали его игрушкой. Мы всё равно перенесли на него реал-тайм дашборд. Вот точное рассуждение, принятые риски и что получилось.

Революция Node.js (2011): как JavaScript завоевал серверaunimeda
Backend

Революция Node.js (2011): как JavaScript завоевал сервер

Райан Даль показал Node.js на JSConf в 2009. К 2011 мы запускали его в production. Неблокирующий I/O, npm и осознание, что один язык может работать везде, изменили наём, разработку и мышление.

PostgreSQL на VPS: от 10 до 1000 запросов в секунду без смены железаaunimeda
Backend

PostgreSQL на VPS: от 10 до 1000 запросов в секунду без смены железа

Практическое руководство по настройке PostgreSQL на VPS с 4 CPU и 8 ГБ RAM: правильные параметры postgresql.conf с расчётами, частичные и covering индексы, настройка autovacuum, PgBouncer для connection pooling. Реальные цифры: 23 мс → 1.2 мс на запрос.

Нужна IT-разработка для вашего бизнеса?

Разрабатываем сайты, мобильные приложения и AI-решения для бизнеса в Кыргызстане. Бесплатная консультация.

Получить консультацию Все статьи