О насБлогКонтакты
Backend Development17 апреля 2026 г. 9 мин 2

Event-Driven Architecture в Node.js: Outbox Pattern, Kafka и гарантии доставки

AunimedaAunimeda
📋 Содержание

Event-Driven Architecture в Node.js: Outbox Pattern, Kafka и гарантии доставки

Расскажу про баг который мы поймали в production. Финтех проект, интеграция с платёжным шлюзом. Пользователь оплатил — записываем в БД payment.status = 'paid', потом публикуем событие в Kafka, потом сервис уведомлений отправляет SMS. Схема простая.

Однажды ночью упал Kafka брокер на 8 минут. За это время 47 платежей записались в БД как оплаченные, но события в Kafka не ушли. SMS не отправились. Пользователи не получили подтверждения. Несколько человек написали в поддержку что думают что деньги списались без оплаты.

Проблема называется двойная запись (dual write). Вы пишете в два места (БД + Kafka), и гарантии что оба успешно завершатся — нет. Outbox Pattern решает это на уровне архитектуры.


Проблема двойной записи

// ОПАСНЫЙ КОД — двойная запись без гарантий
async function processPayment(paymentId: string) {
  // Шаг 1: записываем в БД
  await db.payment.update({
    where: { id: paymentId },
    data: { status: 'paid', paidAt: new Date() },
  });

  // Шаг 2: публикуем событие
  // Если здесь упадёт — событие потеряно, БД уже обновлена
  await kafka.producer.send({
    topic: 'payments',
    messages: [{ value: JSON.stringify({ paymentId, event: 'payment.completed' }) }],
  });
}

Варианты что может пойти не так:

  • Kafka недоступен (8 минут как в нашем случае)
  • Network timeout между сервисом и Kafka
  • Приложение упало между шагом 1 и 2 (OOM kill, деплой)
  • Kafka принял сообщение но не подтвердил до таймаута

Outbox Pattern: транзакционная гарантия

Идея: вместо прямой записи в Kafka, пишем событие в таблицу outbox в той же транзакции что и основные данные. Отдельный worker читает outbox и публикует в Kafka.

-- Таблица для исходящих событий
CREATE TABLE outbox (
    id           UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_id UUID NOT NULL,         -- ID сущности (paymentId, orderId и т.д.)
    aggregate_type VARCHAR(100) NOT NULL, -- 'Payment', 'Order', 'User'
    event_type   VARCHAR(100) NOT NULL,  -- 'payment.completed', 'order.created'
    payload      JSONB NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at TIMESTAMPTZ,           -- NULL = ещё не опубликовано
    attempts     INT NOT NULL DEFAULT 0,
    last_error   TEXT
);

-- Индекс для worker'а — читаем только неопубликованные
CREATE INDEX outbox_unpublished_idx
    ON outbox(created_at)
    WHERE published_at IS NULL;
// ПРАВИЛЬНЫЙ КОД — атомарная транзакция
async function processPayment(paymentId: string) {
  await db.$transaction(async (tx) => {
    // Шаг 1: обновляем статус платежа
    const payment = await tx.payment.update({
      where: { id: paymentId },
      data: { status: 'paid', paidAt: new Date() },
    });

    // Шаг 2: записываем событие в outbox (та же транзакция!)
    await tx.outbox.create({
      data: {
        aggregateId: paymentId,
        aggregateType: 'Payment',
        eventType: 'payment.completed',
        payload: {
          paymentId,
          userId: payment.userId,
          amount: payment.amount,
          currency: payment.currency,
          paidAt: payment.paidAt,
        },
      },
    });
    // Если транзакция откатится — ни БД, ни outbox не обновятся
    // Если успешно — оба обновятся атомарно
  });
}

Теперь либо оба записались, либо ни один. Проблема двойной записи исчезла на уровне архитектуры.


Outbox Worker: публикация в Kafka

// outbox-worker.ts
import { db } from './db';
import { kafka } from './kafka';
import type { Outbox } from '@prisma/client';

const BATCH_SIZE = 100;
const POLL_INTERVAL_MS = 1000;
const MAX_ATTEMPTS = 5;

