๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
Kafka

KafkaConsumer

by Soono991 2023. 4. 17.
๐Ÿ’ก ์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„œ๋Š” KafkaConsumer์— ๋Œ€ํ•ด ์ •๋ฆฌํ•ด ๋ณด๋„๋ก ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.

 

์ด์ „์— KafkaProducer ๊ฐ์ฒด๋ฅผ ํ†ตํ•ด ์นดํ”„์นด ํ† ํ”ฝ์— ๋ฉ”์‹œ์ง€๋ฅผ ์ „์†กํ•ด ๋ณด์•˜์Šต๋‹ˆ๋‹ค.

์ด๋ฒˆ์—๋Š” KafkaConsumer ๊ฐ์ฒด๋ฅผ ํ†ตํ•ด ํ”„๋กœ๋“€์„œ๊ฐ€ ์ „์†กํ•œ ๋ฉ”์‹œ์ง€๋ฅผ ์ฝ์–ด์˜ค๋„๋ก ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.

 

public static void main(String[] args) {

    String topicName = "simple-topic";

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.133:9092");
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "simple-group");
    props.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000"); // optional
    props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "90000"); // optional
    props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // optional

    try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
        kafkaConsumer.subscribe(List.of(topicName));

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));

            consumerRecords.forEach(record -> {
                logger.info("record key: {}, record value: {}, partition : {}",
                        record.key(), record.value(), record.partition());
            });
        }
    }
}

 

๊ฐ€์žฅ ๊ธฐ๋ณธ์ ์ธ KafkaConsumer ๊ฐ์ฒด ์ƒ์„ฑ ์ฝ”๋“œ์ž…๋‹ˆ๋‹ค.

Property๋กœ GroupId ๊นŒ์ง€๋Š” ํ•„์ˆ˜ ๊ฐ’์œผ๋กœ ์„ค์ •ํ•ด์•ผ ํ•˜๋ฉฐ, ์ดํ›„ ๋‚˜๋จธ์ง€ ์˜ต์…˜๋“ค์€ ์„ ํƒ ๊ฐ’์œผ๋กœ ์ž…๋ ฅํ•ฉ๋‹ˆ๋‹ค. (์ž…๋ ฅํ•˜์ง€ ์•Š์„ ๊ฒฝ์šฐ ๊ธฐ๋ณธ ๊ฐ’์ด ์ ์šฉ๋ฉ๋‹ˆ๋‹ค.)

 

kafkaConsumer.subscribe(List.of(topicName));

์ƒ์„ฑํ•œ ์ปจ์Šˆ๋จธ๊ฐ€ ์–ด๋–ค ํ† ํ”ฝ์˜ ๋ฉ”์‹œ์ง€๋ฅผ ์ฝ์–ด์˜ฌ์ง€ ํ† ํ”ฝ๋ช…์„ ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

๊ฐ€์žฅ ๋ณดํŽธ์ ์ธ ๋ฐฉ๋ฒ•์œผ๋กœ ์ปจ์Šˆ๋จธ๊ฐ€ ์ฝ์–ด์˜ฌ ํ† ํ”ฝ์„ ๋ฆฌ์ŠคํŠธ๋กœ ์ „๋‹ฌํ•ฉ๋‹ˆ๋‹ค.

์—ฌ๋Ÿฌ ๊ฐœ์˜ ํ† ํ”ฝ์„ ์ „๋‹ฌํ•˜๊ฒŒ ๋˜๋ฉด ๊ฐ ํ† ํ”ฝ์˜ ๋ฆฌ๋” ํŒŒํ‹ฐ์…˜์„ ์ฐพ์•„ ์ปจ์Šˆ๋จธ๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ์ฝ์–ด์˜ต๋‹ˆ๋‹ค.

 

 

ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));

์ปจ์Šˆ๋จธ๊ฐ€ poll() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•จ์œผ๋กœ์จ ๋ฉ”์‹œ์ง€๋ฅผ ์ฝ์–ด์˜ต๋‹ˆ๋‹ค.

์ด๋•Œ Duration์„ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋Š”๋ฐ ์ด ์‹œ๊ฐ„์€ ์ปจ์Šˆ๋จธ๊ฐ€ ๋ฉ”์‹œ์ง€๊ฐ€ ์˜ค๊ธฐ๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๋Š” ์ตœ๋Œ€ ์‹œ๊ฐ„์ž…๋‹ˆ๋‹ค.

