본문 바로가기
Kafka

Producer

by Soono991 2023. 4. 3.

프로듀서는 카프카의 토픽으로 메시지를 전송하는 역할을 담당합니다.

 

 

출처: https://dzone.com/articles/take-a-deep-dive-into-kafka-producer-api

위 이미지는 프로듀서의 전체적인 흐름을 나타낸 것입니다.

먼저 카프카로 전송하기 위한 실제 데이터인 ProducerRecord를 생성합니다. 레코드는 Topic, Partition, Key, Value로 구성됩니다.

프로듀서가 카프카로 메시지를 전송할 때 특정 Topic으로 메시지를 전송합니다.

따라서 레코드 구성 요소에서 Topic, Value은 필수 값이며, Partition, Key는 옵션 값입니다.

 

send() 메서드를 통해 레코드를 전송하면 프로듀서는 가장 먼저 데이터가 네트워크를 통해 전송될 수 있도록 레코드를 ByteArrays로 직렬화합니다.

그런 다음 직렬화된 데이터가 파티셔너로 전송됩니다.

 

출처: https://kafka.apache.org/documentation/#theproducer

클라이언트는 메시지를 게시하는 파티션을 제어합니다. 
...
우리는 사용자가 파티션에 대한 키를 지정하고 이를 사용하여 파티션으로 해시할 수 있도록 함으로써 시맨틱 파티션을 위한 인터페이스를 노출합니다(필요한 경우 파티션 기능을 재정의하는 옵션도 있습니다). 
예를 들어 선택한 키가 사용자 ID인 경우 지정된 사용자에 대한 모든 데이터가 동일한 파티션으로 전송됩니다.
...

 

 

💡이 때, 레코드에서 파티션을 설정했다면 파티셔너는 아무 동작도 하지 않고 지정된 파티션으로 레코드를 전달합니다.
하지만 파티션을 설정하지 않은 경우에는 파티셔너가 일반적으로 레코드의 key를 기반으로 파티션을 선택합니다.

만약 레코드의 key가 설정되지 않았다면 아래와 같이 버전에 따라 기본 파티션 분배 전략이 적용됩니다.
ㆍkafka 2.4 버전 이전 : Round Robin (라운드 로빈)
ㆍkafka 2.4 버전 이후 : Sticky Partitioning (스티키 파티셔닝)

📢 Partitioner(파티셔너)에 대해서는 따로 정리해 보도록 하겠습니다.

--230410 링크 추가

 

Partitioner & Partition

💡 이번 포스팅에서는 Partitioner & Partition에 대해 알아보도록 하겠습니다. Topic을 생성할 때 파티션을 지정하지 않을 경우 기본적으로 해당 토픽에 대한 1개의 파티션이 생성이 됩니다. 따라서 토

soono-991.tistory.com

 

 

send() 메서드 이후 레코드들을 파티션별로 잠시 모아두게 됩니다. 레코드들을 모아두는 이유는 프로듀서가 카프카로 전송하기 전, 배치 전송을 하기 위함입니다.

 

별도의 Thread가 해당 레코드 배치를 적절한 카프카 브로커로 보내는 역할을 합니다. 브로커가 메시지를 수신하면 응답을 다시 보냅니다.

메시지가 카프카에 성공적으로 기록된 경우 토픽, 파티션 및 파티션 내 레코드 offset과 함께 RecordMetadata가 반환됩니다. 브로커가 메시지 쓰기에 실패하면 오류를 반환합니다. 프로듀서가 오류를 수신하면 포기하고 오류를 반환하기 전 메시지 전송을 몇 번 더 재시도할 수 있습니다.

 

정리하면 전송이 실패하면 재시도 동작이 이뤄지고, 지정된 횟수만큼의 재시도가 실패하면 최종 실패를 전달하며, 전송이 성공하면 메타데이터를 반환하게 됩니다.

 

 

배치 전송

카프카에서는 토픽의 처리량을 높이기 위한 방법으로 토픽을 파티션으로 나눠 처리하며, 카프카 클라이언트인 프로듀서에서는 처리량을 높이기 위해 배치 전송을 권장합니다. 따라서 프로듀서에서는 카프카로 전송하기 전, 배치 전송을 위해 위 이미지처럼 토픽의 파티션별로 레코드들을 잠시 보관하고 있습니다.

 