async function publishOutboxEvents() {
  const producer = kafka.producer({
    // Идемпотентность: Kafka гарантирует exactly-once доставку на уровне producer
    idempotent: true,
    maxInFlightRequests: 5, // при idempotent: true максимум 5
  });

  await producer.connect();

  while (true) {
    try {
      await processNextBatch(producer);
    } catch (err) {
      console.error('Outbox worker error:', err);
    }
    await sleep(POLL_INTERVAL_MS);
  }
}

async function processNextBatch(producer: Producer) {
  // Читаем пачку неопубликованных событий
  const events = await db.outbox.findMany({
    where: {
      publishedAt: null,
      attempts: { lt: MAX_ATTEMPTS },
    },
    orderBy: { createdAt: 'asc' },
    take: BATCH_SIZE,
  });

  if (events.length === 0) return;

  // Группируем по топику для batch публикации
  const byTopic = groupBy(events, (e) => eventTypeToTopic(e.eventType));

  for (const [topic, topicEvents] of Object.entries(byTopic)) {
    await publishBatch(producer, topic, topicEvents);
  }
}

async function publishBatch(producer: Producer, topic: string, events: Outbox[]) {
  const messages = events.map((event) => ({
    key: event.aggregateId, // key обеспечивает ordering в рамках partition
    value: JSON.stringify({
      id: event.id,          // ID события — для идемпотентности на consumer
      type: event.eventType,
      aggregateId: event.aggregateId,
      aggregateType: event.aggregateType,
      payload: event.payload,
      occurredAt: event.createdAt,
    }),
    headers: {
      eventType: event.eventType,
      aggregateType: event.aggregateType,
    },
  }));

  try {
    await producer.send({ topic, messages });

    // Помечаем как опубликованные
    await db.outbox.updateMany({
      where: { id: { in: events.map((e) => e.id) } },
      data: { publishedAt: new Date() },
    });

    console.log(`Published ${events.length} events to topic ${topic}`);
  } catch (err) {
    // Увеличиваем счётчик попыток — после MAX_ATTEMPTS событие "застрянет"
    // и не будет мешать более новым событиям
    await db.outbox.updateMany({
      where: { id: { in: events.map((e) => e.id) } },
      data: {
        attempts: { increment: 1 },
        lastError: err instanceof Error ? err.message : String(err),
      },
    });

    throw err; // пусть worker-loop обработает
  }
}

function eventTypeToTopic(eventType: string): string {
  // 'payment.completed' → 'payments'
  // 'order.created' → 'orders'
  const prefix = eventType.split('.')[0];
  return `${prefix}s`; // payments, orders, users...
}

PostgreSQL триггер для уведомления worker'а (вместо polling)

Polling каждую секунду работает, но добавляет latency до 1 сек. Для лучшей latency — используем LISTEN/NOTIFY:

