Skip to content

[feat][queue-service] program Kafka 연동 + ProgramMeta Aggregate 추가#30

Merged
rlaxxwls13 merged 9 commits into
devfrom
feat/27-program-event-handler
May 17, 2026
Merged

[feat][queue-service] program Kafka 연동 + ProgramMeta Aggregate 추가#30
rlaxxwls13 merged 9 commits into
devfrom
feat/27-program-event-handler

Conversation

@rlaxxwls13
Copy link
Copy Markdown
Contributor

@rlaxxwls13 rlaxxwls13 commented May 15, 2026

🌱 설명

이 PR에서 어떤 작업을 했는지 간략하게 설명해주세요.

program-service 의 도메인 이벤트를 Kafka 로 수신해 queue-service 의 program 상태 캐시 (ProgramMeta) 를 갱신합니다.

ProgramMeta 는 QueueToken 과 다른 생명주기 / 책임을 가지므로 별도 Aggregate 로 분리했으며, 두 Aggregate 는 도메인 이벤트로 통신합니다.

주요 변경

  • programmeta/ Aggregate 신규 추가
    • ProgramMeta (Aggregate Root, Read Model)
    • ProgramKafkaConsumer (3 개 토픽 수신)
    • RedisProgramMetaRepository (Redis 저장)
    • ProgramEvents (도메인 이벤트 발행 인터페이스)
  • queuetoken/ 정정
    • QueueTokenRepository.deleteAllByProgramId 추가
    • ProgramCancelledEventListener 추가 (programmeta 이벤트 수신)
  • shared/event/ProgramCancelledEvent 추가 (두 Aggregate 공유)
  • common-messaging 의존성 + Flyway p_inbox 테이블 + Kafka 설정

처리 흐름

program.cancelled (Kafka)
↓
ProgramKafkaConsumer → ProgramMetaService.handleCancelled
↓
ProgramMeta.cancel() + Repository.save
↓
ProgramEvents.publishProgramCancelled
↓ (ApplicationEvent)
ProgramCancelledEventListener (queuetoken 측)
↓
QueueTokenRepository.deleteAllByProgramId → 모든 토큰 정리

📌 관련 이슈

이 PR과 연관된 이슈 번호를 작성해주세요. (이슈 없으면 생략 가능)
Related to #27

💻 커밋 유형

해당하는 항목에 x를 채워주세요.

  • feat : 새로운 기능 추가
  • fix : 버그 수정 (긴급 핫픽스 포함)
  • refactor : 리팩토링
  • chore : 빌드 설정, 의존성 업데이트 등
  • docs : 문서 수정
  • comment : 주석 추가 및 변경
  • rename : 파일 / 폴더 이동 또는 이름 변경
  • remove : 파일 삭제
  • test : 테스트 코드 추가 / 수정

📝 체크리스트

PR 올리기 전에 아래 항목을 확인해주세요.

  • develop 브랜치 기준으로 feature 브랜치를 생성했나요?
  • 코드 컨벤션 및 스타일 가이드를 준수했나요?
  • 관련 이슈와 연결했나요?
  • 설명이 필요한 부분 / 어려운 부분 / 공유가 필요한 부분에 주석을 추가했습니다.
  • 제가 작성한 코드를 스스로 리뷰했습니다.
  • 기존 테스트와 충돌하지 않음을 확인했나요?

📚 추가 설명

리뷰어가 참고해야 할 내용, 첨부 이미지 등이 있다면 자유롭게 추가해주세요. (선택)

미구현 / 미래 작업

  • 통합 테스트 (Kafka + Redis testcontainers) 별도 PR
  • deleteAllByProgramId 의 SCAN 본질을 미래 인덱스 키로 개선 (성능)
  • Redis 휘발 시 ProgramMeta 복구 (Kafka 재구독 의존)
  • 공통 모듈 설정 아직 미적용

Summary by CodeRabbit

  • 새로운 기능

    • 프로그램 메타(오픈/클로즈 시간) 생성·갱신·취소 처리 추가
    • 프로그램 취소 시 대기/입장 토큰 자동 정리 이벤트 발행/처리 추가
    • Kafka 기반 토픽 수신으로 프로그램 상태 동기화 지원
  • 개선사항

    • 분산 추적(Micrometer/Zipkin) 및 모니터링 개선
    • Flyway 기반 DB 마이그레이션 추가 및 테스트용 Kafka/DB 환경 구성 강화