프로듀서에서는 배치 전송을 위해 다음과 같은 옵션을 제공하고 있습니다.

  • buffer.memory
    • 카프카로 메시지들을 전송하기 위해 담아두는 프로듀서의 버퍼 메모리 옵션입니다.
    • 기본 값은 32MB이며, 설정값을 조정할 수 있습니다.
  • batch.size
    • 배치 전송을 위해 메시지(레코드)들을 묶는 단위를 설정하는 배치 크기 옵션입니다.
    • 기본값은 16MB이며, 설정값을 조정할 수 있습니다.
  • linger.ms
    • 배치 전송을 위해 버퍼 메모리에서 대기하는 메시지들의 최대 대기시간을 설정하는 옵션입니다.
    • 단위는 밀리초(ms)이며 기본값은 0입니다.
    • 기본값 0으로 설정하면, 배치 전송을 위해 기다리지 않고 메시지들이 즉시 전송됩니다.

프로듀서의 배치 전송 방식은 단건의 메시지를 전송하는 것이 아니라 한 번에 다량의 메시지를 묶어서 전송하는 방법입니다.

이러한 배치 전송은 불필요한 I/O를 줄일 수 있어 매우 효율적이며, 더불어 카프카의 요청 수를 줄여주는 효과도 있습니다.

 

 

 

출처: https://kafka.apache.org/documentation/#theproducer

일괄 처리는 효율성의 큰 동인 중 하나이며 Kafka 생산자는 일괄 처리를 활성화하기 위해 메모리에 데이터를 축적하고 단일 요청으로 더 큰 일괄 처리를 보내려고 시도합니다. 
일괄 처리는 고정된 수의 메시지만 축적하고 고정된 지연 시간(예: 64k 또는 10ms)보다 더 이상 기다리지 않도록 구성할 수 있습니다. 
이렇게 하면 더 많은 바이트를 보낼 수 있고 서버에서 더 큰 I/O 작업을 거의 수행할 수 없습니다. 
이 버퍼링은 구성 가능하며 더 나은 처리량을 위해 소량의 추가 대기 시간을 교환할 수 있는 메커니즘을 제공합니다.

 

하지만 장점이 많다고 해서 무조건 배치 처리만 해야 하는 것은 아니며, 카프카를 사용하는 목적에 따라 처리량을 높일지, 아니면 지연 없는 전송을 해야 할지 선택을 해야 합니다.

처리량을 높이려면 batch.size와 linger.ms의 값을 크게 설정해야 하고, 지연 없는 전송이 목표라면 batch.size와 linger.ms의 값을 작게 설정해야 합니다.

 

사용자가 프로듀서의 높은 처리량을 목표로 배치 전송을 설정하는 경우 주의해야 할 사항이 있습니다. 바로 버퍼 메모리 크기가 충분히 커야 한다는 점입니다. 즉 buffer.memory 크기는 반드시 batch.size보다 커야 합니다.

 

배치 전송과 더불어 압축 기능을 같이 사용한다면, 프로듀서는 메시지들을 더욱 효율적으로 카프카로 전송할 수 있습니다.

클라이언트를 포함해 카프카에서는 gzip, snappy, lz4, zstd등의 압축 포맷을 지원합니다.

높은 압축률을 선호한다면 gzip, zstd를 선택하는 것이 좋고

낮은 지연시간을 선호한다면 lz4, snappy를 선택하는 것이 좋습니다.

 

📢 압축과 관련해서는 따로 정리해보도록 하겠습니다.

 

 

 

 

중복 없는 전송

데이터 프로세싱 과정에서 메시지가 중복되지 않고 처리된다면 데이터 처리 업무를 담당하는 데이터 엔지니어나 개발자에게 매우 유용할 것입니다. 

