Working With Reactive Kafka Stream and Spring WebFlux

1. 개요

이 글에서는 Reactive Kafka Streams를 탐구하고, 이를 샘플 Spring WebFlux 애플리케이션에 통합하여 이 조합이 우리가 확장성, 효율성 및 실시간 처리가 가능한 완전한 반응형 데이터 집약적 애플리케이션을 어떻게 구축할 수 있게 하는지를 살펴보겠습니다.

이를 위해 Spring Cloud Stream Reactive Kafka Binder, 및 ClickHouse를 사용할 것입니다.

2. Spring Cloud Stream Reactive Kafka Binder

Spring Cloud Stream은 스트림 기반 및 메시지 주도 마이크로서비스에 대한 추상화 계층을 제공합니다. Reactive Kafka Binder는 Kafka 주제, 메시지 브로커 또는 Spring Cloud Stream 애플리케이션을 연결하여 완전한 반응형 파이프라인을 생성할 수 있도록 해줍니다. 이러한 파이프라인은 Project Reactor를 활용하여 데이터 스트림을 반응형으로 처리하여 데이터 흐름 전반에서 비차단, 비동기 및 백프레셔 인식 처리를 보장합니다.

전통적인 Kafka Streams와 달리, Reactive Kafka Streams는 개발자가 실시간으로 각 데이터 조각을 매핑, 변환, 필터링 또는 축소할 수 있는 종단 간 반응형 파이프라인을 정의할 수 있게 해주며, 효율적인 자원 활용을 유지합니다.

이 접근 방식은 반응형 패러다임을 필요로 하는 높은 처리량의 이벤트 기반 애플리케이션에 특히 적합합니다.

2.1. Spring과 함께 하는 Reactive Kafka Streams

Spring Cloud Stream Reactive Kafka Binder를 사용하면 Reactive Kafka Streams를 Spring WebFlux 애플리케이션에 원활하게 통합하여 완전히 반응형이며 비차단 데이터 처리를 가능하게 합니다. Project Reactor에서 제공하는 반응형 API를 활용하여 우리는 백프레셔를 처리하고 비동기 데이터 흐름을 달성하며, 스레드를 차단하지 않고 효율적으로 스트림을 처리할 수 있습니다.

이 Reactive Kafka Streams와 Spring WebFlux의 조합은 분산, 실시간, 반응형 데이터 파이프라인을 구축하는 데 강력한 솔루션을 제공합니다.

이제 이러한 기능이 실제로 어떻게 작동하는지를 보여주기 위한 샘플 애플리케이션으로 들어가 보겠습니다.

3. Reactive Kafka Stream 애플리케이션 구축하기

이 샘플 애플리케이션에서는 주식 가격 데이터를 수신, 처리 및 배포하는 주식 분석 애플리케이션을 시뮬레이션합니다. 이 애플리케이션은 Spring Cloud Stream, Kafka 및 반응형 프로그래밍 패러다임이 Spring 에코시스템 내에서 어떻게 잘 작동하는지를 보여줄 것입니다.

우선, Spring Boot를 사용하여 이러한 애플리케이션을 구축하는 데 필요한 모든 종속성을 가져와야 합니다:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>2023.0.2</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

이 예제에서는 모든 종속성의 버전을 해결하는 Spring Cloud BOM을 사용할 것입니다. 우리는 또한 Spring Boot 및 다음 모듈을 사용할 것입니다:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

이 모듈들은 우리의 웹 레이어와 데이터 수집 파이프라인을 반응형으로 구축할 수 있도록 도와줍니다. 데이터 처리 파이프라인 외에도 결과를 저장하기 위해 일부 데이터 지속성이 필요합니다. 이를 위해 간단하고 아주 강력한 분석 데이터베이스를 사용하겠습니다:

<dependency>
    <groupId>com.clickhouse</groupId>
    <artifactId>clickhouse-r2dbc</artifactId>
    <version>0.7.1</version>
</dependency>

ClickHouse는 SQL 쿼리를 사용하여 실시간 분석 데이터 보고서를 생성하는 빠르고 오픈 소스의 열 지향 데이터베이스 관리 시스템입니다. 우리는 완전한 반응형 애플리케이션을 구축하기 위해 R2DB 드라이버를 사용할 것입니다. 

3.1. Reactive Kafka Producer 설정