Review Change Stack

- queuetoken과 programmeta로 분리함

Related to #27
- programmeta Aggregate 신규 추가 (ProgramMeta Read Model)
- program.created / time.updated / cancelled 토픽 수신
- queuetoken 과 도메인 이벤트로 통신 (ProgramCancelledEvent)
- common-messaging Outbox 스케줄러 비활성 설정

Related to #27
@rlaxxwls13 rlaxxwls13 self-assigned this May 15, 2026
@rlaxxwls13 rlaxxwls13 linked an issue May 15, 2026 that may be closed by this pull request
14 tasks
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 15, 2026

📝 Walkthrough

워크스루

이 PR은 프로그램 취소 시 대기 토큰을 정리하기 위한 도메인 이벤트 기반 아키텍처를 도입합니다. 프로그램 메타 집계를 새로 만들어 프로그램 생명주기를 관리하고, Kafka 소비를 통해 외부 이벤트를 처리하며, 도메인 이벤트로 큐 토큰 정리를 연계합니다. 동시에 큐 토큰 코드베이스를 전용 패키지로 구조화합니다.

변경사항

프로그램 메타 도메인 및 취소 기반 토큰 정리

레이어 / 파일 설명
프로그램 메타 도메인 모델
src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramStatus.java, ProgramId.java, ProgramMeta.java
DRAFTCANCELLED 상태를 정의하고, 프로그램 식별자 VO, 스케줄 기반 활성 상태 판정 로직을 갖춘 집계 루트를 구성합니다.
프로그램 메타 저장소
src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramMetaRepository.java, programmeta/infrastructure/redis/RedisProgramMetaRepository.java
저장, 조회, 활성 프로그램 필터링 계약을 정의하고 Redis로 구현하여 프로그램 메타데이터를 영속화합니다.
프로그램 메타 서비스 및 Command
src/main/java/com/firstticket/queueservice/programmeta/application/ProgramMetaService.java, programmeta/application/dto/{CreateProgramMetaCommand, UpdateProgramTimeCommand, CancelProgramCommand}.java
프로그램 생성, 시간 갱신, 취소 커맨드를 정의하고 각각을 처리하는 애플리케이션 로직을 구현합니다.
Kafka 기반 프로그램 이벤트 소비
src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/ProgramKafkaConsumer.java, programmeta/infrastructure/messaging/payload/{ProgramCreatedPayload, ProgramTimeUpdatedPayload, ProgramCancelledPayload}.java
외부 program.\* 토픽을 수신하여 Payload를 역직렬화하고 Command로 변환한 후 서비스 호출 및 오프셋 관리를 처리합니다.
프로그램 취소 도메인 이벤트
src/main/java/com/firstticket/queueservice/programmeta/domain/event/ProgramEvents.java, programmeta/infrastructure/event/ProgramEventsImpl.java, src/main/java/com/firstticket/queueservice/shared/event/ProgramCancelledEvent.java
프로그램 취소 시 도메인 이벤트를 발행하는 계약 및 Spring ApplicationEventPublisher 기반 구현과 이벤트 객체를 정의합니다.
프로그램 취소 이벤트 리스너 및 토큰 정리
src/main/java/com/firstticket/queueservice/queuetoken/infrastructure/event/ProgramCancelledEventListener.java, queuetoken/domain/QueueTokenRepository.java (deleteAllByProgramId 추가), queuetoken/infrastructure/redis/RedisQueueTokenRepository.java (deleteAllByProgramId 추가)
프로그램 취소 이벤트를 수신하여 Redis에서 해당 프로그램의 모든 대기/입장 토큰을 일괄 정리합니다.
큐 토큰 패키지 구조 개편
src/main/java/com/firstticket/queueservice/queuetoken/*
기존 com.firstticket.queueservice.* 패키지의 모든 큐 토큰 관련 클래스(도메인, 애플리케이션, 인프라, 프리젠테이션)를 com.firstticket.queueservice.queuetoken.* 계층으로 재구성합니다.
빌드 및 인프라 설정
build.gradle, src/main/resources/application.yml, src/main/resources/db/migration/V1__create_inbox_table.sql
GitHub 패키지 저장소에서 공통-JPA 및 공통-메시징 모듈을 추가하고, Kafka 및 모니터링(Micrometer/Zipkin) 의존성을 포함하며, Kafka 프로파일 활성화와 inbox 테이블 마이그레이션을 추가합니다.
테스트 패키지 경로 업데이트
src/test/java/com/firstticket/queueservice/queuetoken/*/**.java
큐 토큰 구조 개편에 맞춰 테스트 클래스 패키지 경로와 import를 일관되게 갱신합니다.