일부 서비스는 메시지 중복을 허용하는 경우도 있지만, 특정 서비스에서 메시지가 중복 처리된다면 치명적인 상황이 발생할 수 있습니다. 만약 온라인 쇼핑 서비스에서 소비자의 구매 내역이 중복 처리되어 동일한 제품이 소비자에게 두 번 배송되거나 중복 결제가 발생한다면, 해당 온라인 쇼핑 서비스를 하는 기업의 입장에서 매우 큰 문제가 아닐 수 없습니다.

 

이에 따라 카프카는 사용자들의 개발 편의를 높이기 위해 중복 없이 전송할 수 있는 기능을 제공합니다.

 

메시지 시스템들의 메시지 전송 방식에는

1) 적어도 한 번 전송 (at-least-one)

2) 최대 한 번 전송 (at-most-one)

3) 중복 없는 전송

4) 정확히 한 번 전송 (exactly-once)

이 있습니다.

 

 

적어도 한 번 전송 (at-least-one)

 

출처: https://dlgldgldgld.github.io/kafka/kafka-producer/

1. 프로듀서가 브로커의 특정 토픽으로 message1을 전송합니다.
2. 브로커는 message1을 기록하고, 잘 받았다는 ACK를 프로듀서에게 응답합니다.
3. 브로커의 ACK를 받은 프로듀서는 다음 메시지인 message2를 브로커에게 전송합니다.
4. 브로커는 message2를 기록하고, 잘 받았다는 ACK를 프로듀서에게 전송하려고 합니다.
하지만 네트워크 오류 또는 브로커 장애가 발생하여 결국 프로듀서는 message2에 대한 ACK를 받지 못합니다.
5. message2를 전송한 후 브로커로부터 ACK를 받지 못한 프로듀서는 브로커가 message2를 받지 못했다고 판단해 message2를 재전송합니다.

 

현 상황을 다시 정리해보면, 프로듀서 입장에서는 브로커가 메시지를 저장하고 ACK만 전송하지 못한 것인지, 메시지를 저장하지 못해서 ACK를 전송하지 못한 것인지는 정확히 알 수 없습니다.

 

하지만 message2에 대한 ACK를 받지 못한 프로듀서는 적어도 한 번 전송 방식에 따라 message2를 다시 한 번 전송합니다.

만약 브로커가 message2를 저장하고 ACK만 전송하지 못한 상황이었다면 message2는 브로커에 중복 저장될 것입니다.

 

 

 

최대 한 번 전송 (at-most-one)

출처: https://dlgldgldgld.github.io/kafka/kafka-producer/

 

최대 한 번 전송은 ACK를 받지 못하더라도 재전송을 하지 않습니다.

최대 한 번 전송 과정에서 프로듀서는 메시지의 중복 가능성을 회피하기 위해 재전송을 하지 않습니다.

다시 말해, 일부 메시지의 손실을 감안하더라도 중복 전송은 하지 않는 경우입니다.

최대 한 번 전송은 일부 메시지가 손실되더라도 높은 처리량을 필요로 하는 대량의 로그 수집이나 IoT 같은 환경에서 사용하곤 합니다.

 

📢 정리하자면, 
적어도 한 번 전송은 메시지 손실 가능성은 없지만 메시지 중복 가능성이 존재하고
최대 한 번 전송은 메시지 손실 가능성은 있지만 메시지 중복 가능성은 없습니다.

 

중복 없는 전송

출처: https://dlgldgldgld.github.io/kafka/kafka-producer/

중복 없는 전송은 적어도 한 번 전송 과정과 동일합니다.

하지만 프로듀서가 message2를 다시 전송한 후 브로커의 동작은 차이가 있습니다.

프로듀서가 재전송한 message2의 헤더에서 PID(0)와 메시지 번호(1)를 비교해서 message2가 이미 브로커에 저장되어 있는 것을 확인한 브로커는 메시지를 중복 저장하지 않고 ACK만 보냅니다. 이러한 브로커의 동작 덕분에 브로커에 저장된 메시지는 중복을 피할 수 있게 됩니다.

 

카프카에서 이용하는 PID메시지 번호가 바로 중복 없는 전송의 핵심입니다.

