Обработка CSV: async импорт по частям без HTTP-таймаута
HTTP-функция принимает файл или S3-ссылку, возвращает jobId, пайплайн читает CSV по частям, делает idempotent upsert и обновляет прогресс — без таймаута шлюза.
Last updated: 2026-04-20
Answer first
Direct answer
Обработка CSV: async импорт по частям без HTTP-таймаута. HTTP-функция получает URL файла, возвращает `{ jobId }`. Пайплайн скачивает CSV, режет на чанки, обрабатывает каждый как отдельный шаг с повторами.
When it fits
- Файл больше 1000 строк или весит > 1 МБ
- Нужны идемпотентные повторы без дублей
Tradeoffs
- Стриминг файла в HTTP-ответ держит соединение: обрыв сети = потеря прогресса. Нет возможности продолжить с места обрыва.
- Без chunked обработки весь файл загружается в память — OOM при больших файлах.
Нагрузка и где ломается
Почему CSV-импорт не работает в HTTP-запросе
- Files with 100k+ rows take 30–300 seconds to parse and insert—well past gateway timeouts
- Inline processing holds the HTTP connection open—clients time out or retry, causing double imports
- Memory pressure: loading a 50MB CSV into a serverless function in a single pass causes OOM on small runtimes
Парсинг и запись 100 000 строк занимают секунды или минуты. HTTP-таймаут шлюза обрывает работу на середине — данные частично записаны.
Где костыли не спасают
Где ломаются простые подходы
Стриминг файла в HTTP-ответ держит соединение: обрыв сети = потеря прогресса. Нет возможности продолжить с места обрыва.
Без chunked обработки весь файл загружается в память — OOM при больших файлах.
Как помогает Inquir
Chunked async CSV-импорт на Inquir
HTTP-функция получает URL файла, возвращает `{ jobId }`. Пайплайн скачивает CSV, режет на чанки, обрабатывает каждый как отдельный шаг с повторами.
Idempotency ключ — хэш строки или составной ключ; `ON CONFLICT DO NOTHING` гарантирует безопасный повтор.
Что получаете
Что нужно для надёжного CSV-импорта
Async split
HTTP принимает ссылку → jobId. Пайплайн скачивает и разбивает на чанки.
Idempotent upsert
Каждая строка — по уникальному ключу; повтор не создаёт дубль.
Chunked шаги
Каждый чанк — шаг пайплайна. Сбой → повтор только упавшего чанка.
Прогресс
Обновлять счётчик в БД после каждого чанка. Клиент опрашивает по jobId.
Что делать дальше
Как организовать CSV-импорт
Принять файл и вернуть jobId
HTTP-функция сохраняет файл или ссылку, вызывает `global.durable.startNew()`, возвращает `{ jobId }` за <100 мс.
Разбить на чанки в пайплайне
Первый шаг читает заголовки и разбивает на батчи по N строк.
Upsert и трекинг прогресса
Каждый шаг — upsert батча + обновление счётчика. Итог записывается после последнего шага.
Пример кода
Chunked CSV import pipeline
HTTP handler returns 202; pipeline step processes the file in batches. Idempotency key prevents duplicate rows on retry.
export async function handler(event) { const { fileUrl, importId } = JSON.parse(event.body || '{}'); if (!fileUrl || !importId) return { statusCode: 400, body: JSON.stringify({ error: 'fileUrl and importId required' }) }; await db.imports.create({ id: importId, status: 'pending', fileUrl }); await global.durable.startNew('process-csv', undefined, { fileUrl, importId }); return { statusCode: 202, body: JSON.stringify({ importId, status: 'pending' }) }; }
import { parse } from 'csv-parse/sync'; export async function handler(event) { const { fileUrl, importId } = event.payload ?? {}; const csvText = await fetch(fileUrl).then((r) => r.text()); const rows = parse(csvText, { columns: true, skip_empty_lines: true }); let inserted = 0; for (const batch of chunk(rows, 500)) { // Upsert by external_id — idempotent on retry await db.records.upsertBatch(batch.map((r) => ({ ...r, importId }))); inserted += batch.length; } await db.imports.update(importId, { status: 'done', rowCount: rows.length }); return { importId, rows: rows.length, inserted }; }
Когда подходит
Когда нужен async CSV-импорт
Когда это уместно
- Файл больше 1000 строк или весит > 1 МБ
- Нужны идемпотентные повторы без дублей
Когда лучше не трогать
- 10–20 строк без требований к idempotency — синхронная обработка проще
FAQ
Вопросы и ответы
Как передать файл в пайплайн?
Загрузить в объектное хранилище (S3/GCS), передать URL в payload пайплайна. Большой файл не попадает в память функции целиком.
Что делать при ошибке в одной строке?
Пропустить строку с логом и продолжить чанк — или завершить чанк с ошибкой для повтора. Решение зависит от требований к данным.