예상 코드 리뷰 노력

🎯 4 (Complex) | ⏱️ ~60 minutes

관련 이슈

  • first-ticket/queue-service#27: 이 PR은 program.created, program.time.updated, program.cancelled 토픽 소비자와 프로그램 취소 시 대기 토큰 정리 동작을 직접 구현하여 해당 이슈의 요구사항을 충족합니다.

관련 PR

  • first-ticket/queue-service#8: 메인 PR의 QueueTokenRepository/RedisQueueTokenRepository 인터페이스 및 구현 변경이 PR #8의 Redis 기반 토큰 저장소 영역과 동일 코드 범위에서 연계되므로 관련이 있습니다.
  • first-ticket/queue-service#16: 메인 PR의 QueueTokenController 패키지 이동(com.firstticket.queueservice.queuetoken.presentation.*)이 PR #16의 REST Docs 기반 컨트롤러 테스트와 직접 연결되므로 관련이 있습니다.

제안된 라벨

💻 feature

제안된 검토자

  • sweetRainShin
  • straycat405
  • DDoori
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 56.10% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed PR 제목이 주요 변경 사항을 명확하게 설명합니다. 'program Kafka 연동'과 'ProgramMeta Aggregate 추가'라는 두 가지 핵심 변경 사항을 간결하게 반영했습니다.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/27-program-event-handler

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/main/java/com/firstticket/queueservice/queuetoken/domain/QueueToken.java (1)

55-68: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

restore/admit에서 entryToken 불변식 검증이 빠져 있습니다.

Line 74-78에서 null/blank entryToken으로 ADMITTED 전이가 가능하고, Line 55-68 restore도 같은 조합을 허용합니다. Aggregate가 스스로 상태 불변식을 보장하도록 검증을 추가해 주세요.

제안 수정안
     public static QueueToken restore(
@@
         Objects.requireNonNull(issuedAt, "IssuedAt은 필수입니다");
         Objects.requireNonNull(status, "TokenStatus는 필수입니다");
+        if (status == TokenStatus.ADMITTED && (entryToken == null || entryToken.isBlank())) {
+            throw new IllegalArgumentException("ADMITTED 상태의 entryToken은 비어 있을 수 없습니다");
+        }
         return new QueueToken(id, userId, programId, issuedAt, status, entryToken);
     }
@@
     public void admit(String entryToken) {
         ensureWaiting();
+        if (entryToken == null || entryToken.isBlank()) {
+            throw new IllegalArgumentException("entryToken은 비어 있을 수 없습니다");
+        }
         this.status = TokenStatus.ADMITTED;
         this.entryToken = entryToken;
     }

As per coding guidelines, "도메인 객체가 자신의 불변식(invariant)을 스스로 보장하는지 확인 (생성자/정적 팩토리에서 검증)".

