๐ก ์ด๋ฒ ํฌ์คํ ์์๋ KafkaProducer ๊ฐ์ฒด์ ๋ํด ์์๋ณด๋๋ก ํ๊ฒ ์ต๋๋ค.
kafka์ producer์ ๋ํด ๊ถ๊ธํ์ ๋ถ๋ค์ ์๋ producer์ ๋ํด์ ์ ๋ฆฌํ ํฌ์คํ ์ด ์์ผ๋ ์ฐธ๊ณ ํ์๋ฉด ์ข์ ๊ฒ ๊ฐ์ต๋๋ค.
Producer
ํ๋ก๋์๋ ์นดํ์นด์ ํ ํฝ์ผ๋ก ๋ฉ์์ง๋ฅผ ์ ์กํ๋ ์ญํ ์ ๋ด๋นํฉ๋๋ค. ์ ์ด๋ฏธ์ง๋ ํ๋ก๋์์ ์ ์ฒด์ ์ธ ํ๋ฆ์ ๋ํ๋ธ ๊ฒ์ ๋๋ค. ๋จผ์ ์นดํ์นด๋ก ์ ์กํ๊ธฐ ์ํ ์ค์ ๋ฐ์ดํฐ์ธ ProducerRecord๋ฅผ ์
soono-991.tistory.com
kafka๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด์๋ ์๋์ ๊ฐ์ด ์์กด์ฑ์ ์ถ๊ฐํด์ผ ํฉ๋๋ค.
implementation 'org.apache.kafka:kafka-clients:3.1.2'
๊ทธ๋ฆฌ๊ณ kafka-client์์ ์ ๊ณตํ๋ KafkaProducer ๊ฐ์ฒด๋ฅผ ํตํด producer๋ฅผ ์์ฑํฉ๋๋ค.
์ด๋, producer ์ค์ ์ ๋ณด๋ฅผ ์ ๋ฌํด์ค์ผ ํ๋๋ฐ Bootstrap-server, key-serializer, value-serializer ์ ๋ณด๋ ํ์ ๊ฐ์ ๋๋ค.
Properties props = new Properties();
// bootstrap.servers, key.serializer.class, value.serializer.class
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.111.133:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// KafkaProducer ๊ฐ์ฒด ์์ฑ
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props)
๊ทธ๋ฆฌ๊ณ ProducerRecord ๊ฐ์ฒด๋ฅผ ์์ฑํ์ฌ ํ๋ก๋์์ ์ ๋ฌํ ๋ฉ์์ง๋ฅผ ์์ฑํฉ๋๋ค.
// ProducerRecord ๊ฐ์ฒด ์์ฑ
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "hello world");
๊ธฐ๋ณธ์ ์ผ๋ก๋ ํ ํฝ, ๊ฐ์ ํ์๊ฐ์ด๋ฉฐ key, partition์ ์ ํ์ ๋ฐ๋ผ ์ค์ ํ ์๋ ์์ต๋๋ค.
๐ข producer ํฌ์คํ ์ฐธ๊ณ
์ด๋ฅผ ํ์ธํด๋ณด๊ธฐ ์ํด ProducerRecord ๊ฐ์ฒด์ ์์ฑ์๋ฅผ ํ์ธํด ๋ณด๋ฉด ๋ค์ํ ์์ฑ์๋ค์ด ์ค๋ฒ๋ก๋ฉ ๋์ด ์๋ ๊ฒ์ ํ์ธํ ์ ์์ต๋๋ค.
ํ๋ก๋์๋ก ๋ณด๋ผ ๋ฉ์์ง๋ฅผ ์์ฑํ์ผ๋ฉด ์ด์ send() ๋ฉ์๋๋ฅผ ํตํด ๋ฉ์์ง๋ฅผ ์นดํ์นด ๋ธ๋ก์ปค์ ์ ๋ฌํ ์ ์์ต๋๋ค.
// KafkaProducer ๋ฉ์์ง ์ ์ก
kafkaProducer.send(producerRecord);โ
send() ๋ฉ์๋๋ ๊ธฐ๋ณธ์ ์ผ๋ก ๋น๋๊ธฐ ๋ฐฉ์์ผ๋ก ๋ฉ์์ง๊ฐ ์ ์ก๋๋ฉฐ callback ๊ณผ get() ๋ฉ์๋๋ฑ์ ํตํด ๋๊ธฐ/๋น๋๊ธฐ ๋ฐฉ์์ ์ ํํ ์ ์์ต๋๋ค.
๋๊ธฐ ๋ฐฉ์
send() ๋ฉ์๋๋ฅผ ํธ์ถํ๋ฉด ๊ธฐ๋ณธ์ ์ผ๋ก Future<RecordMetadata> ํ์ ์ด ๋ฐํ๋ฉ๋๋ค.
Future ๊ฐ์ฒด์ get() ๋ฉ์๋๋ฅผ ํธ์ถํจ์ผ๋ก์จ ํ๋ก๋์๋ก ๋ฉ์์ง๋ฅผ ์ ์กํ๋ ๋ฐฉ์์ ๋๊ธฐ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.
๊ทธ๋ฆฌ๊ณ ๋ฐํ ํ์ ์ RecordMetadata๊ฐ ๋๋ฉฐ ๋ฐํ๋ RecordMetadata์ ์ ๋ฌํ ๋ฉ์์ง์ ํ ํฝ, ํํฐ์ , ์คํ์ ๊ณผ ๊ฐ์ ์ ๋ณด๋ค์ด ๋ด๊ฒจ ์์ต๋๋ค.
try {
RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
logger.info("\n ##### record metadata received ##### \n");
logger.info("partition : {}", recordMetadata.partition());
logger.info("offset : {}", recordMetadata.offset());
logger.info("timestamp : {}", recordMetadata.timestamp());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
๋น๋๊ธฐ ๋ฐฉ์
send()๋ ๊ธฐ๋ณธ์ ์ผ๋ก ๋น๋๊ธฐ ๋ฐฉ์์ด๊ธฐ ๋๋ฌธ์ ๋น๋๊ธฐ ๋ฐฉ์์ผ๋ก ๋ฉ์์ง๋ฅผ ์ ๋ฌํ๊ณ ์ถ์ ๊ฒฝ์ฐ์๋ ํน๋ณํ ์ค์ ์ ํ ํ์๋ ์์ง๋ง, ๋ฉ์์ง ์ ์ก ๊ฒฐ๊ณผ๋ฅผ ๋ฐ์๋ณด๊ณ ์ถ์ ๊ฒฝ์ฐ์๋ callback์ ์ฌ์ฉํด์ผ ํฉ๋๋ค.
// KafkaProducer ๋ฉ์์ง ์ ์ก (๋น๋๊ธฐ, callback ์ฌ์ฉ)
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
logger.info("\n ##### record metadata received ##### \n");
logger.info("partition : {}", metadata.partition());
logger.info("offset : {}", metadata.offset());
logger.info("timestamp : {}", metadata.timestamp());
} else {
logger.error("exception error from broker {}", exception.getMessage());
}
});
callback์ ์ฌ์ฉํ๋ฉด RecordMetadata, Exception์ด ํ๋ผ๋ฏธํฐ๋ก ์ ๋ฌ๋๋๋ฐ, ๋ฉ์์ง ์ ์ก์ ์คํจํ ๊ฒฝ์ฐ Exception ๊ฐ์ฒด์ ์์ธ ์ ๋ณด๊ฐ ๋ด๊ฒจ์์ต๋๋ค.
๋ฐ๋ผ์ ๊ฐ๋ฐ์๊ฐ ์์ธ ์ฒ๋ฆฌ๋ฅผ ํ๋ ์ถ๊ฐ ์์ ์ด ํ์ํ๋ฉฐ, RecordMetadata์๋ ๋๊ธฐ ๋ฐฉ์๊ณผ ๋ง์ฐฌ๊ฐ์ง๋ก ์ ์กํ ๋ฉ์์ง์ ๊ดํ ์ ๋ณด๊ฐ ๋ด๊ฒจ ์์ต๋๋ค.
๊ธฐ๋ณธ์ ์ผ๋ก๋ lambda๋ ์ต๋ช ํด๋์ค๋ก callback์ ์์ฑํ๋๋ฐ ๋ณ๋์ ํด๋์ค๋ก ์์ฑํ์ฌ callback์ ๊ตฌํํ ์๋ ์์ต๋๋ค.
public class CustomCallback implements Callback {
private static final Logger logger = LoggerFactory.getLogger(CustomCallback.class.getName());
private int seq;
public CustomCallback(int seq) {
this.seq = seq;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
logger.info("seq: {} partition: {}, offset: {},", this.seq, metadata.partition(), metadata.offset());
} else {
logger.error("exception error from broker {}", exception.getMessage());
}
}
}
// ์ฌ์ฉ
// KafkaProducer ๋ฉ์์ง ์ ์ก (๋น๋๊ธฐ, callback ์ฌ์ฉ)
Callback callback = new CustomCallback(seq);
kafkaProducer.send(producerRecord, callback);
์ฐธ๊ณ ์๋ฃ
'Kafka' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
kafka CLI ์ฌ์ฉํ๊ธฐ - kafka-consumer-group.(sh | bat) (0) | 2023.04.08 |
---|---|
kafka CLI ์ฌ์ฉํ๊ธฐ - kafka-dump-log (0) | 2023.04.04 |
Producer (0) | 2023.04.03 |
kafka CLI ์ฌ์ฉํ๊ธฐ - kafka-console-consumer.(sh | bat) (0) | 2023.03.28 |
kafka CLI ์ฌ์ฉํ๊ธฐ - kafka-console-producer.(sh | bat) (0) | 2023.03.28 |
๋๊ธ