Kafka Producer and Consumer Message Acknowledgement Options

1. 소개

우리가 알고 있듯이, Apache Kafka는 메시징 및 스트리밍 시스템입니다. 신뢰성 보장을 위해 확인 옵션을 제공합니다. 이 튜토리얼에서는 Apache Kafka에서 프로듀서 및 소비자의 확인 옵션에 대해 알아보겠습니다.

2. 프로듀서 확인 옵션

신뢰성이 설정된 Kafka 브로커가 있더라도 프로듀서도 신뢰성을 갖추도록 구성해야 합니다. 우리는 세 가지 확인 모드 중 하나를 사용하여 프로듀서를 구성할 수 있으며, 다음에서 다룰 것입니다.

2.1. 확인 없음

우리는 속성 acks0으로 설정할 수 있습니다:

static KafkaProducer<String, String> producerack0;

static Properties getProducerProperties() {
    Properties producerProperties = new Properties();
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return producerProperties;
}

static void setUp() throws IOException, InterruptedException {
    Properties producerProperties = getProducerProperties();
    producerProperties.put(ProducerConfig.ACKS_CONFIG,
      "0");
    producerack0 = new KafkaProducer<>(producerProperties);
}

이 구성에서 프로듀서는 브로커로부터 응답을 기다리지 않습니다. 메시지가 성공적으로 전송되었다고 가정합니다. 만약 문제가 발생하여 브로커가 메시지를 받지 못하면 프로듀서는 이를 알지 못하고 메시지가 손실됩니다.

그러나 프로듀서가 서버의 응답을 기다리지 않기 때문에 네트워크가 지원하는 한 최대한 빨리 메시지를 보낼 수 있어 높은 처리량을 달성할 수 있습니다.

메시지가 Kafka에 성공적으로 기록되려면 프로듀서가 네트워크를 통해 발송하는 데 성공해야 합니다. 객체를 직렬화할 수 없거나 네트워크 카드가 실패하는 등의 오류로 인해 클라이언트 측에서 메시지 전송이 실패할 수 있습니다. 그러나 브로커에 도달한 후에는 파티션이 오프라인 상태이거나 리더 선출이 진행 중이거나 Kafka 클러스터가 전체적으로 사용할 수 없는 경우에도 오류가 발생하지 않습니다.

acks=0으로 실행하면 프로듀서 지연 시간이 짧아지지만, 소비자가 모든 메시지를 복제를 완료할 때까지 볼 수 없기 때문에 종단 간 지연이 개선되지 않습니다.

2.2. 리더 확인

대신 속성 acks1로 설정할 수 있습니다:

static KafkaProducer<String, String> producerack1;

static Properties getProducerProperties() {
    Properties producerProperties = new Properties();
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return producerProperties;
}

static void setUp() throws IOException, InterruptedException {
    Properties producerack1Prop = getProducerProperties();
    producerack1Prop.put(ProducerConfig.ACKS_CONFIG,
      "1");
    producerack1 = new KafkaProducer<>(producerack1Prop);
}

리더 복제본이 메시지를 수신하는 순간 프로듀서는 브로커로부터 성공적인 응답을 받습니다. 만약 메시지를 리더에 쓸 수 없는 오류가 발생하면 프로듀서는 오류 응답을 받고 메시지를 다시 전송하여 데이터 손실 가능성을 피할 수 있습니다.

리더가 최신 메시지를 새로운 리더에게 복제하기 전에 시스템이 최신 메시지를 복제하기 전에 충돌이 발생하면 메시지를 잃을 수 있습니다.

리더가 메시지를 수신하고 파티션 데이터 파일에 기록한 순간 확인 또는 오류가 전송됩니다. 리더가 종료되거나 충돌하면 데이터를 잃을 수 있습니다. 이 충돌로 인해 일부 성공적으로 기록되고 확인된 메시지가 충돌 전에 팔로어에게 복제되지 않을 수 있습니다.

acks=1 구성으로 리더가 메시지를 복제할 수 있는 속도보다 더 빠르게 쓸 수 있는 경우, 우리는 과소 복제된 파티션을 가지게 되며, 리더가 프로듀서로부터 메시지를 확인하기 전에 복제합니다. 지연 시간은 한 복제본이 메시지를 수신할 때까지 기다리기 때문에 acks=0 구성보다 높습니다.

2.3. 모든 확인

우리는 속성을 acksall로 설정할 수 있습니다:

static KafkaProducer<String, String> producerackAll;

static Properties getProducerProperties() {
    Properties producerProperties = new Properties();
    producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return producerProperties;
}

static void setUp() throws IOException, InterruptedException {
    Properties producerackAllProp = getProducerProperties();
    producerackAllProp.put(ProducerConfig.ACKS_CONFIG,
      "all");
    producerackAll = new KafkaProducer<>(producerackAllProp);
}

프로듀서는 모든 동기화된 복제본이 메시지를 수신한 후 브로커로부터 성공적인 응답을 받습니다. 이는 가장 안전한 모드이며, 여러 브로커에 메시지가 있다는 것을 확신할 수 있고, 크래시 발생 시에도 메시지가 손실되지 않습니다.

리더는 모든 동기화된 복제본이 메시지를 수신할 때까지 기다린 후 확인 또는 오류를 전송합니다. 브로커의 min.insync.replicas 구성은 프로듀서가 메시지를 확인할 때 수신해야 하는 최소 복제본 수를 지정할 수 있습니다.