Also applies to: 74-78

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/java/com/firstticket/queueservice/queuetoken/domain/QueueToken.java`
around lines 55 - 68, The restore and admit flows in QueueToken allow a
null/blank entryToken when TokenStatus == ADMITTED; add invariant checks inside
the QueueToken static factory restore (and the admit method/constructor path) to
validate entryToken is non-null and not blank whenever status ==
TokenStatus.ADMITTED (throw NullPointerException/IllegalArgumentException with a
clear message); update the QueueToken class so that both restore(QueueTokenId,
UserId, ProgramId, IssuedAt, TokenStatus, String) and the admit transition
method enforce the same validation before constructing or mutating the
aggregate.
src/main/java/com/firstticket/queueservice/queuetoken/config/QueueProperties.java (1)

16-32: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

waitingTtl 양수 검증이 없어 잘못된 설정이 그대로 반영됩니다.

Line 23은 @NotNull만 적용되어 PT0S/음수 TTL이 통과할 수 있습니다. 이 경우 토큰이 즉시 만료되거나 비정상 동작할 수 있어요. 생성자에서 양수 검증을 추가해 주세요.

제안 수정안
 `@Validated`
 `@ConfigurationProperties`(prefix = "queue.token")
 public record QueueProperties(
@@
     `@NotNull`
     Duration waitingTtl,
@@
     `@Min`(1)
     int admissionBatchSize

 ) {
+    public QueueProperties {
+        if (waitingTtl == null || waitingTtl.isZero() || waitingTtl.isNegative()) {
+            throw new IllegalArgumentException("queue.token.waiting-ttl must be positive");
+        }
+    }
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@src/main/java/com/firstticket/queueservice/queuetoken/config/QueueProperties.java`
around lines 16 - 32, QueueProperties currently only has `@NotNull` on waitingTtl,
allowing Duration.ZERO or negative durations; add a compact record constructor
in QueueProperties that validates waitingTtl is non-null and strictly greater
than Duration.ZERO (e.g., if (waitingTtl == null ||
waitingTtl.compareTo(Duration.ZERO) <= 0) throw new
IllegalArgumentException("waitingTtl must be positive")), leaving
admissionBatchSize validation as-is; place the constructor inside the record to
enforce the invariant at creation.
src/main/java/com/firstticket/queueservice/queuetoken/domain/vo/ProgramId.java (1)

1-28: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

공유 Value Object 패키지로 ProgramId 추출 필요

queuetokenprogrammeta 두 aggregate에서 동일한 내용의 ProgramId VO가 중복 정의되었습니다. 두 파일 모두 record 구조, Compact Constructor 검증, 팩토리 메서드가 동일합니다.

현재 구조에서 각 aggregate가 자신의 로컬 ProgramId를 사용하므로 즉각적인 타입 충돌은 없으나, DDD에서 여러 aggregate가 동일한 외부 개념(program-service의 Program)을 참조할 때는 Shared Kernel 패턴을 적용해야 합니다. 이렇게 하면:

  • 명시적 공유 경계(Shared Kernel)를 표현
  • 한 곳에서 변경하면 모든 aggregate에 자동 반영
  • 향후 aggregate 간 통신 시 타입 안정성 확보

