본 글에서는 로컬 서버에 설치한 Kafka의 HealthCheck를 구현해보려 합니다. 우선 로컬 pc에 kafka 설치후, zookeeper와 kafka 서버가 둘다 구동 후 topic 설정 마쳐야합니다. 이전글의 4번까지 완료 되어 있어야합니다.
kafka Window 환경에서 로컬 설치 후 pub/sub 테스트
kafka Window 환경에서 로컬 설치 후 pub/sub 테스트
본 글에서는 마이크로서비스아키텍쳐에서 주로 사용하는 kafka를 로컬 window PC에 설치하여 간단하게 pub/sub하는 것을 구현해보겠습니다. - kafka 설치 https://kafka.apache.org/downloads 먼저 다운로드 주소
dnl1029.tistory.com
- pom.xml
: HealthIndicator를 사용하기 위해 actuator 의존성을 추가해줍니다.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
- KafkaHealthCheckConfig
: springboot 3.0.9 버전 기준으로 아래와같이 health체크를 구현할 수 있습니다. KafkaAdmin은 spring kafka사용시 autoconfiguration을 통해 bean이 만들어져있으니, 그대로 주입해서 사용하면 됩니다.
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaHealthCheckConfig {
private static final String topic = "kyu";
private final KafkaAdmin kafkaAdmin;
@Bean
public AdminClient kafkaAdminClient() {
return AdminClient.create(kafkaAdmin.getConfigurationProperties());
}
@Qualifier("kafkaHealthIndicator")
@Bean("kafkaHealthIndicator")
public HealthIndicator kafkaHealthIndicator(AdminClient adminClient) {
return new AbstractHealthIndicator() {
@Override
protected void doHealthCheck(Health.Builder builder) throws Exception {
KafkaFuture<TopicDescription> kafkaFuture = adminClient.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic);
try{
TopicDescription topicDescription = kafkaFuture.get(3000, TimeUnit.MILLISECONDS);
log.info("kafka server alive. topicDescription : {}",topicDescription);
builder.up().build();
}
catch (TimeoutException | ExecutionException | InterruptedException | NullPointerException e) {
builder.down(e).build();
}
}
};
}
}
- Service
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaHealthCheckService {
@Qualifier("kafkaHealthIndicator")
private final HealthIndicator kafkaHealthIndicator;
@Scheduled(fixedDelay = 5000L)
public void kafkaHealthCheck() {
Health health = kafkaHealthIndicator.health();
log.info("health.getDetails : {}, health.getStatus : {}"
,health.getDetails()
,health.getStatus());
}
}
- Main
: 스케쥴을 사용하기 위해 @EnableScheduling을 까먹지 말고 추가합니다.
@SpringBootApplication
@EnableScheduling
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}
- 결과
: 로컬 kafka 서버가 정상적으로 올라가있을때는 아래와 같이 5초마다 UP이라는 Status를 확인할수있습니다.
이제 kafka 서버를 강제로 종료해보겠습니다. health의 Status가 DOWN으로 바뀌었네요.
이 상태에서 kafka 로컬서버를 다시 구동시켜보겠습니다.
Re joining group이라는 메시지가 뜨면서 정상적으로 복구된것을 확인할 수 있습니다.
'kafka' 카테고리의 다른 글
spring kafka ConsumerAwareRebalanceListener 구현 (0) | 2023.07.22 |
---|---|
spring kafka pub/sub 구현 (0) | 2023.07.22 |
kafka Window 환경에서 로컬 설치 후 pub/sub 테스트 (0) | 2023.07.21 |
댓글