๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
Kafka

KafkaProducer

by Soono991 2023. 4. 4.
๐Ÿ’ก ์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” KafkaProducer ๊ฐ์ฒด์— ๋Œ€ํ•ด ์•Œ์•„๋ณด๋„๋ก ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.

 

kafka์˜ producer์— ๋Œ€ํ•ด ๊ถ๊ธˆํ•˜์‹  ๋ถ„๋“ค์€ ์•„๋ž˜ producer์— ๋Œ€ํ•ด์„œ ์ •๋ฆฌํ•œ ํฌ์ŠคํŒ…์ด ์žˆ์œผ๋‹ˆ ์ฐธ๊ณ ํ•˜์‹œ๋ฉด ์ข‹์„ ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค.

 

 

Producer

ํ”„๋กœ๋“€์„œ๋Š” ์นดํ”„์นด์˜ ํ† ํ”ฝ์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•˜๋Š” ์—ญํ• ์„ ๋‹ด๋‹นํ•ฉ๋‹ˆ๋‹ค. ์œ„ ์ด๋ฏธ์ง€๋Š” ํ”„๋กœ๋“€์„œ์˜ ์ „์ฒด์ ์ธ ํ๋ฆ„์„ ๋‚˜ํƒ€๋‚ธ ๊ฒƒ์ž…๋‹ˆ๋‹ค. ๋จผ์ € ์นดํ”„์นด๋กœ ์ „์†กํ•˜๊ธฐ ์œ„ํ•œ ์‹ค์ œ ๋ฐ์ดํ„ฐ์ธ ProducerRecord๋ฅผ ์ƒ

soono-991.tistory.com

 

kafka๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด์„œ๋Š” ์•„๋ž˜์™€ ๊ฐ™์ด ์˜์กด์„ฑ์„ ์ถ”๊ฐ€ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

 

implementation 'org.apache.kafka:kafka-clients:3.1.2'

 

๊ทธ๋ฆฌ๊ณ  kafka-client์—์„œ ์ œ๊ณตํ•˜๋Š” KafkaProducer ๊ฐ์ฒด๋ฅผ ํ†ตํ•ด producer๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

์ด๋•Œ, producer ์„ค์ • ์ •๋ณด๋ฅผ ์ „๋‹ฌํ•ด์ค˜์•ผ ํ•˜๋Š”๋ฐ Bootstrap-server, key-serializer, value-serializer ์ •๋ณด๋Š” ํ•„์ˆ˜ ๊ฐ’์ž…๋‹ˆ๋‹ค.

Properties props = new Properties();

// bootstrap.servers, key.serializer.class, value.serializer.class
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.133:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// KafkaProducer ๊ฐ์ฒด ์ƒ์„ฑ
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props)

 

๊ทธ๋ฆฌ๊ณ  ProducerRecord ๊ฐ์ฒด๋ฅผ ์ƒ์„ฑํ•˜์—ฌ ํ”„๋กœ๋“€์„œ์— ์ „๋‹ฌํ•  ๋ฉ”์‹œ์ง€๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

// ProducerRecord ๊ฐ์ฒด ์ƒ์„ฑ
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "hello world");

๊ธฐ๋ณธ์ ์œผ๋กœ๋Š” ํ† ํ”ฝ, ๊ฐ’์€ ํ•„์ˆ˜๊ฐ’์ด๋ฉฐ key, partition์€ ์„ ํƒ์— ๋”ฐ๋ผ ์„ค์ •ํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

๐Ÿ“ข producer ํฌ์ŠคํŒ… ์ฐธ๊ณ 

 

์ด๋ฅผ ํ™•์ธํ•ด๋ณด๊ธฐ ์œ„ํ•ด ProducerRecord ๊ฐ์ฒด์˜ ์ƒ์„ฑ์ž๋ฅผ ํ™•์ธํ•ด ๋ณด๋ฉด ๋‹ค์–‘ํ•œ ์ƒ์„ฑ์ž๋“ค์ด ์˜ค๋ฒ„๋กœ๋”ฉ ๋˜์–ด ์žˆ๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

ํ”„๋กœ๋“€์„œ๋กœ ๋ณด๋‚ผ ๋ฉ”์‹œ์ง€๋ฅผ ์ƒ์„ฑํ–ˆ์œผ๋ฉด ์ด์ œ send() ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ์นดํ”„์นด ๋ธŒ๋กœ์ปค์— ์ „๋‹ฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

// KafkaProducer ๋ฉ”์‹œ์ง€ ์ „์†ก
kafkaProducer.send(producerRecord);โ€‹

 

send() ๋ฉ”์„œ๋“œ๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ๋น„๋™๊ธฐ ๋ฐฉ์‹์œผ๋กœ ๋ฉ”์‹œ์ง€๊ฐ€ ์ „์†ก๋˜๋ฉฐ callback ๊ณผ get() ๋ฉ”์„œ๋“œ๋“ฑ์„ ํ†ตํ•ด ๋™๊ธฐ/๋น„๋™๊ธฐ ๋ฐฉ์‹์„ ์„ ํƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

 

๋™๊ธฐ ๋ฐฉ์‹

send() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด ๊ธฐ๋ณธ์ ์œผ๋กœ Future<RecordMetadata> ํƒ€์ž…์ด ๋ฐ˜ํ™˜๋ฉ๋‹ˆ๋‹ค.

Future ๊ฐ์ฒด์˜ get() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•จ์œผ๋กœ์จ ํ”„๋กœ๋“€์„œ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•˜๋Š” ๋ฐฉ์‹์„ ๋™๊ธฐ์ ์œผ๋กœ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๊ทธ๋ฆฌ๊ณ  ๋ฐ˜ํ™˜ ํƒ€์ž…์€ RecordMetadata๊ฐ€ ๋˜๋ฉฐ ๋ฐ˜ํ™˜๋œ RecordMetadata์— ์ „๋‹ฌํ•œ ๋ฉ”์‹œ์ง€์˜ ํ† ํ”ฝ, ํŒŒํ‹ฐ์…˜, ์˜คํ”„์…‹๊ณผ ๊ฐ™์€ ์ •๋ณด๋“ค์ด ๋‹ด๊ฒจ ์žˆ์Šต๋‹ˆ๋‹ค.

try {
    RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
    logger.info("\n ##### record metadata received ##### \n");
    logger.info("partition : {}", recordMetadata.partition());
    logger.info("offset : {}", recordMetadata.offset());
    logger.info("timestamp : {}", recordMetadata.timestamp());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

 

๋น„๋™๊ธฐ ๋ฐฉ์‹

send()๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ๋น„๋™๊ธฐ ๋ฐฉ์‹์ด๊ธฐ ๋•Œ๋ฌธ์— ๋น„๋™๊ธฐ ๋ฐฉ์‹์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ „๋‹ฌํ•˜๊ณ  ์‹ถ์€ ๊ฒฝ์šฐ์—๋Š” ํŠน๋ณ„ํ•œ ์„ค์ •์„ ํ•  ํ•„์š”๋Š” ์—†์ง€๋งŒ, ๋ฉ”์‹œ์ง€ ์ „์†ก ๊ฒฐ๊ณผ๋ฅผ ๋ฐ›์•„๋ณด๊ณ  ์‹ถ์€ ๊ฒฝ์šฐ์—๋Š” callback์„ ์‚ฌ์šฉํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

// KafkaProducer ๋ฉ”์‹œ์ง€ ์ „์†ก (๋น„๋™๊ธฐ, callback ์‚ฌ์šฉ)
kafkaProducer.send(producerRecord, (metadata, exception) -> {
    if (exception == null) {
        logger.info("\n ##### record metadata received ##### \n");
        logger.info("partition : {}", metadata.partition());
        logger.info("offset : {}", metadata.offset());
        logger.info("timestamp : {}", metadata.timestamp());
    } else {
        logger.error("exception error from broker {}", exception.getMessage());
    }
});

callback์„ ์‚ฌ์šฉํ•˜๋ฉด RecordMetadata, Exception์ด ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ์ „๋‹ฌ๋˜๋Š”๋ฐ, ๋ฉ”์‹œ์ง€ ์ „์†ก์— ์‹คํŒจํ•  ๊ฒฝ์šฐ Exception ๊ฐ์ฒด์— ์˜ˆ์™ธ ์ •๋ณด๊ฐ€ ๋‹ด๊ฒจ์žˆ์Šต๋‹ˆ๋‹ค.

๋”ฐ๋ผ์„œ ๊ฐœ๋ฐœ์ž๊ฐ€ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ๋ฅผ ํ•˜๋Š” ์ถ”๊ฐ€ ์ž‘์—…์ด ํ•„์š”ํ•˜๋ฉฐ, RecordMetadata์—๋Š” ๋™๊ธฐ ๋ฐฉ์‹๊ณผ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ ์ „์†กํ•œ ๋ฉ”์‹œ์ง€์˜ ๊ด€ํ•œ ์ •๋ณด๊ฐ€ ๋‹ด๊ฒจ ์žˆ์Šต๋‹ˆ๋‹ค.

 

๊ธฐ๋ณธ์ ์œผ๋กœ๋Š” lambda๋‚˜ ์ต๋ช… ํด๋ž˜์Šค๋กœ callback์„ ์ž‘์„ฑํ•˜๋Š”๋ฐ ๋ณ„๋„์˜ ํด๋ž˜์Šค๋กœ ์ž‘์„ฑํ•˜์—ฌ callback์„ ๊ตฌํ˜„ํ•  ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

public class CustomCallback implements Callback {

    private static final Logger logger = LoggerFactory.getLogger(CustomCallback.class.getName());

    private int seq;

    public CustomCallback(int seq) {
        this.seq = seq;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            logger.info("seq: {} partition: {}, offset: {},", this.seq, metadata.partition(), metadata.offset());
        } else {
            logger.error("exception error from broker {}", exception.getMessage());
        }
    }
}


// ์‚ฌ์šฉ
// KafkaProducer ๋ฉ”์‹œ์ง€ ์ „์†ก (๋น„๋™๊ธฐ, callback ์‚ฌ์šฉ)
Callback callback = new CustomCallback(seq);
kafkaProducer.send(producerRecord, callback);

 

 

์ฐธ๊ณ  ์ž๋ฃŒ

๋Œ“๊ธ€