์ด์ „ ํฌ์ŠคํŒ…์—์„œ ์ •๋ฆฌํ–ˆ๋“ฏ ์ปจ์Šˆ๋จธ๋Š” Fetcher ๊ฐ์ฒด์— ์ €์žฅ๋˜์–ด ์žˆ๋Š” Linked Queue ํ˜•ํƒœ์˜ ๋ฉ”์‹œ์ง€๋ฅผ ๊ฐ€์ ธ์˜ค๊ฒŒ ๋˜๋Š”๋ฐ Fetcher ๊ฐ์ฒด์— ๋ฉ”์‹œ์ง€๊ฐ€ ์—†์„ ๊ฒฝ์šฐ ๊ธฐ๋‹ค๋ฆด ์ตœ๋Œ€ ์‹œ๊ฐ„์ž…๋‹ˆ๋‹ค.

 

๋ง ๊ทธ๋Œ€๋กœ ์ตœ๋Œ€ ์‹œ๊ฐ„์ด๊ธฐ ๋•Œ๋ฌธ์— ์•„๋ž˜์™€ ๊ฐ™์€ ์˜ต์…˜๋“ค์„ ๋ณ€๊ฒฝํ•˜๋ฉด ์ตœ๋Œ€ ์‹œ๊ฐ„๋งŒํผ ๊ธฐ๋‹ค๋ฆฌ์ง€ ์•Š๊ณ ๋„ ์„ค์ •ํ•œ ์˜ต์…˜์˜ ์‚ฌ์ด์ฆˆ๋งŒํผ ๋ฉ”์‹œ์ง€๊ฐ€ ์Œ“์ด๋ฉด ๋ฐ”๋กœ ์ปจ์Šˆ๋จธ๋กœ ์ „์†กํ•˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.

https://soono-991.tistory.com/81

 

Manual Commit

์œ„์—์„œ๋Š” ๋”ฐ๋กœ enable.auto.commit ์˜ต์…˜์„ ์„ค์ •ํ•˜์ง€ ์•Š์•˜๊ธฐ ๋•Œ๋ฌธ์— AutoCommit์ด ์ ์šฉ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.

enable.auto.commit์„ false๋กœ ์„ค์ •ํ•œ ํ›„ ์ง์ ‘ commitSync() , commitAsync() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์ˆ˜๋™์œผ๋กœ ์˜คํ”„์…‹์„ ์ปค๋ฐ‹ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

 

private static void pollCommitSync(KafkaConsumer<String, String> kafkaConsumer) {
    try (kafkaConsumer) {
        int loopCount = 0;
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));

            logger.info(" ###### logCount : {} consumerRecords count: {}", loopCount++, consumerRecords.count());

            consumerRecords.forEach(record -> {
                logger.info("record key: {}, record value: {}, partition : {}, record offset: {}",
                        record.key(), record.value(), record.partition(), record.offset());
            });

            try {
                if (consumerRecords.count() > 0) {
                    kafkaConsumer.commitSync();                      // ์ˆ˜๋™ ์ปค๋ฐ‹
                    logger.info("commit sync has been called");
                }
            } catch (CommitFailedException e) {
                logger.error(e.getMessage());
            }
        }
    } catch (WakeupException e) {
        logger.error("wakeup exception has been called");
    } catch (Exception e) {
        logger.error(e.getMessage());
    } finally {
        logger.info("finally consumer is closing");
    }
}

 

๋ฉ”์‹œ์ง€๋ฅผ ์ฝ๋Š” ๋กœ์ง์€ ๋™์ผํ•˜์ง€๋งŒ kafkaConsumer.commitSync() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•ด์•ผ ์˜คํ”„์…‹ ์ •๋ณด๊ฐ€ ์ปค๋ฐ‹๋ฉ๋‹ˆ๋‹ค.

ํ•˜์ง€๋งŒ commitSync()๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด Main Thread๊ฐ€ ์˜คํ”„์…‹ ์ •๋ณด๋ฅผ ์ปค๋ฐ‹ํ•  ๋•Œ๊นŒ์ง€ block ๋˜๊ธฐ ๋•Œ๋ฌธ์— ์ด ๋ถ€๋ถ„์ด ์‹ซ๋‹ค๋ฉด commitAsync() ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค.

 

kafkaConsumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        logger.error("offsets {} is not completed, error: {}", offsets, exception.getMessage());
    }
});

 

commitSync()์™€ ๋‹ฌ๋ฆฌ commitAsync()๋Š” OffsetCommitCallback์„ ์ธ์ž๋กœ ๋ฐ›์Šต๋‹ˆ๋‹ค.

