동시성 프로그래밍 모델 (Decomposition Patterns)
참고: Grokking Concurrency (Kirill Bobrov, Manning)
개요
동시성 프로그래밍에서 **분해(Decomposition)**는 큰 작업을 병렬 처리 가능한 단위로 나누는 핵심 기법이다. 어떻게 나누느냐에 따라 성능, 복잡도, 확장성이 결정된다.
핵심 질문
"작업을 어떻게 나눌 것인가?"
↓
┌─────────────────────────────────────────────────────┐
│ Task 기준 vs Data 기준 │
│ (작업 단계로 분할) (데이터 chunk로 분할) │
└─────────────────────────────────────────────────────┘
↓ ↓
파이프라인 Chunk 분할
Fork/Join Map / Map-Reduce
Task 분해 vs Data 분해 예시
같은 작업(1000장 이미지 처리)을 두 방식으로 비교:
Task 분해 (파이프라인) - "작업 단계"를 나눔
이미지1: [읽기] → [필터] → [저장]
이미지2: [읽기] → [필터] → [저장]
이미지3: [읽기] → [필터] → [저장]
↑ ↑ ↑
코어1 코어2 코어3
(읽기만) (필터만) (저장만)
→ 각 코어가 다른 작업을 담당
Data 분해 (Chunk 분할) - "데이터"를 나눔
[이미지 1~333] → [읽기→필터→저장] → 완료 ← 코어1
[이미지 334~666] → [읽기→필터→저장] → 완료 ← 코어2
[이미지 667~1000]→ [읽기→필터→저장] → 완료 ← 코어3
→ 각 코어가 같은 작업을 다른 데이터에 수행
실생활 비유: 햄버거 가게
| 방식 | 비유 |
|---|---|
| Task 분해 | 직원A는 패티만, B는 야채만, C는 조립만 (전문화) |
| Data 분해 | 직원A는 주문1 |
패턴 관계도
┌─────────────────┐
│ Decomposition │
└────────┬────────┘
┌──────────────┼──────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Task │ │ Data │ │ Hybrid │
│ 분해 │ │ 분해 │ │ │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
파이프라인 Chunk 분할 Map/Reduce
Fork/Join Map 패턴
선택 기준 요약
| 상황 | 추천 패턴 |
|---|---|
| 단계별 처리 흐름이 명확 | 파이프라인 |
| 같은 작업을 대량 데이터에 | Chunk 분할, Map |
| 재귀적 분할이 자연스러움 | Fork/Join |
| 대용량 집계/분석 | Map/Reduce |
목차
- Task 분해 - 작업(기능) 단위로 분할
- Data 분해 - 데이터 단위로 분할
- Hybrid - Task + Data 결합
- 공통 고려사항
1. Task 분해
**작업(기능)**을 기준으로 분할. 서로 다른 작업을 병렬로 수행.
[전체 작업]
↓
[작업A] [작업B] [작업C] ← 서로 다른 기능
↓ ↓ ↓
코어1 코어2 코어3
1.1 파이프라인 패턴 (Pipeline Pattern)
작업을 여러 단계로 나누고, 각 단계를 서로 다른 코어/스레드에 배정하는 방식
특징
- 각 단계가 순차적으로 연결 (Stage 1 → Stage 2 → Stage 3)
- 한 단계가 완료되면 다음 단계로 데이터 전달
- 처리량(Throughput) 향상에 유리
왜 처리량이 향상되는가?
순차 처리 (파이프라인 없음):
Item1: [Read][Process][Save] ----대기---- [Read][Process][Save] Item2
|------ 3초 ------| |------ 3초 ------|
총 6초 (2개 아이템)
파이프라인 처리:
시간: T1 T2 T3 T4 T5
Read: [I1] [I2] [I3]
Process: [I1] [I2] [I3]
Save: [I1] [I2] [I3]
- 개별 아이템의 지연시간(Latency)은 동일 (3단계 거쳐야 함)
- 하지만 여러 아이템이 동시에 서로 다른 단계를 처리
- T3 시점: Read는 I3, Process는 I2, Save는 I1 동시 처리
- → 단위 시간당 완료되는 아이템 수 증가 (공장 조립 라인 원리)
예시
- 이미지 처리: 읽기 → 필터 적용 → 저장
- ETL: Extract → Transform → Load
- 영상 인코딩: 디코딩 → 필터 → 인코딩
구현 (Python)
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
def stage1_read(data):
return f"read:{data}"
def stage2_process(data):
return f"processed:{data}"
def stage3_save(data):
return f"saved:{data}"
# 각 스테이지를 별도 스레드에서 실행
def pipeline(items):
with ThreadPoolExecutor(max_workers=3) as executor:
# Stage 1: 읽기
stage1_results = list(executor.map(stage1_read, items))
# Stage 2: 처리
stage2_results = list(executor.map(stage2_process, stage1_results))
# Stage 3: 저장
stage3_results = list(executor.map(stage3_save, stage2_results))
return stage3_results
1.2 Fork/Join 패턴
작업을 재귀적으로 분할(Fork)하고 결과를 병합(Join)
특징
- 분할 정복(Divide & Conquer) 알고리즘의 병렬화
- 문제를 재귀적으로 쪼깨는 것이 핵심
- 작업이 충분히 작아질 때까지 재귀적으로 분할
- 결과를 상위로 병합하며 올라감
왜 재귀적 분할이 병렬화에 효과적인가?
병합 정렬 예시: [8,4,2,6,5,1,7,3]
순차 처리:
[8,4,2,6,5,1,7,3] → [4,8] → [2,6] → [1,5] → [3,7] → ... (순서대로)
Fork/Join 병렬 처리:
[8,4,2,6,5,1,7,3]
/ \
[8,4,2,6] ←(Fork 병렬)→ [5,1,7,3]
/ \ / \
[8,4] [2,6] [5,1] [7,3] ← 4개 동시 처리
↓ ↓ ↓ ↓
[4,8] [2,6] [1,5] [3,7] ← Join (병합)
\ / \ /
[2,4,6,8] [1,3,5,7]
\ /
[1,2,3,4,5,6,7,8]
- 분할할 때마다 독립적인 서브트리 생성 → 병렬 처리 가능
- 트리 깊이가 깊을수록 동시 실행 가능한 작업 수 증가 (2^depth)
- threshold 설정: 너무 작으면 Fork 오버헤드 > 병렬화 이득
예시
- 병합 정렬 (Merge Sort)
- 퀵 정렬 (Quick Sort)
- 피보나치 계산
- 디렉토리 크기 계산 (재귀적 탐색)
구현 (Python)
from concurrent.futures import ThreadPoolExecutor
def parallel_merge_sort(arr, executor, threshold=1000):
if len(arr) <= threshold:
return sorted(arr) # 충분히 작으면 순차 정렬
mid = len(arr) // 2
# Fork: 두 부분으로 나눠 병렬 처리
left_future = executor.submit(parallel_merge_sort, arr[:mid], executor, threshold)
right_future = executor.submit(parallel_merge_sort, arr[mid:], executor, threshold)
# Join: 결과 병합
left = left_future.result()
right = right_future.result()
return merge(left, right)
def merge(left, right):
result = []
i = j = 0
while i < len(left) and j < len(right):
if left[i] <= right[j]:
result.append(left[i])
i += 1
else:
result.append(right[j])
j += 1
result.extend(left[i:])
result.extend(right[j:])
return result
2. Data 분해
데이터를 기준으로 분할. 같은 작업을 여러 데이터에 병렬 적용.
[전체 데이터]
↓
[chunk1] [chunk2] [chunk3] ← 같은 작업
↓ ↓ ↓
코어1 코어2 코어3
Chunk 분할 vs Map 패턴
- 관심사가 다름
- Chunk 분할: "데이터를 어떻게 나눌까?"
- Map 패턴: "각 데이터에 어떤 작업을 할까?"
- 꼭 둘중 하나의 패턴만 쓰는 건 아님
- Chunk 분할로 나눈 데이터를 Map 패턴으로 처리 가능
2.1 Chunk 분할 (Data Partitioning)
가장 기본적인 아이디어 데이터를 chunk로 나눠 동일한 작업을 병렬로 수행
특징
- 같은 연산을 여러 데이터에 독립적으로 적용
- 데이터 간 의존성이 없어야 함
왜 데이터 간 의존성이 없어야 하는가?
의존성 있는 경우 (병렬화 불가):
피보나치: F(n) = F(n-1) + F(n-2)
→ F(5)를 계산하려면 F(4)와 F(3)이 먼저 필요
→ 순서대로 계산해야 함 (병렬화 X)
의존성 없는 경우 (병렬화 가능):
배열 합계: [1,2,3,4,5,6,7,8]
→ chunk1 합계(1+2+3+4)와 chunk2 합계(5+6+7+8)는 서로 독립
→ 동시에 계산 가능 (병렬화 O)
- 의존성이 있으면 선행 작업 완료를 대기해야 함
- 의존성이 없으면 각 코어가 독립적으로 자기 chunk만 처리
- SIMD와 유사: 하나의 명령(sum)을 여러 데이터에 동시 적용
예시
- 배열 합계: 배열을 N개로 나눠 각각 합산 후 최종 합산
- 이미지 처리: 이미지를 영역별로 나눠 병렬 처리
- 대용량 파일 처리: 파일을 chunk로 나눠 병렬 읽기
구현 (Python)
from concurrent.futures import ProcessPoolExecutor
import numpy as np
def sum_chunk(chunk):
return sum(chunk)
def parallel_sum(data, num_workers=4):
# 데이터를 chunk로 분할
chunks = np.array_split(data, num_workers)
with ProcessPoolExecutor(max_workers=num_workers) as executor:
partial_sums = list(executor.map(sum_chunk, chunks))
return sum(partial_sums)
# 사용
data = list(range(1_000_000))
result = parallel_sum(data) # 각 코어가 25만개씩 처리
2.2 Map 패턴
각 요소에 독립적으로 동일한 함수 적용
특징
- Side effect 없이 순수 함수 적용
- 요소 간 의존성 없음 → 완벽한 병렬화 가능
- 함수형 프로그래밍의 핵심 패턴
왜 Side effect가 없어야 완벽한 병렬화가 가능한가?
# Side effect 있는 경우 (병렬화 위험):
total = 0
def add_to_total(x):
global total
total += x # 공유 상태 수정 → Race condition!
return x * 2
# Side effect 없는 경우 (안전한 병렬화):
def double(x):
return x * 2 # 입력만 보고 출력 결정, 외부 상태 변경 X
- Side effect: 함수가 외부 상태(전역변수, 파일, DB 등)를 변경하는 것
- Side effect가 있으면 실행 순서에 따라 결과가 달라짐 → 동기화 필요
- Side effect가 없으면:
- 어떤 순서로 실행해도 결과 동일
- 락/동기화 없이 병렬 실행 가능
- 각 요소의 처리가 완전히 독립적
예시
- 리스트의 모든 요소 제곱
- URL 리스트에서 각각 데이터 fetch
- 이미지 리스트의 각 이미지 리사이즈
구현 (Python)
from concurrent.futures import ThreadPoolExecutor
import requests
def fetch_url(url):
response = requests.get(url)
return response.status_code
urls = [
"https://api.example.com/1",
"https://api.example.com/2",
"https://api.example.com/3",
]
# 병렬 Map
with ThreadPoolExecutor(max_workers=10) as executor:
results = list(executor.map(fetch_url, urls))
# results: [200, 200, 404]
3. Hybrid
Task + Data 분해를 결합. 데이터를 나누고(Map), 결과를 집계(Reduce).
[전체 데이터]
↓ Split
[chunk1] [chunk2] [chunk3]
↓ Map (병렬)
[결과1] [결과2] [결과3]
↓ Reduce
[최종 결과]
3.1 Map/Reduce 패턴
Map: 데이터에 함수 적용 (병렬) → Reduce: 결과를 하나로 합침
특징
- 대용량 데이터 처리의 핵심 패턴
- Map 단계: 병렬로 데이터 변환/필터링 (Data 분해)
- Reduce 단계: 중간 결과를 집계 (Task 분해)
왜 대용량 데이터 처리에 적합한가?
10TB 로그에서 에러 카운트 (단일 머신):
[10TB 로그] → 순차 스캔 → 10시간 소요
Map/Reduce (100대 클러스터):
[10TB 로그]
↓ Split
[100GB] x 100대
↓ Map (병렬)
각 머신: 자기 chunk에서 에러 카운트 → 6분
↓ Reduce
100개 결과 합산 → 최종 에러 카운트
총 ~6분 소요
- 수평 확장(Scale-out): 머신 추가로 처리량 선형 증가
- 데이터 지역성: 데이터가 있는 곳에서 연산 (네트워크 이동 최소화)
- Fault tolerance: 한 노드 실패해도 해당 chunk만 재처리
- 단순한 프로그래밍 모델: Map과 Reduce 함수만 작성하면 분산 처리는 프레임워크가 담당
예시
- 단어 빈도수 계산 (Word Count)
- 로그 분석 (에러 카운트)
- 분산 검색 (각 노드 검색 → 결과 병합)
구현 (Python)
from concurrent.futures import ProcessPoolExecutor
from collections import Counter
from functools import reduce
def map_word_count(text_chunk):
"""Map: 텍스트에서 단어 빈도수 계산"""
words = text_chunk.lower().split()
return Counter(words)
def reduce_counters(counter1, counter2):
"""Reduce: 두 Counter를 병합"""
return counter1 + counter2
def parallel_word_count(texts, num_workers=4):
with ProcessPoolExecutor(max_workers=num_workers) as executor:
# Map 단계: 각 텍스트 chunk의 단어 빈도수 계산
counters = list(executor.map(map_word_count, texts))
# Reduce 단계: 모든 Counter 병합
final_count = reduce(reduce_counters, counters)
return final_count
# 사용
texts = [
"hello world hello",
"world foo bar",
"hello bar baz",
]
result = parallel_word_count(texts)
# Counter({'hello': 3, 'world': 2, 'bar': 2, 'foo': 1, 'baz': 1})
Hadoop 스타일 Map/Reduce 흐름
Input Data
↓
[Split] → chunk1, chunk2, chunk3, chunk4
↓
[Map] → (key, value) pairs 생성
↓
[Shuffle] → 같은 key끼리 그룹핑
↓
[Reduce] → 각 key별로 집계
↓
Output
4. 공통 고려사항
4.1 Granularity (세분화 수준)
작업을 얼마나 잘게 나눌 것인가의 문제. 모든 분해 패턴에 공통 적용.
Fine-grained (세밀한 분할)
- 작은 단위로 많이 나눔
- 장점: 병렬성 극대화, 부하 분산 용이
- 단점: 통신/동기화 오버헤드 증가
Coarse-grained (거친 분할)
- 큰 단위로 적게 나눔
- 장점: 오버헤드 최소화
- 단점: 부하 불균형 가능, 병렬성 제한
왜 트레이드오프가 발생하는가?
작업: 1000개 아이템 처리, 4코어 CPU
Case 1: Fine-grained (1000개 태스크)
- 태스크당 처리: 1ms
- 태스크 생성/스케줄링 오버헤드: 0.5ms x 1000 = 500ms
- 총: 1000ms + 500ms = 1500ms
Case 2: Coarse-grained (4개 태스크)
- 태스크당 처리: 250ms (250개씩)
- 오버헤드: 0.5ms x 4 = 2ms
- 총: 250ms + 2ms = 252ms ← 훨씬 빠름
Case 3: 작업 크기 불균일 (Coarse-grained의 문제)
- 코어1: 100ms, 코어2: 100ms, 코어3: 100ms, 코어4: 400ms
- 총: 400ms (코어4가 병목) ← 3개 코어는 놀고 있음
- 오버헤드 요소: 태스크 생성, 큐 관리, 컨텍스트 스위칭, 결과 수집
- 부하 불균형: Coarse-grained에서 작업 크기가 불균일하면 일부 코어만 바쁨
- 최적점: 오버헤드를 숨길 수 있을 만큼 크고, 부하 분산이 될 만큼 작은 크기
선택 기준
| 상황 | 권장 |
|---|---|
| 작업 크기 균일 | Coarse-grained |
| 작업 크기 불균일 | Fine-grained |
| 통신 비용 높음 | Coarse-grained |
| CPU-bound 작업 | 코어 수에 맞춤 |
| I/O-bound 작업 | Fine-grained 가능 |
Agglomeration (응집)
너무 세밀하게 나눈 경우, 다시 묶어서 오버헤드 감소
# 너무 세밀함 (오버헤드 큼)
chunks = np.array_split(data, 1000) # 1000개 chunk
# 적절한 응집
chunks = np.array_split(data, num_cpus) # CPU 수만큼만
패턴 선택 가이드
| 분해 유형 | 패턴 | 적합한 상황 |
|---|---|---|
| Task | 파이프라인 | 단계별 처리가 명확할 때, 스트리밍 데이터 |
| Task | Fork/Join | 재귀적 분할이 자연스러운 알고리즘 |
| Data | Chunk 분할 | 같은 작업을 대량 데이터에 적용 |
| Data | Map | 독립적인 변환 작업 |
| Hybrid | Map/Reduce | 대용량 데이터 집계/분석 |
정리
핵심 개념 한 줄 요약
| 패턴 | 핵심 |
|---|---|
| 파이프라인 | 작업을 단계별로 나눠 처리량 향상 (조립 라인) |
| Fork/Join | 재귀적으로 분할 → 병렬 처리 → 병합 (분할 정복) |
| Chunk 분할 | 데이터를 chunk로 나눠 같은 작업 병렬 수행 |
| Map | 각 요소에 독립적으로 함수 적용 (순수 함수) |
| Map/Reduce | Map으로 분산 처리 → Reduce로 집계 |
| Granularity | 분할 크기 결정: 오버헤드 vs 부하 균형 트레이드오프 |
의사결정 플로우차트
병렬화할 작업이 있다
│
▼
┌─────────────────────────────────┐
│ 무엇을 기준으로 나눌 것인가? │
└─────────────────────────────────┘
│
┌────┴────┐
▼ ▼
Task Data
기준 기준
│ │
│ ┌────┴────┐
│ ▼ ▼
│ 결과 집계 집계 없음
│ 필요? (변환만)
│ │ │
│ ▼ ▼
│ Map/ Map 또는
│ Reduce Chunk 분할
│
├─────────────┐
▼ ▼
단계별 재귀적
흐름? 구조?
│ │
▼ ▼
파이프라인 Fork/Join
병렬화 성공의 3가지 조건
- 독립성: 작업/데이터 간 의존성이 없어야 함
- 충분한 작업량: 병렬화 오버헤드 > 이득이면 역효과
- 적절한 분할 크기: Fine/Coarse 트레이드오프 고려
실무 체크리스트
□ 의존성 분석: 병렬 처리 가능한가?
□ 분해 기준 선택: Task vs Data vs Hybrid
□ 패턴 선택: 파이프라인 / Fork-Join / Map / Map-Reduce
□ Granularity 결정: 작업 크기 균일? 통신 비용?
□ 동기화 필요성: Side effect 있는가? 공유 상태?
□ 오버헤드 측정: 병렬화 이득 > 오버헤드인가?