Inquir Compute
Разделы

Пайплайны и графовые воркфлоу

Автоматизируйте cron, async jobs, DAG с fan-out, retries и webhook-triggered workflows — пошаговыми пайплайнами или графом; те же serverless functions, что и у HTTP-маршрутов.

Автоматизируйте многошаговые сценарии двумя способами: пошаговые пайплайны (упорядоченный список шагов-функций с триггерами) и визуальные графовые пайплайны (холст узлов и рёбер). Оба типа — в разделе Пайплайны в боковой панели: в списке доступны Новый пошаговый пайплайн и Новый визуальный пайплайн.

Визуальный редактор графа

Редактор графа рассчитан на HTTP-воркфлоу: триггеры ведут в Lambda и узлы ветвления; узел Map формирует JSON; узел Respond отдаёт HTTP-ответ клиенту.

  • В разделе Пайплайны откройте карточку визуального пайплайна и нажмите Открыть визуальный редактор (или создайте пайплайн через Новый визуальный пайплайн).
  • Перетаскивайте типы узлов из палитры слева: Триггеры (Manual, HTTP, Cron), Шаги (Lambda, If, Set, Map, Respond), группа Parallel (узлы Parallel и Merge) и Note. Соединяйте выходы и входы, чтобы задать порядок выполнения.
  • Клик по узлу открывает его настройки в боковой панели (привязка функции, выражения, JSON-шаблоны, коды ответа и т.д.).
  • Узлы Заметка только для документации на схеме: они не выполняются и не должны иметь рёбер — это стикеры на холсте.

В визуальном редакторе соединяют триггеры Manual, HTTP и Cron с узлами Lambda, If, Map, Respond и Human; на скриншоте ниже — холст в консоли.

Скриншот визуального редактора пайплайнов в консоли Inquir
Визуальный редактор пайплайнов в консоли.

Типы узлов графа (справочник)

Соединяйте нижние ручки (выходы) с верхней ручкой (входом) следующего узла. У триггеров, Parallel, Map, Set и Merge один выход по умолчанию. У Lambda два выхода: ok и err; у If — true и false. У Respond и Note нет исходящих ручек. В графе всегда должен быть хотя бы один триггер.

Ручной триггер

Запускает выполнение из UI пайплайна или через API execute. Дополнительных полей нет. Используйте как точку входа вместе с другими триггерами при необходимости.

HTTP-триггер

Запускает выполнение, когда публичный HTTP-вход пайплайна получает запрос с указанным методом и путём (например POST /hook). В типе узла хранятся настройки авторизации (без проверки, API-ключ или bearer) для этого входа; при добавлении из палитры подставляются значения по умолчанию.

Cron-триггер

Запуск по расписанию: пятизначное cron-выражение и часовой пояс (например UTC). Некорректный cron отклоняется при сохранении или публикации.

Lambda (функция)

Вызывает функцию воркспейса. Выберите функцию в панели свойств. При ошибке: failPipeline останавливает запуск; continue ведёт по ветке ok, но фиксирует сбой; errorBranch направляет на выход err — подключите его для сценариев восстановления. Поле переопределения входа оставьте пустым, чтобы передать выход предыдущего узла, тело триггера после триггера или структурированный объект ошибки, если предыдущий шаг упал и граф продолжился. JSON переопределения использует те же шаблоны плейсхолдеров, но поле input туда не подставляется — используйте пути trigger, vars и steps. В JSON графа поддерживается retryPolicy; в UI настраиваются функция, onError и переопределение входа.

If (ветвление)

Делит поток по булевому выражению. Подключите ручки true и false. Одно сравнение или один путь для «истинности» — без && и || (цепочка If или узел Map при необходимости). При вычислении input.* строится из выхода предыдущего шага: строка, похожая на JSON-объект или массив, разбирается как JSON (удобно для ответа Lambda). Значения из узлов Set поверхностно сливаются в тот же input (при совпадении ключей побеждают vars); можно по-прежнему писать vars.ИМЯ или steps.ID.output…. По умолчанию выход шага совпадает с входом (значение условия хранится отдельно как ifEvaluated, не смешивается с output). Чтобы передать булево в JSON дальше по графу, задайте форму выхода (outputMapping), например с {{ifResult}}. Форма выхода: {{input}}, {{ifResult}}, {{vars…}}, {{steps…}}.

