Kafka + FastStream + FastAPI: асинхронные события и небольшой проект

Курс показывает, как строить событийные сервисы на Kafka с использованием FastStream и интеграцией в FastAPI. В конце вы соберёте небольшой проект: API + продюсер/консьюмер, обработка событий, ретраи и наблюдаемость.

1. Архитектура событий: Kafka, топики, партиции, consumer groups

Архитектура событий: Kafka, топики, партиции, consumer groups

Событийная архитектура нужна, когда системе важно реагировать на факты, а не только отвечать на запросы. Вместо прямых синхронных вызовов сервисы публикуют события (например, order.created), а другие сервисы подписываются и обрабатывают их асинхронно.

В этом курсе Kafka будет транспортом событий, FastStream — удобным слоем для продюсеров и консьюмеров в Python, а FastAPI — HTTP-входом в систему (например, создание заказа через API и публикация события в Kafka).

Что такое Kafka и какие роли она выполняет

Apache Kafka — распределённая платформа для потоковой передачи событий. Её ключевая идея: события записываются в устойчивый журнал (log), а потребители читают их в своём темпе.

Основные свойства Kafka, полезные в микросервисной архитектуре:

  • Буферизация: продюсер может отправить событие, даже если консьюмер временно недоступен.
  • Масштабирование: параллельная обработка достигается партициями.
  • Повторная обработка: консьюмер может перечитать события, если нужно пересобрать состояние.
  • Полезные ссылки:

  • Документация Apache Kafka
  • Kafka Concepts (Confluent)
  • Базовые сущности: broker, topic, record

    Broker и кластер

    Broker — сервер Kafka. Обычно Kafka работает как кластер из нескольких broker-ов.

  • Кластер даёт отказоустойчивость и масштабирование.
  • Данные в Kafka обычно реплицируются между broker-ами (для устойчивости).
  • Topic

    Topic — логическое имя потока событий, например:

  • orders
  • payments
  • notifications
  • Важно: топик — это не «очередь» в классическом смысле. Один и тот же топик могут независимо читать разные группы консьюмеров.

    Record (сообщение, событие)

    Событие в Kafka часто называют record. Обычно оно включает:

  • key (опционально): используется для выбора партиции и сохранения порядка для связанных событий.
  • value: полезная нагрузка (часто JSON/Avro/Protobuf).
  • headers (опционально): метаданные (например, correlation_id).
  • timestamp.
  • Партиции: параллелизм и порядок

    Зачем нужны партиции

    Каждый топик состоит из партиций (partition). Партиция — это упорядоченный append-only журнал.

    Партиции дают два главных эффекта:

  • Параллельная обработка: разные партиции можно читать одновременно.
  • Масштабирование хранения: данные распределяются по broker-ам.
  • Порядок сообщений

    Гарантия порядка в Kafka ограничена:

  • Порядок гарантирован только внутри одной партиции.
  • Между партициями общего порядка нет.
  • Практический вывод:

  • Если вам критично, чтобы события по одному order_id шли строго последовательно, отправляйте их с key=order_id, чтобы они попадали в одну и ту же партицию.
  • Как выбирается партиция

    Упрощённо Kafka выбирает партицию так:

  • Если key задан, Kafka использует хеш ключа, чтобы стабильно направлять одинаковые ключи в одну партицию.
  • Если key не задан, события распределяются по партициям более-менее равномерно.
  • Это решение — архитектурное:

  • key нужен, когда важны порядок и локализация состояния.
  • отсутствие key подходит, когда события независимы и важна равномерная нагрузка.
  • !Диаграмма показывает связь ключей, партиций и параллельного чтения в consumer group

    Replication и устойчивость (коротко, но важно)

    Kafka обычно хранит партиции с репликацией.

  • У партиции есть leader (через него идут чтение/запись) и followers (реплики).
  • При сбое broker-а лидер может смениться на одну из реплик.
  • Для продюсера это важно через настройки подтверждений записи (acks), но на уровне архитектуры достаточно помнить:

  • репликация — это причина, почему Kafka может переживать падение узлов без потери данных (в пределах настроек).
  • Consumer groups: масштабирование обработки

    Что такое consumer group

    Consumer group — это логическое объединение консьюмеров, которые делят работу по чтению топика.

    Правило распределения:

  • внутри одной consumer group одна партиция может читаться только одним консьюмером одновременно.
  • Следствия:

  • Если партиций 3, то максимум параллелизма в группе — 3 консьюмера.
  • Если консьюмеров больше, чем партиций, «лишние» будут простаивать.
  • Если консьюмеров меньше, чем партиций, один консьюмер будет читать несколько партиций.
  • Паттерн: разные группы для разных задач

    Ключевая сила Kafka: один топик могут читать разные consumer groups независимо.

    Пример для orders:

  • группа billing-service читает события и выставляет счёт
  • группа analytics-service читает события и строит метрики
  • группа notifications-service читает события и отправляет уведомления
  • Каждая группа получает свою копию потока и продвигается по нему независимо.

    Rebalance

    Когда состав группы меняется (консьюмер упал/поднялся, изменилось число экземпляров), Kafka делает rebalance — перераспределяет партиции между консьюмерами.

    Архитектурный нюанс:

  • ребаланс может временно приостанавливать обработку
  • обработчик должен быть готов к тому, что «владение партицией» может переехать на другой экземпляр
  • Offset: позиция чтения и повторная обработка

    Kafka не «удаляет сообщение после чтения» как классические очереди. Вместо этого консьюмер хранит offset — номер позиции в партиции, до которой он дочитал.

  • Offset хранится по каждой партиции и по каждой consumer group.
  • Коммит offset-а означает: «мы считаем события до этого места обработанными».
  • Почему это важно для проектирования:

  • Если приложение упало до коммита, после рестарта оно может прочитать события повторно.
  • Поэтому обработчики обычно делают идемпотентную обработку (повтор не должен ломать данные) или используют дедупликацию по идентификатору события.
  • Retention: как долго Kafka хранит события

    Kafka хранит события по правилам retention:

  • по времени (например, 7 дней)
  • по размеру (например, пока партиция не достигнет лимита)
  • Это влияет на архитектуру:

  • Kafka — не вечный архив по умолчанию
  • если нужно «вечное» хранение, настраивают retention или выносят данные в хранилище (S3/DB) через коннекторы
  • Отдельный режим — log compaction (компактация). Он полезен для топиков-состояний, где важна последняя запись по ключу (например, текущий профиль пользователя). Концептуально:

  • для каждого ключа Kafka стремится сохранить последнюю версию значения
  • Официально:

  • Kafka: Log Compaction
  • Как эти понятия будут использоваться с FastStream и FastAPI

    В рамках курса мы будем строить небольшой проект, где:

  • FastAPI принимает HTTP-запрос (например, создать заказ).
  • Приложение публикует событие в Kafka (через FastStream).
  • Консьюмеры читают события в своих consumer groups и выполняют асинхронные действия (например, резерв товара, отправка уведомления).
  • Соответствие терминов Kafka и будущего кода:

    | Концепция Kafka | Как проявится в проекте | Практический смысл | |---|---|---| | topic | строка имени топика (например, orders) | канал событий домена | | partition | настраивается у топика в Kafka | масштабирование обработки | | key | поле (например, order_id) при публикации | порядок и локальность событий | | consumer group | group_id у консьюмера | «делим работу» между инстансами | | offset commit | настройка и поведение консьюмера | риск дублей и требования к идемпотентности |

    Ссылки на инструменты курса:

  • FastStream: документация
  • FastAPI: документация
  • Частые ошибки в событийной архитектуре (и как их избежать)

  • Ожидать глобального порядка: в Kafka порядок только внутри партиции.
  • Игнорировать ключ: без key связанные события могут оказаться в разных партициях.
  • Думать, что Kafka удаляет сообщения после чтения: удаление определяется retention, а чтение — offset-ами.
  • Не учитывать дубли: при сбоях возможна повторная доставка, значит обработчики должны быть идемпотентными.
  • Масштабировать консьюмеров без увеличения партиций: параллелизм ограничен числом партиций.
  • Что дальше

    В следующей статье мы перейдём от концепций к практике окружения: поднимем Kafka локально (обычно через Docker), создадим топики и посмотрим, как продюсер/консьюмер взаимодействуют с ними на минимальном примере — это станет основой для дальнейшей интеграции с FastStream и FastAPI.

    2. Быстрый старт с Kafka локально: Docker Compose и базовые утилиты

    Быстрый старт с Kafka локально: Docker Compose и базовые утилиты

    В прошлой статье мы разобрали, как устроена Kafka: топики, партиции, ключи, consumer groups и offsets. Теперь сделаем следующий шаг: поднимем Kafka локально через Docker Compose, создадим топики и научимся быстро проверять поток событий стандартными утилитами.

    Цель этой статьи — чтобы у вас было рабочее окружение, к которому позже подключатся FastStream (продюсер/консьюмер) и FastAPI (HTTP-вход в систему).

    Что потребуется

  • Docker и Docker Compose
  • Терминал
  • Официальные источники:

  • Docker Compose overview
  • Apache Kafka documentation
  • Локальная Kafka через Docker Compose

    Мы поднимем Kafka в режиме KRaft (без ZooKeeper) и добавим веб-интерфейс Kafka UI, чтобы удобно смотреть топики и сообщения.

    Docker Compose файл

    Создайте файл docker-compose.yml:

    Что здесь важно:

  • 9092:9092 — порт Kafka для подключений с вашей машины.
  • KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 — Kafka сообщает клиентам, как до неё правильно добраться (в нашем случае с хоста это localhost:9092).
  • Kafka UI ходит к Kafka внутри docker-сети, поэтому использует kafka:9092.
  • !Схема показывает, как хост и контейнеры подключаются к Kafka разными адресами

    Запуск и проверка

    Запуск:

    Проверка контейнеров:

    Логи Kafka (если что-то не стартует):

    Остановка:

    Если нужно удалить данные Kafka (все топики и сообщения):

    Kafka UI: быстрый визуальный контроль

    Откройте в браузере:

  • Kafka UI
  • В UI удобно:

  • смотреть список топиков
  • видеть партиции
  • читать сообщения
  • наблюдать consumer groups
  • UI не обязателен, но на старте сильно ускоряет понимание.

    Базовые утилиты Kafka внутри контейнера

    Kafka поставляется с CLI-скриптами. Быстрее всего запускать их прямо внутри контейнера Kafka, чтобы не ставить ничего локально.

    Договоримся: --bootstrap-server kafka:9092 используется внутри docker-сети, поэтому в командах ниже мы выполняем CLI из контейнера kafka.

    Шаблон запуска команды:

    Создание топика

    Создадим топик orders с 3 партициями и фактором репликации 1 (у нас один брокер):

    Проверим список топиков:

    Посмотрим детали конкретного топика:

    Как связать это с архитектурой из прошлой статьи:

  • orders — канал событий домена заказов
  • 3 партиции — потенциальный параллелизм обработки (внутри одной consumer group максимум 3 консьюмера будут реально работать параллельно)
  • Отправка событий: console producer

    Отправим несколько сообщений в orders:

    После запуска вводите строки и нажимайте Enter:

    Завершить ввод: Ctrl+D.

    Отправка сообщений с ключом

    Ключ нужен, чтобы связанные события (например, по одному order_id) стабильно попадали в одну партицию.

    Запустите продюсер с включённым парсингом ключа (разделитель :):

    Вводите так:

    Идея: одинаковый ключ (1) будет маршрутизироваться в одну и ту же партицию, а значит порядок по этому ключу сохранится.

    Чтение событий: console consumer

    Прочитаем сообщения из топика с начала:

    Если сообщения продолжают приходить, consumer будет читать поток дальше.

    Показ ключа, партиции и оффсета

    Для отладки часто важно видеть метаданные:

    Так вы наглядно увидите:

  • в какие партиции распределились сообщения
  • какие offset они получили
  • как ключ влияет на маршрутизацию
  • Consumer group на практике

    В Kafka обработчики обычно работают в consumer group. Тогда несколько экземпляров сервиса делят партиции между собой.

    Запустим consumer в группе billing-service:

    Теперь Kafka будет хранить offsets для этой группы.

    Посмотрим состояние consumer group:

    Полезная интерпретация полей:

  • CURRENT-OFFSET — где группа находится сейчас
  • LOG-END-OFFSET — конец лога партиции
  • LAG — сколько сообщений “не догнали”
  • Частые проблемы в локальной разработке

  • Клиент не может подключиться, хотя порт проброшен
  • Обычно причина в advertised.listeners: Kafka сообщает клиенту адрес, по которому надо подключаться. Для подключения с хоста чаще всего нужно localhost:9092.
  • Сервис в Docker не может подключиться к Kafka по localhost
  • Внутри контейнера localhost — это сам контейнер, а не ваша машина. В docker-сети обращайтесь по имени сервиса: kafka:9092.
  • Нужно “начать с нуля”, но сообщения остаются
  • Удалите volume: docker compose down -v.
  • Как это пригодится дальше в курсе

    В следующих статьях мы подключим:

  • FastStream как удобный Python-слой над Kafka (продюсер/консьюмер, декораторы, зависимости)
  • FastAPI как HTTP-вход (например, POST /orders публикует событие в Kafka)
  • В локальной разработке вы будете использовать:

  • localhost:9092 для FastAPI/FastStream, запущенных на хосте
  • kafka:9092 для сервисов, запущенных рядом в Docker
  • Полезно сохранить как ориентир:

    | Где запущен клиент | Адрес Kafka | |---|---| | На вашей машине (host) | localhost:9092 | | В Docker Compose сети | kafka:9092 |

    Дальше мы перейдём к минимальному Python-приложению с FastStream, которое публикует и читает события из Kafka, а затем обернём публикацию событий HTTP-эндпоинтами FastAPI.

    3. FastStream с Kafka: брокер, publish/subscribe, схемы сообщений

    FastStream с Kafka: брокер, publish/subscribe, схемы сообщений

    В прошлых статьях мы:

  • разобрали концепции Kafka: топики, партиции, ключи, consumer groups и offsets
  • подняли Kafka локально через Docker Compose и научились проверять сообщения через console producer/consumer
  • Теперь переходим к Python-коду: используем FastStream как удобный слой для работы с Kafka (публикация и подписка), а также вводим схемы сообщений, чтобы события были типизированы и валидировались.

    !Поток событий: публикация в Kafka и независимое потребление разными consumer groups

    Что такое FastStream и как он соотносится с Kafka

    FastStream — фреймворк для построения асинхронных event-driven приложений в Python. Для Kafka он закрывает типовые задачи:

  • подключение к брокеру Kafka и управление жизненным циклом приложения
  • декларативные подписчики через декораторы
  • публикация сообщений (publish)
  • сериализация/десериализация и валидация данных (часто через Pydantic)
  • Полезные ссылки:

  • FastStream Documentation
  • FastStream Kafka Broker
  • Важно держать в голове соответствие:

    | Kafka-концепция | В коде FastStream | Зачем нужно | |---|---|---| | broker | KafkaBroker(...) | точка подключения к кластеру | | topic | параметр @broker.subscriber("orders") и publish(..., topic="orders") | канал событий | | consumer group | group_id=... | разделение партиций между экземплярами | | key | key=b"..." при publish | порядок по ключу внутри партиции | | message value | Pydantic-модель или dict | контракт события |

    Подготовка окружения

    Предполагается, что Kafka из прошлой статьи запущена и доступна на хосте по localhost:9092.

    Установка зависимостей

    Установите FastStream с поддержкой Kafka:

    Если вы используете Poetry/uv, принцип тот же: нужен пакет faststream с extra kafka.

    Брокер и приложение FastStream

    В FastStream вы обычно создаёте:

  • объект брокера KafkaBroker
  • объект приложения FastStream
  • подписчиков через @broker.subscriber(...)
  • Минимальный пример консьюмера, который читает строки из топика orders.

    Создайте файл consumer.py:

    Что здесь происходит:

  • KafkaBroker("localhost:9092") задаёт адрес Kafka, как в прошлой статье
  • @broker.subscriber("orders", group_id="billing-service") создаёт консьюмера в группе billing-service
  • обработчик handle_order вызывается на каждое сообщение
  • Как запустить

    Обычно FastStream запускают через CLI.

    Если команда faststream недоступна, проверьте, что пакет установлен в то же окружение, из которого вы запускаете команду.

    Publish: как отправлять сообщения из FastStream

    Сделаем отдельный скрипт-продюсер, который публикует событие в Kafka.

    Создайте producer.py:

    Запуск:

    Если в это время запущен consumer.py, вы увидите сообщение в консоли.

    Ключ сообщения (key) и порядок по order_id

    В прошлых статьях мы обсуждали, что порядок гарантирован только внутри партиции, а ключ помогает стабильно маршрутизировать события в одну партицию.

    В publish можно передать key как bytes. Например, ключом будет order_id:

    Практический смысл:

  • события с одинаковым key попадут в одну партицию
  • обработчик в группе будет видеть события по этому ключу в правильном порядке
  • Subscribe: несколько consumer groups для одного топика

    Один и тот же топик часто читают разные сервисы, и у каждого своя consumer group.

    Добавим второго подписчика (в другом процессе или даже в том же приложении, но с другим group_id).

    Пример consumer_notifications.py:

    Если запустить оба консьюмера и отправить одно событие, то:

  • billing-service получит событие
  • notifications-service тоже получит событие
  • Это базовый паттерн событийной архитектуры: один продюсер и много независимых потребителей.

    Схемы сообщений: контракт события и валидация

    До этого мы читали str или dict без строгих правил. В реальном проекте важно, чтобы событие имело понятный контракт:

  • какие поля обязательны
  • какие типы данных
  • как отличать тип события
  • В Python это удобно делать через Pydantic.

    Пример схемы OrderCreated

    Создайте schemas.py:

    И обновите консьюмера так, чтобы он принимал модель:

    Что меняется:

  • FastStream попытается десериализовать входящее сообщение и провалидировать его как OrderCreated
  • если сообщение не соответствует схеме, обработка завершится ошибкой
  • Почему это важно для проекта

    Схемы сообщений дают:

  • единый формат событий между сервисами
  • раннее обнаружение ошибок интеграции
  • автодокументацию структуры данных прямо в коде
  • Практические нюансы: сериализация, ошибки, идемпотентность

    Как сериализуются сообщения

    В типичном случае вы публикуете dict или Pydantic-модель, а FastStream отправляет в Kafka JSON-представление.

    Рекомендации для старта проекта:

  • используйте JSON для простоты
  • фиксируйте event (строку типа события) и обязательные поля
  • Что делать с “плохими” сообщениями

    Если в топик попало сообщение не по схеме, обработчик упадёт. На практике применяют несколько подходов:

  • отдельный топик для “битых” сообщений (часто называют DLQ)
  • логирование и алертинг
  • строгий контроль продюсеров, чтобы в топик попадали только валидные события
  • В этом курсе мы начнём со строгих схем и аккуратной разработки продюсеров, а стратегии DLQ/ретраев обсудим, когда будем строить проект целиком.

    Почему нужно помнить про дубли

    Kafka не гарантирует “ровно один раз” в простом сценарии. При сбоях возможны повторы, поэтому обработчики стоит проектировать так, чтобы повторная обработка не ломала состояние.

    Минимальная практическая привычка:

  • хранить у себя order_id и делать операции идемпотентно (например, “создать запись, если ещё не создана”)
  • Как это связано с FastAPI и будущим проектом

    На следующем шаге мы соединим мир HTTP и событий:

  • FastAPI будет принимать POST /orders
  • внутри обработчика будет publish события order.created в Kafka
  • FastStream-консьюмеры (в отдельных процессах) будут асинхронно выполнять действия
  • Так вы получите небольшую, но реалистичную архитектуру:

  • HTTP для команд и входа пользователей
  • Kafka для событий и слабой связности
  • FastStream для удобной реализации продюсеров/консьюмеров в Python
  • Что дальше

    Следующая статья логично продолжит эту:

  • как встроить Kafka-публикацию в FastAPI (один процесс как HTTP API)
  • как запускать консьюмеров отдельно
  • как организовать структуру небольшого проекта: папки, настройки, топики, схемы событий
  • 4. Интеграция с FastAPI: эндпоинты, фоновые задачи и жизненный цикл

    Интеграция с FastAPI: эндпоинты, фоновые задачи и жизненный цикл

    В прошлых статьях вы подняли Kafka локально, научились создавать топики и проверять сообщения, а затем подключили Python-код через FastStream (publish/subscribe и схемы сообщений). Теперь соединяем HTTP-мир и события: делаем FastAPI-сервис, который принимает запросы, публикует события в Kafka и корректно управляет подключением к брокеру через жизненный цикл приложения.

    Цель: получить понятный шаблон, который дальше превратим в небольшой проект.

    !Как HTTP-запрос превращается в событие в Kafka и обрабатывается несколькими независимыми группами

    Что именно мы интегрируем

    У нас есть три роли:

  • FastAPI: принимает команды по HTTP и возвращает ответ.
  • FastStream: удобный способ публиковать сообщения в Kafka и запускать консьюмеров.
  • Kafka: транспорт событий, топики, партиции, consumer groups.
  • В этой статье фокус на стороне продюсера:

  • эндпоинт POST /orders принимает данные заказа
  • сервис публикует событие order.created в топик orders
  • подключение к Kafka создаётся на старте приложения и закрывается при остановке
  • Зависимости и минимальная структура

    Установите зависимости:

    Рекомендуемая минимальная структура:

  • app/main.py
  • app/schemas.py
  • app/kafka.py
  • Так проще не смешивать HTTP-слой, схемы и брокер.

    Схемы HTTP и схемы событий

    Важно различать:

  • HTTP-команда от клиента: что нужно создать
  • событие в Kafka: факт, который произошёл
  • Создайте app/schemas.py:

    Смысл:

  • OrderCreate описывает вход по HTTP
  • OrderCreated описывает контракт события, которое уйдёт в Kafka
  • Брокер FastStream как общий ресурс приложения

    Подключение к Kafka не стоит создавать на каждый запрос. Правильнее поднять брокер один раз на старте приложения и закрыть на остановке.

    Создайте app/kafka.py:

    Жизненный цикл FastAPI: подключаемся на старте, закрываемся на shutdown

    В FastAPI предпочтительный способ управлять жизненным циклом приложения сейчас делается через lifespan.

    Официальная документация:

  • FastAPI: Lifespan Events
  • Создайте app/main.py:

    Что здесь происходит:

  • на старте приложения создаём KafkaBroker и делаем connect
  • сохраняем брокер в app.state, чтобы использовать в обработчиках
  • при остановке процесса корректно закрываем соединения
  • Эндпоинт, который публикует событие

    Добавим POST /orders. Для простоты в этой статье мы не пишем заказ в базу, а просто публикуем событие.

    Продолжим app/main.py:

    Ключевые детали:

  • topic="orders" соответствует топику из предыдущих статей
  • key=str(order_id).encode() помогает маршрутизировать события одного заказа в одну партицию и сохранять порядок по order_id
  • headers добавляют метаданные, которые удобно использовать для трассировки и роутинга
  • Фоновые задачи: публиковать до ответа или после ответа

    Иногда хочется быстро вернуть HTTP-ответ и отправить событие в фоне. В FastAPI для этого есть BackgroundTasks.

    Документация:

  • FastAPI: Background Tasks
  • Пример варианта после ответа:

    Как выбирать подход:

  • Публиковать до ответа: проще обеспечить, что событие точно отправлено, но клиент ждёт чуть дольше.
  • Публиковать после ответа: клиент получает ответ быстрее, но при аварийной остановке процесса сразу после ответа событие может не успеть уйти.
  • Практическое правило для старта проекта:

  • если событие критично для бизнес-процесса, публикуйте до ответа
  • если событие вторично (логирование, аналитика), можно публиковать в фоне
  • Запуск FastAPI и проверка через Kafka UI

    Запуск приложения:

    Отправка запроса:

    Проверьте в Kafka UI:

  • Kafka UI
  • Ожидания:

  • в топике orders появится новое сообщение
  • если вы добавили key, в UI или CLI будет видно распределение по партициям
  • Как запускать консьюмеров отдельно от FastAPI

    Важный архитектурный момент: FastAPI и консьюмеры обычно запускаются разными процессами.

    Пример консьюмера (отдельный файл consumer_billing.py):

    Запуск консьюмера:

    Если параллельно работает FastAPI и вы вызываете POST /orders, вы увидите обработку в консоли консьюмера.

    Частые ошибки интеграции

  • Подключение к Kafka создаётся на каждый запрос: это лишняя нагрузка и источник утечек, используйте lifespan.
  • Неправильный адрес Kafka: на хосте обычно localhost:9092, внутри Docker Compose сети обычно kafka:9092.
  • Нет key, но нужен порядок по заказу: без ключа связанные события могут разъехаться по партициям.
  • Ожидание ровно одного раза: при сбоях возможны повторы, обработчики должны быть идемпотентными.
  • Что дальше

    Следующий шаг курса логично сделать проектным:

  • выделить конфигурацию (переменные окружения, настройки брокера)
  • добавить второй эндпоинт, который публикует другое событие, например order.paid
  • написать два консьюмера в разных consumer groups и показать, что они независимо читают один топик
  • Так мы получим небольшой, но реалистичный каркас: FastAPI как вход, Kafka как шина, FastStream как удобная реализация событий.

    5. Надёжная обработка: ретраи, idempotency, DLQ и порядок сообщений

    Надёжная обработка: ретраи, idempotency, DLQ и порядок сообщений

    В предыдущих частях курса мы:

  • подняли Kafka локально и научились работать с топиками
  • подключили FastStream для publish/subscribe и схем сообщений
  • встроили публикацию событий в FastAPI через жизненный цикл приложения
  • Теперь добавим то, без чего событийные системы быстро становятся хрупкими: правила надёжной обработки. В реальности консьюмеры падают, внешние сервисы отвечают ошибками, сообщения могут приходить повторно, а порядок важен не всегда и не везде.

    В этой статье разберём четыре практических темы:

  • ретраи (повторные попытки обработки)
  • idempotency (идемпотентность обработки при дублях)
  • DLQ (dead-letter queue, топик для “битых” сообщений)
  • порядок сообщений (что именно гарантирует Kafka и как этим управлять)
  • !Схема вариантов: успех, ретраи, DLQ

    Почему “дубликаты” и “ошибки” в Kafka нормальны

    Kafka и большинство клиентских библиотек ориентированы на модель доставки как минимум один раз.

  • Как минимум один раз означает: сообщение будет доставлено, но при сбоях возможно повторение.
  • Ровно один раз возможно, но обычно требует более сложных настроек и согласованного дизайна (это отдельная тема, выходящая за рамки небольшого проекта).
  • Откуда берутся повторы:

  • консьюмер обработал сообщение, но упал до сохранения прогресса чтения
  • произошёл ребаланс consumer group, и другая реплика перечитала часть лога
  • обработчик сам “перекинул” событие в другой топик (ретраи), и оно пришло повторно
  • Вывод для проекта: консьюмер должен быть устойчив к повторам.

    Полезная база из документации:

  • Документация Apache Kafka
  • Idempotency: как пережить дубли и не сломать состояние

    Идемпотентная обработка означает: если одно и то же событие обработать дважды, итоговое состояние не станет “хуже”, чем после одной обработки.

    Важно разделять:

  • идемпотентный продюсер (на стороне записи в Kafka) и
  • идемпотентный консьюмер (на стороне бизнес-эффекта)
  • В прикладных задачах чаще всего критичен именно идемпотентный консьюмер.

    Какой идентификатор использовать для дедупликации

    Чтобы отличать “то же самое” событие от “другого”, нужен идентификатор.

    Практичный минимум для нашего проекта:

  • event_id: уникальный идентификатор события (например, UUID)
  • event: тип события (order.created, order.paid)
  • order_id: бизнес-ключ
  • Если у вас есть только order_id, дедупликация становится опасной: разные события по одному заказу будут выглядеть одинаково.

    Схема события с метаданными

    Создадим более пригодный для надёжной обработки контракт.

    app/schemas.py:

    Что это даёт:

  • meta.event_id позволяет безопасно определять повторы
  • meta.occurred_at помогает в отладке и аналитике
  • Пример простой дедупликации через уникальное ограничение

    Для учебного проекта удобно показать дедупликацию на sqlite.

    Идея:

  • перед выполнением бизнес-эффекта пытаемся записать event_id в таблицу processed_events
  • если запись уже существует, считаем событие повтором и выходим
  • consumer_billing.py:

    Почему это работает:

  • Kafka может доставить событие повторно
  • но консьюмер “узнает” event_id и не выполнит эффект второй раз
  • Ограничение примера:

  • sqlite подходит для демонстрации, но в продакшене чаще используют общую БД сервиса или Redis
  • Ретраи: как повторять обработку и не превращать топик в “кладбище”

    Ретрай это повторная попытка обработать событие, если обработка завершилась ошибкой.

    Ошибки бывают разных классов:

  • временные: сеть, таймауты, кратковременная недоступность сервиса
  • постоянные: невалидные данные, несовместимая версия схемы, нарушение бизнес-правил
  • Для временных ошибок ретраи полезны. Для постоянных ретраи вредны: вы будете бесконечно “молотить” одно и то же.

    Стратегия ретраев внутри обработчика

    Самый простой стартовый вариант:

  • сделать несколько попыток
  • между попытками увеличивать задержку
  • после исчерпания попыток отправить событие в DLQ
  • Пример (упрощённый) для FastStream:

    Что важно понимать про такой подход:

  • пока вы ретраите, вы держите обработку сообщений в этой партиции
  • при больших задержках растёт лаг consumer group
  • Это приемлемо для небольших нагрузок и учебного проекта, но на серьёзных потоках обычно используют более управляемые схемы ретраев.

    Когда лучше не ретраить

    Ретраи почти никогда не помогают, если:

  • сообщение не проходит валидацию по схеме
  • нарушены бизнес-инварианты (например, total < 0)
  • данные требуют ручного разбора
  • В этих случаях лучше сразу отправлять в DLQ.

    DLQ: “парковка” проблемных сообщений

    DLQ (dead-letter queue) в Kafka обычно реализуют как отдельный топик, например orders.dlq.

    Зачем DLQ:

  • не блокировать обработку хороших сообщений из-за “ядовитого” (poison pill)
  • сохранять исходные данные для разбора
  • дать возможность повторной обработки вручную или отдельной утилитой
  • Что класть в DLQ

    Минимально полезный набор:

  • исходное сообщение целиком
  • причина (например, retries_exhausted, validation_error)
  • метаданные о сервисе/группе, где произошла ошибка
  • В нашем примере часть метаданных мы положили в headers.

    Про заголовки Kafka:

  • заголовки это пары key/value, где value это bytes
  • заголовки удобны для диагностики и маршрутизации
  • Как создать DLQ топик локально

    Если вы создаёте топики вручную, добавьте DLQ:

    Почему часто делают столько же партиций:

  • удобно сохранять ту же стратегию ключей и потенциального параллелизма
  • Порядок сообщений: что гарантируется и как не “сломать” порядок случайно

    Kafka гарантирует порядок только внутри одной партиции.

    Это связано с тем, что каждая партиция это упорядоченный журнал.

    С практической точки зрения:

  • если события одного заказа должны обрабатываться строго последовательно, публикуйте их с ключом key=order_id
  • тогда все события одного order_id попадут в одну и ту же партицию
  • Это мы уже делали в FastAPI:

    Как порядок можно потерять даже при правильном ключе

    Даже если Kafka доставляет события в порядке, ваше приложение может его нарушить.

    Типичные причины:

  • вы обрабатываете сообщения параллельно внутри одного консьюмера без контроля ключей
  • вы делаете “переупорядочивающие” побочные эффекты (например, запускаете фоновые задачи, которые завершаются в случайном порядке)
  • Правило для старта:

  • если порядок важен, обрабатывайте события синхронно по ключу (не распараллеливайте внутри одной партиции без явной стратегии)
  • Порядок и ретраи

    Ретрай внутри обработчика сохраняет порядок, но может снижать пропускную способность:

  • следующее сообщение в этой же партиции будет ждать, пока вы закончите ретраи предыдущего
  • Если для вас важнее пропускная способность, чем строгий порядок, применяют паттерны с “retry topics” и отдельными консьюмерами. В учебном проекте достаточно понимать компромисс:

  • порядок чаще означает меньше параллелизма
  • параллелизм чаще означает сложнее гарантировать порядок
  • Практическая памятка для нашего небольшого проекта

    Таблица решений, которые хорошо работают в учебной архитектуре FastAPI + FastStream + Kafka:

    | Проблема | Минимальное решение | Что это даёт | |---|---|---| | Повторы сообщений | event_id + дедупликация в хранилище | идемпотентность эффекта | | Временные ошибки | 2–5 ретраев с задержками | меньше ручных разборов | | Постоянные ошибки | сразу в DLQ | поток не блокируется | | Нужен порядок по заказу | key=order_id | порядок внутри партиции | | Нужно расследование ошибок | DLQ + headers с причиной | прозрачная диагностика |

    Что дальше

    Следующий шаг в курсе логично сделать проектным:

  • добавить в FastAPI второй тип события (например, order.paid)
  • сделать два консьюмера в разных consumer groups
  • в одном консьюмере добавить дедупликацию по event_id
  • во втором добавить ретраи и DLQ
  • Документация, к которой полезно возвращаться:

  • FastStream Documentation
  • FastAPI Documentation
  • Документация Apache Kafka
  • 6. Наблюдаемость и эксплуатация: логи, метрики, tracing, конфигурация

    Наблюдаемость и эксплуатация: логи, метрики, tracing, конфигурация

    В предыдущих статьях мы построили каркас событийного приложения:

  • FastAPI принимает HTTP-запросы и публикует события в Kafka через FastStream
  • консьюмеры FastStream читают события в своих consumer groups
  • мы добавили основы надёжности: ретраи, идемпотентность и DLQ
  • Следующий шаг, без которого проект сложно поддерживать даже на небольшой нагрузке, это наблюдаемость: умение ответить на вопросы что происходит, почему происходит и где именно ломается.

    В этой статье разберём практический минимум для эксплуатации:

  • логи: что логировать в продюсере и консьюмере, как связывать события и запросы
  • метрики: что измерять, чтобы видеть деградации и лаг
  • tracing: как понимать путь запроса через HTTP и Kafka
  • конфигурация: как хранить настройки и не хардкодить окружение
  • !Диаграмма, показывающая, как request_id/trace_id и метрики проходят через HTTP, Kafka и консьюмеров

    Наблюдаемость: что это и зачем

    Наблюдаемость обычно собирают из трёх источников:

  • Логи отвечают на вопрос что произошло и с какими данными.
  • Метрики отвечают на вопрос насколько хорошо работает система в целом.
  • Tracing отвечает на вопрос где именно во времени тратится задержка и где цепочка ломается.
  • Особенность событийной архитектуры: обработка «растянута» по времени и сервисам. Клиент получил HTTP-ответ, а консьюмеры ещё могут обрабатывать событие секунды или минуты. Поэтому ключевая задача наблюдаемости здесь это корреляция: уметь связать HTTP-запрос, опубликованное событие и обработку этого события.

    Логи: базовый стандарт для продюсера и консьюмера

    Что логировать

    Минимальный набор полей, которые почти всегда нужны:

  • event: тип события, например order.created
  • event_id: уникальный идентификатор события (мы вводили его для идемпотентности)
  • order_id: бизнес-ключ
  • topic: топик Kafka
  • group_id: consumer group (для консьюмера)
  • partition и offset: позиция сообщения (для консьюмера)
  • correlation_id: идентификатор цепочки (обычно связывает HTTP-запрос и все события от него)
  • trace_id: идентификатор трассировки (если используете tracing)
  • Важное правило: не логируйте персональные данные и секреты. Если данные чувствительные, логируйте только идентификаторы и технические поля.

    Структурированные логи

    Если лог это просто строка, её сложно искать и агрегировать. Структурированный лог это когда вы выводите поля как отдельные атрибуты.

    В Python можно стартовать с обычного logging, но выводить сообщения в едином формате. Минимальный практичный вариант без внешних библиотек:

    Теперь и в FastAPI, и в консьюмере вы можете логировать одинаково:

    Корреляция HTTP-запроса и Kafka-события

    Для корреляции нужен correlation_id. Типовая практика:

  • если клиент прислал заголовок X-Correlation-Id, используем его
  • иначе генерируем новый
  • кладём correlation_id в Kafka headers
  • консьюмер читает headers и пишет тот же correlation_id в лог
  • #### FastAPI: выдаём и принимаем correlation_id

    #### Продюсер: кладём correlation_id в Kafka headers

    #### Консьюмер: читаем headers и логируем их

    В Kafka заголовки приходят как пары bytes. В FastStream доступ к «сырому» сообщению зависит от используемого API и версии, но практический подход одинаковый: извлекаете correlation_id и включаете в лог.

    Если вы не извлекаете headers напрямую, минимальный учебный компромисс это дублировать correlation_id в теле события. Для продакшена обычно предпочитают headers, чтобы не смешивать бизнес-данные и технические метаданные.

    Уровни логирования

    Рекомендация для небольшого проекта:

  • INFO: публикации, успешные обработки, важные переходы состояний
  • WARNING: ретраи, временные деградации, пропуски дублей
  • ERROR: падение обработки, отправка в DLQ, необработанные исключения
  • Документация по logging:

  • Документация logging в Python
  • Метрики: что измерять в FastAPI и консьюмерах

    Метрики нужны, чтобы видеть тренды и деградации без просмотра логов. Минимальный набор метрик для нашего проекта:

  • HTTP:
  • - количество запросов по методам и статусам - latency (время ответа)
  • Продюсер:
  • - количество опубликованных сообщений - количество ошибок публикации
  • Консьюмер:
  • - количество успешно обработанных сообщений - количество ошибок обработки - количество ретраев - количество сообщений, отправленных в DLQ - latency обработки сообщения
  • Kafka-специфичное:
  • - consumer lag (насколько группа отстаёт от конца партиций)

    Простейшая интеграция Prometheus в FastAPI

    Самый простой старт это использовать prometheus_client и отдельный эндпоинт /metrics.

  • Prometheus Python client
  • Prometheus: документация
  • Пример для FastAPI:

    Метрики в консьюмерах

    В консьюмерах обычно добавляют счётчики и таймер обработки:

    Практический нюанс: если у вас несколько консьюмеров, у каждого будет свой порт метрик. В продакшене это обычно решается сервис-дискавери или прокси, но для учебного проекта достаточно понимать принцип.

    Consumer lag

    Consumer lag это разница между концом лога партиции и текущим offset группы. Это один из самых важных сигналов: если лаг растёт, система не успевает обрабатывать поток.

    В учебной среде лаг удобно смотреть:

  • через Kafka UI
  • через kafka-consumer-groups.sh --describe
  • В продакшене лаг обычно экспортируют в Prometheus через отдельный экспортёр, но это уже инфраструктурная часть, не обязательная для небольшого учебного проекта.

    Tracing: как понять путь запроса через Kafka

    Что такое tracing простыми словами

    Tracing это «маршрут» одного запроса, разбитый на интервалы времени, которые называются spans. Например:

  • span HTTP POST /orders
  • span publish to Kafka orders
  • span billing-service process event
  • span notifications-service send message
  • Трассировка помогает ответить:

  • где появилась задержка
  • какой компонент упал
  • сколько времени прошло от HTTP-команды до финального эффекта
  • Стандарт де-факто для передачи контекста трассировки это W3C Trace Context, где ключевой заголовок называется traceparent.

  • OpenTelemetry: документация
  • W3C Trace Context
  • Минимальная стратегия для нашего проекта

    Для связки FastAPI -> Kafka -> консьюмеры нужно:

  • создать trace/span в HTTP-обработчике
  • передать контекст в Kafka headers, обычно как traceparent
  • на стороне консьюмера извлечь traceparent и продолжить trace
  • Если вы пока не готовы поднимать полноценный tracing backend, всё равно полезно передавать trace_id в логах и headers. Это позволит позже «включить» tracing без изменения контрактов.

    Конфигурация: как не хардкодить окружение

    В предыдущих статьях мы использовали localhost:9092. Для эксплуатации это неудобно: адреса, топики, уровни логов, DLQ и порты метрик должны быть настройками.

    Принципы конфигурации

  • настройки читаются из переменных окружения
  • код не меняется при смене окружения
  • секреты не хранятся в репозитории
  • Это соответствует подходу The Twelve-Factor App.

  • The Twelve-Factor App
  • Пример настроек через pydantic-settings

    Для FastAPI и консьюмеров удобно иметь один объект настроек.

  • pydantic-settings: документация
  • Дальше используете settings.kafka_bootstrap и settings.orders_topic и в FastAPI, и в консьюмерах.

    Конфигурация в Docker Compose

    Чтобы не менять код, прокидывайте переменные окружения:

    Эксплуатационные мелочи, которые экономят часы

    Health checks: что «живое», а что «готово»

    Два разных смысла:

  • liveness: процесс жив и не завис
  • readiness: процесс готов обрабатывать запросы или сообщения
  • Для FastAPI обычно делают /health и /ready. В /ready полезно проверять, что брокер подключён.

  • FastAPI: про события жизненного цикла
  • Graceful shutdown

    Мы уже подключали брокер через lifespan. Это важно для эксплуатации:

  • при остановке сервиса соединения закрываются корректно
  • меньше риска «потерять» публикацию в фоне
  • DLQ как сигнал качества данных

    Если DLQ начинает расти, это почти всегда:

  • проблема совместимости схем
  • некорректные продюсеры
  • бизнес-ошибки, которые нельзя решить ретраями
  • Поэтому DLQ стоит считать метрикой и отдельным источником расследований.

    Итог

    Для небольшого проекта FastAPI + FastStream + Kafka наблюдаемость можно собрать без тяжёлой инфраструктуры, если соблюдать дисциплину:

  • передавайте correlation_id и event_id из HTTP в Kafka и далее в консьюмеры
  • логируйте структурированно и одинаковыми полями во всех компонентах
  • добавьте базовые метрики: объёмы, ошибки, latency, DLQ, лаг
  • держите конфигурацию во внешних настройках через переменные окружения
  • С этим набором вы сможете уверенно развивать проект из курса: добавлять новые события, новые consumer groups и понимать, что происходит в системе при сбоях и под нагрузкой.

    7. Небольшой проект: сервис заказов и событийный обработчик уведомлений

    Небольшой проект: сервис заказов и событийный обработчик уведомлений

    В прошлых статьях вы по отдельности разобрали Kafka (топики, партиции, группы), подняли её локально, научились публиковать и читать события через FastStream, встроили публикацию в FastAPI и обсудили надёжность и наблюдаемость.

    Теперь соберём небольшой, но целостный проект: HTTP-сервис заказов на FastAPI, который публикует событие order.created в Kafka, и событийный обработчик уведомлений на FastStream, который читает эти события, делает ретраи при временных ошибках, обеспечивает идемпотентность и отправляет “плохие” события в DLQ.

    !Общая схема проекта: HTTP-команда превращается в событие, которое обрабатывается асинхронно

    Что построим и какие правила примем

    Мы строим минимальную архитектуру:

  • Orders API (FastAPI) принимает команду создания заказа и публикует событие в Kafka.
  • Notifications worker (FastStream) читает события о заказах и “отправляет уведомления” (для учебного проекта это будет имитация отправки).
  • Kafka хранит поток событий.
  • Правила качества, которые заложим сразу:

  • Порядок по заказу: события одного order_id публикуются с key=order_id.
  • Идемпотентность: уведомления не должны “отправляться дважды” при повторной доставке, поэтому используем event_id и таблицу обработанных событий.
  • Ретраи и DLQ: при временных ошибках делаем несколько попыток, при исчерпании отправляем событие в orders.dlq.
  • Корреляция: прокидываем correlation_id из HTTP в Kafka headers.
  • Подготовка окружения Kafka

    Предполагается, что Kafka поднимается через Docker Compose из предыдущей статьи и доступна на хосте по localhost:9092, а Kafka UI по localhost:8080.

    Если нужно освежить, ориентируйтесь на официальные источники:

  • Документация Apache Kafka
  • FastStream документация
  • FastAPI документация
  • Топики

    Нам нужны два топика:

  • orders — основной поток событий о заказах
  • orders.dlq — DLQ для проблемных сообщений
  • Создание топиков (внутри контейнера Kafka):

    Проверка:

    Структура проекта

    Предлагаемая структура:

  • app/main.py — FastAPI приложение
  • app/kafka.py — сборка брокера
  • app/settings.py — конфигурация
  • app/schemas.py — схемы HTTP и событий
  • workers/notifications.py — FastStream консьюмер уведомлений
  • Так проще разделять HTTP-слой и обработчики событий.

    Зависимости

    Установка зависимостей (в одном виртуальном окружении):

    Конфигурация

    Создайте app/settings.py:

    Смысл:

  • код не зависит от окружения
  • в Docker вы сможете переопределить KAFKA_BOOTSTRAP на kafka:9092
  • Схемы данных: HTTP-команда и событие

    Создайте app/schemas.py:

    Что здесь важно:

  • OrderCreate это команда от клиента
  • OrderCreated это факт, который случился
  • meta.event_id нужен для дедупликации
  • Orders API: FastAPI, публикация события и correlation id

    Брокер

    Создайте app/kafka.py:

    Приложение FastAPI

    Создайте app/main.py:

    Почему так:

  • соединение с Kafka создаётся в lifespan, а не на каждый запрос
  • key равен order_id, значит связанные события попадут в одну партицию
  • correlation_id возвращается клиенту и прокидывается в Kafka headers
  • Запуск API:

    Notifications worker: консьюмер с идемпотентностью, ретраями и DLQ

    Задача обработчика:

  • читать OrderCreated из orders в группе notifications-service
  • не повторять эффект, если Kafka доставила сообщение повторно
  • имитировать отправку уведомления, иногда “падать” и ретраить
  • после исчерпания ретраев отправлять событие в orders.dlq
  • Создайте workers/notifications.py:

    Ключевые моменты:

  • group_id задаёт consumer group, а значит масштабирование делается количеством реплик обработчика, но параллелизм ограничен партициями
  • mark_processed(event_id) это минимальная дедупликация, чтобы переживать повторы доставки
  • ретраи сделаны внутри обработчика, поэтому порядок в партиции сохраняется, но пропускная способность падает при долгих задержках
  • при исчерпании ретраев мы публикуем исходное событие в orders.dlq с техническими headers
  • Запуск воркера:

    Проверка проекта end-to-end

    Отправка заказа

    Ожидаемое поведение:

  • API вернёт accepted и отдаст X-Correlation-Id
  • в Kafka UI в топике orders появится сообщение
  • воркер выведет либо notification sent, либо несколько notification retry и, возможно, moved to dlq
  • Проверка DLQ

    Если вы увидели moved to dlq, откройте Kafka UI:

  • Kafka UI репозиторий
  • И посмотрите сообщения в orders.dlq. В headers будут dlq_reason и source_group.

    Как это расширять дальше

    Этот проект легко развивать, не ломая архитектуру:

  • добавить второе событие, например order.paid, и второй консьюмер, например billing-service
  • добавить базу данных для заказов и реализовать паттерн outbox (это следующий логичный шаг, если нужно гарантировать публикацию события вместе с записью в БД)
  • добавить метрики и tracing из прошлой статьи, используя те же correlation_id и event_id
  • Итог

    Вы получили рабочий учебный каркас, где:

  • FastAPI превращает HTTP-команду в Kafka-событие
  • FastStream-консьюмер обрабатывает событие асинхронно
  • порядок по заказу достигается через key=order_id
  • повторы доставки не ломают систему из-за event_id и дедупликации
  • временные сбои обрабатываются ретраями, а безнадёжные случаи уходят в DLQ
  • Этот уровень уже достаточно близок к реальным практикам, чтобы на его основе собирать “маленькие продакшеноподобные” сервисы и уверенно отлаживать их через Kafka UI и логи.