Project Reactor в Java: реактивное программирование на практике

Курс знакомит с реактивной моделью и библиотекой Project Reactor (Mono/Flux), операторами, планировщиками и обработкой ошибок. Вы научитесь строить потоковые пайплайны, тестировать их и интегрировать Reactor со Spring WebFlux и реальными источниками данных.

1. Основы Reactive Streams и место Project Reactor

Основы Reactive Streams и место Project Reactor

Зачем нужен Reactive Streams

Реактивное программирование в Java появилось как практический ответ на две проблемы классического подхода блокирующих вызовов и потоков:

  • Нагрузка растёт быстрее, чем количество потоков, которые вы можете позволить себе держать в приложении.
  • Источники данных работают с разной скоростью, и «быстрый» продюсер может легко перегрузить «медленного» потребителя.
  • Reactive Streams решает это через стандартизированный протокол обмена данными с поддержкой backpressure (управления скоростью), чтобы потребитель мог явно сообщать, сколько элементов он готов принять.

    Reactive Streams — это не «ещё одна библиотека», а спецификация (набор интерфейсов и правил взаимодействия).

    Полезные источники:

  • Reactive Streams
  • Reactive Streams JVM README (правила и TCK)
  • Что такое Reactive Streams

    Reactive Streams определяет минимальный набор ролей и сигналов для асинхронной потоковой обработки данных.

    Основные роли:

  • Publisher — источник данных (публикует элементы).
  • Subscriber — потребитель данных (получает элементы).
  • Subscription — связь между Publisher и Subscriber, через которую Subscriber запрашивает элементы и может отменить подписку.
  • Processor — одновременно Subscriber и Publisher (полезен для промежуточных стадий обработки, хотя на практике чаще используют операторы библиотек).
  • Эти роли формируют контракт: как подписываться, как запрашивать элементы, как завершать поток и как обрабатывать ошибки.

    Сигналы и жизненный цикл потока

    Reactive Streams поток — это последовательность сигналов между Publisher и Subscriber.

    Ключевые сигналы:

  • onSubscribe(subscription) — подписка установлена, Subscriber получил Subscription.
  • request(n) — Subscriber просит у Publisher n элементов (это и есть backpressure).
  • onNext(value) — Publisher отправляет очередной элемент.
  • onError(error) — поток завершился с ошибкой (финальный сигнал).
  • onComplete() — поток успешно завершился (финальный сигнал).
  • cancel() — Subscriber отменяет подписку.
  • Важное правило: после onError или onComplete никаких onNext быть не может.

    !Диаграмма показывает порядок сигналов и как request(n) ограничивает выдачу элементов.

    Backpressure простыми словами

    Backpressure — это механизм, который не позволяет источнику «затопить» потребителя.

    Интуитивная модель:

  • Publisher может производить данные быстро.
  • Subscriber не обязан принимать всё сразу.
  • Subscriber сам управляет скоростью: запрашивает request(n) порциями.
  • Если Subscriber запросил n, Publisher не имеет права отправить больше n элементов, пока не придёт следующий request.

    Это принципиальное отличие от многих «push-only» подходов, где скорость диктует источник.

    Где Reactive Streams находится в Java

    Спецификация Reactive Streams не привязана к конкретной библиотеке. В Java есть стандартные интерфейсы в составе JDK:

  • java.util.concurrent.Flow.Publisher
  • java.util.concurrent.Flow.Subscriber
  • java.util.concurrent.Flow.Subscription
  • java.util.concurrent.Flow.Processor
  • Они концептуально соответствуют Reactive Streams.

    Документация JDK:

  • Flow (Java SE)
  • Однако сам по себе Flow — это только интерфейсы. Нужна библиотека, которая:

  • реализует протокол корректно,
  • даёт готовые типы и операторы,
  • решает планирование (потоки), отмену, ошибки, ресурсы,
  • предоставляет удобный API для повседневной разработки.
  • Здесь и появляется Project Reactor.

    Место Project Reactor

    Project Reactor — это реактивная библиотека для JVM, которая:

  • реализует спецификацию Reactive Streams,
  • предоставляет два ключевых типа: Flux (0..N элементов) и Mono (0..1 элемент),
  • даёт богатый набор операторов преобразования потоков,
  • хорошо интегрируется со Spring (особенно Spring WebFlux),
  • включает инструменты для тестирования реактивных цепочек.
  • Официальные источники:

  • Project Reactor Reference Guide
  • reactor-core (GitHub)
  • Flux и Mono: базовая модель

    Reactor строится вокруг двух типов:

  • Mono<T> — асинхронный источник, который выдаёт либо 0 элементов (пусто), либо 1 элемент, либо ошибку.
  • Flux<T> — асинхронный источник, который выдаёт 0..N элементов, либо ошибку.
  • Примеры:

    Важно понимать: subscribe(...) — это момент, когда вы подключаете подписчика и запускаете обработку. Многие реактивные цепочки по умолчанию ленивые: пока нет подписки, работа не выполняется.

    Реактивная цепочка и операторы

    Reactor поощряет стиль «конвейера»: вы собираете цепочку операторов, которые описывают что делать с данными.

    Пример преобразований:

    Типичные категории операторов:

  • преобразование: map, flatMap
  • фильтрация: filter, take
  • агрегация: collectList, reduce
  • работа с ошибками: onErrorReturn, onErrorResume
  • управление временем: delayElements, timeout
  • Как увидеть backpressure в Reactor

    Reactor позволяет подписаться так, чтобы управлять запросами вручную (это полезно для обучения и диагностики).

    Здесь Subscriber сначала просит 2 элемента, получает их через hookOnNext, затем снова просит следующую порцию.

    Cold и hot источники: важная интуиция

    В реактивных библиотеках часто различают:

  • cold источники: каждый новый подписчик обычно получает данные «заново» (например, Flux.range, повторный HTTP-запрос при каждой подписке).
  • hot источники: данные «идут независимо от подписчиков» (например, события UI, поток сообщений, broadcast).
  • На старте курса достаточно запомнить: тип источника влияет на то, что увидят разные подписчики и когда реально начинается работа.

    Потоки выполнения и Scheduler: что Reactor делает с потоками

    Reactive Streams сам по себе не гарантирует «магическую многопоточность». Реактивность — про асинхронность, неблокирующий подход и управление давлением, а не про параллелизм.

    В Reactor за переключение потоков обычно отвечают:

  • publishOn(scheduler) — меняет поток, в котором дальше обрабатываются сигналы.
  • subscribeOn(scheduler) — влияет на то, где выполняется подписка и генерация источника.
  • Вы подробно разберёте это в следующих темах курса, но важно помнить уже сейчас:

  • без явного переключения Scheduler цепочка может выполняться в текущем потоке,
  • блокирующие операции нужно изолировать (иначе они «ломают» неблокирующую модель).
  • Ошибки и завершение: базовые правила

    Reactive Streams поток завершается ровно один раз, одним из способов:

  • успешно: onComplete
  • с ошибкой: onError
  • отменой: cancel
  • Reactor даёт декларативные способы обработки ошибок:

    Эта модель важна для проектирования API: ошибка — это такой же «сигнал», как данные, а не исключение, которое обязательно должно пробрасываться вверх по стеку.

    Совместимость экосистемы

    Поскольку Reactor реализует Reactive Streams, он хорошо сочетается с другими компонентами JVM-мира, которые тоже опираются на этот стандарт.

    На практике это означает:

  • библиотеки могут обмениваться Publisher-типами без жёсткой привязки к конкретной реализации,
  • инфраструктурные компоненты (например, серверы, драйверы, брокеры) могут предоставлять реактивные API.
  • Итоги

    Теперь у вас есть опорные понятия, на которых держится весь курс:

  • Reactive Streams — спецификация, определяющая роли Publisher/Subscriber/Subscription и протокол сигналов.
  • Backpressure — ключевая идея, позволяющая согласовать скорость продюсера и потребителя.
  • Project Reactor — практическая библиотека, реализующая Reactive Streams и предоставляющая Mono/Flux и множество операторов.
  • В следующих материалах курса вы углубите понимание Mono/Flux, операторов, управления потоками (Schedulers) и типовых паттернов построения реактивных пайплайнов в реальных приложениях.

    2. Mono и Flux: создание, подписка и жизненный цикл

    Mono и Flux: создание, подписка и жизненный цикл

    Связь с Reactive Streams

    В предыдущей статье вы познакомились с Reactive Streams: ролями Publisher/Subscriber/Subscription, сигналами onNext/onError/onComplete и идеей backpressure через request(n).

    Project Reactor строит удобный прикладной API поверх этих правил. Два основных типа Reactor:

  • Mono<T> — 0 или 1 значение (или ошибка)
  • Flux<T> — 0..N значений (или ошибка)
  • Они оба являются Publisher и подчиняются тем же правилам жизненного цикла.

    Официальные источники для справки:

  • Project Reactor Reference Guide
  • Javadoc Mono
  • Javadoc Flux
  • Модель данных: чем Mono отличается от Flux

    С точки зрения контракта сигналов:

  • Mono может отправить:
  • - onNext(value) и затем onComplete() - только onComplete() (пустой результат) - onError(error)
  • Flux может отправить:
  • - много onNext(value) и затем onComplete() - ни одного onNext, но onComplete() - onError(error)

    Практическое правило выбора:

  • используйте Mono для одной сущности: загрузить пользователя, выполнить команду, вернуть один DTO
  • используйте Flux для коллекции/потока: список пользователей, события, сообщения
  • > Важно: Mono и Flux по умолчанию ленивые. Пока не произойдёт subscribe(...), никаких вычислений или запросов часто не будет.

    !Диаграмма показывает допустимые последовательности сигналов для Mono и Flux и место request(n).

    Создание Mono: основные фабрики

    Немедленные значения и завершения

    Когда это полезно:

  • just — у вас уже есть значение
  • empty — отсутствие результата как нормальная ветка (например, сущность не найдена)
  • error — явная реактивная ошибка вместо бросания исключения вверх по стеку
  • Отложенное вычисление: fromSupplier, fromCallable, defer

    Ключевая мысль: создание цепочки и выполнение — разные вещи.

    Разница, которую важно запомнить:

  • fromSupplier удобно для логики без checked-исключений
  • fromCallable подходит, когда код может бросить исключение: Reactor превратит его в onError
  • defer нужен, когда вы хотите создавать новый Mono на каждую подписку:

    Если вместо defer написать Mono.just("time=" + System.currentTimeMillis()), время вычислится сразу при сборке цепочки, а не при подписке.

    Оборачивание будущего результата: fromFuture и fromCompletionStage

    Это полезно для интеграции с существующим кодом, но помните: CompletableFuture сам по себе не поддерживает backpressure, а значит контроль скорости возможен только на реактивной стороне, после получения результата.

    Создание Flux: основные фабрики

    Наборы данных: just, fromIterable, range

    Источник по времени: interval

    interval создаёт бесконечный поток, пока его не отменят или не ограничат операторами вроде take.

    Программная генерация: generate и create

    Эти методы нужны, когда вы сами выдаёте элементы в поток.

    generate — синхронная генерация, по одному элементу за шаг:

    create — более гибкий вариант, подходит для мостов с callback-API и может эмитить несколько элементов за вызов:

    На старте курса достаточно понимать назначение:

  • generate — когда удобно мыслить как итератор (строго по одному next)
  • create — когда вы получаете события извне и прокидываете их в Flux
  • Подписка: что делает subscribe(...)

    subscribe(...):

  • создаёт подписчика
  • инициирует onSubscribe
  • запускает выполнение цепочки (для ленивых источников)
  • Самый простой вариант:

    Полная форма подписки с обработчиками:

    Disposable: как отменить подписку

    subscribe(...) обычно возвращает Disposable, который позволяет отменить подписку:

    Отмена подписки соответствует cancel() на уровне Reactive Streams.

    Жизненный цикл сигналов в Reactor: как наблюдать и отлаживать

    doOn... операторы для наблюдения

    Reactor позволяет «подсмотреть» сигналы, не изменяя данные:

    Полезные наблюдения:

  • doOnSubscribe срабатывает при установке подписки
  • doOnRequest показывает, что backpressure реально есть: подписчик запрашивает элементы порциями
  • doOnNext видит элементы
  • doOnComplete и doOnError — финальные сигналы
  • log() для быстрого просмотра сигналов

    Для обучения и диагностики удобно включать log():

    log() выводит события Reactive Streams, включая request.

    Backpressure в Flux: ручной контроль запроса

    Чаще всего Reactor сам выставляет запросы так, чтобы пайплайн работал эффективно. Но важно уметь увидеть, что происходит.

    Пример подписчика, который запрашивает элементы порциями:

    Здесь потребитель сам задаёт темп: он никогда не попросит больше 2 элементов за раз.

    Для Mono backpressure проще: максимум один элемент. Но протокол всё равно соблюдается: подписчик всё так же делает request(1).

    Cold источники и эффект повторной подписки

    Многие стандартные источники Reactor являются cold: при каждой новой подписке они «проигрываются заново».

    В этом примере вы, скорее всего, увидите два разных значения, потому что вычисление происходит на каждую подписку.

    !Иллюстрация объясняет, что cold-источник выполняется заново для каждого подписчика.

    Завершение, ошибки и отмена: три способа закончить поток

    Любой Mono/Flux завершается ровно одним из способов:

  • успешно: onComplete
  • с ошибкой: onError
  • отменой: cancel (обычно через Disposable.dispose() или операторы вроде take)
  • Пример обработки ошибки с запасным вариантом:

    Важное предупреждение: block() и почему его нужно избегать

    Reactor даёт методы block()Mono) и blockFirst()Flux) для мостов с блокирующим миром:

    Это синхронное ожидание результата и оно:

  • блокирует текущий поток
  • может разрушать преимущества реактивного подхода
  • Используйте block() только на границах системы: в простых утилитах, прототипах, тестах или при интеграции с легаси-кодом, где по-другому нельзя.

    Итоги

    Теперь у вас есть практическая база работы с Mono и Flux:

  • чем отличаются Mono (0..1) и Flux (0..N)
  • как создавать источники: значения, пусто/ошибка, отложенные вычисления, диапазоны, интервалы, генераторы
  • что делает subscribe(...) и как отменять подписку через Disposable
  • как выглядит жизненный цикл сигналов и как наблюдать его через doOn... и log()
  • как backpressure проявляется в Flux через request(n)
  • Следующий шаг курса обычно состоит в том, чтобы научиться уверенно комбинировать операторы (map, flatMap, filter, collectList) и понимать, где именно выполняется код и как управлять потоками выполнения через Scheduler.

    3. Ключевые операторы: map/flatMap, filter, merge/zip, window/buffer

    Ключевые операторы: map/flatMap, filter, merge/zip, window/buffer

    Контекст: как операторы «собирают» реактивный конвейер

    В предыдущих статьях вы разобрали базовую модель Reactor:

  • Mono<T> как источник 0..1 элемента
  • Flux<T> как источник 0..N элементов
  • сигналы onNext/onError/onComplete и ленивость (цепочка выполняется при subscribe)
  • Теперь переходим к практике построения пайплайнов: операторы — это методы, которые создают новый Mono/Flux, добавляя шаг обработки данных.

    Полезные официальные ссылки:

  • Project Reactor Reference Guide
  • Javadoc Flux
  • Javadoc Mono
  • Карта операторов из этой статьи

    | Задача | Оператор(ы) | Ключевая идея | |---|---|---| | Преобразовать элементы 1-в-1 | map | синхронная функция, без вложенных Publisher | | Преобразовать элемент в асинхронный шаг | flatMap | функция возвращает Publisher, происходит «сплющивание» | | Отфильтровать элементы | filter | пропускаем дальше только подходящие значения | | Объединить два потока | merge | значения приходят по мере готовности из обоих источников | | Сопоставить элементы попарно | zip | собираем пары/кортежи по индексу (1-й с 1-м, 2-й со 2-м) | | Группировать в коллекции | buffer | Flux<T>Flux<List<T>> | | Группировать в «подпотоки» | window | Flux<T>Flux<Flux<T>> |

    !Общая схема того, как операторы последовательно преобразуют поток

    map и flatMap

    map: преобразование «значение в значение»

    map применяет функцию к каждому элементу и выдаёт новый элемент. Функция выполняется синхронно в контексте текущего выполнения цепочки.

    Практические свойства map:

  • удобно для чистых преобразований: парсинг, форматирование, вычисление полей
  • не меняет «форму» потока: Flux<T> остаётся Flux<R>
  • не подходит для шагов, которые возвращают Mono/Flux (например, реактивный запрос в БД)
  • Ошибка новичка: «вложенный Mono» при map

    Представим, что есть метод findNameById(int id), который возвращает Mono<String>.

    Если сделать так:

    Тип результата будет Flux<Mono<String>>, и в subscribe вы увидите не строки, а объекты Mono.

    flatMap: «значение в Publisher» и затем сплющивание

    flatMap решает предыдущую проблему: функция возвращает Publisher (Mono или Flux), а оператор «сплющивает» результат в единый поток.

    Важный нюанс flatMap: порядок элементов

    flatMap часто выполняет внутренние операции конкурентно (например, несколько запросов одновременно). Поэтому порядок onNext может не совпасть с исходным порядком элементов.

    Здесь элементы могут прийти не как done-1, done-2, done-3, а в порядке готовности.

    Если вам критически важен исходный порядок, в Reactor обычно используют concatMap (последовательно) или flatMapSequential (конкурентно, но выдача в исходном порядке). Эти операторы выходят за рамки данной статьи, но важно знать, что «порядок» — осознанный выбор.

    !Интуитивная разница между map и flatMap и влияние на порядок

    filter

    filter пропускает дальше только те элементы, для которых предикат возвращает true.

    Что важно понимать про filter:

  • это не «удаление» элементов из уже готовой коллекции, а решение на лету для каждого onNext
  • filter не превращает ошибку в пустой поток: если выше по цепочке пришёл onError, фильтр не «спасёт» поток
  • filter — частый инструмент для backpressure-дружелюбного снижения объёма данных до тяжёлых операторов дальше по конвейеру
  • merge и zip

    Эти операторы объединяют несколько источников, но делают это принципиально по-разному.

    merge: «всё, что пришло — отдаём дальше»

    merge объединяет элементы из нескольких Publisher в один поток. Элементы приходят по мере готовности каждого источника.

    Практические свойства merge:

  • порядок элементов между разными источниками не гарантируется
  • отлично подходит для объединения независимых событий (например, два источника метрик)
  • если один из источников завершится с ошибкой, общий поток обычно завершится с ошибкой (это важная часть реактивной модели ошибок)
  • zip: «сопоставляем элементы попарно»

    zip ждёт, пока у каждого источника будет доступен очередной элемент, и затем комбинирует их.

    Практические свойства zip:

  • семантика «по индексу»: -й элемент первого потока объединится с -м элементом второго
  • если один поток завершится раньше, zip тоже завершится (потому что больше не сможет образовывать пары)
  • zip удобен, когда вам нужна синхронизация результатов (например, склеить данные из двух запросов)
  • !Разница семантики merge и zip на временной шкале

    buffer и window

    Оба оператора группируют элементы, но возвращают разные «формы» результата.

    buffer: группируем в списки

    buffer(n) собирает элементы партиями по n штук и выдаёт List<T>.

    Результат будет похож на:

  • [1, 2, 3]
  • [4, 5, 6]
  • [7, 8, 9]
  • [10]
  • Где это полезно:

  • пакетная запись/отправка (batch insert, batch API)
  • уменьшение накладных расходов на сетевые вызовы
  • О чём стоит помнить:

  • buffer удерживает элементы в памяти до формирования партии (поэтому размер буфера — осознанный выбор)
  • window: группируем в подпотоки

    window(n) тоже группирует по n элементов, но возвращает потоки, а не списки: Flux<Flux<T>>.

    Почему window бывает предпочтительнее buffer:

  • окно можно обрабатывать как отдельный реактивный конвейер (например, параллелить обработку разных окон)
  • не обязательно собирать всё в память сразу в виде List, если обработка может быть потоковой
  • Как выбрать между buffer и window

    | Вопрос | Чаще выбирают | |---|---| | Нужен List<T> и пакетный вызов (одним запросом/сообщением) | buffer | | Нужно продолжать реактивную обработку «по частям», сохраняя потоковую природу | window | | Критично контролировать память (не копить большие списки) | чаще window, но всё зависит от обработки |

    Практические комбинации операторов

    Ниже несколько типовых «мини-рецептов», которые встречаются в реальном коде.

    map + filter: «привести данные к форме и отсечь лишнее»

    flatMap + merge: «параллельные асинхронные задачи и общий поток результатов»

    Идея: каждый входной элемент порождает асинхронный Mono, а flatMap естественным образом даёт объединённый поток.

    zip: «собрать DTO из разных источников»

    buffer: «батчинг для внешнего API»

    Частые ошибки и как их избегать

  • Использовать map там, где нужен flatMap
  • Ожидать сохранения порядка от flatMap
  • Путать merge и zip
  • Делать слишком большие buffer без оценки памяти
  • В следующих темах курса обычно добавляют ещё два ключевых измерения:

  • управление потоками выполнения (publishOn/subscribeOn и Scheduler)
  • обработка ошибок и повторные попытки
  • Эти темы важны, потому что как только в цепочке появляются реальные I/O-операции (HTTP, БД, брокеры), то поведение flatMap, merge, timeout, ретраев и планировщиков становится решающим для производительности и устойчивости.

    Итоги

  • map — синхронное преобразование элементов 1-в-1.
  • flatMap — преобразование элемента в Publisher и сплющивание, часто с конкуренцией и без гарантии порядка.
  • filter — отбор элементов по условию.
  • merge — объединение потоков по мере готовности элементов.
  • zip — попарное сопоставление элементов из потоков (зависит от самого медленного и от длины потоков).
  • buffer — группировка в списки.
  • window — группировка в подпотоки для дальнейшей реактивной обработки.
  • 4. Backpressure, управление скоростью и Schedulers

    Backpressure, управление скоростью и Schedulers

    Контекст: что добавляется к Mono/Flux и операторам

    В прошлых статьях вы разобрали:

  • модель сигналов Reactive Streams (onSubscribe, request(n), onNext, onError, onComplete, cancel)
  • типы Reactor Mono и Flux, ленивость и подписку
  • базовые операторы построения конвейера (map, flatMap, merge, zip, buffer, window)
  • Теперь добавим две практические оси, без которых сложно писать производительный реактивный код:

  • backpressure и управление скоростью: что делать, если источник производит быстрее, чем потребитель успевает обработать
  • Schedulers и управление потоками: где именно выполняется код, и как изолировать блокирующие или тяжёлые участки
  • Официальные источники:

  • Reactive Streams
  • Project Reactor Reference Guide
  • Javadoc Schedulers
  • Backpressure как договорённость о скорости

    Backpressure в Reactive Streams устроен просто: потребитель сам сообщает, сколько элементов он готов принять.

  • Subscriber получает Subscription в onSubscribe
  • затем вызывает request(n)
  • Publisher не имеет права отправить больше n элементов, пока не получит следующий запрос
  • Эта модель важна по двум причинам:

  • она защищает потребителя от перегрузки (память, CPU, очередь)
  • она заставляет вас явно выбирать стратегию, когда «данных слишком много»
  • !Диаграмма показывает, что Publisher отправляет не больше элементов, чем было запрошено

    Как увидеть backpressure в Reactor

    Наблюдение через doOnRequest и log

    Backpressure проще всего заметить через doOnRequest и log().

    Что важно понять:

  • запросы (request) часто приходят не по одному, а «пачками»
  • многие операторы делают предварительный запрос (например, чтобы заполнить внутренний буфер)
  • Ручной контроль спроса через BaseSubscriber

    Если вы хотите управлять спросом явно (полезно для обучения и диагностики), используйте BaseSubscriber.

    Здесь потребитель обрабатывает элементы партиями по 3. Это и есть «управление скоростью» на стороне клиента.

    Типовая проблема: быстрый источник и медленный потребитель

    Реактивный код часто подключается к источникам, которые способны производить данные очень быстро:

  • чтение из очереди/брокера
  • быстрые in-memory последовательности (range, fromIterable)
  • сетевой входящий поток событий
  • А downstream может быть медленным:

  • тяжёлая сериализация
  • запись на диск
  • обращение к блокирующему API
  • Если скорость производства (publisher rate) выше скорости обработки (consumer rate), то разница накапливается где-то:

  • либо в памяти (буфер/очередь)
  • либо теряется (drop)
  • либо приводит к ошибке (fail fast)
  • В Reactor это оформляется как явный выбор стратегии backpressure.

    Стратегии backpressure в Reactor

    Reactor предоставляет несколько распространённых стратегий, когда upstream не может или не хочет замедляться так, как нужно downstream.

    onBackpressureBuffer

    Идея: накапливать элементы в буфере, пока downstream не успевает.

    Практические последствия:

  • буфер защищает от потерь, но может съесть память
  • буферирование хорошо для кратковременных всплесков, но плохо для постоянного несоответствия скоростей
  • В реальном коде чаще используют ограниченный буфер и реакцию на переполнение. Подробности и варианты смотрите в Project Reactor Reference Guide.

    onBackpressureDrop

    Идея: если downstream не успевает, лишние элементы просто отбрасываются.

    Подходит, когда важнее «свежесть», чем полнота:

  • метрики
  • телеметрия
  • частые обновления состояния
  • onBackpressureLatest

    Идея: хранить только последний элемент, старые заменяются.

    Это типичный подход для UI или «витрины состояния»: вам не нужны все промежуточные значения, нужен актуальный снимок.

    onBackpressureError

    Идея: при перегрузке завершаться ошибкой.

    Это полезно, когда потеря данных недопустима, а буферировать бесконечно нельзя. Тогда лучше быстро «упасть» и включить внешнюю стратегию:

  • ретраи
  • DLQ
  • деградация функциональности
  • Управление размером запроса: limitRate и prefetch

    Даже когда upstream поддерживает backpressure корректно, вы можете захотеть контролировать, какими порциями downstream запрашивает данные.

    limitRate

    limitRate(k) заставляет downstream запрашивать элементы более мелкими партиями, а не «скопом».

    Когда это полезно:

  • вы видите, что по цепочке «летят» слишком большие запросы, и это раздувает буферы
  • у вас дорогой upstream, и лучше потреблять его ровнее
  • Важно: limitRate не «ускоряет» обработку. Он влияет на размер спроса, а значит на давление на upstream и объём внутренних буферов.

    prefetch как внутренняя оптимизация операторов

    Некоторые операторы (особенно связанные с асинхронностью и очередями) используют внутренний prefetch: они заранее запрашивают определённое количество элементов, чтобы работать эффективнее.

    Практический вывод:

  • если вы видите request(256) или похожие числа в логах, это часто нормальная оптимизация
  • если цепочка начинает буферизировать слишком много, ищите места с очередями и переключением потоков (например, publishOn)
  • Schedulers: где выполняется ваш код

    Reactive Streams и Reactor не обещают «автоматическую многопоточность». По умолчанию многие цепочки выполняются в том потоке, где произошла подписка или где upstream эмитит элементы.

    Scheduler в Reactor — это абстракция планирования задач (по сути, управляемый пул потоков). Вы явно говорите: где выполнять подписку, где выполнять обработку дальше по цепочке.

    Основные фабрики (см. Javadoc Schedulers):

  • Schedulers.immediate()
  • Schedulers.single()
  • Schedulers.parallel()
  • Schedulers.boundedElastic()
  • subscribeOn и publishOn: главное различие

    subscribeOn

    subscribeOn(scheduler) влияет на то, где выполняется подписка и upstream-часть цепочки.

    Практически:

  • полезно, чтобы перенести источник (или блокирующую обёртку источника) на другой Scheduler
  • Здесь fromCallable потенциально блокирующий, поэтому мы уводим его на boundedElastic.

    publishOn

    publishOn(scheduler) переключает поток выполнения для downstream-части цепочки, начиная с места, где он стоит.

    Практическое правило:

  • subscribeOn отвечает на вопрос где стартует и выполняется upstream
  • publishOn отвечает на вопрос где продолжает выполняться downstream после переключения
  • !Иллюстрация показывает области влияния subscribeOn и publishOn

    Как выбрать Scheduler под задачу

    parallel: CPU-bound задачи

    Schedulers.parallel() предназначен для задач, которые нагружают CPU и не блокируются (чистые вычисления).

  • число потоков обычно привязано к количеству ядер
  • если вы будете блокировать эти потоки, вы «заморозите» вычислительный пул
  • boundedElastic: блокирующие операции

    Schedulers.boundedElastic() предназначен для блокирующего I/O:

  • JDBC
  • файловые операции
  • вызовы блокирующих SDK
  • Он создаёт/переиспользует потоки, но делает это с ограничениями, чтобы не уйти в бесконечный рост.

    Типовой паттерн изоляции блокирующего кода:

    Ключевая мысль: вы не делаете блокирующий вызов «внутри реактивного потока на случайном потоке», вы явно назначаете ему место исполнения.

    single и immediate

  • Schedulers.single() полезен для строго последовательной обработки в одном выделенном потоке (например, если есть не потокобезопасный ресурс)
  • Schedulers.immediate() выполняет работу в текущем потоке (обычно это либо нейтральный выбор, либо инструмент для тестов/отладки)
  • Связь Scheduler и backpressure

    Переключение потоков часто означает появление очереди между upstream и downstream.

    Практические последствия:

  • publishOn обычно буферизирует элементы, чтобы разгрузить переключение между потоками
  • если downstream медленный, эта очередь может расти (поэтому важно помнить про backpressure и стратегии)
  • Полезный подход к отладке:

  • временно добавить doOnRequest, doOnNext, log()
  • посмотреть, где появляются большие запросы и где образуются буферы
  • Частые ошибки и безопасные правила

  • Блокировать в parallel: CPU-пул предназначен для неблокирующих вычислений. Блокировки ведут к деградации и таймаутам.
  • Безгранично буферизировать: onBackpressureBuffer() без ограничений может привести к OOM при постоянном несоответствии скоростей.
  • Считать, что flatMap автоматически “параллелит” всё правильно: flatMap даёт конкуренцию, но управлять потоками исполнения и блокировками всё равно нужно через Scheduler.
  • Использовать block() в середине цепочки: это ломает неблокирующую модель и часто приводит к взаимным ожиданиям и нехватке потоков.
  • Итоги

    Теперь у вас есть практическая связка из двух ключевых тем:

  • Backpressure: downstream управляет спросом через request(n), а при перегрузке вы выбираете стратегию (buffer, drop, latest, error) и можете контролировать размер спроса через limitRate.
  • Schedulers: вы явно задаёте, где выполняется код. subscribeOn влияет на upstream и подписку, publishOn переключает downstream. Для блокирующих операций используйте boundedElastic, для CPU-bound вычислений чаще подходит parallel.
  • Эти знания становятся особенно важными, когда вы начинаете подключать реальные I/O (HTTP, базы, брокеры) и строить надёжные конвейеры с предсказуемой нагрузкой.

    5. Ошибки и завершение: retry, timeout, fallbacks, ресурсы

    Ошибки и завершение: retry, timeout, fallbacks, ресурсы

    Контекст: ошибки как часть протокола Reactive Streams

    В предыдущих материалах вы уже видели, что реактивная цепочка живёт сигналами onNext, onComplete, onError, а также может быть остановлена отменой (cancel). В Reactor это означает:

  • ошибка это сигнал, а не исключение, которое обязательно «полетит вверх по стеку»
  • завершение всегда терминальное: после onError или onComplete данные больше не придут
  • отмена это тоже финал, и её важно учитывать при освобождении ресурсов
  • Эта статья про то, как строить устойчивые пайплайны в реальном мире:

  • делать fallback при ошибках и пустых результатах
  • ограничивать время ожидания через timeout
  • повторять операции безопасно через retry и retryWhen
  • корректно освобождать ресурсы (в том числе при отмене)
  • Полезные источники:

  • Project Reactor Reference Guide
  • Javadoc Retry
  • Javadoc Mono
  • Javadoc Flux
  • !Диаграмма: где в цепочке происходят timeout, retry, fallback и освобождение ресурсов

    Терминальные события: что значит «поток закончился»

    У Mono и Flux есть три способа закончиться:

  • успешное завершение: onComplete
  • завершение с ошибкой: onError
  • отмена: cancel
  • Практические последствия:

  • обработчики map, flatMap, filter не будут выполняться после терминального события
  • retry и retryWhen реагируют именно на onError и инициируют переподписку
  • timeout обычно приводит к ошибке и одновременно отменяет upstream
  • Чтобы увидеть финал независимо от причины, используйте doFinally.

    SignalType покажет причину: ON_COMPLETE, ON_ERROR, CANCEL.

    Fallback-стратегии: чем спасаться при проблемах

    Ошибка против пустого результата

    Важно не путать два разных случая:

  • пусто это onComplete без onNext (например, «пользователь не найден»)
  • ошибка это onError (например, «БД недоступна»)
  • Отсюда два разных класса операторов:

  • для пустых результатов: switchIfEmpty
  • для ошибок: onErrorReturn, onErrorResume, onErrorMap
  • onErrorReturn: вернуть константу

    Подходит для простого дефолта.

    Ограничение: вы не можете выполнить дополнительную реактивную логику, только вернуть значение.

    onErrorResume: переключиться на альтернативный Publisher

    Это основной инструмент для умных fallback:

  • вызвать другой сервис
  • сходить в кэш
  • вернуть разные ответы для разных типов ошибок
  • Частая практика: фильтровать по типам исключений, а остальные ошибки пропускать дальше.

    Здесь IllegalArgumentException не будет перехвачено и дойдёт до подписчика.

    switchIfEmpty: fallback для пустого результата

    switchIfEmpty не ловит ошибки. Он срабатывает только если значений не было.

    Практический смысл: различать доменную ветку «нет данных» и инфраструктурную ветку «ошибка».

    onErrorMap: нормализовать ошибки

    Когда вам нужно привести «зоопарк» исключений к понятной модели доменных ошибок.

    Timeout: ограничение времени ожидания

    timeout защищает от зависаний: если за заданное время не пришёл ожидаемый сигнал, Reactor завершит поток ошибкой и отменит upstream.

    Типовые случаи:

  • внешний HTTP вызов может зависнуть
  • брокер или БД могут «подвисать»
  • внутренняя операция может перестать выдавать элементы
  • timeout с ошибкой по умолчанию

    На практике вы часто увидите TimeoutException.

    timeout с fallback

    Можно заменить таймаут на альтернативный результат.

    Важно: такой fallback полезен, когда частичная деградация допустима, но бесконечное ожидание недопустимо.

    Retry: повторные попытки без циклов и блокировок

    retry и retryWhen используются только при onError. Они делают переподписку на источник, то есть запускают операцию заново.

    Ключевая идея: повторять нужно только то, что безопасно повторять.

    Примеры того, что обычно можно ретраить:

  • чтение по сети
  • запрос к БД на чтение
  • получение метаданных
  • Примеры того, что ретраить опасно без дополнительных гарантий:

  • командные операции с побочными эффектами (двойная оплата, двойная отправка письма)
  • запись без идемпотентности
  • retry: фиксированное число повторов

    Здесь общее число попыток будет до 3: первая попытка плюс 2 повтора.

    retryWhen и Retry: политика с задержкой и фильтрацией

    Для реальных систем чаще используют retryWhen с Retry, чтобы:

  • делать задержку между попытками
  • ограничивать число попыток
  • ретраить только определённые ошибки
  • Что здесь происходит:

  • backoff(3, ...) задаёт максимум 3 повтора и задержку, растущую от базовой
  • filter(...) запрещает ретраи для ошибок, которые считаются «неисправимыми»
  • onRetryExhaustedThrow(...) управляет финальным исключением, когда попытки закончились
  • Связка timeout + retry: «ограничиваем ожидание и пробуем снова»

    Очень частый практический паттерн:

    Смысл:

  • timeout превращает «вечное ожидание» в управляемую ошибку
  • retryWhen даёт несколько повторов
  • onErrorResume задаёт финальную деградацию, если всё плохо
  • Ресурсы: как гарантировать освобождение при complete, error и cancel

    Почему try/finally вокруг реактивной цепочки обычно не работает

    Реактивная цепочка ленивая и часто выполняется асинхронно. Код вида:

  • «взял ресурс»
  • «собрал Flux»
  • try/finally и «закрыл ресурс»
  • часто закроет ресурс слишком рано, потому что выполнение начнётся позже, при subscribe.

    Нужны реактивные операторы управления ресурсами.

    doFinally: универсальный хук на завершение

    doFinally срабатывает при любом финале: onComplete, onError, cancel.

    Здесь take(3) отменяет подписку после трёх элементов, и doFinally увидит CANCEL.

    usingWhen: корректная работа с асинхронными ресурсами

    usingWhen нужен, когда:

  • ресурс приобретается реактивно (Mono<Resource>)
  • освобождение тоже реактивное (Mono<Void>)
  • нужно закрыть ресурс корректно при успехе, ошибке и отмене
  • Типичный пример: реактивный клиент БД или любой ресурс с асинхронным закрытием.

    Смысл подписей:

  • второй аргумент это «как использовать ресурс»
  • третий аргумент это «как закрыть при успешном завершении»
  • четвёртый аргумент это «как закрыть при ошибке»
  • пятый аргумент это «как закрыть при отмене»
  • Именно это делает usingWhen одним из самых надёжных способов работы с ресурсами в Reactor.

    Практические правила устойчивости

  • Разделяйте пустой результат и ошибку: switchIfEmpty не заменяет onErrorResume.
  • Retry только для того, что безопасно повторять: особенно осторожно с командами и побочными эффектами.
  • Timeout ставьте там, где ожидание может быть бесконечным: особенно перед retry, чтобы повторы не зависали.
  • Освобождение ресурса проектируйте для cancel: в реактивном мире отмена это нормальный сценарий, а не «редкая ошибка».
  • Итоги

    Теперь у вас есть набор базовых инструментов завершения и устойчивости в Project Reactor:

  • onErrorReturn и onErrorResume для fallback при ошибках
  • switchIfEmpty для fallback при пустом результате
  • timeout для ограничения ожидания и предотвращения зависаний
  • retry и retryWhen(Retry...) для контролируемых повторных попыток
  • doFinally и usingWhen для гарантированного освобождения ресурсов при complete, error и cancel
  • Эти техники особенно важны в реальных приложениях, где реактивные цепочки взаимодействуют с сетью, базами данных и внешними сервисами, а значит ошибки, таймауты и отмены происходят регулярно.

    6. Тестирование реактивного кода: StepVerifier и виртуальное время

    Тестирование реактивного кода: StepVerifier и виртуальное время

    Зачем отдельный подход к тестированию Reactor

    Реактивные цепочки Mono и Flux из предыдущих тем работают через сигналы onNext/onError/onComplete, могут отменяться (cancel) и часто выполняются асинхронно с участием Scheduler. Поэтому в тестах возникают типичные проблемы:

  • Обычные проверки «вернулось ли значение» плохо подходят, потому что значения приходят со временем.
  • Если в цепочке есть delayElements, interval, timeout, retryWhen и похожие операторы, тесты начинают ждать реальное время и становятся медленными и нестабильными.
  • Нужно уметь проверять не только данные, но и протокол: порядок сигналов, завершение, ошибку, отмену и даже backpressure.
  • Для этого в экосистеме Reactor есть модуль reactor-test, а его основной инструмент — StepVerifier.

    Полезные ссылки:

  • Reactor Test Reference Guide
  • Javadoc StepVerifier
  • Javadoc VirtualTimeScheduler
  • Подключение reactor-test

    Gradle

    Maven

    > Важно: версии reactor-core и reactor-test должны совпадать по major/minor, чтобы избежать несовместимостей.

    StepVerifier: базовая модель

    StepVerifier позволяет описать ожидаемую последовательность сигналов от Publisher:

  • какие значения должны прийти (expectNext...)
  • как должен завершиться поток (verifyComplete)
  • какая ошибка должна прийти (verifyError...)
  • должен ли поток быть отменён (thenCancel)
  • что происходит со временем (в том числе через виртуальное время)
  • Базовый паттерн:

    Здесь мы проверяем сразу два аспекта:

  • данные приходят в ожидаемом порядке
  • поток корректно завершается onComplete
  • Проверка ошибок

    Ошибки в Reactor — это терминальный сигнал onError. В тестах важно проверять:

  • тип ошибки
  • сообщение (иногда)
  • что после ошибки поток не продолжает эмитить значения
  • Пример:

    Если вы хотите проверить сценарий «часть значений пришла, затем ошибка», это тоже делается пошагово:

    Проверка «пустого» результата

    Пустой Mono или Flux — это нормальный сценарий: onComplete без onNext. Проверка выглядит так:

    Если вы ожидаете, что значений не будет, но хотите явно это подчеркнуть:

    Ассерты на содержимое: expectNextMatches и consumeNextWith

    Когда важно проверить свойства элемента (а не точное равенство), используйте:

  • expectNextMatches(predicate)
  • consumeNextWith(consumer)
  • Пример:

    Проверка отмены (cancel)

    Отмена — нормальный способ завершения, особенно когда downstream использует take, или когда вы сами управляете жизненным циклом подписки.

    Пример: проверим, что бесконечный Flux можно отменить после нескольких элементов.

    verify() здесь уместен, потому что терминальный сигнал не onComplete/onError, а cancel.

    Backpressure в тестах: управление запросом

    В статье про backpressure вы видели, что downstream управляет спросом через request(n). StepVerifier умеет стартовать с нулевым спросом и запрашивать элементы вручную.

    Типовой сценарий: «ничего не должно прийти, пока не запросим».

    Что здесь проверяется:

  • подписка действительно установлена (expectSubscription)
  • пока спроса нет, onNext не приходит
  • после thenRequest(n) приходят ровно запрошенные элементы и поток завершается
  • Проверка side-effect логики: then, thenRunnable, verifyThenAssertThat

    Иногда нужно проверить побочные эффекты:

  • что был вызван метод
  • что изменилось состояние
  • что метрика инкрементнулась
  • Для этого есть шаги then(...) и друзья. Пример:

    Здесь побочный эффект проверяется после verifyComplete(). Во многих случаях это проще и чище, чем пытаться «встроить» все проверки внутрь шага.

    Виртуальное время: быстрые и детерминированные тесты времени

    Проблема: такие операторы как delayElements, interval, timeout, retryWhen с задержкой делают тесты медленными, если ждать реальное время.

    Решение: виртуальное время — тестовая модель времени, где вы «перематываете» часы вперёд.

    !Схема показывает, как thenAwait в StepVerifier перематывает виртуальное время и позволяет быстро проверить time-based операторы

    StepVerifier.withVirtualTime: основной инструмент

    Для виртуального времени обычно используют StepVerifier.withVirtualTime(...). Важный нюанс: нужно передать поставщика (Supplier) реактивной цепочки, чтобы Reactor мог корректно установить виртуальный планировщик на время подписки.

    Пример: тестируем Flux.interval, не ожидая реальные секунды.

    Как это читать:

  • withVirtualTime включает виртуальный Scheduler для операторов, завязанных на время
  • thenAwait(3s) мгновенно перематывает виртуальное время на 3 секунды
  • мы ожидаем три тика и onComplete (из-за take(3))
  • delayElements и проверка «до времени ничего не пришло»

    Здесь важны два шага:

  • expectNoEvent(...) проверяет, что никаких сигналов не было до нужного момента
  • thenAwait(...) перематывает время и делает события доступными
  • timeout и виртуальное время

    timeout завершает поток ошибкой, если за заданный промежуток не пришёл ожидаемый сигнал.

    retryWhen с задержкой: проверяем количество попыток без ожидания

    В статье про ошибки вы видели retryWhen(Retry...). Виртуальное время помогает тестировать backoff.

    Почему здесь thenAwait(2s):

  • первая попытка падает сразу
  • затем 1 секунда до первой повторной попытки
  • затем 1 секунда до второй повторной попытки
  • на третьей попытке приходит ok
  • Точная политика backoff может быть сложнее (джиттер, рост задержки), поэтому в реальных тестах часто проверяют:

  • факт успеха после перемотки достаточного времени
  • число попыток (через счётчик)
  • Частые ошибки в тестах Reactor

  • Использовать реальный сон (Thread.sleep) вместо виртуального времени
  • - тесты будут медленными и нестабильными
  • Делать StepVerifier.create(mono) для цепочки, зависящей от времени, и ожидать, что она "сама" станет виртуальной
  • - виртуальное время включается явно через withVirtualTime
  • Создавать Publisher заранее, а в withVirtualTime передавать не supplier
  • - правильнее: withVirtualTime(() -> yourPublisher)
  • Пытаться тестировать многопоточность через проверку имён потоков
  • - имена потоков могут меняться, и это слабый контракт; лучше тестировать поведение (сигналы, количество элементов, завершение, обработку ошибок)

    Мини-шпаргалка StepVerifier

    | Цель теста | Что обычно использовать | |---|---| | Проверить значения и завершение | expectNext... + verifyComplete() | | Проверить ошибку | verifyError(...), verifyErrorMessage(...), verifyErrorMatches(...) | | Проверить пустой результат | verifyComplete() (или expectNextCount(0)) | | Проверить отмену | thenCancel() + verify() | | Проверить backpressure | create(publisher, 0), thenRequest(n) | | Проверить time-based операторы быстро | withVirtualTime, thenAwait, expectNoEvent |

    Итоги

    StepVerifier — основной инструмент модульного тестирования Reactor, который проверяет реактивную цепочку как последовательность сигналов:

  • данные (expectNext, expectNextMatches, consumeNextWith)
  • завершение (verifyComplete) и ошибки (verifyError...)
  • отмену (thenCancel) и управление спросом (thenRequest)
  • А виртуальное время через StepVerifier.withVirtualTime делает тесты time-based операторов (interval, delayElements, timeout, retryWhen) быстрыми, детерминированными и удобными для поддержки.

    7. Интеграция и продакшен: Spring WebFlux, Context, hot/cold, отладка

    Интеграция и продакшен: Spring WebFlux, Context, hot/cold, отладка

    Как эта тема связана с предыдущими

    Ранее вы разобрали:

  • что Mono и Flux — это Publisher с ленивым выполнением и сигналами onNext/onError/onComplete
  • как строить конвейеры операторами (map, flatMap, merge, zip, buffer, window)
  • как работают backpressure и планировщики (subscribeOn, publishOn, Schedulers)
  • как проектировать устойчивость через timeout, retryWhen, fallback-операторы и корректное освобождение ресурсов
  • как тестировать реактивные цепочки через StepVerifier и виртуальное время
  • Теперь добавим практику, которая чаще всего отделяет учебные примеры от реальной эксплуатации:

  • интеграция Reactor со Spring WebFlux
  • корректная передача контекста запроса через Reactor Context
  • понимание cold и hot источников в реальном приложении
  • отладка и диагностика реактивных цепочек в продакшене
  • Полезные источники:

  • Spring Framework Reference: WebFlux
  • Project Reactor Reference Guide
  • BlockHound (GitHub)
  • reactor-tools (GitHub)
  • !Схема жизненного пути запроса в WebFlux и границы для блокирующих операций

    Spring WebFlux и Reactor: что реально происходит при запросе

    Spring WebFlux — это веб-стек Spring для неблокирующей обработки запросов, который в типичной конфигурации работает поверх Reactor (и часто поверх Reactor Netty).

    Ключевые практические выводы:

  • Ваш обработчик HTTP-запроса обычно не делает работу сразу, а возвращает Mono или Flux, который будет выполнен при подписке со стороны инфраструктуры WebFlux.
  • WebFlux ожидает, что обработка будет неблокирующей: если вы блокируете поток event loop (например, Thread.sleep, JDBC без адаптации), вы ухудшаете задержки и пропускную способность для множества запросов.
  • Планировщики из Reactor по-прежнему актуальны: блокирующий код нужно изолировать через subscribeOn(Schedulers.boundedElastic()).
  • Аннотационный контроллер: возвращаем Mono/Flux

    Здесь важно, что метод контроллера:

  • не должен вызывать block()
  • должен возвращать реактивный тип, а не готовое значение
  • Функциональный стиль: RouterFunction + HandlerFunction

    Функциональный стиль удобен, когда вы хотите явно управлять обработкой и составлять маршруты как конвейер.

    Семантика пустого результата и ошибки из прошлой статьи остаётся ключевой:

  • switchIfEmpty — для ветки “не найдено”
  • onErrorResume — для инфраструктурных проблем
  • WebClient: типичная интеграция наружу

    WebClient в WebFlux — это стандартный неблокирующий HTTP-клиент, который естественно возвращает Mono/Flux.

    Практические продакшен-правила:

  • ставьте timeout там, где ожидание не должно быть бесконечным
  • применяйте retryWhen только к операциям, которые безопасно повторять
  • используйте fallback-логику (onErrorResume) осознанно, чтобы деградация была контролируемой
  • Reactor Context: как передавать данные запроса без ThreadLocal

    Зачем нужен Context

    В синхронном коде часто используют ThreadLocal (например, MDC для логов) для хранения:

  • correlationId/requestId
  • данных аутентификации
  • “сквозных” метаданных трассировки
  • В реактивном мире поток выполнения может переключаться (publishOn, subscribeOn), а один и тот же запрос может продолжаться на другом потоке. Поэтому ThreadLocal перестаёт быть надёжной моделью.

    Reactor Context — это ассоциированное с подпиской хранилище “метаданных”, которое:

  • передаётся по реактивной цепочке вместе с сигналами
  • не зависит от конкретного потока
  • Базовые операции: contextWrite и deferContextual

  • contextWrite(...) добавляет/изменяет контекст для downstream
  • Mono.deferContextual(...) читает контекст в момент подписки
  • Важно:

  • контекст читается внутри deferContextual, а не через внешнюю переменную
  • контекст задаётся “снизу вверх” относительно места чтения, потому что contextWrite влияет на downstream
  • Context в Spring WebFlux: добавляем requestId в начале запроса

    В WebFlux это обычно делают в WebFilter, чтобы каждый запрос получил свой requestId в Reactor Context.

    После этого ваш сервисный код может читать requestId через deferContextual и использовать в логике/логировании.

    Cold и hot источники в продакшене

    Быстрое определение

  • Cold источник начинает производить данные заново для каждого подписчика.
  • Hot источник производит данные независимо от подписчиков, а подписчики “подключаются” к уже идущему потоку.
  • В WebFlux это важно, потому что:

  • один и тот же Publisher может быть подписан несколько раз (явно или неявно)
  • неправильный выбор cold/hot может привести к повторным HTTP-вызовам, повторной записи в БД или дублированию побочных эффектов
  • Пример проблемы: случайный повтор внешнего вызова

    Если remoteCallcold (типично для WebClient), то произойдёт два HTTP-запроса, потому что было две подписки.

    Как “разделить” результат между подписчиками

    В Reactor есть операторы, которые делают результат “разделяемым”, но с разной семантикой:

  • cache() делает источник эффективно горячим для последующих подписчиков, переиспользуя данные (и обычно кэшируя терминальный сигнал)
  • share() (по сути, publish().refCount(1)) превращает cold-источник в hot на время активных подписчиков, но без долговременного кэша
  • publish()/replay() дают более явный контроль через ConnectableFlux
  • Пример “один вызов — много читателей” через cache:

    Практическое правило:

  • cache удобен для кэширования результата дорогой операции
  • share удобен для “подключения” к потоку событий без желания хранить всё в памяти
  • Hot-источники событий: Sinks

    Для моделирования “живых” событий (уведомления, локальная шина событий, стрим из брокера) в Reactor часто используют Sinks.

    Ключевые моменты:

  • Sinks — это явный “вход” в реактивный мир, который обычно является hot
  • стратегия backpressure (onBackpressureBuffer/drop/latest) должна быть выбрана осознанно
  • Отладка и диагностика реактивных цепочек

    Отладка реактивного кода отличается от императивного: стек вызовов меньше помогает, потому что выполнение “размазано” по времени и операторам.

    Минимальный набор для диагностики

  • doOnNext, doOnError, doOnCancel, doFinally — точечные хуки
  • log() — быстрый просмотр сигналов Reactive Streams
  • checkpoint("name") — привязка читаемой метки к месту сборки цепочки
  • checkpoint особенно полезен, когда вы ловите ошибку и хотите видеть, в какой части конвейера она возникла.

    Расширенная диагностика: operator debug

    Reactor может добавлять “сборочные” трассировки операторов, чтобы исключения содержали больше информации о том, где именно была собрана цепочка.

  • Hooks.onOperatorDebug() — включает debug-хуки (может влиять на производительность)
  • ReactorDebugAgent из reactor-tools — вариант, который удобнее в ряде сценариев
  • Ссылка на инструмент:

  • reactor-tools (GitHub)
  • Практическое правило:

  • включайте такие режимы на стендах и в отладочных профилях
  • в продакшене используйте дозированно и осознанно из-за overhead
  • Поиск блокировок: BlockHound

    Одна из самых дорогих продакшен-проблем в WebFlux — случайная блокировка event loop (например, через JDBC, файловые операции, Thread.sleep).

    BlockHound (GitHub) позволяет обнаруживать блокирующие вызовы в “неправильных” потоках на ранних этапах.

    Типовая идея использования:

  • включить BlockHound в тестах или в dev-профиле
  • получить понятное исключение при блокирующей операции
  • вынести блокирующий код на Schedulers.boundedElastic() или заменить на неблокирующую альтернативу
  • Продакшен-паттерны для WebFlux + Reactor

    Не блокировать и правильно изолировать блокирующий код

    Смысл:

  • блокировка остаётся, но она больше не происходит в event loop
  • нагрузка на блокирующие операции регулируется пулом boundedElastic
  • Дублирование подписок: явно управлять повторным выполнением

    Если источник делает I/O или побочные эффекты, считайте его опасным при повторной подписке. Если повторная подписка возможна, решите явно:

  • повторять ли действие заново
  • кэшировать ли результат (cache)
  • раздавать ли общий стрим событий (share)
  • Таймауты и устойчивость на границах

    Типичный “продакшен-контур” вокруг внешнего вызова:

    Здесь важно, что retryWhen применяется только если повтор безопасен, а timeout предотвращает “вечное ожидание”.

    Итоги

    В продакшен-интеграции Reactor чаще всего “стреляет” не синтаксис операторов, а системные свойства:

  • Spring WebFlux запускает ваши цепочки при подписке инфраструктуры и ожидает неблокирующую работу
  • Reactor Context заменяет небезопасные ThreadLocal для данных запроса и трассировки
  • различие cold/hot определяет, будет ли I/O повторяться на каждую подписку, и как правильно делиться данными
  • для диагностики используйте doOn..., log(), checkpoint, а для поиска блокировок — BlockHound
  • Этот набор практик делает реактивные приложения предсказуемыми, наблюдаемыми и устойчивыми, когда они выходят за пределы учебных примеров и начинают жить под реальной нагрузкой.