TL;DR
상품 집계 이후 좋아요·조회수·판매량을 종합한 랭킹 기능이 요구되었다.
단건 처리 대신 배치 리스너와 zip-map-sink 패턴을 도입해 Redis ZSET 연산을 최적화했고, 가중치 기반 점수 계산으로 랭킹 정확도를 높였다.
일별·시간별 TTL 관리와 carry-over 전략을 통해 랭킹 공백 문제를 해결했으며, 상위 100건을 DB에 백업하고 htmx 기반 시뮬레이션을 추가해 안정성과 가시성을 확보했다.
들어가며
상품 집계(product_metrics) 기능을 구축한 이후, 추가로 상품 랭킹에 대한 요구사항이 생겼다.
단순히 이벤트 데이터를 모으는 것에서 그치지 않고, 좋아요 수, 조회수, 판매량을 기준으로 상품의 순위를 매겨야 했다.
기본적으로는 일별/시간별 랭킹 기능을 제공해야 했고, 이를 어떻게 구현했는지, 어떤 고민들을 했는지에 대해 정리해 봤다.
상품 랭킹을 위한 Redis 선택 이유?
랭킹 기능을 구현할 때 가장 중요한 건 빠른 조회 성능이었다.
사용자에게 노출되는 랭킹은 실시간에 가깝게 갱신되어야 하고, 동시에 수많은 사용자가 동시에 접근해도 지연 없이 응답해야 했다.
처음에는 RDBMS만으로도 가능하지 않을까 고민했지만, 문제는 명확했다.
- 매번 ORDER BY와 LIMIT 쿼리를 수행하면 집계가 몰릴 때 DB 부하가 급격히 증가
- 조회수/좋아요/판매량 데이터를 합산해 순위를 계산하려면 실시간 집계 쿼리가 필요
- 트래픽이 많아질 경우 DB는 쉽게 병목 지점이 될 수 있음
결국 이 요구사항을 만족시키기 위해 Redis의 Sorted Set(ZSET) 자료구조를 선택했다.
- score 기반 자동 정렬로 별도 계산 과정 불필요
- ZADD, ZREVRANGE, ZREVRANK, ZSCORE 등을 통해 Top-N, 특정 상품 순위, 점수를 효율적으로 조회 가능
- TTL 설정을 통한 데이터 자동 만료로 메모리 관리 용이
Redis의 ZSET을 사용함으로써 DB 대비 훨씬 빠른 성능과 단순한 구조로 실시간 랭킹 시스템을 구축할 수 있었다.
Product Metrics 집계와 랭킹 집계 분리
처음에는 product_metrics 이벤트(조회수 증가, 좋아요 변경, 판매량 변경 등) Consumer에서 직접 상품 랭킹 집계까지 수행하려고 했다.
하지만 이렇게 되면 메트릭 집계 로직과 랭킹 집계 로직이 강하게 결합되어 장애 전파 위험이 커지고, 코드도 복잡해진다고 생각했다.
무엇보다도 Metrics Consumer는 단순히 원시 이벤트를 집계하는 책임만 가져야 하는데, 랭킹 집계까지 포함시키면 책임이 불필요하게 확장된다고 생각했고, 이는 역할 분리 원칙(SRP, Single Responsibility Principle)에도 어긋나고, 유지보수와 확장성에도 불리하다고 생각했다.
따라서 Product Metrics 집계 Consumer와 랭킹 집계 Consumer를 분리하여 각각의 책임을 명확히 했다.
Product Metrics 집계 컨슈머에서 랭킹 집계까지 처리: 잘못된 방식이라고 생각함 ❌
@KafkaListener(
topics = [
EventType.Topic.PRODUCT_V1_STOCK_ADJUSTED,
EventType.Topic.PRODUCT_V1_LIKE_CHANGED,
EventType.Topic.PRODUCT_V1_VIEWED,
],
groupId = EventType.Group.METRICS_EVENTS,
)
@Transactional
fun listen(
message: String,
ack: Acknowledgment,
@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
@Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
@Header(KafkaHeaders.OFFSET) offset: Long,
) {
log.info("[MetricsV1EventConsumer.listen] message: $message")
val event = Event.fromJson(message) ?: throw IllegalArgumentException("Invalid event message: $message")
// 1. product_metrics 집계 이벤트 처리
// ...
// 2. product_rank (상품 랭킹) 이벤트 처리
// ...
ack.acknowledge()
}
Product Metrics 집계 컨슈머와 Product Rank 집계 컨슈머를 명확히 분리: 책임과 역할이 잘 구분되었다고 생각함 ✅
// product_metrics 집계 컨슈머
@KafkaListener(
topics = [
EventType.Topic.PRODUCT_V1_STOCK_ADJUSTED,
EventType.Topic.PRODUCT_V1_LIKE_CHANGED,
EventType.Topic.PRODUCT_V1_VIEWED,
],
groupId = EventType.Group.METRICS_EVENTS,
)
@Transactional
fun listen(
message: String,
ack: Acknowledgment,
@Header(KafkaHeaders.RECEIVED_TOPIC) topic: String,
@Header(KafkaHeaders.RECEIVED_PARTITION) partition: Int,
@Header(KafkaHeaders.OFFSET) offset: Long,
) {
log.info("[MetricsV1EventConsumer.listen] message: $message")
val event = Event.fromJson(message) ?: throw IllegalArgumentException("Invalid event message: $message")
// product_metrics 집계 이벤트 처리
// ...
ack.acknowledge()
}
// product_rank 집계 컨슈머
@KafkaListener(
topics = [
EventType.Topic.PRODUCT_V1_STOCK_ADJUSTED,
EventType.Topic.PRODUCT_V1_LIKE_CHANGED,
EventType.Topic.PRODUCT_V1_VIEWED,
],
groupId = EventType.Group.PRODUCT_RANK_DAY_EVENTS,
)
@Transactional
fun productRankDayEventListen(
message: String,
ack: Acknowledgment,
) {
log.info("[ProductRankV1EventConsumer.productRankDayEventListen] message: $message")
val event = Event.fromJson(message) ?: throw IllegalArgumentException("Invalid event message: $message")
// product_rank 집계 이벤트 처리
// ...
ack.acknowledge()
}
단건 처리의 한계와 배치 리스너 도입
Kafka Consumer가 메시지를 단건으로 처리하면 이벤트가 발생할 때마다 곧바로 Redis ZSET 연산과 DB 갱신이 일어난다.
이 방식은 구현은 단순하지만 문제도 명확하다.
- 이벤트 발생량이 많아질수록 Redis와 DB에 대한 쓰기 연산이 폭발적으로 증가
- 동일한 상품에 대해 짧은 시간 안에 여러 번 연산이 수행되면서 불필요한 중복 처리 발생
- 결과적으로 시스템 부하와 비용이 급격히 상승
이를 해결하기 위해 배치 리스너를 도입했다. 배치 단위로 메시지를 모아 한 번에 처리하면, 애플리케이션 레벨에서 먼저 이벤트를 정제할 수 있고, Redis와 DB에 필요한 최소한의 연산만 수행하게 되며 전체 스루풋을 크게 향상할 수 있다.
즉, 단건 처리의 비효율을 줄이고 안정적인 랭킹 집계를 가능하게 만든 핵심이 바로 배치 리스너였다.
단건 메세지 수신 후 이벤트 처리: ❌
@KafkaListener(
topics = [
EventType.Topic.PRODUCT_V1_STOCK_ADJUSTED,
EventType.Topic.PRODUCT_V1_LIKE_CHANGED,
EventType.Topic.PRODUCT_V1_VIEWED,
],
groupId = EventType.Group.PRODUCT_RANK_DAY_EVENTS,
)
@Transactional
fun productRankDayEventListen(
message: String,
ack: Acknowledgment,
)
배치 리스너로 메세지 목록 수신 후 이벤트 처리: ✅
@KafkaListener(
topics = [
EventType.Topic.PRODUCT_V1_STOCK_ADJUSTED,
EventType.Topic.PRODUCT_V1_LIKE_CHANGED,
EventType.Topic.PRODUCT_V1_VIEWED,
],
groupId = EventType.Group.PRODUCT_RANK_DAY_EVENTS,
containerFactory = KafkaConfig.BATCH_LISTENER // 배치 리스너 적용
)
@Transactional
fun productRankDayEventListen(
records: List<ConsumerRecord<String, String>>,
ack: Acknowledgment,
)
Zip-Map-Sink 활용으로 쓰기 연산 줄이기
배치 리스너로 메시지를 모아 처리한다고 해도, 단순히 배치 내 모든 이벤트를 순회하며 Redis에 기록하면 여전히 불필요한 쓰기 연산이 발생한다.
예를 들어 같은 상품에 대한 여러 이벤트가 같은 배치에 포함될 수 있는데, 이를 그대로 처리하면 결국 Redis에는 같은 상품에 대해 반복적으로 ZSET 연산이 일어나게 된다.
즉, 배치 리스너를 도입했다고 해서 모든 문제가 해결되는 건 아니다. 배치 단위로 들어온 데이터를 어떻게 다루느냐가 성능 최적화의 핵심이었다.
이를 해결하기 위해 zip-map-sink 패턴을 적용했다.
- zip: 동일한 상품 이벤트를 묶고
- map: 최종 스코어로 변환한 뒤
- sink: Redis에 일괄 반영
이 과정을 거치면 한 배치에서 동일 상품에 대한 이벤트를 하나의 업데이트로 합칠 수 있고, 결과적으로 Redis ZSET에 대한 불필요한 중복 쓰기 연산을 크게 줄일 수 있었다.
배치 리스너 후 이벤트를 순회하여 처리 ❌
@KafkaListener(
topics = [
EventType.Topic.PRODUCT_V1_STOCK_ADJUSTED,
EventType.Topic.PRODUCT_V1_LIKE_CHANGED,
EventType.Topic.PRODUCT_V1_VIEWED,
],
groupId = EventType.Group.PRODUCT_RANK_DAY_EVENTS,
containerFactory = KafkaConfig.BATCH_LISTENER
)
@Transactional
fun productRankDayEventListen(
records: List<ConsumerRecord<String, String>>,
ack: Acknowledgment,
) {
log.info("[ProductRankV1EventConsumer.productRankDayEventListen] records: $records")
records.forEach { record ->
val message = record.value()
val event = Event.fromJson(message) ?: throw IllegalArgumentException("Invalid event message: $message")
productRankService.handleEvent(
ProductRankCacheKeyGenerator.generate(LocalDate.now()),
event,
)
}
ack.acknowledge()
}
배치 리스너 후 Zip-Map-Sink 처리 ✅
@KafkaListener(
topics = [
EventType.Topic.PRODUCT_V1_STOCK_ADJUSTED,
EventType.Topic.PRODUCT_V1_LIKE_CHANGED,
EventType.Topic.PRODUCT_V1_VIEWED,
],
groupId = EventType.Group.PRODUCT_RANK_DAY_EVENTS,
containerFactory = KafkaConfig.BATCH_LISTENER,
)
fun productRankDayEventListen(
records: List<ConsumerRecord<String, String>>,
ack: Acknowledgment,
) {
log.info("[ProductRankV1EventConsumer.productRankDayEventListen] records: $records")
val eventMap = records
.map { Event.fromJson(it.value()) ?: throw IllegalArgumentException("Invalid event message: ${it.value()}") }
.groupBy { it.eventType }
eventMap.forEach { (eventType, events) ->
productRankService.handleEvent(
ProductRankCacheKeyGenerator.generate(LocalDate.now()),
eventType,
events.map { it.payload },
)
}
ack.acknowledge()
}
랭킹 점수 산정과 가중치 고민
랭킹의 신뢰도를 높이기 위해 조회수 0.1, 좋아요 0.3, 판매량 0.7의 가중치를 적용했다. 단순 합이 아니라 각 지표의 신호 강도/희소성/조작 난이도/비즈니스 목표를 고려해 비율을 정했다.
- 신호 강도: 판매량은 실제 전환을 반영하므로 신뢰도가 가장 높다 → 0.7
- 희소성/스케일: 조회수는 빈도가 높고 분산이 큼, 좋아요는 중간, 판매량은 희소 → 조회수 가중치는 낮게, 판매량은 높게
- 조작 난이도: 조회수는 상대적으로 조작이 쉬움 → 방어적으로 0.1
- 목표 정렬: “보여주기”보다 “팔리는 것”을 우선 → 판매량 가중 강화
- 신상품 보호: 판매량 우위가 과도하면 신상품이 상위 진입이 어렵다 → 아래의 정규화/감쇠로 완화
- 사용자 행동 관점
- 조회수는 실수로라도 발생할 수 있는 가벼운 행동
- 좋아요는 사용자가 명확한 의도를 가지고 누르는 중간 강도의 행동
- 판매는 가장 명확한 목적과 결정을 동반하는 강력한 행동
조회 (0.1) → 좋아요 (0.3) → 구매 (0.7)
가장 가벼운 행동 가장 강력한 행동
(실수로 발생 가능) (명확한 목적·결제 필요)
사용자의 행동이 가벼울수록 가중치를 낮추고, 무겁고 명확할수록 가중치를 높였다.
이렇게 행동의 무게감을 반영함으로써 단순 지표 합산이 아니라 사용자의 실제 의도와 가치에 기반한 랭킹을 만들 수 있었다.
또한 판매량 Score 계산 로직이 (개수 * 금액) * 가중치 이기 때문에 특정 상품의 Score가 과도하게 높아져 순위 변동이 어렵다는 문제가 있었다.
이를 완화하기 위해 판매량 가중치에는 로그 함수를 적용하여 편차를 줄였다.
object ProductRankScoreWeight {
const val PRODUCT_VIEW_COUNT_WEIGHT: Double = 0.1 // 조회수 가중치
const val PRODUCT_LIKE_COUNT_WEIGHT: Double = 0.3 // 좋아요수 가중치
const val PRODUCT_SALES_COUNT_WEIGHT: Double = 0.7 // 판매량수 가중치
}
object ProductRankScoreCalculator {
fun calculateScoreByViewCount(viewCount: Int): Double {
return viewCount * ProductRankScoreWeight.PRODUCT_VIEW_COUNT_WEIGHT
}
fun calculateScoreByLikeCount(likeCount: Int): Double {
return likeCount * ProductRankScoreWeight.PRODUCT_LIKE_COUNT_WEIGHT
}
fun calculateScoreBySalesCount(salesCount: Int, amount: Long): Double {
return log10((salesCount * amount) * ProductRankScoreWeight.PRODUCT_SALES_COUNT_WEIGHT)
}
}
Daily / Hourly 집계 기능
랭킹은 집계 주기에 따라 성격이 달라진다.
- Hourly 랭킹: 짧은 시간 동안의 변화를 반영해 실시간 트래픽 급등에 빠르게 대응
- Daily 랭킹: 하루 전체 데이터를 기반으로 안정적인 트렌드를 보여줌
이 두 가지 집계를 병행함으로써 사용자에게 즉각적인 인기 상품과 지속적인 트렌드를 동시에 제공할 수 있었다.
처음에는 일별/시간별 집계를 하나의 Consumer에서 처리하려고 했지만, 이렇게 되면 단점이 많았다.
- 집계 주기가 서로 달라 처리 로직이 복잡해진다.
- 특정 주기 집계에 문제가 생기면 다른 집계까지 영향을 받는다.
- 장애 추적도 어려워진다.
그래서 Daily 집계 Consumer와 Hourly 집계 Consumer를 분리했다.
이렇게 분리하니 각 Consumer가 자신이 맡은 집계 주기만 처리하면서 코드가 단순해지고, 장애 전파 위험이 줄어들었으며, 운영과 모니터링도 더 명확해졌다.
일별/시간별 집계 컨슈머 분리: ✅
@KafkaListener(
topics = [
EventType.Topic.PRODUCT_V1_STOCK_ADJUSTED,
EventType.Topic.PRODUCT_V1_LIKE_CHANGED,
EventType.Topic.PRODUCT_V1_VIEWED,
],
groupId = EventType.Group.PRODUCT_RANK_DAY_EVENTS,
containerFactory = KafkaConfig.BATCH_LISTENER,
)
fun productRankDayEventListen(
records: List<ConsumerRecord<String, String>>,
ack: Acknowledgment,
) {
log.info("[ProductRankV1EventConsumer.productRankDayEventListen] records: $records")
val eventMap = records
.map { Event.fromJson(it.value()) ?: throw IllegalArgumentException("Invalid event message: ${it.value()}") }
.groupBy { it.eventType }
eventMap.forEach { (eventType, events) ->
productRankService.handleEvent(
ProductRankCacheKeyGenerator.generate(LocalDate.now()),
eventType,
events.map { it.payload },
)
}
ack.acknowledge()
}
@KafkaListener(
topics = [
EventType.Topic.PRODUCT_V1_STOCK_ADJUSTED,
EventType.Topic.PRODUCT_V1_LIKE_CHANGED,
EventType.Topic.PRODUCT_V1_VIEWED,
],
groupId = EventType.Group.PRODUCT_RANK_HOUR_EVENTS,
containerFactory = KafkaConfig.BATCH_LISTENER,
)
fun productRankHourEventListen(
records: List<ConsumerRecord<String, String>>,
ack: Acknowledgment,
) {
log.info("[ProductRankV1EventConsumer.productRankHourEventListen] records: $records")
val eventMap = records
.map { Event.fromJson(it.value()) ?: throw IllegalArgumentException("Invalid event message: ${it.value()}") }
.groupBy { it.eventType }
eventMap.forEach { (eventType, events) ->
productRankService.handleEvent(
ProductRankCacheKeyGenerator.generate(LocalDateTime.now()),
eventType,
events.map { it.payload },
)
}
ack.acknowledge()
}
일별/시간별 랭킹 TTL 설정
Redis ZSET은 랭킹을 빠르게 조회할 수 있다는 장점이 있지만, 시간이 지날수록 데이터가 계속 쌓이면 메모리 압박이 발생한다. 이를 방지하기 위해 각 랭킹 데이터에 TTL을 설정했다.
- 일별 랭킹: Duration.ofDays(2) → 일별 랭킹은 이틀 뒤 자동 삭제
- 시간별 랭킹: Duration.ofHours(2) → 시간별 랭킹은 2시간 뒤 자동 삭제
이렇게 TTL을 두면 불필요하게 오래된 데이터가 자연스럽게 정리되면서, 메모리 사용량을 안정적으로 유지할 수 있다.
object ProductRankCacheKeyGenerator {
fun generate(date: LocalDateTime): CacheKey {
val formatter = DateTimeFormatter.ofPattern("yyyyMMddHH")
val formattedDate = formatter.format(date)
return CacheKey(
CacheNames.PRODUCT_RANK_ALL_KEY_PREFIX,
formattedDate,
// ttl 2시간 설정
Duration.ofHours(2),
)
}
fun generate(date: LocalDate): CacheKey {
val formatter = DateTimeFormatter.ofPattern("yyyyMMdd")
val formattedDate = formatter.format(date)
return CacheKey(
CacheNames.PRODUCT_RANK_ALL_KEY_PREFIX,
formattedDate,
// ttl 2일로 설정
Duration.ofDays(2),
)
}
}
Carry-Over 기능 추가
랭킹을 단순히 “당일/당시 간 데이터만”으로 집계하면 문제가 생긴다.
- 신규 데이터가 적은 시간대에는 랭킹이 텅 비어버리거나, 상위권이 급격히 변동
- 서비스 초반/비인기 상품은 데이터가 거의 없어 순위 자체가 나오지 않음
- 결국 사용자가 보는 랭킹의 연속성이 깨지고 신뢰도가 떨어짐
이 문제를 해결하기 위해 Carry-Over 기능을 도입했다. 전날(또는 전 시간)의 데이터를 일부 복제하여 새로운 집계에 반영하는 방식이다.
- 복제: 이전 집계 상위 N개 데이터를 가져옴
- 노멀라이즈: 기존 점수에 일정 수치(0.01)을 곱해 반영
여기서 노멀라이즈를 하는 이유는 과거 영향은 유지하되, 현재 데이터에 밀려나도록 하기 위해서다.
그대로 반영하면 이전 집계 데이터가 과도하게 현재 순위에 남아 랭킹이 왜곡됨
너무 낮게 반영하면 carry-over 의미가 없어짐
따라서 소수점 계수(0.01)로 "미세한 가중치"를 주어 연속성은 보장하면서도 최신성이 유지되도록 했다.
일별/시간별 상품 랭킹 정보 carry-over 적용: ✅
@Scheduled(cron = "0 50 11 * * *", zone = "Asia/Seoul")
fun carryOverTomorrowRank() {
log.info("[ProductRankScheduler.carryOverTomorrowRank] start")
// 상위 20개 상품 랭킹을 carry-over, 점수는 1% 감소
productRankCarryOverService.carryOverTomorrowRank(20, 0.01)
log.info("[ProductRankScheduler.carryOverTomorrowRank] end")
}
@Scheduled(cron = "0 50 * * * *", zone = "Asia/Seoul")
fun carryOverNextHourRank() {
log.info("[ProductRankScheduler.carryOverNextHourRank] start")
// 상위 20개 상품 랭킹을 carry-over, 점수는 1% 감소
productRankCarryOverService.carryOverNextHourRank(20, 0.01)
log.info("[ProductRankScheduler.carryOverNextHourRank] end")
}
// ProductRankCarryOverService
fun carryOverTomorrowRank(topN: Int, normalize: Double) {
log.info("[ProductRankCarryOverService.carryOverTomorrowRank] topN: {}, normalize: {}", topN, normalize)
val today = LocalDate.now()
val tomorrow = today.plusDays(1)
val productRanksWithScore = cacheRepository.findTopRankByScoreDesc(ProductRankCacheKeyGenerator.generate(today), 0, topN)
val normalizedTuples = productRanksWithScore.map { (productId, score) ->
DefaultTypedTuple(productId, score * normalize)
}.toSet()
if (normalizedTuples.isEmpty()) {
log.info("[ProductRankCarryOverService.carryOverTomorrowRank] 상품 랭킹이 비어있습니다. 오늘 날짜: {}", today)
return
}
cacheRepository.zAddAll(ProductRankCacheKeyGenerator.generate(tomorrow), normalizedTuples)
}
fun carryOverNextHourRank(topN: Int, normalize: Double) {
log.info("[ProductRankCarryOverService.carryOverNextHourRank] topN: {}, normalize: {}", topN, normalize)
val now = LocalDateTime.now()
val nextHour = now.plusHours(1)
val productRanksWithScore = cacheRepository.findTopRankByScoreDesc(ProductRankCacheKeyGenerator.generate(now), 0, topN)
val normalizedTuples = productRanksWithScore.map { (productId, score) ->
DefaultTypedTuple(productId, score * normalize)
}.toSet()
if (normalizedTuples.isEmpty()) {
log.info("[ProductRankCarryOverService.carryOverNextHourRank] 상품 랭킹이 비어있습니다. 현재 시간: {}", now)
return
}
cacheRepository.zAddAll(ProductRankCacheKeyGenerator.generate(nextHour), normalizedTuples)
}
Carry-Over와 TTL 문제
Carry-Over를 도입한 뒤 단순히 Duration 기반 TTL을 사용하면, Carry-Over 실행 시각이 TTL 기준점이 되어버려 캐시 공백이 생기는 문제가 있었다.
예를 들어, 09/11 23:50에 일별 Carry-Over가 실행되면 TTL은 원래 09/14 00:00이어야 하지만, 단순 Duration 적용 시 09/13 23:50에 만료된다.
이 경우 09/13 23:50부터 랭킹 데이터가 사라져 버리는 공백이 발생한다.
이를 막기 위해 TTL을 고정된 기준 시각(anchor time)에 맞춰 보정하는 방식을 적용했다.
- 일별 랭킹 (정오 기준, +2일 자정 만료)
Carry-Over가 정오 이전에 실행되면 → D+2 자정 만료
예: 09/11 11:00 실행 → 09/13 00:00 만료
Carry-Over가 정오 이후에 실행되면 → D+3 자정 만료
예: 09/11 13:05 실행 → 09/14 00:00 만료
Carry-over 스케줄러 시각인 23:50에 실행되면 → D+3 자정 만료
예: 09/11 23:50 실행 → 09/14 00:00 만료
- 시간별 랭킹 (30분 기준, +2시간 만료)
0~29분 실행 → 같은 시의 30분을 기준으로 TTL 설정
예: 10:17 실행 → 기준 10:00 → 만료 12:00
30~59분 실행 → 다음 시의 00분을 기준으로 TTL 설정
예: 10:41 실행 → 기준 11:00 → 만료 13:00
Carry-over 스케줄러 시각인 11:50에 실행
예: 11:50 실행 → 기준 12:00 → 만료 14:00
즉, Carry-Over 실행 시간이 언제든 간에, TTL은 항상 해당 기준 시각을 만료 시각으로 잡도록 보정했다.
object ProductRankCacheKeyGenerator {
fun generate(date: LocalDateTime): CacheKey {
val formatter = DateTimeFormatter.ofPattern("yyyyMMddHH")
val formattedDate = formatter.format(date)
return CacheKey(
CacheNames.PRODUCT_RANK_ALL_KEY_PREFIX,
formattedDate,
// ttl 2시간 설정
TimeCalculatorUtils.calculateDurationByHours(LocalDateTime.now(), 2),
)
}
fun generate(date: LocalDate): CacheKey {
val formatter = DateTimeFormatter.ofPattern("yyyyMMdd")
val formattedDate = formatter.format(date)
return CacheKey(
CacheNames.PRODUCT_RANK_ALL_KEY_PREFIX,
formattedDate,
// ttl 2일로 설정
TimeCalculatorUtils.calculateDurationByDays(LocalDateTime.now(), 2),
)
}
}
object TimeCalculatorUtils {
/**
* N일 뒤 자정(00:00)까지 TTL 계산
* 기준: 정오(12:00)
* @param days 몇 일 뒤 자정까지 만료시킬지
*
* 예:
* - 25.09.11 11:50 실행, days=2 → 25.09.13 00:00 만료
* - 25.09.11 12:10 실행, days=2 → 25.09.14 00:00 만료 (+1일)
* - 25.09.11 23:50 실행, days=2 → 25.09.14 00:00 만료 (+1일)
*/
fun calculateDurationByDays(now: LocalDateTime, days: Long): Duration {
val noon = LocalTime.NOON
val adjustedDays = if (now.toLocalTime().isAfter(noon)) days + 1 else days
val targetMidnight = now.toLocalDate()
.plusDays(adjustedDays)
.atStartOfDay()
return Duration.between(now, targetMidnight)
}
/**
* 현재 시간 기준으로 TTL을 hours 시간 뒤 "정시 시각"으로 설정
* 30분 기준으로 반올림 처리
*
* @param hours 몇 시간 뒤를 기준으로 TTL을 설정할지
*
* 예:
* - 01:20 실행, hours=2 → base=03:20 → 03:00 만료 (30분 이전 → 그대로)
* - 01:50 실행, hours=2 → base=03:50 → 04:00 만료 (30분 이후 → 올림)
* - 02:50 실행, hours=2 → base=04:50 → 05:00 만료 (30분 이후 → 올림)
* - 02:10 실행, hours=2 → base=04:10 → 04:00 만료 (30분 이전 → 그대로)
*/
fun calculateDurationByHours(now: LocalDateTime, hours: Long): Duration {
val base = now.plusHours(hours)
// 30분 기준 반올림 처리
val rounded = if (base.minute < 30) {
base.withMinute(0).withSecond(0).withNano(0)
} else {
base.withMinute(0).withSecond(0).withNano(0).plusHours(1)
}
return Duration.between(now, rounded)
}
}
DB 백업: Top 100 저장
Redis TTL이 지나면 랭킹 데이터는 사라지므로 이력 조회/분석을 위해 RDBMS로 백업이 필요하다. 다만 모든 랭킹을 전량 저장하면 문제가 커진다.
- 비용/용량 폭증
시간별·일별 × 카테고리/채널 × 전체 상품 수 = 급격한 행 수 증가 → 스토리지/백업 비용 증가
- 분석 효용의 체감
상위 구간을 벗어나면 노이즈가 커지고, 의사결정(노출/프로모션)에 쓰이는 정보는 대부분 Top-N에 집중됨
- 쿼리/지표 성능 저하
거대한 테이블은 집계/리포트 성능을 떨어뜨리고, 인덱스 관리 비용과 배치 소요 시간이 증가
- 데이터 품질/중복 이슈
하위권은 변동성이 커서 스냅샷의 의미가 희석되고, 중복/누락 검증에 드는 운영 비용만 늘어남
그래서 일별/시간별 상위 100건만 스냅샷으로으로 저장한다.
이렇게 하면 저장 공간을 크게 줄이면서도, 실무에서 실제로 활용되는 분석/리포팅·리그레션 체크·AB 테스트 회고에 필요한 신뢰 구간을 충분히 보존할 수 있다.
또는 데이터를 상위 10%로 설정할 수도 있다.
@Component
class ProductRankBackupScheduler(
private val productRankService: ProductRankService,
) {
private val log = LoggerFactory.getLogger(this::class.java)
@Scheduled(cron = "1 0 0 * * *", zone = "Asia/Seoul")
fun backupYesterdayProductRank() {
log.info("[ProductRankBackupScheduler.backupYesterdayProductRank] start")
productRankService.backupYesterdayProductRank(LocalDate.now().minusDays(1))
log.info("[ProductRankBackupScheduler.backupYesterdayProductRank] end")
}
@Scheduled(cron = "1 0 * * * *", zone = "Asia/Seoul")
fun backupPrevHourProductRank() {
log.info("[ProductRankBackupScheduler.backupPrevHourProductRank] start")
productRankService.backupPrevHourProductRank(LocalDateTime.now())
log.info("[ProductRankBackupScheduler.backupPrevHourProductRank] end")
}
}
// ProductRankService
fun backupYesterdayProductRank(prevDate: LocalDate) {
log.info("[ProductRankService.backupYesterdayProductRank] prevDate: $prevDate")
val cacheKey = ProductRankCacheKeyGenerator.generate(prevDate)
val yesterdayProductRanks = cacheRepository.findTopRankByScoreDesc(
cacheKey,
0,
100,
)
val productRankDailies = mutableListOf<ProductRankDaily>()
yesterdayProductRanks.onEachIndexed { index, productRank ->
productRankDailies.add(
ProductRankDaily(
productRank.key,
cacheKey.key,
index + 1,
productRank.value,
),
)
}
productRankDailyRepository.saveAll(productRankDailies)
}
fun backupPrevHourProductRank(prevHours: LocalDateTime) {
log.info("[ProductRankService.backupPrevHourProductRank] prevHours: $prevHours")
val cacheKey = ProductRankCacheKeyGenerator.generate(prevHours)
val prevHourProductRanks = cacheRepository.findTopRankByScoreDesc(
cacheKey,
0,
100,
)
val productRankHourlies = mutableListOf<ProductRankHourly>()
prevHourProductRanks.onEachIndexed { index, productRank ->
productRankHourlies.add(
ProductRankHourly(
productRank.key,
cacheKey.key,
index + 1,
productRank.value,
),
)
}
productRankHourlyRepository.saveAll(productRankHourlies)
}
실시간 랭킹 Simulation
마지막으로, 단순히 랭킹 데이터를 제공하는 것에서 끝나지 않고, htmx + 스케줄러를 활용해 실시간 랭킹 시뮬레이션 화면을 구성했다.
htmx를 사용해 전체 페이지 리로드 없이 화면 일부만 동적으로 업데이트
스케줄러로 일정 주기마다 Redis에서 최신 랭킹을 불러와 반영
특히 랭킹은 단순 API 응답만 맞는지 확인하는 것 이상으로, 사용자에게 보이는 화면이 실시간으로 잘 갱신되는지가 중요하다고 생각한다. 그래서 실제 기능 동작 테스트와 더불어 시뮬레이션 환경을 구성해, 랭킹 데이터가 화면에 어떻게 반영되는지 체감하면서 검증할 수 있도록 했다.
마치며
이번 과정은 단순히 지표를 집계하는 수준에서 끝나지 않았다.
배치 처리로 스루풋을 최적화하고
Zip-Map-Sink 패턴으로 Redis 쓰기 연산을 줄였으며
Carry-Over와 TTL 보정으로 랭킹의 연속성과 안정성을 확보했고
DB 백업과 시뮬레이션 환경으로 분석 가능성과 가시성을 강화했다.
이렇게 단계별로 개선을 거치며, 단순 집계를 넘어 실시간성과 안정성을 동시에 갖춘 랭킹 시스템을 구축할 수 있었다.
'Redis' 카테고리의 다른 글
| DB 인덱스 다음 단계, 캐시로 성능 높이기 (5) | 2025.08.18 |
|---|---|
| Redis 설치 (0) | 2023.04.23 |
댓글