데이터 처리 파이프라인을 시작하기 위해 데이터 생성을 담당하는 프로듀서가 필요합니다. 다음으로 Spring이 우리가 프로듀서를 쉽게 정의하고 사용할 수 있도록 도와주는 방법을 살펴보겠습니다:

@Component
public class StockPriceProducer {
    public static final String[] STOCKS = {"AAPL", "GOOG", "MSFT", "AMZN", "TSLA"};
    private static final String CURRENCY = "USD";

    private final ReactiveKafkaProducerTemplate<String, StockUpdate> kafkaProducer;
    private final NewTopic topic;
    private final Random random = new Random();

    public StockPriceProducer(KafkaProperties properties, 
                              @Qualifier(TopicConfig.STOCK_PRICES_IN) NewTopic topic) {
        this.kafkaProducer = new ReactiveKafkaProducerTemplate<>(
          SenderOptions.create(properties.buildProducerProperties())
        );
        this.topic = topic;
    }

    public Flux<SenderResult<Void>> produceStockPrices(int count) {
        return Flux.range(0, count)
          .map(i -> {
              String stock = STOCKS[random.nextInt(STOCKS.length)];
              double price = 100 + (200 * random.nextDouble());
              return MessageBuilder.withPayload(new StockUpdate(stock, price, CURRENCY, Instant.now()))
                .setHeader(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
          })
          .flatMap(stock -> {
              var newRecord = new ProducerRecord<>(
                topic.name(), 
                stock.getPayload().symbol(), 
                stock.getPayload());

              stock.getHeaders()
                .forEach((key, value) -> newRecord.headers().add(key, value.toString().getBytes()));

              return kafkaProducer.send(newRecord);
          });
    }
}

이 클래스는 주식 가격 업데이트를 생성하고 이를 Kafka 주제로 전송합니다.

StockPriceProducer에서 우리는 Kafka 클러스터에 연결하는 데 필요한 모든 정보를 포함하는 KafkaProperties를 주입합니다:

spring:
  kafka:
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    properties:
      spring:
        json:
          trusted:
            packages: '*'

그런 다음, NewTopic은 우리의 Kafka 주제에 대한 참조를 보유합니다. 이는 우리의 ReactiveKafkaProducerTemplate 인스턴스를 생성하는 데 필요한 모든 것입니다. 이 클래스는 우리의 애플리케이션과 Kafka 주제 간의 통신에 관련된 대부분의 복잡성을 추상화합니다.

produceStockPrices() 메서드에서는 StockUpdate 객체를 생성하고 이를 Message 객체로 포장합니다. Spring은 메시지 페이로드 및 메시지의 콘텐츠 유형으로 포함해야할 모든 필요한 헤더와 같은 메시지 기반 시스템 세부정보를 캡슐화하는 Message 클래스를 제공합니다.

마지막으로, 메시지의 목적지 주제 및 해당 파티션 키를 정의하는 ProducerRecord를 만들고, 이를 전송합니다.

3.2. Reactive Kafka Streams 설정

이제 프로듀서가 동일한 애플리케이션 외부에 있다고 가정해 보겠습니다. 우리는 주식 가격 업데이트 주제에 연결하고 가격을 USD에서 EUR로 변환해야 합니다. 동시에 우리는 특정 시간 창 내에서 원본 주식 가격의 역사도 저장해야 합니다. 그러니 우리의 데이터 스트림 파이프라인을 구성해 보겠습니다:

spring:
  cloud:
    stream:
      default-binder: kafka
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        default:
          content-type: application/json
        processStockPrices-in-0:
          destination: stock-prices-in
          group: live-stock-consumers-x
        processStockPrices-out-0:
          destination: stock-prices-out
          group: live-stock-consumers-y
          producer:
            useNativeEncoding: true

우선, default-binder 속성을 사용하여 Kafka를 기본 바인더로 정의합니다. Spring Cloud Stream은 공급자에 구애받지 않으므로 필요하다면 동일한 애플리케이션 내에서 Kafka와 RabbitMQ와 같은 다양한 메시징 시스템을 사용할 수 있습니다.

다음으로 바인딩을 구성하는데, 이는 메시지 시스템(Kafka 주제 등)과 애플리케이션의 생산자 및 소비자 간의 다리 역할을 합니다:

  • 입력 채널 processStockPrices-in-0은 메시지를 소비하는 stock-prices-in 주제에 바인딩됩니다.
  • 출력 채널 processStockPrices-out-0은 처리된 메시지가 게시되는 stock-prices-out 주제에 바인딩됩니다.

각 바인딩은 입력 채널에서 데이터를 처리하고 변환을 적용하며 결과를 출력 채널로 전송하는 processStockPrices() 메서드와 연관됩니다.

우리는 또한 메시지가 JSON으로 직렬화 및 역직렬화되도록 콘텐츠 유형을 JSON으로 정의합니다. 또한 프로듀서에서 useNativeEncoding: true를 사용하면 Kafka 프로듀서가 데이터의 인코딩 및 직렬화를 담당하게 됩니다.

3.3. Reactive Kafka Streams 바인딩 설정

앞서 언급했듯이 바인딩은 입력 및 출력 채널 간의 다리로서, 전송 중인 데이터를 처리할 수 있도록 합니다. YAML 파일에 정의된 이름은 중요하며, 이는 입력 및 출력 메시지 간의 매핑을 적용하는 함수에 해당해야 합니다.

다음으로 Spring이 이를 어떻게 처리하는지 살펴보겠습니다:

@Configuration
public class StockPriceProcessor {
    private static final String USD = "USD";
    private static final String EUR = "EUR";

    @Bean
    public Function<Flux<Message<StockUpdate>>, Flux<Message<StockUpdate>>> processStockPrices(
      ClickHouseRepository repository, 
      CurrencyRate currencyRate
    ) {
        return stockPrices -> stockPrices.flatMapSequential(message -> {
            StockUpdate stockUpdate = message.getPayload();
            return repository.saveStockPrice(stockUpdate)
              .flatMap(success -> Boolean.TRUE.equals(success) ? Mono.just(stockUpdate) : Mono.empty())
              .flatMap(stock -> currencyRate.convertRate(USD, EUR, stock.price()))
                .map(newPrice -> convertPrice(stockUpdate, newPrice))
                .map(priceInEuro -> MessageBuilder.withPayload(priceInEuro)
                  .setHeader(KafkaHeaders.KEY, stockUpdate.symbol())
                  .copyHeaders(message.getHeaders())
                  .build());
        });
    }

    private StockUpdate convertPrice(StockUpdate stockUpdate, double newPrice) {
        return new StockUpdate(stockUpdate.symbol(), newPrice, EUR, stockUpdate.timestamp());
    }
}

이 구성은 두 Kafka 주제 간의 주식 가격 업데이트를 반응형으로 처리하고 변환하는 방법을 보여줍니다. processStockPrices() 함수는 입력 stock-prices-in 주제를 출력 stock-prices-out 주제로 바인딩하여 두 주제 간에 처리 층을 추가합니다. 흐름은 다음과 같습니다:

  1. 메시지 처리: 입력 주제에서 들어오는 각 StockUpdate 메시지는 flatMapSequential()를 사용하여 순차적으로 처리됩니다. 이는 입력 메시지의 처리 순서가 일치하도록 보장하여 일관성을 유지하는 데 중요할 수 있습니다.
  2. 데이터베이스 지속성: 각 주식 업데이트는 향후 참조를 위해 ClickHouseRepository를 사용하여 데이터베이스에 저장됩니다. 성공적으로 저장된 업데이트만이 계속 진행됩니다.
  3. 통화 변환: 원래 USD로 되어 있는 주식 가격이 CurrencyRate 서비스를 사용하여 EUR로 변환됩니다.
  4. 메시지 변환: 변환된 가격은 새로운 StockUpdate 객체로 감싸지며, Kafka 메시지 키를 통해 원래 심볼을 유지합니다. 이는 Kafka 주제에서 적절한 메시지 파티셔닝을 보장합니다.
  5. 반응형 파이프라인: 전체 흐름은 비차단 비동기 기능을 활용하여 반응적이며, 확장성과 효율성을 제공합니다.

3.4. 보조 서비스

ClickHouseRepositoryCurrencyRate는 샘플 애플리케이션을 설명하기 위한 간단한 인터페이스입니다:

public interface CurrencyRate {
    Mono<Double> convertRate(String from, String to, double amount);
}

public interface ClickHouseRepository {
    Mono<Boolean> saveStockPrice(StockUpdate stockUpdate);
    Flux<StockUpdate> findMinuteAvgStockPrices(Instant from, Instant to);
}

이 기능들은 특정 데이터 파이프라인을 처리하며 애플리케이션이 적용할 수 있는 비즈니스 로직을 보여줍니다.

3.5. Reactive Kafka Streams 소비자 설정

처리가 완료된 후, 출력 채널에 전송된 데이터는 동일한 애플리케이션이나 다른 애플리케이션에서 소비될 수 있습니다. 이러한 소비자도 반응형 Kafka 템플릿을 사용하여 구현할 수 있습니다:

@Component
public class StockPriceConsumer {
    private final ReactiveKafkaConsumerTemplate<String, StockUpdate> kafkaConsumerTemplate;

