KafkaConsumer
๐ก ์ด๋ฒ ํฌ์คํ ์์๋ KafkaConsumer์ ๋ํด ์ ๋ฆฌํด ๋ณด๋๋ก ํ๊ฒ ์ต๋๋ค.
์ด์ ์ KafkaProducer ๊ฐ์ฒด๋ฅผ ํตํด ์นดํ์นด ํ ํฝ์ ๋ฉ์์ง๋ฅผ ์ ์กํด ๋ณด์์ต๋๋ค.
์ด๋ฒ์๋ KafkaConsumer ๊ฐ์ฒด๋ฅผ ํตํด ํ๋ก๋์๊ฐ ์ ์กํ ๋ฉ์์ง๋ฅผ ์ฝ์ด์ค๋๋ก ํ๊ฒ ์ต๋๋ค.
public static void main(String[] args) {
String topicName = "simple-topic";
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.133:9092");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "simple-group");
props.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000"); // optional
props.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "90000"); // optional
props.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // optional
try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props)) {
kafkaConsumer.subscribe(List.of(topicName));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
consumerRecords.forEach(record -> {
logger.info("record key: {}, record value: {}, partition : {}",
record.key(), record.value(), record.partition());
});
}
}
}
๊ฐ์ฅ ๊ธฐ๋ณธ์ ์ธ KafkaConsumer ๊ฐ์ฒด ์์ฑ ์ฝ๋์ ๋๋ค.
Property๋ก GroupId ๊น์ง๋ ํ์ ๊ฐ์ผ๋ก ์ค์ ํด์ผ ํ๋ฉฐ, ์ดํ ๋๋จธ์ง ์ต์ ๋ค์ ์ ํ ๊ฐ์ผ๋ก ์ ๋ ฅํฉ๋๋ค. (์ ๋ ฅํ์ง ์์ ๊ฒฝ์ฐ ๊ธฐ๋ณธ ๊ฐ์ด ์ ์ฉ๋ฉ๋๋ค.)
kafkaConsumer.subscribe(List.of(topicName));
์์ฑํ ์ปจ์๋จธ๊ฐ ์ด๋ค ํ ํฝ์ ๋ฉ์์ง๋ฅผ ์ฝ์ด์ฌ์ง ํ ํฝ๋ช ์ ์ ๋ฌํฉ๋๋ค.
๊ฐ์ฅ ๋ณดํธ์ ์ธ ๋ฐฉ๋ฒ์ผ๋ก ์ปจ์๋จธ๊ฐ ์ฝ์ด์ฌ ํ ํฝ์ ๋ฆฌ์คํธ๋ก ์ ๋ฌํฉ๋๋ค.
์ฌ๋ฌ ๊ฐ์ ํ ํฝ์ ์ ๋ฌํ๊ฒ ๋๋ฉด ๊ฐ ํ ํฝ์ ๋ฆฌ๋ ํํฐ์ ์ ์ฐพ์ ์ปจ์๋จธ๊ฐ ๋ฉ์์ง๋ฅผ ์ฝ์ด์ต๋๋ค.
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
์ปจ์๋จธ๊ฐ poll() ๋ฉ์๋๋ฅผ ํธ์ถํจ์ผ๋ก์จ ๋ฉ์์ง๋ฅผ ์ฝ์ด์ต๋๋ค.
์ด๋ Duration์ ์ค์ ํ ์ ์๋๋ฐ ์ด ์๊ฐ์ ์ปจ์๋จธ๊ฐ ๋ฉ์์ง๊ฐ ์ค๊ธฐ๋ฅผ ๊ธฐ๋ค๋ฆฌ๋ ์ต๋ ์๊ฐ์ ๋๋ค.
์ด์ ํฌ์คํ ์์ ์ ๋ฆฌํ๋ฏ ์ปจ์๋จธ๋ Fetcher ๊ฐ์ฒด์ ์ ์ฅ๋์ด ์๋ Linked Queue ํํ์ ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ค๊ฒ ๋๋๋ฐ Fetcher ๊ฐ์ฒด์ ๋ฉ์์ง๊ฐ ์์ ๊ฒฝ์ฐ ๊ธฐ๋ค๋ฆด ์ต๋ ์๊ฐ์ ๋๋ค.
๋ง ๊ทธ๋๋ก ์ต๋ ์๊ฐ์ด๊ธฐ ๋๋ฌธ์ ์๋์ ๊ฐ์ ์ต์ ๋ค์ ๋ณ๊ฒฝํ๋ฉด ์ต๋ ์๊ฐ๋งํผ ๊ธฐ๋ค๋ฆฌ์ง ์๊ณ ๋ ์ค์ ํ ์ต์ ์ ์ฌ์ด์ฆ๋งํผ ๋ฉ์์ง๊ฐ ์์ด๋ฉด ๋ฐ๋ก ์ปจ์๋จธ๋ก ์ ์กํ๊ฒ ๋ฉ๋๋ค.
Manual Commit
์์์๋ ๋ฐ๋ก enable.auto.commit ์ต์ ์ ์ค์ ํ์ง ์์๊ธฐ ๋๋ฌธ์ AutoCommit์ด ์ ์ฉ๋์์ต๋๋ค.
enable.auto.commit์ false๋ก ์ค์ ํ ํ ์ง์ commitSync() , commitAsync() ๋ฉ์๋๋ฅผ ํธ์ถํ์ฌ ์๋์ผ๋ก ์คํ์ ์ ์ปค๋ฐํ ์ ์์ต๋๋ค.
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
private static void pollCommitSync(KafkaConsumer<String, String> kafkaConsumer) {
try (kafkaConsumer) {
int loopCount = 0;
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
logger.info(" ###### logCount : {} consumerRecords count: {}", loopCount++, consumerRecords.count());
consumerRecords.forEach(record -> {
logger.info("record key: {}, record value: {}, partition : {}, record offset: {}",
record.key(), record.value(), record.partition(), record.offset());
});
try {
if (consumerRecords.count() > 0) {
kafkaConsumer.commitSync(); // ์๋ ์ปค๋ฐ
logger.info("commit sync has been called");
}
} catch (CommitFailedException e) {
logger.error(e.getMessage());
}
}
} catch (WakeupException e) {
logger.error("wakeup exception has been called");
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
logger.info("finally consumer is closing");
}
}
๋ฉ์์ง๋ฅผ ์ฝ๋ ๋ก์ง์ ๋์ผํ์ง๋ง kafkaConsumer.commitSync() ๋ฉ์๋๋ฅผ ํธ์ถํด์ผ ์คํ์ ์ ๋ณด๊ฐ ์ปค๋ฐ๋ฉ๋๋ค.
ํ์ง๋ง commitSync()๋ฅผ ํธ์ถํ๋ฉด Main Thread๊ฐ ์คํ์ ์ ๋ณด๋ฅผ ์ปค๋ฐํ ๋๊น์ง block ๋๊ธฐ ๋๋ฌธ์ ์ด ๋ถ๋ถ์ด ์ซ๋ค๋ฉด commitAsync() ๋ฉ์๋๋ฅผ ์ฌ์ฉํฉ๋๋ค.
kafkaConsumer.commitAsync((offsets, exception) -> {
if (exception != null) {
logger.error("offsets {} is not completed, error: {}", offsets, exception.getMessage());
}
});
commitSync()์ ๋ฌ๋ฆฌ commitAsync()๋ OffsetCommitCallback์ ์ธ์๋ก ๋ฐ์ต๋๋ค.
commitAsync()๋ ์คํ์ ์ปค๋ฐ์ ๋น๋๊ธฐ๋ก ์คํํ๊ธฐ ๋๋ฌธ์ ์ปค๋ฐ ์คํ ๊ฒฐ๊ณผ๋ฅผ callback์ผ๋ก ์ ๋ฌ๋ฐ๊ฒ ๋ฉ๋๋ค.
์ด๋ callback์๋ ๋ ๊ฐ์ ํ๋ผ๋ฏธํฐ๊ฐ ๋ด๊ฒจ ์์ต๋๋ค.
offsets์๋ ์ปค๋ฐํ ์คํ์ ๊ณผ ๊ด๋ จ๋ ๋ฉํ ๋ฐ์ดํฐ๋ค์ด ๋ด๊ฒจ์์ผ๋ฉฐ, ์คํ์ ์ปค๋ฐ์ ์คํจํ ๊ฒฝ์ฐ ํด๋น ์๋ฌ/์์ธ๊ฐ Exception ๊ฐ์ฒด์ ๋ด๊ฒจ ์ ์ก๋ฉ๋๋ค.
callback์์๋ Exception ํ๋ผ๋ฏธํฐ๊ฐ null์ธ์ง ์ฌ๋ถ์ ๋ฐ๋ผ ์คํ์ ์ปค๋ฐ์ ์ฑ๊ณต/์คํจํ๋์ง๋ฅผ ํ์ธํ ์ ์์ต๋๋ค.
๋๊ธฐ ๋ฐฉ์์ ์คํ์ ์ปค๋ฐ์ ์ปค๋ฐ์ ์คํจํ ๊ฒฝ์ฐ ๋ค์ ์ฌ์๋ํฉ๋๋ค.
๋ฐ๋ฉด ๋น๋๊ธฐ ๋ฐฉ์์ ์คํ์ ์ปค๋ฐ์ ์ปค๋ฐ์ ์คํจํ์ฌ๋ ๋ค์ ์ฌ์๋๋ฅผ ํ์ง ์์ต๋๋ค.
์คํ์ ์ปค๋ฐ์ ๊ฒฝ์ฐ ์ฌ๋ฌ ์ด์ ๋ก ์ธํด ๋ฉ์์ง ์ค๋ณต ๋๋ ์ ์ค์ด ๋ฐ์ํ๋ค๊ณ ํฉ๋๋ค.
์ด ๋ถ๋ถ์ ์ ํํ ํ๋ฒ ์ ์ก(exactly-once)์ ์ฌ์ฉํ๋ฉด ํด๊ฒฐํ ์ ์์ง๋ง ๋ณ๋์ ์ค์ ์ด ํ์ํฉ๋๋ค.
๊ด๋ จํด์ ์ ๋ฆฌ๋ ํฌ์คํ ์ด ์์ด ์ฐธ๊ณ ํ์๋ฉด ์ข์ ๊ฒ ๊ฐ์ต๋๋ค.
[KAFKA] Consumer ๋ฉ์์ง ์ค๋ณต ์ฒ๋ฆฌ
Kafka๋ Consumer๊ฐ ์์ ์ด ์ฝ์ ์์น์ offset์ ๊ธฐ๋กํ๋๋ก commit์ ํ๋๋ก ํ๋ค. ์ด๋ ์๋ ์ปค๋ฐ๊ณผ ์๋ ์ปค๋ฐ์ผ๋ก ๋๋๋ค. ์๋ ์ปค๋ฐ ์๋ ์ปค๋ฐ์ ํน์ ์ฃผ๊ธฐ๋ง๋ค ์ปค๋ฐ์ ์๋์ผ๋ก ํ๋ ๋ฐฉ์์ด๋ค. ํ
camel-context.tistory.com
Kafka ๋ฉ์์ง ์ฒ๋ฆฌ ์ค๋ณต or ๋๋ฝ ๋ฌธ์ - ํํ์ ๋ธ๋ก๊ทธ
[Kafka Case Study] 3ํธ - Kafka ๋ฉ์์ง๊ฐ ์ค๋ณต ์ฒ๋ฆฌ๋๊ฑฐ๋ ๋๋ฝ๋๋ ๋ฌธ์ ๊ฐ ๋ฐ์ํ๋ ๊ฒฝ์ฐ Kafka - (7) Offset Commit ์์ ์ด๋ฏธ ์นดํ์นด ๋ฉ์์ง์ ์ค๋ณต ์ฒ๋ฆฌ ๊ฐ๋ฅ์ฑ์ ๋ํด ์ธ๊ธํ ์ ์๋๋ฐ ์ด์ ๋ํด ์ข๋ ์
tillog.netlify.app
ํน์ ํํฐ์ ๋ง ํ ๋น
์ปจ์๋จธ์ subscribe() ๋ฉ์๋์ ํ ํฝ๋ช ์ ์ ๋ฌํ๋ฉด ํด๋น ํ ํฝ์ ๋ฉ์์ง๋ฅผ ์ปจ์๋จธ๊ฐ ์ฝ์ด์จ๋ค๊ณ ํ์ต๋๋ค.
๋ง์ฝ ํน์ ํํฐ์ ๋ง ํ ๋นํ์ฌ ๋ฉ์์ง๋ฅผ ์ฝ์ด์ค๊ณ ์ถ์ ๊ฒฝ์ฐ์๋ assign() ๋ฉ์๋๋ฅผ ์ฌ์ฉํ์ฌ ํํฐ์ ์ ๋ณด๋ฅผ ์ ๋ฌํ๋ฉด ๋ฉ๋๋ค.
// ํ ํฝ ํํฐ์
์์ฑ
TopicPartition topicPartition = new TopicPartition(topicName, 0);
// ํํฐ์
์ ๋ณด ์ ๋ฌ
kafkaConsumer.assign(List.of(topicPartition));
ํน์ ํํฐ์ ์ ํน์ ์คํ์ ๋ถํฐ ๋ฉ์์ง ์ฝ๊ธฐ
kafkaConsumer.seek() ๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ฉด ํน์ ํํฐ์ ์ ํน์ ์คํ์ ๋ถํฐ ๋ฉ์์ง๋ฅผ ์ฝ์ด์ฌ ์ ์์ต๋๋ค.
// ํ ํฝ ํํฐ์
์์ฑ
TopicPartition topicPartition = new TopicPartition(topicName, 0);
// ํํฐ์
ํ ๋น
kafkaConsumer.assign(List.of(topicPartition));
// ์คํ์
์ค์
kafkaConsumer.seek(topicPartition, 10);