Многошаговые serverless-пайплайны: повторы, ветвление и согласование человеком

Моделируйте длинную многошаговую работу как serverless-пайплайн из небольших шагов — у каждого свой таймаут, повторы и запись о запуске — с ветвлением, fan-out/fan-in, гейтами согласования человеком и запуском по cron.

Многошаговые serverless-пайплайны: повторы, ветвление и согласование человеком

Большая часть по-настоящему важной серверной работы — это не один вызов функции, а последовательность. Проверить заказ, списать деньги с карты, сформировать счёт, отправить его на почту. Где-то в середине есть ветвление («это крупный заказ?») и, часто, человек, который должен нажать Одобрить, прежде чем деньги уйдут. Рефлекс на любой serverless-платформе — запихнуть всё это в один обработчик. Это держится ровно до момента, когда падает третий шаг или всё целиком упирается в таймаут функции, и у вас остаётся 500 в логах и никакого понимания, какой именно из шести шагов на самом деле сломался.

Этот пост — про другую форму: моделировать такую работу как serverless-пайплайн — граф небольших шагов, у каждого из которых свой таймаут, свои повторы шагов (step retries) и своя запись о запуске. Мы разберём, как граф пайплайнов Inquir Compute обрабатывает многошаговую serverless-логику (multi-step serverless) от начала до конца: цепочки шагов, ветвление, параллелизм по схеме fan-out/fan-in, процесс согласования человеком (human approval workflow), который приостанавливается и возобновляется, запуск по cron и полную историю выполнения — плюс честный раздел о том, чего платформа намеренно не обещает.

От одной огромной функции к многошаговому serverless-пайплайну

Вот форма, которой стоит избегать, — одна функция, делающая всё:

// One function that does everything — the shape to avoid.
export async function handler(event) {
  const order   = await validateOrder(event.body);  // can throw
  const charge  = await chargePayment(order);        // can throw — money moves
  const invoice = await renderInvoice(charge);       // slow
  await emailInvoice(invoice);                        // flaky third-party
  return { ok: true };
}

Читается нормально. Ломается плохо. Четыре проблемы, и все структурные:

  • Частичный сбой невидим. Если emailInvoice бросает исключение, вызывающая сторона видит 500. Прошло ли списание? Скорее всего. Сформирован ли счёт? Неизвестно. Сбой и состояние оказываются в разных местах.
  • Повтор проигрывает всё заново. Перезапустите этот обработчик после сбоя отправки письма — и validateOrder с chargePayment выполнятся снова. Для шага оплаты «повторить всю функцию» — это ровно тот способ, которым списывают деньги дважды.
  • Один таймаут на всю цепочку. У каждой функции есть бюджет времени — в Inquir Compute по умолчанию это 5 секунд, а жёсткий потолок — 15 минут. Четыре последовательных вызова делят этот единый бюджет. Медленное формирование счёта может вытолкнуть весь запрос за грань и убить те части, что уже отработали.
  • Нет записи по каждому шагу. Один вызов, один поток логов, одна длительность. Вы не увидите, что шаг 2 занял 4,9 с, а шаг 4 падает уже три ночи подряд.

Решение — не более крупная функция, а более мелкие, соединённые вместе. Сделайте каждый шаг отдельной функцией и позвольте платформе выполнять их как пайплайн: у каждого шага — свой таймаут, своя политика повторов и своя сохранённая запись о запуске. Когда шаг отправки письма нестабилен, вы повторяете шаг отправки, а не списание.

Serverless-пайплайн — это граф небольших шагов

Пайплайн в Inquir Compute — это JSON-граф: список узлов (nodes) и рёбер (edges) между ними. Узел — это единица работы или управляющей логики. Виды, которые вы реально будете расставлять:

  • ТриггерыmanualTrigger, httpTrigger, cronTrigger. Точка старта запуска.
  • lambda — вызов одной из ваших развёрнутых функций. Здесь выполняется ваш код.
  • if — ветвление по условию.
  • parallel / merge — разветвить запуск на несколько ветвей и снова свести их вместе.
  • set / mapper — придать форму данным между шагами (записать в vars или преобразовать один payload в другой) без вызова функции.
  • respond — собрать HTTP-ответ для пайплайна, запускаемого по запросу.
  • humanGate — пауза для человека.