이것은 가장 안전한 옵션입니다. 프로듀서는 메시지가 완전히 커밋될 때까지 보내려고 시도합니다. 이 구성에서 프로듀서 지연 시간은 가장 높습니다. 프로듀서는 모든 동기화된 복제본이 모든 메시지를 수신할 때까지 기다린 후 메시지 배치를 “완료”로 표시하고 계속할 수 있습니다.

acks 속성을 값 -1로 설정하는 것은 all로 설정하는 것과 같습니다.

속성 **acks0, 1 또는 all/-1의 세 가지 가능한 값 중 하나로만 설정할 수 있습니다. 이 세 가지 값 외의 다른 값으로 설정하면 Kafka는 ConfigException을 발생시킵니다.

acks 구성 1all의 경우, 프로듀서 속성 retries, retry.backoff.ms, 및 delivery.timeout.ms를 사용하여 프로듀서 재시도를 처리할 수 있습니다.

3. 소비자 확인 옵션

데이터는 Kafka가 이를 커밋된 것으로 표시한 후 소비자에게 접근 가능해지며, 이는 시스템이 데이터를 모든 동기화된 복제본에 기록했음을 보장합니다. 이를 통해 소비자는 일관된 데이터를 수신할 수 있습니다. 그들의 유일한 책임은 읽은 메시지와 읽지 않은 메시지를 추적하는 것입니다. 이는 메시지 소비 시 손실되지 않도록 하는 데 중요합니다.

소비자가 파티션에서 데이터를 읽을 때, 소비자는 메시지 배치를 가져오고, 배치에서 마지막 오프셋을 확인한 후 마지막 수신된 오프셋을 시작으로 다른 메시지 배치를 요청합니다. 이는 Kafka 소비자가 항상 새로운 데이터를 올바른 순서로 받으며 메시지를 놓치지 않도록 보장합니다.

소비자의 원하는 신뢰성을 위한 구성에 대해 이해해야 할 네 가지 소비자 구성 속성이 있습니다.

3.1. 그룹 ID

각 Kafka 소비자는 group.id 속성으로 식별되는 그룹에 속합니다.

예를 들어, 그룹에 두 개의 소비자가 있는 경우 두 소비자는 동일한 그룹 ID를 가집니다. Kafka 시스템은 각 소비자를 주제의 파티션 일부에 할당합니다. 각 소비자는 개별적으로 메시지의 일부를 읽습니다. 그룹 전체가 모든 메시지를 읽습니다.

소비자가 구독된 주제에서 모든 메시지를 따로 확인해야 하는 경우, 고유한 group.id가 필요합니다.

3.2. 자동 오프셋 재설정

속성 auto.offset.reset는 커밋된 오프셋이 없거나 유효하지 않은 커밋된 오프셋으로 분할을 읽기 시작할 때 소비자 동작을 결정합니다:

static Properties getConsumerProperties() {
    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return consumerProperties;
}

Properties consumerProperties = getConsumerProperties(); 
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
    // ...
}

또는

static Properties getConsumerProperties() {
    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return consumerProperties;
}

Properties consumerProperties = getConsumerProperties(); 
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
    // ...
}

기본 값은 latest로, 의미는 유효한 오프셋이 없으면 소비자가 최신 레코드부터 읽기 시작한다는 것입니다. 소비자는 실행을 시작한 후에 작성된 레코드만 고려합니다.

속성의 대안 값은 earliest입니다. 이는 유효한 오프셋이 없으면 소비자가 파티션의 시작부터 모든 데이터를 읽게 됩니다. auto.offset.resetnone으로 설정되면 유효하지 않은 오프셋에서 소비할 때 예외가 발생합니다.

3.3. 자동 커밋 활성화

속성 enable.auto.commit은 소비자가 오프셋을 자동으로 커밋할지 여부를 제어하며 기본값은 true입니다:

static Properties getConsumerProperties() {
    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return consumerProperties;
}

Properties consumerProperties = getConsumerProperties(); 
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
    // ...
}

false로 설정하면 시스템이 오프셋을 커밋할 시점을 제어할 수 있습니다. 이는 중복을 최소화하고 데이터 손실을 피하는 데 도움이 됩니다.

enable.auto.commit 속성을 true로 설정하면 auto.commit.interval.ms를 사용하여 커밋 주기를 제어할 수 있습니다.

3.4. 자동 커밋 간격

자동 커밋 구성에서 auto.commit.interval.ms 속성은 Kafka 시스템이 오프셋을 커밋하는 빈도를 구성합니다:

static Properties getConsumerProperties() {
    Properties consumerProperties = new Properties();
    consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
      KAFKA_CONTAINER.getBootstrapServers());
    return consumerProperties;
}

Properties consumerProperties = getConsumerProperties(); 
consumerProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 7000); 
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
    // ...
}

기본값은 5초마다입니다. 일반적으로 더 자주 커밋하면 오버헤드가 추가되지만, 소비자가 중지될 경우 중복이 발생하는 횟수를 줄입니다.

4. 결론

이 기사에서는 Apache Kafka의 프로듀서 및 소비자 확인 옵션에 대해 배우고 이를 활용하는 방법을 알아보았습니다. Kafka의 확인 옵션은 개발자가 성능과 신뢰성 간의 균형을 미세 조정하여 다양한 사용 사례에 적합한 다재다능한 시스템을 만들 수 있도록 합니다.

언제나 그렇듯이 이 기사에서 사용된 전체 코드는 GitHub에서 확인할 수 있습니다.

원본 출처

You may also like...

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다