Kafka в production: паттерны и ошибки, о которых не пишут в документации
Когда мы только начинали с Kafka, я думал что понимаю как она работает — прочитал документацию, запустил локально, написал пару тестов. Через три месяца production-эксплуатации стало ясно: документация описывает happy path. Всё интересное происходит за её пределами. Эта статья — про то, что происходит когда что-то идёт не так, и как строить системы которые с этим справляются.
Партиционирование: как выбрать ключ и сколько партиций нужно
Начнём с фундаментального решения, которое сложно изменить потом.
Выбор ключа партиции определяет два свойства: порядок сообщений и распределение нагрузки. Сообщения с одинаковым ключом всегда попадают в одну партицию и обрабатываются в порядке записи. Сообщения с разными ключами могут обрабатываться параллельно.
// Типичная ошибка: использование user_id как ключа без анализа распределения
const producer = kafka.producer();
await producer.send({
topic: 'user-events',
messages: [{
key: String(userId), // Если у вас есть "горячие" пользователи — беда
value: JSON.stringify(event)
}]
});
Если у вас есть крупные клиенты (B2B-сервис, мультитенантность) — один tenant может генерировать 80% трафика. Все его события попадут в одну партицию. Один consumer будет захлёбываться, остальные — простаивать.
Решение: составной ключ или хеширование:
// Составной ключ: tenant_id + сегмент внутри tenant
function partitionKey(tenantId: string, entityId: string, buckets = 8): string {
// Разбиваем горячий tenant на несколько "виртуальных шардов"
const bucket = hashCode(entityId) % buckets;
return `${tenantId}:${bucket}`;
}
function hashCode(str: string): number {
let hash = 0;
for (let i = 0; i < str.length; i++) {
const char = str.charCodeAt(i);
hash = ((hash << 5) - hash) + char;
hash = hash & hash; // Convert to 32bit integer
}
return Math.abs(hash);
}
Сколько партиций нужно на самом деле?
Правило: максимальное количество партиций = максимальное число параллельных consumers в группе. Больше партиций, чем consumers — лишние партиции. Меньше партиций, чем хотелось бы consumers — не можете масштабироваться.
Практическое правило для начала: (peak_throughput / single_consumer_throughput) * 3. Умножитель 3 — запас для пиков и будущего роста. Переразбить тему потом можно (kafka-reassign-partitions), но это операция с риском.
У нас для топика заказов с 50k событий/мин и consumer throughput 5k/мин — минимум 10 партиций, мы взяли 30.
Consumer groups и rebalancing: почему это больно
Rebalancing — это процесс перераспределения партиций между consumers в группе. Происходит когда: добавляется новый consumer, consumer падает, consumer не успевает отправить heartbeat, тема получает новые партиции.
Проблема: во время rebalancing ВСЕ consumers в группе останавливаются. Никто не читает ни одну партицию. В production с большой нагрузкой rebalancing на 30 секунд — это 30 секунд накапливающегося lag.
До Kafka 2.4 был только eager rebalancing — все consumers отдают партиции, потом получают новые. С Kafka 2.4+ есть cooperative (incremental) rebalancing:
import { Kafka, PartitionAssigners } from 'kafkajs';
const kafka = new Kafka({
clientId: 'my-service',
brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
});
const consumer = kafka.consumer({
groupId: 'order-processor',
// Cooperative rebalancing — partitions перераспределяются постепенно
// без полной остановки
partitionAssigners: [PartitionAssigners.roundRobin],
sessionTimeout: 30000, // 30 сек — consumer считается мёртвым
heartbeatInterval: 3000, // 3 сек — частота heartbeat
maxBytesPerPartition: 1048576, // 1MB — важно для контроля памяти
// Ключевой параметр: сколько миллисекунд между poll() вызовами максимум
// Если ваш обработчик медленный — увеличьте
maxWaitTimeInMs: 5000,
});
Практика минимизации rebalancing:
Правильно настройте sessionTimeout. По умолчанию 10 секунд — для медленных обработчиков (DB запросы, HTTP вызовы) этого может не хватить. Если обработка сообщения занимает 8 секунд, а heartbeat должен прийти за 10 — вы будете постоянно получать ложные rebalance.
// Критически важно: не коммитим offset пока не обработали
const consumer = kafka.consumer({
groupId: 'order-processor',
sessionTimeout: 60000, // 60 секунд для медленных обработчиков
heartbeatInterval: 5000,
});
await consumer.run({
// Отключаем авто-коммит
eachBatchAutoResolve: false,
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning }) => {
for (const message of batch.messages) {
if (!isRunning()) break;
try {
await processMessage(message);
// Коммитим только успешно обработанные
resolveOffset(message.offset);
} catch (error) {
// Логируем, отправляем в DLQ, но не коммитим offset
await sendToDLQ(message, error);
resolveOffset(message.offset); // в DLQ — тоже коммитим
}
// Heartbeat внутри цикла — важно для медленной обработки
await heartbeat();
}
}
});
Идемпотентность продьюсера и exactly-once semantics
Идемпотентный продьюсер — это гарантия at-least-once без дублей на уровне Kafka. Включается одной настройкой:
const producer = kafka.producer({
// Включает идемпотентность
idempotent: true,
// При idempotent: true эти настройки выставляются автоматически:
// acks: 'all'
// maxInFlightRequests: 5
// retries: Infinity (с экспоненциальным backoff)
});
С idempotent: true Kafka присваивает каждому продьюсеру PID и sequence number каждому сообщению. Брокер отклоняет дубли. Это защищает от случая "сетевая ошибка при записи → retry → дублирование".
Когда exactly-once реально нужен?
Честный ответ: реже чем вы думаете. Exactly-once (транзакции Kafka) дают гарантию что сообщение записано В KAFKA ровно один раз. Но если ваш consumer пишет в БД — вы всё равно должны обеспечивать идемпотентность на стороне consumer.
Overhead exactly-once: +15-30% к latency из-за двухфазного коммита. Используйте только для Kafka→Kafka трансформаций (например, в Kafka Streams).
// Exactly-once для Kafka→Kafka трансформации
const producer = kafka.producer({
transactionalId: 'my-transactional-producer-1', // уникальный ID
idempotent: true,
});
await producer.connect();
// Транзакция охватывает и запись сообщений, и коммит consumer offset
await producer.transaction(async (tx) => {
await tx.send({
topic: 'enriched-orders',
messages: [{ value: JSON.stringify(enrichedOrder) }]
});
// Коммитим offset в рамках той же транзакции
await tx.sendOffsets({
consumerGroupId: 'order-enricher',
topics: [{ topic: 'raw-orders', partitions: [{ partition: 0, offset: '42' }] }]
});
});
Мониторинг: consumer lag — главная метрика
Consumer lag = разница между последним offset в партиции и последним закоммиченным offset consumer'а. Это единственная метрика которая скажет вам "система не справляется".
# Посмотреть lag по группе
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group order-processor \
--describe
# Output:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# order-processor orders 0 1000 1050 50
# order-processor orders 1 2000 2000 0
# order-processor orders 2 1500 1800 300 ← проблема
Партиция 2 накапливает lag. Возможные причины: consumer медленный, consumer завис, consumer мёртв (но Kafka ещё не знает).
// Программный мониторинг lag через AdminClient
import { Kafka } from 'kafkajs';
async function getConsumerLag(groupId: string, topic: string) {
const admin = kafka.admin();
await admin.connect();
try {
const offsets = await admin.fetchOffsets({ groupId, topics: [topic] });
const topicOffsets = await admin.fetchTopicOffsets(topic);
const lagByPartition = topicOffsets.map(topicPartition => {
const consumerOffset = offsets[0].partitions.find(
p => p.partition === topicPartition.partition
);
const lag = BigInt(topicPartition.offset) - BigInt(consumerOffset?.offset ?? '0');
return {
partition: topicPartition.partition,
lag: lag.toString(),
isHealthy: lag < 1000n
};
});
const totalLag = lagByPartition.reduce((sum, p) => sum + BigInt(p.lag), 0n);
return {
totalLag: totalLag.toString(),
partitions: lagByPartition,
isHealthy: totalLag < 5000n
};
} finally {
await admin.disconnect();
}
}
Экспортируйте эту метрику в Prometheus и настройте alert: если lag растёт более 5 минут подряд — что-то не так.
Dead Letter Queue pattern
Реальность такова: некоторые сообщения не получится обработать никогда. Битый JSON, несуществующий внешний ID, логическая ошибка в данных. Если вы застряли на таком сообщении и не коммитите offset — весь partition встаёт. DLQ решает это:
interface DLQMessage<T = unknown> {
originalTopic: string;
originalPartition: number;
originalOffset: string;
originalKey: string | null;
payload: string; // raw value
error: string;
errorStack?: string;
attemptCount: number;
firstFailedAt: string;
lastFailedAt: string;
}
class KafkaConsumerWithDLQ {
private producer: Producer;
private consumer: Consumer;
private dlqTopic: string;
private maxRetries: number;
constructor(options: {
kafka: Kafka;
groupId: string;
dlqTopic: string;
maxRetries?: number;
}) {
this.producer = options.kafka.producer({ idempotent: true });
this.consumer = options.kafka.consumer({ groupId: options.groupId });
this.dlqTopic = options.dlqTopic;
this.maxRetries = options.maxRetries ?? 3;
}
async processWithRetry(
message: KafkaMessage,
topic: string,
partition: number,
handler: (msg: KafkaMessage) => Promise<void>
): Promise<void> {
let attemptCount = 0;
let lastError: Error | null = null;
// Экспоненциальный backoff для retry
while (attemptCount < this.maxRetries) {
try {
await handler(message);
return; // Успех
} catch (error) {
lastError = error as Error;
attemptCount++;
if (attemptCount < this.maxRetries) {
const delay = Math.min(1000 * Math.pow(2, attemptCount), 30000);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
// Исчерпали попытки — отправляем в DLQ
const dlqPayload: DLQMessage = {
originalTopic: topic,
originalPartition: partition,
originalOffset: message.offset,
originalKey: message.key?.toString() ?? null,
payload: message.value?.toString() ?? '',
error: lastError?.message ?? 'Unknown error',
errorStack: lastError?.stack,
attemptCount,
firstFailedAt: new Date().toISOString(),
lastFailedAt: new Date().toISOString(),
};
await this.producer.send({
topic: this.dlqTopic,
messages: [{
key: message.key,
value: JSON.stringify(dlqPayload),
headers: {
'dlq-source-topic': topic,
'dlq-error': lastError?.message ?? 'unknown',
}
}]
});
console.error(`Message sent to DLQ`, { topic, partition, offset: message.offset });
}
}
DLQ топик обрабатывается отдельным сервисом: оповещение команды, логирование в ClickHouse для анализа, ручная перепроверка и повторная отправка в основной топик после фикса.
Compacted topics для state store
Compacted topic — это топик где Kafka гарантирует, что для каждого ключа хранится хотя бы последнее значение. Старые версии значений периодически удаляются.
// Создание compacted topic
await admin.createTopics({
topics: [{
topic: 'user-profiles',
numPartitions: 10,
replicationFactor: 3,
configEntries: [
{ name: 'cleanup.policy', value: 'compact' },
{ name: 'min.cleanable.dirty.ratio', value: '0.1' },
{ name: 'segment.ms', value: '3600000' }, // 1 час
]
}]
});
// Запись: ключ = user_id, значение = текущее состояние профиля
await producer.send({
topic: 'user-profiles',
messages: [{
key: String(userId),
value: JSON.stringify(userProfile) // null = tombstone (удаление)
}]
});
Практическое применение: микросервис при старте может прочитать compacted topic с начала и восстановить текущее состояние всех сущностей — без запроса к БД. Это паттерн event sourcing с "бесплатным" snapshot.
Подводим итог
Kafka — мощный инструмент, но её complexity реальна. Главные вещи которые мы узнали на практике:
- Выбор ключа партиции — архитектурное решение, меняется болезненно
- Включайте
idempotent: trueна продьюсере — это бесплатно и спасает от дублей - Consumer lag — мониторьте и ставьте алерты, это ваш главный health indicator
- DLQ обязателен в production — сообщение которое не обрабатывается должно куда-то деться
- Cooperative rebalancing снижает downtime при деплоях
Строите event-driven архитектуру или внедряете Kafka в существующую систему? Aunimeda помогает с проектированием и внедрением. Напишите нам о вашей задаче.