    public StockPriceConsumer(@NonNull KafkaProperties properties, 
                              @Qualifier(TopicConfig.STOCK_PRICES_OUT) NewTopic topic) {
        var receiverOptions = ReceiverOptions
          .<String, StockUpdate>create(properties.buildConsumerProperties())
          .subscription(List.of(topic.name()));
        this.kafkaConsumerTemplate = new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }

    @PostConstruct
    public void consume() {
       kafkaConsumerTemplate
         .receiveAutoAck()
         .doOnNext(consumerRecord -> {
             // 시뮬레이션 처리
             log.info(
               "received key={}, value={} from topic={}, offset={}, partition={}", consumerRecord.key(),
               consumerRecord.value(),
               consumerRecord.topic(),
               consumerRecord.offset(),
               consumerRecord.partition());
         })
         .doOnError(e -> log.error("Consumer error",  e))
         .doOnComplete(() -> log.info("Consumed all messages"))
         .subscribe();
    }
}

StockPriceConsumer는 반응형으로 stock-prices-out 주제에서 데이터를 소비하는 방법을 보여줍니다:

  1. 초기화: 생성자는 YAML 구성의 Kafka 속성을 사용하여 ReceiverOptions를 생성합니다. 이는 stock-prices-out 주제를 구독하고 모든 파티션을 명시적으로 할당합니다.
  2. 메시지 처리: consume 메서드는 receiveAutoAck()를 사용하여 출력 채널(processStockPrices-out-0)에 구독합니다. 각 메시지는 키, 값, 주제, 오프셋 및 파티션 세부정보와 함께 로그에 기록되어 데이터 처리를 시뮬레이션합니다.
  3. 반응형 기능: 소비자는 메시지가 도착함에 따라 비차단 처리 기능을 통해 메시지를 처리하기 시작합니다. 또한 오류를 doOnError()로 기록하고 완료 사항을 doOnComplete()로 추적합니다.

다음 속성은 우리의 소비자를 구성합니다:

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      group-id: my-group
      properties:
        reactiveAutoCommit: true

이 소비자는 stock-prices-out 주제를 반응형으로 처리하며, 이 구현은 Kafka와 반응형 프로그래밍의 원활한 통합을 강조하여 효율적인 스트림 처리를 제공합니다.

3.6. Reactive WebFlux 애플리케이션

마지막으로, 데이터가 데이터베이스에 저장되면 우리는 사용자에게 이러한 정보를 적절하게 제공할 수 있으며, 데이터는 캐시되어 필요에 따라 처리될 수 있습니다:

@RestController
public class StocksApi {
    private final ClickHouseRepository repository;

    @Autowired
    public StocksApi(ClickHouseRepository repository) {
        this.repository = repository;
    }

    @GetMapping("/stock-prices-out")
    public Flux<StockUpdate> getAvgStockPrices(
        @RequestParam("from") @NotNull Instant from,  
        @RequestParam("to") @NotNull Instant to) {
        if (from.isAfter(to)) {
            throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "'from' must come before 'to'");
        }

        return repository.findMinuteAvgStockPrices(from, to);
    }
}

4. 연결하기

우리는 최소한의 코드로 완전한 반응형 데이터 처리 파이프라인을 구축하여 두 Kafka 주제를 연결하고 비즈니스 로직을 적용하며 높은 처리량을 달성했습니다. 이러한 접근 방식은 실시간 데이터 변환이 필요한 이벤트 기반 시스템에 이상적입니다. Spring Cloud Stream과 Kafka는 이 글에서 다룬 것 이상의 광범위한 기능을 갖춘 강력한 조합입니다.

예를 들어, 바인딩은 여러 입력 및 출력을 지원하며, 데드 레터 큐(DLQs)는 파이프라인의 견고성을 향상시킬 수 있습니다. 또한 다양한 메시징 공급자를 통합하고 채널 간의 트랜잭션 처리를 활성화하는 등의 작업이 가능합니다.