src/main/java/com/firstticket/queueservice/shared/vo/ProgramId.java로 추출하고, 각 aggregate의 정의를 제거한 후 shared 버전을 import하도록 수정하세요.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@src/main/java/com/firstticket/queueservice/queuetoken/domain/vo/ProgramId.java`
around lines 1 - 28, Extract the duplicated ProgramId record into a shared VO
package and update usages: create a single ProgramId record in the shared
package (retain the record declaration, compact constructor null check, static
factory methods of(UUID) and fromString(String), and asString()), remove the
duplicate ProgramId files from queuetoken and programmeta, and change those
modules to import the shared ProgramId; ensure references to ProgramId.of(...),
ProgramId.fromString(...), and ProgramId#asString() continue to compile and
adjust package declarations accordingly.
🧹 Nitpick comments (1)
src/main/resources/db/migration/V1__create_inbox_table.sql (1)

1-7: ⚡ Quick win

processed_at 컬럼에 인덱스 추가를 권장합니다.

p_inbox 테이블은 Kafka 메시지 중복 제거를 위한 inbox 패턴 구현으로 보입니다. 향후 오래된 레코드를 정리하거나 처리 이력을 조회할 때 processed_at 기반 쿼리가 필요할 가능성이 높습니다.

📊 제안하는 인덱스 추가
 CREATE TABLE p_inbox
 (
     message_id   UUID      NOT NULL,
     processed_at TIMESTAMP NOT NULL,
 
     CONSTRAINT pk_inbox PRIMARY KEY (message_id)
 );
+
+CREATE INDEX idx_inbox_processed_at ON p_inbox (processed_at);

이 인덱스는 다음과 같은 쿼리를 효율적으로 지원합니다:

  • 주기적인 레코드 정리: DELETE FROM p_inbox WHERE processed_at < ?
  • 처리 이력 모니터링: SELECT COUNT(*) FROM p_inbox WHERE processed_at > ?
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main/resources/db/migration/V1__create_inbox_table.sql` around lines 1 -
7, Add an index on the processed_at column of the p_inbox table to speed
time-based deletes and queries: update the migration that creates p_inbox
(V1__create_inbox_table.sql) to create an index on processed_at (e.g., CREATE
INDEX idx_p_inbox_processed_at ON p_inbox(processed_at)) so DELETE FROM p_inbox
WHERE processed_at < ? and SELECT ... WHERE processed_at > ? run efficiently.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@src/main/java/com/firstticket/queueservice/programmeta/application/dto/CancelProgramCommand.java`:
- Around line 15-19: CancelProgramCommand.of currently calls
ProgramStatus.valueOf(status) (and CreateProgramMetaCommand does the same),
which throws IllegalArgumentException on unexpected casing/format from Kafka;
update these constructors to validate and normalize incoming status: either
implement a ProgramStatusParser.parse(String) that normalizes (trim, upper-case,
maps known variants) and returns an Optional/throws a clear domain exception, or
wrap ProgramStatus.valueOf(status) in a try-catch converting
IllegalArgumentException into a domain-level exception (e.g.,
InvalidProgramStatusException) with a clear message; update
CancelProgramCommand.of and CreateProgramMetaCommand to use
ProgramStatusParser.parse(...) or the try-catch path so external message formats
are validated at the application boundary.

In
`@src/main/java/com/firstticket/queueservice/programmeta/application/dto/CreateProgramMetaCommand.java`:
- Around line 21-32: CreateProgramMetaCommand.of (and similarly
CancelProgramCommand) currently calls ProgramStatus.valueOf(status) which throws
on null/blank/lowercase input; replace this with a small parser that normalizes
the input (null-check, trim(), toUpperCase()) and then calls
ProgramStatus.valueOf, and throw an explicit IllegalArgumentException with a
clear message when status is null/blank or not a valid enum; update
CreateProgramMetaCommand.of to call the new parseStatus helper instead of
ProgramStatus.valueOf(status), do the same change in CancelProgramCommand, and
apply the identical normalization/validation in
RedisProgramMetaRepository.deserialize() where status is parsed.

In
`@src/main/java/com/firstticket/queueservice/programmeta/application/ProgramMetaService.java`:
- Around line 29-39: handleCreated currently unconditionally creates and saves a
ProgramMeta, which allows a delayed program.created event to overwrite an
existing CANCELLED (or otherwise final) entity; update handleCreated to first
load existing via programMetaRepository.findById(command.programId()), and if a
ProgramMeta exists then either no-op or enforce your state-transition rule
(e.g., if existing.getStatus() == ProgramStatus.CANCELLED then return/do not
save) instead of overwriting; only call ProgramMeta.of(...) and
programMetaRepository.save(...) when no existing entity is present or when the
transition is allowed.

In
`@src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramMeta.java`:
- Around line 36-43: Add invariant checks to the ProgramMeta factory/ctor so
invalid state cannot be created: in ProgramMeta.of(...) and the ProgramMeta
constructor validate that status is not null and that openAt is not after
closeAt (openAt.compareTo(closeAt) <= 0), and throw an appropriate runtime
exception (e.g., IllegalArgumentException or NullPointerException) with a clear
message when violated; ensure all other overloads/factories that construct
ProgramMeta perform the same checks so events cannot create a ProgramMeta with
null status or openAt > closeAt.

In
`@src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/ProgramKafkaConsumer.java`:
- Around line 45-47: In ProgramKafkaConsumer, replace the broad catch(Exception
e) handlers around the record processing (e.g., the block that logs
"program.created 처리 실패. record={}", and similar catches at the other locations)
with exception classification: detect non-retryable errors (JSON parse/schema
validation exceptions, deserialization errors) and for those call ack/commit or
forward the record to a DLT handler, logging the detailed error; for retryable
transient errors (network/timeouts from external calls) allow the exception to
propagate or trigger controlled retry logic with backoff and then nack/requeue
as appropriate; ensure external calls used in the processing have explicit
timeouts and retry/recovery policies; update the catch sites to log full error
details and either ack+skip/DLT for non-retryable or rethrow/retry for transient
errors.

In
`@src/main/java/com/firstticket/queueservice/programmeta/infrastructure/redis/RedisProgramMetaRepository.java`:
- Around line 131-142: The deserialize method currently only catches
JsonProcessingException, but calls like UUID.fromString, ProgramStatus.valueOf
and parseDateTime can throw runtime exceptions (IllegalArgumentException,
DateTimeParseException, NullPointerException) that would escape; update
deserialize in RedisProgramMetaRepository to catch those parsing/runtime
exceptions as well and rethrow them as the same IllegalStateException (or
another appropriate infra-level exception) with the original exception chained
and a clear message so malformed/partial data does not propagate as unchecked
runtime errors from ProgramMeta.of / parseDateTime.
- Line 133: In RedisProgramMetaRepository replace the unchecked call
objectMapper.readValue(json, Map.class) with a typed readValue using Jackson's
TypeReference to preserve generic information (i.e., readValue(json, new
TypeReference<Map<String,String>>(){})); import
com.fasterxml.jackson.core.type.TypeReference if necessary and update the
Map<String,String> data declaration to remove the unchecked warning.

In
`@src/main/java/com/firstticket/queueservice/queuetoken/infrastructure/redis/RedisQueueTokenRepository.java`:
- Around line 415-447: deleteAllByProgramId() currently collects
userProgramKeysToDelete and calls redisTemplate.delete(...) without verifying
that the stored reverse-index value hasn't changed, which can orphan
concurrently created tokens; fix by recording the expected token id when
scanning (use hashOps.get(tokenKey, FIELD_TOKEN_ID) and build a
Map<String,String> expectedUserProgramValue keyed by the constructed
userProgramKey), then instead of blindly adding userProgramKeysToDelete call a
safe delete: either run a Lua script to atomically DEL only if the key's value
equals the expected token id, or iterate the map and perform a conditional
delete per key (read current value with
redisTemplate.opsForValue().get(userProgramKey) and only delete that
userProgramKey if it equals expected value); update references in this method
(scanKeys, tokenKey loop, userProgramKeysToDelete, programKey,
redisTemplate.delete) or replace the final redisTemplate.delete(...) with the
safe conditional/Lua delete logic.

---

Outside diff comments:
In
`@src/main/java/com/firstticket/queueservice/queuetoken/config/QueueProperties.java`:
- Around line 16-32: QueueProperties currently only has `@NotNull` on waitingTtl,
allowing Duration.ZERO or negative durations; add a compact record constructor
in QueueProperties that validates waitingTtl is non-null and strictly greater
than Duration.ZERO (e.g., if (waitingTtl == null ||
waitingTtl.compareTo(Duration.ZERO) <= 0) throw new
IllegalArgumentException("waitingTtl must be positive")), leaving
admissionBatchSize validation as-is; place the constructor inside the record to
enforce the invariant at creation.

In
`@src/main/java/com/firstticket/queueservice/queuetoken/domain/QueueToken.java`:
- Around line 55-68: The restore and admit flows in QueueToken allow a
null/blank entryToken when TokenStatus == ADMITTED; add invariant checks inside
the QueueToken static factory restore (and the admit method/constructor path) to
validate entryToken is non-null and not blank whenever status ==
TokenStatus.ADMITTED (throw NullPointerException/IllegalArgumentException with a
clear message); update the QueueToken class so that both restore(QueueTokenId,
UserId, ProgramId, IssuedAt, TokenStatus, String) and the admit transition
method enforce the same validation before constructing or mutating the
aggregate.

In
`@src/main/java/com/firstticket/queueservice/queuetoken/domain/vo/ProgramId.java`:
- Around line 1-28: Extract the duplicated ProgramId record into a shared VO
package and update usages: create a single ProgramId record in the shared
package (retain the record declaration, compact constructor null check, static
factory methods of(UUID) and fromString(String), and asString()), remove the
duplicate ProgramId files from queuetoken and programmeta, and change those
modules to import the shared ProgramId; ensure references to ProgramId.of(...),
ProgramId.fromString(...), and ProgramId#asString() continue to compile and
adjust package declarations accordingly.

---

Nitpick comments:
In `@src/main/resources/db/migration/V1__create_inbox_table.sql`:
- Around line 1-7: Add an index on the processed_at column of the p_inbox table
to speed time-based deletes and queries: update the migration that creates
p_inbox (V1__create_inbox_table.sql) to create an index on processed_at (e.g.,
CREATE INDEX idx_p_inbox_processed_at ON p_inbox(processed_at)) so DELETE FROM
p_inbox WHERE processed_at < ? and SELECT ... WHERE processed_at > ? run
efficiently.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: first-ticket/coderabbit/.coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: e2008114-aa38-4490-82bd-560801302c1b

📥 Commits

Reviewing files that changed from the base of the PR and between 30ea9f9 and 1070fbc.

📒 Files selected for processing (47)
  • build.gradle
  • src/main/java/com/firstticket/queueservice/programmeta/application/ProgramMetaService.java
  • src/main/java/com/firstticket/queueservice/programmeta/application/dto/CancelProgramCommand.java
  • src/main/java/com/firstticket/queueservice/programmeta/application/dto/CreateProgramMetaCommand.java
  • src/main/java/com/firstticket/queueservice/programmeta/application/dto/UpdateProgramTimeCommand.java
  • src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramMeta.java
  • src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramMetaRepository.java
  • src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramStatus.java
  • src/main/java/com/firstticket/queueservice/programmeta/domain/event/ProgramEvents.java
  • src/main/java/com/firstticket/queueservice/programmeta/domain/vo/ProgramId.java
  • src/main/java/com/firstticket/queueservice/programmeta/infrastructure/event/ProgramEventsImpl.java
  • src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/ProgramKafkaConsumer.java
  • src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/payload/ProgramCancelledPayload.java
  • src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/payload/ProgramCreatedPayload.java
  • src/main/java/com/firstticket/queueservice/programmeta/infrastructure/messaging/payload/ProgramTimeUpdatedPayload.java
  • src/main/java/com/firstticket/queueservice/programmeta/infrastructure/redis/RedisProgramMetaRepository.java
  • src/main/java/com/firstticket/queueservice/queuetoken/application/QueueTokenService.java
  • src/main/java/com/firstticket/queueservice/queuetoken/application/dto/CancelQueueTokenCommand.java
  • src/main/java/com/firstticket/queueservice/queuetoken/application/dto/GetQueueTokenQuery.java
  • src/main/java/com/firstticket/queueservice/queuetoken/application/dto/IssueQueueTokenCommand.java
  • src/main/java/com/firstticket/queueservice/queuetoken/application/dto/QueueTokenResult.java
  • src/main/java/com/firstticket/queueservice/queuetoken/config/JwtProperties.java
  • src/main/java/com/firstticket/queueservice/queuetoken/config/QueueProperties.java
  • src/main/java/com/firstticket/queueservice/queuetoken/domain/QueueToken.java
  • src/main/java/com/firstticket/queueservice/queuetoken/domain/QueueTokenRepository.java
  • src/main/java/com/firstticket/queueservice/queuetoken/domain/TokenStatus.java
  • src/main/java/com/firstticket/queueservice/queuetoken/domain/exception/DuplicateTokenException.java
  • src/main/java/com/firstticket/queueservice/queuetoken/domain/exception/InvalidTokenStateException.java
  • src/main/java/com/firstticket/queueservice/queuetoken/domain/exception/QueueErrorCode.java
  • src/main/java/com/firstticket/queueservice/queuetoken/domain/exception/TokenNotFoundException.java
  • src/main/java/com/firstticket/queueservice/queuetoken/domain/vo/IssuedAt.java
  • src/main/java/com/firstticket/queueservice/queuetoken/domain/vo/ProgramId.java
  • src/main/java/com/firstticket/queueservice/queuetoken/domain/vo/QueueTokenId.java
  • src/main/java/com/firstticket/queueservice/queuetoken/domain/vo/UserId.java
  • src/main/java/com/firstticket/queueservice/queuetoken/infrastructure/event/ProgramCancelledEventListener.java
  • src/main/java/com/firstticket/queueservice/queuetoken/infrastructure/jwt/EntryTokenIssuer.java
  • src/main/java/com/firstticket/queueservice/queuetoken/infrastructure/redis/RedisQueueTokenRepository.java
  • src/main/java/com/firstticket/queueservice/queuetoken/infrastructure/scheduler/AdmissionScheduler.java
  • src/main/java/com/firstticket/queueservice/queuetoken/presentation/QueueSuccessCode.java
  • src/main/java/com/firstticket/queueservice/queuetoken/presentation/QueueTokenController.java
  • src/main/java/com/firstticket/queueservice/queuetoken/presentation/dto/QueueTokenResponse.java
  • src/main/java/com/firstticket/queueservice/shared/event/ProgramCancelledEvent.java
  • src/main/resources/application.yml
  • src/main/resources/db/migration/V1__create_inbox_table.sql
  • src/test/java/com/firstticket/queueservice/queuetoken/domain/QueueTokenTest.java
  • src/test/java/com/firstticket/queueservice/queuetoken/infrastructure/redis/RedisQueueTokenRepositoryTest.java
  • src/test/java/com/firstticket/queueservice/queuetoken/presentation/QueueTokenControllerTest.java

- inbox만 사용하도록 jpaConfig 추가

Related to: #27
-handleCreated 의 취소 회귀 가드
-ProgramMeta 불변식 검증

Related to #27
- 역직렬화 예외 처리 범위 확대
- TypeReference<Map<String, String>>을 사용하여 제네릭 타입 정보 보존

Related to #27
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/test/resources/application-test.yml`:
- Around line 24-28: The YAML uses consumer.auto-startup which is not a
recognized Spring Boot property; change the key from consumer.auto-startup to
listener.auto-startup so the property becomes spring.kafka.listener.auto-startup
(move the auto-startup entry under the listener block alongside
missing-topics-fatal) to ensure the Kafka listener is actually disabled during
tests.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository: first-ticket/coderabbit/.coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: ef9ad8f8-00fd-4e2f-894c-d694670ef0dc

