Event-Driven Architecture в Node.js: Outbox Pattern, Kafka и гарантии доставки
Расскажу про баг который мы поймали в production. Финтех проект, интеграция с платёжным шлюзом. Пользователь оплатил — записываем в БД payment.status = 'paid', потом публикуем событие в Kafka, потом сервис уведомлений отправляет SMS. Схема простая.
Однажды ночью упал Kafka брокер на 8 минут. За это время 47 платежей записались в БД как оплаченные, но события в Kafka не ушли. SMS не отправились. Пользователи не получили подтверждения. Несколько человек написали в поддержку что думают что деньги списались без оплаты.
Проблема называется двойная запись (dual write). Вы пишете в два места (БД + Kafka), и гарантии что оба успешно завершатся — нет. Outbox Pattern решает это на уровне архитектуры.
Проблема двойной записи
// ОПАСНЫЙ КОД — двойная запись без гарантий
async function processPayment(paymentId: string) {
// Шаг 1: записываем в БД
await db.payment.update({
where: { id: paymentId },
data: { status: 'paid', paidAt: new Date() },
});
// Шаг 2: публикуем событие
// Если здесь упадёт — событие потеряно, БД уже обновлена
await kafka.producer.send({
topic: 'payments',
messages: [{ value: JSON.stringify({ paymentId, event: 'payment.completed' }) }],
});
}
Варианты что может пойти не так:
- Kafka недоступен (8 минут как в нашем случае)
- Network timeout между сервисом и Kafka
- Приложение упало между шагом 1 и 2 (OOM kill, деплой)
- Kafka принял сообщение но не подтвердил до таймаута
Outbox Pattern: транзакционная гарантия
Идея: вместо прямой записи в Kafka, пишем событие в таблицу outbox в той же транзакции что и основные данные. Отдельный worker читает outbox и публикует в Kafka.
-- Таблица для исходящих событий
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_id UUID NOT NULL, -- ID сущности (paymentId, orderId и т.д.)
aggregate_type VARCHAR(100) NOT NULL, -- 'Payment', 'Order', 'User'
event_type VARCHAR(100) NOT NULL, -- 'payment.completed', 'order.created'
payload JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ, -- NULL = ещё не опубликовано
attempts INT NOT NULL DEFAULT 0,
last_error TEXT
);
-- Индекс для worker'а — читаем только неопубликованные
CREATE INDEX outbox_unpublished_idx
ON outbox(created_at)
WHERE published_at IS NULL;
// ПРАВИЛЬНЫЙ КОД — атомарная транзакция
async function processPayment(paymentId: string) {
await db.$transaction(async (tx) => {
// Шаг 1: обновляем статус платежа
const payment = await tx.payment.update({
where: { id: paymentId },
data: { status: 'paid', paidAt: new Date() },
});
// Шаг 2: записываем событие в outbox (та же транзакция!)
await tx.outbox.create({
data: {
aggregateId: paymentId,
aggregateType: 'Payment',
eventType: 'payment.completed',
payload: {
paymentId,
userId: payment.userId,
amount: payment.amount,
currency: payment.currency,
paidAt: payment.paidAt,
},
},
});
// Если транзакция откатится — ни БД, ни outbox не обновятся
// Если успешно — оба обновятся атомарно
});
}
Теперь либо оба записались, либо ни один. Проблема двойной записи исчезла на уровне архитектуры.
Outbox Worker: публикация в Kafka
// outbox-worker.ts
import { db } from './db';
import { kafka } from './kafka';
import type { Outbox } from '@prisma/client';
const BATCH_SIZE = 100;
const POLL_INTERVAL_MS = 1000;
const MAX_ATTEMPTS = 5;
async function publishOutboxEvents() {
const producer = kafka.producer({
// Идемпотентность: Kafka гарантирует exactly-once доставку на уровне producer
idempotent: true,
maxInFlightRequests: 5, // при idempotent: true максимум 5
});
await producer.connect();
while (true) {
try {
await processNextBatch(producer);
} catch (err) {
console.error('Outbox worker error:', err);
}
await sleep(POLL_INTERVAL_MS);
}
}
async function processNextBatch(producer: Producer) {
// Читаем пачку неопубликованных событий
const events = await db.outbox.findMany({
where: {
publishedAt: null,
attempts: { lt: MAX_ATTEMPTS },
},
orderBy: { createdAt: 'asc' },
take: BATCH_SIZE,
});
if (events.length === 0) return;
// Группируем по топику для batch публикации
const byTopic = groupBy(events, (e) => eventTypeToTopic(e.eventType));
for (const [topic, topicEvents] of Object.entries(byTopic)) {
await publishBatch(producer, topic, topicEvents);
}
}
async function publishBatch(producer: Producer, topic: string, events: Outbox[]) {
const messages = events.map((event) => ({
key: event.aggregateId, // key обеспечивает ordering в рамках partition
value: JSON.stringify({
id: event.id, // ID события — для идемпотентности на consumer
type: event.eventType,
aggregateId: event.aggregateId,
aggregateType: event.aggregateType,
payload: event.payload,
occurredAt: event.createdAt,
}),
headers: {
eventType: event.eventType,
aggregateType: event.aggregateType,
},
}));
try {
await producer.send({ topic, messages });
// Помечаем как опубликованные
await db.outbox.updateMany({
where: { id: { in: events.map((e) => e.id) } },
data: { publishedAt: new Date() },
});
console.log(`Published ${events.length} events to topic ${topic}`);
} catch (err) {
// Увеличиваем счётчик попыток — после MAX_ATTEMPTS событие "застрянет"
// и не будет мешать более новым событиям
await db.outbox.updateMany({
where: { id: { in: events.map((e) => e.id) } },
data: {
attempts: { increment: 1 },
lastError: err instanceof Error ? err.message : String(err),
},
});
throw err; // пусть worker-loop обработает
}
}
function eventTypeToTopic(eventType: string): string {
// 'payment.completed' → 'payments'
// 'order.created' → 'orders'
const prefix = eventType.split('.')[0];
return `${prefix}s`; // payments, orders, users...
}
PostgreSQL триггер для уведомления worker'а (вместо polling)
Polling каждую секунду работает, но добавляет latency до 1 сек. Для лучшей latency — используем LISTEN/NOTIFY:
-- Триггер на INSERT в outbox
CREATE OR REPLACE FUNCTION notify_outbox_insert()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('outbox_insert', NEW.id::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER outbox_insert_trigger
AFTER INSERT ON outbox
FOR EACH ROW
EXECUTE FUNCTION notify_outbox_insert();
// Заменяем polling на LISTEN
import { Client } from 'pg';
const pgClient = new Client({ connectionString: process.env.DATABASE_URL });
await pgClient.connect();
await pgClient.query('LISTEN outbox_insert');
pgClient.on('notification', async (msg) => {
if (msg.channel === 'outbox_insert') {
// Не ждём: обрабатываем сразу
await processNextBatch(producer).catch(console.error);
}
});
// Всё равно делаем периодический polling как fallback
// (на случай если уведомление потерялось при перезапуске)
setInterval(() => processNextBatch(producer).catch(console.error), 30_000);
С LISTEN/NOTIFY события доставляются в Kafka за ~10ms вместо до 1000ms.
Kafka Consumer: at-least-once + идемпотентная обработка
// notification-consumer.ts
import { kafka } from './kafka';
import { redis } from './redis';
import { smsService } from './sms';
const consumer = kafka.consumer({
groupId: 'notification-service',
});
await consumer.connect();
await consumer.subscribe({ topics: ['payments'], fromBeginning: false });
await consumer.run({
// eachBatch vs eachMessage — eachBatch даёт больше контроля над offset commit
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value!.toString());
const eventId = event.id; // ID из outbox — уникален глобально
// Идемпотентность: проверяем не обрабатывали ли уже
const processed = await redis.get(`event:processed:${eventId}`);
if (processed) {
console.log(`Event ${eventId} already processed, skipping`);
return; // Kafka commit произойдёт, дубликат пропущен
}
try {
await handleEvent(event);
// Помечаем как обработанное (TTL 7 дней — дольше retention Kafka)
await redis.set(`event:processed:${eventId}`, '1', 'EX', 604800);
} catch (err) {
console.error(`Failed to process event ${eventId}:`, err);
// Бросаем ошибку — Kafka не закоммитит offset
// Consumer перечитает это сообщение
throw err;
}
},
});
async function handleEvent(event: OutboxEvent) {
switch (event.type) {
case 'payment.completed':
await smsService.send({
phone: event.payload.userPhone,
message: `Оплата на ${event.payload.amount} ${event.payload.currency} прошла успешно`,
});
break;
case 'payment.failed':
await smsService.send({
phone: event.payload.userPhone,
message: 'Не удалось провести оплату. Проверьте данные карты.',
});
break;
default:
console.warn(`Unknown event type: ${event.type}`);
}
}
Exactly-once: нужно ли реально?
Короткий ответ: в большинстве случаев — нет, достаточно at-least-once + идемпотентный consumer.
Kafka Transactions и Exactly-Once Semantics (EOS) — реальная возможность, но с высокой ценой:
- Пропускная способность падает на 20-40%
- Сложность конфигурации и эксплуатации
- Работает только если читаешь из Kafka и пишешь в Kafka в одной транзакции
// Exactly-once в Kafka (только Kafka → Kafka, не Kafka → PostgreSQL)
const producer = kafka.producer({
transactionalId: 'my-transactional-producer', // уникальный ID
idempotent: true,
});
await producer.transaction(async (tx) => {
// Читаем offset потребителя
await tx.sendOffsets({
consumerGroupId: 'my-consumer',
topics: [{ topic: 'input', partitions: [{ partition: 0, offset: '42' }] }],
});
// Публикуем в выходной топик
await tx.send({
topic: 'output',
messages: [{ value: 'processed' }],
});
});
// Если упадёт — оба откатятся
На практике: для большинства задач (уведомления, аналитика, синхронизация данных) — at-least-once + идемпотентность дешевле и надёжнее.
Saga Pattern: распределённые транзакции
Когда нужна "транзакция" через несколько сервисов — используйте Saga. Не двухфазный коммит (2PC) — он не масштабируется и убивает производительность.
Пример: создание заказа
1. Order Service: создать заказ (status: PENDING)
2. Inventory Service: зарезервировать товар
3. Payment Service: списать оплату
4. Order Service: подтвердить заказ (status: CONFIRMED)
При ошибке на любом шаге — компенсирующие транзакции:
- Payment failed → Inventory: отменить резервирование
- Inventory failed → Order: отменить заказ
// Choreography-based Saga — через события, без центрального оркестратора
// Order Service
async function createOrder(orderData: CreateOrderInput) {
await db.$transaction(async (tx) => {
const order = await tx.order.create({
data: { ...orderData, status: 'PENDING' },
});
// Событие запускает цепочку Saga
await tx.outbox.create({
data: {
aggregateId: order.id,
aggregateType: 'Order',
eventType: 'order.created',
payload: { orderId: order.id, items: orderData.items, userId: orderData.userId },
},
});
});
}
// Inventory Service слушает order.created
async function handleOrderCreated(event: Event) {
try {
await reserveInventory(event.payload.items);
// Успех → следующий шаг Saga
await publishEvent('inventory.reserved', {
orderId: event.payload.orderId,
reservationId: uuid(),
});
} catch (err) {
// Ошибка → компенсирующее событие
await publishEvent('inventory.reservation.failed', {
orderId: event.payload.orderId,
reason: err.message,
});
}
}
// Order Service слушает inventory.reservation.failed
async function handleInventoryFailed(event: Event) {
await db.order.update({
where: { id: event.payload.orderId },
data: { status: 'CANCELLED', cancelReason: 'inventory_unavailable' },
});
}
Мониторинг Outbox — важно для production
// Метрики для alerting
// Если outbox накапливается — что-то сломалось с Kafka
async function outboxHealthCheck() {
const unpublished = await db.outbox.count({
where: { publishedAt: null, attempts: { lt: MAX_ATTEMPTS } },
});
const oldestUnpublished = await db.outbox.findFirst({
where: { publishedAt: null },
orderBy: { createdAt: 'asc' },
});
const lagSeconds = oldestUnpublished
? (Date.now() - oldestUnpublished.createdAt.getTime()) / 1000
: 0;
// Экспортируем в Prometheus
outboxQueueSize.set(unpublished);
outboxLagSeconds.set(lagSeconds);
// Alert если lag > 60 секунд
if (lagSeconds > 60) {
console.error(`ALERT: Outbox lag is ${lagSeconds}s — check Kafka connectivity`);
}
}
setInterval(outboxHealthCheck, 10_000);
Что мы не учли поначалу
Worker конкуренция. Если запустить несколько экземпляров worker'а — они будут читать одни и те же события. Решение: SELECT ... FOR UPDATE SKIP LOCKED вместо обычного SELECT:
-- Атомарная "блокировка" пачки событий
SELECT id FROM outbox
WHERE published_at IS NULL AND attempts < 5
ORDER BY created_at ASC
LIMIT 100
FOR UPDATE SKIP LOCKED;
SKIP LOCKED — другие worker'ы пропускают заблокированные строки и берут следующие. Параллельная обработка без дублирования.
Очистка outbox. Таблица будет расти без конца. Добавьте job для очистки старых опубликованных событий:
-- Удаляем опубликованные события старше 7 дней
DELETE FROM outbox
WHERE published_at IS NOT NULL
AND published_at < NOW() - INTERVAL '7 days';
Порядок событий. Используйте aggregate_id как Kafka message key — события одной сущности попадут в одну partition и сохранят порядок.
Строите event-driven архитектуру или мигрируете с монолита на микросервисы? Aunimeda — посмотрите наши услуги или напишите нам.