본문 바로가기

spring

Spring-Kafka에서 읽은 Record 개수 알기

반응형

카프카를 사용하면서 내가 실질적으로 consume한 record가 몇 개인지 알아야 했다.

max.poll.records = 300 // default 500

물론 위와 같은 설정으로 Kafka consumer가 broker로부터 consume한 record을 컨트롤할 수 있지만 이는 프로퍼티 이름에서 볼 수 있듯이 maximum을 제한할 뿐...

 

내가 원하던 기능은 실제로 consume한 record의 개수를 알아야 했다. 위와 같은 설정에서도 실제 consume한 record의 개수는 100, 150, 200 일수도 있으니깐.

 

spring-kafka을 사용하지 않았으면 아주 간단하게 해결 가능했는데, spring-kafka을 사용하면서 뭔가 복잡해진 것 같은 느낌이 들지만 그래도 RetryTemplate 기능을 쉽게 구현할 수 있었으니 넘어간다.

 

이런저런 삽질을 하던 중 구세주 발견! 

Consumer Interceptor

 

Consumer interceptor을 통해서 @KafkaListener로 넘기기 전에 해당 records의 길이를 알면 가능할 것 같은 생각이 들었다. 하지만 Spring에서 제공하는 기능이 아니라 별도의 설정이 필요하다.

 

주로 third-party 구성으로 모니터링, 로깅 목적으로 사용한다고 하며 구현할 때는 interceptor들 간에 consumer config namespace를 공유하므로 충돌이 나지 않도록 주의할 필요가 있다.

 

뭔가 해결은 했는데 지저분한 느낌을 지울수 없다. 또, 다른 방법이 있는지는 차차 찾아봐야겠다.

 

Ref.

https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerInterceptor.html

 

ConsumerInterceptor (kafka 0.10.0.1 API)

A plugin interface that allows you to intercept (and possibly mutate) records received by the consumer. A primary use-case is for third-party components to hook into the consumer applications for custom monitoring, logging, etc. This class will get consume

kafka.apache.org

https://docs.spring.io/spring-kafka/reference/html/#interceptors

 

Spring for Apache Kafka

When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot’s dependency management. If you wish to use a different version of kafka-clients or kafka-streams, such as 2.8.0, you need

docs.spring.io

반응형