-- Триггер на INSERT в outbox
CREATE OR REPLACE FUNCTION notify_outbox_insert()
RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('outbox_insert', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER outbox_insert_trigger
    AFTER INSERT ON outbox
    FOR EACH ROW
    EXECUTE FUNCTION notify_outbox_insert();
// Заменяем polling на LISTEN
import { Client } from 'pg';

const pgClient = new Client({ connectionString: process.env.DATABASE_URL });
await pgClient.connect();
await pgClient.query('LISTEN outbox_insert');

pgClient.on('notification', async (msg) => {
  if (msg.channel === 'outbox_insert') {
    // Не ждём: обрабатываем сразу
    await processNextBatch(producer).catch(console.error);
  }
});

// Всё равно делаем периодический polling как fallback
// (на случай если уведомление потерялось при перезапуске)
setInterval(() => processNextBatch(producer).catch(console.error), 30_000);

С LISTEN/NOTIFY события доставляются в Kafka за ~10ms вместо до 1000ms.


Kafka Consumer: at-least-once + идемпотентная обработка

// notification-consumer.ts
import { kafka } from './kafka';
import { redis } from './redis';
import { smsService } from './sms';

const consumer = kafka.consumer({
  groupId: 'notification-service',
});

await consumer.connect();
await consumer.subscribe({ topics: ['payments'], fromBeginning: false });

await consumer.run({
  // eachBatch vs eachMessage — eachBatch даёт больше контроля над offset commit
  eachMessage: async ({ topic, partition, message }) => {
    const event = JSON.parse(message.value!.toString());
    const eventId = event.id; // ID из outbox — уникален глобально

    // Идемпотентность: проверяем не обрабатывали ли уже
    const processed = await redis.get(`event:processed:${eventId}`);
    if (processed) {
      console.log(`Event ${eventId} already processed, skipping`);
      return; // Kafka commit произойдёт, дубликат пропущен
    }

    try {
      await handleEvent(event);

      // Помечаем как обработанное (TTL 7 дней — дольше retention Kafka)
      await redis.set(`event:processed:${eventId}`, '1', 'EX', 604800);
    } catch (err) {
      console.error(`Failed to process event ${eventId}:`, err);
      // Бросаем ошибку — Kafka не закоммитит offset
      // Consumer перечитает это сообщение
      throw err;
    }
  },
});

async function handleEvent(event: OutboxEvent) {
  switch (event.type) {
    case 'payment.completed':
      await smsService.send({
        phone: event.payload.userPhone,
        message: `Оплата на ${event.payload.amount} ${event.payload.currency} прошла успешно`,
      });
      break;

    case 'payment.failed':
      await smsService.send({
        phone: event.payload.userPhone,
        message: 'Не удалось провести оплату. Проверьте данные карты.',
      });
      break;

    default:
      console.warn(`Unknown event type: ${event.type}`);
  }
}

Exactly-once: нужно ли реально?

Короткий ответ: в большинстве случаев — нет, достаточно at-least-once + идемпотентный consumer.

Kafka Transactions и Exactly-Once Semantics (EOS) — реальная возможность, но с высокой ценой:

  • Пропускная способность падает на 20-40%
  • Сложность конфигурации и эксплуатации
  • Работает только если читаешь из Kafka и пишешь в Kafka в одной транзакции
// Exactly-once в Kafka (только Kafka → Kafka, не Kafka → PostgreSQL)
const producer = kafka.producer({
  transactionalId: 'my-transactional-producer', // уникальный ID
  idempotent: true,
});

await producer.transaction(async (tx) => {
  // Читаем offset потребителя
  await tx.sendOffsets({
    consumerGroupId: 'my-consumer',
    topics: [{ topic: 'input', partitions: [{ partition: 0, offset: '42' }] }],
  });

  // Публикуем в выходной топик
  await tx.send({
    topic: 'output',
    messages: [{ value: 'processed' }],
  });
});
// Если упадёт — оба откатятся

На практике: для большинства задач (уведомления, аналитика, синхронизация данных) — at-least-once + идемпотентность дешевле и надёжнее.


Saga Pattern: распределённые транзакции

Когда нужна "транзакция" через несколько сервисов — используйте Saga. Не двухфазный коммит (2PC) — он не масштабируется и убивает производительность.

Пример: создание заказа
1. Order Service: создать заказ (status: PENDING)
2. Inventory Service: зарезервировать товар
3. Payment Service: списать оплату
4. Order Service: подтвердить заказ (status: CONFIRMED)

При ошибке на любом шаге — компенсирующие транзакции:
- Payment failed → Inventory: отменить резервирование
- Inventory failed → Order: отменить заказ
// Choreography-based Saga — через события, без центрального оркестратора
// Order Service
async function createOrder(orderData: CreateOrderInput) {
  await db.$transaction(async (tx) => {
    const order = await tx.order.create({
      data: { ...orderData, status: 'PENDING' },
    });

    // Событие запускает цепочку Saga
    await tx.outbox.create({
      data: {
        aggregateId: order.id,
        aggregateType: 'Order',
        eventType: 'order.created',
        payload: { orderId: order.id, items: orderData.items, userId: orderData.userId },
      },
    });
  });
}

// Inventory Service слушает order.created
async function handleOrderCreated(event: Event) {
  try {
    await reserveInventory(event.payload.items);

    // Успех → следующий шаг Saga
    await publishEvent('inventory.reserved', {
      orderId: event.payload.orderId,
      reservationId: uuid(),
    });
  } catch (err) {
    // Ошибка → компенсирующее событие
    await publishEvent('inventory.reservation.failed', {
      orderId: event.payload.orderId,
      reason: err.message,
    });
  }
}

// Order Service слушает inventory.reservation.failed
async function handleInventoryFailed(event: Event) {
  await db.order.update({
    where: { id: event.payload.orderId },
    data: { status: 'CANCELLED', cancelReason: 'inventory_unavailable' },
  });
}

Мониторинг Outbox — важно для production

// Метрики для alerting
// Если outbox накапливается — что-то сломалось с Kafka
async function outboxHealthCheck() {
  const unpublished = await db.outbox.count({
    where: { publishedAt: null, attempts: { lt: MAX_ATTEMPTS } },
  });

  const oldestUnpublished = await db.outbox.findFirst({
    where: { publishedAt: null },
    orderBy: { createdAt: 'asc' },
  });

  const lagSeconds = oldestUnpublished
    ? (Date.now() - oldestUnpublished.createdAt.getTime()) / 1000
    : 0;

  // Экспортируем в Prometheus
  outboxQueueSize.set(unpublished);
  outboxLagSeconds.set(lagSeconds);

  // Alert если lag > 60 секунд
  if (lagSeconds > 60) {
    console.error(`ALERT: Outbox lag is ${lagSeconds}s — check Kafka connectivity`);
  }
}

setInterval(outboxHealthCheck, 10_000);

Что мы не учли поначалу

Worker конкуренция. Если запустить несколько экземпляров worker'а — они будут читать одни и те же события. Решение: SELECT ... FOR UPDATE SKIP LOCKED вместо обычного SELECT:

-- Атомарная "блокировка" пачки событий
SELECT id FROM outbox
WHERE published_at IS NULL AND attempts < 5
ORDER BY created_at ASC
LIMIT 100
FOR UPDATE SKIP LOCKED;

SKIP LOCKED — другие worker'ы пропускают заблокированные строки и берут следующие. Параллельная обработка без дублирования.

Очистка outbox. Таблица будет расти без конца. Добавьте job для очистки старых опубликованных событий:

-- Удаляем опубликованные события старше 7 дней
DELETE FROM outbox
WHERE published_at IS NOT NULL
  AND published_at < NOW() - INTERVAL '7 days';

Порядок событий. Используйте aggregate_id как Kafka message key — события одной сущности попадут в одну partition и сохранят порядок.


Строите event-driven архитектуру или мигрируете с монолита на микросервисы? Aunimedaпосмотрите наши услуги или напишите нам.

Читайте также

tRPC + Zod: типобезопасный fullstack без кодогенерации — практическое руководствоaunimeda
Backend Development

tRPC + Zod: типобезопасный fullstack без кодогенерации — практическое руководство

Полное практическое руководство по tRPC v11 + Zod + Next.js: роутеры, процедуры, middleware авторизации, обработка ошибок и React Query интеграция на реальном CRUD примере.

ClickHouse vs PostgreSQL для аналитики: реальные бенчмарки и когда что использоватьaunimeda
Databases

ClickHouse vs PostgreSQL для аналитики: реальные бенчмарки и когда что использовать

Практическое сравнение ClickHouse и PostgreSQL для аналитических нагрузок: реальные бенчмарки на 100M строк, когда колоночное хранение выигрывает и когда Postgres + TimescaleDB достаточно.

15 лет в IT: уроки проектов 2010-2014, которые актуальны для казахстанского рынка сегодняaunimeda
Разработка

15 лет в IT: уроки проектов 2010-2014, которые актуальны для казахстанского рынка сегодня

Мы прошли через PhoneGap, Node.js 0.6, Hadoop и Bootstrap. Большинство тех инструментов ушло. Но принципы, которые стояли за выбором — что именно мы решали и почему — работают на каждом проекте в Казахстане сегодня. 10 уроков, которые не устарели.

Нужна IT-разработка для вашего бизнеса?

Разрабатываем сайты, мобильные приложения и AI-решения для бизнеса в Казахстане. Бесплатная консультация.

Получить консультацию Все статьи