Обработка CSV: async импорт по частям без HTTP-таймаута
HTTP-функция принимает файл или S3-ссылку, возвращает jobId, пайплайн читает CSV по частям, делает idempotent upsert и обновляет прогресс — без таймаута шлюза.
Обновлено: 2026-04-20
Кратко
Суть ответа
Обработка CSV: async импорт по частям без HTTP-таймаута. HTTP-функция получает URL файла, возвращает `{ jobId }`. Пайплайн скачивает CSV, режет на чанки, обрабатывает каждый как отдельный шаг с повторами.
Когда подходит и когда нет
- Файл больше 1000 строк или весит > 1 МБ
- Нужны идемпотентные повторы без дублей
На что обратить внимание
- Стриминг файла в HTTP-ответ держит соединение: обрыв сети = потеря прогресса. Нет возможности продолжить с места обрыва.
- Без chunked обработки весь файл загружается в память — OOM при больших файлах.
Ситуация: нагрузка и где обычно ломается
Почему CSV-импорт не работает в HTTP-запросе
- Files with 100k+ rows take 30–300 seconds to parse and insert—well past gateway timeouts
- Inline processing holds the HTTP connection open—clients time out or retry, causing double imports
- Memory pressure: loading a 50MB CSV into a serverless function in a single pass causes OOM on small runtimes
Парсинг и запись 100 000 строк занимают секунды или минуты. HTTP-таймаут шлюза обрывает работу на середине — данные частично записаны.
Когда упрощённых рецептов мало
Почему простая загрузка большого CSV в одном HTTP-запросе обрывается по таймауту шлюза, памяти и повторным прогонам
Стриминг файла в HTTP-ответ держит соединение: обрыв сети = потеря прогресса. Нет возможности продолжить с места обрыва.
Без chunked обработки весь файл загружается в память — OOM при больших файлах.
Как Inquir помогает в этом сценарии
Как на Inquir устроить асинхронный импорт CSV по частям (чанками) с идемпотентной записью и отслеживанием jobId
HTTP-функция получает URL файла, возвращает `{ jobId }`. Пайплайн скачивает CSV, режет на чанки, обрабатывает каждый как отдельный шаг с повторами.
Idempotency ключ — хэш строки или составной ключ; `ON CONFLICT DO NOTHING` гарантирует безопасный повтор.
Что вы получаете на платформе
Что нужно для надёжного CSV-импорта
Асинхронный приём и постановка в пайплайн
HTTP принимает ссылку на файл → возвращает jobId. Пайплайн скачивает CSV и режет на чанки.
Идемпотентная вставка или обновление (UPSERT)
Каждая строка — по уникальному ключу; повтор не создаёт дубль.
Обработка файла по частям в шагах пайплайна
Каждый фрагмент файла — отдельный шаг пайплайна. Сбой → повтор только упавшего фрагмента.
Прогресс
Обновлять счётчик в БД после каждого чанка. Клиент опрашивает по jobId.
Что сделать дальше, по шагам
Как организовать CSV-импорт
Принять файл и вернуть jobId
HTTP-функция сохраняет файл или ссылку, вызывает `global.durable.startNew()`, возвращает `{ jobId }` за <100 мс.
Разбить на чанки в пайплайне
Первый шаг читает заголовки и разбивает на батчи по N строк.
Upsert и трекинг прогресса
Каждый шаг — upsert батча + обновление счётчика. Итог записывается после последнего шага.
Пример кода
Chunked CSV import pipeline
HTTP handler returns 202; pipeline step processes the file in batches. Idempotency key prevents duplicate rows on retry.
export async function handler(event) { const { fileUrl, importId } = JSON.parse(event.body || '{}'); if (!fileUrl || !importId) return { statusCode: 400, body: JSON.stringify({ error: 'fileUrl and importId required' }) }; await db.imports.create({ id: importId, status: 'pending', fileUrl }); await global.durable.startNew('process-csv', undefined, { fileUrl, importId }); return { statusCode: 202, body: JSON.stringify({ importId, status: 'pending' }) }; }
import { parse } from 'csv-parse/sync'; export async function handler(event) { const { fileUrl, importId } = event.payload ?? {}; const csvText = await fetch(fileUrl).then((r) => r.text()); const rows = parse(csvText, { columns: true, skip_empty_lines: true }); let inserted = 0; for (const batch of chunk(rows, 500)) { // Upsert by external_id — idempotent on retry await db.records.upsertBatch(batch.map((r) => ({ ...r, importId }))); inserted += batch.length; } await db.imports.update(importId, { status: 'done', rowCount: rows.length }); return { importId, rows: rows.length, inserted }; }
Когда подходит и когда нет
Когда нужен async CSV-импорт
Когда это уместно
- Файл больше 1000 строк или весит > 1 МБ
- Нужны идемпотентные повторы без дублей
Когда лучше не трогать
- 10–20 строк без требований к idempotency — синхронная обработка проще
Вопросы и ответы
Вопросы и ответы
Как передать файл в пайплайн?
Загрузить в объектное хранилище (S3/GCS), передать URL в payload пайплайна. Большой файл не попадает в память функции целиком.
Что делать при ошибке в одной строке?
Пропустить строку с логом и продолжить чанк — или завершить чанк с ошибкой для повтора. Решение зависит от требований к данным.