Одно сравнение или один путь для «истинности» — без составных && и || (ещё один If или Map). Не оборачивайте выражение в {{ }} — такой синтаксис только в JSON-шаблонах. Вместо NODE_ID укажите реальный id узла из графа. Примеры:

  • Строка из upstream — input.tier == 'pro' (литерал в одинарных или двойных кавычках).
  • Число из upstream больше порога — input.score > 50
  • Строгое равенство с булевым — input.ok === true (справа также false или null).
  • Выход другого узла — steps.NODE_ID.output.status == 'done'
  • Переменная пайплайна из Set — vars.environment == 'production'
  • Число «больше или равно» — input.retryCount >= 3
  • Истинность без оператора: один путь, например input.recordId или steps.NODE_ID.output.items — ветка true, если значение truthy.
  • Неравенство — input.code != 404 или input.code !== 404 (== и != допускают приведение типов, === и !== — нет).

Parallel

Распараллеливание: один вход, несколько исходящих рёбер с ручки parallel. Каждая нижняя ветка выполняется независимо до Merge или другого объединения.

Merge

Собирает ветки после Parallel. Режим all (единственный поддерживаемый) ждёт завершения каждой входящей ветки, затем узел выполняется один раз. Без outputMapping: одна входящая ветка — проброс её полезной нагрузки; несколько веток — объект с картой inputs, ключи — id исходных узлов веток. С outputMapping используйте {{inputs}} для всей карты или {{inputs.idУзла}} для одной ветки; при одном входе также доступен {{input}} (как у Map).

Set

Записывает переменные пайплайна (vars) для последующих шаблонов. Задайте JSON-объект; в значениях можно использовать шаблоны. После разрешения они сливаются в vars. Выход узла объединяет эти значения с upstream-объектом, если он обычный объект.

Map

Строит JSON для следующего шага через outputMapping и подстановку шаблонов; upstream доступен как input (см. раздел шаблонов выше). Если outputMapping не задан, по умолчанию пробрасывается input.

Respond

Завершает HTTP-запуск: код ответа, необязательное тело по шаблонам и необязательные заголовки (JSON-объект; значения заголовков — те же плейсхолдеры, что и у тела). Оставьте тело ответа пустым (без outputMapping и без устаревшего bodyMapping), чтобы вернуть выход предыдущего шага без изменений — либо тело триггера, если Respond сразу после триггера. Пример: Content-Type application/json и X-Request-Id = {{vars.requestId}}. Исходящих рёбер нет. Для ответа в том же HTTP-запросе предпочтителен синхронный режим выполнения.

Human gate

Pauses the run until someone calls POST /api/pipeline-graph-executions/:executionId/resume. Approve mode requires two outgoing edges labeled approve and reject; the API body is { "decision": "approve" } or { "decision": "reject" }. Question mode uses one default edge; the body is { "answer": <any JSON> }. Optional promptTemplate is a JSON template ({{input}} is the upstream payload) stored on the waiting step for operators. The completed step output merges the upstream payload with humanGate: { mode, decision or answer, optional prompt }.

Заметка (Note)

Подпись на схеме: заголовок, текст, необязательная ссылка, размер. Не выполняется; не соединяйте рёбрами. При проверке достижимости узлы-заметки игнорируются.

Пошаговые пайплайны: форма JSON

Классические пайплайны задаются JSON (через API или редактор шагов): триггер, режим выполнения и упорядоченный список шагов. Пример:

pipeline.json
// Pipeline config (API / stored config): trigger + executionMode + steps
// Each step needs id, name, and functionId (lambda steps). Order uses "order" or list order in sequential mode.
{
  "trigger": { "type": "manual" },
  "executionMode": "sequential",
  "steps": [
    { "id": "validate", "name": "Validate", "functionId": "fn-1" },
    { "id": "process", "name": "Process", "functionId": "fn-2" },
    { "id": "notify", "name": "Notify", "functionId": "fn-3" }
  ]
}

Событие для каждого шага

В отличие от вызова через HTTP gateway, шаги пайплайна получают один JSON-объект (не формат API Gateway). Полезная нагрузка запуска дублируется: поля разворачиваются на верхний уровень и остаются в payload. Также есть pipeline, step, previousOutput, previousStepResults и stepResults.