프로듀서가 중복 없는 전송을 시작하면, 프로듀서는 고유한 PID를 할당받게 되고, 이 PID와 메시지에 대한 번호를 메시지의 헤더에 포함해 메시지를 전송합니다.

브로커에서는 이 정보를 이용해 브로커에 기록된 메시지들의 중복 여부를 알 수 있습니다.

PID는 사용자가 별도로 생성하는 것이 아니며 프로듀서에 의해 자동 생성됩니다.

또한 이 PID는 프로듀서와 카프카 사이에서 내부적으로만 이용되므로 사용자에게 따로 노출되지 않습니다.

 

PID와 메시지 번호 정보는 브로커의 메모리에 유지되고, 리플리케이션 로그에도 저장됩니다.

따라서 예기치 못한 브로커의 장애 등으로 리더가 변경되는 일이 발생하더라도 새로운 리더가 PID와 시퀀스 번호를 정확히 알 수 있으므로 중복 없는 메시지 전송이 가능합니다.

 

중복 없는 전송을 위한 프로듀서 설정

프로듀서 옵션 설명
enable.idempotence true 프로듀서가 중복 없는 전송을 허용할지 결정하는 옵션입니다. 기본 값은 false이며, true로 변경 시 다음에 나오는 옵션들도 반드시 변경해야 합니다.
그렇지 않으면 ConfigException이 발생합니다.
max.in.flight.requests.per.connection 1 ~ 5 ACK를 받지 않은 상태에서 하나의 커넥션에서 보낼 수 있는 최대 요청 수입니다.
기본값은 5이며, 5 이하로 설정해야 합니다.
acks all 프로듀서 acks와 관련된 옵션으로써, 기본값은 1이며 all로 설정해야 합니다.
retries 5 ACK를 받지 못한 경우 재시도를 해야 하므로 0보다 큰 값으로 설정해야 합니다.

 

정확히 한 번 전송 (exactly-once)

데이터 처리나 가공 작업을 하는 대부분의 사람들은 데이터 파이프라인에서 메시지 중복이나 손실이 발생하지 않기를 원합니다.

가장 이상적인 상황이라면 모든 메시지를 정확히 한번 처리해 주기를 원합니다.

 

만약 메시지를 처리하는 애플리케이션에서 중복 없는 전송 또는 정확히 한 번 처리하는 기능을 제공한다면, 이는 매우 반가운 일입니다.

카프카에서 정확히 한 번 전송은 트랜잭션과 같은 전체적인 프로세스 처리를 의미하며, 중복 없는 전송은 정확히 한 번 전송의 일부 기능이라 할 수 있습니다.

 

전체적인 프로세스를 관리하기 위해 카프카에서는 정확히 한 번 처리를 담당하는 별도의 프로세스가 있는데 이를 트랜잭션 API라고 부릅니다.

 

📢 트랜잭션 API에 대해서는 추후 내용을 보강하도록 하겠습니다. (23.05.07)

 

Transaction Coordinator

카프카에는 트랜잭션 코디네이터(Transaction Coordinator)라는 것이 존재합니다.

트랜잭션 코디네이터의 역할은 프로듀서에 의해 전송된 메시지를 관리하며, 커밋 또는 중단을 표시합니다.

트랜잭션은 트랜잭션 로그를 카프카의 내부 토픽인 __transaction_state에 저장합니다.

(컨슈머 오프셋 관리를 위한 오프셋 정보를 저장하는 __consumer_offsets와 동일합니다.)

 

__transaction_state도 내부 토픽이지만 이 역시도 토픽이기 때문에 파티션 수와 리플리케이션 팩터 수가 존재하며,
브로커 설정을 통해 설정할 수 있습니다.

ㆍtransaction.state.log.num.partitions = 50
ㆍtransaction.state.log.replication.factor = 3

 

프로듀서는 트랜잭션 관련 정보를 트랜잭션 코디네이터에게 알리고 모든 정보의 로그는 트랜잭션 코디네이터가 직접 기록합니다.

 

정확히 한 번 전송을 이용해 전송된 메시지들이 카프카에 저장되면, 카프카의 메시지를 다루는 클라이언트들은 해당 메시지들이 정상적으로 커밋된 것인지 또는 실패한 것인지 식별할 수 있어야 합니다.