Рёбра несут поток, и каждое ребро выходит из своего источника через handle (выходную «ручку»): success, error, true, false, parallel, approve, reject или обычный default. Именно этот handle позволяет одному узлу направлять поток к разным следующим шагам в зависимости от того, что произошло.

Вот тот же заказ из предыдущего раздела, пересобранный как граф:

{
  "schemaVersion": 1,
  "nodes": [
    { "id": "in",     "kind": "httpTrigger", "name": "Order received", "config": { "method": "POST" } },
    { "id": "valid",  "kind": "lambda", "name": "Validate order",
      "config": { "functionId": "validate-order", "onError": "failPipeline" } },
    { "id": "charge", "kind": "lambda", "name": "Charge payment",
      "config": { "functionId": "charge-payment",
                  "retryPolicy": { "maxAttempts": 3, "backoffMs": 1000, "strategy": "exponential" },
                  "onError": "errorBranch" } },
    { "id": "email",  "kind": "lambda", "name": "Email invoice",
      "config": { "functionId": "email-invoice",
                  "inputMapping": { "to": "{{trigger.body.email}}", "charge": "{{steps.charge.output}}" },
                  "retryPolicy": { "maxAttempts": 5, "backoffMs": 500, "strategy": "exponential" },
                  "onError": "continue" } },
    { "id": "ok",     "kind": "respond", "name": "200 OK",
      "config": { "statusCode": 200, "outputMapping": { "orderId": "{{steps.charge.output.id}}" } } },
    { "id": "fail",   "kind": "respond", "name": "402 Declined",
      "config": { "statusCode": 402, "outputMapping": { "error": "payment_failed" } } }
  ],
  "edges": [
    { "id": "e1", "sourceNodeId": "in",     "targetNodeId": "valid" },
    { "id": "e2", "sourceNodeId": "valid",  "targetNodeId": "charge", "sourceHandle": "success" },
    { "id": "e3", "sourceNodeId": "charge", "targetNodeId": "email",  "sourceHandle": "success" },
    { "id": "e4", "sourceNodeId": "charge", "targetNodeId": "fail",   "sourceHandle": "error" },
    { "id": "e5", "sourceNodeId": "email",  "targetNodeId": "ok",     "sourceHandle": "success" }
  ]
}

Несколько моментов, на которые стоит обратить внимание, — потому что в них и весь смысл:

  • Каждый узел lambda — это настоящий изолированный вызов: свой контейнер, своя память, свой таймаут (по умолчанию 5 с, переопределяется на узле через timeoutMs вплоть до потолка в 15 минут) и своя сохранённая запись о выполнении шага с полученным входом, произведённым выходом или ошибкой, номером попытки и длительностью. Шаги lambda также ссылаются на нижележащий вызов функции, так что из представления пайплайна можно перейти прямо к «сырому» вызову.
  • Шаги передают данные через шаблоны. Узел читает значения выше по потоку через {{steps.<id>.output}}, триггер — через {{trigger.body}}, переменные запуска — через {{vars.<name>}}, а свой прямой вход — через {{input}}. Выше email собирает собственный payload через inputMapping, а не просто получает выход предыдущего узла.
  • Маршрутизацию делают handles. Узел charge отправляет свой handle success на шаг отправки письма, а error — на ответ 402. Один узел, два будущих.

Граф — это конфигурация; ваша логика по-прежнему живёт в обычных функциях. Задача пайплайна — соединить их, маршрутизировать между ними, повторять их и помнить, что произошло.

Повторы шагов и обработка ошибок без переигрывания всей задачи

У каждого узла lambda есть необязательная политика повторов:

"retryPolicy": { "maxAttempts": 3, "backoffMs": 1000, "strategy": "exponential" }

strategy — это fixed (ждать backoffMs между каждой попыткой) или exponential (backoffMs, затем , , …). Важное значение по умолчанию: повторов нет, пока вы их не попросите — ненастроенный шаг делает ровно одну попытку. Повторы — это согласие на уровне отдельного шага, а не общесистемное «всё выполняется по пять раз». Это сделано намеренно: вам нужны агрессивные повторы для капризного почтового API и ноль повторов для неидемпотентного списания.