Spring Cloud Stream은 다재다능한 도구입니다. 이를 반응형 패러다임과 결합하면 회복력과 높은 처리량을 갖춘 강력한 데이터 파이프라인을 열 수 있습니다. 이 글에서는 반응형 Kafka Streams와 Spring WebFlux 작업의 단면만을 보여주었지만, 우리는 지금까지의 주요 이점을 관찰했습니다:

  • 실시간 변환: 이벤트 스트림의 실시간 변환 및 보강을 가능하게 합니다.
  • 백프레셔 관리: 데이터 흐름을 동적으로 처리하여 시스템 과부하를 방지합니다.
  • 원활한 통합: Kafka의 이벤트 기반 강력함과 Spring WebFlux의 비차단 기능을 결합합니다.
  • 확장 가능한 설계: DLQ와 같은 강력한 내결함성 메커니즘을 통해 높은 처리량 시스템을 지원합니다.

이러한 접근 방식은 많은 이점을 제공하지만, 이 글에서 논의한 대로 주의해야 할 몇 가지 사항도 있습니다.

4.1. 실용적인 함정 및 모범 사례

반응형 Kafka 파이프라인은 수많은 장점을 제공하지만, 동시에 도전 과제를 수반합니다:

  • 백프레셔 처리: 백프레셔를 관리하지 않으면 메모리 부풀림 또는 메시지 유실이 발생할 수 있습니다. 필요한 경우 .onBackpressureBuffer() 또는 .onBackpressureDrop()을 활용해야 합니다.
  • 직렬화 문제: 제작자와 소비자 간의 스키마 불일치는 역직렬화 실패를 야기할 수 있습니다. 우리는 스키마 호환성을 보장해야 합니다.
  • 오류 복구: 우리는 적절한 재시도 메커니즘을 보장하거나 transient issues를 효과적으로 처리하기 위해 DLQ를 사용해야 합니다.
  • 자원 관리: 비효율적인 메시지 처리는 애플리케이션 파이프라인을 압도할 수 있습니다. 이 경우 .limitRate() 또는 .take() 연산자를 사용하여 반응형 파이프라인 내에서 처리 속도를 제어할 수 있습니다. Kafka 소비자 페치 크기 및 폴링 간격을 조정하여 Kafka로부터 메시지를 검색하는 속도를 조정하고 애플리케이션 파이프라인이 압도당하지 않도록 할 수 있습니다.
  • 데이터 일관성: 원자 작업이나 적절한 재시도 처리가 없을 경우 비일관적인 데이터 처리가 발생할 수 있습니다. 원자성을 위해 Kafka 트랜잭션을 사용하거나 안전하게 재시도를 처리하는 idempotent 소비자 로직을 작성할 수 있습니다.
  • 스키마 진화: 적절한 버전 관리 없이 스키마를 발전시키면 호환성 문제를 초래할 수 있습니다. 우리는 스키마 레지스트리를 사용하여 버전 관리를 수행하고 역호환 가능한 변경을 적용할 수 있습니다(예: 선택적 필드 추가).
  • 모니터링 및 가시성: 모니터링이 부족하면 파이프라인의 병목 현상이나 실패를 파악하기 어려워질 수 있습니다. 우리는 Micrometer 와 Grafana (또는 다른 기본 제공 도구)를 통합하여 메트릭 및 모니터링을 제공해야 합니다. 우리는 또한 분산 추적을 위해 Kafka 메시지에 추적 ID를 추가할 수 있습니다.

이러한 사항에 주의를 기울이면 시스템에 안정적이고 확장 가능한 데이터 처리 파이프라인을 보장할 수 있습니다.

5. 결론

이 글에서는 Reactive Kafka Streams와 Spring WebFlux로 통합된 방법이 어떻게 완전한 반응형 데이터 집약적 파이프라인을 가능하게 하여 확장 가능하고 효율적이며 실시간 처리 기능을 갖추게 할 수 있는지를 보여주었습니다. 반응형 패러다임을 활용하여 우리는 Kafka 주제 간의 원활한 데이터 흐름을 구축하고 비즈니스 논리를 적용하며 최소한의 코드로 높은 품질의 사건 기반 처리를 달성했습니다. 이러한 강력한 조합은 실시간 데이터 변환을 위해 설계된 견고하고 확장 가능한 시스템을 구축할 수 있는 현대 반응형 기술의 잠재력을 강조합니다.

항상 그렇듯이 이 글에서 사용된 모든 코드 샘플은 GitHub에서 확인하실 수 있습니다.

원본 출처

You may also like...

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다