카프카에서는 이를 식별하기 위한 정보로 컨트롤 메시지라고 불리는 특별한 타입의 메시지가 추가로 사용됩니다.

 

 

단계별 동작

 

가장 먼저 트랜잭션 코디네이터를 찾습니다.

프로듀서는 브로커에게 FindCoorinatorRequest를 보내서 트랜잭션 코디네이터의 위치를 찾습니다.

트랜잭션 코디네이터는 브로커에 위치하고 있으며,

PID(Producer ID)와 transactional.id를 매핑하고 해당 트랜잭션 전체를 관리하는 것입니다.

이때 트랜잭션 코디네이터가 존재하지 않을 경우 트랜잭션 코디네이터를 새로 생성합니다.

 

__transaction_state 토픽의 파티션 번호는 transactional.id를 기반으로 해시하여 결정되고, 이 파티션의 리더가 있는 브로커가 트랜잭션 코디네이터의 브로커로 최종 선정됩니다.

 

프로듀서는 initTransactions() 메서드를 이용해 트랜잭션 전송을 위한 InitPidRequest를 트랜잭션 코디네이터로 보냅니다.

이 때 TID(transactional.id)가 설정된 경우에는 InitPidRequest와 함께 TID가 트랜잭션 코디네이터로 전송됩니다.

 

트랜잭션 코디네이터는 PID, TID를 매핑하고 해당 정보를 트랜잭션 로그에 기록합니다.

그 후 PID 에포크를 한 단계 올리는 동작을 하고, PID 에포크가 올라감에 따라 이전의 동일한 PID와 이전 에포크에 대한 쓰기 요청은 무시됩니다.

 

 

 

프로듀서는 beginTransaction() 메서드를 이용해 새로운 트랜잭션의 시작을 알립니다.

이때 프로듀서는 내부적으로 트랜잭션이 시작됐음을 기록하지만, 트랜잭션 코디네이터의 입장에서는 첫 번째 레코드가 전송될 때까지 트랜잭션이 시작된 것은 아닙니다.

 

 

트랜잭션 코디네이터는 전체 트랜잭션을 관리합니다.

프로듀서는 토픽 파티션 정보를 트랜잭션 코디네이터에게 전달하고, 트랜잭션 코디네이터는 해당 정보를 트랜잭션 로그에 기록합니다.

트랜잭션 로그에 기록될 때, 트랜잭션의 현재 상태를 Ongoing으로 표시합니다.

 

프로듀서는 대상 토픽의 파티션으로 메시지를 전송합니다.

 

 

메시지 전송을 완료한 프로듀서는 commitTransaction() 메서드 또는 abortTransaction() 메서드 중 하나를 반드시 호출해야 하며, 해당 메서드의 호출을 통해 트랜잭션이 완료됨을 트랜잭션 코디네이터에게 알립니다.

트랜잭션 코디네이터는 두 단계의 커밋 과정을 시작하게 되며, 첫 번째 단계로 트랜잭션 로그에 해당 트랜잭션에 대한 PrepareCommit 또는 PrepareAbort를 기록합니다.

 

 

트랜잭션 코디네이터는 두 번째 단계로서 트랜잭션 로그에 기록된 토픽의 파티션에 트랜잭션 커밋 표시를 기록합니다.

여기서 기록하는 메시지가 바로 컨트롤 메시지입니다.

 

 

트랜잭션 코디네이터는 완료됨(Committed)이라고 트랜잭션 로그에 기록합니다.

그리고 프로듀서에게 해당 트랜잭션이 완료됨을 알린 다음 해당 트랜잭션에 대한 처리는 모두 마무리됩니다.

 

 

참고 자료

 

카프카 완벽 가이드 - 코어편 - 인프런 | 강의

카프카(Kafka)의 핵심부터 내부 메커니즘에 대한 심화 수준의 내용까지, 상세한 이론 설명과 핸즈온 실습 & 실전 카프카 애플리케이션 개발 실습을 통해 카프카를 시작하는 사람도 단숨에 전문가

www.inflearn.com

 

댓글