Zettelkasten

배치 작업의 Promise 병렬화는 Pipeline보다 Stage-by-stage가 안전하다

·수정 1

요약

  • 여러 단계(fetch A → fetch B → save)를 거치는 비동기 작업은 두 가지 방식으로 병렬화할 수 있다: Stage-by-stage (단계 완료 후 다음 단계)와 Pipeline (아이템이 도착하는 즉시 다음 단계 시작).
  • Pipeline은 처리량(throughput)이 높지만 복잡도와 에러 추적 비용이 크다.
  • 배치 작업은 처리량보다 정합성·중복 방지·에러 추적이 우선이므로 Stage 방식이 거의 항상 정답이다.

본문

두 방식의 구조

배치 작업이 다음과 같은 3단계 파이프라인을 갖는다고 가정한다:

A. Transaction fetch (page 1..N)
  → B. StoreTransaction fetch (tx별)
  → C. Merge & DB 저장

Stage-by-stage: 각 단계를 완전히 끝낸 뒤 다음 단계로 넘어간다.

const allTxs = await fetchAllTransactionPages(url);          // Stage A 완료 대기
const allStoreTxs = await Promise.allSettled(                // Stage B 시작
  allTxs.map((tx) => fetchStoreTx(tx)),
);
await saveAll(merge(allTxs, allStoreTxs));                   // Stage C

Pipeline (item-level): 한 아이템이 stage A를 끝내면 즉시 stage B로 흘려보낸다.

await Promise.allSettled(
  allTxs.map(async (tx) => {
    const storeTx = await fetchStoreTx(tx);
    const merged = merge(tx, storeTx);
    await save(merged);  // 도착하는 순서대로 저장
  }),
);

더 정교한 pipeline은 stage A의 페이지를 stream으로 흘려 stage B/C를 동시 진행시킨다 (for await...of + AsyncIterator).

트레이드오프

항목 Stage-by-stage Pipeline
처리량 낮음 — stage 사이 idle 높음 — 모든 stage 동시 진행
지연시간 마지막 아이템까지 대기 빠른 건 먼저 끝남
복잡도 단순, 직선적 복잡 (backpressure, 동시성 제어)
에러 처리 stage 단위 부분 실패 명확 아이템 단위로 흩어짐 → 추적 어려움
중복 저장 방지 stage C에서 모아서 dedup 아이템마다 lookup → 느림 or race
DB 부하 배치 INSERT 가능 한 건씩 → 트랜잭션 비용 ↑
메모리 전 단계 결과 다 보유 streaming 가능 → 메모리 ↓

배치 작업이 Stage를 선호하는 이유

  1. 중복 방지 요구사항 — stage C에서 한 번에 dedup 후 INSERT가 단순하다. Pipeline은 매 아이템마다 SELECT 후 INSERT가 필요해 race condition 위험.
  2. 에러 추적"4001 port의 page 5 fetch 실패" 처럼 stage 단위로 위치 특정이 쉽다. Pipeline은 어느 stage에서 터졌는지 매번 분기 필요.
  3. 배치의 본질 — 주기 실행 작업은 처리량보다 정합성이 중요하다. Pipeline의 throughput 이점은 실시간 스트리밍에서 빛난다.
  4. DB 동시 쓰기 안전성 — 파일 기반 JSON DB나 SQLite는 동시 write에 약하다. Stage C에서 직렬 저장이 안전.

권장 절충안

Stage끼리는 직렬, stage 안에서는 동시성 제한(pool, concurrency ≈ 10)을 둔 Map 패턴으로 병렬화한다. Promise.all 대신 Promise.allSettled로 부분 실패를 허용하고, 실패한 아이템은 로깅한다.

async run() {
  for (const { url, retry } of targetUrls) {
    const txs = await this.fetchAllTransactions(url, retry);  // Stage A: 내부 pool
    const storeTxs = await this.fetchAllStoreTxs(txs);        // Stage B: 내부 pool
    await this.saveMerged(txs, storeTxs);                     // Stage C: 직렬 저장
  }
}

결정 기준 한 줄

  • 실시간 / SLA가 처리량 → Pipeline
  • 주기 배치 / 정합성·재실행 안전 → Stage

관련 노트

참고