RxJS Subject는 발행과 구독을 겸하는 멀티캐스트 Observable이고 4형제는 과거값 재생 정책으로 갈린다
·수정 1회
요약
- Subject는 밖에서
next()로 값을 밀어넣을 수 있는 Observable — "발행자 + 구독 대상"을 겸한다. - 4형제(
Subject/ReplaySubject/BehaviorSubject/AsyncSubject)의 차이는 "늦게 구독한 사람에게 과거 값을 얼마나 다시 틀어주냐" 한 축으로 갈린다. - 더 강한 게 좋은 게 아니라 요구에 맞는 것을 골라야 한다. 기본은
Subject, 필요할 때 올려쓴다.
본문
Subject = 발행자이자 Observable
일반 Observable은 값 발행을 내부에서만 한다. Subject는 밖에서 직접 값을 밀어넣을 수 있다.
const subject = new Subject<string>();
subject.subscribe(v => console.log('구독자:', v));
subject.next('안'); // 밖에서 발행
subject.next('녕');
subject.next(값)— 외부에서 발행subject.subscribe(...)— 구독- Subject는 멀티캐스트다. 구독자가 여럿이면 같은 값을 모두가 공유해서 받는다(일반 cold Observable은 구독마다 독립 실행 = 유니캐스트).
핵심 함정: 생성 시점 ≠ 구독 시점
Subject를 만든 순간과 누군가 구독하는 순간 사이에는 시간차가 있을 수 있다. 그 틈에 발행된 값을 어떻게 처리하느냐가 4형제를 가른다.
시간 →
[생성] [next:안] [next:녕] [구독] [next:하세요]
Subject: 버림 버림 ←여기부터 받음 "하세요"만
Replay: 저장 저장 ←구독시 재생→ "안","녕","하세요" 전부
일반 Subject는 생방송 TV(늦게 틀면 앞부분 못 봄), ReplaySubject는 다시보기(VOD).
4형제 비교
| 종류 | 늦게 구독하면 받는 값 | 비유 | 쓸 때 |
|---|---|---|---|
Subject |
구독 이후 값만 | 생방송 | 기본값. 과거 불필요 |
ReplaySubject(n) |
최근 n개 과거값 재생 (무인자=전부) | 다시보기 | 생성~구독 틈의 값을 살려야 함 |
BehaviorSubject(초기값) |
마지막 1개(없으면 초기값) | 현재 상태판 | "현재 값" 하나가 핵심 (온라인 수, 현재 유저) |
AsyncSubject |
complete 시점의 최종 1개만 | 최종 결과 | 끝났을 때 결과값 하나만 |
ReplaySubject가 "무조건 좋은 것"은 아니다
재생을 위해 값을 버퍼에 저장하는 대가가 있다.
- 메모리 — 무제한 버퍼(
new ReplaySubject())를 끝나지 않는 무한 스트림에 쓰면 값이 계속 쌓여 누수. 무한 스트림엔ReplaySubject(1)또는 다른 방식. - 낡은 값 재생이 버그일 수 있음 — "현재 상태"만 의미 있는 데이터는 과거값 재생이 오히려 틀린 동작. 이 경우
BehaviorSubject가 맞다. - 의도 표현 — 아무 데나 Replay를 쓰면 "왜 재생이 필요하지?"라고 읽는 사람이 헷갈린다.
→ 도구는 강한 게 좋은 게 아니라 요구에 딱 맞는 게 좋은 것. 기본 Subject, 필요할 때만 올려쓴다.
실제 적용 사례 — LLM 토큰 스트리밍 (gen-ai)
NestJS 백엔드가 AI(LLM) 토큰을 받아 흘려보낼 때 ReplaySubject가 정답이었던 케이스:
const subject = new ReplaySubject<string>();
// breaker.execute가 비동기로 AI 토큰을 받자마자 subject.next(token) 발행 시작
return { textStream: subject.asObservable() }; // 구독은 호출부에서 "나중에"
- AI 토큰 발행(즉시 시작)과 호출부 구독(나중) 사이 시간차 존재 → 일반 Subject면 첫 글자들 유실
- AI 답은 끝나면 complete = 유한 스트림 → 무한정 안 쌓임 (메모리 안전)
- 모든 토큰이 다 중요 → 전부 재생 필요
세 조건이 맞아서 ReplaySubject가 적합. 무한 스트림이었다면 무제한 버퍼는 메모리 폭탄이 됐을 것.
관련 노트
- stream은 앞으로만 쌓이고 소비되면 사라지는, 버퍼에서 순서대로 꺼내는 추상화다
- Python에서 SSE는 ASGI 환경에서만 운영 가능하다
- Pub:Sub 기반 캐시 무효화는 TTL을 fallback으로 함께 둬야 한다