Inquir Compute logoInquir Compute
Сценарий · Inquir Compute

Обработка CSV: async импорт по частям без HTTP-таймаута

HTTP-функция принимает файл или S3-ссылку, возвращает jobId, пайплайн читает CSV по частям, делает idempotent upsert и обновляет прогресс — без таймаута шлюза.

Last updated: 2026-04-20

Direct answer

Обработка CSV: async импорт по частям без HTTP-таймаута. HTTP-функция получает URL файла, возвращает `{ jobId }`. Пайплайн скачивает CSV, режет на чанки, обрабатывает каждый как отдельный шаг с повторами.

When it fits

  • Файл больше 1000 строк или весит > 1 МБ
  • Нужны идемпотентные повторы без дублей

Tradeoffs

  • Стриминг файла в HTTP-ответ держит соединение: обрыв сети = потеря прогресса. Нет возможности продолжить с места обрыва.
  • Без chunked обработки весь файл загружается в память — OOM при больших файлах.

Почему CSV-импорт не работает в HTTP-запросе

  • Files with 100k+ rows take 30–300 seconds to parse and insert—well past gateway timeouts
  • Inline processing holds the HTTP connection open—clients time out or retry, causing double imports
  • Memory pressure: loading a 50MB CSV into a serverless function in a single pass causes OOM on small runtimes

Парсинг и запись 100 000 строк занимают секунды или минуты. HTTP-таймаут шлюза обрывает работу на середине — данные частично записаны.

Где ломаются простые подходы

Стриминг файла в HTTP-ответ держит соединение: обрыв сети = потеря прогресса. Нет возможности продолжить с места обрыва.

Без chunked обработки весь файл загружается в память — OOM при больших файлах.

Chunked async CSV-импорт на Inquir

HTTP-функция получает URL файла, возвращает `{ jobId }`. Пайплайн скачивает CSV, режет на чанки, обрабатывает каждый как отдельный шаг с повторами.

Idempotency ключ — хэш строки или составной ключ; `ON CONFLICT DO NOTHING` гарантирует безопасный повтор.

Что нужно для надёжного CSV-импорта

Async split

HTTP принимает ссылку → jobId. Пайплайн скачивает и разбивает на чанки.

Idempotent upsert

Каждая строка — по уникальному ключу; повтор не создаёт дубль.

Chunked шаги

Каждый чанк — шаг пайплайна. Сбой → повтор только упавшего чанка.

Прогресс

Обновлять счётчик в БД после каждого чанка. Клиент опрашивает по jobId.

Как организовать CSV-импорт

1

Принять файл и вернуть jobId

HTTP-функция сохраняет файл или ссылку, вызывает `global.durable.startNew()`, возвращает `{ jobId }` за <100 мс.

2

Разбить на чанки в пайплайне

Первый шаг читает заголовки и разбивает на батчи по N строк.

3

Upsert и трекинг прогресса

Каждый шаг — upsert батча + обновление счётчика. Итог записывается после последнего шага.

Chunked CSV import pipeline

HTTP handler returns 202; pipeline step processes the file in batches. Idempotency key prevents duplicate rows on retry.

api/import-csv.mjs (HTTP handler)
export async function handler(event) {
  const { fileUrl, importId } = JSON.parse(event.body || '{}');
  if (!fileUrl || !importId) return { statusCode: 400, body: JSON.stringify({ error: 'fileUrl and importId required' }) };
  await db.imports.create({ id: importId, status: 'pending', fileUrl });
  await global.durable.startNew('process-csv', undefined, { fileUrl, importId });
  return { statusCode: 202, body: JSON.stringify({ importId, status: 'pending' }) };
}
jobs/process-csv.mjs (pipeline step)
import { parse } from 'csv-parse/sync';

export async function handler(event) {
  const { fileUrl, importId } = event.payload ?? {};
  const csvText = await fetch(fileUrl).then((r) => r.text());
  const rows = parse(csvText, { columns: true, skip_empty_lines: true });
  let inserted = 0;
  for (const batch of chunk(rows, 500)) {
    // Upsert by external_id — idempotent on retry
    await db.records.upsertBatch(batch.map((r) => ({ ...r, importId })));
    inserted += batch.length;
  }
  await db.imports.update(importId, { status: 'done', rowCount: rows.length });
  return { importId, rows: rows.length, inserted };
}

Когда нужен async CSV-импорт

Когда это уместно

  • Файл больше 1000 строк или весит > 1 МБ
  • Нужны идемпотентные повторы без дублей

Когда лучше не трогать

  • 10–20 строк без требований к idempotency — синхронная обработка проще

Вопросы и ответы

Как передать файл в пайплайн?

Загрузить в объектное хранилище (S3/GCS), передать URL в payload пайплайна. Большой файл не попадает в память функции целиком.

Что делать при ошибке в одной строке?

Пропустить строку с логом и продолжить чанк — или завершить чанк с ошибкой для повтора. Решение зависит от требований к данным.

Inquir Compute logoInquir Compute

Самый простой способ запускать AI-агентов и backend-джобы без инфраструктуры.

Связаться info@inquir.org

© 2025 Inquir Compute. Все права защищены.