본문 바로가기
kafka

spring kafka HealthIndicator 사용하여 HealthCheck 구현

by 토망이 2023. 7. 31.

본 글에서는 로컬 서버에 설치한 Kafka의 HealthCheck를 구현해보려 합니다. 우선 로컬 pc에 kafka 설치후, zookeeper와 kafka 서버가 둘다 구동 후 topic 설정 마쳐야합니다. 이전글의 4번까지 완료 되어 있어야합니다.

kafka Window 환경에서 로컬 설치 후 pub/sub 테스트

 

kafka Window 환경에서 로컬 설치 후 pub/sub 테스트

본 글에서는 마이크로서비스아키텍쳐에서 주로 사용하는 kafka를 로컬 window PC에 설치하여 간단하게 pub/sub하는 것을 구현해보겠습니다. - kafka 설치 https://kafka.apache.org/downloads 먼저 다운로드 주소

dnl1029.tistory.com

 

- pom.xml

: HealthIndicator를 사용하기 위해 actuator 의존성을 추가해줍니다.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

 

- KafkaHealthCheckConfig

: springboot 3.0.9 버전 기준으로 아래와같이 health체크를 구현할 수 있습니다. KafkaAdmin은 spring kafka사용시 autoconfiguration을 통해 bean이 만들어져있으니, 그대로 주입해서 사용하면 됩니다. 

@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaHealthCheckConfig {

    private static final String topic = "kyu";
    private final KafkaAdmin kafkaAdmin;

    @Bean
    public AdminClient kafkaAdminClient() {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    @Qualifier("kafkaHealthIndicator")
    @Bean("kafkaHealthIndicator")
    public HealthIndicator kafkaHealthIndicator(AdminClient adminClient) {
        return new AbstractHealthIndicator() {
            @Override
            protected void doHealthCheck(Health.Builder builder) throws Exception {
                KafkaFuture<TopicDescription> kafkaFuture = adminClient.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic);
                try{
                    TopicDescription topicDescription = kafkaFuture.get(3000, TimeUnit.MILLISECONDS);
                    log.info("kafka server alive. topicDescription : {}",topicDescription);
                    builder.up().build();
                }
                catch (TimeoutException | ExecutionException | InterruptedException | NullPointerException e) {
                    builder.down(e).build();
                }
            }
        };
    }

}

 

- Service

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaHealthCheckService {

    @Qualifier("kafkaHealthIndicator")
    private final HealthIndicator kafkaHealthIndicator;

    @Scheduled(fixedDelay = 5000L)
    public void kafkaHealthCheck() {
        Health health = kafkaHealthIndicator.health();
        log.info("health.getDetails : {}, health.getStatus : {}"
                ,health.getDetails()
                ,health.getStatus());
    }

}

 

- Main

 : 스케쥴을 사용하기 위해 @EnableScheduling을 까먹지 말고 추가합니다.

@SpringBootApplication
@EnableScheduling
public class KafkaApplication {

   public static void main(String[] args) {
      SpringApplication.run(KafkaApplication.class, args);
   }

}

 

- 결과

 

: 로컬 kafka 서버가 정상적으로 올라가있을때는 아래와 같이 5초마다 UP이라는 Status를 확인할수있습니다.

 

이제 kafka 서버를 강제로 종료해보겠습니다. health의 Status가 DOWN으로 바뀌었네요.

 

이 상태에서 kafka 로컬서버를 다시 구동시켜보겠습니다.

Re joining group이라는 메시지가 뜨면서 정상적으로 복구된것을 확인할 수 있습니다.

 

댓글