Поскольку повторы привязаны к шагу, сбой перезапускает только упавший шаг. Уже прошедшее списание остаётся успешным; повторяется только шаг отправки письма, с задержкой, вплоть до его собственного maxAttempts. Каждая попытка записывается как отдельная запись, так что «попытка 2 из 3, сбой через 5 с» — это то, что вы реально можете увидеть.

Когда шаг исчерпывает попытки, что делать пайплайну, решает onError:

  • failPipeline (по умолчанию) — остановить запуск и пометить его как FAILED.
  • continue — продолжить; шаги ниже по потоку получают { error, __pipelineFromFailedStep } и могут отреагировать. Хорошо для работы «по возможности» («отправить в аналитический API, но никогда не заваливать из-за этого заказ»).
  • errorBranch — направить поток через handle error узла, чтобы построить явный компенсирующий путь — как ветка 402 выше.

Из всего этого прямо следует одно правило: делайте обработчики шагов идемпотентными. Шаг может выполниться более одного раза — повторы, ручной перезапуск, errorBranch с возвратом на исправление. Платформа не обещает выполнение «ровно один раз», поэтому шаг списания должен нести ключ идемпотентности, а «insert» должен быть upsert’ом. Проектируйте под «это может выполниться дважды» — и повторы становятся безопасными, а не пугающими.

Ветвление и fan-out/fan-in в serverless-процессе

Линейные пайплайны — это меньшинство. Реальная логика serverless-процесса (serverless workflow) разветвляется.

Узел if вычисляет небольшое JavaScript-выражение в песочнице над входным payload’ом и берёт handle true или false:

{
  "nodes": [
    { "id": "route",  "kind": "if", "name": "High value?",
      "config": { "expression": "input.amount > 1000" } },
    { "id": "review", "kind": "humanGate", "name": "Manual review", "config": { "mode": "approve" } },
    { "id": "auto",   "kind": "lambda", "name": "Auto-approve", "config": { "functionId": "auto-approve" } }
  ],
  "edges": [
    { "id": "e1", "sourceNodeId": "route", "targetNodeId": "review", "sourceHandle": "true" },
    { "id": "e2", "sourceNodeId": "route", "targetNodeId": "auto",   "sourceHandle": "false" }
  ]
}

Выражение видит input, trigger, vars, steps и inputs из merge; оно оборачивается в Boolean(...) и выполняется в жёстком бюджете времени, при этом require/eval/import заблокированы. Здесь крупный заказ уходит на узел согласования человеком, а всё остальное одобряется автоматически — ветвление и согласование в четырёх узлах.

Для работы, которая делится, а не выбирает, применяется схема fan-out/fan-in: узел parallel разветвляет запуск на несколько ветвей, а узел merge сводит их обратно:

{
  "nodes": [
    { "id": "fan",   "kind": "parallel", "name": "Fan out", "config": {} },
    { "id": "geo",   "kind": "lambda", "name": "Geo-risk score", "config": { "functionId": "geo-risk" } },
    { "id": "fraud", "kind": "lambda", "name": "Fraud score",    "config": { "functionId": "fraud-score" } },
    { "id": "join",  "kind": "merge",  "name": "Combine scores",
      "config": { "mode": "all",
                  "outputMapping": { "geo": "{{inputs.geo}}", "fraud": "{{inputs.fraud}}" } } }
  ],
  "edges": [
    { "id": "e1", "sourceNodeId": "fan",   "targetNodeId": "geo",   "sourceHandle": "parallel" },
    { "id": "e2", "sourceNodeId": "fan",   "targetNodeId": "fraud", "sourceHandle": "parallel" },
    { "id": "e3", "sourceNodeId": "geo",   "targetNodeId": "join",  "sourceHandle": "success" },
    { "id": "e4", "sourceNodeId": "fraud", "targetNodeId": "join",  "sourceHandle": "success" }
  ]
}

