О насБлогКонтакты
Инфраструктура17 апреля 2026 г. 8 мин 1

Kafka в production: паттерны и ошибки, о которых не пишут в документации

AunimedaAunimeda
📋 Содержание

Kafka в production: паттерны и ошибки, о которых не пишут в документации

Когда мы только начинали с Kafka, я думал что понимаю как она работает — прочитал документацию, запустил локально, написал пару тестов. Через три месяца production-эксплуатации стало ясно: документация описывает happy path. Всё интересное происходит за её пределами. Эта статья — про то, что происходит когда что-то идёт не так, и как строить системы которые с этим справляются.

Партиционирование: как выбрать ключ и сколько партиций нужно

Начнём с фундаментального решения, которое сложно изменить потом.

Выбор ключа партиции определяет два свойства: порядок сообщений и распределение нагрузки. Сообщения с одинаковым ключом всегда попадают в одну партицию и обрабатываются в порядке записи. Сообщения с разными ключами могут обрабатываться параллельно.

// Типичная ошибка: использование user_id как ключа без анализа распределения
const producer = kafka.producer();
await producer.send({
    topic: 'user-events',
    messages: [{
        key: String(userId),  // Если у вас есть "горячие" пользователи — беда
        value: JSON.stringify(event)
    }]
});

Если у вас есть крупные клиенты (B2B-сервис, мультитенантность) — один tenant может генерировать 80% трафика. Все его события попадут в одну партицию. Один consumer будет захлёбываться, остальные — простаивать.

Решение: составной ключ или хеширование:

// Составной ключ: tenant_id + сегмент внутри tenant
function partitionKey(tenantId: string, entityId: string, buckets = 8): string {
    // Разбиваем горячий tenant на несколько "виртуальных шардов"
    const bucket = hashCode(entityId) % buckets;
    return `${tenantId}:${bucket}`;
}

function hashCode(str: string): number {
    let hash = 0;
    for (let i = 0; i < str.length; i++) {
        const char = str.charCodeAt(i);
        hash = ((hash << 5) - hash) + char;
        hash = hash & hash; // Convert to 32bit integer
    }
    return Math.abs(hash);
}

Сколько партиций нужно на самом деле?

Правило: максимальное количество партиций = максимальное число параллельных consumers в группе. Больше партиций, чем consumers — лишние партиции. Меньше партиций, чем хотелось бы consumers — не можете масштабироваться.

Практическое правило для начала: (peak_throughput / single_consumer_throughput) * 3. Умножитель 3 — запас для пиков и будущего роста. Переразбить тему потом можно (kafka-reassign-partitions), но это операция с риском.

У нас для топика заказов с 50k событий/мин и consumer throughput 5k/мин — минимум 10 партиций, мы взяли 30.

Consumer groups и rebalancing: почему это больно

Rebalancing — это процесс перераспределения партиций между consumers в группе. Происходит когда: добавляется новый consumer, consumer падает, consumer не успевает отправить heartbeat, тема получает новые партиции.

Проблема: во время rebalancing ВСЕ consumers в группе останавливаются. Никто не читает ни одну партицию. В production с большой нагрузкой rebalancing на 30 секунд — это 30 секунд накапливающегося lag.

До Kafka 2.4 был только eager rebalancing — все consumers отдают партиции, потом получают новые. С Kafka 2.4+ есть cooperative (incremental) rebalancing:

import { Kafka, PartitionAssigners } from 'kafkajs';

const kafka = new Kafka({
    clientId: 'my-service',
    brokers: ['kafka1:9092', 'kafka2:9092', 'kafka3:9092'],
});

const consumer = kafka.consumer({
    groupId: 'order-processor',
    // Cooperative rebalancing — partitions перераспределяются постепенно
    // без полной остановки
    partitionAssigners: [PartitionAssigners.roundRobin],
    sessionTimeout: 30000,    // 30 сек — consumer считается мёртвым
    heartbeatInterval: 3000,  // 3 сек — частота heartbeat
    maxBytesPerPartition: 1048576,  // 1MB — важно для контроля памяти
    // Ключевой параметр: сколько миллисекунд между poll() вызовами максимум
    // Если ваш обработчик медленный — увеличьте
    maxWaitTimeInMs: 5000,
});

Практика минимизации rebalancing:

Правильно настройте sessionTimeout. По умолчанию 10 секунд — для медленных обработчиков (DB запросы, HTTP вызовы) этого может не хватить. Если обработка сообщения занимает 8 секунд, а heartbeat должен прийти за 10 — вы будете постоянно получать ложные rebalance.

// Критически важно: не коммитим offset пока не обработали
const consumer = kafka.consumer({
    groupId: 'order-processor',
    sessionTimeout: 60000,   // 60 секунд для медленных обработчиков
    heartbeatInterval: 5000,
});

