Spring WebFlux에서 DataBuffer 스트림을 멀티캐스트 방식으로 처리할 때 사용하는 주요 연산자(publish().autoConnect(n), cache(), refCount(n))의 개념과 차이, 그리고 실전 활용법을 예제와 함께 쉽고 명확하게 설명합니다. 초보자도 이해할 수 있도록 실제 동작 흐름과 주의사항까지 꼼꼼히 다룹니다.


들어가며

Spring WebFlux 환경에서 대용량 파일 다운로드, 비동기 데이터 스트림 처리 등에서 DataBuffer를 다루다 보면, 여러 구독자가 동일한 데이터 스트림을 효율적으로 공유해야 하는 상황이 자주 발생합니다. 이때 사용하는 것이 바로 멀티캐스트(multicast) 연산자입니다. 대표적으로 publish().autoConnect(n), cache(), refCount(n) 등이 있습니다. 이 글에서는 각 연산자의 개념, 동작 방식, 실전 예제, 그리고 언제 어떤 연산자를 써야 하는지 명확하게 정리합니다.


DataBuffer란?

DataBuffer는 Spring WebFlux에서 비동기적으로 데이터를 처리할 때 사용하는 데이터 블록입니다. 주로 파일 다운로드, 업로드, HTTP 요청/응답 바디 등에서 사용되며, Flux<DataBuffer> 형태로 스트림 처리됩니다.


멀티캐스트(Multicast)란?

멀티캐스트란 하나의 데이터 소스를 여러 구독자(subscriber)가 동시에 구독할 수 있도록 데이터를 브로드캐스트하는 방식입니다. 기본적으로 Flux/Mono는 cold stream(구독자마다 새로 실행)인데, 멀티캐스트 연산자를 사용하면 hot stream(여러 구독자가 동일 데이터 공유)으로 바꿀 수 있습니다.


publish().autoConnect(n)

  • publish()는 Flux를 ConnectableFlux로 변환하여 멀티캐스트가 가능하게 합니다.
  • autoConnect(n)은 n명의 구독자가 subscribe할 때까지 소스 실행을 지연시키고, n명이 모이면 데이터를 브로드캐스트합니다.
  • n명 이후에 subscribe한 구독자는 이미 방출된 데이터는 받을 수 없습니다.
val flux = Flux.range(1, 5)
    .publish()
    .autoConnect(2) // 2명 구독해야 시작

flux.subscribe { println("A: $it") }
flux.subscribe { println("B: $it") }
// 여기서부터 데이터가 emit되기 시작
  • 실전 활용: 여러 클라이언트가 동시에 파일 다운로드를 시작해야 할 때, 모두 준비될 때까지 대기 후 한 번만 소스를 실행하고 데이터를 공유할 수 있습니다.
  • 주의: n명 이전에 구독한 사람만 데이터 전체를 받음. 이후 구독자는 중간부터 받음.

cache()

  • cache()는 소스에서 방출된 모든 데이터를 메모리에 저장하여, 이후 구독자도 전체 데이터를 받을 수 있게 합니다.
  • 첫 번째 구독자가 subscribe할 때 소스가 실행되고, 이후 구독자는 캐시된 데이터를 그대로 받습니다.
val flux = Flux.range(1, 5)
    .cache()

flux.subscribe { println("A: $it") }
Thread.sleep(100)
flux.subscribe { println("B: $it") } // A와 동일한 데이터 모두 받음
  • 실전 활용: 파일 다운로드, 이미지 등 동일한 데이터를 여러 번 재사용해야 할 때 유용
  • 주의: 데이터가 메모리에 모두 저장되므로, 대용량 데이터에는 주의 필요

refCount(n)

  • refCount(n)은 n명의 구독자가 subscribe하면 소스가 실행되고, 구독자가 n명 미만이 되면 소스를 종료합니다.
  • hot stream이지만, 구독자가 모두 사라지면 소스도 종료되어 재구독 시 새로 시작
val flux = Flux.range(1, 5)
    .publish()
    .refCount(2)