Узел merge работает в режиме mode: "all" — он ждёт, пока прибудет каждая входящая ветвь, прежде чем что-либо произвести, а затем разрешает свой outputMapping, где {{inputs.<sourceNodeId>}} — это payload каждой ветви, ключом к которому служит породивший её узел. Ценность этой схемы структурная: каждая ветвь — это независимый под-путь со своими шагами, своими повторами и своими записями, а merge — единственная явная точка, где они снова сходятся. Оцените гео-риск и риск мошенничества независимо, а затем объедините оба в один объект-решение для следующего шага.

Процесс согласования человеком: гейты, которые ставят на паузу и возобновляют

Именно эту возможность недооценивают. Процесс согласования человеком (human approval workflow) на большинстве стеков означает очередь, webhook, флаг в базе и вторую точку входа для возобновления — обвязку, которую вы пишете и поддерживаете сами. Здесь это один узел.

У humanGate два режима. approve даёт два выходных handle — approve и reject. question задаёт человеку вопрос и возвращает отправленный им answer обратно в запуск через одно ребро default.

{
  "nodes": [
    { "id": "gate", "kind": "humanGate", "name": "Approve refund",
      "config": { "mode": "approve",
                  "promptTemplate": "Refund {{input.amount}} to {{trigger.body.email}}?" } },
    { "id": "pay",  "kind": "lambda", "name": "Issue refund",  "config": { "functionId": "issue-refund" } },
    { "id": "deny", "kind": "lambda", "name": "Notify denied", "config": { "functionId": "notify-denied" } }
  ],
  "edges": [
    { "id": "e1", "sourceNodeId": "gate", "targetNodeId": "pay",  "sourceHandle": "approve" },
    { "id": "e2", "sourceNodeId": "gate", "targetNodeId": "deny", "sourceHandle": "reject" }
  ]
}

Когда запуск доходит до гейта, он приостанавливается. Шаг записывается как WAITING, а в выполнение пишется контрольная точка — сериализуемый снимок запуска: payload триггера, переменные запуска, выход каждого завершённого к этому моменту шага, любые незавершённые буферы объединения merge, собранный к этому моменту ответ и указатель возобновления (какой гейт, какой шаг, какой режим). Ни один процесс не удерживается открытым; запуск устойчив «в покое» и может ждать столько, сколько нужно.

Есть одна деталь, которая показывает, что состояние действительно сохранено в контрольной точке, а не «замято». Если гейт находится внутри fan-out, ветви-соседи, которые ещё не выполнились к моменту приостановки, сохраняются вместе с контрольной точкой как отложенные рёбра (pending edges). Когда кто-то реагирует на гейт, пайплайн возобновляется из снимка, продолжает по подходящему handle (approve → выдать возврат, reject → уведомить об отказе) и выполняет те отложенные ветви-соседи — так что гейт в одной ветви никогда молча не бросает остальные, а ожидавший их узел merge ниже по потоку всё же может завершиться.

Собственный выход гейта объединяет входной payload с метаданными humanGatemode, разрешённый prompt и decision (approve/reject) или отправленный answer. Шаги ниже по потоку читают его как любой другой выход шага, например {{steps.gate.output.humanGate.decision}}. Ключевая ментальная модель: согласование здесь — это узел в графе, а не блокирующий вызов в вашем коде. Вы не пишете функцию, которая ждёт события. Вы кладёте гейт на холст и направляете его handles на следующие шаги.

Запуск по cron и полная история выполнения каждого запуска

Пайплайну не обязательно нужен вызывающий. Узел cronTrigger запускает его по расписанию:

{
  "schemaVersion": 1,
  "nodes": [
    { "id": "cron",      "kind": "cronTrigger", "name": "Nightly 02:00",
      "config": { "cron": "0 2 * * *", "timezone": "UTC" } },
    { "id": "extract",   "kind": "lambda", "name": "Extract",
      "config": { "functionId": "nightly-extract",
                  "retryPolicy": { "maxAttempts": 3, "backoffMs": 2000, "strategy": "exponential" } } },
    { "id": "transform", "kind": "lambda", "name": "Transform", "config": { "functionId": "nightly-transform" } },
    { "id": "publish",   "kind": "lambda", "name": "Publish",   "config": { "functionId": "nightly-publish" } }
  ],
  "edges": [
    { "id": "e1", "sourceNodeId": "cron",      "targetNodeId": "extract" },
    { "id": "e2", "sourceNodeId": "extract",   "targetNodeId": "transform", "sourceHandle": "success" },
    { "id": "e3", "sourceNodeId": "transform", "targetNodeId": "publish",   "sourceHandle": "success" }
  ]
}

