카프카를 사용하면서 내가 실질적으로 consume한 record가 몇 개인지 알아야 했다.
max.poll.records = 300 // default 500
물론 위와 같은 설정으로 Kafka consumer가 broker로부터 consume한 record을 컨트롤할 수 있지만 이는 프로퍼티 이름에서 볼 수 있듯이 maximum을 제한할 뿐...
내가 원하던 기능은 실제로 consume한 record의 개수를 알아야 했다. 위와 같은 설정에서도 실제 consume한 record의 개수는 100, 150, 200 일수도 있으니깐.
spring-kafka을 사용하지 않았으면 아주 간단하게 해결 가능했는데, spring-kafka을 사용하면서 뭔가 복잡해진 것 같은 느낌이 들지만 그래도 RetryTemplate 기능을 쉽게 구현할 수 있었으니 넘어간다.
이런저런 삽질을 하던 중 구세주 발견!
Consumer Interceptor
Consumer interceptor을 통해서 @KafkaListener로 넘기기 전에 해당 records의 길이를 알면 가능할 것 같은 생각이 들었다. 하지만 Spring에서 제공하는 기능이 아니라 별도의 설정이 필요하다.
주로 third-party 구성으로 모니터링, 로깅 목적으로 사용한다고 하며 구현할 때는 interceptor들 간에 consumer config namespace를 공유하므로 충돌이 나지 않도록 주의할 필요가 있다.
뭔가 해결은 했는데 지저분한 느낌을 지울수 없다. 또, 다른 방법이 있는지는 차차 찾아봐야겠다.
Ref.
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerInterceptor.html
https://docs.spring.io/spring-kafka/reference/html/#interceptors
'spring' 카테고리의 다른 글
[Gradle]Multi Application.properties 사용하기 (1) | 2024.03.23 |
---|---|
스프링부트 2.2.10에서 2.6.6으로 upgrade (0) | 2022.04.05 |
Static Dispatch, Dynamic Method Dispatch (0) | 2022.03.31 |
Post로 전달된 json Body 여러번 읽기 (0) | 2021.09.12 |