val subscription1 = flux.subscribe { println("A: $it") }
val subscription2 = flux.subscribe { println("B: $it") }
// 두 명이 구독하면 데이터 emit 시작
  • 실전 활용: 구독자가 모두 떠나면 리소스를 해제해야 하는 스트림에 적합
  • 주의: 구독자가 모두 사라지면 소스가 종료되고, 다시 subscribe하면 새로 시작함

연산자별 비교 및 선택 가이드

연산자 시작 조건 캐싱 여부 구독자 동기화 실전 활용 예시
publish().autoConnect(n) n명 구독 시 시작 X n명 이후 중간 구독자는 일부만 받음 동시에 시작해야 하는 멀티 다운로드 등
cache() 1명 구독 시 시작 O 모든 구독자 전체 데이터 여러 번 재사용하는 파일/이미지 등
refCount(n) n명 구독 시 시작 X n명 미만 되면 소스 종료 리소스 해제가 중요한 스트림

상황별 선택 시나리오와 실전 예시

  • publish().autoConnect(n)
    • 추천 상황: 여러 사용자가 동시에 대용량 파일을 다운로드해야 할 때, 모든 사용자가 준비될 때까지 기다렸다가 한 번만 소스를 실행하고 데이터를 공유해야 할 때 적합합니다.
    • 예시: 실시간 방송 스트림, 동시 시작이 중요한 멀티 다운로드, 여러 서비스가 동시에 동일한 데이터를 받아야 하는 경우.
    • 주의: n명 이후에 구독한 사용자는 이미 방출된 데이터는 받을 수 없습니다. 중간에 늦게 들어온 구독자는 데이터 일부만 받게 되어, 데이터 누락이 발생할 수 있습니다.
  • cache()
    • 추천 상황: 동일한 데이터를 여러 번 재사용해야 하거나, 구독 시점에 상관없이 모든 구독자가 전체 데이터를 받아야 할 때 적합합니다.
    • 예시: 이미지, 파일 등 캐싱이 필요한 리소스 제공, 여러 번 재생되는 미디어 스트림, 반복적으로 접근하는 데이터.
    • 주의: 모든 데이터를 메모리에 저장하므로, 데이터가 크거나 무한 스트림일 경우 메모리 부족 문제가 발생할 수 있습니다. 대용량 데이터에는 신중하게 사용해야 합니다.
  • refCount(n)
    • 추천 상황: 구독자가 모두 떠나면 자동으로 리소스를 해제해야 하거나, 구독자가 다시 모이면 소스를 새로 시작해야 하는 경우에 적합합니다.
    • 예시: 실시간 센서 데이터, 구독자가 없을 때 자동으로 연결을 끊어야 하는 IoT/네트워크 스트림, 리소스 관리가 중요한 상황.
    • 주의: 모든 구독자가 연결을 끊으면 소스가 종료되고, 다시 구독하면 처음부터 재시작합니다. 중간에 들어온 구독자는 데이터 일부만 받을 수 있습니다.

정리: “동시에 시작해야 한다면 publish().autoConnect(n)”, “모든 구독자가 전체 데이터를 받아야 한다면 cache()”, “구독자가 없을 때 자동 종료가 필요하다면 refCount(n)”을 선택하세요. 상황에 따라 잘못 선택하면 데이터 누락, 메모리 부족 등 문제가 발생할 수 있으니, 스트림의 특성과 요구사항을 충분히 고려해야 합니다.


실무에서의 주의사항 및 팁

  • 대용량 데이터는 cache() 사용 시 메모리 사용량에 주의
  • publish().autoConnect(n), refCount(n)는 구독자 타이밍에 따라 일부 데이터 손실 가능성 있음
  • 멀티캐스트 연산자는 hot stream으로 동작하므로, cold stream과의 차이를 꼭 이해하고 사용

참고 레퍼런스


멀티캐스트 연산자를 올바르게 사용하면 WebFlux에서 효율적이고 안정적인 데이터 스트림 처리가 가능합니다. 각 연산자의 동작 특성과 차이를 명확히 이해하고, 상황에 맞게 선택하세요!