Узел принимает стандартное cron-выражение и необязательный timezone и проверяется при сохранении пайплайна. Две честные оговорки: планирование основано на опросе (polling), поэтому считайте гранулярность минутной в пределе и ожидайте, что срабатывание придётся чуть позже минуты по настенным часам — это не секундный планировщик реального времени. И единственный экземпляр планировщика выбирается лидером (через advisory-lock в Postgres), так что запланированный запуск срабатывает один раз, а не по разу на реплику.

Что бы ни запускало прогон — HTTP, cron или ручной старт, — вы получаете одно и то же: полную историю выполнения. У каждого запуска есть общий статус (PENDING, RUNNING, WAITING, SUCCEEDED, FAILED, TIMED_OUT или CANCELLED), длительность и дерево шагов. Каждый узел этого дерева несёт свой вход, свой выход или ошибку, число попыток и длительность; шаги lambda ссылаются на нижележащий вызов. Выполняющийся (RUNNING) пайплайн можно отменить, и отмена проверяется между шагами и между попытками повтора — так что запуск, сидящий в многоминутной задержке, реагирует быстро, а не досыпает её до конца. Это разница между «где-то ночью был 500» и «шаг transform, попытка 2, таймаут через 5 с на этом конкретном входе».

Ограничения, компромиссы и чем пайплайн не является

Точность важнее рекламного тона, поэтому вот честная граница:

  • Шаги ограничены по времени. Соединяйте цепочкой, а не растягивайте. Каждый шаг выполняется под теми же лимитами функции: 5 с по умолчанию, максимум 15 минут. Долгая работа — это много шагов, каждый под потолком, а не одна безлимитная функция. Если задаче нужен час, её нужно разбить на возобновляемые шаги.
  • Нет «ровно один раз» и нет гарантированного порядка. Шаг может выполниться более одного раза (повторы, перезапуски, циклы по ошибке), и платформа не даёт гарантии «ровно один раз» или строгого порядка. Идемпотентные обработчики не опциональны.
  • Повторы — по согласию. По умолчанию — одна попытка. Добавляйте retryPolicy к тем шагам, которые должны повторяться; оставляйте его выключенным там, где повтор опасен.
  • Cron основан на опросе и минутен. Годится для ночных, ежечасных задач и задач «раз в несколько минут». Не для субминутного или посекундного расписания, и срабатывания могут немного запаздывать.
  • Fan-out — это структура, а не гарантия скорости. parallel/merge дают вам независимые, отдельно повторяемые ветви, сведённые в одной точке. Считайте это изоляцией и ясностью, а не обещанием N-кратного ускорения по настенным часам.
  • Холодные старты реальны. Тёплые пулы их сокращают, но первый вызов после простоя — холодный, а пайплайн — это много вызовов, так что часть шагов за это заплатит. Держите шаги лёгкими.
  • Версии ограничены и закреплены. Пайплайн хранит до 50 опубликованных версий, а запуск выполняется на той версии, на которой стартовал.

Ничто из этого не является непреодолимым — это форма инструмента. Уважайте её, и пайплайны становятся скучно надёжными, а именно этого вы и хотите от инфраструктуры.

Итог

Если у единицы работы больше одного режима отказа, больше одного шага или человек в контуре — это не должно быть одной функцией. Смоделируйте это как serverless-пайплайн: небольшие шаги lambda, у каждого свой таймаут, свои повторы шагов и своя запись о запуске; if для ветвления; parallel + merge для работы по схеме fan-out/fan-in; humanGate, который приостанавливается, сохраняет контрольную точку и возобновляется, — для настоящего согласования человеком; и cronTrigger для запусков по расписанию. Держите каждый шаг небольшим, идемпотентным и в пределах таймаута, а граф пусть берёт на себя связывание, маршрутизацию, повторы и память. Это и есть оркестрация пайплайнов (pipeline orchestration) — она делает скучную устойчивую работу, чтобы ваши функции оставались простыми.