Spring WebFlux DataBuffer의 멀티캐스트 연산자: publish().autoConnect(n), cache(), refCount(n) 완전정리
Spring WebFlux에서 DataBuffer 스트림을 멀티캐스트 방식으로 처리할 때 사용하는 주요 연산자(publish().autoConnect(n), cache(), refCount(n))의 개념과 차이, 그리고 실전 활용법을 예제와 함께 쉽고 명확하게 설명합니다. 초보자도 이해할 수 있도록 실제 동작 흐름과 주의사항까지 꼼꼼히 다룹니다.
들어가며
Spring WebFlux 환경에서 대용량 파일 다운로드, 비동기 데이터 스트림 처리 등에서 DataBuffer
를 다루다 보면, 여러 구독자가 동일한 데이터 스트림을 효율적으로 공유해야 하는 상황이 자주 발생합니다. 이때 사용하는 것이 바로 멀티캐스트(multicast) 연산자입니다. 대표적으로 publish().autoConnect(n)
, cache()
, refCount(n)
등이 있습니다. 이 글에서는 각 연산자의 개념, 동작 방식, 실전 예제, 그리고 언제 어떤 연산자를 써야 하는지 명확하게 정리합니다.
DataBuffer란?
DataBuffer
는 Spring WebFlux에서 비동기적으로 데이터를 처리할 때 사용하는 데이터 블록입니다. 주로 파일 다운로드, 업로드, HTTP 요청/응답 바디 등에서 사용되며, Flux<DataBuffer>
형태로 스트림 처리됩니다.
멀티캐스트(Multicast)란?
멀티캐스트란 하나의 데이터 소스를 여러 구독자(subscriber)가 동시에 구독할 수 있도록 데이터를 브로드캐스트하는 방식입니다. 기본적으로 Flux/Mono는 cold stream(구독자마다 새로 실행)인데, 멀티캐스트 연산자를 사용하면 hot stream(여러 구독자가 동일 데이터 공유)으로 바꿀 수 있습니다.
publish().autoConnect(n)
publish()
는 Flux를 ConnectableFlux로 변환하여 멀티캐스트가 가능하게 합니다.autoConnect(n)
은 n명의 구독자가 subscribe할 때까지 소스 실행을 지연시키고, n명이 모이면 데이터를 브로드캐스트합니다.- n명 이후에 subscribe한 구독자는 이미 방출된 데이터는 받을 수 없습니다.
val flux = Flux.range(1, 5)
.publish()
.autoConnect(2) // 2명 구독해야 시작
flux.subscribe { println("A: $it") }
flux.subscribe { println("B: $it") }
// 여기서부터 데이터가 emit되기 시작
- 실전 활용: 여러 클라이언트가 동시에 파일 다운로드를 시작해야 할 때, 모두 준비될 때까지 대기 후 한 번만 소스를 실행하고 데이터를 공유할 수 있습니다.
- 주의: n명 이전에 구독한 사람만 데이터 전체를 받음. 이후 구독자는 중간부터 받음.
cache()
cache()
는 소스에서 방출된 모든 데이터를 메모리에 저장하여, 이후 구독자도 전체 데이터를 받을 수 있게 합니다.- 첫 번째 구독자가 subscribe할 때 소스가 실행되고, 이후 구독자는 캐시된 데이터를 그대로 받습니다.
val flux = Flux.range(1, 5)
.cache()
flux.subscribe { println("A: $it") }
Thread.sleep(100)
flux.subscribe { println("B: $it") } // A와 동일한 데이터 모두 받음
- 실전 활용: 파일 다운로드, 이미지 등 동일한 데이터를 여러 번 재사용해야 할 때 유용
- 주의: 데이터가 메모리에 모두 저장되므로, 대용량 데이터에는 주의 필요
refCount(n)
refCount(n)
은 n명의 구독자가 subscribe하면 소스가 실행되고, 구독자가 n명 미만이 되면 소스를 종료합니다.- hot stream이지만, 구독자가 모두 사라지면 소스도 종료되어 재구독 시 새로 시작
val flux = Flux.range(1, 5)
.publish()
.refCount(2)
val subscription1 = flux.subscribe { println("A: $it") }
val subscription2 = flux.subscribe { println("B: $it") }
// 두 명이 구독하면 데이터 emit 시작
- 실전 활용: 구독자가 모두 떠나면 리소스를 해제해야 하는 스트림에 적합
- 주의: 구독자가 모두 사라지면 소스가 종료되고, 다시 subscribe하면 새로 시작함
연산자별 비교 및 선택 가이드
연산자 | 시작 조건 | 캐싱 여부 | 구독자 동기화 | 실전 활용 예시 |
---|---|---|---|---|
publish().autoConnect(n) | n명 구독 시 시작 | X | n명 이후 중간 구독자는 일부만 받음 | 동시에 시작해야 하는 멀티 다운로드 등 |
cache() | 1명 구독 시 시작 | O | 모든 구독자 전체 데이터 | 여러 번 재사용하는 파일/이미지 등 |
refCount(n) | n명 구독 시 시작 | X | n명 미만 되면 소스 종료 | 리소스 해제가 중요한 스트림 |
상황별 선택 시나리오와 실전 예시
- publish().autoConnect(n)
- 추천 상황: 여러 사용자가 동시에 대용량 파일을 다운로드해야 할 때, 모든 사용자가 준비될 때까지 기다렸다가 한 번만 소스를 실행하고 데이터를 공유해야 할 때 적합합니다.
- 예시: 실시간 방송 스트림, 동시 시작이 중요한 멀티 다운로드, 여러 서비스가 동시에 동일한 데이터를 받아야 하는 경우.
- 주의: n명 이후에 구독한 사용자는 이미 방출된 데이터는 받을 수 없습니다. 중간에 늦게 들어온 구독자는 데이터 일부만 받게 되어, 데이터 누락이 발생할 수 있습니다.
- cache()
- 추천 상황: 동일한 데이터를 여러 번 재사용해야 하거나, 구독 시점에 상관없이 모든 구독자가 전체 데이터를 받아야 할 때 적합합니다.
- 예시: 이미지, 파일 등 캐싱이 필요한 리소스 제공, 여러 번 재생되는 미디어 스트림, 반복적으로 접근하는 데이터.
- 주의: 모든 데이터를 메모리에 저장하므로, 데이터가 크거나 무한 스트림일 경우 메모리 부족 문제가 발생할 수 있습니다. 대용량 데이터에는 신중하게 사용해야 합니다.
- refCount(n)
- 추천 상황: 구독자가 모두 떠나면 자동으로 리소스를 해제해야 하거나, 구독자가 다시 모이면 소스를 새로 시작해야 하는 경우에 적합합니다.
- 예시: 실시간 센서 데이터, 구독자가 없을 때 자동으로 연결을 끊어야 하는 IoT/네트워크 스트림, 리소스 관리가 중요한 상황.
- 주의: 모든 구독자가 연결을 끊으면 소스가 종료되고, 다시 구독하면 처음부터 재시작합니다. 중간에 들어온 구독자는 데이터 일부만 받을 수 있습니다.
정리: “동시에 시작해야 한다면 publish().autoConnect(n)”, “모든 구독자가 전체 데이터를 받아야 한다면 cache()”, “구독자가 없을 때 자동 종료가 필요하다면 refCount(n)”을 선택하세요. 상황에 따라 잘못 선택하면 데이터 누락, 메모리 부족 등 문제가 발생할 수 있으니, 스트림의 특성과 요구사항을 충분히 고려해야 합니다.
실무에서의 주의사항 및 팁
- 대용량 데이터는 cache() 사용 시 메모리 사용량에 주의
- publish().autoConnect(n), refCount(n)는 구독자 타이밍에 따라 일부 데이터 손실 가능성 있음
- 멀티캐스트 연산자는 hot stream으로 동작하므로, cold stream과의 차이를 꼭 이해하고 사용
참고 레퍼런스
멀티캐스트 연산자를 올바르게 사용하면 WebFlux에서 효율적이고 안정적인 데이터 스트림 처리가 가능합니다. 각 연산자의 동작 특성과 차이를 명확히 이해하고, 상황에 맞게 선택하세요!