commitAsync()๋Š” ์˜คํ”„์…‹ ์ปค๋ฐ‹์„ ๋น„๋™๊ธฐ๋กœ ์‹คํ–‰ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ์ปค๋ฐ‹ ์‹คํ–‰ ๊ฒฐ๊ณผ๋ฅผ callback์œผ๋กœ ์ „๋‹ฌ๋ฐ›๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.

์ด๋•Œ callback์—๋Š” ๋‘ ๊ฐœ์˜ ํŒŒ๋ผ๋ฏธํ„ฐ๊ฐ€ ๋‹ด๊ฒจ ์žˆ์Šต๋‹ˆ๋‹ค.

 

offsets์—๋Š” ์ปค๋ฐ‹ํ•œ ์˜คํ”„์…‹๊ณผ ๊ด€๋ จ๋œ ๋ฉ”ํƒ€ ๋ฐ์ดํ„ฐ๋“ค์ด ๋‹ด๊ฒจ์žˆ์œผ๋ฉฐ, ์˜คํ”„์…‹ ์ปค๋ฐ‹์— ์‹คํŒจํ•  ๊ฒฝ์šฐ ํ•ด๋‹น ์—๋Ÿฌ/์˜ˆ์™ธ๊ฐ€ Exception ๊ฐ์ฒด์— ๋‹ด๊ฒจ ์ „์†ก๋ฉ๋‹ˆ๋‹ค.

callback์—์„œ๋Š” Exception ํŒŒ๋ผ๋ฏธํ„ฐ๊ฐ€ null์ธ์ง€ ์—ฌ๋ถ€์— ๋”ฐ๋ผ ์˜คํ”„์…‹ ์ปค๋ฐ‹์— ์„ฑ๊ณต/์‹คํŒจํ–ˆ๋Š”์ง€๋ฅผ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

๋™๊ธฐ ๋ฐฉ์‹์˜ ์˜คํ”„์…‹ ์ปค๋ฐ‹์€ ์ปค๋ฐ‹์— ์‹คํŒจํ•  ๊ฒฝ์šฐ ๋‹ค์‹œ ์žฌ์‹œ๋„ํ•ฉ๋‹ˆ๋‹ค.

๋ฐ˜๋ฉด ๋น„๋™๊ธฐ ๋ฐฉ์‹์˜ ์˜คํ”„์…‹ ์ปค๋ฐ‹์€ ์ปค๋ฐ‹์— ์‹คํŒจํ•˜์—ฌ๋„ ๋‹ค์‹œ ์žฌ์‹œ๋„๋ฅผ ํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

 

์˜คํ”„์…‹ ์ปค๋ฐ‹์˜ ๊ฒฝ์šฐ ์—ฌ๋Ÿฌ ์ด์œ ๋กœ ์ธํ•ด ๋ฉ”์‹œ์ง€ ์ค‘๋ณต ๋˜๋Š” ์œ ์‹ค์ด ๋ฐœ์ƒํ•œ๋‹ค๊ณ  ํ•ฉ๋‹ˆ๋‹ค.

์ด ๋ถ€๋ถ„์„ ์ •ํ™•ํžˆ ํ•œ๋ฒˆ ์ „์†ก(exactly-once)์„ ์‚ฌ์šฉํ•˜๋ฉด ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ์ง€๋งŒ ๋ณ„๋„์˜ ์„ค์ •์ด ํ•„์š”ํ•ฉ๋‹ˆ๋‹ค.

๊ด€๋ จํ•ด์„œ ์ •๋ฆฌ๋œ ํฌ์ŠคํŒ…์ด ์žˆ์–ด ์ฐธ๊ณ ํ•˜์‹œ๋ฉด ์ข‹์„ ๊ฒƒ ๊ฐ™์Šต๋‹ˆ๋‹ค.

 

 

[KAFKA] Consumer ๋ฉ”์‹œ์ง€ ์ค‘๋ณต ์ฒ˜๋ฆฌ

Kafka๋Š” Consumer๊ฐ€ ์ž์‹ ์ด ์ฝ์€ ์œ„์น˜์˜ offset์„ ๊ธฐ๋กํ•˜๋„๋ก commit์„ ํ•˜๋„๋ก ํ•œ๋‹ค. ์ด๋•Œ ์ž๋™ ์ปค๋ฐ‹๊ณผ ์ˆ˜๋™ ์ปค๋ฐ‹์œผ๋กœ ๋‚˜๋‰œ๋‹ค. ์ž๋™ ์ปค๋ฐ‹ ์ž๋™ ์ปค๋ฐ‹์€ ํŠน์ • ์ฃผ๊ธฐ๋งˆ๋‹ค ์ปค๋ฐ‹์„ ์ž๋™์œผ๋กœ ํ•˜๋Š” ๋ฐฉ์‹์ด๋‹ค. ํ•˜

