1. Фундамент LangChain: декларативный язык LCEL и абстракции Runnable
Фундамент LangChain: декларативный язык LCEL и абстракции Runnable
При разработке ИИ-систем на чистом Python и базовых HTTP-клиентах код быстро превращается в монолитный клубок. Сначала вы пишете функцию для форматирования строки промпта. Затем добавляете вызов API локальной модели. Потом появляется необходимость парсить JSON-ответ, обрабатывать ошибки, реализовывать потоковую передачу токенов (SSE) и управлять асинхронным контекстом. Когда в эту архитектуру добавляется векторный поиск, логика ветвления и история диалогов, императивный код становится нечитаемым и хрупким. Возникает потребность в стандартизированном конвейере, где каждый компонент (промпт, модель, парсер, база данных) имеет единый интерфейс ввода-вывода.
!Харрисон Чейз, создатель LangChain
Решением этой архитектурной проблемы стал фреймворк LangChain. Его ядро построено вокруг концепции конвейеров (chains), которые собираются с помощью декларативного синтаксиса LCEL (LangChain Expression Language).
Протокол Runnable: единый стандарт взаимодействия
В основе всей архитектуры LangChain лежит абстрактный базовый класс Runnable. Любой компонент системы — будь то шаблон промпта, коннектор к векторной базе данных Qdrant, локальная модель Ollama или пользовательская Python-функция — наследуется от этого класса и реализует строгий контракт (протокол) вызовов.
Протокол Runnable предоставляет стандартизированный набор методов для выполнения компонента. Поскольку в предыдущих модулях мы спроектировали асинхронную архитектуру на базе FastAPI с использованием Event Loop, нас интересуют исключительно асинхронные версии этих методов:
ainvoke(input) — принимает один входной аргумент, асинхронно обрабатывает его и возвращает итоговый результат.astream(input) — возвращает асинхронный генератор, выдающий фрагменты результата по мере их готовности (критически важно для Time-to-First-Token).abatch(inputs) — принимает список входных данных и обрабатывает их конкурентно (используя пулы потоков или asyncio.gather под капотом).astream_events(input) — расширенный метод для распределенной трассировки, генерирующий события жизненного цикла (начало, конец, ошибка) для каждого вложенного шага.Преимущество протокола заключается в предсказуемости. Если вы напишете собственную функцию для фильтрации нецензурной лексики и обернете её в Runnable, она автоматически получит поддержку пакетной обработки (abatch) и потоковой передачи (astream), даже если вы реализовали только базовую логику трансформации текста.
LCEL: Декларативная сборка пайплайнов
LangChain Expression Language (LCEL) — это декларативный способ объединения объектов Runnable в единый конвейер. Визуально и концептуально LCEL заимствует идею конвейеров (pipes) из UNIX-систем, используя оператор побитового ИЛИ (|).
В Python оператор | перегружается через магический метод __or__. Когда интерпретатор встречает выражение A | B, где оба объекта наследуют Runnable, он не выполняет логическое ИЛИ. Вместо этого он создает новый составной объект RunnableSequence.
Рассмотрим базовый пример пайплайна:
В императивном стиле этот же код потребовал бы ручной передачи результатов от одной функции к другой: parser.invoke(model.invoke(prompt.invoke({"topic": ...}))). Декларативный подход скрывает эту маршрутизацию.
Строгая типизация и трансформация данных
Главная сложность при работе с LCEL — понимание того, как меняются типы данных при переходе от одного узла к другому. Оператор | требует, чтобы тип выходных данных левого компонента совпадал (или был совместим) с типом входных данных правого компонента.
!Трансформация типов данных в LCEL
Разберем трансформацию типов в пайплайне prompt | model | parser:
dict): Метод ainvoke получает словарь {"topic": "нейронных сетях"}.ChatPromptTemplate: Принимает словарь, подставляет значения в шаблон и возвращает объект PromptValue. Это специальная абстракция LangChain, которая может быть сконвертирована либо в строку (для старых Text LLM), либо в список объектов BaseMessage (для современных Chat Models).ChatOllama: Принимает PromptValue, извлекает из него сообщения, отправляет HTTP-запрос к локальному демону Ollama и возвращает объект AIMessage (содержащий сгенерированный текст и метаданные, такие как количество токенов).StrOutputParser: Принимает AIMessage, извлекает из него чистое строковое содержимое (атрибут content) и возвращает str.Если вы попытаетесь передать строку напрямую в модель, минуя шаблон промпта (например, chain = model | parser), пайплайн сработает, так как ChatModel умеет неявно оборачивать str в HumanMessage. Однако попытка передать AIMessage в другой промпт вызовет ошибку валидации типов Pydantic.
Управление потоком данных: RunnablePassthrough и RunnableParallel
В реальных RAG-системах линейные цепочки встречаются редко. Часто требуется передать исходный запрос пользователя сквозь несколько слоев обработки без изменений, либо выполнить несколько задач параллельно (например, извлечь документы из Qdrant и одновременно запросить историю диалога из PostgreSQL). Для этого используются специальные примитивы маршрутизации.
RunnablePassthrough: прозрачный прокси-объект
RunnablePassthrough позволяет передать входные данные следующему шагу без изменений, опционально добавив к ним новые ключи. Это критически важно при формировании контекста для RAG.
Допустим, у нас есть функция retrieve_documents, которая принимает строку и возвращает текст. Шаблону промпта нужен словарь с двумя ключами: context и question.
Если на вход chain.ainvoke("Что такое квантование?") поступает строка, RunnablePassthrough просто копирует её в ключ question. Одновременно эта же строка передается в функцию retrieve_documents, результат которой записывается в ключ context. На выходе из первого блока формируется словарь {"context": "...", "question": "Что такое квантование?"}, который идеально подходит под сигнатуру prompt.
Ещё более мощный инструмент — метод RunnablePassthrough.assign(). Он принимает на вход словарь, сохраняет все его существующие ключи и добавляет новые, вычисленные на основе переданных функций.
RunnableParallel: конкурентное выполнение
RunnableParallel (часто инициализируется неявно при передаче словаря в LCEL) позволяет расщепить поток выполнения на несколько независимых ветвей. Все ветви запускаются конкурентно в Event Loop, а их результаты собираются в единый словарь.
!Параллельное выполнение ветвей графа
С точки зрения производительности, время выполнения параллельного блока равно времени выполнения самой медленной ветви плюс небольшие накладные расходы на синхронизацию:
Где — время выполнения отдельной ветви, а — накладные расходы Event Loop на переключение контекста корутин.
Если вам нужно сделать три независимых запроса к LLM (например, для оценки тональности, извлечения сущностей и перевода), обертывание их в RunnableParallel сократит общее время ожидания в три раза по сравнению с последовательным вызовом.
Пользовательская логика: RunnableLambda
Фреймворк не ограничивает разработчика встроенными классами. Любую синхронную или асинхронную Python-функцию можно встроить в конвейер LCEL с помощью RunnableLambda (или декоратора @chain).
sql" in text: return text.split("")[0].strip() return text
custom_parser = RunnableLambda(extract_sql_query)
sql_chain = prompt | model | custom_parser python from fastapi import APIRouter from fastapi.responses import StreamingResponse
router = APIRouter()
@router.post("/chat/stream") async def stream_chat_response(request: ChatRequest): chain = prompt | model | parser
async def generate(): # astream возвращает асинхронный генератор async for chunk in chain.astream({"question": request.query}): # Форматируем под стандарт Server-Sent Events (SSE) yield f"data: {chunk}\n\n"
return StreamingResponse(generate(), media_type="text/event-stream") python
Основная модель (быстрая, но может упасть при сложном запросе)
primary_model = ChatOllama(model="llama3:8b")Резервная модель (более стабильная, с большим окном контекста)
backup_model = ChatOllama(model="mixtral:8x7b")Пайплайн с защитой от сбоев
robust_model = primary_model.with_fallbacks([backup_model])chain = prompt | robust_model | parser
``
Если primary_model выбрасывает исключение на этапе ainvoke, LangChain перехватывает его и автоматически направляет тот же PromptValue в backup_model`. Этот механизм работает не только для моделей, но и для целых цепочек. Вы можете задать fallback для парсера, если LLM сгенерировала невалидный JSON, направив ответ в специальную цепочку исправления ошибок (Output-Fixing Parser).
Переход от императивного вызова функций к декларативному синтаксису LCEL требует изменения мышления. Вы перестаете писать код, который выполняет действия над данными, и начинаете проектировать граф трансформаций, по которому данные текут самостоятельно. Эта абстракция становится критически важной на следующих этапах, когда мы начнем встраивать в эти конвейеры векторные ретриверы Qdrant для извлечения контекста и инструменты (Tools) для взаимодействия агентов с внешним миром.