event (per step)
// Lambda step handlers receive one object. Payload fields are spread at the top level as well as under "payload".
{
  "userId": "u1",
  "payload": {
    "userId": "u1"
  },
  "pipeline": {
    "id": "pl_abc",
    "name": "Example"
  },
  "step": {
    "id": "process",
    "name": "Process"
  },
  "previousOutput": null,
  "previousStepResults": {},
  "stepResults": {}
}

Типы триггеров

  • Вручную — запуск из UI или API.
  • HTTP/API-триггер — запуск через POST /pipelines/:id/execute с JSON-payload.
  • Расписание — cron-планировщик (например, 0 */6 * * *).

Ручное подтверждение

Добавляйте шаги подтверждения, которые ставят пайплайн на паузу до одобрения пользователем или истечения таймаута.

Графовые пайплайны: JSON-шаблоны

В визуальном редакторе графа часть узлов принимает JSON, в строковых значениях которого можно писать плейсхолдеры. Движок подставляет вместо каждого фрагмента с синтаксисом двойных фигурных скобок (примеры ниже) данные текущего запуска.

  • {{input}} — Полезная нагрузка с предыдущего узла (или тело триггера после триггера). Вложенные поля: {{input.field}}. Подставляется для Map, Respond (тело и значения заголовков), необязательного выходного JSON узла If и outputMapping узла Merge, если входящая ветка одна.
  • {{trigger.body}}, {{trigger.type}} и т.д. — данные триггера запуска (доступны во всех шаблонных полях этих узлов).
  • {{vars.name}} — значения, заданные узлами Set ранее в графе.
  • {{steps.NODE_ID.output}} — выход завершённого шага; вместо NODE_ID укажите id узла из графа. Работают вложенные пути (например {{steps.abc.output.score}}).
  • {{inputs}} — при разрешении outputMapping узла Merge это объект с ключами по id исходного узла каждой входящей ветки. Используйте {{inputs.idУзла}} для полезной нагрузки одной ветки (подставьте реальный id из графа).
  • {{ifResult}} — только при разрешении необязательного outputMapping узла If: булево значение условия. В последующих узлах то же значение: {{steps.ID_УЗЛА_IF.ifEvaluated}} (id узла If). В тексте самого условия недоступно.

Если строка состоит только из одного плейсхолдера, подставленное значение сохраняет JSON-тип (объект, массив, число). Внутри более длинной строки значения превращаются в текст.

По узлам:

  • Lambda — оставьте переопределение входа пустым, чтобы передать выход предыдущего узла (или payload триггера) без изменений. Если задаёте JSON вручную, используйте {{trigger…}}, {{vars…}} и {{steps…}}. Для переопределения Lambda {{input}} не используется.
  • Merge — поведение без outputMapping описано выше в разделе Merge. Произвольный outputMapping использует {{inputs…}}; при одном входящем ребре работает и {{input}}.
  • If — поле условия без {{ }}. Движок собирает input из выхода предыдущего шага (строки JSON разбираются) и сливает vars из Set. Используйте input.поле, steps.id.output…, vars…. Необязательная форма выхода — JSON как у Map и {{ifResult}}.
  • Map — поле outputMapping задаёт объект, который отдаёт узел; {{input}} — вход с предыдущего узла (если поле опущено в JSON, движок по умолчанию пробрасывает {{input}}).
  • Respond — необязательный outputMapping формирует тело HTTP; если нет ни outputMapping, ни устаревшего bodyMapping, тело ответа совпадает с входом с предыдущего узла. Значения в headersMapping подчиняются тем же правилам (например Content-Type и application/json).
  • Заметка — не участвует в выполнении и в шаблонах; движок её игнорирует. Не подключайте к заметкам рёбра.
graph-templates.json
// In Map, Respond (body/headers), and Lambda input override, values may be JSON
// with strings like "{{trigger.body.field}}" or "{{steps.lambdaNodeId.output}}".

// Map node — outputMapping (default shape is pass-through of upstream via {{input}})
{
  "data": "{{input}}",
  "userId": "{{input.userId}}",
  "enriched": "{{steps.analyze.output}}"
}

// Respond node — optional outputMapping; omit it to return the upstream body unchanged
{
  "result": "{{input}}",
  "meta": { "source": "pipeline" }
}

// Respond headers (same rules; header values are strings after resolution)
{
  "X-Request-Id": "{{steps.firstLambda.output.requestId}}"
}

// Lambda — input payload override only when you need a custom shape (otherwise leave empty for chain input)
{
  "item": "{{trigger.body}}",
  "prior": "{{steps.stepA.output}}"
}