Многошаговые serverless-пайплайны: повторы, ветвление и согласование человеком
Моделируйте длинную многошаговую работу как serverless-пайплайн из небольших шагов — у каждого свой таймаут, повторы и запись о запуске — с ветвлением, fan-out/fan-in, гейтами согласования человеком и запуском по cron.
Большая часть по-настоящему важной серверной работы — это не один вызов функции, а последовательность. Проверить заказ, списать деньги с карты, сформировать счёт, отправить его на почту. Где-то в середине есть ветвление («это крупный заказ?») и, часто, человек, который должен нажать Одобрить, прежде чем деньги уйдут. Рефлекс на любой serverless-платформе — запихнуть всё это в один обработчик. Это держится ровно до момента, когда падает третий шаг или всё целиком упирается в таймаут функции, и у вас остаётся 500 в логах и никакого понимания, какой именно из шести шагов на самом деле сломался.
Этот пост — про другую форму: моделировать такую работу как serverless-пайплайн — граф небольших шагов, у каждого из которых свой таймаут, свои повторы шагов (step retries) и своя запись о запуске. Мы разберём, как граф пайплайнов Inquir Compute обрабатывает многошаговую serverless-логику (multi-step serverless) от начала до конца: цепочки шагов, ветвление, параллелизм по схеме fan-out/fan-in, процесс согласования человеком (human approval workflow), который приостанавливается и возобновляется, запуск по cron и полную историю выполнения — плюс честный раздел о том, чего платформа намеренно не обещает.
От одной огромной функции к многошаговому serverless-пайплайну
Вот форма, которой стоит избегать, — одна функция, делающая всё:
// One function that does everything — the shape to avoid.
export async function handler(event) {
const order = await validateOrder(event.body); // can throw
const charge = await chargePayment(order); // can throw — money moves
const invoice = await renderInvoice(charge); // slow
await emailInvoice(invoice); // flaky third-party
return { ok: true };
}
Читается нормально. Ломается плохо. Четыре проблемы, и все структурные:
- Частичный сбой невидим. Если
emailInvoiceбросает исключение, вызывающая сторона видит500. Прошло ли списание? Скорее всего. Сформирован ли счёт? Неизвестно. Сбой и состояние оказываются в разных местах. - Повтор проигрывает всё заново. Перезапустите этот обработчик после сбоя отправки письма — и
validateOrderсchargePaymentвыполнятся снова. Для шага оплаты «повторить всю функцию» — это ровно тот способ, которым списывают деньги дважды. - Один таймаут на всю цепочку. У каждой функции есть бюджет времени — в Inquir Compute по умолчанию это 5 секунд, а жёсткий потолок — 15 минут. Четыре последовательных вызова делят этот единый бюджет. Медленное формирование счёта может вытолкнуть весь запрос за грань и убить те части, что уже отработали.
- Нет записи по каждому шагу. Один вызов, один поток логов, одна длительность. Вы не увидите, что шаг 2 занял 4,9 с, а шаг 4 падает уже три ночи подряд.
Решение — не более крупная функция, а более мелкие, соединённые вместе. Сделайте каждый шаг отдельной функцией и позвольте платформе выполнять их как пайплайн: у каждого шага — свой таймаут, своя политика повторов и своя сохранённая запись о запуске. Когда шаг отправки письма нестабилен, вы повторяете шаг отправки, а не списание.
Serverless-пайплайн — это граф небольших шагов
Пайплайн в Inquir Compute — это JSON-граф: список узлов (nodes) и рёбер (edges) между ними. Узел — это единица работы или управляющей логики. Виды, которые вы реально будете расставлять:
- Триггеры —
manualTrigger,httpTrigger,cronTrigger. Точка старта запуска. lambda— вызов одной из ваших развёрнутых функций. Здесь выполняется ваш код.if— ветвление по условию.parallel/merge— разветвить запуск на несколько ветвей и снова свести их вместе.set/mapper— придать форму данным между шагами (записать вvarsили преобразовать один payload в другой) без вызова функции.respond— собрать HTTP-ответ для пайплайна, запускаемого по запросу.humanGate— пауза для человека.
Рёбра несут поток, и каждое ребро выходит из своего источника через handle (выходную «ручку»): success, error, true, false, parallel, approve, reject или обычный default. Именно этот handle позволяет одному узлу направлять поток к разным следующим шагам в зависимости от того, что произошло.
Вот тот же заказ из предыдущего раздела, пересобранный как граф:
{
"schemaVersion": 1,
"nodes": [
{ "id": "in", "kind": "httpTrigger", "name": "Order received", "config": { "method": "POST" } },
{ "id": "valid", "kind": "lambda", "name": "Validate order",
"config": { "functionId": "validate-order", "onError": "failPipeline" } },
{ "id": "charge", "kind": "lambda", "name": "Charge payment",
"config": { "functionId": "charge-payment",
"retryPolicy": { "maxAttempts": 3, "backoffMs": 1000, "strategy": "exponential" },
"onError": "errorBranch" } },
{ "id": "email", "kind": "lambda", "name": "Email invoice",
"config": { "functionId": "email-invoice",
"inputMapping": { "to": "{{trigger.body.email}}", "charge": "{{steps.charge.output}}" },
"retryPolicy": { "maxAttempts": 5, "backoffMs": 500, "strategy": "exponential" },
"onError": "continue" } },
{ "id": "ok", "kind": "respond", "name": "200 OK",
"config": { "statusCode": 200, "outputMapping": { "orderId": "{{steps.charge.output.id}}" } } },
{ "id": "fail", "kind": "respond", "name": "402 Declined",
"config": { "statusCode": 402, "outputMapping": { "error": "payment_failed" } } }
],
"edges": [
{ "id": "e1", "sourceNodeId": "in", "targetNodeId": "valid" },
{ "id": "e2", "sourceNodeId": "valid", "targetNodeId": "charge", "sourceHandle": "success" },
{ "id": "e3", "sourceNodeId": "charge", "targetNodeId": "email", "sourceHandle": "success" },
{ "id": "e4", "sourceNodeId": "charge", "targetNodeId": "fail", "sourceHandle": "error" },
{ "id": "e5", "sourceNodeId": "email", "targetNodeId": "ok", "sourceHandle": "success" }
]
}
Несколько моментов, на которые стоит обратить внимание, — потому что в них и весь смысл:
- Каждый узел
lambda— это настоящий изолированный вызов: свой контейнер, своя память, свой таймаут (по умолчанию 5 с, переопределяется на узле черезtimeoutMsвплоть до потолка в 15 минут) и своя сохранённая запись о выполнении шага с полученным входом, произведённым выходом или ошибкой, номером попытки и длительностью. Шагиlambdaтакже ссылаются на нижележащий вызов функции, так что из представления пайплайна можно перейти прямо к «сырому» вызову. - Шаги передают данные через шаблоны. Узел читает значения выше по потоку через
{{steps.<id>.output}}, триггер — через{{trigger.body}}, переменные запуска — через{{vars.<name>}}, а свой прямой вход — через{{input}}. Вышеemailсобирает собственный payload черезinputMapping, а не просто получает выход предыдущего узла. - Маршрутизацию делают handles. Узел
chargeотправляет свой handlesuccessна шаг отправки письма, аerror— на ответ402. Один узел, два будущих.
Граф — это конфигурация; ваша логика по-прежнему живёт в обычных функциях. Задача пайплайна — соединить их, маршрутизировать между ними, повторять их и помнить, что произошло.
Повторы шагов и обработка ошибок без переигрывания всей задачи
У каждого узла lambda есть необязательная политика повторов:
"retryPolicy": { "maxAttempts": 3, "backoffMs": 1000, "strategy": "exponential" }
strategy — это fixed (ждать backoffMs между каждой попыткой) или exponential (backoffMs, затем 2×, 4×, …). Важное значение по умолчанию: повторов нет, пока вы их не попросите — ненастроенный шаг делает ровно одну попытку. Повторы — это согласие на уровне отдельного шага, а не общесистемное «всё выполняется по пять раз». Это сделано намеренно: вам нужны агрессивные повторы для капризного почтового API и ноль повторов для неидемпотентного списания.
Поскольку повторы привязаны к шагу, сбой перезапускает только упавший шаг. Уже прошедшее списание остаётся успешным; повторяется только шаг отправки письма, с задержкой, вплоть до его собственного maxAttempts. Каждая попытка записывается как отдельная запись, так что «попытка 2 из 3, сбой через 5 с» — это то, что вы реально можете увидеть.
Когда шаг исчерпывает попытки, что делать пайплайну, решает onError:
failPipeline(по умолчанию) — остановить запуск и пометить его какFAILED.continue— продолжить; шаги ниже по потоку получают{ error, __pipelineFromFailedStep }и могут отреагировать. Хорошо для работы «по возможности» («отправить в аналитический API, но никогда не заваливать из-за этого заказ»).errorBranch— направить поток через handleerrorузла, чтобы построить явный компенсирующий путь — как ветка402выше.
Из всего этого прямо следует одно правило: делайте обработчики шагов идемпотентными. Шаг может выполниться более одного раза — повторы, ручной перезапуск, errorBranch с возвратом на исправление. Платформа не обещает выполнение «ровно один раз», поэтому шаг списания должен нести ключ идемпотентности, а «insert» должен быть upsert’ом. Проектируйте под «это может выполниться дважды» — и повторы становятся безопасными, а не пугающими.
Ветвление и fan-out/fan-in в serverless-процессе
Линейные пайплайны — это меньшинство. Реальная логика serverless-процесса (serverless workflow) разветвляется.
Узел if вычисляет небольшое JavaScript-выражение в песочнице над входным payload’ом и берёт handle true или false:
{
"nodes": [
{ "id": "route", "kind": "if", "name": "High value?",
"config": { "expression": "input.amount > 1000" } },
{ "id": "review", "kind": "humanGate", "name": "Manual review", "config": { "mode": "approve" } },
{ "id": "auto", "kind": "lambda", "name": "Auto-approve", "config": { "functionId": "auto-approve" } }
],
"edges": [
{ "id": "e1", "sourceNodeId": "route", "targetNodeId": "review", "sourceHandle": "true" },
{ "id": "e2", "sourceNodeId": "route", "targetNodeId": "auto", "sourceHandle": "false" }
]
}
Выражение видит input, trigger, vars, steps и inputs из merge; оно оборачивается в Boolean(...) и выполняется в жёстком бюджете времени, при этом require/eval/import заблокированы. Здесь крупный заказ уходит на узел согласования человеком, а всё остальное одобряется автоматически — ветвление и согласование в четырёх узлах.
Для работы, которая делится, а не выбирает, применяется схема fan-out/fan-in: узел parallel разветвляет запуск на несколько ветвей, а узел merge сводит их обратно:
{
"nodes": [
{ "id": "fan", "kind": "parallel", "name": "Fan out", "config": {} },
{ "id": "geo", "kind": "lambda", "name": "Geo-risk score", "config": { "functionId": "geo-risk" } },
{ "id": "fraud", "kind": "lambda", "name": "Fraud score", "config": { "functionId": "fraud-score" } },
{ "id": "join", "kind": "merge", "name": "Combine scores",
"config": { "mode": "all",
"outputMapping": { "geo": "{{inputs.geo}}", "fraud": "{{inputs.fraud}}" } } }
],
"edges": [
{ "id": "e1", "sourceNodeId": "fan", "targetNodeId": "geo", "sourceHandle": "parallel" },
{ "id": "e2", "sourceNodeId": "fan", "targetNodeId": "fraud", "sourceHandle": "parallel" },
{ "id": "e3", "sourceNodeId": "geo", "targetNodeId": "join", "sourceHandle": "success" },
{ "id": "e4", "sourceNodeId": "fraud", "targetNodeId": "join", "sourceHandle": "success" }
]
}
Узел merge работает в режиме mode: "all" — он ждёт, пока прибудет каждая входящая ветвь, прежде чем что-либо произвести, а затем разрешает свой outputMapping, где {{inputs.<sourceNodeId>}} — это payload каждой ветви, ключом к которому служит породивший её узел. Ценность этой схемы структурная: каждая ветвь — это независимый под-путь со своими шагами, своими повторами и своими записями, а merge — единственная явная точка, где они снова сходятся. Оцените гео-риск и риск мошенничества независимо, а затем объедините оба в один объект-решение для следующего шага.
Процесс согласования человеком: гейты, которые ставят на паузу и возобновляют
Именно эту возможность недооценивают. Процесс согласования человеком (human approval workflow) на большинстве стеков означает очередь, webhook, флаг в базе и вторую точку входа для возобновления — обвязку, которую вы пишете и поддерживаете сами. Здесь это один узел.
У humanGate два режима. approve даёт два выходных handle — approve и reject. question задаёт человеку вопрос и возвращает отправленный им answer обратно в запуск через одно ребро default.
{
"nodes": [
{ "id": "gate", "kind": "humanGate", "name": "Approve refund",
"config": { "mode": "approve",
"promptTemplate": "Refund {{input.amount}} to {{trigger.body.email}}?" } },
{ "id": "pay", "kind": "lambda", "name": "Issue refund", "config": { "functionId": "issue-refund" } },
{ "id": "deny", "kind": "lambda", "name": "Notify denied", "config": { "functionId": "notify-denied" } }
],
"edges": [
{ "id": "e1", "sourceNodeId": "gate", "targetNodeId": "pay", "sourceHandle": "approve" },
{ "id": "e2", "sourceNodeId": "gate", "targetNodeId": "deny", "sourceHandle": "reject" }
]
}
Когда запуск доходит до гейта, он приостанавливается. Шаг записывается как WAITING, а в выполнение пишется контрольная точка — сериализуемый снимок запуска: payload триггера, переменные запуска, выход каждого завершённого к этому моменту шага, любые незавершённые буферы объединения merge, собранный к этому моменту ответ и указатель возобновления (какой гейт, какой шаг, какой режим). Ни один процесс не удерживается открытым; запуск устойчив «в покое» и может ждать столько, сколько нужно.
Есть одна деталь, которая показывает, что состояние действительно сохранено в контрольной точке, а не «замято». Если гейт находится внутри fan-out, ветви-соседи, которые ещё не выполнились к моменту приостановки, сохраняются вместе с контрольной точкой как отложенные рёбра (pending edges). Когда кто-то реагирует на гейт, пайплайн возобновляется из снимка, продолжает по подходящему handle (approve → выдать возврат, reject → уведомить об отказе) и выполняет те отложенные ветви-соседи — так что гейт в одной ветви никогда молча не бросает остальные, а ожидавший их узел merge ниже по потоку всё же может завершиться.
Собственный выход гейта объединяет входной payload с метаданными humanGate — mode, разрешённый prompt и decision (approve/reject) или отправленный answer. Шаги ниже по потоку читают его как любой другой выход шага, например {{steps.gate.output.humanGate.decision}}. Ключевая ментальная модель: согласование здесь — это узел в графе, а не блокирующий вызов в вашем коде. Вы не пишете функцию, которая ждёт события. Вы кладёте гейт на холст и направляете его handles на следующие шаги.
Запуск по cron и полная история выполнения каждого запуска
Пайплайну не обязательно нужен вызывающий. Узел cronTrigger запускает его по расписанию:
{
"schemaVersion": 1,
"nodes": [
{ "id": "cron", "kind": "cronTrigger", "name": "Nightly 02:00",
"config": { "cron": "0 2 * * *", "timezone": "UTC" } },
{ "id": "extract", "kind": "lambda", "name": "Extract",
"config": { "functionId": "nightly-extract",
"retryPolicy": { "maxAttempts": 3, "backoffMs": 2000, "strategy": "exponential" } } },
{ "id": "transform", "kind": "lambda", "name": "Transform", "config": { "functionId": "nightly-transform" } },
{ "id": "publish", "kind": "lambda", "name": "Publish", "config": { "functionId": "nightly-publish" } }
],
"edges": [
{ "id": "e1", "sourceNodeId": "cron", "targetNodeId": "extract" },
{ "id": "e2", "sourceNodeId": "extract", "targetNodeId": "transform", "sourceHandle": "success" },
{ "id": "e3", "sourceNodeId": "transform", "targetNodeId": "publish", "sourceHandle": "success" }
]
}
Узел принимает стандартное cron-выражение и необязательный timezone и проверяется при сохранении пайплайна. Две честные оговорки: планирование основано на опросе (polling), поэтому считайте гранулярность минутной в пределе и ожидайте, что срабатывание придётся чуть позже минуты по настенным часам — это не секундный планировщик реального времени. И единственный экземпляр планировщика выбирается лидером (через advisory-lock в Postgres), так что запланированный запуск срабатывает один раз, а не по разу на реплику.
Что бы ни запускало прогон — HTTP, cron или ручной старт, — вы получаете одно и то же: полную историю выполнения. У каждого запуска есть общий статус (PENDING, RUNNING, WAITING, SUCCEEDED, FAILED, TIMED_OUT или CANCELLED), длительность и дерево шагов. Каждый узел этого дерева несёт свой вход, свой выход или ошибку, число попыток и длительность; шаги lambda ссылаются на нижележащий вызов. Выполняющийся (RUNNING) пайплайн можно отменить, и отмена проверяется между шагами и между попытками повтора — так что запуск, сидящий в многоминутной задержке, реагирует быстро, а не досыпает её до конца. Это разница между «где-то ночью был 500» и «шаг transform, попытка 2, таймаут через 5 с на этом конкретном входе».
Ограничения, компромиссы и чем пайплайн не является
Точность важнее рекламного тона, поэтому вот честная граница:
- Шаги ограничены по времени. Соединяйте цепочкой, а не растягивайте. Каждый шаг выполняется под теми же лимитами функции: 5 с по умолчанию, максимум 15 минут. Долгая работа — это много шагов, каждый под потолком, а не одна безлимитная функция. Если задаче нужен час, её нужно разбить на возобновляемые шаги.
- Нет «ровно один раз» и нет гарантированного порядка. Шаг может выполниться более одного раза (повторы, перезапуски, циклы по ошибке), и платформа не даёт гарантии «ровно один раз» или строгого порядка. Идемпотентные обработчики не опциональны.
- Повторы — по согласию. По умолчанию — одна попытка. Добавляйте
retryPolicyк тем шагам, которые должны повторяться; оставляйте его выключенным там, где повтор опасен. - Cron основан на опросе и минутен. Годится для ночных, ежечасных задач и задач «раз в несколько минут». Не для субминутного или посекундного расписания, и срабатывания могут немного запаздывать.
- Fan-out — это структура, а не гарантия скорости.
parallel/mergeдают вам независимые, отдельно повторяемые ветви, сведённые в одной точке. Считайте это изоляцией и ясностью, а не обещанием N-кратного ускорения по настенным часам. - Холодные старты реальны. Тёплые пулы их сокращают, но первый вызов после простоя — холодный, а пайплайн — это много вызовов, так что часть шагов за это заплатит. Держите шаги лёгкими.
- Версии ограничены и закреплены. Пайплайн хранит до 50 опубликованных версий, а запуск выполняется на той версии, на которой стартовал.
Ничто из этого не является непреодолимым — это форма инструмента. Уважайте её, и пайплайны становятся скучно надёжными, а именно этого вы и хотите от инфраструктуры.
Итог
Если у единицы работы больше одного режима отказа, больше одного шага или человек в контуре — это не должно быть одной функцией. Смоделируйте это как serverless-пайплайн: небольшие шаги lambda, у каждого свой таймаут, свои повторы шагов и своя запись о запуске; if для ветвления; parallel + merge для работы по схеме fan-out/fan-in; humanGate, который приостанавливается, сохраняет контрольную точку и возобновляется, — для настоящего согласования человеком; и cronTrigger для запусков по расписанию. Держите каждый шаг небольшим, идемпотентным и в пределах таймаута, а граф пусть берёт на себя связывание, маршрутизацию, повторы и память. Это и есть оркестрация пайплайнов (pipeline orchestration) — она делает скучную устойчивую работу, чтобы ваши функции оставались простыми.