camel-context.tistory.com

 

 

Kafka ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ค‘๋ณต or ๋ˆ„๋ฝ ๋ฌธ์ œ - ํ•œํ˜„์ƒ ๋ธ”๋กœ๊ทธ

[Kafka Case Study] 3ํŽธ - Kafka ๋ฉ”์‹œ์ง€๊ฐ€ ์ค‘๋ณต ์ฒ˜๋ฆฌ๋˜๊ฑฐ๋‚˜ ๋ˆ„๋ฝ๋˜๋Š” ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•˜๋Š” ๊ฒฝ์šฐ Kafka - (7) Offset Commit ์—์„œ ์ด๋ฏธ ์นดํ”„์นด ๋ฉ”์‹œ์ง€์˜ ์ค‘๋ณต ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ์„ฑ์— ๋Œ€ํ•ด ์–ธ๊ธ‰ํ•œ ์  ์žˆ๋Š”๋ฐ ์ด์— ๋Œ€ํ•ด ์ข€๋” ์•Œ

tillog.netlify.app

 

 

ํŠน์ • ํŒŒํ‹ฐ์…˜๋งŒ ํ• ๋‹น

์ปจ์Šˆ๋จธ์˜ subscribe() ๋ฉ”์„œ๋“œ์— ํ† ํ”ฝ๋ช…์„ ์ „๋‹ฌํ•˜๋ฉด ํ•ด๋‹น ํ† ํ”ฝ์˜ ๋ฉ”์‹œ์ง€๋ฅผ ์ปจ์Šˆ๋จธ๊ฐ€ ์ฝ์–ด์˜จ๋‹ค๊ณ  ํ–ˆ์Šต๋‹ˆ๋‹ค.

๋งŒ์•ฝ ํŠน์ • ํŒŒํ‹ฐ์…˜๋งŒ ํ• ๋‹นํ•˜์—ฌ ๋ฉ”์‹œ์ง€๋ฅผ ์ฝ์–ด์˜ค๊ณ  ์‹ถ์€ ๊ฒฝ์šฐ์—๋Š” assign() ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ํŒŒํ‹ฐ์…˜ ์ •๋ณด๋ฅผ ์ „๋‹ฌํ•˜๋ฉด ๋ฉ๋‹ˆ๋‹ค.

 

// ํ† ํ”ฝ ํŒŒํ‹ฐ์…˜ ์ƒ์„ฑ
TopicPartition topicPartition = new TopicPartition(topicName, 0);

// ํŒŒํ‹ฐ์…˜ ์ •๋ณด ์ „๋‹ฌ
kafkaConsumer.assign(List.of(topicPartition));

 

ํŠน์ • ํŒŒํ‹ฐ์…˜์˜ ํŠน์ • ์˜คํ”„์…‹๋ถ€ํ„ฐ ๋ฉ”์‹œ์ง€ ์ฝ๊ธฐ

kafkaConsumer.seek() ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ํŠน์ • ํŒŒํ‹ฐ์…˜์˜ ํŠน์ • ์˜คํ”„์…‹๋ถ€ํ„ฐ ๋ฉ”์‹œ์ง€๋ฅผ ์ฝ์–ด์˜ฌ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

// ํ† ํ”ฝ ํŒŒํ‹ฐ์…˜ ์ƒ์„ฑ
TopicPartition topicPartition = new TopicPartition(topicName, 0);

// ํŒŒํ‹ฐ์…˜ ํ• ๋‹น
kafkaConsumer.assign(List.of(topicPartition));

// ์˜คํ”„์…‹ ์„ค์ •
kafkaConsumer.seek(topicPartition, 10);

 

 

'Kafka' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€

๋ฉ€ํ‹ฐ ๋…ธ๋“œ ์นดํ”„์นด - 2  (0) 2023.05.01
๋ฉ€ํ‹ฐ ๋…ธ๋“œ ์นดํ”„์นด - 1  (0) 2023.05.01
Consumer - 3  (0) 2023.04.17
Consumer - 2  (0) 2023.04.17
Consumer - 1  (0) 2023.04.16

๋Œ“๊ธ€