await consumer.run({
    // Отключаем авто-коммит
    eachBatchAutoResolve: false,
    eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning }) => {
        for (const message of batch.messages) {
            if (!isRunning()) break;
            
            try {
                await processMessage(message);
                // Коммитим только успешно обработанные
                resolveOffset(message.offset);
            } catch (error) {
                // Логируем, отправляем в DLQ, но не коммитим offset
                await sendToDLQ(message, error);
                resolveOffset(message.offset); // в DLQ — тоже коммитим
            }
            
            // Heartbeat внутри цикла — важно для медленной обработки
            await heartbeat();
        }
    }
});

Идемпотентность продьюсера и exactly-once semantics

Идемпотентный продьюсер — это гарантия at-least-once без дублей на уровне Kafka. Включается одной настройкой:

const producer = kafka.producer({
    // Включает идемпотентность
    idempotent: true,
    // При idempotent: true эти настройки выставляются автоматически:
    // acks: 'all'
    // maxInFlightRequests: 5
    // retries: Infinity (с экспоненциальным backoff)
});

С idempotent: true Kafka присваивает каждому продьюсеру PID и sequence number каждому сообщению. Брокер отклоняет дубли. Это защищает от случая "сетевая ошибка при записи → retry → дублирование".

Когда exactly-once реально нужен?

Честный ответ: реже чем вы думаете. Exactly-once (транзакции Kafka) дают гарантию что сообщение записано В KAFKA ровно один раз. Но если ваш consumer пишет в БД — вы всё равно должны обеспечивать идемпотентность на стороне consumer.

Overhead exactly-once: +15-30% к latency из-за двухфазного коммита. Используйте только для Kafka→Kafka трансформаций (например, в Kafka Streams).

// Exactly-once для Kafka→Kafka трансформации
const producer = kafka.producer({
    transactionalId: 'my-transactional-producer-1', // уникальный ID
    idempotent: true,
});

await producer.connect();

// Транзакция охватывает и запись сообщений, и коммит consumer offset
await producer.transaction(async (tx) => {
    await tx.send({
        topic: 'enriched-orders',
        messages: [{ value: JSON.stringify(enrichedOrder) }]
    });
    
    // Коммитим offset в рамках той же транзакции
    await tx.sendOffsets({
        consumerGroupId: 'order-enricher',
        topics: [{ topic: 'raw-orders', partitions: [{ partition: 0, offset: '42' }] }]
    });
});

Мониторинг: consumer lag — главная метрика

Consumer lag = разница между последним offset в партиции и последним закоммиченным offset consumer'а. Это единственная метрика которая скажет вам "система не справляется".

# Посмотреть lag по группе
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
    --group order-processor \
    --describe

# Output:
# GROUP           TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order-processor orders         0          1000            1050            50
# order-processor orders         1          2000            2000            0
# order-processor orders         2          1500            1800            300  ← проблема

Партиция 2 накапливает lag. Возможные причины: consumer медленный, consumer завис, consumer мёртв (но Kafka ещё не знает).

// Программный мониторинг lag через AdminClient
import { Kafka } from 'kafkajs';

async function getConsumerLag(groupId: string, topic: string) {
    const admin = kafka.admin();
    await admin.connect();
    
    try {
        const offsets = await admin.fetchOffsets({ groupId, topics: [topic] });
        const topicOffsets = await admin.fetchTopicOffsets(topic);
        
        const lagByPartition = topicOffsets.map(topicPartition => {
            const consumerOffset = offsets[0].partitions.find(
                p => p.partition === topicPartition.partition
            );
            const lag = BigInt(topicPartition.offset) - BigInt(consumerOffset?.offset ?? '0');
            return {
                partition: topicPartition.partition,
                lag: lag.toString(),
                isHealthy: lag < 1000n
            };
        });
        
        const totalLag = lagByPartition.reduce((sum, p) => sum + BigInt(p.lag), 0n);
        
        return {
            totalLag: totalLag.toString(),
            partitions: lagByPartition,
            isHealthy: totalLag < 5000n
        };
    } finally {
        await admin.disconnect();
    }
}

Экспортируйте эту метрику в Prometheus и настройте alert: если lag растёт более 5 минут подряд — что-то не так.

Dead Letter Queue pattern

Реальность такова: некоторые сообщения не получится обработать никогда. Битый JSON, несуществующий внешний ID, логическая ошибка в данных. Если вы застряли на таком сообщении и не коммитите offset — весь partition встаёт. DLQ решает это:

interface DLQMessage<T = unknown> {
    originalTopic: string;
    originalPartition: number;
    originalOffset: string;
    originalKey: string | null;
    payload: string; // raw value
    error: string;
    errorStack?: string;
    attemptCount: number;
    firstFailedAt: string;
    lastFailedAt: string;
}

class KafkaConsumerWithDLQ {
    private producer: Producer;
    private consumer: Consumer;
    private dlqTopic: string;
    private maxRetries: number;

    constructor(options: {
        kafka: Kafka;
        groupId: string;
        dlqTopic: string;
        maxRetries?: number;
    }) {
        this.producer = options.kafka.producer({ idempotent: true });
        this.consumer = options.kafka.consumer({ groupId: options.groupId });
        this.dlqTopic = options.dlqTopic;
        this.maxRetries = options.maxRetries ?? 3;
    }

