본문 바로가기
Kafka

카프카 메시지 처리와 재시도 정책

by Soono991 2025. 9. 5.

 

카프카를 사용할 때 주의해야 할 점으로 아래 3가지 내용에 대해서 정리해본다.

  1. 메시지 “처리 완료”의 기준과 Ack
  2. enable.auto.commit 설정과 Lag 확인 방법
  3. addRetryableExceptions / addNotRetryableExceptions에 따른 재처리 정책

 

Ack(Offset 커밋)

카프카에서 메시지를 처리했다고 보는 기준은 단순히 리스너에서 로직을 통과했다는 게 아니라 오프셋이 커밋됐는가다.

커밋이 되지 않으면 브로커 입장에서는 “아직 처리 안 됨” 상태고, 재시도가 발생할 수 있다.

 

Ack Mode는 아래와 같이 4가지 설정이 있으며 각각 특징은 다음과 같다.

AckMode 동작 방식 커밋 시점 장/단점
RECORD 레코드 단위로 처리 후 커밋 한 개 레코드 처리 후 바로 커밋 안정적, 처리 보장
BATCH 한 번 폴링한 배치 단위로 커밋 poll()로 가져온 레코드들 처리 후 한꺼번에 커밋 처리량 유리, RTT 절약
TIME 일정 시간마다 주기적으로 커밋 ackTime 주기마다 커밋 레코드 수와 무관, 안정적인 주기
COUNT 일정 개수 처리 시마다 커밋 ackCount 개수마다 커밋 처리량 안정화
COUNT_TIME 일정 개수 또는 시간 중 먼저 도달 시 커밋 ackCount 또는 ackTime 중 빠른 조건에 따라 커밋 COUNT+TIME 하이브리드, 유연성 높음
MANUAL 개발자가 직접 ack.acknowledge() 호출 → 모아서 커밋 acknowledge 호출 후, poll 단위로 커밋 성능 유리, 배치 처리 가능
MANUAL_IMMEDIATE 개발자가 ack.acknowledge() 호출 시 즉시 커밋 acknowledge 호출 즉시 커밋 가장 보수적, 안정적

 

실무에서는 아무래도 Ack를 직접 호출해 커밋 시점을 명시적으로 제어할 수 있는 "MANUAL", "MANUAL_IMMEDIATE" 방식을 사용하는 것이 더 좋다.

 

@KafkaListener(
    topics = [Topic.PRODUCT_V1_STOCK_ADJUSTED, Topic.PRODUCT_V1_LIKE_CHANGED, Topic.PRODUCT_V1_VIEWED],
    groupId = Group.METRICS_EVENTS,
)
fun listen(
    message: String,
    ack: Acknowledgment,
    @Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
    @Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
    @Header(KafkaHeaders.OFFSET) offset: Long,
) {
    log.info("[MetricsV1EventConsumer.listen] message: $message")
    val event = Event.fromJson(message) ?: throw IllegalArgumentException("Invalid event message: $message")

    if (eventHandledService.isAlreadyHandled(event.eventId, Group.METRICS_EVENTS)) {
        log.info("[MetricsV1EventConsumer.listen] already handled eventId: ${event.eventId}, group: ${Group.METRICS_EVENTS}")
        ack.acknowledge()
        return
    }

    productMetricsService.handleEvent(event)
    eventHandledService.markSuccess(
        EventHandledCommand.Succeed(
            event.eventId,
            event.eventType,
            topic,
            partition,
            offset,
            Group.METRICS_EVENTS,
        ),
    )
    // 명시적으로 ack 호출
    ack.acknowledge()
}

 

컨슈머에서 파라미터로 전달 받는 Acknowledgment를 통해서 acknowledge()를 호출해주면 된다.

 

주의할 점

"MANUAL", "MANUAL_IMMEDIATE"를 사용할 때는 주의할 점은 반드시 ack.acknowledge() 를 호출해줘야 한다는 점이다.

ack.acknowledge() 를 호출하지 않으면 메세지는 처리되지 않은 상태이기 때문에 컨슈머는 반복해서 해당 메세지를 읽는다.

멱등 처리가 되어 있지 않은 경우에는 중복 메세지가 발행될 가능성이 있어 주의해야 한다.

 

enable.auto.commit

enable.auto.commit=true(기본값)이면 컨슈머가 주기적으로 자동 커밋을 해버린다.

문제는, 메세지 처리에 실패했는데도 커밋이 먼저 돼서 메시지를 유실할 수 있다는 점이다.

실무에서는 보통 enable.auto.commit=false로 두고 위에서 언급한 AckMode 중 "MANUAL", "MANUAL_IMMEDIATE"를 사용하여 커밋 시점을 직접 제어한다.

 

Retry Policy

스프링 카프카에서는 DefaultErrorHandler를 사용해 예외별로 재처리할지(DLT 전송 전 재시도), 바로 스킵할지 설정할 수 있다.

 

설정 예제

@Bean
fun errorHandler(kafkaTemplate: KafkaTemplate<Any, Any>): DefaultErrorHandler {
    val recoverer = DeadLetterPublishingRecoverer(kafkaTemplate) { r, e ->
        TopicPartition("${r.topic()}.dlt", r.partition())
    }
    val handler = DefaultErrorHandler(recoverer, FixedBackOff(100L, 3L))
    handler.addRetryableExceptions(
        RetriableException::class.java,
        TimeoutException::class.java,
        NetworkException::class.java,
        QueryTimeoutException::class.java,
        SocketTimeoutException::class.java,
        ConnectException::class.java,
        IOException::class.java,
    )
    handler.addNotRetryableExceptions(
        JsonProcessingException::class.java,
        DeserializationException::class.java,
        SerializationException::class.java,
        IllegalArgumentException::class.java,
        DataIntegrityViolationException::class.java,
        DuplicateKeyException::class.java,
    )
    return handler
}

 

기본적으로는 addRetryableExceptions는 빈 배열, addNotRetryableExceptions는 아래와 같이 6개의 예외가 등록되어 있다.

 

addNotRetryableExceptions

 

 

addRetryableExceptions

 

 

DefaultErrorHandler에는 위와 같이 retryListeners가 없기 때문에 재시도 처리를 하지 않는다.

따라서 특정 예외에 대해서 재시도를 하기 위해서는 커스터 ErrorHandler를 작성하여 retryListeners를 등록하면 된다.

'Kafka' 카테고리의 다른 글

로그 컴팩션  (1) 2023.05.07
메시지 로그 세그먼트, 로그 세그먼트  (0) 2023.05.07
컨트롤러  (0) 2023.05.01
리더에포크 & 복구  (0) 2023.05.01
ISR(In Sync Replica)  (0) 2023.05.01

댓글