📥 Commits

Reviewing files that changed from the base of the PR and between 1070fbc and 67cd728.

📒 Files selected for processing (6)
  • build.gradle
  • src/main/java/com/firstticket/queueservice/config/JpaConfig.java
  • src/main/java/com/firstticket/queueservice/programmeta/application/ProgramMetaService.java
  • src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramMeta.java
  • src/main/java/com/firstticket/queueservice/programmeta/infrastructure/redis/RedisProgramMetaRepository.java
  • src/test/resources/application-test.yml
🚧 Files skipped from review as they are similar to previous changes (4)
  • build.gradle
  • src/main/java/com/firstticket/queueservice/programmeta/domain/ProgramMeta.java
  • src/main/java/com/firstticket/queueservice/programmeta/infrastructure/redis/RedisProgramMetaRepository.java
  • src/main/java/com/firstticket/queueservice/programmeta/application/ProgramMetaService.java

Comment thread src/test/resources/application-test.yml
- application-test.yml 카프카 비활성화 설정 수정
- Status 정규화 로직 추가

Related to #27
코드래빗 리뷰 반영
- ProgramKafkaConsumer: 재시도 가능/불가능 예외 분리 (poison message 방어)

Related to #27
@rlaxxwls13 rlaxxwls13 added the 💻 feature 새로운 기능 개발 label May 17, 2026
@first-ticket first-ticket deleted a comment from coderabbitai Bot May 17, 2026
@rlaxxwls13 rlaxxwls13 merged commit 00999a5 into dev May 17, 2026
3 checks passed
@github-project-automation github-project-automation Bot moved this from Todo to Done in First Ticket May 17, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

💻 feature 새로운 기능 개발

Projects

Status: Done

Development

Successfully merging this pull request may close these issues.

[feat][queue-service] program 도메인 이벤트 처리 (Kafka Consumer)

2 participants