    async processWithRetry(
        message: KafkaMessage,
        topic: string,
        partition: number,
        handler: (msg: KafkaMessage) => Promise<void>
    ): Promise<void> {
        let attemptCount = 0;
        let lastError: Error | null = null;
        
        // Экспоненциальный backoff для retry
        while (attemptCount < this.maxRetries) {
            try {
                await handler(message);
                return; // Успех
            } catch (error) {
                lastError = error as Error;
                attemptCount++;
                
                if (attemptCount < this.maxRetries) {
                    const delay = Math.min(1000 * Math.pow(2, attemptCount), 30000);
                    await new Promise(resolve => setTimeout(resolve, delay));
                }
            }
        }
        
        // Исчерпали попытки — отправляем в DLQ
        const dlqPayload: DLQMessage = {
            originalTopic: topic,
            originalPartition: partition,
            originalOffset: message.offset,
            originalKey: message.key?.toString() ?? null,
            payload: message.value?.toString() ?? '',
            error: lastError?.message ?? 'Unknown error',
            errorStack: lastError?.stack,
            attemptCount,
            firstFailedAt: new Date().toISOString(),
            lastFailedAt: new Date().toISOString(),
        };
        
        await this.producer.send({
            topic: this.dlqTopic,
            messages: [{
                key: message.key,
                value: JSON.stringify(dlqPayload),
                headers: {
                    'dlq-source-topic': topic,
                    'dlq-error': lastError?.message ?? 'unknown',
                }
            }]
        });
        
        console.error(`Message sent to DLQ`, { topic, partition, offset: message.offset });
    }
}

DLQ топик обрабатывается отдельным сервисом: оповещение команды, логирование в ClickHouse для анализа, ручная перепроверка и повторная отправка в основной топик после фикса.

Compacted topics для state store

Compacted topic — это топик где Kafka гарантирует, что для каждого ключа хранится хотя бы последнее значение. Старые версии значений периодически удаляются.

// Создание compacted topic
await admin.createTopics({
    topics: [{
        topic: 'user-profiles',
        numPartitions: 10,
        replicationFactor: 3,
        configEntries: [
            { name: 'cleanup.policy', value: 'compact' },
            { name: 'min.cleanable.dirty.ratio', value: '0.1' },
            { name: 'segment.ms', value: '3600000' }, // 1 час
        ]
    }]
});

// Запись: ключ = user_id, значение = текущее состояние профиля
await producer.send({
    topic: 'user-profiles',
    messages: [{
        key: String(userId),
        value: JSON.stringify(userProfile) // null = tombstone (удаление)
    }]
});

Практическое применение: микросервис при старте может прочитать compacted topic с начала и восстановить текущее состояние всех сущностей — без запроса к БД. Это паттерн event sourcing с "бесплатным" snapshot.

Подводим итог

Kafka — мощный инструмент, но её complexity реальна. Главные вещи которые мы узнали на практике:

  • Выбор ключа партиции — архитектурное решение, меняется болезненно
  • Включайте idempotent: true на продьюсере — это бесплатно и спасает от дублей
  • Consumer lag — мониторьте и ставьте алерты, это ваш главный health indicator
  • DLQ обязателен в production — сообщение которое не обрабатывается должно куда-то деться
  • Cooperative rebalancing снижает downtime при деплоях

Строите event-driven архитектуру или внедряете Kafka в существующую систему? Aunimeda помогает с проектированием и внедрением. Напишите нам о вашей задаче.

Читайте также

PostgreSQL блокировки и дедлоки: диагностика и устранение в productionaunimeda
Базы данных

PostgreSQL блокировки и дедлоки: диагностика и устранение в production

Глубокий разбор системы блокировок PostgreSQL: как читать pg_locks, почему ALTER TABLE страшен в production, как воспроизвести и исправить дедлоки, и паттерн SELECT FOR UPDATE SKIP LOCKED для job queues.

WebSocket-сервер на 100,000 соединений: архитектура и реальные подводные камниaunimeda
Бэкенд

WebSocket-сервер на 100,000 соединений: архитектура и реальные подводные камни

Как масштабировать WebSocket от 100 до 100k+ соединений: uWebSockets.js vs ws, Redis Pub/Sub между инстансами, heartbeat и reconnect логика, memory leak на неочищенных обработчиках.

ClickHouse в production: колоночная база данных для реальной аналитикиaunimeda
Базы данных

ClickHouse в production: колоночная база данных для реальной аналитики

Глубокое погружение в ClickHouse: почему колоночное хранение выигрывает у строчного для аналитики, движки таблиц MergeTree, материализованные представления и реальные подводные камни при переносе данных из PostgreSQL.

Нужна IT-разработка для вашего бизнеса?

Разрабатываем сайты, мобильные приложения и AI-решения для бизнеса в России. Бесплатная консультация.

Получить консультацию Все статьи